国产秋霞理论久久久电影-婷婷色九月综合激情丁香-欧美在线观看乱妇视频-精品国avA久久久久久久-国产乱码精品一区二区三区亚洲人-欧美熟妇一区二区三区蜜桃视频

Python 多線程、多進(jìn)程最全整理

共 25480字,需瀏覽 51分鐘

 ·

2022-03-10 15:57

在下方公眾號(hào)后臺(tái)回復(fù):JGNB,可獲取杰哥原創(chuàng)的 PDF 手冊(cè)。

作者:錢魏Way

來源:https://www.biaodianfu.com/python-multi-thread-and-multi-process.html

大家好,我是杰哥。

在學(xué)習(xí)Python的過程中,有接觸到多線程編程相關(guān)的知識(shí)點(diǎn),先前一直都沒有徹底的搞明白。今天準(zhǔn)備花一些時(shí)間,把里面的細(xì)節(jié)盡可能的梳理清楚。

線程與進(jìn)程的區(qū)別

進(jìn)程(process)和線程(thread)是操作系統(tǒng)的基本概念,但是它們比較抽象,不容易掌握。關(guān)于多進(jìn)程和多線程,教科書上最經(jīng)典的一句話是“進(jìn)程是資源分配的最小單位,線程是CPU調(diào)度的最小單位”。線程是程序中一個(gè)單一的順序控制流程。進(jìn)程內(nèi)一個(gè)相對(duì)獨(dú)立的、可調(diào)度的執(zhí)行單元,是系統(tǒng)獨(dú)立調(diào)度和分派CPU的基本單位指運(yùn)行中的程序的調(diào)度單位。在單個(gè)程序中同時(shí)運(yùn)行多個(gè)線程完成不同的工作,稱為多線程。

進(jìn)程和線程區(qū)別

進(jìn)程是資源分配的基本單位。所有與該進(jìn)程有關(guān)的資源,都被記錄在進(jìn)程控制塊PCB中。以表示該進(jìn)程擁有這些資源或正在使用它們。另外,進(jìn)程也是搶占處理機(jī)的調(diào)度單位,它擁有一個(gè)完整的虛擬地址空間。當(dāng)進(jìn)程發(fā)生調(diào)度時(shí),不同的進(jìn)程擁有不同的虛擬地址空間,而同一進(jìn)程內(nèi)的不同線程共享同一地址空間。

與進(jìn)程相對(duì)應(yīng),線程與資源分配無關(guān),它屬于某一個(gè)進(jìn)程,并與進(jìn)程內(nèi)的其他線程一起共享進(jìn)程的資源。線程只由相關(guān)堆棧(系統(tǒng)棧或用戶棧)寄存器和線程控制表TCB組成。寄存器可被用來存儲(chǔ)線程內(nèi)的局部變量,但不能存儲(chǔ)其他線程的相關(guān)變量。

通常在一個(gè)進(jìn)程中可以包含若干個(gè)線程,它們可以利用進(jìn)程所擁有的資源。在引入線程的操作系統(tǒng)中,通常都是把進(jìn)程作為分配資源的基本單位,而把線程作為獨(dú)立運(yùn)行和獨(dú)立調(diào)度的基本單位。

由于線程比進(jìn)程更小,基本上不擁有系統(tǒng)資源,故對(duì)它的調(diào)度所付出的開銷就會(huì)小得多,能更高效的提高系統(tǒng)內(nèi)多個(gè)程序間并發(fā)執(zhí)行的程度,從而顯著提高系統(tǒng)資源的利用率和吞吐量。

因而近年來推出的通用操作系統(tǒng)都引入了線程,以便進(jìn)一步提高系統(tǒng)的并發(fā)性,并把它視為現(xiàn)代操作系統(tǒng)的一個(gè)重要指標(biāo)。

線程與進(jìn)程的區(qū)別可以歸納為以下4點(diǎn):

  • 地址空間和其它資源(如打開文件):進(jìn)程間相互獨(dú)立,同一進(jìn)程的各線程間共享。某進(jìn)程內(nèi)的線程在其它進(jìn)程不可見。

  • 通信:進(jìn)程間通信IPC,線程間可以直接讀寫進(jìn)程數(shù)據(jù)段(如全局變量)來進(jìn)行通信——需要進(jìn)程同步和互斥手段的輔助,以保證數(shù)據(jù)的一致性。

  • 調(diào)度和切換:線程上下文切換比進(jìn)程上下文切換要快得多。

  • 在多線程OS中,進(jìn)程不是一個(gè)可執(zhí)行的實(shí)體。

多進(jìn)程和多線程的比較

對(duì)比維度多進(jìn)程多線程總結(jié)
數(shù)據(jù)共享、同步數(shù)據(jù)共享復(fù)雜,同步簡單數(shù)據(jù)共享簡單,同步復(fù)雜各有優(yōu)劣
內(nèi)存、CPU占用內(nèi)存多,切換復(fù)雜,CPU利用率低占用內(nèi)存少,切換簡單,CPU利用率高線程占優(yōu)
創(chuàng)建、銷毀、切換復(fù)雜,速度慢簡單,速度快線程占優(yōu)
編程、調(diào)試編程簡單,調(diào)試簡單編程復(fù)雜,調(diào)試復(fù)雜進(jìn)程占優(yōu)
可靠性進(jìn)程間不會(huì)互相影響一個(gè)線程掛掉將導(dǎo)致整個(gè)進(jìn)程掛掉進(jìn)程占優(yōu)
分布式適用于多核、多機(jī),擴(kuò)展到多臺(tái)機(jī)器簡單適合于多核進(jìn)程占優(yōu)

總結(jié),進(jìn)程和線程還可以類比為火車和車廂:

  • 線程在進(jìn)程下行進(jìn)(單純的車廂無法運(yùn)行)

  • 一個(gè)進(jìn)程可以包含多個(gè)線程(一輛火車可以有多個(gè)車廂)

  • 不同進(jìn)程間數(shù)據(jù)很難共享(一輛火車上的乘客很難換到另外一輛火車,比如站點(diǎn)換乘)

  • 同一進(jìn)程下不同線程間數(shù)據(jù)很易共享(A車廂換到B車廂很容易)

  • 進(jìn)程要比線程消耗更多的計(jì)算機(jī)資源(采用多列火車相比多個(gè)車廂更耗資源)

  • 進(jìn)程間不會(huì)相互影響,一個(gè)線程掛掉將導(dǎo)致整個(gè)進(jìn)程掛掉(一列火車不會(huì)影響到另外一列火車,但是如果一列火車上中間的一節(jié)車廂著火了,將影響到該趟火車的所有車廂)

  • 進(jìn)程可以拓展到多機(jī),進(jìn)程最多適合多核(不同火車可以開在多個(gè)軌道上,同一火車的車廂不能在行進(jìn)的不同的軌道上)

  • 進(jìn)程使用的內(nèi)存地址可以上鎖,即一個(gè)線程使用某些共享內(nèi)存時(shí),其他線程必須等它結(jié)束,才能使用這一塊內(nèi)存。(比如火車上的洗手間)-”互斥鎖(mutex)”

  • 進(jìn)程使用的內(nèi)存地址可以限定使用量(比如火車上的餐廳,最多只允許多少人進(jìn)入,如果滿了需要在門口等,等有人出來了才能進(jìn)去)-“信號(hào)量(semaphore)”

Python全局解釋器鎖GIL

全局解釋器鎖(英語:Global Interpreter Lock,縮寫GIL),并不是Python的特性,它是在實(shí)現(xiàn)Python解析器(CPython)時(shí)所引入的一個(gè)概念。由于CPython是大部分環(huán)境下默認(rèn)的Python執(zhí)行環(huán)境。所以在很多人的概念里CPython就是Python,也就想當(dāng)然的把GIL歸結(jié)為Python語言的缺陷。那么CPython實(shí)現(xiàn)中的GIL又是什么呢?來看看官方的解釋:

The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.

Python代碼的執(zhí)行由Python 虛擬機(jī)(也叫解釋器主循環(huán),CPython版本)來控制,Python 在設(shè)計(jì)之初就考慮到要在解釋器的主循環(huán)中,同時(shí)只有一個(gè)線程在執(zhí)行,即在任意時(shí)刻,只有一個(gè)線程在解釋器中運(yùn)行。對(duì)Python 虛擬機(jī)的訪問由全局解釋器鎖(GIL)來控制,正是這個(gè)鎖能保證同一時(shí)刻只有一個(gè)線程在運(yùn)行。

GIL 有什么好處?簡單來說,它在單線程的情況更快,并且在和 C 庫結(jié)合時(shí)更方便,而且不用考慮線程安全問題,這也是早期 Python 最常見的應(yīng)用場景和優(yōu)勢。另外,GIL的設(shè)計(jì)簡化了CPython的實(shí)現(xiàn),使得對(duì)象模型,包括關(guān)鍵的內(nèi)建類型如字典,都是隱含可以并發(fā)訪問的。鎖住全局解釋器使得比較容易的實(shí)現(xiàn)對(duì)多線程的支持,但也損失了多處理器主機(jī)的并行計(jì)算能力。

在多線程環(huán)境中,Python 虛擬機(jī)按以下方式執(zhí)行:

1.設(shè)置GIL
2.切換到一個(gè)線程去運(yùn)行
3.運(yùn)行直至指定數(shù)量的字節(jié)碼指令,或者線程主動(dòng)讓出控制(可以調(diào)用sleep(0))
4.把線程設(shè)置為睡眠狀態(tài)
5.解鎖GIL
6.再次重復(fù)以上所有步驟

Python3.2前,GIL的釋放邏輯是當(dāng)前線程遇見IO操作或者ticks計(jì)數(shù)達(dá)到100(ticks可以看作是python自身的一個(gè)計(jì)數(shù)器,專門做用于GIL,每次釋放后歸零,這個(gè)計(jì)數(shù)可以通過 sys.setcheckinterval 來調(diào)整),進(jìn)行釋放。因?yàn)橛?jì)算密集型線程在釋放GIL之后又會(huì)立即去申請(qǐng)GIL,并且通常在其它線程還沒有調(diào)度完之前它就已經(jīng)重新獲取到了GIL,就會(huì)導(dǎo)致一旦計(jì)算密集型線程獲得了GIL,那么它在很長一段時(shí)間內(nèi)都將占據(jù)GIL,甚至一直到該線程執(zhí)行結(jié)束。

Python 3.2開始使用新的GIL。新的GIL實(shí)現(xiàn)中用一個(gè)固定的超時(shí)時(shí)間來指示當(dāng)前的線程放棄全局鎖。在當(dāng)前線程保持這個(gè)鎖,且其他線程請(qǐng)求這個(gè)鎖時(shí),當(dāng)前線程就會(huì)在5毫秒后被強(qiáng)制釋放該鎖。該改進(jìn)在單核的情況下,對(duì)于單個(gè)線程長期占用GIL的情況有所好轉(zhuǎn)。

在單核CPU上,數(shù)百次的間隔檢查才會(huì)導(dǎo)致一次線程切換。在多核CPU上,存在嚴(yán)重的線程顛簸(thrashing)。而每次釋放GIL鎖,線程進(jìn)行鎖競爭、切換線程,會(huì)消耗資源。單核下多線程,每次釋放GIL,喚醒的那個(gè)線程都能獲取到GIL鎖,所以能夠無縫執(zhí)行,但多核下,CPU0釋放GIL后,其他CPU上的線程都會(huì)進(jìn)行競爭,但GIL可能會(huì)馬上又被CPU0拿到,導(dǎo)致其他幾個(gè)CPU上被喚醒后的線程會(huì)醒著等待到切換時(shí)間后又進(jìn)入待調(diào)度狀態(tài),這樣會(huì)造成線程顛簸(thrashing),導(dǎo)致效率更低。

另外,從上面的實(shí)現(xiàn)機(jī)制可以推導(dǎo)出,Python的多線程對(duì)IO密集型代碼要比CPU密集型代碼更加友好。

針對(duì)GIL的應(yīng)對(duì)措施:

  • 使用更高版本Python(對(duì)GIL機(jī)制進(jìn)行了優(yōu)化)

  • 使用多進(jìn)程替換多線程(多進(jìn)程之間沒有GIL,但是進(jìn)程本身的資源消耗較多)

  • 指定cpu運(yùn)行線程(使用affinity模塊)

  • 使用Jython、IronPython等無GIL解釋器

  • 全I(xiàn)O密集型任務(wù)時(shí)才使用多線程

  • 使用協(xié)程(高效的單線程模式,也稱微線程;通常與多進(jìn)程配合使用)

  • 將關(guān)鍵組件用C/C++編寫為Python擴(kuò)展,通過ctypes使Python程序直接調(diào)用C語言編譯的動(dòng)態(tài)鏈接庫的導(dǎo)出函數(shù)。(with nogil調(diào)出GIL限制)

Python的多進(jìn)程包multiprocessing

Python的threading包主要運(yùn)用多線程的開發(fā),但由于GIL的存在,Python中的多線程其實(shí)并不是真正的多線程,如果想要充分地使用多核CPU的資源,大部分情況需要使用多進(jìn)程。在Python 2.6版本的時(shí)候引入了multiprocessing包,它完整的復(fù)制了一套threading所提供的接口方便遷移。唯一的不同就是它使用了多進(jìn)程而不是多線程。每個(gè)進(jìn)程有自己的獨(dú)立的GIL,因此也不會(huì)出現(xiàn)進(jìn)程之間的GIL爭搶。

借助這個(gè)multiprocessing,你可以輕松完成從單進(jìn)程到并發(fā)執(zhí)行的轉(zhuǎn)換。multiprocessing支持子進(jìn)程、通信和共享數(shù)據(jù)、執(zhí)行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

Multiprocessing產(chǎn)生的背景

除了應(yīng)對(duì)Python的GIL以外,產(chǎn)生multiprocessing的另外一個(gè)原因時(shí)Windows操作系統(tǒng)與Linux/Unix系統(tǒng)的不一致。

Unix/Linux操作系統(tǒng)提供了一個(gè)fork()系統(tǒng)調(diào)用,它非常特殊。普通的函數(shù),調(diào)用一次,返回一次,但是fork()調(diào)用一次,返回兩次,因?yàn)椴僮飨到y(tǒng)自動(dòng)把當(dāng)前進(jìn)程(父進(jìn)程)復(fù)制了一份(子進(jìn)程),然后,分別在父進(jìn)程和子進(jìn)程內(nèi)返回。子進(jìn)程永遠(yuǎn)返回0,而父進(jìn)程返回子進(jìn)程的ID。這樣做的理由是,一個(gè)父進(jìn)程可以fork出很多子進(jìn)程,所以,父進(jìn)程要記下每個(gè)子進(jìn)程的ID,而子進(jìn)程只需要調(diào)用getpid()就可以拿到父進(jìn)程的ID。

Python的os模塊封裝了常見的系統(tǒng)調(diào)用,其中就包括fork,可以在Python程序中輕松創(chuàng)建子進(jìn)程:

import?os

print('Process?(%s)?start...'?%?os.getpid())

\#?Only?works?on?Unix/Linux/Mac:

pid?=?os.fork()

if?pid?==?0:

????print('I?am?child?process?(%s)?and?my?parent?is?%s.'?%?(os.getpid(),?os.getppid()))

else:

????print('I?(%s)?just?created?a?child?process?(%s).'?%?(os.getpid(),?pid))

上述代碼在Linux、Unix和Mac上的執(zhí)行結(jié)果為:

Process?(876)?start...

I?(876)?just?created?a?child?process?(877).

I?am?child?process?(877)?and?my?parent?is?876.

有了fork調(diào)用,一個(gè)進(jìn)程在接到新任務(wù)時(shí)就可以復(fù)制出一個(gè)子進(jìn)程來處理新任務(wù),常見的Apache服務(wù)器就是由父進(jìn)程監(jiān)聽端口,每當(dāng)有新的http請(qǐng)求時(shí),就fork出子進(jìn)程來處理新的http請(qǐng)求。

由于Windows沒有fork調(diào)用,上面的代碼在Windows上無法運(yùn)行。由于Python是跨平臺(tái)的,自然也應(yīng)該提供一個(gè)跨平臺(tái)的多進(jìn)程支持。multiprocessing模塊就是跨平臺(tái)版本的多進(jìn)程模塊。multiprocessing模塊封裝了fork()調(diào)用,使我們不需要關(guān)注fork()的細(xì)節(jié)。由于Windows沒有fork調(diào)用,因此,multiprocessing需要“模擬”出fork的效果。

multiprocessing常用組件及功能

創(chuàng)建管理進(jìn)程模塊:

  • Process(用于創(chuàng)建進(jìn)程)

  • Pool(用于創(chuàng)建管理進(jìn)程池)

  • Queue(用于進(jìn)程通信,資源共享)

  • Value,Array(用于進(jìn)程通信,資源共享)

  • Pipe(用于管道通信)

  • Manager(用于資源共享)

同步子進(jìn)程模塊:

  • Condition(條件變量)

  • Event(事件)

  • Lock(互斥鎖)

  • RLock(可重入的互斥鎖(同一個(gè)進(jìn)程可以多次獲得它,同時(shí)不會(huì)造成阻塞)

  • Semaphore(信號(hào)量)

接下來就一起來學(xué)習(xí)下每個(gè)組件及功能的具體使用方法。

Process(用于創(chuàng)建進(jìn)程)

multiprocessing模塊提供了一個(gè)Process類來代表一個(gè)進(jìn)程對(duì)象。

在multiprocessing中,每一個(gè)進(jìn)程都用一個(gè)Process類來表示。

構(gòu)造方法:Process([group [, target [, name [, args [, kwargs]]]]])

  • group:分組,實(shí)際上不使用,值始終為None

  • target:表示調(diào)用對(duì)象,即子進(jìn)程要執(zhí)行的任務(wù),你可以傳入方法名

  • name:為子進(jìn)程設(shè)定名稱

  • args:要傳給target函數(shù)的位置參數(shù),以元組方式進(jìn)行傳入。

  • kwargs:要傳給target函數(shù)的字典參數(shù),以字典方式進(jìn)行傳入。

實(shí)例方法:

  • start():啟動(dòng)進(jìn)程,并調(diào)用該子進(jìn)程中的p.run()

  • run():進(jìn)程啟動(dòng)時(shí)運(yùn)行的方法,正是它去調(diào)用target指定的函數(shù),我們自定義類的類中一定要實(shí)現(xiàn)該方法

  • terminate():強(qiáng)制終止進(jìn)程p,不會(huì)進(jìn)行任何清理操作,如果p創(chuàng)建了子進(jìn)程,該子進(jìn)程就成了僵尸進(jìn)程,使用該方法需要特別小心這種情況。如果p還保存了一個(gè)鎖那么也將不會(huì)被釋放,進(jìn)而導(dǎo)致死鎖

  • is_alive():返回進(jìn)程是否在運(yùn)行。如果p仍然運(yùn)行,返回True

  • join([timeout]):進(jìn)程同步,主進(jìn)程等待子進(jìn)程完成后再執(zhí)行后面的代碼。線程等待p終止(強(qiáng)調(diào):是主線程處于等的狀態(tài),而p是處于運(yùn)行的狀態(tài))。timeout是可選的超時(shí)時(shí)間(超過這個(gè)時(shí)間,父線程不再等待子線程,繼續(xù)往下執(zhí)行),需要強(qiáng)調(diào)的是,p.join只能join住start開啟的進(jìn)程,而不能join住run開啟的進(jìn)程

屬性介紹:

  • daemon:默認(rèn)值為False,如果設(shè)為True,代表p為后臺(tái)運(yùn)行的守護(hù)進(jìn)程;當(dāng)p的父進(jìn)程終止時(shí),p也隨之終止,并且設(shè)定為True后,p不能創(chuàng)建自己的新進(jìn)程;必須在p.start()之前設(shè)置

  • name:進(jìn)程的名稱

  • pid:進(jìn)程的pid

  • exitcode:進(jìn)程在運(yùn)行時(shí)為None、如果為–N,表示被信號(hào)N結(jié)束(了解即可)

  • authkey:進(jìn)程的身份驗(yàn)證鍵,默認(rèn)是由os.urandom()隨機(jī)生成的32字符的字符串。這個(gè)鍵的用途是為涉及網(wǎng)絡(luò)連接的底層進(jìn)程間通信提供安全性,這類連接只有在具有相同的身份驗(yàn)證鍵時(shí)才能成功(了解即可)

使用示例:(注意:在windows中Process()必須放到if name == ‘main’:下)

from?multiprocessing?import?Process

import?os

def?run_proc(name):

????print('Run?child?process?%s?(%s)...'?%?(name,?os.getpid()))

if?__name__=='__main__':

????print('Parent?process?%s.'?%?os.getpid())

????p?=?Process(target=run_proc,?args=('test',))

????print('Child?process?will?start.')

????p.start()

????p.join()

print('Child?process?end.')

Pool(用于創(chuàng)建管理進(jìn)程池)

Pool類用于需要執(zhí)行的目標(biāo)很多,而手動(dòng)限制進(jìn)程數(shù)量又太繁瑣時(shí),如果目標(biāo)少且不用控制進(jìn)程數(shù)量則可以用Process類。Pool可以提供指定數(shù)量的進(jìn)程,供用戶調(diào)用,當(dāng)有新的請(qǐng)求提交到Pool中時(shí),如果池還沒有滿,那么就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來執(zhí)行該請(qǐng)求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請(qǐng)求就會(huì)等待,直到池中有進(jìn)程結(jié)束,就重用進(jìn)程池中的進(jìn)程。

構(gòu)造方法:Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

  • processes :要?jiǎng)?chuàng)建的進(jìn)程數(shù),如果省略,將默認(rèn)使用cpu_count()返回的數(shù)量。

  • initializer:每個(gè)工作進(jìn)程啟動(dòng)時(shí)要執(zhí)行的可調(diào)用對(duì)象,默認(rèn)為None。如果initializer是None,那么每一個(gè)工作進(jìn)程在開始的時(shí)候會(huì)調(diào)用initializer(*initargs)。

  • initargs:是要傳給initializer的參數(shù)組。

  • maxtasksperchild:工作進(jìn)程退出之前可以完成的任務(wù)數(shù),完成后用一個(gè)新的工作進(jìn)程來替代原進(jìn)程,來讓閑置的資源被釋放。maxtasksperchild默認(rèn)是None,意味著只要Pool存在工作進(jìn)程就會(huì)一直存活。

  • context: 用在制定工作進(jìn)程啟動(dòng)時(shí)的上下文,一般使用Pool() 或者一個(gè)context對(duì)象的Pool()方法來創(chuàng)建一個(gè)池,兩種方法都適當(dāng)?shù)脑O(shè)置了context。

實(shí)例方法:

  • apply(func[, args[, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(args,*kwargs),然后返回結(jié)果。需要強(qiáng)調(diào)的是:此操作并不會(huì)在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()。它是阻塞的。apply很少使用

  • apply_async(func[, arg[, kwds={}[, callback=None]]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(args,*kwargs),然后返回結(jié)果。此方法的結(jié)果是AsyncResult類的實(shí)例,callback是可調(diào)用對(duì)象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r(shí),將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。它是非阻塞。

  • map(func, iterable[, chunksize=None]):Pool類中的map方法,與內(nèi)置的map函數(shù)用法行為基本一致,它會(huì)使進(jìn)程阻塞直到返回結(jié)果。注意,雖然第二個(gè)參數(shù)是一個(gè)迭代器,但在實(shí)際使用中,必須在整個(gè)隊(duì)列都就緒后,程序才會(huì)運(yùn)行子進(jìn)程。

  • map_async(func, iterable[, chunksize=None]):map_async與map的關(guān)系同apply與apply_async

  • imap():imap 與 map的區(qū)別是,map是當(dāng)所有的進(jìn)程都已經(jīng)執(zhí)行完了,并將結(jié)果返回了,imap()則是立即返回一個(gè)iterable可迭代對(duì)象。

  • imap_unordered():不保證返回的結(jié)果順序與進(jìn)程添加的順序一致。

  • close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成。

  • join():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用,讓其不再接受新的Process。

  • terminate():結(jié)束工作進(jìn)程,不再處理未處理的任務(wù)。

方法apply_async()和map_async()的返回值是AsyncResul的實(shí)例obj。實(shí)例具有以下方法:

  • get():返回結(jié)果,如果有必要?jiǎng)t等待結(jié)果到達(dá)。timeout是可選的。如果在指定時(shí)間內(nèi)還沒有到達(dá),將引發(fā)異常。如果遠(yuǎn)程操作中引發(fā)了異常,它將在調(diào)用此方法時(shí)再次被引發(fā)。

  • ready():如果調(diào)用完成,返回True

  • successful():如果調(diào)用完成且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常

  • wait([timeout]):等待結(jié)果變?yōu)榭捎谩?/span>

  • terminate():立即終止所有工作進(jìn)程,同時(shí)不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收,將自動(dòng)調(diào)用此函數(shù)

使用示例:

\#?-*-?coding:utf-8?-*-

\#?Pool+map

from?multiprocessing?import?Pool

def?test(i):

????print(i)

if?__name__?==?"__main__":

????lists?=?range(100)

????pool?=?Pool(8)

????pool.map(test,?lists)

????pool.close()

????pool.join()

\#?-*-?coding:utf-8?-*-

\#?異步進(jìn)程池(非阻塞)

from?multiprocessing?import?Pool

def?test(i):

????print(i)

if?__name__?==?"__main__":

????pool?=?Pool(8)

????for?i?in?range(100):

????????'''

??????? For循環(huán)中執(zhí)行步驟:

????????(1)循環(huán)遍歷,將100個(gè)子進(jìn)程添加到進(jìn)程池(相對(duì)父進(jìn)程會(huì)阻塞)

????????(2)每次執(zhí)行8個(gè)子進(jìn)程,等一個(gè)子進(jìn)程執(zhí)行完后,立馬啟動(dòng)新的子進(jìn)程。(相對(duì)父進(jìn)程不阻塞)

??????? apply_async為異步進(jìn)程池寫法。異步指的是啟動(dòng)子進(jìn)程的過程,與父進(jìn)程本身的執(zhí)行(print)是異步的,而For循環(huán)中往進(jìn)程池添加子進(jìn)程的過程,與父進(jìn)程本身的執(zhí)行卻是同步的。

????????'''


????????pool.apply_async(test,?args=(i,))??#?維持執(zhí)行的進(jìn)程總數(shù)為8,當(dāng)一個(gè)進(jìn)程執(zhí)行完后啟動(dòng)一個(gè)新進(jìn)程.

????print("test")

????pool.close()

????pool.join()

\#?-*-?coding:utf-8?-*-

\#?異步進(jìn)程池(非阻塞)

from?multiprocessing?import?Pool

def?test(i):

????print(i)

if?__name__?==?"__main__":

????pool?=?Pool(8)

????for?i?in?range(100):

????????'''

????????????實(shí)際測試發(fā)現(xiàn),for循環(huán)內(nèi)部執(zhí)行步驟:

????????????(1)遍歷100個(gè)可迭代對(duì)象,往進(jìn)程池放一個(gè)子進(jìn)程

????????????(2)執(zhí)行這個(gè)子進(jìn)程,等子進(jìn)程執(zhí)行完畢,再往進(jìn)程池放一個(gè)子進(jìn)程,再執(zhí)行。(同時(shí)只執(zhí)行一個(gè)子進(jìn)程)

??????????? for循環(huán)執(zhí)行完畢,再執(zhí)行print函數(shù)。

????????'''


????????pool.apply(test,?args=(i,))??#?維持執(zhí)行的進(jìn)程總數(shù)為8,當(dāng)一個(gè)進(jìn)程執(zhí)行完后啟動(dòng)一個(gè)新進(jìn)程.

????print("test")

????pool.close()

????pool.join()

Queue(用于進(jìn)程通信,資源共享)

在使用多進(jìn)程的過程中,最好不要使用共享資源。普通的全局變量是不能被子進(jìn)程所共享的,只有通過Multiprocessing組件構(gòu)造的數(shù)據(jù)結(jié)構(gòu)可以被共享。

Queue是用來創(chuàng)建進(jìn)程間資源共享的隊(duì)列的類,使用Queue可以達(dá)到多進(jìn)程間數(shù)據(jù)傳遞的功能(缺點(diǎn):只適用Process類,不能在Pool進(jìn)程池中使用)。

構(gòu)造方法:Queue([maxsize])

  • maxsize是隊(duì)列中允許最大項(xiàng)數(shù),省略則無大小限制。

實(shí)例方法:

  • put():用以插入數(shù)據(jù)到隊(duì)列。put方法還有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,該方法會(huì)阻塞timeout指定的時(shí)間,直到該隊(duì)列有剩余的空間。如果超時(shí),會(huì)拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會(huì)立即拋出Queue.Full異常。

  • get():可以從隊(duì)列讀取并且刪除一個(gè)元素。get方法有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,那么在等待時(shí)間內(nèi)沒有取到任何元素,會(huì)拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個(gè)值可用,則立即返回該值,否則,如果隊(duì)列為空,則立即拋出Queue.Empty異常。若不希望在empty的時(shí)候拋出異常,令blocked為True或者參數(shù)全部置空即可。

  • get_nowait():同q.get(False)

  • put_nowait():同q.put(False)

  • empty():調(diào)用此方法時(shí)q為空則返回True,該結(jié)果不可靠,比如在返回True的過程中,如果隊(duì)列中又加入了項(xiàng)目。

  • full():調(diào)用此方法時(shí)q已滿則返回True,該結(jié)果不可靠,比如在返回True的過程中,如果隊(duì)列中的項(xiàng)目被取走。

  • qsize():返回隊(duì)列中目前項(xiàng)目的正確數(shù)量,結(jié)果也不可靠,理由同q.empty()和q.full()一樣

使用示例:

from?multiprocessing?import?Process,?Queue

import?os,?time,?random

def?write(q):

????print('Process?to?write:?%s'?%?os.getpid())

????for?value?in?['A',?'B',?'C']:

????????print('Put?%s?to?queue...'?%?value)

????????q.put(value)

????????time.sleep(random.random())

def?read(q):

????print('Process?to?read:?%s'?%?os.getpid())

????while?True:

????????value?=?q.get(True)

????????print('Get?%s?from?queue.'?%?value)

if?__name__?==?"__main__":

????q?=?Queue()

????pw?=?Process(target=write,?args=(q,))

????pr?=?Process(target=read,?args=(q,))

????pw.start()

????pr.start()

????pw.join()??#?等待pw結(jié)束

????pr.terminate()??#?pr進(jìn)程里是死循環(huán),無法等待其結(jié)束,只能強(qiáng)行終止

JoinableQueue就像是一個(gè)Queue對(duì)象,但隊(duì)列允許項(xiàng)目的使用者通知生成者項(xiàng)目已經(jīng)被成功處理。通知進(jìn)程是使用共享的信號(hào)和條件變量來實(shí)現(xiàn)的。

構(gòu)造方法:JoinableQueue([maxsize])

  • maxsize:隊(duì)列中允許最大項(xiàng)數(shù),省略則無大小限制。

實(shí)例方法

JoinableQueue的實(shí)例p除了與Queue對(duì)象相同的方法之外還具有:

  • task_done():使用者使用此方法發(fā)出信號(hào),表示q.get()的返回項(xiàng)目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊(duì)列中刪除項(xiàng)目的數(shù)量,將引發(fā)ValueError異常

  • join():生產(chǎn)者調(diào)用此方法進(jìn)行阻塞,直到隊(duì)列中所有的項(xiàng)目均被處理。阻塞將持續(xù)到隊(duì)列中的每個(gè)項(xiàng)目均調(diào)用q.task_done()方法為止

使用示例:

\#?-*-?coding:utf-8?-*-

from?multiprocessing?import?Process,?JoinableQueue

import?time,?random

def?consumer(q):

????while?True:

????????res?=?q.get()

????????print('消費(fèi)者拿到了?%s'?%?res)

????????q.task_done()

def?producer(seq,?q):

????for?item?in?seq:

????????time.sleep(random.randrange(1,2))

????????q.put(item)

????????print('生產(chǎn)者做好了?%s'?%?item)

????q.join()

if?__name__?==?"__main__":

????q?=?JoinableQueue()

????seq?=?('產(chǎn)品%s'?%?i?for?i?in?range(5))

????p?=?Process(target=consumer,?args=(q,))

????p.daemon?=?True??#?設(shè)置為守護(hù)進(jìn)程,在主線程停止時(shí)p也停止,但是不用擔(dān)心,producer內(nèi)調(diào)用q.join保證了consumer已經(jīng)處理完隊(duì)列中的所有元素

????p.start()

????producer(seq,?q)

????print('主線程')

Value,Array(用于進(jìn)程通信,資源共享)

multiprocessing 中Value和Array的實(shí)現(xiàn)原理都是在共享內(nèi)存中創(chuàng)建ctypes()對(duì)象來達(dá)到共享數(shù)據(jù)的目的,兩者實(shí)現(xiàn)方法大同小異,只是選用不同的ctypes數(shù)據(jù)類型而已。

Value

構(gòu)造方法:Value((typecode_or_type, args[, lock])

  • typecode_or_type:定義ctypes()對(duì)象的類型,可以傳Type code或 C Type,具體對(duì)照表見下文。

  • args:傳遞給typecode_or_type構(gòu)造函數(shù)的參數(shù)

  • lock:默認(rèn)為True,創(chuàng)建一個(gè)互斥鎖來限制對(duì)Value對(duì)象的訪問,如果傳入一個(gè)鎖,如Lock或RLock的實(shí)例,將用于同步。如果傳入False,Value的實(shí)例就不會(huì)被鎖保護(hù),它將不是進(jìn)程安全的。

typecode_or_type支持的類型:

|?Type?code?|?C?Type?????????????|?Python?Type???????|?Minimum?size?in?bytes?|

|?---------?|?------------------?|?-----------------?|?---------------------?|

|?`'b'`?????|?signed?char????????|?int???????????????|?1?????????????????????|

|?`'B'`?????|?unsigned?char??????|?int???????????????|?1?????????????????????|

|?`'u'`?????|?Py_UNICODE?????????|?Unicode?character?|?2?????????????????????|

|?`'h'`?????|?signed?short???????|?int???????????????|?2?????????????????????|

|?`'H'`?????|?unsigned?short?????|?int???????????????|?2?????????????????????|

|?`'i'`?????|?signed?int?????????|?int???????????????|?2?????????????????????|

|?`'I'`?????|?unsigned?int???????|?int???????????????|?2?????????????????????|

|?`'l'`?????|?signed?long????????|?int???????????????|?4?????????????????????|

|?`'L'`?????|?unsigned?long??????|?int???????????????|?4?????????????????????|

|?`'q'`?????|?signed?long?long???|?int???????????????|?8?????????????????????|

|?`'Q'`?????|?unsigned?long?long?|?int???????????????|?8?????????????????????|

|?`'f'`?????|?float??????????????|?float?????????????|?4?????????????????????|

|?`'d'`?????|?double?????????????|?float?????????????|?8?????????????????????|

參考地址:https://docs.python.org/3/library/array.html

Array

構(gòu)造方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock])

  • typecode_or_type:同上

  • size_or_initializer:如果它是一個(gè)整數(shù),那么它確定數(shù)組的長度,并且數(shù)組將被初始化為零。否則,size_or_initializer是用于初始化數(shù)組的序列,其長度決定數(shù)組的長度。

  • kwds:傳遞給typecode_or_type構(gòu)造函數(shù)的參數(shù)

  • lock:同上

使用示例:

import?multiprocessing

def?f(n,?a):

????n.value?=?3.14

????a[0]?=?5

if?__name__?==?'__main__':

????num?=?multiprocessing.Value('d',?0.0)

????arr?=?multiprocessing.Array('i',?range(10))

????p?=?multiprocessing.Process(target=f,?args=(num,?arr))

????p.start()

????p.join()

????print(num.value)

????print(arr[:])

注意:Value和Array只適用于Process類。

Pipe(用于管道通信)

多進(jìn)程還有一種數(shù)據(jù)傳遞方式叫管道原理和 Queue相同。Pipe可以在進(jìn)程之間創(chuàng)建一條管道,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對(duì)象,強(qiáng)調(diào)一點(diǎn):必須在產(chǎn)生Process對(duì)象之前產(chǎn)生管道。

構(gòu)造方法:Pipe([duplex])

  • dumplex:默認(rèn)管道是全雙工的,如果將duplex射成False,conn1只能用于接收,conn2只能用于發(fā)送。

實(shí)例方法:

  • send(obj):通過連接發(fā)送對(duì)象。obj是與序列化兼容的任意對(duì)象

  • recv():接收conn2.send(obj)發(fā)送的對(duì)象。如果沒有消息可接收,recv方法會(huì)一直阻塞。如果連接的另外一端已經(jīng)關(guān)閉,那么recv方法會(huì)拋出EOFError。

  • close():關(guān)閉連接。如果conn1被垃圾回收,將自動(dòng)調(diào)用此方法

  • fileno():返回連接使用的整數(shù)文件描述符

  • poll([timeout]):如果連接上的數(shù)據(jù)可用,返回True。timeout指定等待的最長時(shí)限。如果省略此參數(shù),方法將立即返回結(jié)果。如果將timeout射成None,操作將無限期地等待數(shù)據(jù)到達(dá)。

  • recv_bytes([maxlength]):接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息。maxlength指定要接收的最大字節(jié)數(shù)。如果進(jìn)入的消息,超過了這個(gè)最大值,將引發(fā)IOError異常,并且在連接上無法進(jìn)行進(jìn)一步讀取。如果連接的另外一端已經(jīng)關(guān)閉,再也不存在任何數(shù)據(jù),將引發(fā)EOFError異常。

  • send_bytes(buffer [, offset [, size]]):通過連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū),buffer是支持緩沖區(qū)接口的任意對(duì)象,offset是緩沖區(qū)中的字節(jié)偏移量,而size是要發(fā)送字節(jié)數(shù)。結(jié)果數(shù)據(jù)以單條消息的形式發(fā)出,然后調(diào)用c.recv_bytes()函數(shù)進(jìn)行接收

  • recv_bytes_into(buffer [, offset]):接收一條完整的字節(jié)消息,并把它保存在buffer對(duì)象中,該對(duì)象支持可寫入的緩沖區(qū)接口(即bytearray對(duì)象或類似的對(duì)象)。offset指定緩沖區(qū)中放置消息處的字節(jié)位移。返回值是收到的字節(jié)數(shù)。如果消息長度大于可用的緩沖區(qū)空間,將引發(fā)BufferTooShort異常。

使用示例:

from?multiprocessing?import?Process,?Pipe

import?time

\#?子進(jìn)程執(zhí)行方法

def?f(Subconn):

????time.sleep(1)

????Subconn.send("吃了嗎")

????print("來自父親的問候:",?Subconn.recv())

????Subconn.close()

if?__name__?==?"__main__":

????parent_conn,?child_conn?=?Pipe()??#?創(chuàng)建管道兩端

????p?=?Process(target=f,?args=(child_conn,))??#?創(chuàng)建子進(jìn)程

????p.start()

????print("來自兒子的問候:",?parent_conn.recv())

????parent_conn.send("嗯")

Manager(用于資源共享)

Manager()返回的manager對(duì)象控制了一個(gè)server進(jìn)程,此進(jìn)程包含的python對(duì)象可以被其他的進(jìn)程通過proxies來訪問。從而達(dá)到多進(jìn)程間數(shù)據(jù)通信且安全。Manager模塊常與Pool模塊一起使用。

Manager支持的類型有l(wèi)ist,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

管理器是獨(dú)立運(yùn)行的子進(jìn)程,其中存在真實(shí)的對(duì)象,并以服務(wù)器的形式運(yùn)行,其他進(jìn)程通過使用代理訪問共享對(duì)象,這些代理作為客戶端運(yùn)行。Manager()是BaseManager的子類,返回一個(gè)啟動(dòng)的SyncManager()實(shí)例,可用于創(chuàng)建共享對(duì)象并返回訪問這些共享對(duì)象的代理。

BaseManager,創(chuàng)建管理器服務(wù)器的基類

構(gòu)造方法:BaseManager([address[, authkey]])

  • address:(hostname,port),指定服務(wù)器的網(wǎng)址地址,默認(rèn)為簡單分配一個(gè)空閑的端口

  • authkey:連接到服務(wù)器的客戶端的身份驗(yàn)證,默認(rèn)為current_process().authkey的值

實(shí)例方法:

  • start([initializer[, initargs]]):啟動(dòng)一個(gè)單獨(dú)的子進(jìn)程,并在該子進(jìn)程中啟動(dòng)管理器服務(wù)器

  • get_server():獲取服務(wù)器對(duì)象

  • connect():連接管理器對(duì)象

  • shutdown():關(guān)閉管理器對(duì)象,只能在調(diào)用了start()方法之后調(diào)用

實(shí)例屬性:

  • address:只讀屬性,管理器服務(wù)器正在使用的地址

  • SyncManager,以下類型均不是進(jìn)程安全的,需要加鎖..

實(shí)例方法:
  • Array(self,*args,**kwds)
  • BoundedSemaphore(self,*args,**kwds)
  • Condition(self,*args,**kwds)
  • Event(self,*args,**kwds)
  • JoinableQueue(self,*args,**kwds)
  • Lock(self,*args,**kwds)
  • Namespace(self,*args,**kwds)
  • Pool(self,*args,**kwds)
  • Queue(self,*args,**kwds)
  • RLock(self,*args,**kwds)
  • Semaphore(self,*args,**kwds)
  • Value(self,*args,**kwds)
  • dict(self,*args,**kwds)
  • list(self,*args,**kwds)

使用示例:

import?multiprocessing

def?f(x,?arr,?l,?d,?n):

????x.value?=?3.14

????arr[0]?=?5

????l.append('Hello')

????d[1]?=?2

????n.a?=?10

if?__name__?==?'__main__':

????server?=?multiprocessing.Manager()

????x?=?server.Value('d',?0.0)

????arr?=?server.Array('i',?range(10))

????l?=?server.list()

????d?=?server.dict()

????n?=?server.Namespace()

????proc?=?multiprocessing.Process(target=f,?args=(x,?arr,?l,?d,?n))

????proc.start()

????proc.join()

????print(x.value)

????print(arr)

????print(l)

????print(d)

????print(n)

同步子進(jìn)程模塊

Lock(互斥鎖)

Lock鎖的作用是當(dāng)多個(gè)進(jìn)程需要訪問共享資源的時(shí)候,避免訪問的沖突。加鎖保證了多個(gè)進(jìn)程修改同一塊數(shù)據(jù)時(shí),同一時(shí)間只能有一個(gè)修改,即串行的修改,犧牲了速度但保證了數(shù)據(jù)安全。Lock包含兩種狀態(tài)——鎖定和非鎖定,以及兩個(gè)基本的方法。

構(gòu)造方法:Lock()

實(shí)例方法:

  • acquire([timeout]): 使線程進(jìn)入同步阻塞狀態(tài),嘗試獲得鎖定。

  • release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。

使用示例:

from?multiprocessing?import?Process,?Lock

def?l(lock,?num):

????lock.acquire()

????print("Hello?Num:?%s"?%?(num))

????lock.release()

if?__name__?==?'__main__':

????lock?=?Lock()??#?這個(gè)一定要定義為全局

????for?num?in?range(20):

????????Process(target=l,?args=(lock,?num)).start()

RLock(可重入的互斥鎖(同一個(gè)進(jìn)程可以多次獲得它,同時(shí)不會(huì)造成阻塞)

RLock(可重入鎖)是一個(gè)可以被同一個(gè)線程請(qǐng)求多次的同步指令。RLock使用了“擁有的線程”和“遞歸等級(jí)”的概念,處于鎖定狀態(tài)時(shí),RLock被某個(gè)線程擁有。擁有RLock的線程可以再次調(diào)用acquire(),釋放鎖時(shí)需要調(diào)用release()相同次數(shù)??梢哉J(rèn)為RLock包含一個(gè)鎖定池和一個(gè)初始值為0的計(jì)數(shù)器,每次成功調(diào)用 acquire()/release(),計(jì)數(shù)器將+1/-1,為0時(shí)鎖處于未鎖定狀態(tài)。

構(gòu)造方法:RLock()

實(shí)例方法:

  • acquire([timeout]):同Lock

  • release(): 同Lock

Semaphore(信號(hào)量)

信號(hào)量是一個(gè)更高級(jí)的鎖機(jī)制。信號(hào)量內(nèi)部有一個(gè)計(jì)數(shù)器而不像鎖對(duì)象內(nèi)部有鎖標(biāo)識(shí),而且只有當(dāng)占用信號(hào)量的線程數(shù)超過信號(hào)量時(shí)線程才阻塞。這允許了多個(gè)線程可以同時(shí)訪問相同的代碼區(qū)。比如廁所有3個(gè)坑,那最多只允許3個(gè)人上廁所,后面的人只能等里面有人出來了才能再進(jìn)去,如果指定信號(hào)量為3,那么來一個(gè)人獲得一把鎖,計(jì)數(shù)加1,當(dāng)計(jì)數(shù)等于3時(shí),后面的人均需要等待。一旦釋放,就有人可以獲得一把鎖。

構(gòu)造方法:Semaphore([value])

  • value:設(shè)定信號(hào)量,默認(rèn)值為1

實(shí)例方法:

  • acquire([timeout]):同Lock

  • release(): 同Lock

使用示例:

from?multiprocessing?import?Process,?Semaphore

import?time,?random

def?go_wc(sem,?user):

????sem.acquire()

????print('%s?占到一個(gè)茅坑'?%?user)

????time.sleep(random.randint(0,?3))

????sem.release()

????print(user,?'OK')

if?__name__?==?'__main__':

????sem?=?Semaphore(2)

????p_l?=?[]

????for?i?in?range(5):

????????p?=?Process(target=go_wc,?args=(sem,?'user%s'?%?i,))

????????p.start()

????????p_l.append(p)

????for?i?in?p_l:

????????i.join()

Condition(條件變量)

可以把Condition理解為一把高級(jí)的鎖,它提供了比Lock, RLock更高級(jí)的功能,允許我們能夠控制復(fù)雜的線程同步問題。Condition在內(nèi)部維護(hù)一個(gè)鎖對(duì)象(默認(rèn)是RLock),可以在創(chuàng)建Condigtion對(duì)象的時(shí)候把瑣對(duì)象作為參數(shù)傳入。Condition也提供了acquire, release方法,其含義與鎖的acquire, release方法一致,其實(shí)它只是簡單的調(diào)用內(nèi)部鎖對(duì)象的對(duì)應(yīng)的方法而已。Condition還提供了其他的一些方法。

構(gòu)造方法:Condition([lock/rlock])

  • 可以傳遞一個(gè)Lock/RLock實(shí)例給構(gòu)造方法,否則它將自己生成一個(gè)RLock實(shí)例。

實(shí)例方法:

  • acquire([timeout]):首先進(jìn)行acquire,然后判斷一些條件。如果條件不滿足則wait

  • release():釋放 Lock

  • wait([timeout]): 調(diào)用這個(gè)方法將使線程進(jìn)入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。處于wait狀態(tài)的線程接到通知后會(huì)重新判斷條件。

  • notify(): 調(diào)用這個(gè)方法將從等待池挑選一個(gè)線程并通知,收到通知的線程將自動(dòng)調(diào)用acquire()嘗試獲得鎖定(進(jìn)入鎖定池);其他線程仍然在等待池中。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。

  • notifyAll(): 調(diào)用這個(gè)方法將通知等待池中所有的線程,這些線程都將進(jìn)入鎖定池嘗試獲得鎖定。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。

使用示例:

import?multiprocessing

import?time

def?stage_1(cond):

????"""perform?first?stage?of?work,

????then?notify?stage_2?to?continue

????"""


????name?=?multiprocessing.current_process().name

????print('Starting',?name)

????with?cond:

????????print('{}?done?and?ready?for?stage?2'.format(name))

????????cond.notify_all()

def?stage_2(cond):

????"""wait?for?the?condition?telling?us?stage_1?is?done"""

????name?=?multiprocessing.current_process().name

????print('Starting',?name)

????with?cond:

????????cond.wait()

????????print('{}?running'.format(name))

if?__name__?==?'__main__':

????condition?=?multiprocessing.Condition()

????s1?=?multiprocessing.Process(name='s1',

?????????????????????????????????target=stage_1,

?????????????????????????????????args=(condition,))

????s2_clients?=?[

????????multiprocessing.Process(

????????????name='stage_2[{}]'.format(i),

????????????target=stage_2,

????????????args=(condition,),

????????)

????????for?i?in?range(1,?3)

????]

????for?c?in?s2_clients:

????????c.start()

????????time.sleep(1)

????s1.start()

????s1.join()

????for?c?in?s2_clients:

????????c.join()

Event(事件)

Event內(nèi)部包含了一個(gè)標(biāo)志位,初始的時(shí)候?yàn)閒alse??梢允褂胹et()來將其設(shè)置為true;或者使用clear()將其從新設(shè)置為false;可以使用is_set()來檢查標(biāo)志位的狀態(tài);另一個(gè)最重要的函數(shù)就是wait(timeout=None),用來阻塞當(dāng)前線程,直到event的內(nèi)部標(biāo)志位被設(shè)置為true或者timeout超時(shí)。如果內(nèi)部標(biāo)志位為true則wait()函數(shù)理解返回。

使用示例:

import?multiprocessing

import?time

def?wait_for_event(e):

????"""Wait?for?the?event?to?be?set?before?doing?anything"""

????print('wait_for_event:?starting')

????e.wait()

????print('wait_for_event:?e.is_set()->',?e.is_set())

def?wait_for_event_timeout(e,?t):

????"""Wait?t?seconds?and?then?timeout"""

????print('wait_for_event_timeout:?starting')

????e.wait(t)

????print('wait_for_event_timeout:?e.is_set()->',?e.is_set())

if?__name__?==?'__main__':

????e?=?multiprocessing.Event()

????w1?=?multiprocessing.Process(

????????name='block',

????????target=wait_for_event,

????????args=(e,),

????)

????w1.start()

????w2?=?multiprocessing.Process(

????????name='nonblock',

????????target=wait_for_event_timeout,

????????args=(e,?2),

????)

????w2.start()

????print('main:?waiting?before?calling?Event.set()')

????time.sleep(3)

????e.set()

????print('main:?event?is?set')

其他內(nèi)容

multiprocessing.dummy 模塊與 multiprocessing 模塊的區(qū)別:dummy 模塊是多線程,而 multiprocessing 是多進(jìn)程, api 都是通用的。所有可以很方便將代碼在多線程和多進(jìn)程之間切換。multiprocessing.dummy通常在IO場景可以嘗試使用,比如使用如下方式引入線程池。

from?multiprocessing.dummy?import?Pool?as?ThreadPool

multiprocessing.dummy與早期的threading,不同的點(diǎn)好像是在多多核CPU下,只綁定了一個(gè)核心(具體未考證)。

參考文檔:

https://docs.python.org/3/library/multiprocessing.html
https://www.rddoc.com/doc/Python/3.6.0/zh/library/multiprocessing/

Python并發(fā)之concurrent.futures

Python標(biāo)準(zhǔn)庫為我們提供了threading和multiprocessing模塊編寫相應(yīng)的多線程/多進(jìn)程代碼。從Python3.2開始,標(biāo)準(zhǔn)庫為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個(gè)類,實(shí)現(xiàn)了對(duì)threading和multiprocessing的更高級(jí)的抽象,對(duì)編寫線程池/進(jìn)程池提供了直接的支持。concurrent.futures基礎(chǔ)模塊是executor和future。

Executor

Executor是一個(gè)抽象類,它不能被直接使用。它為具體的異步執(zhí)行定義了一些基本的方法。ThreadPoolExecutor和ProcessPoolExecutor繼承了Executor,分別被用來創(chuàng)建線程池和進(jìn)程池的代碼。

ThreadPoolExecutor對(duì)象

ThreadPoolExecutor類是Executor子類,使用線程池執(zhí)行異步調(diào)用。

class?concurrent.futures.ThreadPoolExecutor(max_workers)

使用max_workers數(shù)目的線程池執(zhí)行異步調(diào)用。

ProcessPoolExecutor對(duì)象

ThreadPoolExecutor類是Executor子類,使用進(jìn)程池執(zhí)行異步調(diào)用。

class?concurrent.futures.ProcessPoolExecutor(max_workers=None)

使用max_workers數(shù)目的進(jìn)程池執(zhí)行異步調(diào)用,如果max_workers為None則使用機(jī)器的處理器數(shù)目(如4核機(jī)器max_worker配置為None時(shí),則使用4個(gè)進(jìn)程進(jìn)行異步并發(fā))。

submit()方法

Executor中定義了submit()方法,這個(gè)方法的作用是提交一個(gè)可執(zhí)行的回調(diào)task,并返回一個(gè)future實(shí)例。future對(duì)象代表的就是給定的調(diào)用。

Executor.submit(fn, *args, **kwargs)

  • fn:需要異步執(zhí)行的函數(shù)

  • *args, **kwargs:fn參數(shù)

使用示例:

from?concurrent?import?futures

def?test(num):

????import?time

????return?time.ctime(),?num

with?futures.ThreadPoolExecutor(max_workers=1)?as?executor:

????future?=?executor.submit(test,?1)

????print(future.result())

map()方法

除了submit,Exectuor還為我們提供了map方法,這個(gè)方法返回一個(gè)map(func, *iterables)迭代器,迭代器中的回調(diào)執(zhí)行返回的結(jié)果有序的。

Executor.map(func, *iterables, timeout=None)

  • func:需要異步執(zhí)行的函數(shù)

  • *iterables:可迭代對(duì)象,如列表等。每一次func執(zhí)行,都會(huì)從iterables中取參數(shù)。

  • timeout:設(shè)置每次異步操作的超時(shí)時(shí)間,timeout的值可以是int或float,如果操作超時(shí),會(huì)返回raisesTimeoutError;如果不指定timeout參數(shù),則不設(shè)置超時(shí)間。

使用示例:

from?concurrent?import?futures

def?test(num):

????import?time

????return?time.ctime(),?num

data?=?[1,?2,?3]

with?futures.ThreadPoolExecutor(max_workers=1)?as?executor:

????for?future?in?executor.map(test,?data):

????????print(future)

shutdown()方法

釋放系統(tǒng)資源,在Executor.submit()或 Executor.map()等異步操作后調(diào)用。使用with語句可以避免顯式調(diào)用此方法。

Executor.shutdown(wait=True)

Future

Future可以理解為一個(gè)在未來完成的操作,這是異步編程的基礎(chǔ)。通常情況下,我們執(zhí)行io操作,訪問url時(shí)(如下)在等待結(jié)果返回之前會(huì)產(chǎn)生阻塞,cpu不能做其他事情,而Future的引入幫助我們?cè)诘却倪@段時(shí)間可以完成其他的操作。

Future類封裝了可調(diào)用的異步執(zhí)行。Future 實(shí)例通過 Executor.submit()方法創(chuàng)建。

cancel():試圖取消調(diào)用。如果調(diào)用當(dāng)前正在執(zhí)行,并且不能被取消,那么該方法將返回False,否則調(diào)用將被取消,方法將返回True。

cancelled():如果成功取消調(diào)用,返回True。

running():如果調(diào)用當(dāng)前正在執(zhí)行并且不能被取消,返回True。

done():如果調(diào)用成功地取消或結(jié)束了,返回True。

result(timeout=None):返回調(diào)用返回的值。如果調(diào)用還沒有完成,那么這個(gè)方法將等待超時(shí)秒。如果調(diào)用在超時(shí)秒內(nèi)沒有完成,那么就會(huì)有一個(gè)Futures.TimeoutError將報(bào)出。timeout可以是一個(gè)整形或者浮點(diǎn)型數(shù)值,如果timeout不指定或者為None,等待時(shí)間無限。如果futures在完成之前被取消了,那么 CancelledError 將會(huì)報(bào)出。

exception(timeout=None):返回調(diào)用拋出的異常,如果調(diào)用還未完成,該方法會(huì)等待timeout指定的時(shí)長,如果該時(shí)長后調(diào)用還未完成,就會(huì)報(bào)出超時(shí)錯(cuò)誤futures.TimeoutError。timeout可以是一個(gè)整形或者浮點(diǎn)型數(shù)值,如果timeout不指定或者為None,等待時(shí)間無限。如果futures在完成之前被取消了,那么 CancelledError 將會(huì)報(bào)出。如果調(diào)用完成并且無異常報(bào)出,返回None.

add_done_callback(fn):將可調(diào)用fn捆綁到future上,當(dāng)Future被取消或者結(jié)束運(yùn)行,fn作為future的唯一參數(shù)將會(huì)被調(diào)用。如果future已經(jīng)運(yùn)行完成或者取消,fn將會(huì)被立即調(diào)用。

wait(fs, timeout=None, return_when=ALL_COMPLETED)

  • 等待fs提供的 Future 實(shí)例(possibly created by different Executor instances) 運(yùn)行結(jié)束。返回一個(gè)命名的2元集合,分表代表已完成的和未完成的

return_when 表明什么時(shí)候函數(shù)應(yīng)該返回。它的值必須是一下值之一:

  • FIRST_COMPLETED :函數(shù)在任何future結(jié)束或者取消的時(shí)候返回。

  • FIRST_EXCEPTION :函數(shù)在任何future因?yàn)楫惓=Y(jié)束的時(shí)候返回,如果沒有future報(bào)錯(cuò),效果等于

  • ALL_COMPLETED :函數(shù)在所有future結(jié)束后才會(huì)返回。

as_completed(fs, timeout=None):參數(shù)是一個(gè) Future 實(shí)例列表,返回值是一個(gè)迭代器,在運(yùn)行結(jié)束后產(chǎn)出 Future實(shí)例 。

