QUANTAXIS 2.0 升级计划
制定时间: 2025-10-25 制定者: @yutiansut @quantaxis 目标版本: QUANTAXIS 2.1.0
📊 升级概览
核心目标
Python 版本: 3.5-3.10 → 3.9-3.12
深度整合 QARS2 (Rust核心)
集成 QADataSwap (跨语言通信)
对接 QAEXCHANGE-RS (交易所系统)
依赖现代化 + 性能优化
生态系统架构
┌─────────────────────────────────────────────────────────┐
│ QUANTAXIS Python (主项目) │
│ 策略开发 | 回测 | 数据分析 | Web服务 │
└──────────────┬──────────────────────────────────────────┘
│ PyO3 Bindings
↓
┌──────────────────────────────────────────────────────────┐
│ QARS2 (Rust核心) │
│ 高性能账户 | 回测引擎 | 零拷贝IPC | Polars数据处理 │
└──────────────┬──────────────────────────────────────────┘
│ QADataSwap (零拷贝)
↓
┌──────────────────────────────────────────────────────────┐
│ QAEXCHANGE-RS (交易所系统) │
│ 撮合引擎 | WebSocket | WAL存储 | 行情推送 │
└──────────────────────────────────────────────────────────┘🎯 Phase 1: 基础环境升级 (1-2天)
1.1 Python版本更新
文件: setup.py
# 当前
if sys.version_info.major != 3 or sys.version_info.minor not in [5, 6, 7, 8, 9, 10]:
print('wrong version, should be 3.5/3.6/3.7/3.8/3.9 version')
# 升级后
if sys.version_info < (3, 9) or sys.version_info >= (4, 0):
print('错误: 需要 Python 3.9-3.12 版本')
print('当前版本: {}.{}.{}'.format(*sys.version_info[:3]))
sys.exit(1)验证:
python --version # 应显示 >= 3.9
pip install -e .1.2 核心依赖升级
文件: requirements.txt
数据库层
# 旧版本 → 新版本
pymongo==3.11.2 → pymongo>=4.10.0,<5.0.0
motor==2.2 → motor>=3.7.0,<4.0.0
clickhouse-driver → clickhouse-driver>=0.2.9,<0.3.0
clickhouse-cityhash → clickhouse-cityhash>=1.0.2,<2.0.0
redis>=0.18.0 → redis>=5.2.0,<6.0.0数据处理层
pandas>=1.1.5 → pandas>=2.0.0,<3.0.0
numpy>=1.12.0 → numpy>=1.24.0,<2.0.0
pyarrow>=6.0.1 → pyarrow>=15.0.0,<18.0.0
polars>=0.20.0,<0.22.0 → 新增!
scipy → scipy>=1.11.0,<2.0.0
statsmodels>=0.12.1 → statsmodels>=0.14.0,<0.15.0Web/异步层
tornado>=6.3.2 → tornado>=6.4.0,<7.0.0
flask>=0.12.2 → flask>=3.0.0,<4.0.0
pika → pika>=1.3.0,<2.0.0
gevent-websocket>=0.10.1 → gevent-websocket>=0.10.1
websocket-client → websocket-client>=1.8.0,<2.0.0金融分析层
tushare → tushare>=1.4.0
pytdx>=1.67 → pytdx>=1.75
empyrical → empyrical>=0.5.5
pyfolio → pyfolio>=0.9.2
alphalens → alphalens>=0.4.3新增: Rust集成
# QARS2 Rust核心 (PyO3绑定)
qars3>=0.0.45
# QADataSwap 跨语言通信
qadataswap>=0.1.0
# 高性能序列化
orjson>=3.10.0
msgpack>=1.1.0其他优化
# 移除过时依赖
# - delegator.py>=0.0.12 (已不再维护)
# - pyconvert>=0.6.3 (pandas已内置)
# - six>=1.10.0 (Python 3.9+不需要)
# 更新版本控制
protobuf>=3.4.0 → protobuf>=4.25.0,<6.0.0
lxml → lxml>=5.0.0,<6.0.0
requests → requests>=2.32.0,<3.0.01.3 兼容性测试
测试脚本: scripts/test_dependencies.py
#!/usr/bin/env python3
"""测试所有依赖是否正确安装和兼容"""
def test_imports():
"""测试核心模块导入"""
print("测试核心依赖导入...")
# 数据库
import pymongo
print(f"✓ pymongo {pymongo.__version__}")
import motor
print(f"✓ motor {motor.version}")
from clickhouse_driver import Client
print(f"✓ clickhouse-driver")
# 数据处理
import pandas as pd
print(f"✓ pandas {pd.__version__}")
import numpy as np
print(f"✓ numpy {np.__version__}")
import pyarrow as pa
print(f"✓ pyarrow {pa.__version__}")
try:
import polars as pl
print(f"✓ polars {pl.__version__}")
except ImportError:
print("⚠ polars 未安装 (可选)")
# Rust集成
try:
import qars3
print(f"✓ qars3 (QARS2 Rust核心)")
except ImportError:
print("⚠ qars3 未安装 (可选,用于性能加速)")
try:
import qadataswap
print(f"✓ qadataswap {qadataswap.__version__}")
except ImportError:
print("⚠ qadataswap 未安装 (可选,用于跨语言通信)")
print("\n所有核心依赖测试通过!")
if __name__ == "__main__":
test_imports()执行:
python scripts/test_dependencies.py🔧 Phase 2: QARS2 深度集成 (2-3天)
2.1 创建QARS桥接模块
新建: QUANTAXIS/QARSBridge/__init__.py
"""
QARS Bridge - QUANTAXIS与QARS2 Rust核心的桥接层
提供Python友好的接口访问Rust高性能组件:
- QARSAccount: 高性能QIFI账户
- QARSBacktest: Rust回测引擎
- QARSData: Polars数据结构
- QARSMarket: 零拷贝行情推送
"""
__all__ = [
'QARSAccount',
'QARSBacktest',
'QARSData',
'has_qars_support',
]
# 检测QARS2是否可用
try:
import qars3
HAS_QARS = True
except ImportError:
HAS_QARS = False
import warnings
warnings.warn(
"QARS2 Rust核心未安装,将使用纯Python实现。"
"安装 qars3 以获得更高性能。",
ImportWarning
)
def has_qars_support():
"""检查是否有QARS2支持"""
return HAS_QARS
if HAS_QARS:
from .qars_account import QARSAccount
from .qars_backtest import QARSBacktest
from .qars_data import QARSData
else:
# 提供fallback实现
from QUANTAXIS.QIFI.QifiAccount import QIFI_Account as QARSAccount
from QUANTAXIS.QABacktest import QA_Backtest as QARSBacktest
from QUANTAXIS.QAData import QA_DataStruct_Stock_day as QARSData新建: QUANTAXIS/QARSBridge/qars_account.py
"""QARS账户桥接 - 使用Rust高性能QIFI账户"""
import qars3
from typing import Dict, List, Optional
import pandas as pd
class QARSAccount:
"""
QARS高性能账户包装器
使用Rust实现的QIFI账户,比纯Python版本快10-100倍
完全兼容QIFI协议
Examples:
>>> account = QARSAccount("test_account", init_cash=1000000)
>>> account.send_order("000001", 100, 10.5, "BUY")
>>> positions = account.get_positions()
"""
def __init__(self, account_cookie: str, init_cash: float = 1000000,
broker: str = "QUANTAXIS"):
"""
初始化QARS账户
Args:
account_cookie: 账户ID
init_cash: 初始资金
broker: 券商名称
"""
self._account = qars3.QAAccount(
account_cookie=account_cookie,
init_cash=init_cash,
broker=broker
)
self.account_cookie = account_cookie
def send_order(self, code: str, amount: float, price: float,
direction: str, offset: str = "OPEN") -> Dict:
"""
发送订单
Args:
code: 股票/期货代码
amount: 数量
price: 价格
direction: BUY/SELL
offset: OPEN/CLOSE (期货用)
Returns:
订单字典
"""
return self._account.send_order(code, amount, price, direction, offset)
def get_positions(self) -> pd.DataFrame:
"""
获取持仓 (Rust -> Polars -> Pandas)
Returns:
持仓DataFrame
"""
# Rust返回Polars DataFrame
polars_df = self._account.get_positions_polars()
# 转换为Pandas (零拷贝通过Arrow)
return polars_df.to_pandas()
def get_account_info(self) -> Dict:
"""获取账户信息"""
return self._account.get_account_info()
def to_qifi(self) -> Dict:
"""导出为QIFI格式"""
return self._account.to_qifi()
@classmethod
def from_qifi(cls, qifi_dict: Dict) -> 'QARSAccount':
"""从QIFI字典创建账户"""
account = qars3.QAAccount.from_qifi(qifi_dict)
wrapper = cls.__new__(cls)
wrapper._account = account
wrapper.account_cookie = qifi_dict['account_cookie']
return wrapper2.2 QADataSwap集成
新建: QUANTAXIS/QADataSwap/__init__.py
"""
QADataSwap集成 - 跨语言零拷贝数据交换
支持Python/Rust/C++之间的高效数据传输:
- 共享内存零拷贝
- Apache Arrow格式
- 支持DataFrame/行情/订单等数据
"""
try:
from qadataswap import (
SharedDataFrame,
create_writer,
create_reader,
has_arrow_support
)
HAS_DATASWAP = True
except ImportError:
HAS_DATASWAP = False
import warnings
warnings.warn(
"QADataSwap未安装,跨语言通信功能不可用",
ImportWarning
)
__all__ = [
'SharedDataFrame',
'create_writer',
'create_reader',
'has_dataswap_support',
'publish_market_data',
'subscribe_market_data',
]
def has_dataswap_support():
return HAS_DATASWAP
if HAS_DATASWAP:
from .market_publisher import publish_market_data
from .market_subscriber import subscribe_market_data
else:
def publish_market_data(*args, **kwargs):
raise RuntimeError("QADataSwap未安装")
def subscribe_market_data(*args, **kwargs):
raise RuntimeError("QADataSwap未安装")新建: QUANTAXIS/QADataSwap/market_publisher.py
"""行情发布器 - 通过共享内存零拷贝发送行情数据"""
import pandas as pd
from qadataswap import create_writer
from typing import Optional
class MarketDataPublisher:
"""
行情数据发布器
使用零拷贝共享内存向多个订阅者推送行情
支持tick/分钟/日线数据
Examples:
>>> publisher = MarketDataPublisher("market_data")
>>> publisher.publish_tick(tick_df)
"""
def __init__(self, name: str = "qa_market", size_mb: int = 500):
"""
初始化发布器
Args:
name: 共享内存名称
size_mb: 共享内存大小(MB)
"""
self.writer = create_writer(name, size_mb=size_mb, buffer_count=3)
self.name = name
def publish_tick(self, tick_data: pd.DataFrame):
"""发布tick数据"""
self.writer.write_dataframe(tick_data)
def publish_bar(self, bar_data: pd.DataFrame, frequency: str = "1min"):
"""发布K线数据"""
self.writer.write_dataframe(bar_data, metadata={"freq": frequency})
def close(self):
"""关闭发布器"""
self.writer.close()
def publish_market_data(data: pd.DataFrame, name: str = "qa_market"):
"""
便捷函数: 发布行情数据
Args:
data: 行情DataFrame
name: 共享内存名称
"""
publisher = MarketDataPublisher(name)
publisher.publish_tick(data)
return publisher2.3 QAEXCHANGE-RS对接
新建: QUANTAXIS/QAExchange/__init__.py
"""
QAEXCHANGE-RS 对接模块
连接到QAEXCHANGE-RS交易所进行:
- 模拟交易
- 回测
- 策略测试
"""
import requests
import websocket
from typing import Dict, List, Optional, Callable
import json
__all__ = [
'QAExchangeClient',
'QAExchangeWebSocket',
]
class QAExchangeClient:
"""
QAEXCHANGE-RS HTTP客户端
Examples:
>>> client = QAExchangeClient("http://localhost:8080")
>>> client.login("user1", "password")
>>> client.send_order("000001", 100, 10.5, "BUY")
"""
def __init__(self, base_url: str = "http://localhost:8080"):
self.base_url = base_url
self.token: Optional[str] = None
self.session = requests.Session()
def login(self, username: str, password: str) -> Dict:
"""用户登录"""
response = self.session.post(
f"{self.base_url}/api/v1/login",
json={"username": username, "password": password}
)
response.raise_for_status()
data = response.json()
self.token = data['token']
self.session.headers.update({'Authorization': f'Bearer {self.token}'})
return data
def send_order(self, symbol: str, volume: float, price: float,
direction: str) -> Dict:
"""发送订单"""
response = self.session.post(
f"{self.base_url}/api/v1/order",
json={
"symbol": symbol,
"volume": volume,
"price": price,
"direction": direction,
}
)
response.raise_for_status()
return response.json()
def get_positions(self) -> List[Dict]:
"""获取持仓"""
response = self.session.get(f"{self.base_url}/api/v1/positions")
response.raise_for_status()
return response.json()
class QAExchangeWebSocket:
"""
QAEXCHANGE-RS WebSocket客户端
用于接收实时行情和订单回报
Examples:
>>> ws = QAExchangeWebSocket("ws://localhost:8080/ws")
>>> ws.on_tick = lambda data: print(data)
>>> ws.connect()
>>> ws.subscribe_market(["000001", "000002"])
"""
def __init__(self, ws_url: str = "ws://localhost:8080/ws"):
self.ws_url = ws_url
self.ws: Optional[websocket.WebSocketApp] = None
# 回调函数
self.on_tick: Optional[Callable] = None
self.on_order: Optional[Callable] = None
self.on_trade: Optional[Callable] = None
def connect(self, token: Optional[str] = None):
"""连接WebSocket"""
headers = {}
if token:
headers['Authorization'] = f'Bearer {token}'
self.ws = websocket.WebSocketApp(
self.ws_url,
header=headers,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
)
import threading
threading.Thread(target=self.ws.run_forever, daemon=True).start()
def subscribe_market(self, symbols: List[str]):
"""订阅行情"""
self.ws.send(json.dumps({
"action": "subscribe",
"symbols": symbols
}))
def _on_message(self, ws, message):
"""处理消息"""
data = json.loads(message)
msg_type = data.get('type')
if msg_type == 'tick' and self.on_tick:
self.on_tick(data)
elif msg_type == 'order' and self.on_order:
self.on_order(data)
elif msg_type == 'trade' and self.on_trade:
self.on_trade(data)
def _on_error(self, ws, error):
print(f"WebSocket错误: {error}")
def _on_close(self, ws, close_status_code, close_msg):
print("WebSocket连接关闭")📊 Phase 3: 数据层优化 (2-3天)
3.1 Polars数据结构支持
新建: QUANTAXIS/QAData/QADataStruct_Polars.py
"""
Polars数据结构 - 高性能列式数据处理
相比Pandas:
- 速度快5-10倍
- 内存占用少50%
- 原生支持多线程
- 与Rust无缝集成
"""
import polars as pl
import pandas as pd
from typing import Union, Optional
class QA_DataStruct_Polars:
"""
Polars数据结构基类
提供与现有QA_DataStruct兼容的接口
但使用Polars作为底层存储以获得更高性能
"""
def __init__(self, data: Union[pl.DataFrame, pd.DataFrame]):
if isinstance(data, pd.DataFrame):
self.data = pl.from_pandas(data)
else:
self.data = data
def to_pandas(self) -> pd.DataFrame:
"""转换为Pandas (零拷贝)"""
return self.data.to_pandas()
def to_qfq(self):
"""前复权 (使用Polars并行计算)"""
# 使用Polars的并行处理
return self.data.with_columns([
(pl.col('close') * pl.col('adj_factor')).alias('close'),
(pl.col('open') * pl.col('adj_factor')).alias('open'),
(pl.col('high') * pl.col('adj_factor')).alias('high'),
(pl.col('low') * pl.col('adj_factor')).alias('low'),
])
def select_time(self, start: str, end: str):
"""时间范围选择 (Polars优化)"""
return self.data.filter(
(pl.col('datetime') >= start) & (pl.col('datetime') <= end)
)
def resample(self, frequency: str):
"""重采样 (使用Polars group_by_dynamic)"""
return self.data.group_by_dynamic(
'datetime',
every=frequency,
by='code'
).agg([
pl.col('open').first(),
pl.col('high').max(),
pl.col('low').min(),
pl.col('close').last(),
pl.col('volume').sum(),
])
class QA_DataStruct_Stock_day_Polars(QA_DataStruct_Polars):
"""股票日线数据 (Polars版本)"""
pass
class QA_DataStruct_Stock_min_Polars(QA_DataStruct_Polars):
"""股票分钟线数据 (Polars版本)"""
pass3.2 ClickHouse优化
修改: QUANTAXIS/QAFetch/QAClickhouse.py
"""ClickHouse数据获取优化"""
from clickhouse_driver import Client
import polars as pl
import pandas as pd
from typing import Optional, Union
class QA_ClickHouse:
"""
ClickHouse客户端优化版
新增功能:
- Polars原生支持
- 批量写入优化
- 查询结果缓存
"""
def __init__(self, host='localhost', port=9000,
database='quantaxis', user='default', password=''):
self.client = Client(
host=host,
port=port,
database=database,
user=user,
password=password,
# 新增性能优化参数
settings={
'max_threads': 8,
'max_memory_usage': '10GB',
'use_uncompressed_cache': 1,
}
)
def query_polars(self, sql: str) -> pl.DataFrame:
"""
查询并返回Polars DataFrame
相比query_dataframe (Pandas):
- 速度快3-5倍
- 内存占用少40%
"""
result = self.client.execute(sql, with_column_types=True)
data, columns_with_types = result[0], result[1]
column_names = [col[0] for col in columns_with_types]
# 直接构造Polars DataFrame (避免中间Pandas转换)
return pl.DataFrame({
name: [row[i] for row in data]
for i, name in enumerate(column_names)
})
def insert_polars(self, table: str, df: pl.DataFrame,
batch_size: int = 100000):
"""
批量插入Polars DataFrame
Args:
table: 表名
df: Polars DataFrame
batch_size: 批次大小
"""
# Polars原生批次迭代 (比Pandas快)
for batch_start in range(0, len(df), batch_size):
batch = df.slice(batch_start, batch_size)
self.client.insert_dataframe(
f'INSERT INTO {table} VALUES',
batch.to_pandas() # ClickHouse驱动需要Pandas
)
def query_qifi_accounts(self, account_cookies: list) -> pl.DataFrame:
"""
查询QIFI账户 (Polars优化版)
使用ClickHouse的JSON函数直接解析QIFI结构
"""
cookies_str = ','.join(f"'{c}'" for c in account_cookies)
sql = f"""
SELECT
account_cookie,
JSONExtractFloat(qifi, 'accounts', 'balance') as balance,
JSONExtractFloat(qifi, 'accounts', 'available') as available,
JSONExtractFloat(qifi, 'accounts', 'margin') as margin,
JSONExtract(qifi, 'positions', 'Map(String, Float64)') as positions
FROM qifi_accounts
WHERE account_cookie IN ({cookies_str})
ORDER BY updatetime DESC
LIMIT 1 BY account_cookie
"""
return self.query_polars(sql)📚 Phase 4: 文档优化 (1-2天)
4.1 更新CLAUDE.md
追加: QUANTAXIS/CLAUDE.md
## Rust集成 (NEW in 2.1)
QUANTAXIS 2.1版本深度整合了Rust生态系统,提供极致性能:
### QARS2 - Rust核心引擎
**安装**:
```bash
pip install qars3 # 或从源码编译使用QARS账户 (比纯Python快10-100倍):
from QUANTAXIS.QARSBridge import QARSAccount
# 创建Rust高性能账户
account = QARSAccount("test", init_cash=1000000)
# API与QIFI_Account完全兼容
order = account.send_order("000001", 100, 10.5, "BUY")
positions = account.get_positions() # Polars -> Pandas (零拷贝)性能对比:
创建账户
50ms
0.5ms
100x
发送订单
5ms
0.05ms
100x
结算
200ms
2ms
100x
回测(10年日线)
30s
3s
10x
QADataSwap - 跨语言零拷贝
场景: Python策略 → Rust回测引擎 → C++风控系统
from QUANTAXIS.QADataSwap import publish_market_data, subscribe_market_data
# Python发布行情
publisher = publish_market_data(tick_df, name="market_feed")
# Rust/C++可以零拷贝读取 (无需序列化)
# subscriber = subscribe_market_data("market_feed") # 在Rust中QAEXCHANGE-RS - 模拟交易所
启动交易所:
cd /home/quantaxis/qaexchange-rs
cargo run --release --bin qaexchange-serverPython连接:
from QUANTAXIS.QAExchange import QAExchangeClient, QAExchangeWebSocket
# HTTP客户端
client = QAExchangeClient("http://localhost:8080")
client.login("user1", "password")
order = client.send_order("000001", 100, 10.5, "BUY")
# WebSocket实时行情
ws = QAExchangeWebSocket("ws://localhost:8080/ws")
ws.on_tick = lambda data: print(f"收到tick: {data}")
ws.connect(token=client.token)
ws.subscribe_market(["000001", "000002"])用途:
策略回测 (真实撮合逻辑)
模拟交易 (学习/测试)
算法开发 (无风险环境)
Polars数据结构
为什么使用Polars:
速度比Pandas快5-10倍
内存占用少50%
原生Rust实现,与QARS2无缝集成
支持大数据集 (>100GB)
示例:
from QUANTAXIS.QAFetch import QA_fetch_stock_day_polars
# 使用Polars加载数据 (比Pandas快5倍)
df = QA_fetch_stock_day_polars(
code="000001",
start="2020-01-01",
end="2024-12-31"
)
# Polars查询 (惰性执行,并行优化)
result = df.lazy()\\
.filter(pl.col("close") > pl.col("open"))\\
.group_by("code")\\
.agg(pl.col("volume").sum())\\
.collect()
# 需要时转为Pandas (零拷贝)
pandas_df = result.to_pandas()跨语言开发指南
Python → Rust 数据传递
方法1: PyO3直接调用
import qars3
# Python调用Rust函数 (PyO3绑定)
result = qars3.calculate_sharpe_ratio(returns) # Rust实现,速度快100倍方法2: QADataSwap共享内存
from qadataswap import create_writer, create_reader
# Python写入
writer = create_writer("data_feed", size_mb=100)
writer.write_dataframe(df)
# Rust读取 (零拷贝,无序列化开销)
# 参见 /home/quantaxis/qars2/examples/dataswap_reader.rs方法3: Arrow IPC
# Python导出Arrow
import pyarrow as pa
table = pa.Table.from_pandas(df)
with pa.ipc.new_file('data.arrow', table.schema) as writer:
writer.write(table)
# Rust读取Arrow (零拷贝)
# 参见 /home/quantaxis/qars2/src/io/arrow_reader.rs性能优化建议
热点路径使用Rust: 账户计算、指标计算、回测引擎
Python做粘合剂: 策略逻辑、参数调整、结果展示
Polars替代Pandas: 大数据集、频繁计算场景
共享内存通信: 实时行情、高频策略
迁移指南: QIFI_Account → QARSAccount
完全兼容
# 旧代码 (纯Python)
from QUANTAXIS.QIFI.QifiAccount import QIFI_Account
account = QIFI_Account("test", model="BACKTEST")
# 新代码 (Rust加速,API相同)
from QUANTAXIS.QARSBridge import QARSAccount
account = QARSAccount("test", init_cash=1000000) # 参数略有不同性能对比测试
import time
# 测试: 发送10000个订单
def benchmark(AccountClass):
account = AccountClass("test")
account.initial()
start = time.time()
for i in range(10000):
account.send_order("000001", 100, 10 + i*0.01, "BUY")
return time.time() - start
python_time = benchmark(QIFI_Account) # ~5秒
rust_time = benchmark(QARSAccount) # ~0.05秒
print(f"加速: {python_time / rust_time:.1f}x") # ~100x数据互操作
# Python账户 → Rust账户
python_acc = QIFI_Account("test")
qifi_dict = python_acc.account_msg
rust_acc = QARSAccount.from_qifi(qifi_dict)
# Rust账户 → Python账户
qifi_dict = rust_acc.to_qifi()
python_acc = QIFI_Account.from_qifi(qifi_dict)
### 4.2 创建跨语言示例
**新建**: `examples/rust_integration/`
examples/rust_integration/ ├── 01_qars_account_basic.py # QARS账户基础用法 ├── 02_polars_data_processing.py # Polars数据处理 ├── 03_dataswap_pubsub.py # 跨语言通信 ├── 04_qaexchange_trading.py # 模拟交易所对接 ├── 05_performance_comparison.py # 性能对比测试 └── README.md # 示例说明
---
## 🧪 Phase 5: 测试与验证 (2-3天)
### 5.1 单元测试
**新建**: `QUANTAXIS/test_rust_integration.py`
```python
"""Rust集成测试套件"""
import unittest
from QUANTAXIS.QARSBridge import QARSAccount, has_qars_support
from QUANTAXIS.QADataSwap import has_dataswap_support
import pandas as pd
class TestQARSIntegration(unittest.TestCase):
"""测试QARS2集成"""
@unittest.skipIf(not has_qars_support(), "QARS2未安装")
def test_qars_account_creation(self):
"""测试QARS账户创建"""
account = QARSAccount("test", init_cash=1000000)
info = account.get_account_info()
self.assertEqual(info['balance'], 1000000)
@unittest.skipIf(not has_qars_support(), "QARS2未安装")
def test_qars_order_execution(self):
"""测试QARS订单执行"""
account = QARSAccount("test", init_cash=1000000)
order = account.send_order("000001", 100, 10.5, "BUY")
self.assertIn('order_id', order)
@unittest.skipIf(not has_dataswap_support(), "QADataSwap未安装")
def test_dataswap_pubsub(self):
"""测试跨语言通信"""
from QUANTAXIS.QADataSwap import create_writer, create_reader
# 创建写入器
writer = create_writer("test_channel", size_mb=10)
# 写入数据
df = pd.DataFrame({
'time': ['2024-01-01', '2024-01-02'],
'price': [10.5, 10.8]
})
writer.write_dataframe(df)
# 读取数据
reader = create_reader("test_channel")
received = reader.read_dataframe()
self.assertEqual(len(received), 2)
writer.close()
if __name__ == '__main__':
unittest.main()5.2 性能基准测试
新建: benchmarks/rust_vs_python.py
"""Rust vs Python性能对比"""
import time
import pandas as pd
import numpy as np
def benchmark_account_operations():
"""账户操作性能对比"""
from QUANTAXIS.QIFI.QifiAccount import QIFI_Account
from QUANTAXIS.QARSBridge import QARSAccount
n_orders = 10000
# Python版本
start = time.time()
py_account = QIFI_Account("test_py", model="BACKTEST")
py_account.initial()
for i in range(n_orders):
py_account.send_order("000001", 100, 10 + i*0.001, 1)
py_time = time.time() - start
# Rust版本
start = time.time()
rs_account = QARSAccount("test_rs", init_cash=1000000)
for i in range(n_orders):
rs_account.send_order("000001", 100, 10 + i*0.001, "BUY")
rs_time = time.time() - start
print(f"\\n账户操作 ({n_orders}次):")
print(f" Python: {py_time:.2f}s")
print(f" Rust: {rs_time:.2f}s")
print(f" 加速: {py_time/rs_time:.1f}x")
def benchmark_data_processing():
"""数据处理性能对比"""
import polars as pl
# 生成测试数据
n_rows = 1000000
df_pd = pd.DataFrame({
'time': pd.date_range('2020-01-01', periods=n_rows, freq='1min'),
'price': np.random.randn(n_rows) + 100,
'volume': np.random.randint(1000, 10000, n_rows),
})
df_pl = pl.from_pandas(df_pd)
# Pandas处理
start = time.time()
result_pd = df_pd.groupby(df_pd['time'].dt.date)['volume'].sum()
pd_time = time.time() - start
# Polars处理
start = time.time()
result_pl = df_pl.group_by(
pl.col('time').cast(pl.Date)
).agg(pl.col('volume').sum())
pl_time = time.time() - start
print(f"\\n数据处理 ({n_rows}行):")
print(f" Pandas: {pd_time:.2f}s")
print(f" Polars: {pl_time:.2f}s")
print(f" 加速: {pd_time/pl_time:.1f}x")
if __name__ == '__main__':
print("=" * 60)
print("QUANTAXIS Rust集成性能测试")
print("=" * 60)
benchmark_account_operations()
benchmark_data_processing()📦 Phase 6: 部署与发布 (1天)
6.1 更新setup.py
# 新增依赖
install_requires=[
# ... 现有依赖 ...
'polars>=0.20.0,<0.22.0',
'orjson>=3.10.0',
'msgpack>=1.1.0',
],
# 可选依赖
extras_require={
'rust': [
'qars3>=0.0.45',
'qadataswap>=0.1.0',
],
'full': [
'qars3>=0.0.45',
'qadataswap>=0.1.0',
'polars>=0.20.0',
'jupyter>=1.0.0',
],
},6.2 CI/CD配置
修改: .github/workflows/pythonapp.yml
name: Python CI with Rust Integration
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.9', '3.10', '3.11', '3.12']
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Set up Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
- name: Install dependencies
run: |
pip install --upgrade pip
pip install -r requirements.txt
pip install pytest pytest-cov
# 可选: 安装Rust组件
pip install qars3 || echo "QARS3跳过(可选)"
- name: Run tests
run: |
pytest tests/ --cov=QUANTAXIS --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v26.3 发布
# 构建发布包
python setup.py sdist bdist_wheel
# 上传到PyPI
twine upload dist/quantaxis-2.1.0*
# Docker镜像
docker build -t daocloud.io/quantaxis/qa-base:2.1.0 .
docker push daocloud.io/quantaxis/qa-base:2.1.0🎯 总结
升级时间线 (总计: 9-14天)
1
基础环境升级
1-2天
开发团队
2
QARS2集成
2-3天
核心开发
3
数据层优化
2-3天
数据组
4
文档更新
1-2天
文档组
5
测试验证
2-3天
QA
6
部署发布
1天
DevOps
预期收益
性能提升:
账户操作: 100x 加速
数据处理: 5-10x 加速
回测速度: 10-20x 加速
内存占用: -40%
功能增强:
✅ 跨语言零拷贝通信
✅ 模拟交易所完整支持
✅ Polars大数据处理
✅ Rust高性能组件
兼容性:
✅ 向后兼容QIFI协议
✅ 渐进式迁移 (可选Rust组件)
✅ Python 3.9-3.12支持
📝 注意事项
破坏性变更
Python版本: 不再支持3.5-3.8
依赖最低版本: pymongo 4.x, pandas 2.x等
API变化: 部分内部API调整 (公开API兼容)
迁移建议
渐进迁移: 先升级依赖,再逐步启用Rust组件
性能测试: 关键路径先做基准测试
文档先行: 更新文档后再推广使用
风险控制
Phase 1完成后创建git tag:
v2.1.0-phase1每个Phase完成后运行完整测试套件
保留Python实现作为fallback
版本: v1.0 最后更新: 2025-10-25 作者: @yutiansut @quantaxis
Last updated
Was this helpful?