Redis 集成
2026/3/20大约 3 分钟
Redis 集成
HsxWorkFlow 内置 Redis 操作封装,提供缓存、队列、计数等功能。
概览
RedisHandler
RedisHandler 是 Redis 操作的封装类。
获取 Redis 处理器
from hsxworkflow.Utils.redis_handler import get_redis_handler
rdb = get_redis_handler()
计数操作
增加计数
# 增加计数(默认 +1)
rdb.set_count("workflow_success")
# 增加指定数量
rdb.set_count("workflow_success", 5)
获取计数
# 获取计数值
count = rdb.get_count("workflow_success")
# 如果键不存在返回 None
count = rdb.get_count("nonexistent") # None
检查键是否存在
# 检查键是否存在
has = rdb.has_key("workflow_success") # True or False
初始化计数器
# 初始化计数器(带过期时间)
key = rdb.set_count_format_at_datetime("my_counter", expire_hours=48)
队列操作
推入队列
# 推入队列(7 天过期)
rdb.save_data_to_queue(
"queue_name",
{"data": "value"},
expire=604800
)
弹出队列
# 从队列弹出数据
data = rdb.get_data_from_queue("queue_name")
获取队列长度
# 获取队列长度
length = rdb.get_queue_len("queue_name")
完整示例
from hsxworkflow import WorkRegisterHandler, StepActionHandler
from hsxworkflow.Utils.redis_handler import get_redis_handler
wf = WorkRegisterHandler(
step_key="redis_demo",
title="Redis 集成",
desc="Redis 操作示例"
)
@wf.register_class()
class RedisDemo(StepActionHandler):
def __init__(self):
super().__init__()
self.rdb = get_redis_handler()
@wf.step(sort=1)
def init_counter(self):
"""初始化计数器"""
self.socket_log("初始化计数器...", base=True)
# 初始化计数器(48 小时过期)
key = self.rdb.set_count_format_at_datetime("task_counter", expire_hours=48)
self.socket_log(f"计数器键: {key}", base=True)
return self.set_success(message="计数器已初始化")
@wf.step(sort=2)
def increment_counter(self):
"""增加计数"""
self.socket_log("增加计数...", base=True)
# 增加计数
self.rdb.set_count("task_counter", 1)
# 获取当前计数
count = self.rdb.get_count("task_counter")
self.socket_log(f"当前计数: {count}", base=True)
return self.set_success(
message=f"计数已增加到 {count}",
data={"count": count}
)
@wf.step(sort=3)
def queue_operations(self):
"""队列操作"""
self.socket_log("队列操作...", base=True)
# 推入队列
for i in range(3):
data = {"task_id": i, "data": f"task_{i}"}
self.rdb.save_data_to_queue("task_queue", data, expire=86400)
self.socket_log(f"推入任务: {data}", base=True)
# 获取队列长度
length = self.rdb.get_queue_len("task_queue")
self.socket_log(f"队列长度: {length}", base=True)
return self.set_success(
message=f"队列操作完成,长度: {length}",
data={"queue_length": length}
)
@wf.step(sort=4)
def process_queue(self):
"""处理队列"""
self.socket_log("处理队列...", base=True)
while True:
# 弹出队列
data = self.rdb.get_data_from_queue("task_queue")
if data is None:
self.socket_log("队列为空", base=True)
break
# 处理数据
self.socket_log(f"处理任务: {data}", base=True)
# 增加计数
self.rdb.set_count("processed_tasks", 1)
# 获取处理计数
count = self.rdb.get_count("processed_tasks")
self.socket_log(f"已处理任务数: {count}", base=True)
return self.set_success(
message=f"队列处理完成,处理了 {count} 个任务",
data={"processed_count": count}
)
配置
Redis 配置
在 .env 文件中配置:
# Redis 配置
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0
REDIS_MAX_CONNECTIONS=10
代码中访问配置
from hsxworkflow import config
print(config.redis.host) # localhost
print(config.redis.port) # 6379
print(config.redis.url) # redis://localhost:6379/0
最佳实践
1. 键命名规范
# ✅ 使用有意义的键名
rdb.set_count("workflow_success_2025_01_31", 1)
# ❌ 使用无意义的键名
rdb.set_count("count", 1)
2. 过期时间设置
# ✅ 设置合理的过期时间
rdb.save_data_to_queue("temp_queue", data, expire=3600) # 1 小时
# ❌ 不设置过期时间(可能导致内存泄漏)
rdb.save_data_to_queue("temp_queue", data)
3. 错误处理
@wf.step(sort=1)
def safe_redis_operation(self):
try:
rdb = get_redis_handler()
rdb.set_count("counter", 1)
return self.set_success()
except Exception as e:
self.socket_log(f"Redis 操作失败: {e}", level="e", base=True)
return self.set_error(message=f"Redis 操作失败: {str(e)}")
下一步
现在你已经了解了 Redis 集成的使用方法,可以继续学习: