技术架构
2026/3/20大约 8 分钟
技术架构
本文档详细介绍 HsxWorkFlow 的系统整体架构设计、分层架构和模块关系。
架构概览
HsxWorkFlow 采用分层架构设计,从上到下分为客户端层、Web 服务层、管理层、执行引擎层、注册层、集成层和工具层。
分层架构详解
1. 客户端层
职责: 提供用户交互界面和客户端工具。
组件:
- Web 管理界面: 基于 HTML/JavaScript 的 Web UI,提供可视化的工作流管理界面
- REST API 客户端: 支持 cURL、Postman 等工具调用 RESTful API
- WebSocket 客户端: 支持 Socket.IO 协议的客户端,用于实时通信
技术栈:
- HTML5 / CSS3 / JavaScript
- Socket.IO Client
2. Web 服务层
职责: 提供 HTTP 服务和 WebSocket 服务。
组件:
- FastAPI: 高性能的 Web 框架,提供 RESTful API
- python-socketio: WebSocket 服务器,实现实时通信
- OpenAPI: 自动生成 API 文档(/docs)
技术栈:
- FastAPI >= 0.109.0
- Uvicorn >= 0.27.0
- python-socketio >= 5.11.0
核心功能:
- RESTful API 接口
- WebSocket 实时通信
- 自动生成 API 文档
- 请求验证和错误处理
3. 管理层
职责: 管理工作流的注册、启动、停止和生命周期。
组件:
- WorkflowManager: 工作流管理器,负责工作流的生命周期管理
- ThreadPoolExecutor: 线程池,支持并发执行多个工作流
核心功能:
- 工作流注册和管理
- 工作流启停控制
- 线程池管理
- 任务调度
4. 执行引擎层
职责: 负责工作流的实际执行和步骤调度。
组件:
- WorkDataHandler: 执行模式分发器,根据不同的执行模式调度工作流
- StepActionHandler: 步骤处理器,执行具体的工作流步骤
- GlobalDataManager: 全局数据管理器,提供线程安全的数据共享
核心功能:
- 执行模式分发(single/traversal/indefinitely/queue/custom)
- 步骤执行和调度
- 重试机制
- 状态管理
- 全局数据共享
5. 调度层
职责: 提供定时调度功能,支持 Cron 表达式和间隔执行。
组件:
- WorkflowScheduler: 工作流调度器,封装 APScheduler
- APScheduler: 定时任务调度库
核心功能:
- Cron 表达式调度
- 间隔执行
- 一次性任务
- 任务管理(添加、移除、暂停、恢复)
6. 注册层
职责: 负责工作流和步骤的声明式注册。
组件:
- WorkRegisterHandler: 工作流注册器,收集工作流定义
- @step 装饰器: 步骤装饰器,注册工作流步骤
- @register_class 装饰器: 类装饰器,注册工作流类
核心功能:
- 装饰器驱动的声明式注册
- 步骤顺序管理
- 子步骤分支注册
- 批量步骤注册
7. 集成层
职责: 提供与外部系统的集成能力。
组件:
- DriverClient: DrissionPage 客户端,支持浏览器自动化
- RedisHandler: Redis 操作封装,提供缓存和队列功能
- DatabaseHandler: 数据库连接管理,支持 MySQL
核心功能:
- 浏览器自动化
- Redis 缓存和队列
- 数据库操作
8. 工具层
职责: 提供通用的工具和服务。
组件:
- Logger: 日志管理器,提供日志记录功能
- Config: 配置系统,基于 pydantic-settings
- HookManager: 钩子管理器,提供生命周期钩子
核心功能:
- 日志记录
- 配置管理
- 生命周期钩子
模块关系
核心模块依赖关系
执行流程
技术选型
Web 框架
| 技术 | 版本 | 选择理由 |
|---|---|---|
| FastAPI | >= 0.109.0 | 高性能、自动生成 API 文档、类型安全 |
| Uvicorn | >= 0.27.0 | 高性能 ASGI 服务器 |
| python-socketio | >= 5.11.0 | 成熟的 WebSocket 解决方案 |
配置管理
| 技术 | 版本 | 选择理由 |
|---|---|---|
| pydantic-settings | >= 2.1.0 | 类型安全、支持环境变量、.env 文件 |
调度器
| 技术 | 版本 | 选择理由 |
|---|---|---|
| APScheduler | >= 3.10.0 | 成熟的定时任务调度库、支持 Cron 表达式 |
浏览器自动化
| 技术 | 版本 | 选择理由 |
|---|---|---|
| DrissionPage | >= 4.1.0 | 功能强大、易于使用、支持 Chromium |
缓存和队列
| 技术 | 版本 | 选择理由 |
|---|---|---|
| Redis | >= 5.0.0 | 高性能、支持多种数据结构、分布式 |
数据库
| 技术 | 版本 | 选择理由 |
|---|---|---|
| aiomysql | >= 0.2.0 | 异步 MySQL 客户端、高性能 |
设计模式
1. 装饰器模式
用于工作流和步骤的声明式注册:
@wf.register_class()
class MyWorkFlow(StepActionHandler):
@wf.step(sort=1)
def step_one(self):
return self.set_success()
2. 生成器模式
用于步骤的顺序执行:
def _get_step_generator(self, step_action_list, *args, **kwargs):
for step_name in step_action_list:
# 执行步骤
result = self._execute_step(step_name)
yield result
3. 模板方法模式
StepActionHandler 定义了工作流执行的骨架:
def run_all_steps(self, *args, **kwargs):
# 模板方法
for step_result in self._get_step_generator(step_action_list):
# 处理每个步骤的结果
pass
4. 观察者模式
生命周期钩子系统基于观察者模式:
# HookManager 是主题
class HookManager:
_hooks: Dict[HookEvent, List[HookCallback]] = {}
@classmethod
def register(cls, event, callback):
# 注册观察者
pass
@classmethod
def emit(cls, event, **kwargs):
# 通知所有观察者
pass
5. 单例模式
GlobalDataManager 使用类变量实现单例:
class GlobalDataManager:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
6. 策略模式
不同的执行模式对应不同的策略:
class WorkDataHandler:
def start_workflow(self, cls):
mode = cls.runtype.get("mode")
if mode == "single":
return self._run_single(cls)
elif mode == "traversal":
return self._run_traversal(cls)
elif mode == "indefinitely":
return self._run_indefinitely(cls)
# ...
线程安全
全局数据管理器
class GlobalDataManager:
def __init__(self):
self._data = {}
self._lock = threading.Lock()
def get(self, key, default=None):
with self._lock:
return self._data.get(key, default)
def set(self, key, value):
with self._lock:
self._data[key] = value
线程控制器
class ThreadController:
def __init__(self):
self._pause_event = threading.Event()
self._stop_event = threading.Event()
def pause(self):
self._pause_event.clear()
def resume(self):
self._pause_event.set()
def stop(self):
self._stop_event.set()
扩展性
自定义执行模式
def custom_logic(self, cls, **kwargs):
"""自定义执行逻辑"""
# 实现自定义逻辑
pass
manager.work_data_handler.set_custom_work(custom_logic)
自定义钩子
from hsxworkflow import HookManager, HookEvent
HookManager.register(HookEvent.WORKFLOW_START, my_callback)
自定义状态枚举
from hsxworkflow import StepStatus
MyStatus = StepStatus.set_new_status("CUSTOM_STATUS", 200)
性能优化
1. 线程池管理
使用 ThreadPoolExecutor 实现并发执行:
self.th_work = ThreadPoolExecutor(max_workers=max_workers)
2. 异步 I/O
使用 FastAPI 和 aiomysql 实现异步 I/O:
async def get_workflow_detail(class_id: int):
info = await database.get_workflow(class_id)
return info
3. 缓存机制
使用 Redis 缓存频繁访问的数据:
rdb = get_redis_handler()
cached_data = rdb.get("workflow_cache")
4. WebSocket 推送优化
使用房间机制实现精准推送:
# 只向特定房间推送
await sio.emit(event, data, room=str(class_id))
下一步
现在你已经了解了 HsxWorkFlow 的技术架构,可以继续学习: