装饰器 API
2026/3/20大约 7 分钟
装饰器 API
HsxWorkFlow 采用装饰器驱动的声明式编程范式,通过装饰器简洁地定义工作流和步骤。本文档详细介绍所有装饰器的使用方法。
概览
HsxWorkFlow 提供了以下装饰器:
- @register_class() - 注册工作流类
- @step() - 注册步骤方法
- @step_child() - 注册子步骤分支
- @step_many() - 批量注册步骤
@register_class()
注册工作流类的装饰器,所有工作流类都必须使用此装饰器注册。
语法
@wf.register_class(copy_run: int = 1)
class WorkFlowClass(StepActionHandler):
pass
参数
| 参数 | 类型 | 默认值 | 描述 |
|---|---|---|---|
copy_run | int | 1 | 实例副本数量 |
基本用法
from hsxworkflow import WorkRegisterHandler, StepActionHandler
wf = WorkRegisterHandler(step_key="demo")
@wf.register_class()
class DemoWorkFlow(StepActionHandler):
@wf.step(sort=1)
def step_one(self):
return self.set_success()
多实例配置
# 创建 3 个工作流实例
@wf.register_class(copy_run=3)
class DemoWorkFlow(StepActionHandler):
@wf.step(sort=1)
def step_one(self):
return self.set_success()
注意事项
- 工作流类必须继承
StepActionHandler - 每个工作流类只能注册一次
- 同一个类名不能重复注册
@step()
注册步骤方法的装饰器,定义工作流的执行步骤。
语法
@wf.step(
sort: Optional[int] = None,
retry: int = 0,
retry_interval: int = 0,
step_key: str = "default"
)
def step_method(self):
return self.set_success()
参数
| 参数 | 类型 | 默认值 | 描述 |
|---|---|---|---|
sort | int | None | 执行顺序(从小到大) |
retry | int | 0 | 失败重试次数 |
retry_interval | int | 0 | 重试间隔(秒) |
step_key | str | "default" | 步骤标识(通常使用注册器的 step_key) |
基本用法
wf = WorkRegisterHandler(step_key="demo")
@wf.register_class()
class DemoWorkFlow(StepActionHandler):
@wf.step(sort=1)
def step_one(self):
"""第一个步骤"""
return self.set_success(message="步骤一完成")
@wf.step(sort=2)
def step_two(self):
"""第二个步骤"""
return self.set_success(message="步骤二完成")
@wf.step(sort=3)
def step_three(self):
"""第三个步骤"""
return self.set_success(message="步骤三完成")
执行顺序
步骤按照 sort 参数从小到大执行:
@wf.register_class()
class OrderedWorkFlow(StepActionHandler):
@wf.step(sort=2)
def step_b(self):
# 第二个执行
return self.set_success()
@wf.step(sort=1)
def step_a(self):
# 第一个执行
return self.set_success()
@wf.step(sort=3)
def step_c(self):
# 第三个执行
return self.set_success()
执行顺序: step_a → step_b → step_c
重试机制
配置重试次数和重试间隔:
@wf.register_class()
class RetryWorkFlow(StepActionHandler):
@wf.step(sort=1, retry=3, retry_interval=2)
def unreliable_step(self):
"""最多重试 3 次,每次间隔 2 秒"""
import random
if random.random() < 0.5:
return self.set_failure(message="随机失败")
return self.set_success(message="成功")
步骤标识
使用不同的 step_key 区分步骤组:
wf = WorkRegisterHandler(step_key="main")
@wf.register_class()
class MultiKeyWorkFlow(StepActionHandler):
# 使用默认 step_key(main)
@wf.step(sort=1)
def main_step(self):
return self.set_success()
# 使用其他 step_key
@wf.step(sort=1, step_key="secondary")
def secondary_step(self):
return self.set_success()
返回值处理
步骤方法可以返回多种类型的值:
@wf.register_class()
class ReturnValueWorkFlow(StepActionHandler):
@wf.step(sort=1)
def return_step_result(self):
"""返回 StepResult 对象"""
return self.set_success(
message="成功",
data={"count": 42}
)
@wf.step(sort=2)
def return_status(self):
"""返回 StepStatus 枚举"""
return self.stepStatus.SUCCESS
@wf.step(sort=3)
def return_dict(self):
"""返回字典"""
return {
"status": "success",
"message": "成功",
"data": {"key": "value"}
}
@wf.step(sort=4)
def return_any(self):
"""返回任意值(自动标记为成功)"""
return {"result": "data"}
@wf.step(sort=5)
def return_none(self):
"""返回 None(自动标记为成功)"""
pass
@step_child()
注册子步骤分支的装饰器,根据步骤执行结果触发不同的后续处理。
语法
@wf.step_child(
SUCCESS: Union[str, Callable] = None,
FAILURE: Union[str, Callable] = None,
child_fail_stop: bool = False
)
def parent_step(self):
return self.set_success()
参数
| 参数 | 类型 | 默认值 | 描述 |
|---|---|---|---|
SUCCESS | str/Callable | None | 成功时执行的方法名或方法 |
FAILURE | str/Callable | None | 失败时执行的方法名或方法 |
child_fail_stop | bool | False | 子步骤失败是否停止整个流程 |
基本用法
wf = WorkRegisterHandler(step_key="branch")
@wf.register_class()
class BranchWorkFlow(StepActionHandler):
@wf.step(sort=1)
@wf.step_child(SUCCESS="on_success", FAILURE="on_failure")
def check_condition(self):
"""检查条件"""
import random
if random.choice([True, False]):
return self.set_success()
return self.set_failure(message="条件不满足")
def on_success(self):
"""成功分支"""
self.socket_log("执行成功分支逻辑", base=True)
return self.set_success()
def on_failure(self):
"""失败分支"""
self.socket_log("执行失败补偿逻辑", base=True)
return self.set_success()
@wf.step(sort=2)
def finalize(self):
"""最终步骤"""
self.socket_log("工作流结束", base=True)
return self.set_success()
执行流程
使用方法引用
可以直接传递方法引用而非方法名:
@wf.register_class()
class MethodRefWorkFlow(StepActionHandler):
@wf.step(sort=1)
@wf.step_child(SUCCESS=self.handle_success, FAILURE=self.handle_failure)
def check_condition(self):
return self.set_success()
def handle_success(self):
return self.set_success()
def handle_failure(self):
return self.set_success()
子步骤失败停止
设置 child_fail_stop=True,子步骤失败时停止整个工作流:
@wf.register_class()
class FailStopWorkFlow(StepActionHandler):
@wf.step(sort=1)
@wf.step_child(
SUCCESS="on_success",
FAILURE="on_failure",
child_fail_stop=True
)
def critical_step(self):
"""关键步骤,子步骤失败时停止整个工作流"""
return self.set_failure(message="关键步骤失败")
def on_success(self):
return self.set_success()
def on_failure(self):
return self.set_failure(message="补偿逻辑也失败")
多状态分支
可以配置多个状态的分支:
@wf.register_class()
class MultiStatusWorkFlow(StepActionHandler):
@wf.step(sort=1)
@wf.step_child(
SUCCESS="on_success",
FAILURE="on_failure",
ERROR="on_error",
child_fail_stop=False
)
def process_step(self):
# 根据不同情况返回不同状态
if some_condition():
return self.set_success()
elif error_condition():
return self.set_error()
else:
return self.set_failure()
def on_success(self):
return self.set_success()
def on_failure(self):
return self.set_success()
def on_error(self):
return self.set_success()
@step_many()
批量注册步骤到多个工作流或步骤组的装饰器。
语法
@wf.step_many(step_many: Union[dict, list, tuple])
def shared_step(self):
return self.set_success()
参数
| 参数 | 类型 | 描述 |
|---|---|---|
step_many | dict/list/tuple | 步骤配置映射 |
基本用法
wf = WorkRegisterHandler(step_key="main")
@wf.register_class()
class SharedStepWorkFlow(StepActionHandler):
# 批量注册到多个工作流
@wf.step_many(step_many={
"flow_a": {"sort": 1, "retry": 2},
"flow_b": {"sort": 2, "retry": 0},
"flow_c": {"sort": 1, "retry": 1},
})
def shared_step(self):
"""这个步骤会被注册到 flow_a、flow_b、flow_c 三个工作流"""
return self.set_success()
列表形式
使用列表简化配置:
@wf.step_many(step_many=["flow_a", "flow_b", "flow_c"])
def shared_step(self):
"""使用默认配置注册到多个工作流"""
return self.set_success()
元组形式
@wf.step_many(step_many=("flow_a", "flow_b", "flow_c"))
def shared_step(self):
return self.set_success()
默认配置
不传参数时使用默认配置:
@wf.step_many()
def default_step(self):
"""使用默认配置"""
return self.set_success()
完整示例
from hsxworkflow import WorkRegisterHandler, StepActionHandler
# 创建多个工作流注册器
wf_a = WorkRegisterHandler(step_key="flow_a")
wf_b = WorkRegisterHandler(step_key="flow_b")
wf_c = WorkRegisterHandler(step_key="flow_c")
# 定义共享步骤
@wf_a.step_many(step_many={
"flow_a": {"sort": 1, "retry": 2},
"flow_b": {"sort": 1, "retry": 0},
"flow_c": {"sort": 1, "retry": 1},
})
def common_step(self):
"""共享步骤"""
self.socket_log("执行共享步骤", base=True)
return self.set_success(message="共享步骤完成")
# 定义工作流类
@wf_a.register_class()
class FlowA(StepActionHandler):
@wf_a.step(sort=2)
def specific_step_a(self):
"""flow_a 特有步骤"""
return self.set_success()
@wf_b.register_class()
class FlowB(StepActionHandler):
@wf_b.step(sort=2)
def specific_step_b(self):
"""flow_b 特有步骤"""
return self.set_success()
@wf_c.register_class()
class FlowC(StepActionHandler):
@wf_c.step(sort=2)
def specific_step_c(self):
"""flow_c 特有步骤"""
return self.set_success()
装饰器组合
多个装饰器叠加
@wf.register_class()
class DecoratorComboWorkFlow(StepActionHandler):
@wf.step(sort=1, retry=3, retry_interval=2)
@wf.step_child(SUCCESS="on_success", FAILURE="on_failure")
def process_with_retry_and_branch(self):
"""同时使用重试和分支"""
return self.set_success()
def on_success(self):
return self.set_success()
def on_failure(self):
return self.set_success()
装饰器顺序
装饰器的顺序很重要,@step_child 应该放在 @step 之后:
# ✅ 正确的顺序
@wf.step(sort=1)
@wf.step_child(SUCCESS="on_success")
def step_method(self):
return self.set_success()
# ❌ 错误的顺序
@wf.step_child(SUCCESS="on_success")
@wf.step(sort=1)
def step_method(self):
return self.set_success()
最佳实践
1. 合理命名步骤
@wf.register_class()
class NamingWorkFlow(StepActionHandler):
# ✅ 好的命名
@wf.step(sort=1)
def fetch_data(self):
return self.set_success()
@wf.step(sort=2)
def validate_data(self):
return self.set_success()
@wf.step(sort=3)
def save_data(self):
return self.set_success()
# ❌ 不好的命名
@wf.step(sort=1)
def step1(self):
return self.set_success()
@wf.step(sort=2)
def do_something(self):
return self.set_success()
2. 使用文档字符串
@wf.step(sort=1)
def fetch_data(self):
"""
从 API 获取数据
Returns:
StepResult: 包含获取的数据
"""
data = self.api_call()
return self.set_success(data=data)
3. 合理使用重试
# ✅ 合理使用重试(外部 API 调用)
@wf.step(sort=1, retry=3, retry_interval=2)
def call_external_api(self):
result = requests.get("https://api.example.com")
if result.ok:
return self.set_success(data=result.json())
return self.set_failure(message="API 调用失败")
# ❌ 不合理使用重试(逻辑错误)
@wf.step(sort=1, retry=3, retry_interval=2)
def validate_input(self):
if not self.is_valid():
return self.set_failure(message="输入无效")
return self.set_success()
4. 子步骤分支处理
@wf.register_class()
class BestPracticeWorkFlow(StepActionHandler):
@wf.step(sort=1)
@wf.step_child(SUCCESS="handle_success", FAILURE="handle_failure")
def process(self):
if self.is_success():
return self.set_success()
return self.set_failure()
def handle_success(self):
"""成功分支:记录日志和更新状态"""
self.socket_log("处理成功", base=True)
self.set_global_data("status", "completed")
return self.set_success()
def handle_failure(self):
"""失败分支:记录错误和清理资源"""
self.socket_log("处理失败", base=True)
self.cleanup_resources()
return self.set_success()
下一步
现在你已经掌握了装饰器的使用方法,可以继续学习: