python 并发

python 并发asyncio官方中文文档asyncio博客demoasyncio异步并发asyncio.ganter和async、await关键字的使用demoganter并发等同于多create_task和await配合的并发,前

asyncio官方中文文档
asyncio博客demo

asyncio异步并发

asyncio.ganter 和 async、await关键字的使用demo

ganter并发等同于多create_taskawait配合的并发,前者效率更高

# 协程监控合约变化
from tqsdk import TqApi, TqSim,TqBacktest,TargetPosTask
import datetime



async def task(symbol):
    #quote = api.get_quote(symbol)
    klines = api.get_kline_serial(symbol,60,data_length=100)

    async with api.register_update_notify(klines) as update_chan:
        async for _ in update_chan:
            if api.is_changing(klines.iloc[-1], 'datetime'):
                print(symbol)#打印变动合约名称

api = TqApi(TqSim(),backtest=TqBacktest(datetime.date(2020,6,1),datetime.date(2020,8,2)))

async def get_mi_symbols():
    # 获取当前所有主力合约
    lst = [v['underlying_symbol'] for k,v in api._data['quotes'].items() if k.startswith('KQ.m')]
    #print(lst)
    return lst
mi_symbols_list = ['CFFEX.IF2008', 'CFFEX.IH2008', 'CFFEX.IC2008', 'CFFEX.TF2009', 'CFFEX.T2009', 'SHFE.cu2009', 'SHFE.au2012', 'SHFE.ag2012', 'SHFE.zn2009', 'SHFE.al2009', 'SHFE.ru2101', 'SHFE.rb2010', 'SHFE.fu2101', 'SHFE.hc2010', 'SHFE.bu2012', 'SHFE.pb2009', 'SHFE.ni2010', 'SHFE.sn2010', 'SHFE.wr2010', 'INE.sc2009', 'DCE.a2009', 'DCE.b2010', 'DCE.bb2009', 'DCE.c2101', 'DCE.cs2009', 'DCE.fb2009', 'DCE.i2101', 'DCE.j2009', 'DCE.jd2009', 'DCE.jm2009', 'DCE.l2009', 'DCE.m2101', 'DCE.p2009', 'DCE.pp2009', 'DCE.v2009', 'DCE.y2101', 'CZCE.WH009', 'CZCE.PM105', 'CZCE.CF101', 'CZCE.CY101', 'CZCE.SR101', 'CZCE.TA101', 'CZCE.OI009', 'CZCE.RI009', 'CZCE.MA009', 'CZCE.FG101', 'CZCE.RS009', 'CZCE.RM009', 'CZCE.ZC009', 'CZCE.JR009', 'CZCE.LR011', 'CZCE.SF010', 'CZCE.SM009', 'CZCE.AP010', 'CFFEX.TS2009', 'SHFE.sp2012', 'DCE.eg2009', 'CZCE.CJ009', 'INE.nr2010', 'DCE.rr2010', 'CZCE.UR101', 'SHFE.ss2010', 'DCE.eb2009', 'CZCE.SA009', 'DCE.pg2011', 'INE.lu2101']

#mi_symbols_list = api.create_task(get_mi_symbols())

# for i in mi_symbols_list:
# task = api.create_task(task(i))
# #print(i)
tasks = [task(i) for i in mi_symbols_list]
async def main():
    await asyncio.gather(
        *tasks
        #task('CFFEX.IF2008'),
        #task('CFFEX.IH2008')
    )
api.create_task(main())
# api.create_task(task('CFFEX.IF2008'))
# api.create_task(task('CFFEX.IH2008'))
#api.create_task(aa())
with closing(api):
    while True:
        api.wait_update()

多进程异步io混合编程

from tqsdk import TqApi, TqSim,TqBacktest,TargetPosTask
import datetime
import asyncio
from contextlib import closing
import multiprocessing
from multiprocessing import Pool



async def task(symbol):
    #quote = api.get_quote(symbol)
    print('开始加载{}的{}-{}数据,多核异步回测加载中...'.format(symbol,str(start_dt),str(end_dt)))
    klines = api.get_kline_serial(symbol,60,data_length=100)

    async with api.register_update_notify(klines) as update_chan:
        async for _ in update_chan:
            if api.is_changing(klines.iloc[-1], 'datetime'):
                print(symbol)#打印变动合约名称

