Performance Optimization

Version: 2.1.0-alpha2  Author: @yutiansut @quantaxis  Last updated: 2025-10-25

QUANTAXIS 2.1.0 provides a multi-layered approach to performance optimization, improving the system all the way from the data layer up to the strategy layer.


🎯 Performance Optimization Overview

Optimization layers

  1. Data-layer optimization: MongoDB indexes, ClickHouse, data caching

  2. Computation-layer optimization: Rust acceleration, vectorized computation, parallel processing

  3. Strategy-layer optimization: algorithm optimization, memory management, event-driven design

  4. System-layer optimization: resource configuration, process management, network tuning

Performance targets

  • Data query: < 100ms (one year of daily bars for a single instrument)

  • Indicator calculation: < 10ms (common indicators such as MA/MACD)

  • Backtest speed: > 1000 ticks/s

  • Live-trading latency: < 50ms (tick-to-order)
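To check latencies like these in your own environment, a small timing helper is enough. A minimal sketch using only the standard library (the decorator is illustrative, not a QUANTAXIS API):

import functools
import time

def timed(label):
    """Print the wall-clock time of the wrapped call"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed_ms = (time.perf_counter() - start) * 1000
            print(f"{label}: {elapsed_ms:.2f}ms")
            return result
        return wrapper
    return decorator

# Example: wrap a fetch function with @timed('daily-bar query')
# and compare the printed time against the 100ms target above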


📊 Data Layer Optimization

1. MongoDB index optimization

from pymongo import MongoClient, ASCENDING

client = MongoClient('mongodb://localhost:27017/')
db = client.quantaxis

# Index for stock daily bars (timestamp-based queries)
db.stock_day.create_index([
    ('code', ASCENDING),
    ('date_stamp', ASCENDING)
])

# Compound index for the most common query pattern (code + date string)
db.stock_day.create_index([
    ('code', ASCENDING),
    ('date', ASCENDING)
], background=True)

# Index for futures minute bars
db.future_min.create_index([
    ('code', ASCENDING),
    ('datetime', ASCENDING)
])

# Check index usage; request executionStats explicitly so the
# timing fields read below are guaranteed to be present
explain = db.command(
    'explain',
    {
        'find': 'stock_day',
        'filter': {
            'code': '000001',
            'date': {'$gte': '2024-01-01', '$lte': '2024-12-31'}
        }
    },
    verbosity='executionStats'
)

print(f"Query time: {explain['executionStats']['executionTimeMillis']}ms")
print(f"Documents examined: {explain['executionStats']['totalDocsExamined']}")

2. Data caching strategy

import QUANTAXIS as QA
from collections import OrderedDict
from functools import lru_cache
import hashlib

class DataCache:
    """Data cache manager with true LRU eviction"""
    
    def __init__(self, max_size=1000):
        # OrderedDict tracks access order, which enables LRU eviction
        self.cache = OrderedDict()
        self.max_size = max_size
    
    def get_key(self, code, start, end, freq):
        """Build a cache key"""
        key_str = f"{code}_{start}_{end}_{freq}"
        return hashlib.md5(key_str.encode()).hexdigest()
    
    def get(self, code, start, end, freq):
        """Fetch cached data (and mark it as recently used)"""
        key = self.get_key(code, start, end, freq)
        if key in self.cache:
            self.cache.move_to_end(key)  # refresh recency
            return self.cache[key]
        return None
    
    def set(self, code, start, end, freq, data):
        """Store data in the cache"""
        if len(self.cache) >= self.max_size:
            # Evict the least recently used entry
            self.cache.popitem(last=False)
        
        key = self.get_key(code, start, end, freq)
        self.cache[key] = data
    
    def clear(self):
        """Clear the cache"""
        self.cache.clear()

# Usage example
cache = DataCache(max_size=500)

def fetch_stock_data_cached(code, start, end):
    """Data fetch with caching"""
    # Check the cache first
    data = cache.get(code, start, end, 'day')
    if data is not None:
        return data
    
    # Fall back to the database
    data = QA.QA_fetch_stock_day(code, start, end)
    
    # Populate the cache
    cache.set(code, start, end, 'day', data)
    return data

# Using the LRU cache decorator
@lru_cache(maxsize=100)
def get_stock_list():
    """Fetch the stock list (cached)"""
    return QA.QA_fetch_stock_list()
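The best-practice section later in this page recommends a multi-level cache (memory → Redis → MongoDB). A minimal two-level sketch on top of the DataCache above, assuming a local Redis instance and the redis-py package; the key scheme and TTL are illustrative choices, not QUANTAXIS APIs:

import pickle
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def fetch_with_two_level_cache(code, start, end, ttl=3600):
    """In-process cache -> Redis -> MongoDB (via QA)"""
    # Level 1: in-process cache (DataCache defined above)
    data = cache.get(code, start, end, 'day')
    if data is not None:
        return data

    # Level 2: Redis, shared across processes
    key = cache.get_key(code, start, end, 'day')
    raw = r.get(key)
    if raw is not None:
        data = pickle.loads(raw)
    else:
        # Level 3: the database itself
        data = QA.QA_fetch_stock_day(code, start, end)
        r.setex(key, ttl, pickle.dumps(data))

    cache.set(code, start, end, 'day', data)
    return data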

3. ClickHouse integration

from clickhouse_driver import Client
import pandas as pd

class ClickHouseData:
    """ClickHouse data access"""
    
    def __init__(self, host='localhost', port=9000):
        self.client = Client(host=host, port=port)
    
    def create_stock_table(self):
        """Create the stock table"""
        self.client.execute('''
            CREATE TABLE IF NOT EXISTS stock_day (
                code String,
                date Date,
                open Float64,
                high Float64,
                low Float64,
                close Float64,
                volume UInt64,
                date_stamp UInt32
            ) ENGINE = MergeTree()
            PARTITION BY toYYYYMM(date)
            ORDER BY (code, date)
        ''')
    
    def insert_data(self, df):
        """Bulk insert"""
        data = df.to_dict('records')
        self.client.execute(
            'INSERT INTO stock_day VALUES',
            data
        )
    
    def query_stock(self, code, start, end):
        """High-performance query (parameterized to avoid SQL injection)"""
        query = '''
            SELECT *
            FROM stock_day
            WHERE code = %(code)s
            AND date >= %(start)s
            AND date <= %(end)s
            ORDER BY date
        '''
        
        result = self.client.execute(
            query,
            {'code': code, 'start': start, 'end': end}
        )
        columns = ['code', 'date', 'open', 'high', 'low', 'close', 'volume', 'date_stamp']
        return pd.DataFrame(result, columns=columns)

# Usage example
ch = ClickHouseData()
data = ch.query_stock('000001', '2024-01-01', '2024-12-31')
# Typical query time: < 50ms (the same query on MongoDB is often 200ms+)

⚡ Computation Layer Optimization

1. Rust acceleration

import qars2
import numpy as np
import time

# Performance comparison: 20-period moving average over 100k points
data = np.random.rand(100000)

# Pure-Python implementation
start = time.time()
result_py = []
for i in range(20, len(data)):
    result_py.append(np.mean(data[i-20:i]))
python_time = time.time() - start

# Rust implementation
start = time.time()
result_rust = qars2.ma(data, 20)
rust_time = time.time() - start

print(f"Python time: {python_time*1000:.2f}ms")
print(f"Rust time: {rust_time*1000:.2f}ms")
print(f"Speedup: {python_time/rust_time:.0f}x")

2. Vectorized computation

import numpy as np
import pandas as pd

# ❌ Slow: loop-based calculation
def calculate_returns_slow(prices):
    returns = []
    for i in range(1, len(prices)):
        returns.append((prices[i] - prices[i-1]) / prices[i-1])
    return returns

# ✅ Fast: vectorized
def calculate_returns_fast(prices):
    return prices.pct_change().fillna(0)

# Performance comparison
prices = pd.Series(np.random.rand(100000))

%timeit calculate_returns_slow(prices)  # ~50ms
%timeit calculate_returns_fast(prices)  # ~1ms

# ❌ Slow: row-by-row DataFrame iteration
def process_dataframe_slow(df):
    results = []
    for idx, row in df.iterrows():
        results.append(row['close'] * row['volume'])
    return results

# ✅ Fast: vectorized column operation
def process_dataframe_fast(df):
    return df['close'] * df['volume']
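The %timeit lines above are IPython magics. In a plain Python script, the standard-library timeit module gives the same comparison; a sketch using the functions defined above:

import timeit

slow = timeit.timeit(lambda: calculate_returns_slow(prices), number=10) / 10
fast = timeit.timeit(lambda: calculate_returns_fast(prices), number=10) / 10
print(f"loop: {slow*1000:.2f}ms  vectorized: {fast*1000:.2f}ms")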

3. Parallel computation

from multiprocessing import Pool, cpu_count
import time
import QUANTAXIS as QA

def calculate_indicators(code):
    """Compute indicators for a single instrument"""
    data = QA.QA_fetch_stock_day(code, '2024-01-01', '2024-12-31')
    
    # Indicator calculation
    ma5 = QA.MA(data['close'], 5)
    ma20 = QA.MA(data['close'], 20)
    
    return {
        'code': code,
        'ma5': ma5.iloc[-1],
        'ma20': ma20.iloc[-1]
    }

# Serial processing
codes = QA.QA_fetch_stock_list()['code'].tolist()[:100]

start = time.time()
results_serial = [calculate_indicators(code) for code in codes]
serial_time = time.time() - start

# Parallel processing (on Windows/macOS, run this under `if __name__ == '__main__':`)
start = time.time()
with Pool(processes=cpu_count()) as pool:
    results_parallel = pool.map(calculate_indicators, codes)
parallel_time = time.time() - start

print(f"Serial time: {serial_time:.2f}s")
print(f"Parallel time: {parallel_time:.2f}s")
print(f"Speedup: {serial_time/parallel_time:.2f}x")

4. NumPy optimization tips

import numpy as np

# ✅ Use NumPy built-ins
data = np.array([1, 2, 3, 4, 5])
result = np.sum(data)  # fast
# rather than sum(data)  # slow

# ✅ Pre-allocate arrays
n = 100000
result = np.zeros(n)  # pre-allocated
for i in range(n):
    result[i] = i * 2

# ❌ Avoid growing a list dynamically
# result = []
# for i in range(n):
#     result.append(i * 2)

# ✅ Use views instead of copies
arr = np.random.rand(10000)
view = arr[100:200]  # a view; no data is copied
# copy = arr[100:200].copy()  # a copy; extra memory

# ✅ Use broadcasting
a = np.array([[1, 2, 3]])
b = np.array([[1], [2], [3]])
result = a + b  # broadcasting; efficient

🔧 Strategy Layer Optimization

1. Algorithm optimization

from collections import deque

from QUANTAXIS.QAStrategy import QAStrategyCtaBase
import numpy as np

class OptimizedStrategy(QAStrategyCtaBase):
    """Optimized strategy"""
    
    def user_init(self):
        self.ma_period = 20
        
        # ✅ Pre-compute fixed values
        self.position_size = self.init_cash * 0.2
        
        # ✅ Keep a bounded price history with deque
        self.price_buffer = deque(maxlen=self.ma_period)
    
    def on_bar(self, bar):
        # ✅ Avoid re-fetching data on every bar
        self.price_buffer.append(bar.close)
        
        if len(self.price_buffer) < self.ma_period:
            return
        
        # ✅ Compute with NumPy (fast)
        ma = np.mean(self.price_buffer)
        
        # ❌ ...instead of re-fetching and recomputing every time (slow)
        # market_data = self.get_code_marketdata(bar.code)
        # ma = sum([x['close'] for x in market_data[-20:]]) / 20
        
        # Trading logic
        positions = self.acc.positions
        if bar.close > ma and bar.code not in positions:
            self.BuyOpen(bar.code, 1)
        elif bar.close < ma and bar.code in positions:
            self.SellClose(bar.code, 1)
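Note that np.mean over the deque is still O(period) work per bar. For long periods, a running sum makes each update O(1); a minimal sketch of the idea (the class name is illustrative):

from collections import deque

class RollingMean:
    """O(1) moving average via an incremental running sum"""
    
    def __init__(self, period):
        self.period = period
        self.buffer = deque(maxlen=period)
        self.total = 0.0
    
    def update(self, price):
        if len(self.buffer) == self.period:
            # Subtract the value about to fall out of the window
            self.total -= self.buffer[0]
        self.buffer.append(price)
        self.total += price
        return self.total / len(self.buffer)

ma = RollingMean(20)
# inside on_bar: current_ma = ma.update(bar.close)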

2. Memory optimization

import gc

from QUANTAXIS.QAStrategy import QAStrategyCtaBase

class MemoryOptimizedStrategy(QAStrategyCtaBase):
    """Memory-optimized strategy"""
    
    def user_init(self):
        # ✅ Use a generator instead of a list
        self.data_generator = self.get_data_generator()
        
        # ✅ Keep only the history you actually need
        self.max_history = 100
        self.price_history = []
    
    def get_data_generator(self):
        """Generator pattern: yields bars one at a time"""
        for bar in self.market_data:
            yield bar
    
    def on_bar(self, bar):
        # ✅ Bound the history size
        self.price_history.append(bar.close)
        if len(self.price_history) > self.max_history:
            self.price_history.pop(0)
        
        # ✅ Collect garbage periodically
        if bar.datetime.minute == 0:
            gc.collect()
    
    def get_memory_usage(self):
        """Current memory usage in MB"""
        import psutil
        process = psutil.Process()
        return process.memory_info().rss / 1024 / 1024  # MB
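Another easy win is narrowing DataFrame dtypes before holding large histories in memory: float64 → float32 roughly halves the footprint. A sketch (column names follow the stock_day schema above; check that the precision loss is acceptable for your use case):

import numpy as np
import pandas as pd

def shrink_ohlcv(df):
    """Downcast OHLCV columns to reduce memory usage"""
    out = df.copy()
    for col in ['open', 'high', 'low', 'close']:
        out[col] = out[col].astype(np.float32)
    out['volume'] = pd.to_numeric(out['volume'], downcast='unsigned')
    return out

# before = df.memory_usage(deep=True).sum()
# after = shrink_ohlcv(df).memory_usage(deep=True).sum()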

3. Reducing I/O operations

import QUANTAXIS as QA
from QUANTAXIS.QAStrategy import QAStrategyCtaBase

class IOOptimizedStrategy(QAStrategyCtaBase):
    """I/O-optimized strategy"""
    
    def user_init(self):
        # ✅ Preload data up front
        self.preload_data()
        
        # ✅ Batch log writes
        self.log_buffer = []
        self.log_batch_size = 100
    
    def preload_data(self):
        """Preload everything the strategy will need"""
        self.stock_list = QA.QA_fetch_stock_list()
        self.index_data = QA.QA_fetch_index_day('000001', self.start, self.end)
    
    def on_bar(self, bar):
        # Strategy logic
        pass
    
    def log_trade(self, trade_info):
        """Buffered logging"""
        self.log_buffer.append(trade_info)
        
        if len(self.log_buffer) >= self.log_batch_size:
            self.flush_logs()
    
    def flush_logs(self):
        """Batch write (call once more at shutdown to drain the buffer)"""
        with open('trades.log', 'a') as f:
            for log in self.log_buffer:
                f.write(log + '\n')
        self.log_buffer.clear()
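The standard library can do this buffering for you: logging.handlers.MemoryHandler holds records in memory and flushes them to a target handler once capacity is reached. A sketch with an illustrative capacity and file name:

import logging
from logging.handlers import MemoryHandler

file_handler = logging.FileHandler('trades.log')
# Buffer 100 records; flush on capacity or on ERROR-level records
buffered = MemoryHandler(capacity=100, flushLevel=logging.ERROR,
                         target=file_handler)

logger = logging.getLogger('trades')
logger.setLevel(logging.INFO)
logger.addHandler(buffered)

logger.info('BUY rb2501 1 lot @ 3500')  # buffered; no disk I/O yet
# at shutdown: buffered.close() flushes any remaining records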

🚀 Backtest Optimization

1. Parallel backtesting

from multiprocessing import Pool
import QUANTAXIS as QA

def run_single_backtest(params):
    """Run one backtest (DualMAStrategy is your own strategy class)"""
    fast_period, slow_period = params
    
    strategy = DualMAStrategy(
        code='rb2501',
        frequence='5min',
        start='2024-01-01',
        end='2024-12-31',
        fast_period=fast_period,
        slow_period=slow_period
    )
    strategy.run_backtest()
    
    return {
        'params': params,
        'return': strategy.acc.total_return,
        'sharpe': strategy.acc.sharpe
    }

# Parameter combinations
param_grid = [
    (5, 20), (5, 30), (5, 40),
    (10, 20), (10, 30), (10, 40),
    (15, 20), (15, 30), (15, 40)
]

# Parallel backtests
with Pool(processes=4) as pool:
    results = pool.map(run_single_backtest, param_grid)

# Pick the best parameters
best_result = max(results, key=lambda x: x['sharpe'])
print(f"Best params: {best_result['params']}")
print(f"Sharpe ratio: {best_result['sharpe']:.2f}")

2. Incremental backtesting

class IncrementalBacktest:
    """Incremental backtesting"""
    
    def __init__(self):
        self.last_end_date = None
        self.acc_state = None
    
    def run_backtest(self, start, end, incremental=True):
        """Run a backtest, reusing prior results when possible"""
        if incremental and self.last_end_date:
            # Only backtest the new data
            start = self.last_end_date
        
        strategy = MyStrategy(
            code='rb2501',
            start=start,
            end=end
        )
        
        if incremental and self.acc_state:
            # Restore the saved account state before replaying new bars
            strategy.acc = self.acc_state
        
        strategy.run_backtest()
        
        # Save state for the next incremental run
        self.last_end_date = end
        self.acc_state = strategy.acc
        
        return strategy.acc

# Usage example
backtester = IncrementalBacktest()

# Initial backtest
acc1 = backtester.run_backtest('2024-01-01', '2024-06-30')

# Incremental backtest (only the new data is computed)
acc2 = backtester.run_backtest('2024-01-01', '2024-12-31', incremental=True)
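Keeping acc_state only in memory loses it between runs; pickling it to disk lets incremental backtests resume across sessions. A minimal sketch (the file path and helper names are illustrative):

import pickle

def save_state(backtester, path='backtest_state.pkl'):
    """Persist the incremental-backtest state to disk"""
    with open(path, 'wb') as f:
        pickle.dump({'last_end_date': backtester.last_end_date,
                     'acc_state': backtester.acc_state}, f)

def load_state(backtester, path='backtest_state.pkl'):
    """Restore saved state, if any"""
    try:
        with open(path, 'rb') as f:
            state = pickle.load(f)
        backtester.last_end_date = state['last_end_date']
        backtester.acc_state = state['acc_state']
    except FileNotFoundError:
        pass  # first run: nothing to restore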

📈 Performance Monitoring

1. Profiling

import cProfile
import pstats
from io import StringIO

def profile_strategy():
    """Profile a strategy run (MyStrategy is your own strategy class)"""
    profiler = cProfile.Profile()
    profiler.enable()
    
    # Run the strategy
    strategy = MyStrategy(
        code='rb2501',
        start='2024-01-01',
        end='2024-12-31'
    )
    strategy.run_backtest()
    
    profiler.disable()
    
    # Report the 20 most expensive calls by cumulative time
    s = StringIO()
    ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
    ps.print_stats(20)
    print(s.getvalue())

profile_strategy()

2. Memory profiling

import QUANTAXIS as QA
from memory_profiler import profile

@profile
def memory_intensive_function():
    """Memory-intensive function"""
    data = QA.QA_fetch_stock_day('000001', '2020-01-01', '2024-12-31')
    
    # Indicator calculation
    ma5 = QA.MA(data['close'], 5)
    ma20 = QA.MA(data['close'], 20)
    
    return ma5, ma20

# Run the analysis (prints line-by-line memory usage)
memory_intensive_function()

3. Live monitoring

import time
import psutil

class PerformanceMonitor:
    """Live performance monitor"""
    
    def __init__(self):
        self.start_time = None
        self.bar_count = 0
    
    def start(self):
        """Start monitoring"""
        self.start_time = time.time()
        self.bar_count = 0
    
    def on_bar(self):
        """Call once per bar"""
        self.bar_count += 1
        
        # Report every 1000 bars
        if self.bar_count % 1000 == 0:
            elapsed = time.time() - self.start_time
            tps = self.bar_count / elapsed
            
            # CPU and memory
            cpu = psutil.cpu_percent()
            memory = psutil.Process().memory_info().rss / 1024 / 1024
            
            print(f"Throughput: {tps:.0f} ticks/s, CPU: {cpu}%, Memory: {memory:.0f}MB")

# Usage example (`bars` is your bar stream)
monitor = PerformanceMonitor()
monitor.start()

for bar in bars:
    # Strategy logic
    monitor.on_bar()

💡 Best Practices

1. Data layer

  • ✅ Build indexes for frequently queried fields

  • ✅ Use ClickHouse for large-scale analytical workloads

  • ✅ Implement a multi-level cache (memory → Redis → MongoDB), as sketched above

  • ✅ Read data in batches to cut down on database round trips

  • ❌ Never query the database inside a loop

2. Computation layer

  • ✅ Prefer Rust acceleration for hot computations

  • ✅ Use NumPy vectorized operations

  • ✅ Parallelize multi-instrument computations

  • ✅ Pre-compute fixed values instead of recomputing them

  • ❌ Avoid Python loops; vectorize instead

3. Strategy layer

  • ✅ Use deque for bounded history buffers

  • ✅ Cut unnecessary I/O operations

  • ✅ Collect garbage periodically

  • ✅ Use generators for large data sets

  • ❌ Avoid heavy computation inside on_bar

4. System layer

  • ✅ Put the database on SSD storage

  • ✅ Provision enough memory (32GB+ recommended)

  • ✅ Use a multi-core CPU for parallel processing

  • ✅ Tune network configuration (live trading)

  • ❌ Avoid running high-frequency strategies inside a virtual machine


📊 Performance Benchmarks

Typical operation timings

Operation                                    Unoptimized   Optimized   Speedup
Stock daily-bar query (1 year)               500ms         50ms        10x
MA calculation (100k points)                 100ms         1ms         100x
Single-instrument backtest (1y, minute)      30s           3s          10x
Factor calc, 100 instruments in parallel     120s          15s         8x
Live tick latency                            200ms         30ms        6.7x



📝 Summary

Key points of QUANTAXIS performance optimization:

✅ Data layer: MongoDB indexes + ClickHouse + multi-level caching
✅ Computation layer: Rust acceleration + vectorization + parallel computing
✅ Strategy layer: algorithm optimization + memory management + reduced I/O
✅ Monitoring: profiling + live monitoring + continuous optimization


Author: @yutiansut @quantaxis  Last updated: 2025-10-25
