1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        詳解Python多線程、多進(jìn)程

        共 42992字,需瀏覽 86分鐘

         ·

        2023-11-07 12:39


        在學(xué)習(xí)Python的過程中,有接觸到多線程編程相關(guān)的知識(shí)點(diǎn),先前一直都沒有徹底的搞明白。今天準(zhǔn)備花一些時(shí)間,把里面的細(xì)節(jié)盡可能的梳理清楚。

        線程與進(jìn)程的區(qū)別

        進(jìn)程(process)和線程(thread)是操作系統(tǒng)的基本概念,但是它們比較抽象,不容易掌握。關(guān)于多進(jìn)程和多線程,教科書上最經(jīng)典的一句話是“進(jìn)程是資源分配的最小單位,線程是CPU調(diào)度的最小單位”。線程是程序中一個(gè)單一的順序控制流程。進(jìn)程內(nèi)一個(gè)相對(duì)獨(dú)立的、可調(diào)度的執(zhí)行單元,是系統(tǒng)獨(dú)立調(diào)度和分派CPU的基本單位指運(yùn)行中的程序的調(diào)度單位。在單個(gè)程序中同時(shí)運(yùn)行多個(gè)線程完成不同的工作,稱為多線程。

        進(jìn)程和線程區(qū)別

        進(jìn)程是資源分配的基本單位。所有與該進(jìn)程有關(guān)的資源,都被記錄在進(jìn)程控制塊PCB中。以表示該進(jìn)程擁有這些資源或正在使用它們。另外,進(jìn)程也是搶占處理機(jī)的調(diào)度單位,它擁有一個(gè)完整的虛擬地址空間。當(dāng)進(jìn)程發(fā)生調(diào)度時(shí),不同的進(jìn)程擁有不同的虛擬地址空間,而同一進(jìn)程內(nèi)的不同線程共享同一地址空間。
        與進(jìn)程相對(duì)應(yīng),線程與資源分配無關(guān),它屬于某一個(gè)進(jìn)程,并與進(jìn)程內(nèi)的其他線程一起共享進(jìn)程的資源。線程只由相關(guān)堆棧(系統(tǒng)棧或用戶棧)寄存器和線程控制表TCB組成。寄存器可被用來存儲(chǔ)線程內(nèi)的局部變量,但不能存儲(chǔ)其他線程的相關(guān)變量。
        通常在一個(gè)進(jìn)程中可以包含若干個(gè)線程,它們可以利用進(jìn)程所擁有的資源。在引入線程的操作系統(tǒng)中,通常都是把進(jìn)程作為分配資源的基本單位,而把線程作為獨(dú)立運(yùn)行和獨(dú)立調(diào)度的基本單位。
        由于線程比進(jìn)程更小,基本上不擁有系統(tǒng)資源,故對(duì)它的調(diào)度所付出的開銷就會(huì)小得多,能更高效的提高系統(tǒng)內(nèi)多個(gè)程序間并發(fā)執(zhí)行的程度,從而顯著提高系統(tǒng)資源的利用率和吞吐量。
        因而近年來推出的通用操作系統(tǒng)都引入了線程,以便進(jìn)一步提高系統(tǒng)的并發(fā)性,并把它視為現(xiàn)代操作系統(tǒng)的一個(gè)重要指標(biāo)。

        線程與進(jìn)程的區(qū)別可以歸納為以下4點(diǎn):

        • 地址空間和其它資源(如打開文件):進(jìn)程間相互獨(dú)立,同一進(jìn)程的各線程間共享。某進(jìn)程內(nèi)的線程在其它進(jìn)程不可見。

        • 通信:進(jìn)程間通信IPC,線程間可以直接讀寫進(jìn)程數(shù)據(jù)段(如全局變量)來進(jìn)行通信——需要進(jìn)程同步和互斥手段的輔助,以保證數(shù)據(jù)的一致性。

        • 調(diào)度和切換:線程上下文切換比進(jìn)程上下文切換要快得多。

        • 在多線程OS中,進(jìn)程不是一個(gè)可執(zhí)行的實(shí)體。

        多進(jìn)程和多線程的比較

        對(duì)比維度 多進(jìn)程 多線程 總結(jié)
        數(shù)據(jù)共享、同步 數(shù)據(jù)共享復(fù)雜,同步簡單 數(shù)據(jù)共享簡單,同步復(fù)雜 各有優(yōu)劣
        內(nèi)存、CPU 占用內(nèi)存多,切換復(fù)雜,CPU利用率低 占用內(nèi)存少,切換簡單,CPU利用率高 線程占優(yōu)
        創(chuàng)建、銷毀、切換 復(fù)雜,速度慢 簡單,速度快 線程占優(yōu)
        編程、調(diào)試 編程簡單,調(diào)試簡單 編程復(fù)雜,調(diào)試復(fù)雜 進(jìn)程占優(yōu)
        可靠性 進(jìn)程間不會(huì)互相影響 一個(gè)線程掛掉將導(dǎo)致整個(gè)進(jìn)程掛掉 進(jìn)程占優(yōu)
        分布式 適用于多核、多機(jī),擴(kuò)展到多臺(tái)機(jī)器簡單 適合于多核 進(jìn)程占優(yōu)
        總結(jié),進(jìn)程和線程還可以類比為火車和車廂:
        • 線程在進(jìn)程下行進(jìn)(單純的車廂無法運(yùn)行)
        • 一個(gè)進(jìn)程可以包含多個(gè)線程(一輛火車可以有多個(gè)車廂)
        • 不同進(jìn)程間數(shù)據(jù)很難共享(一輛火車上的乘客很難換到另外一輛火車,比如站點(diǎn)換乘)
        • 同一進(jìn)程下不同線程間數(shù)據(jù)很易共享(A車廂換到B車廂很容易)
        • 進(jìn)程要比線程消耗更多的計(jì)算機(jī)資源(采用多列火車相比多個(gè)車廂更耗資源)
        • 進(jìn)程間不會(huì)相互影響,一個(gè)線程掛掉將導(dǎo)致整個(gè)進(jìn)程掛掉(一列火車不會(huì)影響到另外一列火車,但是如果一列火車上中間的一節(jié)車廂著火了,將影響到該趟火車的所有車廂)
        • 進(jìn)程可以拓展到多機(jī),進(jìn)程最多適合多核(不同火車可以開在多個(gè)軌道上,同一火車的車廂不能在行進(jìn)的不同的軌道上)
        • 進(jìn)程使用的內(nèi)存地址可以上鎖,即一個(gè)線程使用某些共享內(nèi)存時(shí),其他線程必須等它結(jié)束,才能使用這一塊內(nèi)存。(比如火車上的洗手間)-”互斥鎖(mutex)”
        • 進(jìn)程使用的內(nèi)存地址可以限定使用量(比如火車上的餐廳,最多只允許多少人進(jìn)入,如果滿了需要在門口等,等有人出來了才能進(jìn)去)-“信號(hào)量(semaphore)”

        Python全局解釋器鎖GIL

        全局解釋器鎖(英語:Global Interpreter Lock,縮寫GIL),并不是Python的特性,它是在實(shí)現(xiàn)Python解析器(CPython)時(shí)所引入的一個(gè)概念。由于CPython是大部分環(huán)境下默認(rèn)的Python執(zhí)行環(huán)境。所以在很多人的概念里CPython就是Python,也就想當(dāng)然的把GIL歸結(jié)為Python語言的缺陷。那么CPython實(shí)現(xiàn)中的GIL又是什么呢?來看看官方的解釋:

        The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.

        Python代碼的執(zhí)行由Python 虛擬機(jī)(也叫解釋器主循環(huán),CPython版本)來控制,Python 在設(shè)計(jì)之初就考慮到要在解釋器的主循環(huán)中,同時(shí)只有一個(gè)線程在執(zhí)行,即在任意時(shí)刻,只有一個(gè)線程在解釋器中運(yùn)行。對(duì)Python 虛擬機(jī)的訪問由全局解釋器鎖(GIL)來控制,正是這個(gè)鎖能保證同一時(shí)刻只有一個(gè)線程在運(yùn)行。

        GIL 有什么好處?簡單來說,它在單線程的情況更快,并且在和 C 庫結(jié)合時(shí)更方便,而且不用考慮線程安全問題,這也是早期 Python 最常見的應(yīng)用場景和優(yōu)勢。另外,GIL的設(shè)計(jì)簡化了CPython的實(shí)現(xiàn),使得對(duì)象模型,包括關(guān)鍵的內(nèi)建類型如字典,都是隱含可以并發(fā)訪問的。鎖住全局解釋器使得比較容易的實(shí)現(xiàn)對(duì)多線程的支持,但也損失了多處理器主機(jī)的并行計(jì)算能力。

        在多線程環(huán)境中,Python 虛擬機(jī)按以下方式執(zhí)行:

        1. 設(shè)置GIL
        2. 切換到一個(gè)線程去運(yùn)行
        3. 運(yùn)行直至指定數(shù)量的字節(jié)碼指令,或者線程主動(dòng)讓出控制(可以調(diào)用sleep(0))
        4. 把線程設(shè)置為睡眠狀態(tài)
        5. 解鎖GIL
        6. 再次重復(fù)以上所有步驟


        Python3.2前,GIL的釋放邏輯是當(dāng)前線程遇見IO操作或者ticks計(jì)數(shù)達(dá)到100(ticks可以看作是python自身的一個(gè)計(jì)數(shù)器,專門做用于GIL,每次釋放后歸零,這個(gè)計(jì)數(shù)可以通過 sys.setcheckinterval 來調(diào)整),進(jìn)行釋放。因?yàn)橛?jì)算密集型線程在釋放GIL之后又會(huì)立即去申請GIL,并且通常在其它線程還沒有調(diào)度完之前它就已經(jīng)重新獲取到了GIL,就會(huì)導(dǎo)致一旦計(jì)算密集型線程獲得了GIL,那么它在很長一段時(shí)間內(nèi)都將占據(jù)GIL,甚至一直到該線程執(zhí)行結(jié)束。
        Python 3.2開始使用新的GIL。新的GIL實(shí)現(xiàn)中用一個(gè)固定的超時(shí)時(shí)間來指示當(dāng)前的線程放棄全局鎖。在當(dāng)前線程保持這個(gè)鎖,且其他線程請求這個(gè)鎖時(shí),當(dāng)前線程就會(huì)在5毫秒后被強(qiáng)制釋放該鎖。該改進(jìn)在單核的情況下,對(duì)于單個(gè)線程長期占用GIL的情況有所好轉(zhuǎn)。
        在單核CPU上,數(shù)百次的間隔檢查才會(huì)導(dǎo)致一次線程切換。在多核CPU上,存在嚴(yán)重的線程顛簸(thrashing)。而每次釋放GIL鎖,線程進(jìn)行鎖競爭、切換線程,會(huì)消耗資源。單核下多線程,每次釋放GIL,喚醒的那個(gè)線程都能獲取到GIL鎖,所以能夠無縫執(zhí)行,但多核下,CPU0釋放GIL后,其他CPU上的線程都會(huì)進(jìn)行競爭,但GIL可能會(huì)馬上又被CPU0拿到,導(dǎo)致其他幾個(gè)CPU上被喚醒后的線程會(huì)醒著等待到切換時(shí)間后又進(jìn)入待調(diào)度狀態(tài),這樣會(huì)造成線程顛簸(thrashing),導(dǎo)致效率更低。
        另外,從上面的實(shí)現(xiàn)機(jī)制可以推導(dǎo)出,Python的多線程對(duì)IO密集型代碼要比CPU密集型代碼更加友好。
        針對(duì)GIL的應(yīng)對(duì)措施:
        • 使用更高版本Python(對(duì)GIL機(jī)制進(jìn)行了優(yōu)化)
        • 使用多進(jìn)程替換多線程(多進(jìn)程之間沒有GIL,但是進(jìn)程本身的資源消耗較多)
        • 指定cpu運(yùn)行線程(使用affinity模塊)
        • 使用Jython、IronPython等無GIL解釋器
        • 全I(xiàn)O密集型任務(wù)時(shí)才使用多線程
        • 使用協(xié)程(高效的單線程模式,也稱微線程;通常與多進(jìn)程配合使用)
        • 將關(guān)鍵組件用C/C++編寫為Python擴(kuò)展,通過ctypes使Python程序直接調(diào)用C語言編譯的動(dòng)態(tài)鏈接庫的導(dǎo)出函數(shù)。(with nogil調(diào)出GIL限制)

        Python的多進(jìn)程包multiprocessing

        Python的threading包主要運(yùn)用多線程的開發(fā),但由于GIL的存在,Python中的多線程其實(shí)并不是真正的多線程,如果想要充分地使用多核CPU的資源,大部分情況需要使用多進(jìn)程。在Python 2.6版本的時(shí)候引入了multiprocessing包,它完整的復(fù)制了一套threading所提供的接口方便遷移。唯一的不同就是它使用了多進(jìn)程而不是多線程。每個(gè)進(jìn)程有自己的獨(dú)立的GIL,因此也不會(huì)出現(xiàn)進(jìn)程之間的GIL爭搶。
        借助這個(gè)multiprocessing,你可以輕松完成從單進(jìn)程到并發(fā)執(zhí)行的轉(zhuǎn)換。multiprocessing支持子進(jìn)程、通信和共享數(shù)據(jù)、執(zhí)行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

        Multiprocessing產(chǎn)生的背景

        除了應(yīng)對(duì)Python的GIL以外,產(chǎn)生multiprocessing的另外一個(gè)原因時(shí)Windows操作系統(tǒng)與Linux/Unix系統(tǒng)的不一致。
        Unix/Linux操作系統(tǒng)提供了一個(gè)fork()系統(tǒng)調(diào)用,它非常特殊。普通的函數(shù),調(diào)用一次,返回一次,但是fork()調(diào)用一次,返回兩次,因?yàn)椴僮飨到y(tǒng)自動(dòng)把當(dāng)前進(jìn)程(父進(jìn)程)復(fù)制了一份(子進(jìn)程),然后,分別在父進(jìn)程和子進(jìn)程內(nèi)返回。子進(jìn)程永遠(yuǎn)返回0,而父進(jìn)程返回子進(jìn)程的ID。這樣做的理由是,一個(gè)父進(jìn)程可以fork出很多子進(jìn)程,所以,父進(jìn)程要記下每個(gè)子進(jìn)程的ID,而子進(jìn)程只需要調(diào)用getpid()就可以拿到父進(jìn)程的ID。
        Python的os模塊封裝了常見的系統(tǒng)調(diào)用,其中就包括fork,可以在Python程序中輕松創(chuàng)建子進(jìn)程:
        import os

        print('Process (%s) start...' % os.getpid())

        \# Only works on Unix/Linux/Mac:

        pid = os.fork()

        if pid == 0:

            print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))

        else:

            print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

        上述代碼在Linux、Unix和Mac上的執(zhí)行結(jié)果為:

        Process (876) start...

        I (876) just created a child process (877).

        I am child process (877) and my parent is 876.
        有了fork調(diào)用,一個(gè)進(jìn)程在接到新任務(wù)時(shí)就可以復(fù)制出一個(gè)子進(jìn)程來處理新任務(wù),常見的Apache服務(wù)器就是由父進(jìn)程監(jiān)聽端口,每當(dāng)有新的http請求時(shí),就fork出子進(jìn)程來處理新的http請求。
        由于Windows沒有fork調(diào)用,上面的代碼在Windows上無法運(yùn)行。由于Python是跨平臺(tái)的,自然也應(yīng)該提供一個(gè)跨平臺(tái)的多進(jìn)程支持。multiprocessing模塊就是跨平臺(tái)版本的多進(jìn)程模塊。multiprocessing模塊封裝了fork()調(diào)用,使我們不需要關(guān)注fork()的細(xì)節(jié)。由于Windows沒有fork調(diào)用,因此,multiprocessing需要“模擬”出fork的效果。

        multiprocessing常用組件及功能


        創(chuàng)建管理進(jìn)程模塊:

        • Process(用于創(chuàng)建進(jìn)程)
        • Pool(用于創(chuàng)建管理進(jìn)程池)
        • Queue(用于進(jìn)程通信,資源共享)
        • Value,Array(用于進(jìn)程通信,資源共享)
        • Pipe(用于管道通信)
        • Manager(用于資源共享)

        同步子進(jìn)程模塊:

        • Condition(條件變量)
        • Event(事件)
        • Lock(互斥鎖)
        • RLock(可重入的互斥鎖(同一個(gè)進(jìn)程可以多次獲得它,同時(shí)不會(huì)造成阻塞)
        • Semaphore(信號(hào)量)

        接下來就一起來學(xué)習(xí)下每個(gè)組件及功能的具體使用方法。

        Process(用于創(chuàng)建進(jìn)程)

        multiprocessing模塊提供了一個(gè)Process類來代表一個(gè)進(jìn)程對(duì)象。

        在multiprocessing中,每一個(gè)進(jìn)程都用一個(gè)Process類來表示。

        構(gòu)造方法:Process([group [, target [, name [, args [, kwargs]]]]])

        • group:分組,實(shí)際上不使用,值始終為None
        • target:表示調(diào)用對(duì)象,即子進(jìn)程要執(zhí)行的任務(wù),你可以傳入方法名
        • name:為子進(jìn)程設(shè)定名稱
        • args:要傳給target函數(shù)的位置參數(shù),以元組方式進(jìn)行傳入。
        • kwargs:要傳給target函數(shù)的字典參數(shù),以字典方式進(jìn)行傳入。

        實(shí)例方法:

        • start():啟動(dòng)進(jìn)程,并調(diào)用該子進(jìn)程中的p.run()
        • run():進(jìn)程啟動(dòng)時(shí)運(yùn)行的方法,正是它去調(diào)用target指定的函數(shù),我們自定義類的類中一定要實(shí)現(xiàn)該方法
        • terminate():強(qiáng)制終止進(jìn)程p,不會(huì)進(jìn)行任何清理操作,如果p創(chuàng)建了子進(jìn)程,該子進(jìn)程就成了僵尸進(jìn)程,使用該方法需要特別小心這種情況。如果p還保存了一個(gè)鎖那么也將不會(huì)被釋放,進(jìn)而導(dǎo)致死鎖
        • is_alive():返回進(jìn)程是否在運(yùn)行。如果p仍然運(yùn)行,返回True
        • join([timeout]):進(jìn)程同步,主進(jìn)程等待子進(jìn)程完成后再執(zhí)行后面的代碼。線程等待p終止(強(qiáng)調(diào):是主線程處于等的狀態(tài),而p是處于運(yùn)行的狀態(tài))。timeout是可選的超時(shí)時(shí)間(超過這個(gè)時(shí)間,父線程不再等待子線程,繼續(xù)往下執(zhí)行),需要強(qiáng)調(diào)的是,p.join只能join住start開啟的進(jìn)程,而不能join住run開啟的進(jìn)程

        屬性介紹:

        • daemon:默認(rèn)值為False,如果設(shè)為True,代表p為后臺(tái)運(yùn)行的守護(hù)進(jìn)程;當(dāng)p的父進(jìn)程終止時(shí),p也隨之終止,并且設(shè)定為True后,p不能創(chuàng)建自己的新進(jìn)程;必須在p.start()之前設(shè)置
        • name:進(jìn)程的名稱
        • pid:進(jìn)程的pid
        • exitcode:進(jìn)程在運(yùn)行時(shí)為None、如果為–N,表示被信號(hào)N結(jié)束(了解即可)
        • authkey:進(jìn)程的身份驗(yàn)證鍵,默認(rèn)是由os.urandom()隨機(jī)生成的32字符的字符串。這個(gè)鍵的用途是為涉及網(wǎng)絡(luò)連接的底層進(jìn)程間通信提供安全性,這類連接只有在具有相同的身份驗(yàn)證鍵時(shí)才能成功(了解即可)

        使用示例:(注意:在windows中Process()必須放到if name == ‘main’:下)

        from multiprocessing import Process

        import os

        def run_proc(name):

            print('Run child process %s (%s)...' % (name, os.getpid()))

        if __name__=='__main__':

            print('Parent process %s.' % os.getpid())

            p = Process(target=run_proc, args=('test',))

            print('Child process will start.')

            p.start()

            p.join()

        print('Child process end.')

        Pool(用于創(chuàng)建管理進(jìn)程池)


        Pool類用于需要執(zhí)行的目標(biāo)很多,而手動(dòng)限制進(jìn)程數(shù)量又太繁瑣時(shí),如果目標(biāo)少且不用控制進(jìn)程數(shù)量則可以用Process類。Pool可以提供指定數(shù)量的進(jìn)程,供用戶調(diào)用,當(dāng)有新的請求提交到Pool中時(shí),如果池還沒有滿,那么就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來執(zhí)行該請求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請求就會(huì)等待,直到池中有進(jìn)程結(jié)束,就重用進(jìn)程池中的進(jìn)程。
        構(gòu)造方法:Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
        • processes :要?jiǎng)?chuàng)建的進(jìn)程數(shù),如果省略,將默認(rèn)使用cpu_count()返回的數(shù)量。
        • initializer:每個(gè)工作進(jìn)程啟動(dòng)時(shí)要執(zhí)行的可調(diào)用對(duì)象,默認(rèn)為None。如果initializer是None,那么每一個(gè)工作進(jìn)程在開始的時(shí)候會(huì)調(diào)用initializer(*initargs)。
        • initargs:是要傳給initializer的參數(shù)組。
        • maxtasksperchild:工作進(jìn)程退出之前可以完成的任務(wù)數(shù),完成后用一個(gè)新的工作進(jìn)程來替代原進(jìn)程,來讓閑置的資源被釋放。maxtasksperchild默認(rèn)是None,意味著只要Pool存在工作進(jìn)程就會(huì)一直存活。
        • context: 用在制定工作進(jìn)程啟動(dòng)時(shí)的上下文,一般使用Pool() 或者一個(gè)context對(duì)象的Pool()方法來創(chuàng)建一個(gè)池,兩種方法都適當(dāng)?shù)脑O(shè)置了context。
        實(shí)例方法:
        • apply(func[, args[, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(args,*kwargs),然后返回結(jié)果。需要強(qiáng)調(diào)的是:此操作并不會(huì)在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()。它是阻塞的。apply很少使用
        • apply_async(func[, arg[, kwds={}[, callback=None]]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(args,*kwargs),然后返回結(jié)果。此方法的結(jié)果是AsyncResult類的實(shí)例,callback是可調(diào)用對(duì)象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r(shí),將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。它是非阻塞。
        • map(func, iterable[, chunksize=None]):Pool類中的map方法,與內(nèi)置的map函數(shù)用法行為基本一致,它會(huì)使進(jìn)程阻塞直到返回結(jié)果。注意,雖然第二個(gè)參數(shù)是一個(gè)迭代器,但在實(shí)際使用中,必須在整個(gè)隊(duì)列都就緒后,程序才會(huì)運(yùn)行子進(jìn)程。
        • map_async(func, iterable[, chunksize=None]):map_async與map的關(guān)系同apply與apply_async
        • imap():imap 與 map的區(qū)別是,map是當(dāng)所有的進(jìn)程都已經(jīng)執(zhí)行完了,并將結(jié)果返回了,imap()則是立即返回一個(gè)iterable可迭代對(duì)象。
        • imap_unordered():不保證返回的結(jié)果順序與進(jìn)程添加的順序一致。
        • close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成。
        • join():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用,讓其不再接受新的Process。
        • terminate():結(jié)束工作進(jìn)程,不再處理未處理的任務(wù)。
        方法apply_async()和map_async()的返回值是AsyncResul的實(shí)例obj。實(shí)例具有以下方法:
        • get():返回結(jié)果,如果有必要?jiǎng)t等待結(jié)果到達(dá)。timeout是可選的。如果在指定時(shí)間內(nèi)還沒有到達(dá),將引發(fā)異常。如果遠(yuǎn)程操作中引發(fā)了異常,它將在調(diào)用此方法時(shí)再次被引發(fā)。
        • ready():如果調(diào)用完成,返回True
        • successful():如果調(diào)用完成且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常
        • wait([timeout]):等待結(jié)果變?yōu)榭捎谩?/span>
        • terminate():立即終止所有工作進(jìn)程,同時(shí)不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收,將自動(dòng)調(diào)用此函數(shù)

        使用示例:

        \# -*- coding:utf-8 -*-

        \# Pool+map

        from multiprocessing import Pool

        def test(i):

            print(i)

        if __name__ == "__main__":

            lists = range(100)

            pool = Pool(8)

            pool.map(test, lists)

            pool.close()

            pool.join()
        \# -*- coding:utf-8 -*-

        \# 異步進(jìn)程池(非阻塞)

        from multiprocessing import Pool

        def test(i):

            print(i)

        if __name__ == "__main__":

            pool = Pool(8)

            for i in range(100):

                '''

                For循環(huán)中執(zhí)行步驟:

                (1)循環(huán)遍歷,將100個(gè)子進(jìn)程添加到進(jìn)程池(相對(duì)父進(jìn)程會(huì)阻塞)

                (2)每次執(zhí)行8個(gè)子進(jìn)程,等一個(gè)子進(jìn)程執(zhí)行完后,立馬啟動(dòng)新的子進(jìn)程。(相對(duì)父進(jìn)程不阻塞)

                apply_async為異步進(jìn)程池寫法。異步指的是啟動(dòng)子進(jìn)程的過程,與父進(jìn)程本身的執(zhí)行(print)是異步的,而For循環(huán)中往進(jìn)程池添加子進(jìn)程的過程,與父進(jìn)程本身的執(zhí)行卻是同步的。

                '''


                pool.apply_async(test, args=(i,))  # 維持執(zhí)行的進(jìn)程總數(shù)為8,當(dāng)一個(gè)進(jìn)程執(zhí)行完后啟動(dòng)一個(gè)新進(jìn)程.

            print("test")

            pool.close()

            pool.join()
        \# -*- coding:utf-8 -*-

        \# 異步進(jìn)程池(非阻塞)

        from multiprocessing import Pool

        def test(i):

            print(i)

        if __name__ == "__main__":

            pool = Pool(8)

            for i in range(100):

                '''

                    實(shí)際測試發(fā)現(xiàn),for循環(huán)內(nèi)部執(zhí)行步驟:

                    (1)遍歷100個(gè)可迭代對(duì)象,往進(jìn)程池放一個(gè)子進(jìn)程

                    (2)執(zhí)行這個(gè)子進(jìn)程,等子進(jìn)程執(zhí)行完畢,再往進(jìn)程池放一個(gè)子進(jìn)程,再執(zhí)行。(同時(shí)只執(zhí)行一個(gè)子進(jìn)程)

                    for循環(huán)執(zhí)行完畢,再執(zhí)行print函數(shù)。

                '''


                pool.apply(test, args=(i,))  # 維持執(zhí)行的進(jìn)程總數(shù)為8,當(dāng)一個(gè)進(jìn)程執(zhí)行完后啟動(dòng)一個(gè)新進(jìn)程.

            print("test")

            pool.close()

            pool.join()

        Queue(用于進(jìn)程通信,資源共享)

        在使用多進(jìn)程的過程中,最好不要使用共享資源。普通的全局變量是不能被子進(jìn)程所共享的,只有通過Multiprocessing組件構(gòu)造的數(shù)據(jù)結(jié)構(gòu)可以被共享。
        Queue是用來創(chuàng)建進(jìn)程間資源共享的隊(duì)列的類,使用Queue可以達(dá)到多進(jìn)程間數(shù)據(jù)傳遞的功能(缺點(diǎn):只適用Process類,不能在Pool進(jìn)程池中使用)。
        構(gòu)造方法:Queue([maxsize])
        • maxsize是隊(duì)列中允許最大項(xiàng)數(shù),省略則無大小限制。
        實(shí)例方法:
        • put():用以插入數(shù)據(jù)到隊(duì)列。put方法還有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,該方法會(huì)阻塞timeout指定的時(shí)間,直到該隊(duì)列有剩余的空間。如果超時(shí),會(huì)拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會(huì)立即拋出Queue.Full異常。
        • get():可以從隊(duì)列讀取并且刪除一個(gè)元素。get方法有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,那么在等待時(shí)間內(nèi)沒有取到任何元素,會(huì)拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個(gè)值可用,則立即返回該值,否則,如果隊(duì)列為空,則立即拋出Queue.Empty異常。若不希望在empty的時(shí)候拋出異常,令blocked為True或者參數(shù)全部置空即可。
        • get_nowait():同q.get(False)
        • put_nowait():同q.put(False)
        • empty():調(diào)用此方法時(shí)q為空則返回True,該結(jié)果不可靠,比如在返回True的過程中,如果隊(duì)列中又加入了項(xiàng)目。
        • full():調(diào)用此方法時(shí)q已滿則返回True,該結(jié)果不可靠,比如在返回True的過程中,如果隊(duì)列中的項(xiàng)目被取走。
        • qsize():返回隊(duì)列中目前項(xiàng)目的正確數(shù)量,結(jié)果也不可靠,理由同q.empty()和q.full()一樣

        使用示例:

        from multiprocessing import Process, Queue

        import os, time, random

        def write(q):

            print('Process to write: %s' % os.getpid())

            for value in ['A''B''C']:

                print('Put %s to queue...' % value)

                q.put(value)

                time.sleep(random.random())

        def read(q):

            print('Process to read: %s' % os.getpid())

            while True:

                value = q.get(True)

                print('Get %s from queue.' % value)

        if __name__ == "__main__":

            q = Queue()

            pw = Process(target=write, args=(q,))

            pr = Process(target=read, args=(q,))

            pw.start()

            pr.start()

            pw.join()  # 等待pw結(jié)束

            pr.terminate()  # pr進(jìn)程里是死循環(huán),無法等待其結(jié)束,只能強(qiáng)行終止
        JoinableQueue就像是一個(gè)Queue對(duì)象,但隊(duì)列允許項(xiàng)目的使用者通知生成者項(xiàng)目已經(jīng)被成功處理。通知進(jìn)程是使用共享的信號(hào)和條件變量來實(shí)現(xiàn)的。
        構(gòu)造方法:JoinableQueue([maxsize])
        • maxsize:隊(duì)列中允許最大項(xiàng)數(shù),省略則無大小限制。
        實(shí)例方法
        JoinableQueue的實(shí)例p除了與Queue對(duì)象相同的方法之外還具有:
        • task_done():使用者使用此方法發(fā)出信號(hào),表示q.get()的返回項(xiàng)目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊(duì)列中刪除項(xiàng)目的數(shù)量,將引發(fā)ValueError異常
        • join():生產(chǎn)者調(diào)用此方法進(jìn)行阻塞,直到隊(duì)列中所有的項(xiàng)目均被處理。阻塞將持續(xù)到隊(duì)列中的每個(gè)項(xiàng)目均調(diào)用q.task_done()方法為止

        使用示例:

        \# -*- coding:utf-8 -*-

        from multiprocessing import Process, JoinableQueue

        import time, random

        def consumer(q):

            while True:

                res = q.get()

                print('消費(fèi)者拿到了 %s' % res)

                q.task_done()

        def producer(seq, q):

            for item in seq:

                time.sleep(random.randrange(1,2))

                q.put(item)

                print('生產(chǎn)者做好了 %s' % item)

            q.join()

        if __name__ == "__main__":

            q = JoinableQueue()

            seq = ('產(chǎn)品%s' % i for i in range(5))

            p = Process(target=consumer, args=(q,))

            p.daemon = True  # 設(shè)置為守護(hù)進(jìn)程,在主線程停止時(shí)p也停止,但是不用擔(dān)心,producer內(nèi)調(diào)用q.join保證了consumer已經(jīng)處理完隊(duì)列中的所有元素

            p.start()

            producer(seq, q)

            print('主線程')

        Value,Array(用于進(jìn)程通信,資源共享)

        multiprocessing 中Value和Array的實(shí)現(xiàn)原理都是在共享內(nèi)存中創(chuàng)建ctypes()對(duì)象來達(dá)到共享數(shù)據(jù)的目的,兩者實(shí)現(xiàn)方法大同小異,只是選用不同的ctypes數(shù)據(jù)類型而已。
        Value
        構(gòu)造方法:Value((typecode_or_type, args[, lock])
        • typecode_or_type:定義ctypes()對(duì)象的類型,可以傳Type code或 C Type,具體對(duì)照表見下文。
        • args:傳遞給typecode_or_type構(gòu)造函數(shù)的參數(shù)
        • lock:默認(rèn)為True,創(chuàng)建一個(gè)互斥鎖來限制對(duì)Value對(duì)象的訪問,如果傳入一個(gè)鎖,如Lock或RLock的實(shí)例,將用于同步。如果傳入False,Value的實(shí)例就不會(huì)被鎖保護(hù),它將不是進(jìn)程安全的。
        typecode_or_type支持的類型:
        | Type code | C Type             | Python Type       | Minimum size in bytes |

        | --------- | ------------------ | ----------------- | --------------------- |

        | `'b'`     | signed char        | int               | 1                     |

        | `'B'`     | unsigned char      | int               | 1                     |

        | `'u'`     | Py_UNICODE         | Unicode character | 2                     |

        | `'h'`     | signed short       | int               | 2                     |

        | `'H'`     | unsigned short     | int               | 2                     |

        | `'i'`     | signed int         | int               | 2                     |

        | `'I'`     | unsigned int       | int               | 2                     |

        | `'l'`     | signed long        | int               | 4                     |

        | `'L'`     | unsigned long      | int               | 4                     |

        | `'q'`     | signed long long   | int               | 8                     |

        | `'Q'`     | unsigned long long | int               | 8                     |

        | `'f'`     | float              | float             | 4                     |

        | `'d'`     | double             | float             | 8                     |

        參考地址:https://docs.python.org/3/library/array.html

        Array

        構(gòu)造方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock])

        • typecode_or_type:同上
        • size_or_initializer:如果它是一個(gè)整數(shù),那么它確定數(shù)組的長度,并且數(shù)組將被初始化為零。否則,size_or_initializer是用于初始化數(shù)組的序列,其長度決定數(shù)組的長度。
        • kwds:傳遞給typecode_or_type構(gòu)造函數(shù)的參數(shù)
        • lock:同上

        使用示例:

        import multiprocessing

        def f(n, a):

            n.value = 3.14

            a[0] = 5

        if __name__ == '__main__':

            num = multiprocessing.Value('d', 0.0)

            arr = multiprocessing.Array('i', range(10))

            p = multiprocessing.Process(target=f, args=(num, arr))

            p.start()

            p.join()

            print(num.value)

            print(arr[:])

        注意:Value和Array只適用于Process類。

        Pipe(用于管道通信)

        多進(jìn)程還有一種數(shù)據(jù)傳遞方式叫管道原理和 Queue相同。Pipe可以在進(jìn)程之間創(chuàng)建一條管道,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對(duì)象,強(qiáng)調(diào)一點(diǎn):必須在產(chǎn)生Process對(duì)象之前產(chǎn)生管道。
        構(gòu)造方法:Pipe([duplex])
        • dumplex:默認(rèn)管道是全雙工的,如果將duplex射成False,conn1只能用于接收,conn2只能用于發(fā)送。
        實(shí)例方法:
        • send(obj):通過連接發(fā)送對(duì)象。obj是與序列化兼容的任意對(duì)象
        • recv():接收conn2.send(obj)發(fā)送的對(duì)象。如果沒有消息可接收,recv方法會(huì)一直阻塞。如果連接的另外一端已經(jīng)關(guān)閉,那么recv方法會(huì)拋出EOFError。
        • close():關(guān)閉連接。如果conn1被垃圾回收,將自動(dòng)調(diào)用此方法
        • fileno():返回連接使用的整數(shù)文件描述符
        • poll([timeout]):如果連接上的數(shù)據(jù)可用,返回True。timeout指定等待的最長時(shí)限。如果省略此參數(shù),方法將立即返回結(jié)果。如果將timeout射成None,操作將無限期地等待數(shù)據(jù)到達(dá)。
        • recv_bytes([maxlength]):接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息。maxlength指定要接收的最大字節(jié)數(shù)。如果進(jìn)入的消息,超過了這個(gè)最大值,將引發(fā)IOError異常,并且在連接上無法進(jìn)行進(jìn)一步讀取。如果連接的另外一端已經(jīng)關(guān)閉,再也不存在任何數(shù)據(jù),將引發(fā)EOFError異常。
        • send_bytes(buffer [, offset [, size]]):通過連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū),buffer是支持緩沖區(qū)接口的任意對(duì)象,offset是緩沖區(qū)中的字節(jié)偏移量,而size是要發(fā)送字節(jié)數(shù)。結(jié)果數(shù)據(jù)以單條消息的形式發(fā)出,然后調(diào)用c.recv_bytes()函數(shù)進(jìn)行接收
        • recv_bytes_into(buffer [, offset]):接收一條完整的字節(jié)消息,并把它保存在buffer對(duì)象中,該對(duì)象支持可寫入的緩沖區(qū)接口(即bytearray對(duì)象或類似的對(duì)象)。offset指定緩沖區(qū)中放置消息處的字節(jié)位移。返回值是收到的字節(jié)數(shù)。如果消息長度大于可用的緩沖區(qū)空間,將引發(fā)BufferTooShort異常。

        使用示例:

        from multiprocessing import Process, Pipe

        import time

        \# 子進(jìn)程執(zhí)行方法

        def f(Subconn):

            time.sleep(1)

            Subconn.send("吃了嗎")

            print("來自父親的問候:", Subconn.recv())

            Subconn.close()

        if __name__ == "__main__":

            parent_conn, child_conn = Pipe()  # 創(chuàng)建管道兩端

            p = Process(target=f, args=(child_conn,))  # 創(chuàng)建子進(jìn)程

            p.start()

            print("來自兒子的問候:", parent_conn.recv())

            parent_conn.send("嗯")

        Manager(用于資源共享)

        Manager()返回的manager對(duì)象控制了一個(gè)server進(jìn)程,此進(jìn)程包含的python對(duì)象可以被其他的進(jìn)程通過proxies來訪問。從而達(dá)到多進(jìn)程間數(shù)據(jù)通信且安全。Manager模塊常與Pool模塊一起使用。
        Manager支持的類型有l(wèi)ist,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
        管理器是獨(dú)立運(yùn)行的子進(jìn)程,其中存在真實(shí)的對(duì)象,并以服務(wù)器的形式運(yùn)行,其他進(jìn)程通過使用代理訪問共享對(duì)象,這些代理作為客戶端運(yùn)行。Manager()是BaseManager的子類,返回一個(gè)啟動(dòng)的SyncManager()實(shí)例,可用于創(chuàng)建共享對(duì)象并返回訪問這些共享對(duì)象的代理。
        BaseManager,創(chuàng)建管理器服務(wù)器的基類
        構(gòu)造方法:BaseManager([address[, authkey]])
        • address:(hostname,port),指定服務(wù)器的網(wǎng)址地址,默認(rèn)為簡單分配一個(gè)空閑的端口
        • authkey:連接到服務(wù)器的客戶端的身份驗(yàn)證,默認(rèn)為current_process().authkey的值
        實(shí)例方法:
        • start([initializer[, initargs]]):啟動(dòng)一個(gè)單獨(dú)的子進(jìn)程,并在該子進(jìn)程中啟動(dòng)管理器服務(wù)器
        • get_server():獲取服務(wù)器對(duì)象
        • connect():連接管理器對(duì)象
        • shutdown():關(guān)閉管理器對(duì)象,只能在調(diào)用了start()方法之后調(diào)用
        實(shí)例屬性:
        • address:只讀屬性,管理器服務(wù)器正在使用的地址
        SyncManager,以下類型均不是進(jìn)程安全的,需要加鎖..
        實(shí)例方法:
        • Array(self,*args,**kwds)
        • BoundedSemaphore(self,*args,**kwds)
        • Condition(self,*args,**kwds)
        • Event(self,*args,**kwds)
        • JoinableQueue(self,*args,**kwds)
        • Lock(self,*args,**kwds)
        • Namespace(self,*args,**kwds)
        • Pool(self,*args,**kwds)
        • Queue(self,*args,**kwds)
        • RLock(self,*args,**kwds)
        • Semaphore(self,*args,**kwds)
        • Value(self,*args,**kwds)
        • dict(self,*args,**kwds)
        • list(self,*args,**kwds)

        使用示例:

        import multiprocessing

        def f(x, arr, l, d, n):

            x.value = 3.14

            arr[0] = 5

            l.append('Hello')

            d[1] = 2

            n.a = 10

        if __name__ == '__main__':

            server = multiprocessing.Manager()

            x = server.Value('d', 0.0)

            arr = server.Array('i', range(10))

            l = server.list()

            d = server.dict()

            n = server.Namespace()

            proc = multiprocessing.Process(target=f, args=(x, arr, l, d, n))

            proc.start()

            proc.join()

            print(x.value)

            print(arr)

            print(l)

            print(d)

            print(n)

        同步子進(jìn)程模塊

        Lock(互斥鎖)

        Lock鎖的作用是當(dāng)多個(gè)進(jìn)程需要訪問共享資源的時(shí)候,避免訪問的沖突。加鎖保證了多個(gè)進(jìn)程修改同一塊數(shù)據(jù)時(shí),同一時(shí)間只能有一個(gè)修改,即串行的修改,犧牲了速度但保證了數(shù)據(jù)安全。Lock包含兩種狀態(tài)——鎖定和非鎖定,以及兩個(gè)基本的方法。

        構(gòu)造方法:Lock()

        實(shí)例方法:

        • acquire([timeout]): 使線程進(jìn)入同步阻塞狀態(tài),嘗試獲得鎖定。
        • release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。

        使用示例:

        from multiprocessing import Process, Lock

        def l(lock, num):

            lock.acquire()

            print("Hello Num: %s" % (num))

            lock.release()

        if __name__ == '__main__':

            lock = Lock()  # 這個(gè)一定要定義為全局

            for num in range(20):

                Process(target=l, args=(lock, num)).start()

        RLock(可重入的互斥鎖(同一個(gè)進(jìn)程可以多次獲得它,同時(shí)不會(huì)造成阻塞)

        RLock(可重入鎖)是一個(gè)可以被同一個(gè)線程請求多次的同步指令。RLock使用了“擁有的線程”和“遞歸等級(jí)”的概念,處于鎖定狀態(tài)時(shí),RLock被某個(gè)線程擁有。擁有RLock的線程可以再次調(diào)用acquire(),釋放鎖時(shí)需要調(diào)用release()相同次數(shù)??梢哉J(rèn)為RLock包含一個(gè)鎖定池和一個(gè)初始值為0的計(jì)數(shù)器,每次成功調(diào)用 acquire()/release(),計(jì)數(shù)器將+1/-1,為0時(shí)鎖處于未鎖定狀態(tài)。

        構(gòu)造方法:RLock()

        實(shí)例方法:

        • acquire([timeout]):同Lock
        • release(): 同Lock

        Semaphore(信號(hào)量)

        信號(hào)量是一個(gè)更高級(jí)的鎖機(jī)制。信號(hào)量內(nèi)部有一個(gè)計(jì)數(shù)器而不像鎖對(duì)象內(nèi)部有鎖標(biāo)識(shí),而且只有當(dāng)占用信號(hào)量的線程數(shù)超過信號(hào)量時(shí)線程才阻塞。這允許了多個(gè)線程可以同時(shí)訪問相同的代碼區(qū)。比如廁所有3個(gè)坑,那最多只允許3個(gè)人上廁所,后面的人只能等里面有人出來了才能再進(jìn)去,如果指定信號(hào)量為3,那么來一個(gè)人獲得一把鎖,計(jì)數(shù)加1,當(dāng)計(jì)數(shù)等于3時(shí),后面的人均需要等待。一旦釋放,就有人可以獲得一把鎖。

        構(gòu)造方法:Semaphore([value])

        • value:設(shè)定信號(hào)量,默認(rèn)值為1

        實(shí)例方法:

        • acquire([timeout]):同Lock
        • release(): 同Lock

        使用示例:

        from multiprocessing import Process, Semaphore

        import time, random

        def go_wc(sem, user):

            sem.acquire()

            print('%s 占到一個(gè)茅坑' % user)

            time.sleep(random.randint(0, 3))

            sem.release()

            print(user, 'OK')

        if __name__ == '__main__':

            sem = Semaphore(2)

            p_l = []

            for i in range(5):

                p = Process(target=go_wc, args=(sem, 'user%s' % i,))

                p.start()

                p_l.append(p)

            for i in p_l:

                i.join()

        Condition(條件變量)

        可以把Condition理解為一把高級(jí)的鎖,它提供了比Lock, RLock更高級(jí)的功能,允許我們能夠控制復(fù)雜的線程同步問題。Condition在內(nèi)部維護(hù)一個(gè)鎖對(duì)象(默認(rèn)是RLock),可以在創(chuàng)建Condigtion對(duì)象的時(shí)候把瑣對(duì)象作為參數(shù)傳入。Condition也提供了acquire, release方法,其含義與鎖的acquire, release方法一致,其實(shí)它只是簡單的調(diào)用內(nèi)部鎖對(duì)象的對(duì)應(yīng)的方法而已。Condition還提供了其他的一些方法。

        構(gòu)造方法:Condition([lock/rlock])

        • 可以傳遞一個(gè)Lock/RLock實(shí)例給構(gòu)造方法,否則它將自己生成一個(gè)RLock實(shí)例。
        實(shí)例方法:
        • acquire([timeout]):首先進(jìn)行acquire,然后判斷一些條件。如果條件不滿足則wait
        • release():釋放 Lock
        • wait([timeout]): 調(diào)用這個(gè)方法將使線程進(jìn)入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。處于wait狀態(tài)的線程接到通知后會(huì)重新判斷條件。
        • notify(): 調(diào)用這個(gè)方法將從等待池挑選一個(gè)線程并通知,收到通知的線程將自動(dòng)調(diào)用acquire()嘗試獲得鎖定(進(jìn)入鎖定池);其他線程仍然在等待池中。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
        • notifyAll(): 調(diào)用這個(gè)方法將通知等待池中所有的線程,這些線程都將進(jìn)入鎖定池嘗試獲得鎖定。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。

        使用示例:

        import multiprocessing

        import time

        def stage_1(cond):

            """perform first stage of work,

            then notify stage_2 to continue

            """


            name = multiprocessing.current_process().name

            print('Starting', name)

            with cond:

                print('{} done and ready for stage 2'.format(name))

                cond.notify_all()

        def stage_2(cond):

            """wait for the condition telling us stage_1 is done"""

            name = multiprocessing.current_process().name

            print('Starting', name)

            with cond:

                cond.wait()

                print('{} running'.format(name))

        if __name__ == '__main__':

            condition = multiprocessing.Condition()

            s1 = multiprocessing.Process(name='s1',

                                         target=stage_1,

                                         args=(condition,))

            s2_clients = [

                multiprocessing.Process(

                    name='stage_2[{}]'.format(i),

                    target=stage_2,

                    args=(condition,),

                )

                for i in range(1, 3)

            ]

            for c in s2_clients:

                c.start()

                time.sleep(1)

            s1.start()

            s1.join()

            for c in s2_clients:

                c.join()

        Event(事件)

        Event內(nèi)部包含了一個(gè)標(biāo)志位,初始的時(shí)候?yàn)閒alse??梢允褂胹et()來將其設(shè)置為true;或者使用clear()將其從新設(shè)置為false;可以使用is_set()來檢查標(biāo)志位的狀態(tài);另一個(gè)最重要的函數(shù)就是wait(timeout=None),用來阻塞當(dāng)前線程,直到event的內(nèi)部標(biāo)志位被設(shè)置為true或者timeout超時(shí)。如果內(nèi)部標(biāo)志位為true則wait()函數(shù)理解返回。

        使用示例:

        import multiprocessing

        import time

        def wait_for_event(e):

            """Wait for the event to be set before doing anything"""

            print('wait_for_event: starting')

            e.wait()

            print('wait_for_event: e.is_set()->', e.is_set())

        def wait_for_event_timeout(e, t):

            """Wait t seconds and then timeout"""

            print('wait_for_event_timeout: starting')

            e.wait(t)

            print('wait_for_event_timeout: e.is_set()->', e.is_set())

        if __name__ == '__main__':

            e = multiprocessing.Event()

            w1 = multiprocessing.Process(

                name='block',

                target=wait_for_event,

                args=(e,),

            )

            w1.start()

            w2 = multiprocessing.Process(

                name='nonblock',

                target=wait_for_event_timeout,

                args=(e, 2),

            )

            w2.start()

            print('main: waiting before calling Event.set()')

            time.sleep(3)

            e.set()

            print('main: event is set')

        其他內(nèi)容

        multiprocessing.dummy 模塊與 multiprocessing 模塊的區(qū)別:dummy 模塊是多線程,而 multiprocessing 是多進(jìn)程, api 都是通用的。所有可以很方便將代碼在多線程和多進(jìn)程之間切換。multiprocessing.dummy通常在IO場景可以嘗試使用,比如使用如下方式引入線程池。
        from multiprocessing.dummy import Pool as ThreadPool

        multiprocessing.dummy與早期的threading,不同的點(diǎn)好像是在多多核CPU下,只綁定了一個(gè)核心(具體未考證)。

        參考文檔:

        • https://docs.python.org/3/library/multiprocessing.html
        • https://www.rddoc.com/doc/Python/3.6.0/zh/library/multiprocessing/

        Python并發(fā)之concurrent.futures

        Python標(biāo)準(zhǔn)庫為我們提供了threading和multiprocessing模塊編寫相應(yīng)的多線程/多進(jìn)程代碼。從Python3.2開始,標(biāo)準(zhǔn)庫為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個(gè)類,實(shí)現(xiàn)了對(duì)threading和multiprocessing的更高級(jí)的抽象,對(duì)編寫線程池/進(jìn)程池提供了直接的支持。concurrent.futures基礎(chǔ)模塊是executor和future。

        Executor

        Executor是一個(gè)抽象類,它不能被直接使用。它為具體的異步執(zhí)行定義了一些基本的方法。ThreadPoolExecutor和ProcessPoolExecutor繼承了Executor,分別被用來創(chuàng)建線程池和進(jìn)程池的代碼。

        ThreadPoolExecutor對(duì)象

        ThreadPoolExecutor類是Executor子類,使用線程池執(zhí)行異步調(diào)用。

        class concurrent.futures.ThreadPoolExecutor(max_workers)

        使用max_workers數(shù)目的線程池執(zhí)行異步調(diào)用。

        ProcessPoolExecutor對(duì)象

        ThreadPoolExecutor類是Executor子類,使用進(jìn)程池執(zhí)行異步調(diào)用。

        class concurrent.futures.ProcessPoolExecutor(max_workers=None)
        使用max_workers數(shù)目的進(jìn)程池執(zhí)行異步調(diào)用,如果max_workers為None則使用機(jī)器的處理器數(shù)目(如4核機(jī)器max_worker配置為None時(shí),則使用4個(gè)進(jìn)程進(jìn)行異步并發(fā))。

        submit()方法

        Executor中定義了submit()方法,這個(gè)方法的作用是提交一個(gè)可執(zhí)行的回調(diào)task,并返回一個(gè)future實(shí)例。future對(duì)象代表的就是給定的調(diào)用。

        Executor.submit(fn, *args, **kwargs)

        • fn:需要異步執(zhí)行的函數(shù)
        • *args, **kwargs:fn參數(shù)

        使用示例:

        from concurrent import futures

        def test(num):

            import time

            return time.ctime(), num

        with futures.ThreadPoolExecutor(max_workers=1) as executor:

            future = executor.submit(test, 1)

            print(future.result())

        map()方法

        除了submit,Exectuor還為我們提供了map方法,這個(gè)方法返回一個(gè)map(func, *iterables)迭代器,迭代器中的回調(diào)執(zhí)行返回的結(jié)果有序的。

        Executor.map(func, *iterables, timeout=None)

        • func:需要異步執(zhí)行的函數(shù)
        • *iterables:可迭代對(duì)象,如列表等。每一次func執(zhí)行,都會(huì)從iterables中取參數(shù)。
        • timeout:設(shè)置每次異步操作的超時(shí)時(shí)間,timeout的值可以是int或float,如果操作超時(shí),會(huì)返回raisesTimeoutError;如果不指定timeout參數(shù),則不設(shè)置超時(shí)間。

        使用示例:

        from concurrent import futures

        def test(num):

            import time

            return time.ctime(), num

        data = [1, 2, 3]

        with futures.ThreadPoolExecutor(max_workers=1) as executor:

            for future in executor.map(test, data):

                print(future)

        shutdown()方法

        釋放系統(tǒng)資源,在Executor.submit()或 Executor.map()等異步操作后調(diào)用。使用with語句可以避免顯式調(diào)用此方法。

        Executor.shutdown(wait=True)

        Future

        Future可以理解為一個(gè)在未來完成的操作,這是異步編程的基礎(chǔ)。通常情況下,我們執(zhí)行io操作,訪問url時(shí)(如下)在等待結(jié)果返回之前會(huì)產(chǎn)生阻塞,cpu不能做其他事情,而Future的引入幫助我們在等待的這段時(shí)間可以完成其他的操作。
        Future類封裝了可調(diào)用的異步執(zhí)行。Future 實(shí)例通過 Executor.submit()方法創(chuàng)建。
        • cancel():試圖取消調(diào)用。如果調(diào)用當(dāng)前正在執(zhí)行,并且不能被取消,那么該方法將返回False,否則調(diào)用將被取消,方法將返回True。
        • cancelled():如果成功取消調(diào)用,返回True。
        • running():如果調(diào)用當(dāng)前正在執(zhí)行并且不能被取消,返回True。
        • done():如果調(diào)用成功地取消或結(jié)束了,返回True。
        • result(timeout=None):返回調(diào)用返回的值。如果調(diào)用還沒有完成,那么這個(gè)方法將等待超時(shí)秒。如果調(diào)用在超時(shí)秒內(nèi)沒有完成,那么就會(huì)有一個(gè)Futures.TimeoutError將報(bào)出。timeout可以是一個(gè)整形或者浮點(diǎn)型數(shù)值,如果timeout不指定或者為None,等待時(shí)間無限。如果futures在完成之前被取消了,那么 CancelledError 將會(huì)報(bào)出。
        • exception(timeout=None):返回調(diào)用拋出的異常,如果調(diào)用還未完成,該方法會(huì)等待timeout指定的時(shí)長,如果該時(shí)長后調(diào)用還未完成,就會(huì)報(bào)出超時(shí)錯(cuò)誤futures.TimeoutError。timeout可以是一個(gè)整形或者浮點(diǎn)型數(shù)值,如果timeout不指定或者為None,等待時(shí)間無限。如果futures在完成之前被取消了,那么 CancelledError 將會(huì)報(bào)出。如果調(diào)用完成并且無異常報(bào)出,返回None.
        • add_done_callback(fn):將可調(diào)用fn捆綁到future上,當(dāng)Future被取消或者結(jié)束運(yùn)行,fn作為future的唯一參數(shù)將會(huì)被調(diào)用。如果future已經(jīng)運(yùn)行完成或者取消,fn將會(huì)被立即調(diào)用。
        • wait(fs, timeout=None, return_when=ALL_COMPLETED)
          • 等待fs提供的 Future 實(shí)例(possibly created by different Executor instances) 運(yùn)行結(jié)束。返回一個(gè)命名的2元集合,分表代表已完成的和未完成的
          • return_when 表明什么時(shí)候函數(shù)應(yīng)該返回。它的值必須是一下值之一:
            • FIRST_COMPLETED :函數(shù)在任何future結(jié)束或者取消的時(shí)候返回。
            • FIRST_EXCEPTION :函數(shù)在任何future因?yàn)楫惓=Y(jié)束的時(shí)候返回,如果沒有future報(bào)錯(cuò),效果等于
            • ALL_COMPLETED :函數(shù)在所有future結(jié)束后才會(huì)返回。
        • as_completed(fs, timeout=None):參數(shù)是一個(gè) Future 實(shí)例列表,返回值是一個(gè)迭代器,在運(yùn)行結(jié)束后產(chǎn)出 Future實(shí)例 。

        使用示例:

        from concurrent.futures import ThreadPoolExecutor, wait, as_completed

        from time import sleep

        from random import randint

        def return_after_5_secs(num):

            sleep(randint(1, 5))

            return "Return of {}".format(num)

        pool = ThreadPoolExecutor(5)

        futures = []

        for x in range(5):

            futures.append(pool.submit(return_after_5_secs, x))

        print(1)

        for x in as_completed(futures):

            print(x.result())

        print(2)

        參考鏈接:https://pythonhosted.org/futures  作者:錢魏Way 來源:https://www.biaodianfu.com/python-multi-thread-and-multi-process.html

            
            


        本文僅做學(xué)術(shù)分享,如有侵權(quán),請聯(lián)系刪文。

              
              

        —THE END—

        瀏覽 210
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            色xxxx| 伊人免费视频在线观看| 热久色| 亚洲精品69| 国精产品一区一区三区四川| 后入av| 国产精品片| 成人在线h| 色综合一区二区| www亚洲视频| 西西人体大胆裸体A片| 亚洲AV无码精品久久一区二区| 日韩成人AV在线播放| 视频一区在线播放| 中文字幕精品视频在线观看| 丁香五月社区| 一区二区三区在线播放| 亚洲色成人中文字幕在线| 日韩人妻无码电影| 精品一区二区三区四区视频| 久久成人小电影| www.干| 日日夜夜精品| 国产激情精品视频| 夜夜骑天天| 河南熟妇搡BBBB搡BBBB| 成人性爱视频在线播放| 亚洲第一无码| 久久久亚洲无码精品| 永久黄网站| www.尤物| 亚洲经典一| 日韩熟妇无码| 激情无码国产| 国产性爱在线| 国产精品久久久91| 蜜臀AV在线观看| 成人亚洲AV| AV无码网站| 人人操人人射| 北条麻妃无码视频在线| 97国产资源| 欧美久久久久久| 欧美色999| 国产xxxx| 欧美sesese| 亚洲精品资源| 欧美成人乱码一区二区三区| 草B网| 无码一区二区三区免费| 国产乱子伦一区二区三区视频| av天堂无码| 草逼免费视频| 色资源在线观看| 欧美淫乱视频| 五月丁香六月情| 粉嫩小泬BBBBBB免费看| 日韩无码电影网站| 日韩大片在线| 成人免看一级a一片A片| 日韩无码人妻一区二区三区| 久久熟妇| 大鸡巴在线| 精品国产精品三级精品AV网址| 国产无码中文字幕| 超碰小说| 欧美午夜精品久久久久久3D| 无码少妇| 国产毛片精品一区二区色欲黄A片| 久久无码一区二区三区| 91精品国产一区二区三区四区大| 亚洲成人五月天| 最新人妻| 我和岳m愉情XXXⅩ视频| 国内精品国产成人国产三级| 人妻精品综合码| 亚洲成人在线网站| 18禁无码网站| 无码少妇视频| 加勒比一区二区| 亚洲精品成人av无码| 二区三区不卡| 国产av不卡| 人妻少妇偷人精品无码免费| 人人草在线观看| 日韩群交| 亚洲性爱在线视频| 日韩操逼| 69福利视频| 日本一级特黄电影| av青青草原| 曰本中文字幕在线视频| 黄色成人网址| 怡春院综合成人社区| 99久久婷婷| 91精品综合久久久久久五月丁香| 一级午夜| 韩国午夜激情| 五月婷婷综合在线| www日本色| 日韩AV中文字幕在线| 一本色道久久综合熟妇人妻| 猛男大粗猛爽H男人味| 人妻懂色av粉嫩av浪潮av| 免费无码进口视频| 久久久久成人视频| 丁香五月激情视频| 啊啊啊啊啊靠逼| 9999久久久久| 精品无人区无码乱码毛片国产 | 操美女大逼| 北条麻妃无码视频| 亚洲黄色视频网站在线观看| 国产毛片久久久久久国产毛片| 91九色首页| 亚洲成年人网| 97国产免费| 97资源在线视频| 成人三级电影| 国产精品不卡一区二区三区| 综合五月婷婷| 成人在线国产| 四虎www| 国产免费自拍| 欧美一级婬片AAAAAA片| 一级a片在线免费观看| 欧美视频在线一区| 日韩三级精品| 蜜桃在线视频| 波多野结衣高清无码视频| 91国视频| 蜜桃免费视频| 黄色电影A| 中文字幕免| 无码少妇视频| 欧美精品A片| 脓肿是什么原因引起的,该怎么治疗| 天天添| 成年人黄色视频免费观看| 免费在线观看黄色| 在线免费看黄网站| 无码精品一区二区三区在线观看| 人人干人人上| 亚洲精品性爱| 国产高清无码一区| 亚洲福利天堂| 成年人观看视频| 夜夜av| 嫩BBB槡BBBB槡BBB小号| 人人妻人人爽人人操| 免费性爱网站| 北条麻妃精品青青久久价格| 久草综合视频| 丰满人妻一区二区三区视频54| 四虎在线观看视频| 亚洲黄色无码| 欧美日韩激情视频| 国产精品欧美综合在线| 成人网站免费在线观看| 97AV在线| 伊人网视频在线播放| 欧美日屄| 91久久午夜无码鲁丝片久久人妻 | 在线观看A片| 囯产伦精一区二区三区四区| 丁香九月婷婷| 日韩在线二区| 欧美黄片免费看| 欧美成人超碰| 中国老少配BBwBBwBBW| 操b在线| 欧美精品秘一区二区三区蜜臀| 日韩av电影免费在线观看 | 欧美日韩亚洲综合| 免费日本A片| 欧美精品一二三区| 青青草乱伦视频| 免费一级婬片AAA片毛片A级| 2018天天干天天操| 九九九视频在线观看| 欧美一级视频| 色综合久久天天综合网| 草草影院第一页YYCCC| 中文无码字幕| 99热在线观看精品免费| 中文激情网| 黄色操逼网站?| 91人妻无码精品蜜桃| 台湾精品一区二区三区| 久久91久久久久麻豆精品| 在线免费黄| 亚洲日韩一区二区三区| 国产探花自拍| 国产三级国产三级国产普通话| 在线观看高清无码视频| 国产aⅴ激情无码久久久无码 | 高清无码自拍| 欧美精品福利| 操老女人逼视频| 人人爱人人爽人人操| 欧美日韩一| 久久性爱网| 黄色大片免费在线观看| 夜夜爱视频| 91日韩视频在线| 2025中文在线观看最好看的电影 | 久久久久久麻豆| 色综合色| 嫩小槡BBBB槡BBBB槡漫画| 久久牛牛| 51妺嘿嘿在线电影免费观看| 青青久久91| 日韩乱轮小说与视频| 91嫩草欧美久久久九九九| 国产黄| 亚洲精品色| 91香蕉网| 亚洲丝袜av| 亚洲日韩精品欧美一区二区yw| 欧美午夜精品久久久| 亚洲AV成人片无码网站| A级成人网站| 91成人一区| 色资源站| 日本AAAA片| 成人自拍视频在线| 欧美国产另类| 四川揉BBB搡BBB| 91视频成人版一区二区| 天天干天天摸| 无码中文字幕在线观看| 亚洲精品AⅤ一区二| 高清无码一区二区三区四区| 一级全黄120分钟免费| 日韩av在线电影| 国产精品视频在线免费观看 | 色视频在线| caopeng97| 婷婷精品免费久久| 一区二区三区观看| 国产视频入口| 日韩熟妇无码| 免费日韩黄色电影| www.久久精品视频| 欧美一区二区三区四| 青青草原视频在线免费观看| 国产福利美女网站| 精品无码在线观看视频| 任你爽在线视频| 黑人干亚洲人| 天天干天天撸影视| 亚洲无码高清在线观看视频| 强波多野结衣黑人| 91麻豆福利在线| 正在播放李彩斐被洋老外| 日本爱爱免费视频| 日韩免费A片| 日本一区二区三区在线视频| 日韩欧美亚洲| 欧美国产精品一二三产品在哪买 | 日韩高清无码毛片| 亚洲www在线| 午夜无码高清| 国产在线视频一区二区三区 | 欧美性爱在线观看| 毛片毛片毛片| 中文字幕观看| a亚洲天堂| 秋霞无码一区二区三区| www99国产| 秋霞日韩| 国产婷婷五月| 国产精品在线免费| 抠逼网站| 蜜桃在线无码| 97人妻人人澡人人| 超碰免费人人| 黄在线免费观看| 性爱视频小说| 69xx视频| 五月婷亚洲精品AV天堂| 国产成人精品一区二区三区| 日韩熟女视频| 在线免费观看黄色片| 日本一级黄色电影| 欧美不卡| 久久精品亚洲| 蜜桃成人AV| 久射精品| 亚洲精品国产精品乱玛不99| 天堂网中文在线| 中文字幕在线播放av| 成人免费视频网站| 亚洲砖区区免费| 伊人精品A片一区二区三区| 中文字幕一区在线| 另类老妇videos另类| 免费A片国产毛无码A片| 97精品| 国精产品一区一区三区四区| 黄色性视频| 18性XXXXX性猛交| 天天色色天天| 中文在线资源| 亚洲五月丁香| 中文字幕+乱码+中文乱码91| Av天堂图片在线| 国语A片| 五月激情婷婷网| 先锋影音资源AV| 日韩亚洲欧美在线观看| 国产精品久久久久久久久久两年半 | 东京热综合影院| 久久成人在线视频| 精品日韩中文字幕| 久久久永久免费视频| 亚洲AV无码专区在线播放中文| 操逼视频欧美| 亚洲午夜视频在线观看| 欧美九九| 探花熟女| www.日逼| 久久黄视频| 亚洲黄色无码| 菊花综合网| 欧美中文字| 欧美国产乱伦| 亚洲最大的成人网站| 国产精品毛片久久久久久久| 亚洲无码高清在线观看视频| 九色蝌蚪9l视频蝌蚪9l视频成人熟妇| 国产精品不卡在线| a黄色片| 秋霞福利视频| 亚洲欧洲在线播放| 2024AV在线| 亚洲午夜福利在线| 日本一级a片| 男女高清无码| 18啪啪网站| 精品国产欧美一区二区三区成人| 99在线免费观看| 手机AV免费| 老司机精品| 在线观看免费无码| 成人精品无码| 国产人妻人伦精品1国产丝袜 | 黄片免费观看网站| 爱爱帝国综合社区| 91av在线电影| 3d动漫精品H区XXXXX区| 免费无码视频在线观看| 日韩一区二区AV| 无码人妻一区二区三区在线视频不卡 | 玖玖在线| 青草视频在线观看免费| 夜夜操夜夜骑| 国产av天堂| 亚洲免费观看高清视频| 色黄网站在线观看| 刘玥91精一区二区三区| 亚洲日韩乱码在线| 香蕉视频日韩| 日韩午夜成人电影| BBWBBw嫩| 国产免费久久久| 色呦呦一区二区三区| 91在线观看| 日韩视频中文字幕在线| 99免费在线观看视频| 麻豆性爱| 日本久久成人| 三级片在线网站| 中文字幕三级av片| 91视频第一页| 91色噜噜狠狠色婷婷| 成人网站视频在线免费观看| 日韩综合在线观看| 国产激情在线播放| 成人精品永久免费视频99久久精品| 欧美精品亚洲| 国产又粗又长| 999成人网| 亚洲AV无码成人| 无码一区二区三区四季| 91看片看婬黄大片女跟女| 五月精品| 婷婷五月激情小说| 日韩亚洲视频| 大屌探花| 日韩在线观看一区二区| 久久久久国产一区二区三区| 国产尤物在线观看| 精品人妻一区二区蜜桃视频| 亚洲无码视频在线观看高清| 久久国产热视频| 亚洲成人三区| 香蕉中文在线| 五月激情啪啪| 加勒比无码综合| 青青色在线观看| 欲撸视频| 人人干97| 不迷路福利视频| 成人电影综合网| 天天干天天操天天射| 在线播放高清无码| 日本精品一区二区三区四区的功能 | 婷婷婷色| 一级A片免费看| 99久久婷婷国产综合| 婷婷久久综合久色综| 免费看黄色A片| 成人三级片在线播放| 黄色污污污网站| 日本爽妇网| 性欧美V| 久久XX| 五月婷婷日韩| 一级内射视频| 一区无码免费| 91午夜视频| 中文在线最新版天堂8| 成人大香蕉网| 国产成人在线精品| 久久精品视频99| 青草成人在线| 国产一级aa| 男女做爱无码| 狠狠一区| 国产无码做爱视频| 五月伊人网| 天天干免费视频| 高清无码自拍| 亚洲秘无码一区二区| 91在线播放视频| 91色色色色| 久久精品毛片| 精品久久久久久AV2025| 玖玖国产| 无码欧美人XXXXX日本无码| 欧美女人操逼| 国产高清AV无码| 久久4| 日韩资源网| 丁香五月婷婷六月| 国产乱码精品一区二区三区的特点| 久久一| 亚洲婷婷在线| 日韩一| 老鸭窝毛片| 欧美乱伦一区| 性爱乱伦视频| 成人网中文字幕| 99久在线精品99re8| 人人艹人人艹| 7799精品视频| 精品久久免费视频| www.国产豆花精品区| 思思热在线观看视频| 欧美特黄AAAAAA| 精品乱子伦一区二区三区| 精品国产区一区二| 久久久国产探花视频| 天天干天天舔| 日韩不卡视频在线观看| 少妇精品| 国产Av高清| 波多野结衣无码网站| 成人做爰A片一区二区| 黄色视频毛片| 美女毛片网站| www.天天日| A片大香蕉| 四季AV一区二区凹凸懂色桃花| 91亚洲在线| 欧美性猛交XXXX乱大交HD| 4080yy午夜理论片成人| 91丨熟女丨首页| 日逼黄色视频| 激情av在线观看| 国产成人AV网站| 青春草在线免费视频| 九九九色视频| 欧美黄色免费看| 丁香婷婷激情五月| 亚洲综合免费观看高清完整版在线观| 99人妻视频| 婷婷五月丁香色| 黄色视频在线观看| 乱码中文字幕日韩欧美在线| 一级色情片| 天天撸在线| 亚洲国产成人精品激情在线| 黄色无码网站| 黄色视频免费播放| 少妇搡BBBB搡BBB搡小说| 思思久久高颜值| 日本成人黄色视频| 亚洲色,天堂网| 日日爱99| 久久嫩草在线影院| 国产高清自拍视频| 草草影院第一页| AAA久久久| 强伦轩人妻一区二区三区最新版本更新内容 | 91人妻人人澡人人爽人人DVD| 欧美亚洲综合手机在线| 亚洲AV男人天堂| 波多野结衣一级婬片A片免费下载 囯产精品久久久久久久久免费无码 | 天天色情| 曰曰摸日日碰| 欧美激情一区二区A片成人牛牛| 久久肥妞操| 99久久99九九99九九九| 日本成人网址| 日韩精品无码一区二区三区 | 大香蕉在线观看视频| 中文字幕高清在线| 最新中文字幕av| 亚洲精品在线观看免费| 日韩三级片av| 欧美成人无码片免费看A片秀色| 午夜网页| 亚洲无码一卡二卡| 九九re精品视频在线观看| 亚洲蜜桃av一区| 欧美肏逼视频| 99热都是精品| 亚洲无码1区| 久久嫩草| 亚洲中文字幕无码在线观看| 青青草东路热vv| 嫩草视频| 99精品人妻| 亚洲成人影片在线观看| 可以免费看的黄色视频| 欧美日韩一级黄色片| 久久人妻无码| 婷婷色网| 日韩一级a片| 91av在线免费观看| 国产尤物视频| 黄片免费高清| 3d动漫一区二区| 中出欧美亚洲| 99爱视频| 欧美三级在线视频| 成人做爰黄AA片免费看三区| 精东AV| 96精品| 8050午| 午夜精品久久久久久久久无码99热 | 草av| 日韩一级爱爱| 黄色视频在线观看亚洲一区二区三区免费| 五月网| 九九热精品视频在线观看| A在线免费观看| 国产伦精一品二品三品app| 亚州无码免费| 日本aaaa片| av青草| 黄片网站入口| 福利视频中文字幕| 豆花无码视频一区二区| 一本色道88久久加勒比精品| 日韩操逼| 一级片在线播放| 另类一区| 无码-ThePorn| 蜜柚AV| 久久免费精品视频| 九九热精品视频在线播放| 亚洲免费黄色| 午夜免费视频1000| 嗯啊av| 日韩经典无码| 97人妻在线视频| 尤物在线| 日韩午夜福利| 91亚洲精品乱码久久久久久蜜桃| 狠狠干中文字幕| 蜜芽成人网| 先锋成人在线| 你懂得在线| 亚洲Av无码成人专区擼| 亚洲一区二区在线视频| 91AV免费观看| 丁香色五月婷婷| 91精品国产综合久久久蜜臀主演| 国产一级婬片A片免费无成人黑豆| 四虎成人网址| 日本高清无码视频| 囯产精品久久久久久久| 日韩中文在线视频| www.亚洲精品| 7799精品| 偷拍一区二区| 狠狠操在线视频| 国产一级内射| 99在线视频观看| 日本高清色清di免费观看| 污污的网站18| 国产主播av| 自拍偷拍福利视频网站| 国产日韩91| 男女性爱视频网站| 人人操人人爽人人妻| 操女人大逼| 亚洲狼友视频| 69久久久| 水蜜桃视频免费观看| 黄色成人视频在线免费观看| 人妻综合网| 国产精品视频免费| 人妻无码久久精品人妻成人| 六月丁香五月天| 青青草大香蕉在线| 亚洲AV永久无码精品国产精| 一区二区三区Av| 毛片资源| 亚欧洲精品| 亚洲精品无码久久久| 人人插人人摸| 三级片无码| 丰满人妻一区二区免费看| 成人午夜视频精品一区| 亚洲中文AV在线| 香蕉成人网站| 亚洲中文久久| 日韩在线看片| 亚洲午夜福利在线| 亚洲一卡二卡| 超碰在线| 中国无码| 黄色毛片网| 上床视频网站| a视频免费观看| 久热无码| 综合色国产精品欧美在线| 手机AV免费| 五月天操逼网站| 日本操B视频| 男女做爱视频网站| 欧美日韩黄色| 亚洲高清无码久久| 免费观看一级黄片| 久久免费视频,久久免费视频| 成人亚洲在线| www.51av| 色秘乱码一区二区三区| 又爽又黄免费网站97双女| 黄色视频日本免费| 在线观看日韩欧美| 黄色综合网| 一起操在线观看| 亚洲AV自拍| 成年人激情网| 人人操人人妻人人看| 一区在线免费观看| 久久天天操| 大香蕉久久久| 手机看片1024久久| 一级特黄色片| 在线AⅤ| 人人色人人色| 不卡无码免费视频| 曰曰摸日日碰| 在线免费观看黄色电影| 免费18蜜桃久久19| AV高清无码在线观看| 日韩中文字幕在线免费观看| 9999国产精品| 99re视频在线| 1024手机在线观看| 色婷婷久综合久久一本国产AV | 五月天激情小说| 五月天福利网| 手机看片1024国产| 成人免费a片| 五月丁香六月久久| 天天天日天天天天天天天日歌词| 一级国产欧美成人A片| 欧美视频免费| AV中文字幕电影| 日韩欧美中文字幕在线视频| 日韩黄色AV| 国产人成视频免费观看| 成人精品一区日本无码网站suv | 一级黄色蜜芽视频| A级毛片视频| 久操人妻| 欧美footjob高跟脚交| 亚洲爱爱网站| 成人三级片免费| 不迷路福利视频| 久久久久亚洲AV无码专区| 91欧美精品| 女侠吕四娘第二部| 操逼网址大全| 91豆花成人网站| 成人精品一区二区三区视频| xxxxxbbbbb| 少妇嫩搡BBBB搡BBBB| 操欧美女人| 黄色电影A| 三级片欧美| 688AV秘无码一区二区| 一区二区无码区| 内射视频网站| 日韩精品在线一区| 男人天堂无码| 日本少妇bbw| 成人午夜福利视频| av高清无码| 最近中文字幕中文翻译歌词| 午夜福利10000| 国产18欠欠欠一区二区| 欧美熟妇BBB搡BBB| 亚洲天堂在线视频观看| 一级a一级a爰片免费免免中国A片 一级一级a免一级a做免费线看内裤 | 搡bbb| 亚洲欧美精品AAAAAA片| 日本色电影在线观看| 日韩在线免费| 色婷婷色婷婷| 99无码视频| 激情五月天在线视频| 婷婷久月| 中文字幕日韩欧美在线| 91AV免费观看| 91大长腿美女花外围在线观看| 91porn在线观看| 日韩电影免费在线观看中文字幕| 人人弄人人| 婷婷丁香五月激情| 精品成人无码一区二区三区| 日韩黄色视频网站| 欧洲毛片基地c区| 久久黄色小视频| 夫妻成人免费看片一区二区| 97超碰资源总站| 日韩a| 国产97在线视频| 六月丁香五月婷婷| 97国产精品久久| 亚洲中文在线播放| 日韩一级性爱视频| 成人怡红院| 日本黄色视频网| 国产综合久久| 亚洲乱码一区二区三区| 无码视频在线免费观看| 大香蕉75在线| 国产成人视频| 草逼动态图| 天堂在线视频| 俺去俺来也www色官网黑人| 五月天综合网| 67194国产| 最新人妻| 亚洲专区在线播放| 99久久久国产精品免费蜜臀| 久热国产精品| 福利所导航| 91美女网站| 天天干天天日天天干天天日 | 国产专区在线| 亚洲午夜久久久| 国产精品1| 国产做爰XXXⅩ久久久骚妇| 91AV电影| 五月天丁香网| 女人高潮天天躁夜夜躁| 婷婷色网站| 最新中文字幕AV| 日本女人操逼视频| 五月婷视频| 色高清无码免费视频| 欧美夜夜骑| 国产香蕉在线播放| 99黄色视频| 资源av| 精品无码一区二区| 成人色色网站| 91麻豆国产视频| 中文字幕日韩欧美| 日韩精品一区在线| 国产一级操逼| 99在线视频精品| 热九九热| 安徽妇搡BBBB搡BBBB小说| 国产乱码精品一品二品| 日逼网址| 国产在线视频一区二区三区| 亚洲三级无码在线观看| 人妻视频网站| 国产成人精品av在线观看| 天天射夜夜骑| 九色PORNY国产成人| 国产免费内射| 九九操比| 五月婷婷深深爱| 好吊妞视频在线| 啪视频网站国产馆| 中文不卡视频| 婷婷五月天性爱| 亚洲国产视频在线观看| 五月天福利视频| 青春草在线免费视频| 亚洲美女视频在线| 操逼网页| 久草精品视频| 中文字幕AV在线免费观看| 国产黄色电影| 免费看v片| 成人在线中文| 91精品国产闺蜜国产在线闺蜜| 国产精品美女毛片真酒店| 欧美成人精品网站| 性欧美丰满熟妇XXXX性久久久| 亚洲图片欧美色图| 3d动漫精品H区XXXXX区| 无码高清视频在线观看| 国产中文字幕在线观看| 手机看片1204| 97国产精品久久| 国产ts视频| 91AV在线播放| 日韩AV综合| 午夜无码三级| 日韩无码波多野结衣| 乱伦自拍| 嫩草在线视频| 手机看片1024国产| 丁香五月成人网| 老鸭窝av免费入口在线观看| 狠狠干在线视频| 超小超嫩国产合集六部| 欧美日韩性色无码免费| 国产免费精彩视频| 国产成人一区二区三区| 国产精品成人在线| 免费看的操逼视频| 亚洲三级片免费观看| 国内毛片毛片毛片毛片毛片毛片毛片毛片毛片毛片毛片毛片 | 国内精品内射| 熟女人妻人妻の视频| 正在播放吴梦梦淫行| 日皮免费视频| 国产一级操逼视频| 免费黄色视频网站大全| 国产在线你懂得| 大香蕉最新国产2025| 大鸡巴在线观看| 操婊网| 黄色无遮挡| 国产高潮又爽又无遮挡又免费| 按摩性高湖婬AAA片A片中国| 亚洲成人av| 手机免费AV| 日韩日屄视频| 中文字幕亚洲欧美| 日韩欧美一区在线| 亚洲四房播| 国产精品色婷婷99久久精品| 51福利视频| 国产一二三视频| 欧美精品18| 国产Av高清| 日韩AV一区二区三区四区| 撸一撸av| 国产黄色电影| 欧美五月婷婷| 影音先锋成人资源网| 日韩精品免费无码视频| 97超碰碰碰| 九九re| 北条麻妃精品青青久久价格| 天天做| 免费a级毛片| 欧美日韩三区| 国产精品99久久久久久成人| 丰满人妻一区二区三区视频54| 日本亚洲国产| 日欧一级片| 夜色321| 老妇性BBWBBWBBWBBW | 自慰一区| 一区无码免费| 女侠吕四娘第二部| 99re在线观看| 精品香蕉视频| 色哟哟一区二区三区| 天天色天天日天天干| 色天使av| 中文无码熟妇人妻| 丰满人妻一区二区三区四区54| 国产成人777777精品综合|