使用示例:

from?concurrent.futures?import?ThreadPoolExecutor,?wait,?as_completed

from?time?import?sleep

from?random?import?randint

def?return_after_5_secs(num):

????sleep(randint(1,?5))

????return?"Return?of?{}".format(num)

pool?=?ThreadPoolExecutor(5)

futures?=?[]

for?x?in?range(5):

????futures.append(pool.submit(return_after_5_secs,?x))

print(1)

for?x?in?as_completed(futures):

????print(x.result())

print(2)

參考鏈接:

https://pythonhosted.org/futures

推薦閱讀

利用 Python 實(shí)現(xiàn)多任務(wù)進(jìn)程


Python爬蟲實(shí)戰(zhàn) | 利用多線程爬取 LOL 高清壁紙

瀏覽 64
點(diǎn)贊
評(píng)論
收藏
分享

手機(jī)掃一掃分享

分享
舉報(bào)
評(píng)論
圖片
表情
推薦
點(diǎn)贊
評(píng)論
收藏
分享

手機(jī)掃一掃分享

分享
舉報(bào)

感谢您访问我们的网站,您可能还对以下资源感兴趣:

国产秋霞理论久久久电影-婷婷色九月综合激情丁香-欧美在线观看乱妇视频-精品国avA久久久久久久-国产乱码精品一区二区三区亚洲人-欧美熟妇一区二区三区蜜桃视频 91麻豆精品无码| 国产一级婬片A片| 久久丁香五月| 波多野结衣高清视频| 国内自拍欧美| 老司机在线免费视频| 国产三级在线| 日韩视频免费| 国产学生妹在线播放| 求欧美精品网址| 成人综合在线观看| 亚洲国产中文字幕在线播放| 操逼网视频| 乱伦无码高清麻豆视频一区二区 | 操屄视频播放| 人人草人人摸| 51妺妺嘿嘿午夜成人A片| 天天爽夜夜爽人人爽| 夜夜操天天日| 成人性爱视频网| 欧美熟妇擦BBBB擦BBBB| JlZZJLZZJlZZ亚洲女人17| 国内精品一区二区| 色噜噜狠狠色综无码久久合欧美 | 正在播放吴梦梦淫行| 日本草逼视频| 亚洲激情五月天| 中文字幕+乱码+中文乱码电影| 久久私人影院| 夜夜骚av.一区二区三区四区| 日本免费在线黄色视频| 亚洲无码黄色片| 午夜成人黄色电影| 国产精品久久久91| 日韩性爱视屏| 91激情| 日韩高清一区| 欧美手机在线视频| 天天肏屄| 黄色片视频在线观看| 激情无码一区二区| 丝袜一区二区三区| 在线观看视频你懂的| 人人干人妻| 先锋资源国产| 91无码高清视频| 无码人妻丰满熟妇区毛片蜜桃麻豆 | 婷婷五月天丁香成人社区| 成人午夜大片| 91爱爱| 免费在线观看av| 中文字幕h| 天天草天天射| 亚洲免费黄色| 七十路の高齡熟妇无码| 亚洲乱伦视频| 男人色天堂网| 四虎欧美| 九九热国产视频| 亚洲高清无码视频在线| 亚洲小视频在线播放| 亚洲AV无码精品国产| 久久免费播放视频| wwwwww黄| 看一级黄色视频| 黄色一级小说| a视频免费看| 九九九九九九精品视频| 俺也去网av| 成人高清在线| 欧美AAAAAAAA| 成年人黄色视频| 亚洲成人在线视频免费观看 | 日韩在线一区二区三区四区| 国产精品久久久久久无人区| 手机AV在线观看| 69乱伦视频| 东京热av一区二区| 国产主播一区二区| 亚洲色婷婷在线| 日日射视频| 午夜无码福利视频| 噼里啪啦免费观看视频大全| 性欧美日韩| 亚洲一区无码在线观看| 久久久久久亚洲AV黄床| 中国操逼电影| 99热99re6国产线播放| 黄色视频在线观看| 亚洲三级黄片| 国产欧美日韩综合在线视频| 免费看黄色片视频| 嫩草嫩草69| 一区视频免费观看| 久久免费视频,久久免费视频| 国产剧情一区二区三区| 黄色一级片免费观看| 俺去也www俺去也com| 国产无码小视频| 在线播放内射| 国产超碰| 手机毛片| 日韩欧美精品在线观看| 日韩A电影| 丰满欧美熟妇免费视频| 日韩A片在线观看| 一本大道东京热av无码| 三级视频在线播放| 91熟女丰满原味| 欧美洲成人网站| 婷婷黄色电影| 男人的天堂视频在线观看| 91成人三级| 日韩不卡视频在线观看| 免费电影日本黄色| 亚洲一卡二卡三卡四卡免| 一级黄色视频免费观看| 国产精品内射视频| 成人午夜在线| 无码国产视频| 欧美日韩国产一区二区| 国产精品一区二区视频| 最好看的2019中文在线大全电影| 懂色中文字幕| 中韩一区二区| 黄色视频高潮| 91成人区| 五月天一区二区三区| 欧美高潮视频| 大香蕉做爱| 日本黄色一级| 久色网站| www.日韩精品| 国产96在线亚洲| 亚洲日操| 天堂精品| 伊人网视频在线观看| 久久蜜| 国产免费一区二区三区| 色五月婷婷综合| 日韩成人一区| 日本黄色A片| 北条麻妃一区二区三区-免费免费高清观看 | 日韩无码一级| 午夜乱伦| 亚洲人妻电影一区| 18禁AV在线| 91精品国产成人做爰观看奶头 | 综合成人| 亚洲天堂免费视频| 欧美成人视频。| 国产精品3| 翔田千里无码免费播放| 国內精品久久久久久久| 日韩AV小电影| 丰满少妇一区二区三区| 国产AV一级| 好吊一区二区三区| 91丨人妻丨国产| aaa精品| 香蕉成人网站| 97精品人妻一区二区三区在线| 久操视频免费| 99热这里只有精品99| 玖玖视频| 国产成人AA| 动漫3D成人H无码国漫| 91九色TS另类国产人妖| 亚洲天堂AV网| 久久久福利视频| 精品人妻一区二区三区含羞草| 正在播放亚洲| 亚洲激情欧美激情| 国产精品扒开腿做爽爽爽视频| 国产色情视频在线观看| 日韩中文字幕不卡| 亚洲无码偷拍| 日本欧洲三级| 久草网视频| 中文字幕av免费观看| 亚洲乱码精品久久久久..| 亚洲免费在线播放| 国产大奶一区二区| 国产免费性爱视频| 中文字幕视频免费| 影音先锋av资源网站| 天天视频色| 久热久| 欧美在线日韩在线| 亲子伦一区二区三区观看方式| 香蕉视频一区| 亚洲天堂一区二区| 9I看片成人免费视频| 国产欧美一| 久久综合17p| 日韩在线一级| 一级免费黄片| 三级一区二区| 嫩BBB搡BBB搡BBB四川| 西西人体44www大胆无码| 欧美综合网| 中文字幕在线观看免费视频| 欧美东京热视频| 好男人一区二区三区在线观看 | A片黄色毛片| 狠狠操电影| 亚洲V国产v欧美v久久久久久| 在线第一页| AV性爱社区| 我要看黄色一级片| 人妻AV无码| 亚洲无码专区在线| 殴美色色网| 国产剧情一区二区| 亚洲色无码| 色播婷婷五月天| 国产熟妇码视频户外直播| 国产一区视频18| 91色逼| 久草资源网| 日韩中文字幕在线| 天堂中文资源在线观看| 成人AV中文字幕| 婷婷99狠狠躁天天躁| 精品一区二区三区蜜桃臀www| 9999re| 色婷网| 日韩av电影在线观看| 浮力影院久久| 黄片视频在线免费看| 91丨九色丨熟女丰满| 国产乱国产乱老熟300部视频| 国产麻豆精品ThePorn| 欧美成人手机在线看片| 日韩欧美内射| 日韩欧美综合| 国产一区二区波多野结衣| 日韩一级黄色毛片| 日韩AV电影在线观看| 少妇二区| 天堂av在线免费观看| 中国字幕在线观看韩国电影| 久久任你操| 免费国产黄色视频网站| 丰满人妻无码| 国产AV大全| 日本一区二区三区视频在线观看| 国产在线A片| 婷婷网址| 成人AV在线一区二区| 久久久久久伊人| 五月丁香婷中文| 欧美色图视频网站| www黄色在线观看| 日韩性爱AV| 麻豆av人人乐| 午夜天堂在线| 99久在线精品99re8热| 天天操夜夜操| 99热激情在线| 久久久久免费| 中文字幕av高清片,中文在线观看| 日逼免费| 日韩国无码| 91福利网| 91吴梦梦无码一区二区| 臭小子晚上让你爽个够视频| 午夜毛片| 好吊妞在线观看| 91豆花成人网站| 成人午夜大片| 91九色TS另类国产人妖| 国产成人精品AA毛片| 爱视频福利网| 欧美自拍| 77777色婷婷| 午夜狠狠操| 日韩免费成人| 亚洲三级在线观看| 尤物网站在线观看| 欧美亚洲天堂网| 欧美成人电影| 黄色电影一区二区| 操碰在线| 秋霞午夜福利影院| 青青草国产亚洲精品久久| 国产成人综合自拍| 亚洲无码一卡二卡| 豆花视频| 黄色视频在线观看| 亚洲日韩影院| 婷婷九月色| 日日日日日干| 久久久精品人妻| 亚洲AV无码成人精品区h麻豆| 亚洲国产天堂| 婷婷草逼| 又黄又湿的视频| 国产一区不卡| 狠狠se| 免费视频一区二区三区四区| 精品中文在线视频| 中文字幕高清无码在线| 操逼综合| 亚洲黄色在线免费观看| 丰满少妇一区二区三区| 亚洲成人自拍无码| 麻豆md0049免费| 国产A片| 无码蜜桃吴梦梦| 欧美精品一二三区| 毛片性爱视屏| 99人妻| 嫩草视频在线观看免费网站| 大香蕉在线网站| 91成人亚洲| 欧美一级片免费观看| 东京热国产| 91九色精品女同系列| 麻豆久久久久| 国产熟女一区二区视频网站| 迷情校园综合| 伊人激情网| 久久成人在线| 色色在线| 日韩午夜片| 日逼A片| 夜夜骑免费视频| 国产日产亚洲精品| 亚洲国产一区二区三区四区| 中日韩特黄A片免费视频| 国产成人精品AV| 福利一区在线观看| 夜夜骚av一区二区三区| 国产又爽又黄网站免费观看| 中文字幕熟女人妻| 成人一区二区三区四区五区| 日韩A毛片| 色婷婷一级A片AAA毛片| 最新国产毛片| 一级黄色操逼视频| 国产一级A片| 91免费在线视频观看| 日韩在线观看视频网站| 激情人妻在线| 日本国产在线| A一级黄色| 永久免费黄色视频| 中文字幕在线观看辣文| yw在线播放| 精品人妻一二三区| 亚洲区中文字幕| 狠狠操狠狠操| 少妇激情av| 无码一级| 2025无码视频| 天天综合久久| 国产九九热视频| 蜜桃视频一区二区三区| 欧一美一婬一伦一区二区三区自慰,| 日本操骚逼| 一本色道久久综合无码人妻软件| 中文字幕一区二区蜜桃| 韩国无码片| 丰满人妻一区二区三区Av猛交 | 成人视频A片| 500部大龄熟乱4K视频| 在线观看日韩视频| 六月婷婷五月丁香| 国产毛片777777| 2014亚洲天堂| 亚洲激情黄色| 成人毛片在线大全免费| 青青久操| 翔田千里无码免费播放| 欧美A片在线观看| 男女拍拍网站| 中文字幕av在线| wwwa片| 国产美女网站| 爆乳尤物一区二区三区| 国产91高跟丝袜| 欧美精品成人| 国产乱伦AV网站| 欧美人操逼一二区| 农村老太HD肉HD| 伊人激情影院| 成人av无码| 影音先锋AV资源在线| 一级片免费| 五月天婷婷丁香网| 在线免费小黄片| 激情国产av| 欧美久久性爱视频| 亚洲视频第一页| 国产三级免费观看| 人人干人人操人人爽| 91在线无码精品秘国产| 人人操天天干| 精品一区在线| 亚洲少妇免费| 成人小视频十八禁免费观看| 99热电影| 亚洲午夜激情| 亚洲成人高清| 91狠狠色丁香婷婷综合久久精品| 影音先锋无码一区| 欧美性爱一区二区三区| 黄色香蕉视频| 亚洲韩国中文字幕| 中文字幕乱码中文字幕电视剧| 婷婷男人天堂| 国产黄片免费观看| 午夜综合网| 国内精产品一二区秘| 少妇黄色视频| TheAV精尽人亡av| 亚洲一区二区在线| 国产v视频| 大陆搡BBBBB搡BBBBBB| 久久免费精品视频| 内射婷婷| 五月天丁香成人| 免费的黄色视频| 婷婷综合一区| 草逼美女| 国产精品自拍小视频| 国产午夜精品一区二区三区四区 | 国产内射久久| 免费看成人747474九号视频在线观看| 亚洲在线高清| 国产99久久久精品| 毛片在线视频| 十八毛片| 黄色高清视频在线观看| 国产综合视频| 蜜桃av秘无码一区三| 一级a一级a爱片兔兔软件| 天天日天天操天天干| jizz无码| 一区二区三区三级片| 爱爱无码| 欧美激情影院| 91无码高清视频| 影音先锋自拍| 国产女主播在线观看| aaa精品| 日韩人妻丰满无码区A片| 亚洲免费成人视频| 日韩亚洲在线视频| 中文字幕片av| 毛多水多丰满女人A片| 69国产成人综合久久精品欧美| 青青操国产乱伦| 大奶无码| 天天操天天操天天操天天| 中文字幕第5页| 熟女人妻视频| 欧美色图视频在线观看| 天天综合久久| 国产三级在线观看视频| 美女网站永久免费观看| 电影豹妹香港版| 高清无码在线观看免费| 日韩一级片网站| 无码一区二区av| 婷婷天堂站| 丁香五月婷婷五月天| 亚洲婷婷视频| 内射午夜福利在线免费观看视频| 亚洲成人综合在线| 欧美日逼片| 粉嫩小泬BBBB免费看| 18禁在线| 中国无码视频| 亚洲精品麻豆| 中国1级毛片| 国产高清无码自拍| 中文字幕在线观看日本| 日屄视频在线观看| 国产一区二区三区在线观看免费视频免费视频免费视频 | 懂色av,蜜臀AV粉嫩av| 综合激情网站| 免费看黄色录像| 日韩免费高清| 欧美色999| 婷婷五月天成人| 午夜无码免费| 国产一区二区三区在线| 日韩欧美小视频| 熟女人妻在线| 国产aaaaaaaaaaaaa| 99视频在线免费观看| 俺也去网| 影音先锋在线成人| 精品国产一二三区| 性感欧美美女| 精品九九九九| 色婷婷激情视频| 超碰97人妻| 麻豆一区视频| 麻豆国产91在线播放| 成人婷婷| 精品动漫一区二区三区| 爽爽午国产浪潮AV性色www| 欧美色图15p| 骚逼综合| 亚洲一级视频在线观看| 欧美爱爱网站| 日本不卡一区二区三区四区| 亚洲黄色影院| 欧美一级特黄A片免费| 青青草原成人在线视频| 国产一级AA片| 天天日毛片| 婷婷五月激情网| 免费操逼视频在线观看| 精品动漫一区二区三区| 久久久老熟女一区二区三区91| 北条麻妃一区二区三区-免费免费高清观看 | 制服丝袜人妻| 黄色视频网站国产| 91超碰免费在线| 亚洲综合图色40p| 激情小说亚洲图片:伦| 国产成人精品免费视频| 躁BBB躁BBB躁BBBBB乃| 免费黄色视频网站大全| 国产一区二区不卡视频| 婷婷深爱五月丁香网| 久久久网站| 黄色亚洲视频| 欧美搡BBBB搡BBB| 黄色一级片在线| 亚洲欧美另类图片| 中文字幕乱伦性爱| 国产一区二区av| 一级色情片| 五月丁香婷婷激情| 日韩一区二区三区免费视频| 久久九九国产精品怡红院| 亚洲天媒在线播放| 人人妻人人草| www欧美日韩| 大香蕉欧美在线| 免费国产成人看片在线| 内射视频免费看| 久久精品免费看| 国产一级黄色| 日本中文字幕电影| 日韩国产一区二区| 高潮流水视频| 影音先锋中文字幕av| 69性影院| 午夜福利影院在线| 狠狠操狠狠操| 成人精品免费视频| 性爱福利社| 有码中文字幕在线观看| 亚洲无码AV在线观看| 国产精品中文字幕在线观看| 操一线天逼| 超碰自拍私拍二区三区区| 五月婷婷五月天| 国产精品永久| 九九亚洲| 大香蕉1024| 久久国产乱子伦精品免费午夜... 国产毛片精品一区二区色欲黄A片 | 亚洲AV成人无码精在线| 日本免费a片| 92久久| 怡春院久久| 51妺嘿嘿午夜福利视频| 日p视频在线观看| 亚洲福利一区二区| 胖老板办公室沙发无套爆秘书| 熟女中文字幕| 91爱爱视频| 精品午夜福利| 无码人妻精品一区二区三区蜜臀百度| 日韩香蕉视频| 91瑟瑟| 亚洲伊人影院| 国产成人自拍视频在线| 黄色电影毛片| 熊猫视频91| 欧美日韩在线观看视频| 中文字幕有码视频| 国语对白做受欧美| 琪琪色视频| 操东北女人逼| 伊人影院在线观看| 国产精品一区二区三区在线| 久久看片| 特级西西WWW无码| 中文字幕av网站| 欧美无遮挡| 丁香婷婷五月综合影院| 色94色.欧美.setu| 好吊视频一区二区三区| 嫩草在线精品| 极品另类| 日韩免费视频一区二区| 日韩另类视频| 爱操逼综合网| 偷拍欧美日韩| 国产一级婬乱片AV片AAA毛片| 黑人精品欧美一区二区蜜桃| 奇米色网| 亚洲无码中| 日韩第一色| 精品中文字幕视频| 91视频在线免费观看app| V天堂在线视频| 天天天做夜夜夜夜爽无码| 欧美色图视频网站| 欧美一区| 黄色国产视频| 黄页网站在线观看| 特级西西444www大胆免费看| 国产黄色自拍视频| 国产精品777| 国产激情网站| 超碰91在线观看| 神马午夜久久| 国产av黄色| 婷婷国产综合| 精品无码AV一区二区三区| 人妻无码精品蜜桃| 伊人二区| 香蕉av在线| va色婷婷亚洲在线| 国产性爱电影网| ww国产| 日韩综合另类| 国产一级a毛一级a毛观看视频网站 | 在线国产视频| 亚洲中文字幕一区| 福利视频一区| 另类一区| 天天综合视频| 欲色av| 免费黄片网站| 国产三级在线观看视频| 潮喷在线观看| 国外亚洲成AV人片在线观看 | 精品国产久| 欧美亚洲日本| 国产一级a| 亚洲精品无码在线播放| 欧美BBWBBWBBWBBWBBwBBW| 欧美夜夜草视频| 国产熟妇码视频户外直播| 亚洲一区二区三区无码| 99亚洲精品| 爱搞搞就要搞| 91豆花视频18| 男人天堂影院| www.色老板| 狼友视频在线观看18| 精品视频久久| 欧美日日日| 久色网| 成人精品在线| wwwA片| 天天操天天操天天操| 国内久久| 黄色日逼视频| 一区二区三区av| 玖玖视频| 国产96在线亚洲| 亚洲精品视频在线观看网站| 2025最新国产精品每日更新| 国产激情欧洲在线观看一区二区三区| 中文字幕浅井香舞被黑人俘虏 | 婷婷五月色| 久久婷婷视频| 国产免费一区二区三区| 黄片视频在线观看| 亚洲大哥天天干| 一级a片激情啪啪免费观| 一区二区三区精品婷婷| 国产精品后入| 91精品国产成人www| 成人免看一级a一片| 性爱无码AV| 东方AV免费在线观看| 黄网站欧美内射| 国产一级做a爱免费视频| 狠狠干B| 一级无码免费| 高清无码视频免费在线观看| 色黄网站在线观看| 亚洲无码蜜桃| 亚洲AV免费电影| 亚洲不卡在线观看| 俺也去在线| 中国操逼| 在线观看你懂得| 毛片毛片毛片毛片| 婷婷激情四射| 精品内射| 色高清无码免费视频| 国产欧美视频在线| 日韩黄色A片| 国产成人69免费看| 亚洲成人影音先锋| 国产一区一区| 亚洲欧美不卡| 日韩熟妇无码中文字慕| 亚洲AV无码成人| 免费在线观看a| 日毛片| www色色| 逼特逼视频| 久久久久久国产免费A片| 成人免费黄色| 一区二区三区高清| 日韩中文字幕av| 成人性生活视频| 中文字幕永久在线5| 久久系列观看完整指南| 日本一级片中文字幕| 热re99久久精品国产99热| 日本在线一级| 内射熟妇| 亚洲黄色免费电影| 91网址| 欧美日韩性爱视频| 俺来也俺去也| 亚洲精品欧美久久婷婷| 性爱视频91| 日韩欧美黄色片| 免费观看黄色AV| 日韩动态视频| 亚洲AV免费电影| 国产又爽又黄免费网站在线看| 国产中文字幕在线视频| av解说| 牛牛成人在线视频| 成人三级片视频| 在线视频内射| 日韩欧美在线中文字幕| 国产亚洲欧美精品综合在线| 亚洲成人免费在线观看| 黄色一级录像| 大乳奶一级婬片A片| 99re视频精品| 国产熟女AV| 中文字幕五码| 国产精品秘久久久久久1-~/\v7-/ 囯产精品一区二区三区线一牛影视1 | 成人无码影院日韩,成人年…| www.97av| 欧美亚洲动漫| 丰满少妇在线观看网站| 嫩BBB槡BBBB槡BBB| 人妻被午夜福利AV| 开心五月婷| 四虎www| 久操视频免费在线观看| 壁特壁视频在线观看| 麻豆av无码| 国产精品乱伦| 久久国产精品在线| 精品无码AV一区二区三区| 久久夜色精品国产噜噜亚洲AV| 九九在线观看视频| 成人午夜视频在线观看| 91久久国产综合久久| 影音先锋无码AV| 国产精品免费久久| 91精品成人电影| 另类老太婆性BBWBBw| 一区二区黄色| 欧美三级在线| 中文字幕无吗| 天堂网av2014| 欧美偷拍精品| 操B无码| 成人播放视频| 国产日韩在线播放| 色哟哟一中文字慕| 超碰人人在线观看| 久久小视频| 99热免费| 国产福利合集| 国产亚洲精品久久久久久桃色| 国产乱子伦精品免费,| 欧美大黑逼| 国产欧美一区二区三区视频| 日本一区二区视频在线观看| 色呦呦视频在线观看| 在线日韩av| 国产精品你懂的| 中文字幕在线观看福利视频| 色老板av| 特色毛片| 国产一级a毛一级a毛观看视频网站 | 国产色天使| 成人免费无码激情AV片| 欧美日逼片| 欧美一级网站| 国产免费无码一区二区| 91丨九色丨熟女老版| 国产精品一区二区免费| 亚洲福利视频网站| H片在线免费观看| 欧美人操逼一二区| 四虎影院最新地址| 国产操逼图| 四虎A片| 久久综合电影| 一级a片激情啪啪免费观| 亚洲高清毛片一区二区| 黄色视频网站在线看| 日韩精品三级片| 日韩欧美在线观看视频| 色婷婷Av一区| 国产91视频在线观看| 91在线无码精品秘蜜桃入口| 日韩AV在线免费观看| 色五月视频在线| 久久久xxx| 成人免费在线观看| 人人摸人人看| 欧美XX888做受| 91丝袜一区二区三区| 国产AV一区二区三区四区| 久久99久久99久久| 国产在线拍揄自揄拍无码男男 | 亚洲秘av无码一区二区| 操小骚逼视频| 春色Av| 国产福利电影在线| 亚欧成人网站| 超碰AV在线| 精品精品视频| 一级性爱毛片| 成人av免费在线观看| 午夜福利干B在线免费小视频| 亚洲无码人妻在线| 欧美日韩v| 天天射日日干| 久久国产偷拍| 亚洲无码图片| 日韩欧美在线免费观看| 夜夜骚| 在线观看免费黄网站| 亚洲av免费在线| 亚洲欧美日韩在线| 日本熟妇在线| av天天看| 久久99九九| 国产最新在线视频| 国产灌醉| 日韩另类视频| 小佟丽娅大战91哥| 深爱激情五月婷婷| 国产香蕉在线观看| 日本五十路| 成人国产AV网站| 翔田千里91| 亚洲高清无码专区| 人妻少妇中文字幕久久牛牛| 午夜福利电影网| 91国产爽黄在线| 欧美性爱XXXX| 青青色在线视频| 久久天堂网| AV天堂小说网| 国产精品揄拍100视频| 日韩欧美色| www.黄色在线| 黄色高清无码视频| 8050午夜网| 免费肏屄| 亚洲秘一区二区三区-精品亚洲二区-| 四虎精品一区二区三区| 免费看片av| www.日韩无码| 亚洲综合网站| 中文字幕日韩有码| 黄色成人网站在线观看免费| 一区二区三区四区视频| 亚洲精品成人AV| 亚洲有码在线播放| 99re6热在线精品视频功能| 免费版成人久久幺| 久久久久综合| 久久理论电影| 国产精品免费观看久久久久久久久| 黄色免费片| 国产又粗又猛又黄又爽无遮挡| 亚洲人天堂| 黄色福利| 中文字幕AV在线免费观看| 在线视频免费观看| 性爱AV在线观看| 中文字幕人妻在线中文乱码怎么解决 | 欧美理论片在线观看|