性能优化
版本: 2.1.0-alpha2 作者: @yutiansut @quantaxis 更新日期: 2025-10-25
QUANTAXIS 2.1.0提供了多层次的性能优化方案,从数据层到策略层全面提升系统性能。
🎯 性能优化概览
优化层次
数据层优化: MongoDB索引、ClickHouse、数据缓存
计算层优化: Rust加速、向量化计算、并行处理
策略层优化: 算法优化、内存管理、事件驱动
系统层优化: 资源配置、进程管理、网络优化
性能目标
数据查询: < 100ms (单标的日线1年)
指标计算: < 10ms (MA/MACD等常用指标)
回测速度: > 1000 ticks/s
实盘延迟: < 50ms (Tick-to-Order)
📊 数据层优化
1. MongoDB索引优化
from pymongo import MongoClient, ASCENDING, DESCENDING

# Connect to the local MongoDB instance that holds QUANTAXIS market data.
client = MongoClient('mongodb://localhost:27017/')
db = client.quantaxis

# Stock daily-bar index: (code, date_stamp) supports per-code ranged scans.
db.stock_day.create_index([
    ('code', ASCENDING),
    ('date_stamp', ASCENDING)
])

# Compound index for the most common query shape (code + date string).
# background=True builds the index without blocking other operations.
db.stock_day.create_index([
    ('code', ASCENDING),
    ('date', ASCENDING)
], background=True)

# Futures minute-bar index.
db.future_min.create_index([
    ('code', ASCENDING),
    ('datetime', ASCENDING)
])

# Inspect index usage with explain(): executionTimeMillis / totalDocsExamined
# reveal whether the query used an index or fell back to a collection scan.
explain = db.stock_day.find({
    'code': '000001',
    'date': {'$gte': '2024-01-01', '$lte': '2024-12-31'}
}).explain()
print(f"查询耗时: {explain['executionStats']['executionTimeMillis']}ms")
print(f"扫描文档数: {explain['executionStats']['totalDocsExamined']}")
2. 数据缓存策略
import QUANTAXIS as QA
from functools import lru_cache
import hashlib
import pickle
class DataCache:
    """In-memory LRU cache for market-data query results.

    Keys are derived from (code, start, end, freq); values are whatever
    object the caller stores (typically a DataFrame).
    """

    def __init__(self, max_size=1000):
        # An OrderedDict tracks recency so eviction is true LRU.
        # (The previous plain-dict version evicted in FIFO order and never
        # refreshed an entry on get(), despite the "LRU" comment.)
        from collections import OrderedDict
        self.cache = OrderedDict()  # key -> cached data, oldest first
        self.max_size = max_size    # maximum number of cached entries

    def get_key(self, code, start, end, freq):
        """Build a stable cache key from the query parameters."""
        key_str = f"{code}_{start}_{end}_{freq}"
        return hashlib.md5(key_str.encode()).hexdigest()

    def get(self, code, start, end, freq):
        """Return the cached data or None on a miss; refreshes LRU order."""
        key = self.get_key(code, start, end, freq)
        if key in self.cache:
            # Mark as most recently used so it is evicted last.
            self.cache.move_to_end(key)
            return self.cache[key]
        return None

    def set(self, code, start, end, freq, data):
        """Store data, evicting the least-recently-used entry when full."""
        key = self.get_key(code, start, end, freq)
        if key in self.cache:
            # Overwriting an existing entry also refreshes its recency.
            self.cache.move_to_end(key)
        elif len(self.cache) >= self.max_size:
            # popitem(last=False) removes the least-recently-used entry.
            self.cache.popitem(last=False)
        self.cache[key] = data

    def clear(self):
        """Drop every cached entry."""
        self.cache.clear()
# Usage example
cache = DataCache(max_size=500)

def fetch_stock_data_cached(code, start, end):
    """Fetch daily stock bars, serving repeated requests from the cache."""
    # Serve from the cache when this exact range was fetched before.
    if (cached := cache.get(code, start, end, 'day')) is not None:
        return cached
    # Cache miss: hit the database, then remember the result.
    bars = QA.QA_fetch_stock_day(code, start, end)
    cache.set(code, start, end, 'day', bars)
    return bars
