笔者之前写过一篇关于vnpy的简单介绍和安装方法,本篇文章的目的是简单介绍vnpy的框架,然后详细介绍一下如何用vnpy开发自己的量化策略以及整个的回测逻辑是怎么样的。只有我们真的搞清楚了框架结构和相关的逻辑,我们才可以比较灵活高效的使用这种开源框架。

       vnpy是开源的,好处是显而易见的,我们可以自己修改和增加相关的功能,个性化定制;但是弊端就是,对于我这样的一个初级者来说,一堆代码刚开始会让你没有头绪,不知从何入手,并且一堆的函数方法各种跨模块调用,你还得换着文件去看源码,刚开始把自己给搞晕是很正常的。最要命的是,vnpy还没有比较友好的用户文档,虽然社区已经开始着手这件事了,但是目前用户还是主要靠自己去摸索,这个令很多新老用户吐槽不断,望而却步。笔者花了好几天时间研究这个框架,并实现了R-breaker策略,在此将一些小小的思考和收获分享给有需要的朋友。也欢迎指正、交流。

       笔者把vnpy分为五个主要的部分,从策略开发和回测的角度看,主要有策略开发模板和回测引擎,这是两个主要的python模块,当然还有很多其他的一些模块对象,但是策略开发和回测的工作主线就在这两个模块中;从实盘角度看,可以分为策略模板和cta实盘引擎,还有一个交易的图形界面和事件驱动引擎,事件驱动引擎主要就是根据市场实时行情以及用户的指令进行事件触发并响应。本篇文章是关于策略开发和回测的,因此我们下面详细介绍策略的开发和整个回测的逻辑。

       首先,关于策略开发。

       我们知道,要开发一个策略,主要的有三部分:数据输入、策略实现,指令输出。

       我们如果把数据可以分为两部分:回测需要用到的历史数据和策略中需要用到的数据,那么回测需要用到的历史数据是在回测引擎中直接从数据库中导入的,经过初步的处理,会转化成相应的数据类型并以参数的形式传递给策略模块中对应的方法中。因此,我们在策略文件中写策略时用到的数据不需要在策略模块中导入,直接使用相应方法中的参数即可;当然,如果策略还需要用到额外的历史数据,也可以在策略模块中直接导入相应的数据。

       在vnpy中,关于策略的代码,是写在回调函数中的,即策略模块中的onBar(self,bar)或onXminBar(self,bar)方法里面。其中参数bar就是前面说到的回测引擎中传递过来的bar数据,策略代码里面可以直接使用这个bar数据。一般在实现策略的时候,我们会用到K线数据,因此在回调函数里面可以先调用self.am.updateBar(bar)方法,这个方法会在am对象中生成一个size为100的bar列表,并会及时更新,相当于一个size为100的移动K线;其中am为预定义类ArrayManager的对象。在实现策略时,还会用到交易指令,比如做多做空、买入平仓等,这些指令在策略模板里面都是有实现的,用户自己去模板里面看下源码就知道怎么使用了。相应的交易指令方法会将指令再传到回测引擎中,等待被撮合。

       因此,关于策略开发,整个逻辑主线是简单的,即通过使用回测引擎中传入的数据和自己导入的数据,在回调函数中实现策略思想,然后生成相应的交易指令,并传回到回测引擎中等待被撮合。当然,其中有很多的细节和需要注意的东西,这里不可能一一陈述,自己去动手实现一下,经历过各种debug才能真正的理解。

       然后,关于回测。

       回测是在回测引擎中进行的,即一个名为ctaBacktesting.py的文件。回测引擎做的事情主要为:从数据库中导入数据、策略初始化、数据回放(回测),其中在数据回放的过程中,同时会处理策略文件中传回的交易指令,并保存相应的记录。

       当运行回测引擎的时候,即调用回测引擎中的runBacktesting()方法,该方法中,首先会载入历史数据,即调用回测引擎中的loadHistoryData()方法,该方法是实现将数据中数据库中导入并传递给一个预定义的保存数据的initData列表中,在保存之前,要把数据转化成vnpy框架自定义的数据对象,然后再把这些对象保存到initData列表中,以供使用。当我们不是从vnpy自带的本地数据库中导入数据时,我们需要注意导入的数据是否符合数据转化时的一些必要条件,比如变量名是否匹配或者不足等。

       数据导入并按指定的预定义的类型保存到initData列表中后,回测引擎会进行策略初始化,这个过程会调用策略模块中的回调函数,即实现策略的函数,会把所有的历史数据在策略中运行一遍,初始化一下参数值,但是要注意这个过程并不会生成有效的交易指令,是否生成有效的交易指令是由策略模块中的类变量self.trading的布尔值决定的,默认为False,只有当其为True时,交易指令才会被传回回测引擎中被撮合。

       初始化之后,就是进行真正的回测,在此之前,会先令策略模块中的类变量trading为True,然后再进行数据回放,以生成有效的交易指令。数据回放时,回测引擎会调用引擎中的newBar()方法,这个方法中,会先撮合上一次循环中生成的指令,并保存相应的结果,以供相关的方法计算回测结果,然后再进行下一次循环,直至结束。

      这里,回测引擎的主要工作已经完成了,剩下的就是一些结果展示。具体怎么回测,可以参考vnpy例子中的backtesting_IF.py文件。

       在看具体的例子之前,我们再看下一个交易指令的生命历程是怎么样的,这个其实挺重要的。

       一个指令生命历程:回测时,策略模块发出交易指令(对应指令的方法,如buy()、short()等),对应的指令方法将指令传给模板的senOrder()方法,该方法中先判断trading参数是否为True,若为True,则把指令传给回测引擎的sendOrder()方法,该方法会将相应的指令保存在workingLimitOrderDict或workingStopOrderDict字典中等待被撮合(实际上就是回测是newBar()方法中有相应的撮合方法代码),撮合后该指令会从字典中删除,该指令生命就此结束;若为False,则会返回一个空列表,不会传到回测引擎中,即该指令的生命在这里就结束了。

       最后,我们用一个具体的例子去展示一下这个过程。

       本例子是以螺纹钢主力合约为为标的,实现R-breaker日内策略。R-breaker策略是一种短线日内交易策略,结合了趋势和反转这两种交易方式,该策略曾经长期被Future Thruth杂志评为最赚钱的策略之一。下面我们看看该策略表现如何。

       R-breaker策略的大体思路就是:根据前一日的收盘价、最高价、最低价,通过一定的方式计算出六个价位,然后再根据日内价格相对这六个价位的相对走势,触发相应的交易信号。具体的交易信号可以看下图:

注:图片来自网上

       由于笔者用到的数据是2013年到2015年的螺纹钢主力合约的分钟K线数据和日K数据,而vnpy框架自带的例子中的数据只有分钟K线数据,为了省去K线数据频率转化的麻烦,笔者直接从聚宽上分别导入了这两类数据。(笔者也试了Tushare,但是好像Tushare提供的数据只是最近几个月的,不想麻烦,就没有去细究了)

       下面是策略文件中的的主要代码:(策略部分代码参考了《Python与量化投资》这本书)

def __init__(self, ctaEngine, setting):
        """Constructor"""
        super(RbreakerStrategy, self).__init__(ctaEngine, setting)
        
        self.bg = BarGenerator(self.onBar)        # 创建K线合成器对象
        #self.bg30 = BarGenerator(self.onBar, 30, self.on30minBar)
        self.am = ArrayManager()
        #cons=ts.get_apis()
        #self.df_day=ts.bar('000300',conn=cons,asset='INDEX',freq='D',start_date='2018-03-07',end_date='')
        self.df_day=jd.get_price('RB9999.XSGE', start_date='2013-01-01', end_date='2015-05-30', frequency='1d', fields=['open', 'close','high','low'])
        self.df_day['index_number']=np.arange(len(self.df_day.index.date))
        self.indicator1=0          #反转做空信号
        self.indicator2=0          #反转做多信号
 
        
    #----------------------------------------------------------------------
    def onInit(self):
        """初始化策略(必须由用户继承实现)"""
        self.writeCtaLog(u'%s策略初始化' %self.name)
        
        # 载入历史数据,并采用回放计算的方式初始化策略数值
        initData = self.loadBar(self.initDays)
        for bar in initData:
            self.onBar(bar)

        self.putEvent()

    #----------------------------------------------------------------------
    def onStart(self):
        """启动策略(必须由用户继承实现)"""
        self.writeCtaLog(u'%s策略启动' %self.name)
        self.putEvent()

    #----------------------------------------------------------------------
    def onStop(self):
        """停止策略(必须由用户继承实现)"""
        self.writeCtaLog(u'%s策略停止' %self.name)
        self.putEvent()

    #----------------------------------------------------------------------
    def onTick(self, tick):
        """收到行情TICK推送(必须由用户继承实现)""" 
        self.bg.updateTick(tick)

    #----------------------------------------------------------------------
    def onBar(self, bar):
        """收到Bar推送(必须由用户继承实现)"""
        #self.am.updateBar(bar) #一分钟k线直接用am中的updateBar()方法即可,相比于bg中的会更简单更快
        
        # 全撤之前发出的委托
        #self.cancelAll()
    
        # 保存K线数据
        am = self.am
        
        am.updateBar(bar)
        
        if not am.inited:
            return
        
        #获取前一分钟的收盘价和最高价
        min_1_close=am.closeArray[-2]
        min_1_high=am.highArray[-2]
        
        #计算前一日和前两日的bar
        day_b_1=self.df_day.iloc[int(self.df_day.loc[bar.datetime.date(),'index_number']-1)]
        #day_b_2=self.df_day.iloc[int(self.df_day.loc[bar.datetime.date(),'index_number']-2)]
        
        #计算前一日最高价、最低价和收盘价
        high=day_b_1['high']
        low=day_b_1['low']
        close=day_b_1['close']
        
        # 计算指标数值
        sw=high+self.coeff_w*(close-low)
        bw=low-self.coeff_w*(high-close)
        sa=self.coeff_a1*(high+low)/2-self.coeff_a2*low
        ba=self.coeff_a1*(high+low)/2-self.coeff_a2*high
        sb=bw-self.coeff_b*(sw-bw)
        bb=sw+self.coeff_b*(sw-bw)
        
        
        # 判断是否要进行交易
      
        ##趋势
        if min_1_close<=bb and bar.close>bb:
            if self.pos==0:
                self.buy(bar.open,self.fixedSize)
            if self.pos <0:
                self.cover(bar.close,abs(self.pos))
        if min_1_close>=sb and bar.close<sb:
            if self.pos==0:
                self.short(bar.open,self.fixedSize)
            if self.pos>0:
                self.sell(bar.close,self.pos)
                
        ##反转 
        ###多单反转
        if bar.high>sw and bar.close>sa:
            self.indicator1=1
        if self.indicator1==1 and bar.close<sa:
            self.indicator1=0
            if self.pos>0:
                self.sell(bar.close,self.pos)
                self.short(bar.open,self.fixedSize)
        ###空单反转
        if bar.low<bw:
            self.indicator2=1
        if self.indicator2==1 and bar.close>ba:
            self.indicator2=0
            if self.pos<0:
                self.buy(bar.close,abs(self.pos)+self.fixedSize)
        
        #当天平仓
        if bar.datetime.time()>dt.time(14,55):
            if self.pos>0:
                self.sell(bar.close,self.pos)
            if self.pos<0:
                self.cover(bar.close,abs(self.pos))
        
            
        # 同步数据到数据库
        #self.saveSyncData()        
    
        # 发出状态更新事件
        self.putEvent()  

          接下来是回测引擎文件中的部分代码,当然就是包含了修改部分的代码,该文件中其他部分的代码都没有变:

def loadHistoryData(self):
        """载入历史数据"""
        #self.dbClient = pymongo.MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort'])
        #collection = self.dbClient[self.dbName][self.symbol]          

        self.output(u'开始载入数据')
      
        # 首先根据回测模式,确认要使用的数据类
        if self.mode == self.BAR_MODE:
            dataClass = VtBarData
            func = self.newBar
        else:
            dataClass = VtTickData
            func = self.newTick

        # 载入初始化需要用的数据
        '''flt = {'datetime':{'$gte':self.dataStartDate,
                           '$lt':self.strategyStartDate}} '''       
        #initCursor = collection.find(flt).sort('datetime')
        
        # 将数据从查询指针中读取出,并生成列表
        #cons=ts.get_apis()
        #self.initData1 =ts.bar('000300',conn=cons,asset='INDEX',freq='1min',start_date='2018-03-12',end_date='').sort()     # 清空initData列表
        self.initData1=jd.get_price('RB9999.XSGE', start_date='2013-01-10', end_date='2015-05-30 23:00:00', frequency='1m', fields=['open',                                                                                                                                                                                             'close','high','low','volume'])
        self.initData=[]
        for c in self.initData1.T:
            d=self.initData1.loc[c]
            data = dataClass()
            data.__dict__ = {'high':d['high'],'low':d['low'],'open':d['open'],'close':d['close'],'volume':d['volume'],
                                                              'datetime':d.name.to_pydatetime()}
            self.initData.append(data)   
        
        # 载入回测数据
        '''if not self.dataEndDate:
            flt = {'datetime':{'$gte':self.strategyStartDate}}   # 数据过滤条件
        else:
            flt = {'datetime':{'$gte':self.strategyStartDate,
                               '$lte':self.dataEndDate}}  
        self.dbCursor = collection.find(flt).sort('datetime')'''
        
        self.output(u'载入完成,数据量:%s' %(len(self.initData1.index.date)))
        
    #----------------------------------------------------------------------
    def runBacktesting(self):
        """运行回测"""
        # 载入历史数据
        self.loadHistoryData()
        
        # 首先根据回测模式,确认要使用的数据类
        if self.mode == self.BAR_MODE:
            dataClass = VtBarData
            func = self.newBar
        else:
            dataClass = VtTickData
            func = self.newTick

        self.output(u'开始回测')
        
        self.strategy.onInit()
        self.strategy.inited = True
        self.output(u'策略初始化完成')
        
        self.strategy.trading = True
        self.strategy.onStart()
        self.output(u'策略启动完成')
        
        self.output(u'开始回放数据')

        for c in self.initData1.T:
            d=self.initData1.loc[c]
            data = dataClass()
            data.__dict__ = {'high':d['high'],'low':d['low'],'open':d['open'],'close':d['close'],'volume':d['volume'],
                                                             'datetime':d.name.to_pydatetime()}
            func(data)     
            
        self.output(u'数据回放结束')

       以上代码仅供参考,不可完全copy运行,不然必然是会有bug的。

       最后,我们看下R-breaker策略在螺纹钢主力合约上的表现。笔者分别回测了策略只在日盘运行和在日盘夜盘都运行两种情况,两种情况下的结果差异很大,这是个很有趣的发现。

       首先我们看下在日盘夜盘都运行的情况,即在每天的22:55开始平掉当天的所有仓位,回测结果如下:

可以看到,表现挺差的,虽然最终盈利,但是年化收益只有1.71%,夏普率只有0.15,最大回撤高达16.18%。

       接下来看一下只在日盘运行的情况,即在每天的14:55开始平仓,结果如下:

可以看到,该策略在日盘中,年化收益为8.42%,夏普比率有1.4,最大回撤只有3.95%,虽然表现也不怎么样,但是相比于同时跑日盘和夜盘还是好了很多。

       有兴趣的朋友可以试试其他的标的。不过该策略在螺纹钢主力合约上日盘和夜盘上的表现出来的区别还是很有趣的,值得好好研究一番。

      友情提示:本篇文章主要是为了介绍框架策略开发和回测的逻辑,其中还有很多细节性的东西需要注意;策略代码也不要copy直接运行,不然会bug多多,因为还有其他的一些修改的地方笔者并没有一一指出。当然,有问题欢迎交流和指正。

 

    

 

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