QAResourceManager - QUANTAXIS统一资源管理器
版本: QUANTAXIS 2.1.0+ 作者: @yutiansut @quantaxis 日期: 2025
📋 概述
QAResourceManager是QUANTAXIS 2.1.0新增的统一资源管理器,提供对MongoDB、RabbitMQ、ClickHouse、Redis等外部资源的统一管理和优雅关闭机制。
核心特性
✅ 连接池管理 - 自动复用连接,减少开销 ✅ 上下文管理器 - 支持with语句,自动释放资源 ✅ 优雅关闭 - 确保资源正确释放,无泄漏 ✅ 自动重连 - 连接断开时自动重试 ✅ 健康检查 - 定期检查连接状态 ✅ 线程安全 - 支持多线程环境 ✅ 单例模式 - 全局资源池管理 ✅ atexit清理 - 程序退出时自动关闭资源
🚀 快速开始
安装依赖
# 基础依赖 (MongoDB)
pip install pymongo motor
# RabbitMQ
pip install pika
# ClickHouse
pip install clickhouse-driver
# Redis
pip install redis
# 完整安装
pip install quantaxis[full]最简示例
from QUANTAXIS.QAUtil.QAResourceManager import QAMongoResourceManager
# 使用with语句(推荐)
with QAMongoResourceManager() as mongo:
db = mongo.get_database('quantaxis')
result = db.stock_day.find_one({'code': '000001'})
print(result)
# 自动关闭连接,无需手动close()📚 详细文档
1. MongoDB资源管理器
1.1 基本用法
from QUANTAXIS.QAUtil.QAResourceManager import QAMongoResourceManager
# 方法1: 上下文管理器(推荐)
with QAMongoResourceManager() as mongo:
db = mongo.get_database('quantaxis')
# 操作数据库...
# 方法2: 手动管理
mongo = QAMongoResourceManager()
try:
mongo.connect()
db = mongo.get_database('quantaxis')
# 操作数据库...
finally:
mongo.close() # 确保关闭1.2 配置参数
mongo = QAMongoResourceManager(
uri='mongodb://user:pass@localhost:27017', # 连接URI
max_pool_size=100, # 连接池大小
server_selection_timeout_ms=5000, # 服务器选择超时(毫秒)
async_mode=False # 是否使用异步客户端
)1.3 异步模式
import asyncio
async def async_query():
async with QAMongoResourceManager(async_mode=True) as mongo:
db = mongo.get_database('quantaxis')
result = await db.stock_day.find_one({'code': '000001'})
return result
# 运行异步函数
asyncio.run(async_query())1.4 连接池配置
QAMongoResourceManager默认配置:
maxPoolSize: 100 (最大连接数)
minPoolSize: 10 (最小连接数)
maxIdleTimeMS: 60000 (60秒, 连接最大空闲时间)
waitQueueTimeoutMS: 5000 (5秒, 等待连接池超时)
2. RabbitMQ资源管理器
2.1 基本用法
from QUANTAXIS.QAUtil.QAResourceManager import QARabbitMQResourceManager
with QARabbitMQResourceManager() as rabbitmq:
channel = rabbitmq.get_channel()
# 声明队列
channel.queue_declare(queue='test_queue', durable=True)
# 发布消息
channel.basic_publish(
exchange='',
routing_key='test_queue',
body='Hello QUANTAXIS'
)
# 消费消息
method_frame, header_frame, body = channel.basic_get(queue='test_queue')
if method_frame:
print(f"收到消息: {body.decode()}")
channel.basic_ack(method_frame.delivery_tag)
# 自动关闭连接和通道2.2 配置参数
rabbitmq = QARabbitMQResourceManager(
host='localhost', # RabbitMQ主机
port=5672, # 端口
username='admin', # 用户名
password='admin', # 密码
vhost='/', # 虚拟主机
heartbeat=600, # 心跳间隔(秒), 0表示禁用
socket_timeout=5 # Socket超时(秒)
)2.3 安全特性
密码擦除: 认证后自动清除内存中的密码 (
erase_on_connect=True)心跳保持: 默认600秒心跳,防止连接超时
优雅关闭: 先关闭通道,再关闭连接
3. ClickHouse资源管理器
3.1 基本用法
from QUANTAXIS.QAUtil.QAResourceManager import QAClickHouseResourceManager
with QAClickHouseResourceManager() as clickhouse:
# 执行SQL
result = clickhouse.execute("SELECT version()")
print(f"ClickHouse版本: {result[0][0]}")
# 查询并返回DataFrame
df = clickhouse.query_dataframe("""
SELECT * FROM stock_day
WHERE code = '000001'
LIMIT 10
""")
print(df.head())3.2 配置参数
clickhouse = QAClickHouseResourceManager(
host='localhost', # ClickHouse主机
port=9000, # Native protocol端口
database='quantaxis', # 数据库名
user='default', # 用户名
password='', # 密码
compression=True, # 启用压缩
insert_block_size=100000000 # 插入块大小
)3.3 性能优化配置
# 内置性能优化设置:
settings = {
'insert_block_size': 100000000, # 大批量插入
'max_threads': 4, # 最大查询线程数
'max_block_size': 65536, # 最大块大小
}4. Redis资源管理器
4.1 基本用法
from QUANTAXIS.QAUtil.QAResourceManager import QARedisResourceManager
with QARedisResourceManager() as redis_mgr:
# 设置键值(60秒过期)
redis_mgr.set('test_key', 'test_value', ex=60)
# 获取值
value = redis_mgr.get('test_key')
print(f"值: {value}")
# 删除键
redis_mgr.delete('test_key')4.2 配置参数
redis_mgr = QARedisResourceManager(
host='localhost', # Redis主机
port=6379, # 端口
db=0, # 数据库编号
password=None, # 密码(可选)
max_connections=50, # 连接池最大连接数
socket_timeout=5, # Socket超时(秒)
socket_keepalive=True, # 启用TCP keepalive
decode_responses=True # 解码响应为字符串
)4.3 管道操作
with QARedisResourceManager() as redis_mgr:
# 创建管道
pipe = redis_mgr.pipeline(transaction=True)
# 批量操作
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.set('key3', 'value3')
# 执行
results = pipe.execute()
print(f"管道操作结果: {results}")4.4 健康检查
# Redis内置健康检查,每30秒自动检查连接
# health_check_interval=305. 统一资源池管理器
5.1 基本用法(推荐)
from QUANTAXIS.QAUtil.QAResourceManager import QAResourcePool
# 获取单例实例
pool = QAResourcePool.get_instance()
# 获取各类资源
mongo = pool.get_mongo()
rabbitmq = pool.get_rabbitmq()
clickhouse = pool.get_clickhouse()
redis = pool.get_redis()
# 使用资源...
db = mongo.get_database('quantaxis')
channel = rabbitmq.get_channel()
# 健康检查
health = pool.health_check()
print(health) # {'mongo': True, 'rabbitmq': True, ...}
# 关闭所有资源
pool.close_all()5.2 单例模式
QAResourcePool采用单例模式,全局唯一:
pool1 = QAResourcePool.get_instance()
pool2 = QAResourcePool.get_instance()
assert pool1 is pool2 # True, 同一实例5.3 自动清理
import atexit
# QAResourcePool在初始化时自动注册atexit清理函数
# 程序退出时自动调用pool.close_all()
# 无需手动清理5.4 单独关闭资源
pool = QAResourcePool.get_instance()
# 关闭单个资源
pool.close_resource('mongo')
pool.close_resource('rabbitmq')
pool.close_resource('clickhouse')
pool.close_resource('redis')
# 或关闭所有资源
pool.close_all()6. 便捷函数
6.1 快捷上下文管理器
from QUANTAXIS.QAUtil.QAResourceManager import (
get_mongo_resource,
get_rabbitmq_resource,
get_clickhouse_resource,
get_redis_resource
)
# MongoDB
with get_mongo_resource() as mongo:
db = mongo.get_database('quantaxis')
# ...
# RabbitMQ
with get_rabbitmq_resource() as rabbitmq:
channel = rabbitmq.get_channel()
# ...
# ClickHouse
with get_clickhouse_resource() as clickhouse:
df = clickhouse.query_dataframe("SELECT * FROM stock_day LIMIT 10")
# ...
# Redis
with get_redis_resource() as redis_mgr:
redis_mgr.set('key', 'value')
# ...🔧 高级用法
1. 自定义连接配置
MongoDB自定义URI
# 从环境变量获取
import os
mongo_uri = os.getenv('MONGODB_URI', 'mongodb://localhost:27017')
with QAMongoResourceManager(uri=mongo_uri) as mongo:
db = mongo.get_database('quantaxis')RabbitMQ认证
with QARabbitMQResourceManager(
host='rabbitmq.example.com',
username='quantaxis_user',
password='secure_password',
vhost='/quantaxis'
) as rabbitmq:
channel = rabbitmq.get_channel()2. 连接重试
所有资源管理器均支持reconnect()方法:
mongo = QAMongoResourceManager()
try:
mongo.connect()
# 使用连接...
except Exception as e:
# 连接失败,重试
mongo.reconnect()3. 健康检查
pool = QAResourcePool.get_instance()
# 定期健康检查
import time
while True:
health = pool.health_check()
for resource, status in health.items():
if not status:
print(f"❌ {resource}连接异常,正在重连...")
# 自动重连逻辑...
time.sleep(60) # 每60秒检查一次4. 线程安全
所有资源管理器使用threading.RLock确保线程安全:
import threading
pool = QAResourcePool.get_instance()
def worker():
mongo = pool.get_mongo()
# 多线程安全访问
db = mongo.get_database('quantaxis')
# ...
threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()💡 最佳实践
1. 使用with语句
✅ 推荐:
with QAMongoResourceManager() as mongo:
db = mongo.get_database('quantaxis')
# 操作...
# 自动关闭❌ 不推荐:
mongo = QAMongoResourceManager()
mongo.connect()
db = mongo.get_database('quantaxis')
# 忘记close() - 资源泄漏!2. 使用资源池管理全局资源
✅ 推荐 (长期运行的应用):
pool = QAResourcePool.get_instance()
mongo = pool.get_mongo() # 复用同一连接
rabbitmq = pool.get_rabbitmq()
# 应用运行...
# 程序退出时自动清理(atexit)❌ 不推荐 (频繁创建销毁):
for i in range(1000):
with QAMongoResourceManager() as mongo: # 每次创建新连接!
db = mongo.get_database('quantaxis')3. 异常处理
✅ 推荐:
try:
with QAMongoResourceManager() as mongo:
db = mongo.get_database('quantaxis')
# 操作可能抛出异常...
except pymongo.errors.ConnectionFailure as e:
print(f"MongoDB连接失败: {e}")
except Exception as e:
print(f"其他错误: {e}")4. 配置外部化
✅ 推荐:
# config.py
MONGODB_URI = os.getenv('MONGODB_URI', 'mongodb://localhost:27017')
RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', 'localhost')
# app.py
from config import MONGODB_URI, RABBITMQ_HOST
with QAMongoResourceManager(uri=MONGODB_URI) as mongo:
# ...5. 日志监控
import logging
logging.basicConfig(level=logging.INFO)
# QAResourceManager会自动记录:
# - 连接成功/失败
# - 资源关闭
# - 重连尝试
# - 错误信息📊 性能优化
MongoDB连接池调优
mongo = QAMongoResourceManager(
max_pool_size=200, # 高并发场景
server_selection_timeout_ms=10000, # 增加超时
)RabbitMQ心跳调优
rabbitmq = QARabbitMQResourceManager(
heartbeat=300, # 减少心跳频率(低流量场景)
socket_timeout=10 # 增加超时(慢网络)
)ClickHouse查询优化
clickhouse = QAClickHouseResourceManager(
insert_block_size=500000000, # 超大批量插入
)
# 查询时使用压缩
df = clickhouse.query_dataframe("""
SELECT * FROM stock_day
WHERE code IN ('000001', '000002')
SETTINGS max_threads = 8
""")Redis连接池调优
redis_mgr = QARedisResourceManager(
max_connections=100, # 高并发场景
socket_keepalive=True, # 保持连接
health_check_interval=60 # 增加健康检查间隔
)🐛 故障排查
问题1: ImportError
ImportError: No module named 'pymongo'解决:
pip install pymongo motor pika clickhouse-driver redis
# 或
pip install quantaxis[full]问题2: 连接超时
pymongo.errors.ServerSelectionTimeoutError解决:
# 增加超时时间
mongo = QAMongoResourceManager(
server_selection_timeout_ms=10000 # 10秒
)问题3: 资源泄漏
症状: 程序运行一段时间后,数据库连接数不断增加
解决:
# 方法1: 使用with语句
with QAMongoResourceManager() as mongo:
# 自动关闭
# 方法2: 使用资源池
pool = QAResourcePool.get_instance()
mongo = pool.get_mongo() # 复用连接问题4: RabbitMQ连接断开
症状: pika.exceptions.StreamLostError
解决:
rabbitmq = QARabbitMQResourceManager(
heartbeat=600, # 启用心跳
)
# 或手动重连
try:
channel = rabbitmq.get_channel()
except pika.exceptions.StreamLostError:
rabbitmq.reconnect()
channel = rabbitmq.get_channel()📖 示例代码
完整示例请参考:
examples/resource_manager_example.py - 9个完整示例
QUANTAXIS/QAUtil/QAResourceManager.py - 源码和内联文档
🔗 相关文档
🤝 贡献
如果发现问题或有改进建议,欢迎:
提交Issue: https://github.com/QUANTAXIS/QUANTAXIS/issues
提交PR: https://github.com/QUANTAXIS/QUANTAXIS/pulls
📝 更新日志
v2.1.0 (2025-01-25)
✨ 新增QAResourceManager统一资源管理器
✨ 新增MongoDB/RabbitMQ/ClickHouse/Redis管理器
✨ 新增QAResourcePool单例资源池
✨ 新增便捷上下文管理器函数
✨ 新增自动atexit清理机制
✨ 新增健康检查功能
✨ 新增线程安全支持
👥 作者
@yutiansut @quantaxis
📄 许可证
MIT License
Copyright (c) 2016-2025 yutiansut/QUANTAXIS
Last updated
Was this helpful?