# 使用LRU缓存装饰器
@lru_cache(maxsize=100)
def get_stock_list():
"""获取股票列表(缓存)"""
    return QA.QA_fetch_stock_list()
3. ClickHouse集成
from clickhouse_driver import Client
import pandas as pd
class ClickHouseData:
    """Thin ClickHouse access layer for daily stock bars."""

    def __init__(self, host='localhost', port=9000):
        # Native-protocol client (clickhouse_driver); 9000 is the default port.
        self.client = Client(host=host, port=port)

    def create_stock_table(self):
        """Create the stock_day table (idempotent), partitioned by month."""
        self.client.execute('''
            CREATE TABLE IF NOT EXISTS stock_day (
                code String,
                date Date,
                open Float64,
                high Float64,
                low Float64,
                close Float64,
                volume UInt64,
                date_stamp UInt32
            ) ENGINE = MergeTree()
            PARTITION BY toYYYYMM(date)
            ORDER BY (code, date)
        ''')

    def insert_data(self, df):
        """Bulk-insert a DataFrame whose columns match the stock_day schema."""
        data = df.to_dict('records')
        self.client.execute(
            'INSERT INTO stock_day VALUES',
            data
        )

    def query_stock(self, code, start, end):
        """Query one instrument's bars in [start, end], ordered by date.

        Uses clickhouse-driver parameter binding (%(name)s placeholders)
        instead of f-string interpolation, so caller-supplied values cannot
        inject SQL. Columns are listed explicitly so the DataFrame column
        names below are guaranteed to line up with the result rows.
        """
        query = '''
            SELECT code, date, open, high, low, close, volume, date_stamp
            FROM stock_day
            WHERE code = %(code)s
              AND date >= %(start)s
              AND date <= %(end)s
            ORDER BY date
        '''
        result = self.client.execute(
            query, {'code': code, 'start': start, 'end': end}
        )
        columns = ['code', 'date', 'open', 'high', 'low', 'close', 'volume', 'date_stamp']
        return pd.DataFrame(result, columns=columns)

# Usage example
ch = ClickHouseData()
data = ch.query_stock('000001', '2024-01-01', '2024-12-31')
print(f"查询耗时: < 50ms (vs MongoDB 200ms+)")
⚡ 计算层优化
1. Rust加速
import qars2
import numpy as np
import time

# Benchmark: 20-period moving average over 100k random points.
data = np.random.rand(100000)

# Pure-Python loop implementation.
start = time.time()
result_py = []
for i in range(20, len(data)):
    result_py.append(np.mean(data[i-20:i]))
python_time = time.time() - start

# Rust implementation via the qars2 extension.
# NOTE(review): presumably qars2.ma computes the same rolling mean; its exact
# output length/alignment vs the loop above is not shown here — confirm.
start = time.time()
result_rust = qars2.ma(data, 20)
rust_time = time.time() - start

print(f"Python耗时: {python_time*1000:.2f}ms")
print(f"Rust耗时: {rust_time*1000:.2f}ms")
print(f"加速比: {python_time/rust_time:.0f}x")
2. 向量化计算
import numpy as np
import pandas as pd
# ❌ Inefficient: per-element Python loop.
def calculate_returns_slow(prices):
    returns = []
    for i in range(1, len(prices)):
        returns.append((prices[i] - prices[i-1]) / prices[i-1])
    return returns

# ✅ Efficient: one vectorized pandas call.
# NOTE(review): not element-for-element equivalent to the loop above —
# pct_change() keeps the first element (as 0 after fillna) while the loop
# drops it; the comparison here is about speed, not identical output.
def calculate_returns_fast(prices):
    return prices.pct_change().fillna(0)
# 性能对比
prices = pd.Series(np.random.rand(100000))
%timeit calculate_returns_slow(prices) # 约 50ms
%timeit calculate_returns_fast(prices) # 约 1ms
# ❌ Inefficient: row-by-row DataFrame iteration (iterrows is slow because
# each row is boxed into a Series).
def process_dataframe_slow(df):
    results = []
    for idx, row in df.iterrows():
        results.append(row['close'] * row['volume'])
    return results
# ✅ 高效:向量化操作
def process_dataframe_fast(df):
    return df['close'] * df['volume']
3. 并行计算
from multiprocessing import Pool, cpu_count
import QUANTAXIS as QA
def calculate_indicators(code):
    """Compute the latest MA5/MA20 values for a single instrument."""
    # One year of daily bars for this code.
    bars = QA.QA_fetch_stock_day(code, '2024-01-01', '2024-12-31')
    # Fast and slow moving averages on the close series.
    fast = QA.MA(bars['close'], 5)
    slow = QA.MA(bars['close'], 20)
    # Report only the most recent value of each.
    return {
        'code': code,
        'ma5': fast.iloc[-1],
        'ma20': slow.iloc[-1]
    }
# Serial pass over the first 100 instruments.
import time  # needed for the timing below; the original snippet never imported it

codes = QA.QA_fetch_stock_list()['code'].tolist()[:100]
start = time.time()
results_serial = [calculate_indicators(code) for code in codes]
serial_time = time.time() - start

# Parallel pass: one worker process per CPU core.
start = time.time()
with Pool(processes=cpu_count()) as pool:
    results_parallel = pool.map(calculate_indicators, codes)
parallel_time = time.time() - start

print(f"串行耗时: {serial_time:.2f}s")
print(f"并行耗时: {parallel_time:.2f}s")
print(f"加速比: {serial_time/parallel_time:.2f}x")
4. NumPy优化技巧
import numpy as np

# ✅ Prefer NumPy's built-in reductions (C loop) over Python's.
data = np.array([1, 2, 3, 4, 5])
result = np.sum(data)  # fast
# rather than sum(data)  # slow: Python-level loop over array scalars

# ✅ Pre-allocate arrays instead of growing them.
n = 100000
result = np.zeros(n)  # allocate once
for i in range(n):
    result[i] = i * 2
# ❌ Avoid dynamic growth:
# result = []
# for i in range(n):
#     result.append(i * 2)

# ✅ Use views rather than copies where possible.
arr = np.random.rand(10000)
view = arr[100:200]  # a view: no data is copied
# copy = arr[100:200].copy()  # a copy: costs extra memory

# ✅ Use broadcasting.
a = np.array([[1, 2, 3]])
b = np.array([[1], [2], [3]])
result = a + b # 广播,高效
🔧 策略层优化
1. 算法优化
from QUANTAXIS.QAStrategy import QAStrategyCtaBase
import numpy as np

class OptimizedStrategy(QAStrategyCtaBase):
    """CTA strategy tuned for per-bar performance."""

    def user_init(self):
        # Moving-average lookback length.
        self.ma_period = 20
        # ✅ Pre-compute fixed values once instead of on every bar.
        self.position_size = self.init_cash * 0.2
        # ✅ Keep a bounded rolling window of closes in a deque.
        from collections import deque
        self.price_buffer = deque(maxlen=self.ma_period)

    def on_bar(self, bar):
        # ✅ Append the latest close instead of re-fetching history.
        self.price_buffer.append(bar.close)
        if len(self.price_buffer) < self.ma_period:
            # Not enough data to compute the MA yet.
            return
        # ✅ NumPy mean over the fixed-size window (fast).
        ma = np.mean(self.price_buffer)
        # ❌ Instead of re-fetching and recomputing every bar (slow):
        # market_data = self.get_code_marketdata(bar.code)
        # ma = sum([x['close'] for x in market_data[-20:]]) / 20
        # Trading logic: open long above the MA, close below it.
        positions = self.acc.positions
        if bar.close > ma and bar.code not in positions:
            self.BuyOpen(bar.code, 1)
        elif bar.close < ma and bar.code in positions:
            self.SellClose(bar.code, 1)
2. 内存优化
import sys
import gc

class MemoryOptimizedStrategy(QAStrategyCtaBase):
    """Strategy variant focused on bounding memory use."""

    def user_init(self):
        # ✅ Consume market data lazily through a generator.
        self.data_generator = self.get_data_generator()
        # ✅ Keep only a bounded window of history.
        self.max_history = 100
        self.price_history = []

    def get_data_generator(self):
        """Yield bars one at a time instead of materializing a list."""
        for bar in self.market_data:
            yield bar

    def on_bar(self, bar):
        # ✅ Cap the history buffer size.
        # NOTE(review): list.pop(0) is O(n); collections.deque(maxlen=...)
        # would bound the buffer in O(1) per bar.
        self.price_history.append(bar.close)
        if len(self.price_history) > self.max_history:
            self.price_history.pop(0)
        # ✅ Trigger garbage collection once per hour (minute == 0).
        if bar.datetime.minute == 0:
            gc.collect()

    def get_memory_usage(self):
        """Return this process's resident set size in MB."""
        import psutil
        process = psutil.Process()
        return process.memory_info().rss / 1024 / 1024 # MB
