定时调度
2026/3/20大约 4 分钟
定时调度
HsxWorkFlow 基于 APScheduler 提供强大的定时调度功能,支持 Cron 表达式、间隔执行和一次性任务。
概览
WorkflowScheduler
WorkflowScheduler 是工作流调度器的核心类,封装了 APScheduler 的功能。
创建调度器
from hsxworkflow import WorkflowManager
from hsxworkflow.scheduler import WorkflowScheduler
manager = WorkflowManager()
manager.register_object(wf)
scheduler = WorkflowScheduler(manager)
启动调度器
scheduler.start()
关闭调度器
scheduler.shutdown(wait=True)
Cron 表达式
基于 Cron 表达式的定时触发。
基本用法
# 每天早上 8:30 执行
job_id = scheduler.schedule_cron(
workflow_id=123,
cron="30 8 * * *",
job_id="daily_task"
)
Cron 表达式格式
分钟 小时 日期 月份 星期
* * * * *
| 字段 | 取值范围 | 特殊字符 |
|---|---|---|
| 分钟 | 0-59 | * , - / |
| 小时 | 0-23 | * , - / |
| 日期 | 1-31 | * , - / ? |
| 月份 | 1-12 | * , - / |
| 星期 | 0-7 | * , - / ? |
常用示例
# 每天早上 8:00
cron="0 8 * * *"
# 每周一早上 9:00
cron="0 9 * * 1"
# 每月 1 号凌晨 0:00
cron="0 0 1 * *"
# 每小时执行一次
cron="0 * * * *"
# 每 5 分钟执行一次
cron="*/5 * * * *"
# 工作日早上 9:00(周一到周五)
cron="0 9 * * 1-5"
完整示例
from hsxworkflow import WorkflowManager, StepActionHandler, WorkRegisterHandler
from hsxworkflow.scheduler import WorkflowScheduler
# 创建工作流
wf = WorkRegisterHandler(step_key="scheduled_task")
@wf.register_class()
class ScheduledTask(StepActionHandler):
@wf.step(sort=1)
def execute(self):
self.socket_log("执行定时任务...", base=True)
# 业务逻辑
return self.set_success()
# 创建管理器和调度器
manager = WorkflowManager()
manager.register_object(wf)
scheduler = WorkflowScheduler(manager)
# 添加 Cron 定时任务(每天早上 8:30)
job_id = scheduler.schedule_cron(
workflow_id=list(manager.get_work_objects().keys())[0],
cron="30 8 * * *",
job_id="daily_task"
)
# 启动调度器
scheduler.start()
# 保持运行
if __name__ == "__main__":
print("调度器已启动,按 Ctrl+C 停止")
import time
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
scheduler.shutdown()
间隔执行
按固定时间间隔重复触发。
基本用法
# 每 10 分钟执行一次
job_id = scheduler.schedule_interval(
workflow_id=123,
minutes=10
)
参数说明
| 参数 | 类型 | 描述 |
|---|---|---|
weeks | int | 间隔周数 |
days | int | 间隔天数 |
hours | int | 间隔小时数 |
minutes | int | 间隔分钟数 |
seconds | int | 间隔秒数 |
常用示例
# 每 5 分钟执行一次
scheduler.schedule_interval(workflow_id=123, minutes=5)
# 每 1 小时执行一次
scheduler.schedule_interval(workflow_id=123, hours=1)
# 每天执行一次
scheduler.schedule_interval(workflow_id=123, days=1)
# 每 30 秒执行一次
scheduler.schedule_interval(workflow_id=123, seconds=30)
一次性任务
在指定时间点执行一次。
基本用法
from datetime import datetime
# 在指定时间执行
job_id = scheduler.schedule_once(
workflow_id=123,
run_date=datetime(2025, 12, 31, 23, 59)
)
完整示例
from datetime import datetime, timedelta
# 10 秒后执行
job_id = scheduler.schedule_once(
workflow_id=123,
run_date=datetime.now() + timedelta(seconds=10)
)
# 明天早上 9:00 执行
tomorrow = datetime.now() + timedelta(days=1)
job_id = scheduler.schedule_once(
workflow_id=123,
run_date=datetime(tomorrow.year, tomorrow.month, tomorrow.day, 9, 0)
)
任务管理
暂停任务
scheduler.pause_job("job_id")
恢复任务
scheduler.resume_job("job_id")
移除任务
scheduler.unschedule("job_id")
获取任务信息
job = scheduler.get_job("job_id")
print(job.to_dict())
列出所有任务
jobs = scheduler.list_jobs()
for job in jobs:
print(job.job_id, job.trigger_type, job.next_run_time)
按工作流筛选任务
jobs = scheduler.list_jobs(workflow_id=123)
获取统计信息
stats = scheduler.get_stats()
print(stats)
输出:
{
"running": true,
"total_jobs": 5,
"enabled_jobs": 4,
"jobs": [
{
"job_id": "daily_task",
"workflow_id": 123,
"trigger_type": "cron",
"trigger_args": {"cron": "30 8 * * *"},
"next_run_time": "2025-01-31T08:30:00",
"enabled": true,
"created_at": "2025-01-30T10:00:00",
"last_run_at": "2025-01-30T08:30:00",
"run_count": 1
}
]
}
ScheduledJob
调度任务信息类。
属性
| 属性 | 类型 | 描述 |
|---|---|---|
job_id | str | 任务 ID |
workflow_id | int | 工作流实例 ID |
trigger_type | str | 触发类型(cron/interval/date) |
trigger_args | dict | 触发参数 |
next_run_time | datetime | 下次执行时间 |
enabled | bool | 是否启用 |
created_at | datetime | 创建时间 |
last_run_at | datetime | 上次执行时间 |
run_count | int | 执行次数 |
方法
job.to_dict() # 转换为字典
最佳实践
1. 任务 ID 管理
# ✅ 使用有意义的任务 ID
scheduler.schedule_cron(
workflow_id=123,
cron="30 8 * * *",
job_id="daily_report"
)
# ❌ 使用默认生成的任务 ID
scheduler.schedule_cron(workflow_id=123, cron="30 8 * * *")
2. 错误处理
try:
job_id = scheduler.schedule_cron(
workflow_id=123,
cron="30 8 * * *"
)
except Exception as e:
print(f"创建任务失败: {e}")
3. 任务清理
# 关闭前清理所有任务
def cleanup():
jobs = scheduler.list_jobs()
for job in jobs:
scheduler.unschedule(job.job_id)
scheduler.shutdown()
# 注册退出处理
import atexit
atexit.register(cleanup)
下一步
现在你已经了解了定时调度的使用方法,可以继续学习: