核心概念
核心概念
本文档介绍 HsxWorkFlow 框架的核心概念,包括 WorkRegisterHandler、StepActionHandler 和 WorkflowManager 三个核心组件。
概览
HsxWorkFlow 的核心架构由三个主要组件构成:
- WorkRegisterHandler - 工作流注册器,负责收集工作流定义
- WorkflowManager - 工作流管理器,负责注册、启动、停止工作流
- StepActionHandler - 步骤处理器,所有工作流类的基类
WorkRegisterHandler - 工作流注册器
WorkRegisterHandler 是工作流的"蓝图",负责收集工作流类定义和步骤声明。
构造函数
WorkRegisterHandler(
step_key: str = "default",
title: str = "",
desc: str = "",
global_log: bool = True,
start_: bool = True,
StatusModel: StepStatus = StepStatus,
group_info: dict = None,
**kwargs
)
参数说明
| 参数 | 类型 | 默认值 | 描述 |
|---|---|---|---|
step_key | str | "default" | 工作流唯一标识 |
title | str | "" | 显示标题 |
desc | str | "" | 描述信息 |
global_log | bool | True | 是否使用全局日志 |
start_ | bool | True | 是否自动启动 |
StatusModel | StepStatus | StepStatus | 状态枚举类 |
group_info | dict | None | 分组信息 |
single | str/tuple/dict | - | 单次执行模式配置 |
traversal | str/tuple/dict | - | 遍历模式配置 |
traversal_data | list/dict | - | 遍历数据 |
indefinitely | str/tuple | - | 无限循环模式配置 |
custom | str | - | 自定义模式配置 |
基本用法
from hsxworkflow import WorkRegisterHandler
# 创建注册器
wf = WorkRegisterHandler(
step_key="my_flow",
title="我的工作流",
desc="工作流描述",
global_log=True,
start_=True,
)
# 定义工作流类
@wf.register_class()
class MyWorkFlow(StepActionHandler):
pass
执行模式配置
# 单次执行模式
wf = WorkRegisterHandler(
step_key="once_task",
single="once_task"
)
# 遍历模式
wf = WorkRegisterHandler(
step_key="batch",
traversal="batch",
traversal_data=[
{"url": "https://example.com/1"},
{"url": "https://example.com/2"},
]
)
# 无限循环模式
wf = WorkRegisterHandler(
step_key="monitor",
indefinitely="monitor"
)
# 自定义模式
wf = WorkRegisterHandler(
step_key="custom",
custom="custom"
)
核心方法
register_class()
注册工作流类的装饰器。
@wf.register_class(copy_run=1)
class MyWorkFlow(StepActionHandler):
pass
参数:
copy_run: 实例副本数量(默认为 1)
step()
注册步骤方法的装饰器。
@wf.step(sort=1, retry=3, retry_interval=2)
def my_step(self):
return self.set_success()
参数:
sort: 执行顺序(越小越先执行)retry: 失败重试次数retry_interval: 重试间隔(秒)step_key: 步骤标识(默认使用注册器的 step_key)
step_child()
注册子步骤分支的装饰器。
@wf.step_child(
SUCCESS="handle_success",
FAILURE="handle_failure",
child_fail_stop=False
)
def parent_step(self):
return self.set_success()
参数:
SUCCESS: 成功时执行的方法名或方法FAILURE: 失败时执行的方法名或方法child_fail_stop: 子步骤失败是否停止整个流程
step_many()
批量注册步骤到多个工作流。
@wf.step_many(step_many={
"flow_a": {"sort": 1, "retry": 2},
"flow_b": {"sort": 2, "retry": 0},
})
def shared_step(self):
return self.set_success()
参数:
step_many: 步骤配置映射(dict/list/tuple)
StepActionHandler - 步骤处理器
StepActionHandler 是所有工作流类的基类,提供步骤执行的基础设施。
核心属性
| 属性 | 类型 | 描述 |
|---|---|---|
title | str | 工作流标题 |
desc | str | 工作流描述 |
WorkflowName | str | 工作流名称(step_key) |
current_step | str | 当前执行的步骤名 |
status_text | str | 状态文本(standby/running/stopped/error) |
flow_run_count | int | 工作流运行次数 |
step_action_list | list | 步骤方法名列表 |
controller | ThreadController | 线程控制器 |
execution_result | StepResult | 当前执行结果 |
基本用法
from hsxworkflow import StepActionHandler, WorkRegisterHandler
wf = WorkRegisterHandler(step_key="demo")
@wf.register_class()
class MyWorkFlow(StepActionHandler):
def __init__(self):
super().__init__()
self.title = "我的工作流"
self.desc = "工作流描述"
@wf.step(sort=1)
def prepare(self):
# 工作流逻辑
return self.set_success(message="准备完成")
状态设置方法
set_success()
标记步骤为成功状态。
self.set_success(message="", data=None, **extra)
参数:
message: 成功消息data: 返回数据**extra: 额外数据
示例:
@wf.step(sort=1)
def fetch_data(self):
data = {"users": [1, 2, 3]}
return self.set_success(
message="数据获取成功",
data=data,
count=len(data)
)
set_failure()
标记步骤为失败状态。
self.set_failure(message="", data=None, **extra)
示例:
@wf.step(sort=1)
def validate(self):
if not self.is_valid():
return self.set_failure(message="验证失败")
return self.set_success()
set_error()
标记步骤为错误状态。
self.set_error(message="", data=None, **extra)
示例:
@wf.step(sort=1)
def process(self):
try:
# 业务逻辑
return self.set_success()
except Exception as e:
return self.set_error(message=f"处理错误: {str(e)}")
set_stop()
停止工作流执行。
self.set_stop(message="手动停止")
示例:
@wf.step(sort=1)
def check_condition(self):
if self.should_stop():
return self.set_stop("停止条件满足")
return self.set_success()
全局数据方法
get_global_data()
获取全局共享数据。
self.get_global_data(key, default=None)
参数:
key: 数据键default: 默认值
示例:
@wf.step(sort=2)
def process(self):
task_id = self.get_global_data("task_id", default="UNKNOWN")
return self.set_success(message=f"处理任务 {task_id}")
set_global_data()
设置全局共享数据。
self.set_global_data(key, value)
示例:
@wf.step(sort=1)
def init(self):
self.set_global_data("token", "abc123")
self.set_global_data("count", 0)
return self.set_success()
update_global_data()
批量更新全局共享数据。
self.update_global_data(**kwargs)
示例:
@wf.step(sort=1)
def init(self):
self.update_global_data(
token="abc123",
count=0,
status="ready"
)
return self.set_success()
日志方法
socket_log()
输出日志并推送到 WebSocket。
self.socket_log(msg, level="i", base=False, **kwargs)
参数:
msg: 日志消息level: 日志级别(i/d/w/e/c)base: 是否为顶级日志(控制缩进)
日志级别:
| 参数 | 级别 | 描述 |
|---|---|---|
"i" | INFO | 信息 |
"d" | DEBUG | 调试 |
"w" | WARNING | 警告 |
"e" | ERROR | 错误 |
"c" | CRITICAL | 严重 |
示例:
@wf.step(sort=1)
def process(self):
# 顶级日志(无缩进)
self.socket_log("开始处理...", level="i", base=True)
# 子级日志(有缩进)
self.socket_log("处理细节 1", level="i", base=False)
self.socket_log("处理细节 2", level="i")
# 调试日志
self.socket_log("调试信息", level="d")
# 错误日志
self.socket_log("发生错误", level="e")
return self.set_success()
参数传递方法
to_next_step_param()
传递参数给下一个步骤。
self.to_next_step_param(**kwargs)
示例:
@wf.step(sort=1)
def fetch_data(self):
data = {"users": [1, 2, 3]}
self.to_next_step_param(user_list=data["users"], total=3)
return self.set_success()
@wf.step(sort=2)
def process_data(self, user_list=None, total=None):
"""参数名必须匹配"""
self.socket_log(f"处理 {total} 个用户: {user_list}")
return self.set_success()
执行方法
run_all_steps()
执行所有步骤并返回最终结果。
self.run_all_steps(*args, **kwargs)
execute_next_step()
执行下一步并返回结果。
self.execute_next_step()
注意: 此方法使用缓存的生成器实例,确保连续调用时执行不同的步骤。如需重新开始,请先调用 init_result() 或 reset_step_generator()。
reset_step_generator()
重置步骤生成器,用于重新开始执行。
self.reset_step_generator()
信息获取方法
get_current_workflow_info()
获取当前工作流详细信息。
info = self.get_current_workflow_info()
返回值:
{
"WorkflowTitle": "工作流标题",
"WorkflowDesc": "工作流描述",
"WorkflowClass": "MyWorkFlow",
"WorkflowGroup": [{
"WorkflowName": "demo",
"WorkflowID": 140234567890,
"WorkflowRunType": "single",
"WorkflowStatus": "standby",
"WorkflowCurrentStep": None,
"WorkflowStartTime": "--",
"WorkflowRunCount": 0
}]
}
WorkflowManager - 工作流管理器
WorkflowManager 是顶层管理器,负责注册、启动、停止工作流以及启动 Web 服务。
构造函数
WorkflowManager(
max_workers: int = 5,
work_data_handler: Type[WorkDataHandler] = WorkDataHandler,
*args,
**kwargs
)
参数说明
| 参数 | 类型 | 默认值 | 描述 |
|---|---|---|---|
max_workers | int | 5 | 线程池最大工作线程数 |
work_data_handler | WorkDataHandler | WorkDataHandler | 自定义执行引擎 |
基本用法
from hsxworkflow import WorkflowManager, WorkRegisterHandler
# 创建工作流注册器
wf = WorkRegisterHandler(step_key="demo")
# 定义工作流类
@wf.register_class()
class DemoWorkFlow(StepActionHandler):
@wf.step(sort=1)
def hello(self):
return self.set_success()
# 创建管理器并加载注册工作流
manager = WorkflowManager(max_workers=5)
manager.register_object(wf)
# 启动
manager.run_app()
核心方法
register_object()
注册工作流对象。
manager.register_object(wrh: WorkRegisterHandler)
参数:
wrh: WorkRegisterHandler 对象
示例:
wf = WorkRegisterHandler(step_key="demo")
@wf.register_class()
class DemoWorkFlow(StepActionHandler):
pass
manager = WorkflowManager()
manager.register_object(wf)
start_workflow()
启动工作流执行。
manager.start_workflow(work_id: Optional[int] = None)
参数:
work_id: 工作流实例 ID,如果为 None 则启动所有工作流
示例:
# 启动所有工作流
manager.start_workflow()
# 启动指定工作流
manager.start_workflow(work_id=140234567890)
stop_workflow()
停止指定的工作流。
manager.stop_workflow(work_id: int)
参数:
work_id: 工作流实例 ID
示例:
manager.stop_workflow(work_id=140234567890)
wait_result()
等待并获取所有工作流任务的结果。
results = manager.wait_result()
返回值: 所有任务的结果列表,按提交顺序返回。如果某个任务失败,对应位置为 None。
示例:
manager.start_workflow()
results = manager.wait_result()
for r in results:
if r:
print(r.to_dict())
else:
print("执行失败")
run_app()
启动 FastAPI Web 应用。
manager.run_app(
host: Optional[str] = None,
port: Optional[int] = None,
debug: Optional[bool] = None,
**kwargs
)
参数:
host: 监听地址(默认从配置读取)port: 监听端口(默认从配置读取)debug: 是否启用调试模式(默认从配置读取)**kwargs: 传递给 uvicorn.run 的其他参数
示例:
# 使用默认配置
manager.run_app()
# 自定义配置
manager.run_app(
host="0.0.0.0",
port=8080,
debug=True
)
shutdown()
关闭工作流管理器。
manager.shutdown()
功能: 停止所有运行中的工作流并关闭线程池。
get_work_objects()
获取所有工作流对象。
work_objects = manager.get_work_objects()
返回值: 工作流对象字典({id: StepActionHandler})
get_work_object_id_list()
获取所有工作流 ID。
work_ids = manager.get_work_object_id_list()
返回值: 工作流 ID 列表
get_work_object_info_by_id()
获取工作流详情。
info = manager.get_work_object_info_by_id(class_id: int)
参数:
class_id: 工作流实例 ID
返回值: 工作流详细信息字典
完整示例
from hsxworkflow import WorkflowManager, StepActionHandler, WorkRegisterHandler
# 1. 创建工作流注册器
wf = WorkRegisterHandler(
step_key="demo",
title="演示工作流",
desc="展示核心概念"
)
# 2. 定义工作流类
@wf.register_class()
class DemoWorkFlow(StepActionHandler):
def __init__(self):
super().__init__()
self.title = "演示工作流"
self.desc = "展示核心概念"
@wf.step(sort=1)
def init(self):
"""初始化阶段"""
self.socket_log("初始化工作流...", level="i", base=True)
# 设置全局数据
self.set_global_data("task_id", "TASK-001")
self.update_global_data(count=0, status="ready")
return self.set_success(message="初始化完成")
@wf.step(sort=2)
def process(self):
"""处理阶段"""
self.socket_log("处理数据...", level="i", base=True)
# 获取全局数据
task_id = self.get_global_data("task_id")
count = self.get_global_data("count", default=0)
# 业务逻辑
result = {"processed": count + 1}
# 传递参数给下一步
self.to_next_step_param(result=result)
return self.set_success(
message=f"处理任务 {task_id} 完成",
data=result
)
@wf.step(sort=3)
def finish(self, result=None):
"""完成阶段"""
self.socket_log("完成工作流...", level="i", base=True)
if result:
self.socket_log(f"结果: {result}", level="i")
return self.set_success(message="工作流完成")
# 3. 创建管理器
manager = WorkflowManager(max_workers=3)
manager.register_object(wf)
# 4. 启动
if __name__ == "__main__":
# 方式一:Web 管理界面
manager.run_app()
# 方式二:直接执行
# manager.start_workflow()
# results = manager.wait_result()
# for r in results:
# print(r.to_dict() if r else "执行失败")
下一步
现在你已经了解了 HsxWorkFlow 的核心概念,可以继续学习: