这个网格策略是一个很基础的网格策略,采用的是双向挂单的方式,能实现0滑点,网格策略只适合震荡行情,可以展示这个框架是怎么写实盘代码的,因为CTP的自由度非常高,所以我的方式不一定对,也不一定好,能实现我的想法即可,即使是最简单的网格策略,写起来并不简单,止盈止损还没写,目前我的代码水平有限,请谅解。
仓位分为三种,分别为多仓,空仓和持仓为零,每一种持仓方式不同那么下单方式都不同。
代码实现:
# 多头下单
def buy_price(ctp_api, price):
ret, sell_order = ctp_api.insertOrder(code=Account.subscribe_list[0], BSType='sellclose',
price=price + parameter_dict['TICK'],
volume=parameter_dict['Volume']) # orderRef报单编号
ret, buy_order = ctp_api.insertOrder(code=Account.subscribe_list[0], BSType='buyopen',
price=price - parameter_dict['TICK'],
volume=parameter_dict['Volume']) # orderRef报单编号
return buy_order, sell_order
代码实现:
# 空头下单
def sell_price(ctp_api, price):
ret, buy_order = ctp_api.insertOrder(code=Account.subscribe_list[0], BSType='sellopen',
price=price + parameter_dict['TICK'],
volume=parameter_dict['Volume']) # orderRef报单编号
ret, sell_order = ctp_api.insertOrder(code=Account.subscribe_list[0], BSType='buyclose',
price=price - parameter_dict['TICK'],
volume=parameter_dict['Volume']) # orderRef报单编号
return buy_order, sell_order
代码实现:
# 零轴下单
def zero_price(ctp_api, price):
ret, sell_order = ctp_api.insertOrder(code=Account.subscribe_list[0], BSType='sellopen',
price=price + parameter_dict['TICK'],
volume=parameter_dict['Volume']) # orderRef报单编号
ret, buy_order = ctp_api.insertOrder(code=Account.subscribe_list[0], BSType='buyopen',
price=price - parameter_dict['TICK'],
volume=parameter_dict['Volume']) # orderRef报单编号
return buy_order, sell_order
def main(Account):
ctp_api = CtpAPI(Account) # 初始化
ctp_api.connect_to_md() # 登录行情账户
ctp_api.connect_to_td() # 登录交易账户
time.sleep(1) # 等待登录成功
# 多头方向
while True:
# 查询持仓
ctp_api.reqQryInvestorPosition(InstrumentID=Account.subscribe_list[0]) # 查询单独持仓
g.df_pInvestorPosition.loc[(g.df_pInvestorPosition['持仓多空方向'] == '3'), '当前持仓数量'] = - \
g.df_pInvestorPosition["当前持仓数量"] # 当持仓多空方向等于3时,为空,当前持仓数量设置为负数
g.df_pInvestorPosition['总持仓'] = g.df_pInvestorPosition['当前持仓数量'].sum()
if g.df_pInvestorPosition.iloc[0]['总持仓'] > 0: # 当持有仓位大于0时,多头方向
print('进入多头方向')
buy_wait_price(ctp_api)
if g.df_pInvestorPosition.iloc[0]['总持仓'] < 0: # 当持有仓位小于0时,空头方向
print('进入空头方向')
sell_wait_price(ctp_api)
elif g.df_pInvestorPosition.iloc[0]['总持仓'] == 0: # 当持有仓位等于0时,中性方向
print('进入中性方向')
zero_wait_price(ctp_api)
ctp_api.tduserapi.Join() # 等待程序完成
先登录行情账户和交易账户,然后查询总持仓,根据总持仓的数量进入不同方向进行交易。
# 多头下单循环
def buy_wait_price(ctp_api):
buy_order, sell_order = buy_price(ctp_api, price=g.md_dict.get('最新价')) # 第一次挂单
while True:
new_price = g.md_dict.get('最新价')
# 阻塞函数,当最新价格相同时等待
if is_price_changed(new_price):
if int(buy_order) == int(g.PTrade.get('报单引用')): # 判断是否成交
ctp_api.cancelOrder(sell_order) # 撤另一个方向的单
# buy_order, sell_order = buy_price(ctp_api, price=g.md_dict.get('最新价')) # 再次下单
buy_order, sell_order = buy_price(ctp_api, price=g.PTrade.get('价格')) # 再次下单
print(g.PTrade) # 成交回报
if int(sell_order) == int(g.PTrade.get('报单引用')): # 判断是否成交
ctp_api.cancelOrder(buy_order) # 撤另一个方向的单
# 查询持仓
ctp_api.reqQryInvestorPosition(InstrumentID=Account.subscribe_list[0]) # 查询单独持仓
g.df_pInvestorPosition.loc[(g.df_pInvestorPosition['持仓多空方向'] == '3'), '当前持仓数量'] = - \
g.df_pInvestorPosition["当前持仓数量"] # 当持仓多空方向等于3时,为空,当前持仓数量设置为负数
g.df_pInvestorPosition['总持仓'] = g.df_pInvestorPosition['当前持仓数量'].sum()
print(g.PTrade)
print(g.df_pInvestorPosition)
if g.df_pInvestorPosition.iloc[0]['总持仓'] <= 0:
print('进入中性方向')
zero_wait_price(ctp_api)
break # 退出循环
elif g.df_pInvestorPosition.iloc[0]['总持仓'] > 0:
buy_order, sell_order = buy_price(ctp_api, price=g.PTrade.get('价格')) # 再次下单
就讲这个代码就好了,理解了这个,剩下的自己基本能看懂。
假设刚开始运行时进入了多头仓位,立即根据最新价格向两边挂单,不断刷新最新价。
如果下多单,那么当前总持仓肯定为正,不需要判断总持仓是否小于等于0,然后撤另一个方向的单,根据成交价双向挂单。
如果平多单,立即撤另一个方向的单,查询持仓是否小于等于0,如果是则进入中性方向(持仓为0)挂单,并退出循环,如果大于0,则再次双向挂单。
读懂这个,其他基本没啥问题了。
实盘效果:
import time
from multiprocessing import Queue
from program.function import *
from program.is_price_changed import *
import queue
import threading
import pandas as pd
pd.set_option('max_rows', None) # 显示最多行数
pd.set_option('max_columns', None) # 显示最多列数
pd.set_option('expand_frame_repr', False) # 当列太多时显示不清楚
pd.set_option('display.unicode.east_asian_width', True) # 设置输出右对齐
class CFtdcMdSpi(mdapi.CThostFtdcMdSpi):
def __init__(self, tapi, account):
mdapi.CThostFtdcMdSpi.__init__(self) # 初始化环境
self.tapi = tapi
self.account = account
# self.share_queue = share_queue
def OnFrontConnected(self) -> "void": # 连接通知
print("连接通知OnFrontConnected")
loginfield = mdapi.CThostFtdcReqUserLoginField()
loginfield.BrokerID = str(self.account.broker_id) # 9999
loginfield.UserID = str(self.account.investor_id) # b'218493'账号
loginfield.Password = str(self.account.password) # 密码
loginfield.UserProductInfo = "python dll"
self.tapi.ReqUserLogin(loginfield, 0) # 登录账户
# 登录通知
def OnRspUserLogin(self, pRspUserLogin: 'CThostFtdcRspUserLoginField', pRspInfo: 'CThostFtdcRspInfoField',
nRequestID: 'int', bIsLast: 'bool') -> "void":
print(
f"登录通知OnRspUserLogin, SessionID={pRspUserLogin.SessionID},ErrorID={pRspInfo.ErrorID},ErrorMsg={pRspInfo.ErrorMsg}")
ret = self.tapi.SubscribeMarketData([id.encode('utf-8') for id in self.account.subscribe_list],
len(self.account.subscribe_list)) # 订阅行情
# 行情通知
def OnRtnDepthMarketData(self, pDepthMarketData: 'CThostFtdcDepthMarketDataField') -> "void":
# print("订阅响应OnRtnDepthMarketData")
md_dict = {
'交易日': pDepthMarketData.TradingDay,
'合约代码': pDepthMarketData.InstrumentID,
'最新价': pDepthMarketData.LastPrice,
'上次结算价': pDepthMarketData.PreSettlementPrice,
'昨收盘': pDepthMarketData.PreClosePrice,
'昨持仓量': pDepthMarketData.PreOpenInterest,
'今开盘': pDepthMarketData.OpenPrice,
'最高价': pDepthMarketData.HighestPrice,
'最低价': pDepthMarketData.LowestPrice,
'数量': pDepthMarketData.Volume,
'Turnover': pDepthMarketData.Turnover,
'OpenInterest': pDepthMarketData.OpenInterest,
'ClosePrice': pDepthMarketData.ClosePrice,
'SettlementPrice': pDepthMarketData.SettlementPrice,
'UpperLimitPrice': pDepthMarketData.UpperLimitPrice,
'LowerLimitPrice': pDepthMarketData.LowerLimitPrice,
'PreDelta': pDepthMarketData.PreDelta,
'CurrDelta': pDepthMarketData.CurrDelta,
'UpdateTime': pDepthMarketData.UpdateTime,
'UpdateMillisec': pDepthMarketData.UpdateMillisec,
'BidPrice1': pDepthMarketData.BidPrice1,
'BidVolume1': pDepthMarketData.BidVolume1,
'AskPrice1': pDepthMarketData.AskPrice1,
'AskVolume1': pDepthMarketData.AskVolume1,
'AveragePrice': pDepthMarketData.AveragePrice,
'ActionDay': pDepthMarketData.ActionDay
}
# print(md_dict) # 打印合约名称,最新价
g.md_dict = md_dict # 最新行情
# share_queue.put(md_dict) # 将md_dict添加到共享队列中
def OnRspSubMarketData(self, pSpecificInstrument: 'CThostFtdcSpecificInstrumentField',
pRspInfo: 'CThostFtdcRspInfoField', nRequestID: 'int', bIsLast: 'bool') -> "void":
print("行情通知OnRspSubMarketData")
class CTraderSpi(tdapi.CThostFtdcTraderSpi):
def __init__(self, tduserapi, Account):
tdapi.CThostFtdcTraderSpi.__init__(self)
self.tduserapi = tduserapi
self.Account = Account
# 连接前台
def OnFrontConnected(self):
print("开始建立交易连接")
authfield = tdapi.CThostFtdcReqAuthenticateField()
authfield.BrokerID = str(self.Account.broker_id)
authfield.UserID = str(self.Account.investor_id)
authfield.AppID = str(self.Account.app_id)
authfield.AuthCode = str(self.Account.auth_code)
ret = self.tduserapi.ReqAuthenticate(authfield, 0)
if ret == 0:
print('发送穿透式认证请求成功!')
else:
print('发送穿透式认证请求失败!')
judge_ret(ret)
# 穿透式认证响应
def OnRspAuthenticate(self, pRspAuthenticateField, pRspInfo, nRequestID: 'int', bIsLast: 'bool'):
if pRspInfo.ErrorID != 0 and pRspInfo != None:
print('穿透式认证失败\n错误信息为:{}\n错误代码为:{}'.format(pRspInfo.ErrorMsg, pRspInfo.ErrorID))
else:
print('穿透式认证成功!')
# 发送登录请求
loginfield = tdapi.CThost
本主题为课程学员专享,成为股票量化投资课程学员后可免费阅读
成为学员上传的附件:
主题数 26 |
精华数
23 |