async def get_mi_symbols():
    # 获取当前所有主力合约
    lst = [v['underlying_symbol'] for k,v in api._data['quotes'].items() if k.startswith('KQ.m')]
    #print(lst)
    return lst
#mi_symbols_list = ['CFFEX.IF2008', 'CFFEX.IH2008', 'CFFEX.IC2008', 'CFFEX.TF2009', 'CFFEX.T2009', 'SHFE.cu2009', 'SHFE.au2012', 'SHFE.ag2012', 'SHFE.zn2009', 'SHFE.al2009', 'SHFE.ru2101', 'SHFE.rb2010', 'SHFE.fu2101', 'SHFE.hc2010', 'SHFE.bu2012', 'SHFE.pb2009', 'SHFE.ni2010', 'SHFE.sn2010', 'SHFE.wr2010', 'INE.sc2009', 'DCE.a2009', 'DCE.b2010', 'DCE.bb2009', 'DCE.c2101', 'DCE.cs2009', 'DCE.fb2009', 'DCE.i2101', 'DCE.j2009', 'DCE.jd2009', 'DCE.jm2009', 'DCE.l2009', 'DCE.m2101', 'DCE.p2009', 'DCE.pp2009', 'DCE.v2009', 'DCE.y2101', 'CZCE.WH009', 'CZCE.PM105', 'CZCE.CF101', 'CZCE.CY101', 'CZCE.SR101', 'CZCE.TA101', 'CZCE.OI009', 'CZCE.RI009', 'CZCE.MA009', 'CZCE.FG101', 'CZCE.RS009', 'CZCE.RM009', 'CZCE.ZC009', 'CZCE.JR009', 'CZCE.LR011', 'CZCE.SF010', 'CZCE.SM009', 'CZCE.AP010', 'CFFEX.TS2009', 'SHFE.sp2012', 'DCE.eg2009', 'CZCE.CJ009', 'INE.nr2010', 'DCE.rr2010', 'CZCE.UR101', 'SHFE.ss2010', 'DCE.eb2009', 'CZCE.SA009', 'DCE.pg2011', 'INE.lu2101']

mi_symbols_list = ['SHFE.rb2010','SHFE.ru2009','SHFE.pb2009', 'SHFE.ni2010','CFFEX.IF2008', 'CFFEX.IH2008', 'CFFEX.IC2008', 'CFFEX.TF2009']
def cut_symbol_list(pool):
    """划分多进程池的合约任务"""
    global mi_symbols_dict
    mi_symbols_dict ={ 
   }
    id_idx = 0
    print('正在划分进程任务...')
    for idx,x in enumerate(mi_symbols_list):
        try:
            if len(mi_symbols_list)%pool==0:
                id = [i for i in range(len(mi_symbols_list) + 1) if i % (len(mi_symbols_list) / pool) == 0 and i != 0]
                if idx+1 in id:
                    if id_idx==0:
                        mi_symbols_dict['symbols' + str(idx) + '_' + str(idx + 1)] = [task(i) for i in mi_symbols_list[:id[id_idx]]]
                        id_idx += 1
                        print(mi_symbols_dict)
                        continue
                    mi_symbols_dict['symbols'+str(idx)+'_'+str(idx+1)] = [task(i) for i in mi_symbols_list[id[id_idx-1]:id[id_idx]]]
                    id_idx += 1
                    print(mi_symbols_dict)
        except:
            print('keyerorr')
    return mi_symbols_dict


async def main(tasks):
    await asyncio.gather(
        *tasks
        #task('CFFEX.IF2008'),
        #task('CFFEX.IH2008')
    )

# api.create_task(task('CFFEX.IF2008'))
# api.create_task(task('CFFEX.IH2008'))

if __name__ == "__main__":
    start_dt = datetime.date(2020, 6, 1)
    end_dt = datetime.date(2020, 8, 2)
    api = TqApi(TqSim(), backtest=TqBacktest(start_dt, end_dt),web_gui=True,debug=True)
    cut_dict = cut_symbol_list(4)
    multiprocessing.freeze_support()
    p = Pool(4)
    for k,v in cut_dict.items():
        p.apply_async(api.create_task(main(cut_dict[k])))
    #api.create_task(main())
    print('Waiting for all subprocesses done...')

    with closing(api):
        while True:
            api.wait_update()

    #api.close()
    p.close()
    p.join()
    print('All subprocesses done.')

今天的文章python 并发分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/59915.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注