最佳实践
2026/3/20大约 5 分钟
最佳实践
本文档总结了 HsxWorkFlow 的开发建议和常见问题的解决方案。
开发建议
1. 合理使用重试机制
# ✅ 合理使用重试(外部 API 调用)
@wf.step(sort=1, retry=3, retry_interval=2)
def call_external_api(self):
result = requests.get("https://api.example.com")
if result.ok:
return self.set_success(data=result.json())
return self.set_failure(message="API 调用失败")
# ❌ 不合理使用重试(逻辑错误)
@wf.step(sort=1, retry=3, retry_interval=2)
def validate_input(self):
if not self.is_valid():
return self.set_failure(message="输入无效")
return self.set_success()
2. 善用全局数据
# ✅ 使用全局数据(跨步骤共享)
@wf.step(sort=1)
def init(self):
self.set_global_data("config", self.load_config())
return self.set_success()
@wf.step(sort=2)
def process(self):
config = self.get_global_data("config")
return self.set_success()
@wf.step(sort=3)
def cleanup(self):
config = self.get_global_data("config")
return self.set_success()
3. 子步骤分支处理
@wf.step(sort=1)
@wf.step_child(SUCCESS="handle_success", FAILURE="handle_failure")
def process(self):
if self.is_success():
return self.set_success()
return self.set_failure()
def handle_success(self):
"""成功分支:记录日志和更新状态"""
self.socket_log("处理成功", base=True)
self.set_global_data("status", "completed")
return self.set_success()
def handle_failure(self):
"""失败分支:记录错误和清理资源"""
self.socket_log("处理失败", base=True)
self.cleanup_resources()
return self.set_success()
4. 错误处理
@wf.step(sort=1)
def process_with_error_handling(self):
try:
result = self.call_external_api()
return self.set_success(data=result)
except requests.RequestException as e:
return self.set_error(message=f"API 调用失败: {str(e)}")
except Exception as e:
return self.set_error(message=f"未知错误: {str(e)}")
5. 日志记录
@wf.step(sort=1)
def process_with_logging(self):
# 顶级日志(无缩进)
self.socket_log("开始处理...", level="i", base=True)
# 子级日志(有缩进)
self.socket_log("处理细节 1", level="i", base=False)
self.socket_log("处理细节 2", level="i")
# 调试日志
self.socket_log("调试信息", level="d")
# 错误日志
self.socket_log("发生错误", level="e")
return self.set_success()
6. 步骤命名
# ✅ 好的命名
@wf.step(sort=1)
def fetch_data(self):
return self.set_success()
@wf.step(sort=2)
def validate_data(self):
return self.set_success()
@wf.step(sort=3)
def save_data(self):
return self.set_success()
# ❌ 不好的命名
@wf.step(sort=1)
def step1(self):
return self.set_success()
@wf.step(sort=2)
def do_something(self):
return self.set_success()
7. 使用文档字符串
@wf.step(sort=1)
def fetch_data(self):
"""
从 API 获取数据
Returns:
StepResult: 包含获取的数据
"""
data = self.api_call()
return self.set_success(data=data)
常见问题
Q1: 如何在步骤之间传递数据?
方法一:使用 to_next_step_param
@wf.step(sort=1)
def step_one(self):
self.to_next_step_param(user_id=123)
return self.set_success()
@wf.step(sort=2)
def step_two(self, user_id=None):
print(user_id) # 123
方法二:使用全局数据
@wf.step(sort=1)
def step_one(self):
self.set_global_data("key", "value")
return self.set_success()
@wf.step(sort=2)
def step_two(self):
value = self.get_global_data("key")
return self.set_success()
Q2: 如何停止正在运行的工作流?
方法一:通过 WebSocket
socket.emit("stop", { classId: workflow_id });
方法二:通过 REST API
curl -X POST http://localhost:5050/api/workflow/stop \
-H "Content-Type: application/json" \
-d '{"classId": 140234567890}'
方法三:通过代码
manager.stop_workflow(work_id=workflow_id)
方法四:在步骤内部
return self.set_stop("停止原因")
Q3: 如何自定义状态枚举?
from hsxworkflow import StepStatus
# 动态添加新状态
MyStatus = StepStatus.set_new_status("CUSTOM_STATUS", 200)
# 使用自定义状态
wf = WorkRegisterHandler(step_key="test", StatusModel=StepStatus)
Q4: 如何处理步骤执行超时?
import concurrent.futures
@wf.step(sort=1)
def timeout_step(self):
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(long_running_task)
try:
result = future.result(timeout=30) # 30秒超时
return self.set_success(data=result)
except concurrent.futures.TimeoutError:
return self.set_failure(message="执行超时")
Q5: 如何并行执行多个步骤?
import concurrent.futures
@wf.step(sort=1)
def parallel_tasks(self):
tasks = [task1, task2, task3]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(lambda t: t(), tasks))
return self.set_success(data=results)
Q6: 如何查看工作流执行日志?
方法一:Web 界面
访问 http://localhost:5050 查看实时日志。
方法二:WebSocket
socket.on("work_flow_step_log", (data) => {
console.log(`[${data.WorkflowLogType}] ${data.WorkflowLog}`);
});
方法三:控制台
使用 socket_log() 输出到控制台。
Q7: 如何调试工作流?
方法一:启用调试模式
manager.run_app(debug=True)
方法二:使用调试日志
self.socket_log("调试信息", level="d")
方法三:查看 API 文档
访问 http://localhost:5050/docs 查看接口说明。
Q8: 支持哪些 Python 版本?
HsxWorkFlow 支持 Python 3.10 及以上版本,推荐使用 Python 3.11 或 3.12。
Q9: 如何升级 HsxWorkFlow?
# 使用 uv
uv pip install --upgrade hsxworkflow
# 使用 pip
pip install --upgrade hsxworkflow
Q10: 如何配置数据库连接?
在 .env 文件中配置:
DB_HOST=localhost
DB_PORT=3306
DB_USER=root
DB_PASSWORD=secret
DB_DATABASE=workflow_db
DB_CHARSET=utf8mb4
Q11: 如何配置 Redis 连接?
在 .env 文件中配置:
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0
REDIS_MAX_CONNECTIONS=10
Q12: 如何设置 CORS?
在 .env 文件中配置:
WEB_CORS_ORIGINS=["http://localhost:3000", "http://localhost:8080"]
Q13: 如何禁用 API 文档?
在 .env 文件中配置:
WEB_DOCS_ENABLED=false
Q14: 如何设置日志级别?
在 .env 文件中配置:
LOG_LEVEL=DEBUG # DEBUG, INFO, WARNING, ERROR, CRITICAL
Q15: 如何设置线程池大小?
在 .env 文件中配置:
WORKFLOW_MAX_WORKERS=10
性能优化
1. 合理使用线程池
# 根据任务类型调整线程池大小
manager = WorkflowManager(max_workers=5) # IO 密集型任务
manager = WorkflowManager(max_workers=2) # CPU 密集型任务
2. 使用缓存
@wf.step(sort=1)
def process_with_cache(self):
# 检查缓存
cached = self.get_global_data("cached_data")
if cached:
return self.set_success(data=cached)
# 计算数据
data = self.compute_data()
# 设置缓存
self.set_global_data("cached_data", data)
return self.set_success(data=data)
3. 批量操作
@wf.step(sort=1)
def batch_process(self):
# 批量处理
items = self.get_items()
# 使用列表推导式
results = [self.process_item(item) for item in items]
return self.set_success(data=results)
4. 避免重复计算
@wf.step(sort=1)
def compute_once(self):
# 计算一次,多次使用
if not hasattr(self, '_computed_value'):
self._computed_value = self.expensive_computation()
return self.set_success(data=self._computed_value)
安全建议
1. 敏感信息
# ✅ 使用环境变量
DB_PASSWORD=${DB_PASSWORD}
REDIS_PASSWORD=${REDIS_PASSWORD}
# ❌ 不要硬编码
DB_PASSWORD=secret123
2. 输入验证
@wf.step(sort=1)
def validate_input(self, user_input=None):
if not user_input:
return self.set_failure(message="输入不能为空")
if len(user_input) > 100:
return self.set_failure(message="输入过长")
return self.set_success()
3. 错误信息
# ✅ 不暴露敏感信息
return self.set_error(message="处理失败")
# ❌ 暴露敏感信息
return self.set_error(message=f"数据库错误: {str(e)}")
总结
遵循这些最佳实践可以帮助你:
- 编写更清晰、更易维护的代码
- 提高工作流的可靠性和性能
- 避免常见的问题和陷阱
- 更好地利用框架的功能
希望这些最佳实践对你有所帮助!