Not receiving data from custom parser
Closed this issue · 4 comments
This is the code in demos/test_extmodules
that is modified:
import sys
from wtpy import BaseExtParser, BaseExtExecuter
from wtpy import WTSTickStruct
from ctypes import byref
import threading
import time
import random
import datetime
from wtpy import WtEngine,EngineType
sys.path.append('../Strategies')
from DualThrust import StraDualThrust
from EmptyStra import EmptyStra
class MyExecuter(BaseExtExecuter):
def __init__(self, id: str, scale: float):
super().__init__(id, scale)
def init(self):
print("inited")
def set_position(self, stdCode: str, targetPos: float):
print("position confirmed: %s -> %f " % (stdCode, targetPos))
class YourParser(BaseExtParser):
def __init__(self, id: str):
super().__init__(id)
self.__worker__ = None
def init(self, engine:WtEngine):
'''
初始化
'''
print(f"Initializing YourParser with ID: {self.__id__}")
super().init(engine)
def random_sim(self):
while True:
curTick = WTSTickStruct()
curTick.code = bytes("CFFEX.IF.HOT", encoding="UTF8")
curTick.exchg = bytes("CFFEX", encoding="UTF8")
# Simulate tick data
curTick.price = random.uniform(3000, 4000) # Random price between 3000 and 4000
curTick.open = curTick.price
curTick.high = curTick.price
curTick.low = curTick.price
curTick.settle = curTick.price
curTick.upper_limit = curTick.price + 100
curTick.lower_limit = curTick.price - 100
curTick.pre_close = curTick.price
curTick.pre_settle = curTick.price
curTick.pre_interest = random.randint(1000, 2000)
curTick.interest = random.randint(1000, 2000)
curTick.volume = random.randint(100, 500) # Random volume
curTick.amount = curTick.volume * curTick.price
now = datetime.datetime.now()
curTick.date = int(now.strftime("%Y%m%d"))
curTick.time = int(now.strftime("%H%M%S"))
curTick.millisec = now.microsecond // 1000
time.sleep(1)
# Log the simulated data
print(f"Simulated Tick: Code={curTick.code.decode('UTF8')}, Price={curTick.price}, Time={curTick.date}{curTick.time}")
# Push the data
result = self.__engine__.push_quote_from_extended_parser(self.__id__, byref(curTick), True)
print(f"Push result: {result}") # Log the result of the push operation
def connect(self):
'''
开始连接
'''
print("Connecting YourParser")
if self.__worker__ is None:
self.__worker__ = threading.Thread(target=self.random_sim, daemon=True)
self.__worker__.start()
return
def disconnect(self):
'''
断开连接
'''
print("disconnect")
return
def release(self):
'''
释放,一般是进程退出时调用
'''
print("release")
return
def subscribe(self, fullCode:str):
'''
订阅实时行情\n
@fullCode 合约代码,格式如CFFEX.IF2106
'''
print("subscribe: " + fullCode)
return
def unsubscribe(self, fullCode:str):
'''
退订实时行情\n
@fullCode 合约代码,格式如CFFEX.IF2106
'''
print("unsubscribe: " + fullCode)
return
if __name__ == "__main__":
#创建一个运行环境,并加入策略
engine = WtEngine(EngineType.ET_CTA)
engine.init('../common/', "config.yaml")
straInfo = EmptyStra(name='test', code="CFFEX.IF.HOT", barCnt=1, period="m5", isForStk=False)
engine.add_cta_strategy(straInfo)
yourParser = YourParser("yourParser")
yourParser.init(engine)
myExecuter = MyExecuter('exec', 1)
engine.commitConfig()
engine.add_exetended_parser(yourParser)
engine.add_exetended_executer(myExecuter)
engine.run()
print('press ctrl-c to exit')
try:
while True:
time.sleep(1)
except KeyboardInterrupt as e:
exit(0)
And this tdparser.yaml config. I understand tdparsers.yaml
is about loading parser that provided by .so file? So the parser in the python code don't need to be registered using this yaml file. (I did so, and get Same name of parsers: myParser
error)
parsers:
- active: true
bport: 9001
filter: ''
host: 127.0.0.1
id: parser1
module: ParserUDP
sport: 3997
# - active: true
# bport: 9001
# filter: ''
# host: 127.0.0.1
# id: yourParser
# module: YourParser
# sport: 3998
I get this log
[12.13 10:26:47 - info ] Callbacks of Extented Parser registration done
[12.13 10:26:47 - info ] Callbacks of Extented Executer registration done
[12.13 10:26:47 - info ] WonderTrader CTA production framework initialzied, version: UNIX v0.9.9 Build@Dec 4 2023 09:51:19
Initializing YourParser with ID: yourParser
[12.13 10:26:47 - info ] Trading sessions loaded
[12.13 10:26:47 - warning] No session configured for CZCE.PX
[12.13 10:26:47 - warning] No session configured for CZCE.SH
[12.13 10:26:47 - warning] No session configured for INE.ec
[12.13 10:26:47 - warning] No session configured for SHFE.br
[12.13 10:26:47 - info ] Commodities configuration file ../common/commodities.json loaded
[12.13 10:26:47 - warning] Commodity CZCE.PX not found, contract PX405 skipped
[12.13 10:26:47 - warning] Commodity CZCE.PX not found, contract PX406 skipped
[12.13 10:26:47 - warning] Commodity CZCE.PX not found, contract PX407 skipped
[12.13 10:26:47 - warning] Commodity CZCE.PX not found, contract PX408 skipped
[12.13 10:26:47 - warning] Commodity CZCE.PX not found, contract PX409 skipped
[12.13 10:26:47 - warning] Commodity CZCE.PX not found, contract PX410 skipped
[12.13 10:26:47 - warning] Commodity CZCE.PX not found, contract PX411 skipped
[12.13 10:26:47 - warning] Commodity CZCE.SH not found, contract SH405 skipped
[12.13 10:26:47 - warning] Commodity CZCE.SH not found, contract SH406 skipped
[12.13 10:26:47 - warning] Commodity CZCE.SH not found, contract SH407 skipped
[12.13 10:26:47 - warning] Commodity CZCE.SH not found, contract SH408 skipped
[12.13 10:26:47 - warning] Commodity CZCE.SH not found, contract SH409 skipped
[12.13 10:26:47 - warning] Commodity CZCE.SH not found, contract SH410 skipped
[12.13 10:26:47 - warning] Commodity CZCE.SH not found, contract SH411 skipped
[12.13 10:26:47 - warning] Commodity INE.ec not found, contract ec2404 skipped
[12.13 10:26:47 - warning] Commodity INE.ec not found, contract ec2406 skipped
[12.13 10:26:47 - warning] Commodity INE.ec not found, contract ec2408 skipped
[12.13 10:26:47 - warning] Commodity INE.ec not found, contract ec2410 skipped
[12.13 10:26:47 - warning] Commodity INE.ec not found, contract ec2412 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2401 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2402 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2403 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2404 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2405 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2406 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2407 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2408 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2409 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2410 skipped
[12.13 10:26:47 - warning] Commodity SHFE.br not found, contract br2411 skipped
[12.13 10:26:47 - info ] Contracts configuration file ../common/contracts.json loaded, 6 exchanges
[12.13 10:26:47 - info ] Holidays loaded
[12.13 10:26:47 - info ] Hot rules loaded
[12.13 10:26:47 - info ] Trading environment initialized, engine name: CTA
[12.13 10:26:47 - info ] Running mode: Production
[12.13 10:26:47 - info ] Strategy filter Q3LS00_if0 loaded
[12.13 10:26:47 - info ] Code filter CFFEX.IF0 loaded
[12.13 10:26:47 - info ] 0 fee templates loaded
[12.13 10:26:47 - debug] 0 position info of portfolio loaded
[12.13 10:26:47 - info ] [RiskControl] Params inited, Checking frequency: 5 s, MaxIDD: ON(20.00%), MaxMDD: OFF(60.00%), Capital: 5000000.0, Profit Boudary: 101.00%, Calc Span: 30 mins, Risk Scale: 0.30
[12.13 10:26:47 - info ] Engine task poolsize is 0
[12.13 10:26:47 - info ] Resampled bars will be aligned by section: no
[12.13 10:26:47 - info ] Force to cache bars: yes
[12.13 10:26:47 - info ] WtDataReader initialized, rt dir is ../storage/rt/, hist dir is ../storage/his/, adjust_flag is 0
[12.13 10:26:47 - info ] No adjusting factor file configured, loading skipped
[12.13 10:26:47 - info ] Data manager initialized
[12.13 10:26:47 - info ] Action policies initialized
[12.13 10:26:47 - info ] Reading parser config from tdparsers.yaml...
[12.13 10:26:47 - info ] [parser1] Parser module /wtpy/wtpy/wrapper/linux/parsers/libParserUDP.so loaded
[12.13 10:26:47 - info ] [parser1] Parser initialzied, check_time: false
[12.13 10:26:47 - info ] 1 parsers loaded
[12.13 10:26:47 - info ] Reading trader config from tdtraders.yaml...
[12.13 10:26:47 - info ] [simnow] Risk control rule default of trading channel loaded
[12.13 10:26:47 - info ] [simnow] Trader module /wtpy/wtpy/wrapper/linux/traders/libTraderCTP.so loaded
[12.13 10:26:47 - info ] 1 traders loaded
[12.13 10:26:47 - info ] Executer factory WtExeFact loaded
[12.13 10:26:47 - info ] 0 executers loaded
[12.13 10:26:47 - info ] Extended parser created
[12.13 10:26:47 - info ] Extended Executer created
Connecting YourParser
[12.13 10:26:47 - info ] 2 parsers started
[12.13 10:26:47 - info ] registerFront: tcp://180.168.146.187:10201
[12.13 10:26:47 - info ] 1 trading channels started
[12.13 10:26:47 - info ] CtaTicker will drive engine with session TRADING
[12.13 10:26:47 - info ] Trading date confirmed: 20231213
[12.13 10:26:47 - debug] Main KBars confirmed: CFFEX.IF.HOT#m5
[12.13 10:26:47 - info ] Reading final bars of CFFEX.IF.HOT via extended loader...
[12.13 10:26:47 - info ] 0 items of back min5 data of CFFEX.IF.HOT cached
[12.13 10:26:47 - info ] HOT contract on 20231213 confirmed: CFFEX.IF.HOT -> IF2312
[12.13 10:26:47 - debug] His min5 bars of CFFEX.IF.HOT loaded, 0 from history, 0 from realtime
[12.13 10:26:47 - info ] Market data subscribed: CFFEX.IF.HOT
[12.13 10:26:47 - info ] EmptyStra inited
[12.13 10:26:47 - info ] Trading day 20231213 begun
press ctrl-c to exit
[12.13 10:26:47 - info ] [RiskControl] Current Balance Ratio: 100.00%
Simulated Tick: Code=CFFEX.IF.HOT, Price=3321.4201982447967, Time=20231213102647
Push result: None
Simulated Tick: Code=CFFEX.IF.HOT, Price=3478.500870923655, Time=20231213102648
Push result: None
Simulated Tick: Code=CFFEX.IF.HOT, Price=3799.4673982465883, Time=20231213102649
Push result: None
Simulated Tick: Code=CFFEX.IF.HOT, Price=3928.3792774291314, Time=20231213102650
Push result: None
Simulated Tick: Code=CFFEX.IF.HOT, Price=3734.984036522967, Time=20231213102651
Push result: None
[12.13 10:26:52 - info ] [RiskControl] Current Balance Ratio: 100.00%
I also made an empty strategy for log. It should log if receive simulated data
from wtpy import BaseCtaStrategy
from wtpy import CtaContext
import numpy as np
class EmptyStra(BaseCtaStrategy):
def __init__(self, name:str, code:str, barCnt:int, period:str, isForStk:bool = False):
BaseCtaStrategy.__init__(self, name)
self.__period__ = period
self.__bar_cnt__ = barCnt
self.__code__ = code
self.__is_stk__ = isForStk
def on_init(self, context:CtaContext):
code = self.__code__ #品种代码
if self.__is_stk__:
code = code + "-" # 如果是股票代码,后面加上一个+/-,+表示后复权,-表示前复权
context.stra_prepare_bars(code, self.__period__, self.__bar_cnt__, isMain = True)
# 获取K线和tick数据的时候会自动订阅, 这里只需要订阅额外要检测的品种即可, 不知道对不对
context.stra_sub_ticks(code)
context.stra_log_text("EmptyStra inited")
#读取存储的数据
# self.xxx = context.user_load_data('xxx',1)
def on_tick(self, context: CtaContext, stdCode: str, newTick: dict):
'''
逐笔数据进来时调用
生产环境中,每笔行情进来就直接调用
回测环境中,是模拟的逐笔数据
@context 策略运行上下文
@stdCode 合约代码
@newTick 最新逐笔
'''
print('get new tick')
print('stdCode', stdCode, 'newTick', newTick)
pass
def on_bar(self, context:CtaContext, stdCode:str, period:str, newBar:dict):
'''
K线闭合时回调
@context 策略上下文
@stdCode 合约代码
@period K线周期
@newBar 最新闭合的K线
'''
print('get new bar')
print('stdCode', stdCode, 'period', period, '\n', 'newBar', newBar)
pass
return
def on_calculate(self, context:CtaContext):
'''
K线闭合时调用,一般作为策略的核心计算模块
@context 策略运行上下文
'''
code = self.__code__ #品种代码
trdUnit = 1
if self.__is_stk__:
trdUnit = 100
#读取最近50条1分钟线(dataframe对象)
theCode = code
if self.__is_stk__:
theCode = theCode + "-" # 如果是股票代码,后面加上一个+/-,+表示后复权,-表示前复权
np_bars = context.stra_get_bars(theCode, self.__period__, self.__bar_cnt__, isMain = True)
print('on_calculate data',np_bars.ndarray)
# print('bars',bars.get_bar)
short_ma = np_bars.closes[-5:].mean()
long_ma = np_bars.closes[-60:]
# print('short ma', short_ma, len(short_ma))
print('short ma', short_ma)
print('long ma', long_ma,len(long_ma))
But I didn't receive any simulated data from my parser.
是配置用法问题(但不确定是哪里配错了),因为 C++ 写的 parser 也收不到信息。但是 ParserCTP 用同样的接口在发消息就能收到。
ParserRandom 生成随机测试行情的代码
void ParserRandom::subscribe(const CodeSet &vecSymbols)
{
m_running = true;
std::thread(&ParserRandom::generateRandomData, this, vecSymbols).detach();
}
void ParserRandom::unsubscribe(const CodeSet &vecSymbols)
{
m_running = false;
}
void ParserRandom::generateRandomData(const CodeSet &vecSymbols)
{
std::default_random_engine generator(static_cast<long unsigned int>(time(0)));
std::uniform_real_distribution<double> priceDist(100.0, 200.0); // Example price range
std::uniform_int_distribution<int> volumeDist(100, 1000); // Example volume range
while (m_running)
{
for (const std::string &symbol : vecSymbols)
{
WTSTickData *tick = WTSTickData::create("IC2401");
WTSTickStruct "e = tick->getTickStruct();
// strcpy(quote.exchg, pCommInfo->getExchg());
strcpy(quote.exchg, "CFFEX");
m_uTradingDate = TimeUtils::getCurDate();
// Randomly generate the fields similar to OnRtnDepthMarketData
quote.price = priceDist(generator);
quote.open = priceDist(generator);
quote.high = priceDist(generator);
quote.low = priceDist(generator);
quote.total_volume = volumeDist(generator);
quote.trading_date = m_uTradingDate;
// ... fill other fields similarly
// Print tick data for debugging
// std::cout << "Symbol: " << symbol << ", Price: " << quote.price << ", Volume: " << quote.total_volume << std::endl;
if (m_sink)
m_sink->handleQuote(tick, 1);
tick->release();
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
可能是 session 等地方配错了。
mdparsers.yaml
parsers: # 行情通道配置
- active: true
broker: '8888'
code: '' # 要录制的合约代码,如果为空默认contracts.json中的全部,不为空则只录制指定的合约,注意这里须与contracts中的代码一致!如'CFFEX.IF2408, CFFEX.IF2403'
front: tcp://121.37.90.193:20004 #openctp
id: parser
user: 1234
pass: 123456
module: ParserRandom
dtcfg.yaml
basefiles: # 基础文件
commodity: ../common/commodities.json
contract: ../common/contracts.json
holiday: ../common/holidays.json
session: ../common/sessions.json
utf-8: true
broadcaster: # UDP广播器配置项,如果要在局域网转发行情,可以使用该模块
active: true
bport: 3997 # UDP查询端口,主要是用于查询最新的快照
broadcast: # 广播配置
- host: 255.255.255.255 # 广播地址,255.255.255.255会向整个局域网广播,但是受限于路由器
port: 9001 # 广播端口,接收端口要和广播端口一致
type: 2 # 数据类型,固定为2
shmcaster: # 共享内存转发,适合本机转发
active: true
path: ./exchange.membin # memmap文件路径
parsers: mdparsers.yaml
statemonitor: statemonitor.yaml
writer: # 数据落地配置
module: WtDataStorage #数据存储模块
async: true #同步落地还是异步落地,期货推荐同步,股票推荐异步
groupsize: 20 #日志分组大小,主要用于控制日志输出,当订阅合约较多时,推荐1000以上,当订阅的合约数较少时,推荐100以内
path: ../storage_AD #数据存储的路径
savelog: true #是否保存tick到csv
disabletick: false #不保存tick数据,默认false
disablemin1: false #不保存min1数据,默认false
disablemin5: false #不保存min5数据,默认false
disableday: false #不保存day数据,默认false
disablehis: false #收盘作业不转储历史数据,默认false
commodities.json
{
"CFFEX": {
"IC": {
"covermode": 0,
"pricemode": 0,
"category": 1,
"precision": 1,
"pricetick": 0.2,
"volscale": 200,
"name": "中证",
"exchg": "CFFEX",
"session": "SD0930",
"holiday": "CHINA"
},
sessions.json
{
"SD0930":{
"name":"股票白盘0930",
"offset": 0,
"auction":{
"from": 929,
"to": 930
},
"sections":[
{
"from": 930,
"to": 1130
},
{
"from": 1300,
"to": 1500
}
]
},
"FD0915":{
"name":"期货白盘0915",
"offset": 0,
"auction":{
"from": 929,
"to": 930
},
"sections":[
{
"from": 930,
"to": 1130
},
{
"from": 1300,
"to": 1515
}
]
},
运行后没有接收到行情。(我在 c++ 部分加过 log,c++ 那边是一直在循环输出行情的)
而同时如果使用 ParserCTP 就能正常收到 tick

感谢,我昨天写了个自动产 parser 的 CI,今天让他自己改改合约名多试试…
推荐检查流程:
1、检查是否正确配置自定义品种的合约信息
2、使用testUDP检查是否正确转发
3、检查策略是否正确订阅tick数据