高级特性
2026/3/20大约 9 分钟
高级特性
HsxWorkFlow 提供了许多高级特性,包括步骤重试、子步骤分支、参数传递、全局数据共享、生命周期钩子等。本文档详细介绍这些高级特性的使用方法。
特性概览
步骤重试机制
步骤执行失败时自动重试,可配置重试次数和重试间隔。
基本用法
@wf.step(sort=1, retry=3, retry_interval=2)
def unreliable_step(self):
"""最多重试 3 次,每次间隔 2 秒"""
result = call_external_api()
if result.ok:
return self.set_success(data=result.data)
return self.set_failure(message="API 调用失败")
重试流程
完整示例
from hsxworkflow import WorkRegisterHandler, StepActionHandler
wf = WorkRegisterHandler(step_key="retry_demo")
@wf.register_class()
class RetryDemo(StepActionHandler):
@wf.step(sort=1, retry=5, retry_interval=3)
def call_external_api(self):
"""调用外部 API,最多重试 5 次,每次间隔 3 秒"""
import random
# 模拟 API 调用
success = random.random() < 0.3 # 30% 成功率
if success:
return self.set_success(
message="API 调用成功",
data={"result": "data"}
)
else:
return self.set_failure(message="API 调用失败")
重试日志
重试时会自动输出日志:
[INFO] Step: call_external_api 失败,开始执行重试 => 重试次数: 1
[INFO] Step: call_external_api 失败,开始执行重试 => 重试次数: 2
[INFO] Step: call_external_api 失败,开始执行重试 => 重试次数: 3
子步骤分支
根据步骤执行结果触发不同的后续处理。
基本用法
@wf.step(sort=1)
@wf.step_child(SUCCESS="handle_success", FAILURE="handle_failure")
def check_condition(self):
"""检查条件"""
if some_condition():
return self.set_success()
return self.set_failure()
def handle_success(self):
"""成功分支"""
self.socket_log("执行成功分支逻辑", base=True)
return self.set_success()
def handle_failure(self):
"""失败分支"""
self.socket_log("执行失败补偿逻辑", base=True)
return self.set_success()
执行流程
完整示例
from hsxworkflow import WorkRegisterHandler, StepActionHandler
wf = WorkRegisterHandler(step_key="branch_demo")
@wf.register_class()
class BranchDemo(StepActionHandler):
@wf.step(sort=1)
@wf.step_child(SUCCESS="on_success", FAILURE="on_failure")
def validate_data(self):
"""验证数据"""
data = self.get_input_data()
if self.is_valid(data):
return self.set_success(message="数据验证通过")
else:
return self.set_failure(message="数据验证失败")
def on_success(self):
"""成功分支:处理数据"""
self.socket_log("数据处理...", base=True)
data = self.get_input_data()
result = self.process_data(data)
return self.set_success(data=result)
def on_failure(self):
"""失败分支:记录错误"""
self.socket_log("记录错误日志...", base=True)
self.log_error("数据验证失败")
return self.set_success(message="错误已记录")
@wf.step(sort=2)
def finalize(self):
"""最终步骤"""
self.socket_log("工作流结束", base=True)
return self.set_success()
def get_input_data(self):
return {"key": "value"}
def is_valid(self, data):
return True
def process_data(self, data):
return {"processed": True}
def log_error(self, message):
pass
失败停止整个流程
设置 child_fail_stop=True,子步骤失败时停止整个工作流:
@wf.step(sort=1)
@wf.step_child(
SUCCESS="on_success",
FAILURE="on_failure",
child_fail_stop=True
)
def critical_step(self):
"""关键步骤,子步骤失败时停止整个工作流"""
return self.set_failure(message="关键步骤失败")
def on_success(self):
return self.set_success()
def on_failure(self):
return self.set_failure(message="补偿逻辑也失败")
步骤间参数传递
通过 to_next_step_param() 方法将参数传递给下一个步骤。
基本用法
@wf.step(sort=1)
def fetch_data(self):
data = {"users": [1, 2, 3]}
self.to_next_step_param(user_list=data["users"], total=3)
return self.set_success()
@wf.step(sort=2)
def process_data(self, user_list=None, total=None):
"""参数名必须匹配"""
self.socket_log(f"处理 {total} 个用户: {user_list}", base=True)
return self.set_success()
参数传递流程
完整示例
from hsxworkflow import WorkRegisterHandler, StepActionHandler
wf = WorkRegisterHandler(step_key="param_demo")
@wf.register_class()
class ParamDemo(StepActionHandler):
@wf.step(sort=1)
def fetch_data(self):
"""获取数据"""
self.socket_log("获取数据...", base=True)
data = {
"users": [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Charlie"}
]
}
# 传递参数给下一步
self.to_next_step_param(
user_list=data["users"],
total=len(data["users"])
)
return self.set_success(message=f"获取了 {len(data['users'])} 个用户")
@wf.step(sort=2)
def process_users(self, user_list=None, total=None):
"""处理用户数据"""
if user_list is None:
return self.set_failure(message="无用户数据")
self.socket_log(f"处理 {total} 个用户...", base=True)
processed_users = []
for user in user_list:
processed_user = self.process_user(user)
processed_users.append(processed_user)
# 传递处理后的数据
self.to_next_step_param(processed_users=processed_users)
return self.set_success(
message=f"处理了 {len(processed_users)} 个用户",
data={"count": len(processed_users)}
)
@wf.step(sort=3)
def save_results(self, processed_users=None):
"""保存结果"""
if processed_users is None:
return self.set_failure(message="无处理结果")
self.socket_log(f"保存 {len(processed_users)} 个结果...", base=True)
# 保存到数据库
self.save_to_database(processed_users)
return self.set_success(message="结果已保存")
def process_user(self, user):
"""处理单个用户"""
return {
"id": user["id"],
"name": user["name"],
"processed": True
}
def save_to_database(self, data):
"""保存到数据库"""
pass
注意事项
- 参数名必须匹配: 下一步骤的方法参数名必须与传递的参数名一致
- 类型安全: 确保传递的参数类型与接收参数的类型匹配
- 可选参数: 使用默认值处理参数缺失的情况
全局数据共享
通过 GlobalDataManager 实现线程安全的跨步骤数据共享。
基本用法
@wf.step(sort=1)
def step_one(self):
# 设置全局数据
self.set_global_data("token", "abc123")
self.update_global_data(count=0, status="ready")
return self.set_success()
@wf.step(sort=2)
def step_two(self):
# 读取全局数据
token = self.get_global_data("token")
count = self.get_global_data("count", default=0)
return self.set_success()
全局数据管理器
class GlobalDataManager:
"""全局数据管理器(线程安全)"""
def get(self, key: str, default: Any = None) -> Any:
"""获取数据"""
def set(self, key: str, value: Any) -> None:
"""设置数据"""
def update(self, **kwargs) -> None:
"""批量更新数据"""
def delete(self, key: str) -> None:
"""删除数据"""
def clear(self) -> None:
"""清空数据"""
完整示例
from hsxworkflow import WorkRegisterHandler, StepActionHandler
wf = WorkRegisterHandler(step_key="global_data_demo")
@wf.register_class()
class GlobalDataDemo(StepActionHandler):
@wf.step(sort=1)
def initialize(self):
"""初始化全局数据"""
self.socket_log("初始化全局数据...", base=True)
# 设置单个数据
self.set_global_data("task_id", "TASK-001")
# 批量更新数据
self.update_global_data(
token="abc123",
count=0,
status="ready",
config={"timeout": 30}
)
return self.set_success(message="全局数据已初始化")
@wf.step(sort=2)
def process(self):
"""处理数据"""
self.socket_log("处理数据...", base=True)
# 读取全局数据
task_id = self.get_global_data("task_id")
token = self.get_global_data("token")
count = self.get_global_data("count", default=0)
config = self.get_global_data("config", default={})
self.socket_log(f"任务 ID: {task_id}", base=True)
self.socket_log(f"Token: {token}", base=True)
self.socket_log(f"计数: {count}", base=True)
self.socket_log(f"配置: {config}", base=True)
# 更新计数
new_count = count + 1
self.set_global_data("count", new_count)
return self.set_success(
message=f"处理完成,计数: {new_count}"
)
@wf.step(sort=3)
def cleanup(self):
"""清理全局数据"""
self.socket_log("清理全局数据...", base=True)
# 删除特定数据
self.delete_global_data("token")
# 或者清空所有数据
# self.clear_global_data()
return self.set_success(message="全局数据已清理")
线程安全
GlobalDataManager 使用 threading.Lock 确保线程安全:
class GlobalDataManager:
def __init__(self):
self._data = {}
self._lock = threading.Lock()
def get(self, key, default=None):
with self._lock:
return self._data.get(key, default)
def set(self, key, value):
with self._lock:
self._data[key] = value
生命周期钩子
生命周期钩子提供工作流和步骤级别的事件监听。
全局钩子(装饰器方式)
from hsxworkflow import (
on_workflow_start,
on_workflow_complete,
on_workflow_error,
on_step_start,
on_step_complete,
on_step_error,
on_step_retry,
)
@on_workflow_start
def notify_start(workflow_id, workflow_name):
print(f"工作流 {workflow_name} 开始执行")
@on_workflow_complete
def notify_complete(workflow_id, workflow_name, **kwargs):
print(f"工作流 {workflow_name} 执行完成")
@on_step_error
def log_step_error(workflow_id, step_name, error, **kwargs):
print(f"步骤 {step_name} 失败: {error}")
实例级钩子(方法重写)
from hsxworkflow import StepActionHandler
from hsxworkflow.hooks import HookMixin
class MyFlow(StepActionHandler, HookMixin):
def on_workflow_start(self):
self.socket_log("流程启动", base=True)
def on_workflow_complete(self, result):
self.socket_log(f"流程完成: {result.message}", base=True)
def on_step_start(self, step_name):
self.socket_log(f"步骤 {step_name} 开始", base=True)
def on_step_complete(self, step_name, result):
self.socket_log(f"步骤 {step_name} 完成", base=True)
支持的事件
| 事件 | 触发时机 | 回调参数 |
|---|---|---|
workflow.start | 工作流启动 | workflow_id, workflow_name |
workflow.complete | 工作流完成 | workflow_id, workflow_name, result |
workflow.error | 工作流出错 | workflow_id, workflow_name, error |
workflow.stop | 工作流被停止 | workflow_id, workflow_name |
step.start | 步骤开始 | workflow_id, step_name |
step.complete | 步骤完成 | workflow_id, step_name, result |
step.error | 步骤出错 | workflow_id, step_name, error |
step.retry | 步骤重试 | workflow_id, step_name, attempt, error |
完整示例
from hsxworkflow import WorkRegisterHandler, StepActionHandler
from hsxworkflow import on_workflow_start, on_step_complete
# 全局钩子
@on_workflow_start
def log_workflow_start(workflow_id, workflow_name):
print(f"[GLOBAL] 工作流 {workflow_name} (ID: {workflow_id}) 开始执行")
@on_step_complete
def log_step_complete(workflow_id, step_name, result):
print(f"[GLOBAL] 步骤 {step_name} 完成: {result.status.name}")
# 实例级钩子
wf = WorkRegisterHandler(step_key="hooks_demo")
@wf.register_class()
class HooksDemo(StepActionHandler):
def on_workflow_start(self):
self.socket_log("=== 工作流启动 ===", base=True)
def on_workflow_complete(self, result):
self.socket_log("=== 工作流完成 ===", base=True)
self.socket_log(f"最终状态: {result.status.name}", base=True)
def on_step_start(self, step_name):
self.socket_log(f"→ {step_name}", base=True)
def on_step_complete(self, step_name, result):
status_icon = "✓" if result.is_success() else "✗"
self.socket_log(f" {status_icon} {step_name}: {result.status.name}", base=True)
@wf.step(sort=1)
def step_one(self):
import time
time.sleep(0.5)
return self.set_success()
@wf.step(sort=2)
def step_two(self):
import time
time.sleep(0.5)
return self.set_success()
@wf.step(sort=3)
def step_three(self):
import time
time.sleep(0.5)
return self.set_success()
HookManager 类方法
from hsxworkflow import HookManager, HookEvent
# 注册钩子
HookManager.register(HookEvent.WORKFLOW_START, callback_func)
# 注销钩子
HookManager.unregister(HookEvent.WORKFLOW_START, callback_func)
# 手动触发事件
HookManager.emit(HookEvent.WORKFLOW_START, workflow_id=123, workflow_name="test")
# 清除钩子
HookManager.clear(HookEvent.WORKFLOW_START) # 清除特定事件
HookManager.clear() # 清除所有
# 获取钩子列表
hooks = HookManager.get_hooks(HookEvent.WORKFLOW_START)
最佳实践
1. 合理使用重试机制
# ✅ 合理使用重试(外部 API 调用)
@wf.step(sort=1, retry=3, retry_interval=2)
def call_external_api(self):
result = requests.get("https://api.example.com")
if result.ok:
return self.set_success(data=result.json())
return self.set_failure(message="API 调用失败")
# ❌ 不合理使用重试(逻辑错误)
@wf.step(sort=1, retry=3, retry_interval=2)
def validate_input(self):
if not self.is_valid():
return self.set_failure(message="输入无效")
return self.set_success()
2. 子步骤分支处理
@wf.step(sort=1)
@wf.step_child(SUCCESS="handle_success", FAILURE="handle_failure")
def process(self):
if self.is_success():
return self.set_success()
return self.set_failure()
def handle_success(self):
"""成功分支:记录日志和更新状态"""
self.socket_log("处理成功", base=True)
self.set_global_data("status", "completed")
return self.set_success()
def handle_failure(self):
"""失败分支:记录错误和清理资源"""
self.socket_log("处理失败", base=True)
self.cleanup_resources()
return self.set_success()
3. 参数传递和全局数据选择
# ✅ 使用参数传递(相邻步骤间)
@wf.step(sort=1)
def fetch_data(self):
data = self.fetch()
self.to_next_step_param(data=data)
return self.set_success()
@wf.step(sort=2)
def process_data(self, data=None):
# 处理数据
return self.set_success()
# ✅ 使用全局数据(跨步骤共享)
@wf.step(sort=1)
def init(self):
self.set_global_data("config", self.load_config())
return self.set_success()
@wf.step(sort=2)
def process(self):
config = self.get_global_data("config")
return self.set_success()
@wf.step(sort=3)
def cleanup(self):
config = self.get_global_data("config")
return self.set_success()
4. 生命周期钩子使用
# ✅ 全局钩子(跨工作流的通用逻辑)
@on_workflow_start
def notify_admin(workflow_id, workflow_name):
send_notification(f"工作流 {workflow_name} 启动")
# ✅ 实例级钩子(工作流特定的逻辑)
class MyWorkFlow(StepActionHandler, HookMixin):
def on_workflow_complete(self, result):
if result.is_success():
self.send_report()
else:
self.alert_admin()
下一步
现在你已经了解了高级特性的使用方法,可以继续学习: