本文是《以AI量化为生》系列的第22篇,我们将深入讲解指标计算引擎的重构过程,从UI耦合到计算分离,从重复实现到代码复用,从图表依赖到策略直用,让CTA策略能够直接复用图表指标的计算逻辑,支持回测、实盘和无UI运行环境。

写在前面

上一篇讲了交易图表的AI分析功能集成,有读者私信问:图表里那些指标(MACD、RSI、SuperTrend这些)能不能直接在策略里用?现在写策略要用指标,要么自己重新算一遍,要么只能用vnpy自带的那几个简单函数。

这个问题其实挺普遍的。你看,图表系统里已经实现了20多个指标,从基础的MACD、RSI、布林带,到高级的SuperTrend、WaveTrend、斐波那契入场带,每个指标都有完整的计算逻辑、参数配置、智能解读。但这些指标都继承自pyqtgraph的图表组件,必须在图表环境里才能用。

策略要用这些指标怎么办?只能在策略里重新实现一遍计算逻辑。这就很尴尬了——同样的MACD计算,图表里写一遍,策略里又写一遍。维护起来很麻烦,改个参数要改两个地方,还容易出现计算结果不一致的问题。

更麻烦的是回测和实盘。回测的时候不需要渲染图表,但如果要用指标,就得创建图表组件,带来不必要的UI开销。实盘无头运行更是没法用图表组件。

所以这次重构的目标很明确:把指标计算逻辑从UI组件里分离出来,让策略和图表都能复用同一套计算代码

问题的本质

先说说为什么会有这个问题。

ATMQuant的指标系统是基于vnpy的图表框架设计的。所有指标都继承自ChartItemCandleItem,这些是pyqtgraph的图表组件。指标的计算逻辑和渲染逻辑混在一起,像这样:

class MacdItem(ChartItem):
    def _draw_bar_picture(self, ix: int, bar: BarData):
        # 计算MACD值
        diffs, deas, macds = talib.MACD(
            close_prices,
            fastperiod=self.short_window,
            slowperiod=self.long_window,
            signalperiod=self.M
        )

        # 绘制MACD柱状图
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)
        # ... 绘制代码
        return picture

计算和渲染耦合在一起,没法单独拿出来用。

策略里要用MACD,只能这样:

class MyStrategy(CtaTemplate):
    def on_bar(self, bar: BarData):
        # 手动计算MACD
        close_array = self.am.close_array
        diffs, deas, macds = talib.MACD(
            close_array,
            fastperiod=12,
            slowperiod=26,
            signalperiod=9
        )
        # ... 策略逻辑

重复实现,维护成本高。

重构方案

这次重构的核心思路是分层

图表层(UI)
    ↓ 使用
计算层(Headless Calculators)

计算层:纯Python计算,零Qt依赖,只负责指标计算。
图表层:负责可视化渲染,调用计算层获取数据。

计算层设计

创建core/indicators/calculators/目录,所有计算器都继承自HeadlessCalculator基类:

class HeadlessCalculator(ABC):
    """无头指标计算器基类(零Qt依赖)"""

    @abstractmethod
    def update(self, am: ArrayManager) -> None:
        """更新计算(基于ArrayManager)"""
        pass

    @abstractmethod
    def get_values(self) -> Dict[str, Any]:
        """获取指标值"""
        pass

    @property
    @abstractmethod
    def inited(self) -> bool:
        """是否已初始化"""
        pass

关键点:

  1. 基于ArrayManager:使用vnpy标准的ArrayManager,不是图表系统的DynaArrayManager
  2. 统一接口:所有计算器都实现update()get_values()
  3. 零依赖:不导入任何Qt或pyqtgraph模块

以MACD为例:

class MACDCalculator(HeadlessCalculator):
    def __init__(self, fast_period: int = 12, slow_period: int = 26,
                 signal_period: int = 9):
        self.fast_period = fast_period
        self.slow_period = slow_period
        self.signal_period = signal_period
        self._inited = False
        self._values = {}

    def update(self, am: ArrayManager) -> None:
        if not am.inited:
            return

        # 计算MACD
        diffs, deas, macds = talib.MACD(
            am.close_array,
            fastperiod=self.fast_period,
            slowperiod=self.slow_period,
            signalperiod=self.signal_period
        )

        # 保存结果
        self._values = {
            "diff": round(float(diffs[-1]), 4),
            "signal": round(float(deas[-1]), 4),
            "macd": round(float(macds[-1]), 4),
            "histogram": round(float(macds[-1]), 4),
            "trend": "up" if macds[-1] > 0 else "down",
            # ... 更多字段
        }
        self._inited = True

    def get_values(self) -> Dict[str, Any]:
        return self._values

    @property
    def inited(self) -> bool:
        return self._inited

简洁明了,只负责计算。

图表层重构

图表指标重构后变得很简单,只需要调用计算器:

class MacdItem(ChartItem):
    def __init__(self, manager, short_window=12, long_window=26, M=9):
        super().__init__(manager)
        # 不再需要DynaArrayManager
        self.macd_data = {}
        self._needs_recalc = True

    def _ensure_calculated(self) -> None:
        """全量计算MACD数据,委托给MACDCalculator"""
        if not self._needs_recalc and self.macd_data:
            return

        bars = self._manager.get_all_bars()
        if not bars:
            return

        # 调用计算器
        close_array = np.array([bar.close_price for bar in bars])
        diffs, deas, macds = MACDCalculator.compute_arrays(
            close_array, self.short_window, self.long_window, self.M
        )

        # 缓存结果
        for n in range(len(diffs)):
            if not np.isnan(diffs[n]):
                self.macd_data[n] = {
                    "diff": float(diffs[n]),
                    "signal": float(deas[n]),
                    "macd": float(macds[n]),
                }

        self._needs_recalc = False

代码量大幅减少,逻辑更清晰。

IndicatorManager管理器

为了方便策略使用多个指标,提供了IndicatorManager

class IndicatorManager:
    """指标管理器,统一管理多个计算器"""

    def add(self, name: str, calculator: HeadlessCalculator):
        """添加指标"""
        self.calculators[name] = calculator
        return self  # 支持链式调用

    def update(self, am: ArrayManager):
        """批量更新所有指标"""
        for calc in self.calculators.values():
            calc.update(am)

    def get_values(self, name: str) -> Dict[str, Any]:
        """获取指定指标的值"""
        if name not in self.calculators:
            return {}
        return self.calculators[name].get_values()

策略中使用

重构后,策略里用指标变得非常简单:

from vnpy.trader.utility import ArrayManager
from core.indicators.calculators import (
    IndicatorManager,
    MACDCalculator,
    RSICalculator,
    SupertrendCalculator
)

class MyStrategy(CtaTemplate):
    def on_init(self):
        # 初始化ArrayManager
        self.am = ArrayManager(size=100)

        # 初始化指标管理器
        self.indicators = IndicatorManager()
        self.indicators.add("macd", MACDCalculator(12, 26, 9))
        self.indicators.add("rsi", RSICalculator(14))
        self.indicators.add("supertrend", SupertrendCalculator(14, 3.0))

        self.load_bar(30)

    def on_bar(self, bar: BarData):
        # 更新ArrayManager
        self.am.update_bar(bar)
        if not self.am.inited:
            return

        # 批量更新所有指标
        self.indicators.update(self.am)

        # 获取指标值
        macd = self.indicators.get_values("macd")
        rsi = self.indicators.get_values("rsi")
        supertrend = self.indicators.get_values("supertrend")

        # 策略逻辑
        if macd["cross_signal"] == "golden_cross" and rsi["oversold"]:
            self.buy(bar.close_price, 1)
        elif supertrend["direction"] == "down":
            self.sell(bar.close_price, 1)

干净利落,不需要重复实现计算逻辑。

重构成果

这次重构涉及17个指标文件,代码变化:

17 files changed, 305 insertions(+), 965 deletions(-)

删除了965行代码,新增305行,净减少660行。代码量减少了68%。

实现的计算器包括:

基础指标

  • MACD、RSI、BOLL、DMI、SMA、EMA

高级指标

  • SuperTrend、WaveTrend、Squeeze Momentum
  • Fibonacci Entry Bands、Smart Money Channels
  • Adaptive MACD Deluxe、SuperTrended RSI、ZLEMA
  • Enhanced Volume

每个计算器都有完整的测试用例,确保计算结果与talib一致。

测试验证

写了一个完整的测试套件tests/test_headless_calculators.py,包含15个测试用例:

  1. 数学验证:与talib直接计算对比,确保数值一致
  2. 输出格式验证:确保计算器输出与图表指标的get_current_values()一致
  3. IndicatorManager功能验证:测试管理器的添加、删除、批量更新
  4. 逐bar喂入验证:模拟回测场景,确保逐bar更新结果正确
  5. 多周期独立性验证:确保不同周期的指标互不影响
  6. 图表指标对比(可选):在Qt环境下与原始图表指标对比

运行测试:

# 数学验证(无需Qt环境)
python tests/test_headless_calculators.py

# 含图表指标对比(需要Qt环境)
python tests/test_headless_calculators.py --with-chart

测试结果:

============================================================
  测试汇总
============================================================
  ✓ PASS  MACD
  ✓ PASS  RSI
  ✓ PASS  Boll
  ✓ PASS  DMI
  ✓ PASS  SMA
  ✓ PASS  EMA
  ✓ PASS  SuperTrend
  ✓ PASS  WaveTrend
  ✓ PASS  Squeeze
  ✓ PASS  Fibonacci
  ✓ PASS  输出格式
  ✓ PASS  IndicatorManager
  ✓ PASS  逐bar喂入
  ✓ PASS  多周期独立

  结果: 14/14 通过
  🎉 全部测试通过!

先写到这,有问题欢迎留言交流。


本文是《以AI量化为生》系列文章的第22篇,完整代码已开源至GitHub:https://github.com/seasonstar/atmquant

本文内容仅供学习交流,不构成任何投资建议。交易有风险,投资需谨慎。


加入「量策堂·AI算法指标策略」

想系统性掌握策略研发、指标可视化与回测优化?加入我的知识星球,获得持续、体系化的成长支持:


往期文章回顾

《以AI量化为生》系列

《量化指标解码》系列

rx9hQ)


Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