3. 减少I/O操作
class IOOptimizedStrategy(QAStrategyCtaBase):
    """Strategy variant that minimizes I/O on the hot path."""

    def user_init(self):
        # ✅ Load all reference data up front, before bars start arriving.
        self.preload_data()
        # ✅ Buffer log lines and flush them in batches.
        self.log_buffer = []
        self.log_batch_size = 100

    def preload_data(self):
        """Pre-fetch every dataset the strategy will need."""
        self.stock_list = QA.QA_fetch_stock_list()
        self.index_data = QA.QA_fetch_index_day('000001', self.start, self.end)

    def on_bar(self, bar):
        # Strategy logic goes here.
        pass

    def log_trade(self, trade_info):
        """Queue a log line; flush once the batch is full."""
        self.log_buffer.append(trade_info)
        if len(self.log_buffer) >= self.log_batch_size:
            self.flush_logs()

    def flush_logs(self):
        """Append all buffered lines to trades.log in one open/write cycle."""
        with open('trades.log', 'a') as f:
            for log in self.log_buffer:
                f.write(log + '\n')
        self.log_buffer.clear()
🚀 回测优化
1. 并行回测
from multiprocessing import Pool
import QUANTAXIS as QA
def run_single_backtest(params):
    """Run one backtest for a (fast_period, slow_period) pair and report metrics."""
    fast_period, slow_period = params
    strategy = DualMAStrategy(
        code='rb2501',
        frequence='5min',
        start='2024-01-01',
        end='2024-12-31',
        fast_period=fast_period,
        slow_period=slow_period
    )
    strategy.run_backtest()
    return {
        'params': params,
        'return': strategy.acc.total_return,
        'sharpe': strategy.acc.sharpe
    }

# Parameter grid: every fast/slow MA combination to evaluate.
param_grid = [
    (5, 20), (5, 30), (5, 40),
    (10, 20), (10, 30), (10, 40),
    (15, 20), (15, 30), (15, 40)
]

# Run the nine backtests on four worker processes.
with Pool(processes=4) as pool:
    results = pool.map(run_single_backtest, param_grid)

# Pick the parameter set with the best Sharpe ratio.
best_result = max(results, key=lambda x: x['sharpe'])
print(f"最优参数: {best_result['params']}")
print(f"夏普比率: {best_result['sharpe']:.2f}")
2. 增量回测
class IncrementalBacktest:
    """Incremental backtest driver: re-simulates only the unseen date range.

    After a full run, subsequent calls with incremental=True resume from the
    previous end date using the saved account state.
    """

    def __init__(self):
        # End date of the most recent completed run (None before first run).
        self.last_end_date = None
        # Account snapshot carried over between incremental runs.
        self.acc_state = None

    def run_backtest(self, start, end, incremental=True):
        """Run the backtest over [start, end]; resume incrementally if possible.

        Returns the strategy's account object after the run.
        """
        resume = incremental and self.last_end_date is not None
        if resume:
            # Only backtest the data after the previous run's end date.
            start = self.last_end_date
        strategy = MyStrategy(
            code='rb2501',
            start=start,
            end=end
        )
        if resume:
            # Restore the saved account state *after* the strategy exists.
            # (The original assigned strategy.acc before `strategy` was
            # created, which raised UnboundLocalError on incremental runs.)
            strategy.acc = self.acc_state
        strategy.run_backtest()
        # Snapshot state for the next incremental call.
        self.last_end_date = end
        self.acc_state = strategy.acc
        return strategy.acc

