执行模式
2026/3/20大约 8 分钟
执行模式
HsxWorkFlow 支持五种执行模式,满足不同场景的需求。本文档详细介绍每种执行模式的使用方法和适用场景。
模式概览
| 模式 | 描述 | 使用场景 |
|---|---|---|
single | 单次执行 | 一次性任务、简单流程 |
traversal | 遍历执行 | 批量处理数据集合 |
indefinitely | 无限循环 | 持续监控任务 |
queue | 队列模式 | 基于队列的任务执行 |
custom | 自定义模式 | 完全控制执行流程 |
single - 单次执行
单次执行是默认模式,按顺序执行所有步骤后结束。
配置方式
# 方式 1:默认模式(不指定任何模式)
wf = WorkRegisterHandler(step_key="once_task")
# 方式 2:显式指定
wf = WorkRegisterHandler(
step_key="once_task",
single="once_task"
)
# 方式 3:布尔值
wf = WorkRegisterHandler(
step_key="once_task",
single=True
)
执行流程
示例
from hsxworkflow import WorkRegisterHandler, StepActionHandler
wf = WorkRegisterHandler(
step_key="once_task",
title="单次执行任务",
desc="执行一次后结束"
)
@wf.register_class()
class OnceTask(StepActionHandler):
@wf.step(sort=1)
def prepare(self):
"""准备阶段"""
self.socket_log("准备资源...", base=True)
return self.set_success()
@wf.step(sort=2)
def execute(self):
"""执行阶段"""
self.socket_log("执行任务...", base=True)
# 业务逻辑
return self.set_success()
@wf.step(sort=3)
def cleanup(self):
"""清理阶段"""
self.socket_log("清理资源...", base=True)
return self.set_success()
适用场景
- 一次性数据处理任务
- 报表生成
- 系统初始化
- 备份任务
traversal - 遍历执行
遍历模式对数据集合中的每条数据执行一轮完整的步骤。
配置方式
# 列表遍历
wf = WorkRegisterHandler(
step_key="batch",
traversal="batch",
traversal_data=[
{"url": "https://example.com/1"},
{"url": "https://example.com/2"},
{"url": "https://example.com/3"},
]
)
# 字典遍历
wf = WorkRegisterHandler(
step_key="batch",
traversal="batch",
traversal_data={
"task1": {"url": "https://example.com/1"},
"task2": {"url": "https://example.com/2"},
}
)
执行流程
示例
from hsxworkflow import WorkRegisterHandler, StepActionHandler
wf = WorkRegisterHandler(
step_key="batch_process",
traversal="batch_process",
traversal_data=[
{"url": "https://example.com/page1", "name": "页面1"},
{"url": "https://example.com/page2", "name": "页面2"},
{"url": "https://example.com/page3", "name": "页面3"},
]
)
@wf.register_class()
class BatchProcess(StepActionHandler):
@wf.step(sort=1)
def process_item(self, traversal_=None):
"""处理单个项目"""
if traversal_ is None:
return self.set_failure(message="无数据")
url = traversal_.get("url")
name = traversal_.get("name")
self.socket_log(f"处理 {name}: {url}", base=True)
# 处理逻辑
result = self.scrape_data(url)
return self.set_success(
message=f"{name} 处理完成",
data=result
)
def scrape_data(self, url):
"""模拟数据抓取"""
# 实际业务逻辑
return {"url": url, "data": "sample data"}
动态数据
可以在工作流中动态设置遍历数据:
@wf.register_class()
class DynamicTraversal(StepActionHandler):
@wf.step(sort=1)
def load_data(self):
"""加载数据"""
# 从数据库或 API 加载数据
data = self.fetch_data_from_api()
self.traversal_data = data
return self.set_success(message=f"加载了 {len(data)} 条数据")
@wf.step(sort=2)
def process_item(self, traversal_=None):
"""处理单个项目"""
if traversal_:
self.socket_log(f"处理: {traversal_}", base=True)
return self.set_success()
def fetch_data_from_api(self):
"""模拟从 API 获取数据"""
return [
{"id": 1, "name": "Item 1"},
{"id": 2, "name": "Item 2"},
{"id": 3, "name": "Item 3"},
]
适用场景
- 批量数据处理
- 批量文件处理
- 批量 API 调用
- 批量数据抓取
indefinitely - 无限循环
无限循环模式持续执行,直到主动停止或步骤返回 set_stop()。
配置方式
wf = WorkRegisterHandler(
step_key="monitor",
indefinitely="monitor"
)
执行流程
示例
from hsxworkflow import WorkRegisterHandler, StepActionHandler
import time
wf = WorkRegisterHandler(
step_key="monitor",
title="监控任务",
desc="持续监控系统状态"
)
@wf.register_class()
class MonitorTask(StepActionHandler):
@wf.step(sort=1)
def check_status(self):
"""检查状态"""
self.socket_log("检查系统状态...", base=True)
# 检查某些条件
status = self.get_system_status()
if status == "error":
return self.set_stop("系统错误,停止监控")
self.socket_log(f"系统状态: {status}", base=True)
return self.set_success()
@wf.step(sort=2)
def process_alerts(self):
"""处理告警"""
alerts = self.get_alerts()
if alerts:
self.socket_log(f"处理 {len(alerts)} 个告警", base=True)
# 处理告警逻辑
return self.set_success()
@wf.step(sort=3)
def wait_interval(self):
"""等待间隔"""
time.sleep(5) # 等待 5 秒
return self.set_success()
def get_system_status(self):
"""获取系统状态"""
# 模拟系统状态
import random
return random.choice(["ok", "ok", "ok", "error"])
def get_alerts(self):
"""获取告警"""
# 模拟告警
return []
主动停止
可以通过多种方式停止无限循环:
@wf.step(sort=1)
def check_condition(self):
# 方式 1: 返回 set_stop()
if self.should_stop():
return self.set_stop("停止条件满足")
# 方式 2: 通过线程控制器
if self.controller._stop_event.is_set():
return self.set_stop("外部停止")
return self.set_success()
适用场景
- 系统监控
- 实时数据同步
- 消息队列消费
- 守护进程
queue - 队列模式
队列模式基于队列的工作流执行(需自行实现)。
配置方式
wf = WorkRegisterHandler(
step_key="queue_task",
queue="queue_task"
)
示例
from hsxworkflow import WorkRegisterHandler, StepActionHandler
wf = WorkRegisterHandler(
step_key="queue_task",
title="队列任务",
desc="从队列中消费任务"
)
@wf.register_class()
class QueueTask(StepActionHandler):
@wf.step(sort=1)
def dequeue(self):
"""从队列中获取任务"""
task = self.get_task_from_queue()
if not task:
# 队列为空,等待一段时间
import time
time.sleep(1)
return self.set_success()
# 设置任务数据
self.task_data = task
return self.set_success(data=task)
@wf.step(sort=2)
def process(self):
"""处理任务"""
if not hasattr(self, 'task_data'):
return self.set_failure(message="无任务数据")
task = self.task_data
self.socket_log(f"处理任务: {task}", base=True)
# 处理逻辑
result = self.process_task(task)
return self.set_success(data=result)
def get_task_from_queue(self):
"""从队列获取任务"""
# 实际业务逻辑
return {"id": 1, "data": "sample"}
def process_task(self, task):
"""处理任务"""
# 实际业务逻辑
return {"status": "completed"}
适用场景
- 任务队列消费
- 异步任务处理
- 消息队列消费
custom - 自定义模式
自定义模式完全控制执行流程。
配置方式
from hsxworkflow import WorkDataHandler, StepActionHandler
def my_custom_logic(self: WorkDataHandler, cls: StepActionHandler, **kwargs):
"""自定义执行逻辑"""
while True:
if cls.execution_result.is_stop():
break
# 获取任务数据
data = fetch_next_task()
if not data:
time.sleep(1)
continue
# 设置任务数据并执行
cls.task_data = data
result = self.run_func(cls)
if result is None or result.is_stop():
break
return result
# 注册自定义处理器
manager.work_data_handler.set_custom_work(my_custom_logic)
示例
from hsxworkflow import WorkflowManager, StepActionHandler, WorkRegisterHandler
from hsxworkflow.Code.WorkFlowHandler import WorkDataHandler
import time
# 定义自定义执行逻辑
def custom_logic(self: WorkDataHandler, cls: StepActionHandler, **kwargs):
"""自定义执行逻辑:带条件判断的循环"""
max_iterations = 10
iteration = 0
while iteration < max_iterations:
if cls.execution_result.is_stop():
break
cls.socket_log(f"执行第 {iteration + 1} 次迭代", base=True)
# 执行一轮完整步骤
result = self.run_func(cls)
if result and result.is_stop():
break
iteration += 1
time.sleep(1)
return result
# 创建工作流
wf = WorkRegisterHandler(
step_key="custom_task",
title="自定义模式任务",
desc="使用自定义执行逻辑"
)
@wf.register_class()
class CustomTask(StepActionHandler):
@wf.step(sort=1)
def process(self):
"""处理逻辑"""
self.socket_log("执行处理...", base=True)
# 业务逻辑
return self.set_success()
# 创建管理器并注册自定义逻辑
manager = WorkflowManager()
manager.work_data_handler.set_custom_work(custom_logic)
manager.register_object(wf)
# 启动
if __name__ == "__main__":
manager.run_app()
单个工作流的自定义逻辑
# 为单个工作流注册自定义逻辑
def CustomOfSingle_my_flow(self: WorkDataHandler, cls: StepActionHandler, **kwargs):
"""单个工作流的自定义逻辑"""
# 实现特定工作流的执行逻辑
pass
manager.work_data_handler.set_custom_work(CustomOfSingle_my_flow)
# 使用 CustomOfSingle_ 前缀
wf = WorkRegisterHandler(
step_key="my_flow",
CustomOfSingle_work="my_flow"
)
适用场景
- 复杂的执行逻辑
- 需要精确控制执行流程
- 特殊的业务需求
模式对比
| 特性 | single | traversal | indefinitely | queue | custom |
|---|---|---|---|---|---|
| 执行次数 | 1 次 | N 次(数据集合大小) | 无限 | 取决于队列 | 自定义 |
| 适用场景 | 一次性任务 | 批量处理 | 持续监控 | 队列消费 | 完全自定义 |
| 数据来源 | 无 | traversal_data | 无 | 队列 | 自定义 |
| 停止条件 | 执行完成 | 遍历完成 | set_stop() | 队列为空 | 自定义 |
| 配置复杂度 | 低 | 中 | 低 | 中 | 高 |
最佳实践
1. 选择合适的执行模式
# ✅ 一次性任务使用 single
wf = WorkRegisterHandler(step_key="backup", single="backup")
# ✅ 批量处理使用 traversal
wf = WorkRegisterHandler(
step_key="batch",
traversal="batch",
traversal_data=data_list
)
# ✅ 持续监控使用 indefinitely
wf = WorkRegisterHandler(step_key="monitor", indefinitely="monitor")
# ✅ 复杂逻辑使用 custom
manager.work_data_handler.set_custom_work(custom_logic)
2. 合理设置遍历数据
# ✅ 使用列表遍历
wf = WorkRegisterHandler(
step_key="batch",
traversal="batch",
traversal_data=[item1, item2, item3]
)
# ✅ 使用字典遍历(带键)
wf = WorkRegisterHandler(
step_key="batch",
traversal="batch",
traversal_data={
"task1": item1,
"task2": item2
}
)
# ✅ 动态加载数据
@wf.step(sort=1)
def load_data(self):
self.traversal_data = self.fetch_data()
return self.set_success()
3. 无限循环的停止条件
@wf.step(sort=1)
def check_stop(self):
# ✅ 设置明确的停止条件
if self.should_stop():
return self.set_stop("停止条件满足")
# ✅ 检查外部停止信号
if self.controller._stop_event.is_set():
return self.set_stop("外部停止")
# ✅ 设置最大迭代次数
self.iteration_count += 1
if self.iteration_count > self.max_iterations:
return self.set_stop("达到最大迭代次数")
return self.set_success()
4. 自定义模式的错误处理
def custom_logic(self, cls, **kwargs):
try:
while True:
if cls.execution_result.is_stop():
break
result = self.run_func(cls)
if result is None or result.is_stop():
break
# 处理错误
if result.is_error():
cls.socket_log(f"执行错误: {result.message}", level="e")
# 决定是否继续
except Exception as e:
cls.socket_log(f"自定义逻辑错误: {str(e)}", level="e")
return cls.set_error(message=f"执行错误: {str(e)}")
return result
下一步
现在你已经了解了不同的执行模式,可以继续学习: