Concurrent Downloads
Supplementary Notes on Multithreading and Multiprocessing
The threading.local Class
The situation you least want to run into when using threads is having multiple threads compete for the same resource. To keep the resource's state correct you usually have to protect it with locks, which on the one hand costs the program its concurrency and on the other hand, when several threads compete for several resources, can even end in deadlock if the locking is done carelessly. One way to solve the contention problem is to give every thread its own copy of the resource: each thread then works on the copy it holds, and the competition disappears.
The simplest way to bind a resource to the thread that holds it is the local class from the threading module. In web-crawler development you can use it to bind a MySQL database connection or a Redis client object to each thread; a thread can then obtain these resources directly, which both removes the contention problem and saves you from passing the objects around through function and method calls. For details, see the example code of the multithreaded crawler for the Sohu mobile site (Redis version) later in this chapter.
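A minimal sketch of the idea, assuming a local Redis server at 127.0.0.1:6379 (host and port are placeholders): each worker thread lazily creates its own Redis client and caches it on a threading.local object, so no connection object is ever shared between threads.

import threading

import redis

thread_local = threading.local()


def get_redis_client():
    # The attribute lives on thread_local, so each thread only ever sees
    # the client it created for itself.
    if not hasattr(thread_local, 'redis_client'):
        thread_local.redis_client = redis.Redis(host='127.0.0.1', port=6379)
    return thread_local.redis_client


def worker():
    client = get_redis_client()
    print(threading.current_thread().name, 'uses client', id(client))


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()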
The concurrent.futures Module
Python 3.2 introduced the concurrent.futures module, which provides thread pools and process pools, management of parallel tasks, handling of non-deterministic execution flows, and process/thread synchronization. For more on this topic, the book "Python Parallel Programming" is recommended.
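A minimal sketch of the thread-pool part of that API (the squaring task is just a stand-in for real work such as downloading a page):

from concurrent.futures import ThreadPoolExecutor


def square(num):
    return num * num


with ThreadPoolExecutor(max_workers=4) as executor:
    # map submits one call per argument to the pool and yields the
    # results in the original order.
    print(list(executor.map(square, range(10))))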
Distributed Processes
When working with multiple processes, the processes can be deployed across several host machines. Python's multiprocessing module not only supports multiple processes; its managers submodule also supports distributing those processes over multiple nodes. To deploy distributed processes you first need a service process that acts as the scheduler; the processes then communicate with it over the network so that they can be controlled and coordinated. Since the managers module already wraps all of this nicely, you can write distributed multiprocess applications without knowing anything about the underlying network communication. For details, see the example code of the distributed multiprocess crawler for the Sohu mobile site in this chapter.
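A minimal sketch of the pattern (the address 127.0.0.1:8888 and the authkey are placeholders): a manager process serves a task queue over the network, the scheduler pushes URLs into it, and workers, which could run on other hosts, connect to the same address and consume them. Both roles run in one script here only to keep the demo self-contained.

from multiprocessing.managers import BaseManager
from queue import Queue

task_queue = Queue()


class QueueManager(BaseManager):
    """Manager that exposes the task queue to other processes/hosts."""


def get_task_queue():
    return task_queue


# Register the queue under a name that remote clients can look up.
QueueManager.register('get_task_queue', callable=get_task_queue)


def run_scheduler():
    manager = QueueManager(address=('127.0.0.1', 8888), authkey=b'spider')
    manager.start()                    # serve the queue from a child process
    queue = manager.get_task_queue()   # proxy to the queue in the manager process
    queue.put('http://m.sohu.com/')    # hand out a task
    return manager


def run_worker():
    worker = QueueManager(address=('127.0.0.1', 8888), authkey=b'spider')
    worker.connect()                   # in a real deployment this runs on another host
    queue = worker.get_task_queue()
    print('worker got task:', queue.get())


if __name__ == '__main__':
    manager = run_scheduler()
    run_worker()
    manager.shutdown()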
Coroutines and Asynchronous I/O
The Concept of a Coroutine
A coroutine, often also called a micro-thread or a fiber, is one of a group of subroutines (functions) that cooperate with one another. "Cooperate" means that while function A is running it can be interrupted at any point to run function B, which can in turn be interrupted so that A resumes. Note that this is not a function call (there is no call statement); the whole process looks like multithreading, yet only a single thread is doing the executing. Coroutines transfer control with the yield keyword and the send() operation, so the relationship between coroutines is not one of caller and callee.
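A tiny illustration of that hand-off, using a hypothetical producer/consumer pair: producer() drives consumer() by calling send(), and every yield inside consumer() pauses it and passes a value (and control) back to producer().

def consumer():
    total = 0
    while True:
        # Execution pauses here until the producer calls send();
        # the sent value becomes the result of the yield expression.
        value = yield total
        total += value


def producer(consumer_gen):
    consumer_gen.send(None)                     # prime the generator up to its first yield
    for num in range(1, 4):
        running_total = consumer_gen.send(num)  # hand control to the consumer
        print('producer received running total:', running_total)


producer(consumer())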
Coroutines have two main advantages:
First, execution is extremely efficient: switching between subroutines (functions) is not a thread switch; it is controlled by the program itself, so there is no thread-switching overhead. Second, the locking mechanisms of multithreading are unnecessary: since there is only one thread, there is no contention for resources and therefore nothing to lock, which makes execution faster still.
Note: coroutines are well suited to I/O-bound tasks; CPU-bound work is not their strong point. To raise CPU utilization, consider a "multiprocess + multithread" or "multiprocess + coroutine" working model.
A Brief History
Python 2.2: the concept of generators (initially called iterators) was proposed for the first time (PEP 255).
Python 2.5: added the ability to send an object back into a paused generator, i.e. the generator's send() method (PEP 342).
Python 3.3: added the yield from syntax, which allows returning any value from an iterator (note that a generator is itself an iterator), so generators can be chained and refactored into better generators.
Python 3.4: introduced the asyncio.coroutine decorator to mark a function as a coroutine; coroutine functions are used together with asyncio and its event loop to perform asynchronous I/O.
Python 3.5: introduced async and await; a coroutine function can be defined with async def, may not contain any form of yield statement, and can return values with return or await.
Coroutines implement cooperative concurrency and improve performance by raising CPU utilization. The well-known third-party library aiohttp uses coroutines to implement both an HTTP client and an HTTP server; it fetches data with better performance than requests, and its official documentation is worth reading if you are interested.
import asyncio

import aiohttp


async def download(url):
    print('Fetch:', url)
    # One session per request keeps the example simple; a real crawler
    # would reuse a single ClientSession for many requests.
    async with aiohttp.ClientSession() as session:
        # ssl=False disables certificate verification (acceptable for a demo).
        async with session.get(url, ssl=False) as resp:
            print(url, '--->', resp.status)
            print(url, '--->', resp.headers)
            print('\n\n', await resp.text())


def main():
    loop = asyncio.get_event_loop()
    urls = [
        'https://www.baidu.com',
        'http://www.sohu.com/',
        'http://www.sina.com.cn/',
        'https://www.taobao.com/',
        'http://jd.com/'
    ]
    tasks = [download(url) for url in urls]
    # gather wraps the coroutines into tasks and runs them concurrently on the loop.
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()


if __name__ == '__main__':
    main()
Example: Crawling All Pages of the Sohu Mobile Site with Multiple Threads
Now let's combine everything covered so far and implement, in an object-oriented style, a multithreaded crawler for the Sohu mobile site (m.sohu.com).
import pickle
import zlib
from enum import Enum, unique
from hashlib import sha1
from random import random
from threading import Thread, current_thread, local
from time import sleep
from urllib.parse import urlparse

import pymongo
import redis
import requests
from bs4 import BeautifulSoup
from bson import Binary

@unique
class SpiderStatus(Enum):
    """Status of a spider: idle or working."""
    IDLE = 0
    WORKING = 1


def decode_page(page_bytes, charsets=('utf-8',)):
    """Try each charset in turn until the page bytes decode successfully."""
    page_html = None
    for charset in charsets:
        try:
            page_html = page_bytes.decode(charset)
            break
        except UnicodeDecodeError:
            pass
    return page_html

class Retry(object):
    """Decorator that retries a function several times before giving up."""

    def __init__(self, *, retry_times=3,
                 wait_secs=5, errors=(Exception, )):
        self.retry_times = retry_times
        self.wait_secs = wait_secs
        self.errors = errors

    def __call__(self, fn):
        def wrapper(*args, **kwargs):
            for _ in range(self.retry_times):
                try:
                    return fn(*args, **kwargs)
                except self.errors as e:
                    print(e)
                    # Wait a random amount of time before retrying.
                    sleep((random() + 1) * self.wait_secs)
            return None
        return wrapper

class Spider(object):
    """Crawler that fetches, parses, extracts and stores pages."""

    def __init__(self):
        self.status = SpiderStatus.IDLE

    @Retry()
    def fetch(self, current_url, *, charsets=('utf-8', ),
              user_agent=None, proxies=None):
        """Download a page and return its decoded HTML (None on failure)."""
        thread_name = current_thread().name
        print(f'[{thread_name}]: {current_url}')
        headers = {'user-agent': user_agent} if user_agent else {}
        resp = requests.get(current_url,
                            headers=headers, proxies=proxies)
        return decode_page(resp.content, charsets) \
            if resp.status_code == 200 else None

    def parse(self, html_page, *, domain='m.sohu.com'):
        """Collect in-domain links and push unvisited ones onto the task queue."""
        soup = BeautifulSoup(html_page, 'lxml')
        for a_tag in soup.body.select('a[href]'):
            parser = urlparse(a_tag.attrs['href'])
            scheme = parser.scheme or 'http'
            netloc = parser.netloc or domain
            if scheme != 'javascript' and netloc == domain:
                path = parser.path
                query = '?' + parser.query if parser.query else ''
                full_url = f'{scheme}://{netloc}{path}{query}'
                # Each thread uses the Redis client bound to it via threading.local.
                redis_client = thread_local.redis_client
                if not redis_client.sismember('visited_urls', full_url):
                    redis_client.rpush('m_sohu_task', full_url)

    def extract(self, html_page):
        pass

    def store(self, data_dict):
        # redis_client = thread_local.redis_client
        # mongo_db = thread_local.mongo_db
        pass

class SpiderThread(Thread):
    """Worker thread that drives a Spider with its own Redis and MongoDB clients."""

    def __init__(self, name, spider):
        super().__init__(name=name, daemon=True)
        self.spider = spider

    def run(self):
        redis_client = redis.Redis(host='1.2.3.4', port=6379, password='1qaz2wsx')
        mongo_client = pymongo.MongoClient(host='1.2.3.4', port=27017)
        # Bind the clients to this thread so other methods can reach them
        # without passing them around (see threading.local above).
        thread_local.redis_client = redis_client
        thread_local.mongo_db = mongo_client.msohu
        while True:
            current_url = redis_client.lpop('m_sohu_task')
            while not current_url:
                # The task queue is empty for the moment; poll again shortly.
                sleep(0.01)
                current_url = redis_client.lpop('m_sohu_task')
            self.spider.status = SpiderStatus.WORKING
            current_url = current_url.decode('utf-8')
            if not redis_client.sismember('visited_urls', current_url):
                redis_client.sadd('visited_urls', current_url)
                html_page = self.spider.fetch(current_url)
                if html_page not in [None, '']:
                    # Use the SHA-1 digest of the URL as the document's primary key.
                    hasher = hasher_proto.copy()
                    hasher.update(current_url.encode('utf-8'))
                    doc_id = hasher.hexdigest()
                    sohu_data_coll = mongo_client.msohu.webpages
                    if not sohu_data_coll.find_one({'_id': doc_id}):
                        sohu_data_coll.insert_one({
                            '_id': doc_id,
                            'url': current_url,
                            # Pickle and compress the page before storing it as binary data.
                            'page': Binary(zlib.compress(pickle.dumps(html_page)))
                        })
                    self.spider.parse(html_page)
            self.spider.status = SpiderStatus.IDLE

def is_any_alive(spider_threads):
    """Return True if at least one spider thread is still working."""
    return any([spider_thread.spider.status == SpiderStatus.WORKING
                for spider_thread in spider_threads])


# Shared thread-local storage and a prototype hasher that threads copy from.
thread_local = local()
hasher_proto = sha1()


def main():
    redis_client = redis.Redis(host='1.2.3.4', port=6379, password='1qaz2wsx')
    # Seed the task queue with the start URL if it is empty.
    if not redis_client.exists('m_sohu_task'):
        redis_client.rpush('m_sohu_task', 'http://m.sohu.com/')
    spider_threads = [SpiderThread('thread-%d' % i, Spider())
                      for i in range(10)]
    for spider_thread in spider_threads:
        spider_thread.start()
    # Keep the main thread alive until the queue is drained and all spiders are idle.
    while redis_client.exists('m_sohu_task') or is_any_alive(spider_threads):
        sleep(5)
    print('Over!')


if __name__ == '__main__':
    main()
