QAPubSub 模块文档
概述
QAPubSub 是 QUANTAXIS 的发布-订阅消息系统,基于 RabbitMQ 提供高效的消息队列服务。支持 1-1、1-n、n-n 的消息分发模式,可用于计算任务分发收集、实时订单流处理等场景。
模块架构
核心组件
base.py: 基础消息处理类
producer.py: 消息生产者
consumer.py: 消息消费者
setting.py: 配置管理
declaters.py: 队列声明和管理
主要功能
1. 消息生产者
from QUANTAXIS.QAPubSub.producer import QAProducer
# 创建生产者
producer = QAProducer(
host='localhost',
port=5672,
username='guest',
password='guest'
)
# 发送消息
producer.send_message(
exchange='qa_exchange',
routing_key='data.stock',
message={'code': '000001', 'price': 10.5}
)
2. 消息消费者
from QUANTAXIS.QAPubSub.consumer import QAConsumer
# 定义消息处理函数
def process_message(channel, method, properties, body):
data = json.loads(body)
print(f"处理消息: {data}")
# 确认消息
channel.basic_ack(delivery_tag=method.delivery_tag)
# 创建消费者
consumer = QAConsumer(
host='localhost',
port=5672,
username='guest',
password='guest'
)
# 订阅消息
consumer.subscribe(
queue='data_queue',
callback=process_message
)
# 开始消费
consumer.start_consuming()
3. 分发模式
# 1-1 模式:点对点消息
producer.send_direct('task_queue', task_data)
# 1-n 模式:广播消息
producer.send_fanout('broadcast_exchange', broadcast_data)
# n-n 模式:主题订阅
producer.send_topic('topic_exchange', 'stock.price.*', price_data)
使用场景
实时数据分发: 行情数据实时推送
任务队列: 计算任务分发和结果收集
订单流处理: 交易订单的实时处理
系统解耦: 微服务间异步通信
配置示例
# 消息队列配置
RABBITMQ_CONFIG = {
'host': 'localhost',
'port': 5672,
'username': 'quantaxis',
'password': 'password',
'virtual_host': '/qa'
}
相关模块
QAEngine: 任务引擎,使用消息队列进行任务分发
QAWebServer: Web服务,通过消息队列处理请求
QASchedule: 调度系统,使用消息队列触发任务
Last updated
Was this helpful?