# Usage example
backtester = IncrementalBacktest()
# First run: full backtest over H1.
acc1 = backtester.run_backtest('2024-01-01', '2024-06-30')
# Incremental run: only the new H2 data is simulated.
acc2 = backtester.run_backtest('2024-01-01', '2024-12-31', incremental=True)
📈 性能监控
1. 性能分析
import cProfile
import pstats
from io import StringIO
def profile_strategy():
    """Profile a full backtest run and print the top-20 cumulative hotspots."""
    profiler = cProfile.Profile()
    profiler.enable()
    # Run the strategy under the profiler.
    strategy = MyStrategy(
        code='rb2501',
        start='2024-01-01',
        end='2024-12-31'
    )
    strategy.run_backtest()
    profiler.disable()
    # Render the stats, sorted by cumulative time.
    s = StringIO()
    ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
    ps.print_stats(20)
    print(s.getvalue())
profile_strategy()
2. 内存分析
from memory_profiler import profile
# memory_profiler prints a line-by-line memory report for decorated functions.
@profile
def memory_intensive_function():
    """Fetch five years of daily bars and compute two moving averages."""
    data = QA.QA_fetch_stock_day('000001', '2020-01-01', '2024-12-31')
    # Indicator computation on the close series.
    ma5 = QA.MA(data['close'], 5)
    ma20 = QA.MA(data['close'], 20)
    return ma5, ma20
memory_intensive_function()
3. 实时监控
import time
import psutil
class PerformanceMonitor:
    """Throughput/resource monitor: call on_bar() once per processed bar."""

    def __init__(self):
        # Wall-clock time when monitoring began (None until start()).
        self.start_time = None
        # Number of bars seen since start().
        self.bar_count = 0

    def start(self):
        """Begin (or restart) a monitoring session."""
        self.start_time = time.time()
        self.bar_count = 0

    def on_bar(self):
        """Record one bar; every 1000 bars print throughput, CPU and RSS."""
        if self.start_time is None:
            # Be forgiving if start() was never called explicitly —
            # the original raised TypeError on (None - float) here.
            self.start()
        self.bar_count += 1
        # Emit one status line every 1000 bars.
        if self.bar_count % 1000 == 0:
            # max() guards the division against a zero elapsed interval.
            elapsed = max(time.time() - self.start_time, 1e-9)
            tps = self.bar_count / elapsed
            # Process-level CPU and memory usage.
            cpu = psutil.cpu_percent()
            memory = psutil.Process().memory_info().rss / 1024 / 1024
            print(f"性能: {tps:.0f} ticks/s, CPU: {cpu}%, 内存: {memory:.0f}MB")
# 使用示例
monitor = PerformanceMonitor()
monitor.start()
for bar in bars:
# 策略逻辑
monitor.on_bar()
💡 最佳实践
1. 数据层
✅ 为常用查询字段建立索引
✅ 使用ClickHouse处理大规模数据分析
✅ 实现多级缓存策略(内存→Redis→MongoDB)
✅ 批量读取数据,减少数据库查询次数
❌ 避免在循环中查询数据库
2. 计算层
✅ 优先使用Rust加速关键计算
✅ 使用NumPy向量化操作
✅ 并行计算多标的数据
✅ 预计算固定值,避免重复计算
❌ 避免Python循环,使用向量化
3. 策略层
✅ 使用deque存储有限历史数据
✅ 减少不必要的I/O操作
✅ 定期回收垃圾
✅ 使用生成器处理大数据集
❌ 避免在on_bar中进行复杂计算
4. 系统层
✅ 使用SSD存储数据库
✅ 配置足够的内存(推荐32GB+)
✅ 使用多核CPU并行处理
✅ 优化网络配置(实盘)
❌ 避免在虚拟机中运行高频策略
📊 性能基准
典型操作性能
| 操作 | 优化前 | 优化后 | 加速比 |
| --- | --- | --- | --- |
| 股票日线查询(1年) | 500ms | 50ms | 10x |
| MA计算(10万点) | 100ms | 1ms | 100x |
| 单标的回测(1年分钟) | 30s | 3s | 10x |
| 100标的并行因子计算 | 120s | 15s | 8x |
| 实盘Tick延迟 | 200ms | 30ms | 6.7x |
🔗 相关资源
📝 总结
QUANTAXIS性能优化要点:
✅ 数据层: MongoDB索引 + ClickHouse + 多级缓存 ✅ 计算层: Rust加速 + 向量化 + 并行计算 ✅ 策略层: 算法优化 + 内存管理 + I/O减少 ✅ 监控: 性能分析 + 实时监控 + 持续优化
作者: @yutiansut @quantaxis 最后更新: 2025-10-25
Last updated
Was this helpful?