asyncio官方中文文档
asyncio博客demo
asyncio异步并发
asyncio.gather 和 async、await 关键字的使用 demo
gather 并发等同于多个 create_task 和 await 配合的并发,前者效率更高
# 协程监控合约变化
import asyncio
import datetime
from contextlib import closing

from tqsdk import TqApi, TqSim,TqBacktest,TargetPosTask
async def task(symbol):
    """Print ``symbol`` each time the newest K-line of that contract changes.

    Subscribes to the 60-second K-line series of ``symbol`` (last 100 bars)
    through the module-level ``api`` and then loops forever on the update
    channel, so this coroutine only ends when it is cancelled or the API
    stops.

    Args:
        symbol: Contract code passed to ``api.get_kline_serial``,
            e.g. ``'SHFE.rb2010'``.
    """
    klines = api.get_kline_serial(symbol, 60, data_length=100)
    async with api.register_update_notify(klines) as update_chan:
        async for _ in update_chan:
            # Only react when the 'datetime' field of the last bar changed,
            # not on every update of that bar.
            if api.is_changing(klines.iloc[-1], 'datetime'):
                print(symbol)  # name of the contract that changed
# Shared API object: simulated account, backtest from 2020-06-01 to 2020-08-02.
api = TqApi(
    TqSim(),
    backtest=TqBacktest(
        datetime.date(2020, 6, 1),
        datetime.date(2020, 8, 2),
    ),
)
async def get_mi_symbols():
    """Return the underlying symbols of all current main contracts.

    Scans the quote table held by the module-level ``api`` (the private
    ``api._data['quotes']`` mapping) and, for every key starting with
    ``'KQ.m'``, collects that entry's ``'underlying_symbol'`` value.

    Returns:
        list: One underlying symbol per matching quote entry.
    """
    return [
        quote['underlying_symbol']
        for key, quote in api._data['quotes'].items()
        if key.startswith('KQ.m')
    ]
mi_symbols_list = ['CFFEX.IF2008', 'CFFEX.IH2008', 'CFFEX.IC2008', 'CFFEX.TF2009', 'CFFEX.T2009', 'SHFE.cu2009', 'SHFE.au2012', 'SHFE.ag2012', 'SHFE.zn2009', 'SHFE.al2009', 'SHFE.ru2101', 'SHFE.rb2010', 'SHFE.fu2101', 'SHFE.hc2010', 'SHFE.bu2012', 'SHFE.pb2009', 'SHFE.ni2010', 'SHFE.sn2010', 'SHFE.wr2010', 'INE.sc2009', 'DCE.a2009', 'DCE.b2010', 'DCE.bb2009', 'DCE.c2101', 'DCE.cs2009', 'DCE.fb2009', 'DCE.i2101', 'DCE.j2009', 'DCE.jd2009', 'DCE.jm2009', 'DCE.l2009', 'DCE.m2101', 'DCE.p2009', 'DCE.pp2009', 'DCE.v2009', 'DCE.y2101', 'CZCE.WH009', 'CZCE.PM105', 'CZCE.CF101', 'CZCE.CY101', 'CZCE.SR101', 'CZCE.TA101', 'CZCE.OI009', 'CZCE.RI009', 'CZCE.MA009', 'CZCE.FG101', 'CZCE.RS009', 'CZCE.RM009', 'CZCE.ZC009', 'CZCE.JR009', 'CZCE.LR011', 'CZCE.SF010', 'CZCE.SM009', 'CZCE.AP010', 'CFFEX.TS2009', 'SHFE.sp2012', 'DCE.eg2009', 'CZCE.CJ009', 'INE.nr2010', 'DCE.rr2010', 'CZCE.UR101', 'SHFE.ss2010', 'DCE.eb2009', 'CZCE.SA009', 'DCE.pg2011', 'INE.lu2101']
#mi_symbols_list = api.create_task(get_mi_symbols())
# for i in mi_symbols_list:
# task = api.create_task(task(i))
# #print(i)
# One monitoring coroutine per contract; they are awaited together in main().
tasks = list(map(task, mi_symbols_list))
async def main():
    """Run every coroutine in the module-level ``tasks`` list concurrently.

    Awaits ``asyncio.gather`` over ``tasks`` and returns ``None``; it
    finishes only once all gathered coroutines have finished.
    """
    await asyncio.gather(*tasks)
