1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        非常適合小白的 Asyncio 教程

        共 7609字,需瀏覽 16分鐘

         ·

        2021-01-11 18:41


        原文:https://segmentfault.com/a/1190000008814676

        所謂「異步 IO」,就是你發(fā)起一個(gè) IO 操作,卻不用等它結(jié)束,你可以繼續(xù)做其他事情,當(dāng)它結(jié)束時(shí),你會(huì)得到通知。

        Asyncio 是并發(fā)(concurrency)的一種方式。對(duì) Python 來說,并發(fā)還可以通過線程(threading)和多進(jìn)程(multiprocessing)來實(shí)現(xiàn)。

        Asyncio 并不能帶來真正的并行(parallelism)。當(dāng)然,因?yàn)?GIL(全局解釋器鎖)的存在,Python 的多線程也不能帶來真正的并行。

        可交給 asyncio 執(zhí)行的任務(wù),稱為協(xié)程(coroutine)。一個(gè)協(xié)程可以放棄執(zhí)行,把機(jī)會(huì)讓給其它協(xié)程(即 yield fromawait)。

        1. 定義協(xié)程

        協(xié)程的定義,需要使用 async def 語句。

        async def do_some_work(x): pass

        do_some_work 便是一個(gè)協(xié)程。
        準(zhǔn)確來說,do_some_work 是一個(gè)協(xié)程函數(shù),可以通過 asyncio.iscoroutinefunction 來驗(yàn)證:

        print(asyncio.iscoroutinefunction(do_some_work)) # True

        這個(gè)協(xié)程什么都沒做,我們讓它睡眠幾秒,以模擬實(shí)際的工作量 :

        async def do_some_work(x):
            print("Waiting " + str(x))
            await asyncio.sleep(x)

        在解釋 await 之前,有必要說明一下協(xié)程可以做哪些事。協(xié)程可以:

        • 等待一個(gè) future 結(jié)束

        • 等待另一個(gè)協(xié)程(產(chǎn)生一個(gè)結(jié)果,或引發(fā)一個(gè)異常)

        • 產(chǎn)生一個(gè)結(jié)果給正在等它的協(xié)程

        • 引發(fā)一個(gè)異常給正在等它的協(xié)程

        asyncio.sleep 也是一個(gè)協(xié)程,所以 await asyncio.sleep(x) 就是等待另一個(gè)協(xié)程??蓞⒁?asyncio.sleep 的文檔:

        sleep(delay, result=None, *, loop=None)
        Coroutine that completes after a given time (in seconds).

        2. 運(yùn)行協(xié)程

        調(diào)用協(xié)程函數(shù),協(xié)程并不會(huì)開始運(yùn)行,只是返回一個(gè)協(xié)程對(duì)象,可以通過 asyncio.iscoroutine 來驗(yàn)證:

        print(asyncio.iscoroutine(do_some_work(3))) # True

        此處還會(huì)引發(fā)一條警告:

        async1.py:16: RuntimeWarning: coroutine 'do_some_work' was never awaited
          print(asyncio.iscoroutine(do_some_work(3)))

        要讓這個(gè)協(xié)程對(duì)象運(yùn)行的話,有兩種方式:

        • 在另一個(gè)已經(jīng)運(yùn)行的協(xié)程中用 await 等待它

        • 通過 ensure_future 函數(shù)計(jì)劃它的執(zhí)行

        簡(jiǎn)單來說,只有 loop 運(yùn)行了,協(xié)程才可能運(yùn)行。
        下面先拿到當(dāng)前線程缺省的 loop ,然后把協(xié)程對(duì)象交給 loop.run_until_complete,協(xié)程對(duì)象隨后會(huì)在 loop 里得到運(yùn)行。

        loop = asyncio.get_event_loop()
        loop.run_until_complete(do_some_work(3))

        run_until_complete 是一個(gè)阻塞(blocking)調(diào)用,直到協(xié)程運(yùn)行結(jié)束,它才返回。這一點(diǎn)從函數(shù)名不難看出。
        run_until_complete 的參數(shù)是一個(gè) future,但是我們這里傳給它的卻是協(xié)程對(duì)象,之所以能這樣,是因?yàn)樗趦?nèi)部做了檢查,通過 ensure_future 函數(shù)把協(xié)程對(duì)象包裝(wrap)成了 future。所以,我們可以寫得更明顯一些:

        loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))

        完整代碼:

        import asyncio

        async def do_some_work(x):
            print("Waiting " + str(x))
            await asyncio.sleep(x)

        loop = asyncio.get_event_loop()
        loop.run_until_complete(do_some_work(3))

        運(yùn)行結(jié)果:

        Waiting 3
        <三秒鐘后程序結(jié)束>

        3. 回調(diào)函數(shù)

        假如協(xié)程是一個(gè) IO 的讀操作,等它讀完數(shù)據(jù)后,我們希望得到通知,以便下一步數(shù)據(jù)的處理。這一需求可以通過往 future 添加回調(diào)來實(shí)現(xiàn)。

        def done_callback(futu):
            print('Done')

        futu = asyncio.ensure_future(do_some_work(3))
        futu.add_done_callback(done_callback)

        loop.run_until_complete(futu)

        4. 多個(gè)協(xié)程

        實(shí)際項(xiàng)目中,往往有多個(gè)協(xié)程,同時(shí)在一個(gè) loop 里運(yùn)行。為了把多個(gè)協(xié)程交給 loop,需要借助 asyncio.gather 函數(shù)。

        loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))

        或者先把協(xié)程存在列表里:

        coros = [do_some_work(1), do_some_work(3)]
        loop.run_until_complete(asyncio.gather(*coros))

        運(yùn)行結(jié)果:

        Waiting 3
        Waiting 1
        <等待三秒鐘>
        Done

        這兩個(gè)協(xié)程是并發(fā)運(yùn)行的,所以等待的時(shí)間不是 1 + 3 = 4 秒,而是以耗時(shí)較長的那個(gè)協(xié)程為準(zhǔn)。

        參考函數(shù) gather 的文檔:

        gather(*coros_or_futures, loop=None, return_exceptions=False)
        Return a future aggregating results from the given coroutines or futures.

        發(fā)現(xiàn)也可以傳 futures 給它:

        futus = [asyncio.ensure_future(do_some_work(1)),
                     asyncio.ensure_future(do_some_work(3))]

        loop.run_until_complete(asyncio.gather(*futus))

        gather 起聚合的作用,把多個(gè) futures 包裝成單個(gè) future,因?yàn)?loop.run_until_complete 只接受單個(gè) future。

        5. run_until_complete和run_forever

        我們一直通過 run_until_complete 來運(yùn)行 loop ,等到 future 完成,run_until_complete 也就返回了。

        async def do_some_work(x):
            print('Waiting ' + str(x))
            await asyncio.sleep(x)
            print('Done')

        loop = asyncio.get_event_loop()

        coro = do_some_work(3)
        loop.run_until_complete(coro)

        輸出:

        Waiting 3
        <等待三秒鐘>
        Done
        <程序退出>

        現(xiàn)在改用 run_forever

        async def do_some_work(x):
            print('Waiting ' + str(x))
            await asyncio.sleep(x)
            print('Done')

        loop = asyncio.get_event_loop()

        coro = do_some_work(3)
        asyncio.ensure_future(coro)

        loop.run_forever()

        輸出:

        Waiting 3
        <等待三秒鐘>
        Done
        <程序沒有退出>

        三秒鐘過后,future 結(jié)束,但是程序并不會(huì)退出。run_forever 會(huì)一直運(yùn)行,直到 stop 被調(diào)用,但是你不能像下面這樣調(diào) stop

        loop.run_forever()
        loop.stop()

        run_forever 不返回,stop 永遠(yuǎn)也不會(huì)被調(diào)用。所以,只能在協(xié)程中調(diào) stop

        async def do_some_work(loop, x):
            print('Waiting ' + str(x))
            await asyncio.sleep(x)
            print('Done')
            loop.stop()

        這樣并非沒有問題,假如有多個(gè)協(xié)程在 loop 里運(yùn)行:

        asyncio.ensure_future(do_some_work(loop1))
        asyncio.ensure_future(do_some_work(loop3))

        loop.run_forever()

        第二個(gè)協(xié)程沒結(jié)束,loop 就停止了——被先結(jié)束的那個(gè)協(xié)程給停掉的。
        要解決這個(gè)問題,可以用 gather 把多個(gè)協(xié)程合并成一個(gè) future,并添加回調(diào),然后在回調(diào)里再去停止 loop。

        async def do_some_work(loop, x):
            print('Waiting ' + str(x))
            await asyncio.sleep(x)
            print('Done')

        def done_callback(loop, futu):
            loop.stop()

        loop = asyncio.get_event_loop()

        futus = asyncio.gather(do_some_work(loop1), do_some_work(loop3))
        futus.add_done_callback(functools.partial(done_callback, loop))

        loop.run_forever()

        其實(shí)這基本上就是 run_until_complete 的實(shí)現(xiàn)了,run_until_complete 在內(nèi)部也是調(diào)用 run_forever。

        6. Close Loop?

        以上示例都沒有調(diào)用 loop.close,好像也沒有什么問題。所以到底要不要調(diào) loop.close 呢?
        簡(jiǎn)單來說,loop 只要不關(guān)閉,就還可以再運(yùn)行。:

        loop.run_until_complete(do_some_work(loop1))
        loop.run_until_complete(do_some_work(loop3))
        loop.close()

        但是如果關(guān)閉了,就不能再運(yùn)行了:

        loop.run_until_complete(do_some_work(loop1))
        loop.close()
        loop.run_until_complete(do_some_work(loop3))  # 此處異常

        建議調(diào)用 loop.close,以徹底清理 loop 對(duì)象防止誤用。

        7. gather 和 wait

        asyncio.gatherasyncio.wait 功能相似。

        coros = [do_some_work(loop1), do_some_work(loop3)]
        loop.run_until_complete(asyncio.wait(coros))

        具體差別可請(qǐng)參見 StackOverflow 的討論:Asyncio.gather vs asyncio.wait。

        8. Timer

        C++ Boost.Asio 提供了 IO 對(duì)象 timer,但是 Python 并沒有原生支持 timer,不過可以用 asyncio.sleep 模擬。

        async def timer(x, cb):
            futu = asyncio.ensure_future(asyncio.sleep(x))
            futu.add_done_callback(cb)
            await futu

        t = timer(3lambda futu: print('Done'))
        loop.run_until_complete(t)
        AirPython 技術(shù)交流群開放啦!
        群里既有國內(nèi)一二線大廠在職員工,也有國內(nèi)外高校在讀學(xué)生,既有十多年碼齡的編程老鳥,也有中小學(xué)剛剛?cè)腴T的新人,學(xué)習(xí)氛圍良好!
        想入群的同學(xué),掃描下方二維碼,并備注『 交流群 』,拉你進(jìn)群!(謝絕廣告黨,非誠勿擾?。?/span>~

        近期文章推薦:

        為了幫助剪輯小姐姐少熬夜,我用 Python 硬肝了一次短視頻音頻創(chuàng)作

        逆向爬蟲時(shí),Python 如何正確調(diào)用 JAR 加密邏輯?

        干貨 | Jmeter 如何保證搶購、秒殺活動(dòng)正常運(yùn)行?

        微信朋友圈被折疊?會(huì)自動(dòng)化不存在的(下)


        感謝創(chuàng)作者的好文
        瀏覽 38
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            日韩一级视频在线观看 | 亚洲AV无码专区在线 | 成人免费A级视频 | 欧美狂野粗黑大 | 黄色AAA片 | 欧美激情偷拍 | 尤物国产精品 | 欧美性猛交xxx乱久交 | 无码人妻一区二区三区AⅤ抖音 | 五月天色婷婷丁香 |