HsxWorkFlow 自动化工作流框架
HsxWorkFlow 自动化工作流框架
HsxWorkFlow 是一个面向自动化场景的 Python 工作流编排框架,采用装饰器驱动的声明式编程范式,提供简洁的工作流定义方式和强大的调度执行能力。框架支持多种执行模式、Web 管理界面、浏览器自动化和分布式任务调度。
文档目录
快速入门
- 快速开始 - 环境准备、第一个工作流、运行方式
核心概念
核心概念 - WorkRegisterHandler、StepActionHandler、WorkflowManager 详解
装饰器 API - @register_class、@step、@step_child、@step_many 使用方法
技术架构
技术架构 - 系统整体架构设计、分层架构、模块关系
核心引擎 - Code 模块详解:StepActionHandler、WorkFlowHandler、WorkRegisterHandler
执行模式 - single、traversal、indefinitely、queue、custom 五种执行模式详解
功能特性
Web 管理界面 - REST API、WebSocket 事件、实时监控
高级特性 - 步骤重试、子步骤分支、参数传递、全局数据共享、生命周期钩子
定时调度 - 基于 APScheduler 的 Cron 表达式调度
集成扩展
最佳实践
- 最佳实践 - 开发建议、常见问题、示例代码
项目结构
HsxWorkFlow/
├── src/hsxworkflow/
│ ├── Code/ # 核心工作流引擎
│ │ ├── StepAction.py # 步骤执行处理器(基类)
│ │ ├── StepActionUtils.py # 状态枚举、结果封装、线程控制
│ │ ├── WorkFlowHandler.py # 执行模式实现
│ │ └── WorkRegister.py # 装饰器注册系统
│ │
│ ├── DP/ # 浏览器自动化模块
│ │ └── DPClient.py # DrissionPage 客户端封装
│ │
│ ├── Utils/ # 工具库
│ │ ├── database.py # 数据库连接管理
│ │ ├── handler.py # 数量检测处理器
│ │ ├── logger.py # 日志管理器
│ │ ├── RecordHandler.py # 数据记录器
│ │ └── redis_handler.py # Redis 操作封装
│ │
│ ├── Web/ # Web 管理模块
│ │ ├── api/ # REST API
│ │ │ └── workflow.py # 工作流 CRUD 接口
│ │ ├── ws/ # WebSocket
│ │ │ └── workflow_ws.py # SocketIO 事件处理
│ │ ├── app.py # FastAPI 应用入口
│ │ └── dependencies.py # 依赖注入
│ │
│ ├── persistence/ # 持久化模块
│ │ ├── models.py # 数据模型定义
│ │ └── repository.py # 数据访问层
│ │
│ ├── Config.py # 配置系统
│ ├── WorkFlowMain.py # 工作流管理器
│ ├── hooks.py # 生命周期钩子系统
│ ├── scheduler.py # 定时调度器
│ ├── main.py # CLI 入口
│ └── __init__.py # 公开 API 导出
│
├── tests/ # 测试用例
├── pyproject.toml # 项目配置
├── .python-version # Python 版本
└── README.md # 用户文档
技术栈
| 分类 | 技术 | 版本 | 用途 |
|---|---|---|---|
| Web 框架 | FastAPI | >= 0.109.0 | REST API 服务 |
| ASGI 服务器 | Uvicorn | >= 0.27.0 | 异步服务运行 |
| WebSocket | python-socketio | >= 5.11.0 | 实时通信 |
| 配置管理 | pydantic-settings | >= 2.1.0 | 类型安全配置 |
| 调度器 | APScheduler | >= 3.10.0 | 定时任务调度 |
| 浏览器自动化 | DrissionPage | >= 4.1.0 | Chromium 操作 |
| 缓存 | Redis | >= 5.0.0 | 分布式缓存 |
| 数据库 | aiomysql | >= 0.2.0 | 异步 MySQL |
系统架构图
核心特性
装饰器驱动
通过 @step、@step_child、@step_many 等装饰器声明式定义工作流,代码简洁易读。
@wf.step(sort=1)
def prepare(self):
self.socket_log("正在初始化资源...", level="i", base=True)
return self.set_success(message="资源初始化完成")
@wf.step(sort=2, retry=3, retry_interval=2)
def process(self):
return self.set_success(message="数据处理完成")
多执行模式
支持五种执行模式,满足不同场景需求:
- single - 单次执行,按顺序执行所有步骤后结束
- traversal - 遍历执行,对数据集合中的每条数据执行一轮完整步骤
- indefinitely - 无限循环,持续执行直到主动停止
- queue - 队列模式,基于队列的工作流执行
- custom - 自定义模式,完全控制执行流程
Web 管理界面
基于 FastAPI + Socket.IO 的实时监控和管理界面:
- RESTful API 接口
- WebSocket 实时通信
- OpenAPI 自动生成文档
- 工作流状态实时监控
- 步骤日志实时推送
定时调度
基于 APScheduler 的 Cron 表达式调度:
- Cron 表达式定时触发
- 间隔执行
- 一次性任务
- 任务管理(添加、移除、暂停、恢复)
浏览器自动化
集成 DrissionPage 支持 Chromium 操作:
- WebDriver 封装
- 页面操作
- 数据抓取
- 资源清理
生命周期钩子
工作流和步骤级别的生命周期事件监听:
- 工作流启动、完成、错误、停止
- 步骤开始、完成、错误、重试
- 全局钩子和实例级钩子
应用场景
RPA 流程自动化
- Web 爬虫
- 表单填写
- 数据录入
数据处理流水线
- ETL 任务
- 数据清洗
- 批量转换
定时批处理任务
- 报表生成
- 定时同步
- 监控告警
浏览器自动化
- UI 测试
- 网页操作
- 截图任务
快速开始
安装
# 使用 uv(推荐)
uv pip install hsxworkflow
# 或使用 pip
pip install hsxworkflow
第一个工作流
from hsxworkflow import WorkflowManager, StepActionHandler, WorkRegisterHandler
# 创建工作流注册器
wf = WorkRegisterHandler(
step_key="demo",
title="演示工作流",
desc="三步走的示例流程",
)
# 定义工作流类
@wf.register_class()
class DemoWorkFlow(StepActionHandler):
def __init__(self):
super().__init__()
self.title = "演示工作流"
self.desc = "展示基本用法"
@wf.step(sort=1)
def prepare(self):
"""准备阶段"""
self.socket_log("正在初始化资源...", level="i", base=True)
self.set_global_data("task_id", "TASK-001")
return self.set_success(message="资源初始化完成")
@wf.step(sort=2, retry=3, retry_interval=2)
def process(self):
"""处理阶段"""
task_id = self.get_global_data("task_id")
self.socket_log(f"处理任务 {task_id}...", level="i", base=True)
return self.set_success(message="数据处理完成", data={"count": 42})
@wf.step(sort=3)
def report(self):
"""报告阶段"""
self.socket_log("生成报告...", level="i", base=True)
return self.set_success(message="流程结束")
# 启动
if __name__ == "__main__":
manager = WorkflowManager()
manager.register_object(wf)
manager.run_app() # 访问 http://localhost:5050
运行方式
Web 管理界面(推荐):
manager.run_app(host="0.0.0.0", port=5050, debug=False)
访问 http://localhost:5050 查看 Web 界面,访问 http://localhost:5050/docs 查看 API 文档。
直接执行:
manager = WorkflowManager()
manager.register_object(wf)
manager.start_workflow()
results = manager.wait_result()
相关资源
许可证
目录
暂无目录