# Schedule the gathered monitors on the API's event loop, then drive that
# loop by calling wait_update() until it raises.
api.create_task(main())
with closing(api):  # guarantees api.close() when the loop is left
    while True:
        api.wait_update()
多进程异步io混合编程
from tqsdk import TqApi, TqSim,TqBacktest,TargetPosTask
import datetime
import asyncio
from contextlib import closing
import multiprocessing
from multiprocessing import Pool
async def task(symbol):
    """Announce loading of ``symbol`` and print it whenever its newest bar changes.

    Reads the module-level ``api``, ``start_dt`` and ``end_dt``. It first
    prints a loading message, then subscribes to the 60-second K-line series
    of ``symbol`` (last 100 bars) and loops forever on the update channel.

    Args:
        symbol: Contract code passed to ``api.get_kline_serial``,
            e.g. ``'SHFE.rb2010'``.
    """
    print('开始加载{}的{}-{}数据,多核异步回测加载中...'.format(symbol,str(start_dt),str(end_dt)))
    klines = api.get_kline_serial(symbol, 60, data_length=100)
    async with api.register_update_notify(klines) as update_chan:
        async for _ in update_chan:
            # Only react when the 'datetime' field of the last bar changed,
            # not on every update of that bar.
            if api.is_changing(klines.iloc[-1], 'datetime'):
                print(symbol)  # name of the contract that changed
async def get_mi_symbols():
    """Return the underlying symbols of all current main contracts.

    Scans the quote table held by the module-level ``api`` (the private
    ``api._data['quotes']`` mapping) and, for every key starting with
    ``'KQ.m'``, collects that entry's ``'underlying_symbol'`` value.

    Returns:
        list: One underlying symbol per matching quote entry.
    """
    return [
        quote['underlying_symbol']
        for key, quote in api._data['quotes'].items()
        if key.startswith('KQ.m')
    ]
#mi_symbols_list = ['CFFEX.IF2008', 'CFFEX.IH2008', 'CFFEX.IC2008', 'CFFEX.TF2009', 'CFFEX.T2009', 'SHFE.cu2009', 'SHFE.au2012', 'SHFE.ag2012', 'SHFE.zn2009', 'SHFE.al2009', 'SHFE.ru2101', 'SHFE.rb2010', 'SHFE.fu2101', 'SHFE.hc2010', 'SHFE.bu2012', 'SHFE.pb2009', 'SHFE.ni2010', 'SHFE.sn2010', 'SHFE.wr2010', 'INE.sc2009', 'DCE.a2009', 'DCE.b2010', 'DCE.bb2009', 'DCE.c2101', 'DCE.cs2009', 'DCE.fb2009', 'DCE.i2101', 'DCE.j2009', 'DCE.jd2009', 'DCE.jm2009', 'DCE.l2009', 'DCE.m2101', 'DCE.p2009', 'DCE.pp2009', 'DCE.v2009', 'DCE.y2101', 'CZCE.WH009', 'CZCE.PM105', 'CZCE.CF101', 'CZCE.CY101', 'CZCE.SR101', 'CZCE.TA101', 'CZCE.OI009', 'CZCE.RI009', 'CZCE.MA009', 'CZCE.FG101', 'CZCE.RS009', 'CZCE.RM009', 'CZCE.ZC009', 'CZCE.JR009', 'CZCE.LR011', 'CZCE.SF010', 'CZCE.SM009', 'CZCE.AP010', 'CFFEX.TS2009', 'SHFE.sp2012', 'DCE.eg2009', 'CZCE.CJ009', 'INE.nr2010', 'DCE.rr2010', 'CZCE.UR101', 'SHFE.ss2010', 'DCE.eb2009', 'CZCE.SA009', 'DCE.pg2011', 'INE.lu2101']
# Shortened contract list used by the multi-process demo.
mi_symbols_list = [
    'SHFE.rb2010',
    'SHFE.ru2009',
    'SHFE.pb2009',
    'SHFE.ni2010',
    'CFFEX.IF2008',
    'CFFEX.IH2008',
    'CFFEX.IC2008',
    'CFFEX.TF2009',
]
def cut_symbol_list(pool, symbols=None, make_task=None):
    """Split the contract list into contiguous groups, one per pool worker.

    Each group is stored under the key ``'symbols<i>_<i+1>'`` where ``i`` is
    the index of the group's last symbol, and holds ``make_task(symbol)`` for
    every symbol in the group. When the number of symbols is not a multiple
    of ``pool`` the first ``len(symbols) % pool`` groups receive one extra
    symbol, so no symbol is dropped. When there are fewer symbols than
    workers, only as many groups as symbols are produced.

    The result is also published in the module-level ``mi_symbols_dict``.

    Args:
        pool: Number of worker groups to split into; must be at least 1.
        symbols: Symbols to split. Defaults to the module-level
            ``mi_symbols_list``.
        make_task: Callable applied to each symbol. Defaults to the
            module-level ``task`` coroutine function.

    Returns:
        dict: Group key -> list of ``make_task(symbol)`` results.

    Raises:
        ValueError: If ``pool`` is smaller than 1.
    """
    global mi_symbols_dict
    if pool < 1:
        raise ValueError('pool must be at least 1, got {!r}'.format(pool))
    if symbols is None:
        symbols = mi_symbols_list
    if make_task is None:
        make_task = task
    symbols = list(symbols)

    # base symbols per group; the first `extra` groups take one more.
    base, extra = divmod(len(symbols), pool)
    mi_symbols_dict = {}
    print('正在划分进程任务...')
    start = 0
    for worker in range(pool):
        end = start + base + (1 if worker < extra else 0)
        if end == start:
            # Fewer symbols than workers: nothing left to hand out.
            break
        key = 'symbols' + str(end - 1) + '_' + str(end)
        mi_symbols_dict[key] = [make_task(symbol) for symbol in symbols[start:end]]
        print(mi_symbols_dict)
        start = end
    return mi_symbols_dict
async def main(tasks):
    """Run the given awaitables concurrently and wait for all of them.

    Args:
        tasks: Iterable of coroutines/awaitables handed to
            ``asyncio.gather``.

    Returns:
        None. The gathered results are discarded.
    """
    await asyncio.gather(*tasks)
# api.create_task(task('CFFEX.IF2008'))
# api.create_task(task('CFFEX.IH2008'))
def _run_backtest_worker(symbols, start, end):
    """Run one independent backtest for ``symbols`` inside a worker process.

    asyncio tasks and coroutine objects cannot be sent to another process,
    so each worker builds its own ``TqApi`` and its own coroutines and drives
    them until the backtest window is exhausted.

    Args:
        symbols: Contract codes this worker monitors.
        start: First backtest date.
        end: Last backtest date.
    """
    from tqsdk import BacktestFinished  # not part of the file's import block

    global api, start_dt, end_dt
    # task() reads these module-level names, so publish them in this process.
    start_dt, end_dt = start, end
    # NOTE(review): the original single-process call also passed web_gui=True;
    # it is left out here because every worker would start its own GUI --
    # confirm whether a per-worker GUI is wanted.
    api = TqApi(TqSim(), backtest=TqBacktest(start, end), debug=True)
    api.create_task(main([task(symbol) for symbol in symbols]))
    try:
        with closing(api):
            while True:
                api.wait_update()
    except BacktestFinished:
        # Raised by wait_update() when the backtest window is exhausted.
        pass


if __name__ == "__main__":
    # Must run before any other multiprocessing call in frozen executables.
    multiprocessing.freeze_support()
    start_dt = datetime.date(2020, 6, 1)
    end_dt = datetime.date(2020, 8, 2)
    worker_count = 4
    # Contiguous chunks, rounded up so no symbol is left out.
    chunk_size = max(1, -(-len(mi_symbols_list) // worker_count))
    chunks = [
        mi_symbols_list[i:i + chunk_size]
        for i in range(0, len(mi_symbols_list), chunk_size)
    ]
    p = Pool(worker_count)
    try:
        pending = [
            p.apply_async(_run_backtest_worker, (chunk, start_dt, end_dt))
            for chunk in chunks
        ]
        p.close()
        print('Waiting for all subprocesses done...')
        for result in pending:
            # get() re-raises any exception from the worker instead of
            # letting it disappear silently.
            result.get()
    finally:
        p.close()
        p.join()
    print('All subprocesses done.')
今天的文章python 并发分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/59915.html