核心引擎
2026/3/20大约 8 分钟
核心引擎
HsxWorkFlow 的核心引擎由 Code 模块提供,包含 StepActionHandler、WorkFlowHandler 和 WorkRegisterHandler 三个核心组件。本文档详细介绍这些组件的实现原理。
模块结构
src/hsxworkflow/Code/
├── StepAction.py # 步骤执行处理器(基类)
├── StepActionUtils.py # 状态枚举、结果封装、线程控制
├── WorkFlowHandler.py # 执行模式实现
└── WorkRegister.py # 装饰器注册系统
StepActionHandler
StepActionHandler 是所有工作流类的基类,提供步骤执行的基础设施。
类图
核心属性
| 属性 | 类型 | 描述 |
|---|---|---|
title | str | 工作流标题 |
desc | str | 工作流描述 |
WorkflowName | str | 工作流名称(step_key) |
current_step | str | 当前执行的步骤名 |
status_text | str | 状态文本(standby/running/stopped/error) |
flow_run_count | int | 工作流运行次数 |
step_action_list | list | 步骤方法名列表 |
controller | ThreadController | 线程控制器 |
execution_result | StepResult | 当前执行结果 |
步骤执行生成器
步骤执行使用生成器模式,支持暂停、恢复和停止控制。
def _create_step_generator(self, step_action_list, *args, **kwargs):
"""创建步骤执行生成器"""
next_param = {}
step_len = len(step_action_list)
for index, step_name in enumerate(step_action_list):
# 检查线程控制器(暂停/停止)
if self.controller.wait_if_paused() is False:
yield self._execution_result
return
# 设置当前步骤信息
self.last_step = index == step_len - 1
self.current_step = step_name
self._execution_result.step_name = step_name
self._execution_result.step_index = index
self._execution_result = self._execution_result.start_step_status()
# 执行步骤方法
try:
step_method = getattr(self, step_name)
# 获取步骤方法的参数
sig = inspect.signature(step_method)
parameters = sig.parameters
kwargs.update(next_param)
params_dict = {
param: kwargs.get(param, None)
for param in parameters
if param in kwargs
}
# 执行重试逻辑
qualname = step_method.__qualname__
retry_tuple = self._step_retry_dict.get(qualname, (0, 0,))
retry = retry_tuple[0] if retry_tuple[0] and retry_tuple[0] > 0 else 1
for _ in range(retry):
if _ != 0:
self.socket_log(f"Step: {step_name} 失败,开始执行重试 => 重试次数: {_}", "i", True)
# 执行步骤方法
if type(self) is StepActionHandler:
result = step_method(self, **params_dict)
else:
result = step_method(**params_dict)
# 处理返回值
next_param = self._process_step_result(result)
# 如果成功则跳出重试循环
if not self._execution_result.is_failure():
break
# 重试间隔
if retry_tuple[1] and retry_tuple[1] > 0:
sleep(retry_tuple[1])
# 停止计时
self._execution_result.stop_timer()
# 执行子步骤分支
self._execution_result = self.run_child_step(step_name)
# 检查是否需要停止
if self._execution_result.is_failure() and qualname in self.fail_stop_func:
return
# 记录执行日志
if self._execution_result.is_success():
self.socket_log(f"Step '{step_name}' 执行完成: {self._execution_result.to_dict()}", "i", True)
else:
self.socket_log(f"Step '{step_name}' 终止下一步骤: {self._execution_result.to_dict()}", "e", True)
yield self._execution_result
# 如果步骤失败则终止执行
if self._execution_result.is_failure():
return
except Exception as e:
error_msg = f"执行 '{step_name}' 步骤错误: {str(e)}"
self.socket_log(error_msg, "e", True, exc_info=True)
yield self._execution_result.record_exception(e)
return
重试机制
重试机制在步骤执行失败时自动重试,支持配置重试次数和重试间隔。
@wf.step(sort=1, retry=3, retry_interval=2)
def unreliable_step(self):
"""最多重试 3 次,每次间隔 2 秒"""
result = call_external_api()
if result.ok:
return self.set_success(data=result.data)
return self.set_failure(message="API 调用失败")
子步骤分支
子步骤分支根据步骤执行结果触发不同的后续处理。
@wf.step(sort=1)
@wf.step_child(SUCCESS="handle_success", FAILURE="handle_failure")
def check_condition(self):
if some_condition():
return self.set_success()
return self.set_failure()
def handle_success(self):
"""成功分支"""
return self.set_success()
def handle_failure(self):
"""失败分支"""
return self.set_success()
StepActionUtils
StepActionUtils 提供状态枚举、结果封装、线程控制等工具类。
StepStatus - 状态枚举
class StepStatus(int, Enum):
"""步骤状态枚举"""
SUCCESS = 1 # 成功
FAILURE = 2 # 失败
ERROR = 44 # 错误
IN_PROGRESS = 55 # 进行中
PENDING = 66 # 待处理
SKIPPED = 77 # 已跳过
TIMEOUT = 88 # 超时
STOP = 99 # 手动停止
PAUSE = 100 # 暂停
StepResult - 结果对象
StepResult 封装步骤执行结果,支持计时、异常记录、序列化。
class StepResult:
"""步骤执行结果"""
def __init__(
self,
status: StepStatus = None,
data: Any = None,
message: str = "",
metadata: StepMetadata = None,
**extra_data
):
self._status = status or StepStatus.PENDING
self._data = data
self._message = message
self._metadata = metadata or StepMetadata()
self._extra_data = extra_data
self._next_step_params = {}
self._exception = None
self._step_name = ""
self._step_index = 0
def is_success(self) -> bool:
"""是否成功"""
return self._status == StepStatus.SUCCESS
def is_failure(self) -> bool:
"""是否失败"""
return self._status == StepStatus.FAILURE
def is_completed(self) -> bool:
"""是否已完成"""
return self._status in [StepStatus.SUCCESS, StepStatus.FAILURE, StepStatus.ERROR]
def to_dict(self) -> dict:
"""转换为字典"""
return {
"status": self._status.name,
"status_value": self._status.value,
"data": self._data,
"message": self._message,
"metadata": self._metadata.to_dict(),
**self._extra_data
}
GlobalDataManager - 全局数据管理器
GlobalDataManager 提供线程安全的全局数据共享。
class GlobalDataManager:
"""全局数据管理器(线程安全)"""
def __init__(self):
self._data = {}
self._lock = threading.Lock()
def get(self, key: str, default: Any = None) -> Any:
"""获取数据"""
with self._lock:
return self._data.get(key, default)
def set(self, key: str, value: Any) -> None:
"""设置数据"""
with self._lock:
self._data[key] = value
def update(self, **kwargs) -> None:
"""批量更新数据"""
with self._lock:
self._data.update(kwargs)
def delete(self, key: str) -> None:
"""删除数据"""
with self._lock:
self._data.pop(key, None)
def clear(self) -> None:
"""清空数据"""
with self._lock:
self._data.clear()
def to_dict(self) -> dict:
"""转换为字典"""
with self._lock:
return self._data.copy()
ThreadController - 线程控制器
ThreadController 提供线程控制功能,支持暂停、恢复、停止。
class ThreadController:
"""线程控制器"""
def __init__(self, logger):
self._pause_event = threading.Event()
self._stop_event = threading.Event()
self._logger = logger
def pause(self) -> None:
"""暂停"""
self._pause_event.clear()
self._logger.info("Thread paused")
def resume(self) -> None:
"""恢复"""
self._pause_event.set()
self._logger.info("Thread resumed")
def stop(self) -> None:
"""停止"""
self._stop_event.set()
self._pause_event.set() # 确保暂停的线程也能停止
self._logger.info("Thread stopped")
def init(self) -> None:
"""初始化"""
self._pause_event.set()
self._stop_event.clear()
def wait_if_paused(self) -> bool:
"""如果暂停则等待,返回是否应该继续执行"""
if self._stop_event.is_set():
return False
self._pause_event.wait()
return True
WorkFlowHandler
WorkFlowHandler 负责不同模式下的工作流执行。
执行模式
核心方法
class WorkDataHandler:
"""工作数据处理器"""
def __init__(self, time_interval: float = 1.0):
self.time_interval = time_interval
self._custom_work_func = None
def start_workflow(self, cls: StepActionHandler) -> Optional[StepResult]:
"""启动工作流"""
mode = cls.runtype.get("mode", "single")
if mode == "single":
return self._run_single(cls)
elif mode == "traversal":
return self._run_traversal(cls)
elif mode == "indefinitely":
return self._run_indefinitely(cls)
elif mode == "queue":
return self._run_queue(cls)
elif mode == "custom":
return self._run_custom(cls)
else:
raise ValueError(f"Unknown mode: {mode}")
def run_func(self, cls: StepActionHandler) -> StepResult:
"""执行工作流的所有步骤"""
return cls.run_all_steps()
def set_custom_work(self, func: Callable) -> None:
"""设置自定义执行逻辑"""
self._custom_work_func = func
def _run_single(self, cls: StepActionHandler) -> StepResult:
"""单次执行"""
return self.run_func(cls)
def _run_traversal(self, cls: StepActionHandler) -> Optional[StepResult]:
"""遍历执行"""
traversal_data = cls.traversal_data
if not traversal_data:
return None
final_result = None
for item in traversal_data:
cls.traversal_ = item
result = self.run_func(cls)
if result:
final_result = result
return final_result
def _run_indefinitely(self, cls: StepActionHandler) -> Optional[StepResult]:
"""无限循环"""
while not cls.execution_result.is_stop():
result = self.run_func(cls)
if result and result.is_stop():
break
sleep(self.time_interval)
return result
def _run_queue(self, cls: StepActionHandler) -> Optional[StepResult]:
"""队列模式(需自行实现)"""
# TODO: 实现队列模式
return None
def _run_custom(self, cls: StepActionHandler) -> Optional[StepResult]:
"""自定义模式"""
if self._custom_work_func:
return self._custom_work_func(self, cls)
return None
WorkRegisterHandler
WorkRegisterHandler 负责收集工作流类定义和步骤声明。
核心功能
核心方法
class WorkRegisterHandler:
"""工作流注册器"""
_registration_lock = threading.Lock()
_registered_classes: Dict[str, Type[StepActionHandler]] = {}
def __init__(self, step_key: str = "default", **kwargs):
self._step_key = step_key
self._step_action_registry: Dict[str, List[Tuple]] = {}
self._step_actions: List[Union[Callable, str]] = []
self._work_objects: Dict[int, StepActionHandler] = {}
self._child_step_actions: Dict[str, Union[Callable, dict]] = {}
self._kwargs = kwargs
@classmethod
def register_class(cls, copy_run: int = 1):
"""类注册装饰器"""
def decorator(target_class):
with cls._registration_lock:
class_name = target_class.__name__
if class_name in cls._registered_classes:
raise ValueError(f"类 '{class_name}' 已经注册")
if not issubclass(target_class, StepActionHandler):
raise TypeError(f"注册的类 '{class_name}' 必须是 StepActionHandler 的子类")
cls._registered_classes[class_name] = target_class
cls._classes_copy_run[class_name] = copy_run
return target_class
return decorator
def step(self, sort: Optional[int] = None, **kwargs):
"""步骤装饰器"""
def decorator(func):
kwargs["sort"] = sort
self._step_decorator(func, **kwargs)
@wraps(func)
def wrapper(*arg, **kwarg):
return func(*arg, **kwarg)
return wrapper
return decorator
def step_child(self, **kwargs):
"""子步骤装饰器"""
def decorator(func):
self._step_decorator(func, child_func=True, **kwargs)
@wraps(func)
def wrapper(*arg, **kwarg):
return func(*arg, **kwarg)
return wrapper
return decorator
def generate_step_action_list(self):
"""生成步骤动作列表"""
if not self._start_:
return
self._set_work_flow_mode()
for composite_key, step_entries in self._step_action_registry.items():
class_parts = composite_key.split("<:>")
class_name, step_key = class_parts
if class_name not in self._registered_classes and class_name != "StepActionHandler":
raise ValueError(f"类 '{class_name}' 未注册")
work_class = self._registered_classes.get(class_name)
step_actions = [entry[0] for entry in step_entries]
self.register_work_task(
step_actions=step_actions,
work_class=work_class,
step_key=step_key
)
def register_work_task(self, step_actions, work_class, step_key):
"""注册工作任务"""
work_instance = work_class() if work_class else StepActionHandler()
# 配置工作实例
work_instance.step_action_list = step_actions
work_instance.child_step_actions = self._child_step_actions
work_instance.WorkflowName = step_key
work_instance.runtype = self._work_run_mode.get(step_key, {"mode": "single"})
work_instance.handler_init(
global_log=self._global_log,
stepStatus=self._step_status
)
self._work_objects[id(work_instance)] = work_instance
def get_work_objects(self) -> dict:
"""获取工作对象列表"""
return self._work_objects
执行流程
工作流注册流程
工作流执行流程
下一步
现在你已经了解了核心引擎的实现,可以继续学习: