1. Python | 小白的 Asyncio 教程

        共 5176字,需瀏覽 11分鐘

         ·

        2020-12-22 12:14

        作者:adam1q84
        原文:https://segmentfault.com/a/1190000008814676


        所謂「異步 IO」,就是你發(fā)起一個 IO 操作,卻不用等它結束,你可以繼續(xù)做其他事情,當它結束時,你會得到通知。Asyncio 是并發(fā)(concurrency)的一種方式。對 Python 來說,并發(fā)還可以通過線程(threading)和多進程(multiprocessing)來實現。


        Asyncio 并不能帶來真正的并行(parallelism)。當然,因為 GIL(全局解釋器鎖)的存在,Python 的多線程也不能帶來真正的并行??山唤o asyncio 執(zhí)行的任務,稱為協程(coroutine)。一個協程可以放棄執(zhí)行,把機會讓給其它協程(即 yield from 或 await)。


        定義協程


        協程的定義,需要使用 async def 語句。

        async def do_some_work(x): pass

        do_some_work 便是一個協程。準確來說,do_some_work 是一個協程函數,可以通過 asyncio.iscoroutinefunction 來驗證:

        print(asyncio.iscoroutinefunction(do_some_work)) # True

        這個協程什么都沒做,我們讓它睡眠幾秒,以模擬實際的工作量 :

        async def do_some_work(x):    print("Waiting " + str(x))????await?asyncio.sleep(x)

        在解釋 await 之前,有必要說明一下協程可以做哪些事。協程可以:


        * 等待一個 future 結束
        * 等待另一個協程(產生一個結果,或引發(fā)一個異常)
        * 產生一個結果給正在等它的協程
        * 引發(fā)一個異常給正在等它的協程


        asyncio.sleep 也是一個協程,所以 await asyncio.sleep(x)?就是等待另一個協程??蓞⒁?asyncio.sleep 的文檔:

        sleep(delay, result=None, *, loop=None)# Coroutine that completes after a given time (in seconds).


        運行協程


        調用協程函數,協程并不會開始運行,只是返回一個協程對象,可以asyncio.iscoroutine 來驗證:


        print(asyncio.iscoroutine(do_some_work(3))) # True

        此處還會引發(fā)一條警告:

        async1.py:16: RuntimeWarning: coroutine 'do_some_work' was never awaited  print(asyncio.iscoroutine(do_some_work(3)))


        要讓這個協程對象運行的話,有兩種方式:


        * 在另一個已經運行的協程中用 `await` 等待它

        * 通過 `ensure_future` 函數計劃它的執(zhí)行


        簡單來說,只有 loop 運行了,協程才可能運行。下面先拿到當前線程缺省的 loop ,然后把協程對象交給 loop.run_until_complete,協程對象隨后會在 loop 里得到運行。

        loop = asyncio.get_event_loop()loop.run_until_complete(do_some_work(3))


        run_until_complete 是一個阻塞(blocking)調用,直到協程運行結束,它才返回。這一點從函數名不難看出。run_until_complete 的參數是一個 future,但是我們這里傳給它的卻是協程對象,之所以能這樣,是因為它在內部做了檢查,通過 ensure_future 函數把協程對象包裝(wrap)成了 future。所以,我們可以寫得更明顯一些:


        loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))

        完整代碼:

        import asyncio
        async def do_some_work(x): print("Waiting " + str(x)) await asyncio.sleep(x)
        loop = asyncio.get_event_loop()loop.run_until_complete(do_some_work(3))

        運行結果:

        Waiting 3<三秒鐘后程序結束>


        回調


        假如協程是一個 IO 的讀操作,等它讀完數據后,我們希望得到通知,以便下一步數據的處理。這一需求可以通過往 future 添加回調來實現。


        def done_callback(futu):    print('Done')
        futu = asyncio.ensure_future(do_some_work(3))futu.add_done_callback(done_callback)
        loop.run_until_complete(futu)


        多個協程


        實際項目中,往往有多個協程,同時在一個 loop 里運行。為了把多個協程交給 loop,需要借助 asyncio.gather 函數。

        loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))

        或者先把協程存在列表里:

        coros = [do_some_work(1), do_some_work(3)]loop.run_until_complete(asyncio.gather(*coros))

        運行結果:

        Waiting 3Waiting 1<等待三秒鐘>Done

        這兩個協程是并發(fā)運行的,所以等待的時間不是 1 + 3 = 4 秒,而是以耗時較長的那個協程為準。


        參考函數 gather 的文檔:


        gather(*coros_or_futures, loop=None, return_exceptions=False)
        Return a future aggregating results from the given coroutines or futures.


        發(fā)現也可以傳 futures 給它:


        futus = [asyncio.ensure_future(do_some_work(1)),             asyncio.ensure_future(do_some_work(3))]
        loop.run_until_complete(asyncio.gather(*futus))


        gather 起聚合作用,把多個 futures 包裝成單個 future,因為 loop.run_until_complete 只接受單個 future。


        run_until_complete 和 run_forever


        我們一直通過 run_until_complete 來運行 loop ,等到 future 完成, run_until_complete 也就返回了。


        async def do_some_work(x):    print('Waiting ' + str(x))    await asyncio.sleep(x)    print('Done')
        loop = asyncio.get_event_loop()
        coro = do_some_work(3)loop.run_until_complete(coro)

        輸出:

        Waiting 3
        <等待三秒鐘>
        Done
        <程序退出>

        現在改用 run_forever:


        async def do_some_work(x):    print('Waiting ' + str(x))    await asyncio.sleep(x)    print('Done')
        loop = asyncio.get_event_loop()
        coro = do_some_work(3)asyncio.ensure_future(coro)
        loop.run_forever()

        輸出:

        Waiting 3<等待三秒鐘>Done<程序沒有退出>


        三秒鐘過后,future 結束,但是程序并不會退出。run_forever 會一直運行,直到 stop 被調用,但是你不能像下面這樣調 stop:


        loop.run_forever()loop.stop()

        run_forever 不返回,stop 永遠也不會被調用。所以,只能在協程中調 stop:

        async def do_some_work(loop, x):    print('Waiting ' + str(x))    await asyncio.sleep(x)    print('Done')    loop.stop()

        這樣并非沒有問題,假如有多個協程在 loop 里運行:

        asyncio.ensure_future(do_some_work(loop, 1))asyncio.ensure_future(do_some_work(loop, 3))
        loop.run_forever()

        第二個協程沒結束,loop 就停止了——被先結束的那個協程給停掉的。要解決這個問題,可以用 gather 把多個協程合并成一個 future,并添加回調,然后在回調里再去停止 loop。

        async def do_some_work(loop, x):    print('Waiting ' + str(x))    await asyncio.sleep(x)    print('Done')
        def done_callback(loop, futu): loop.stop()
        loop = asyncio.get_event_loop()
        futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3))futus.add_done_callback(functools.partial(done_callback, loop))
        loop.run_forever()

        其實這基本上就是 run_until_complete 的實現了,run_until_complete 在內部也是調用 run_forever。


        Close Loop?


        以上示例都沒有調用 loop.close,好像也沒有什么問題。所以到底要不要調 loop.close 呢?


        簡單來說,loop 只要不關閉,就還可以再運行。:


        loop.run_until_complete(do_some_work(loop, 1))loop.run_until_complete(do_some_work(loop, 3))loop.close()

        但是如果關閉了,就不能再運行了:

        loop.run_until_complete(do_some_work(loop, 1))loop.close()loop.run_until_complete(do_some_work(loop, 3))  # 此處異常


        建議調用 loop.close,以徹底清理 loop 對象防止誤用。


        gather vs. wait


        asyncio.gather 和 asyncio.wait 功能相似。

        coros = [do_some_work(loop, 1), do_some_work(loop, 3)]loop.run_until_complete(asyncio.wait(coros))


        具體差別可請參見 StackOverflow 的討論:Asyncio.gather vs asyncio.wait。


        C++ Boost.Asio 提供了 IO 對象 timer,但是 Python 并沒有原生支持 timer,不過可以用 asyncio.sleep 模擬。


        async def timer(x, cb):    futu = asyncio.ensure_future(asyncio.sleep(x))    futu.add_done_callback(cb)    await futu
        t = timer(3, lambda futu: print('Done'))loop.run_until_complete(t)

        瀏覽 33
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 波多野42部无码喷潮在线 | 蜜臀av在线播放一区二区三区 | 欧美插菊花综合网 | 成熟护士长的蚌肉的滋味 | 扒开腿躁狂女人爽出白浆 |