1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        并發(fā)下載(多線程和多進(jìn)程補(bǔ)充知識(shí)點(diǎn) )

        共 7351字,需瀏覽 15分鐘

         ·

        2021-10-14 15:49

        并發(fā)下載

        多線程和多進(jìn)程補(bǔ)充知識(shí)點(diǎn)

        threading.local類

        使用線程時(shí)最不愿意遇到的情況就是多個(gè)線程競爭資源,在這種情況下為了保證資源狀態(tài)的正確性,我們可能需要對資源進(jìn)行加鎖保護(hù)的處理,這一方面會(huì)導(dǎo)致程序失去并發(fā)性,另外如果多個(gè)線程競爭多個(gè)資源時(shí),還有可能因?yàn)榧渔i方式的不當(dāng)導(dǎo)致死鎖。要解決多個(gè)線程競爭資源的問題,其中一個(gè)方案就是讓每個(gè)線程都持有資源的副本(拷貝),這樣每個(gè)線程可以操作自己所持有的資源,從而規(guī)避對資源的競爭。

        要實(shí)現(xiàn)將資源和持有資源的線程進(jìn)行綁定的操作,最簡單的做法就是使用threading模塊的local類,在網(wǎng)絡(luò)爬蟲開發(fā)中,就可以使用local類為每個(gè)線程綁定一個(gè)MySQL數(shù)據(jù)庫連接或Redis客戶端對象,這樣通過線程可以直接獲得這些資源,既解決了資源競爭的問題,又避免了在函數(shù)和方法調(diào)用時(shí)傳遞這些資源。具體的請參考本章多線程爬取“手機(jī)搜狐網(wǎng)”(Redis版)的實(shí)例代碼。

        concurrent.futures模塊

        Python3.2帶來了concurrent.futures?模塊,這個(gè)模塊包含了線程池和進(jìn)程池、管理并行編程任務(wù)、處理非確定性的執(zhí)行流程、進(jìn)程/線程同步等功能。關(guān)于這部分的內(nèi)容推薦大家閱讀《Python并行編程》。

        分布式進(jìn)程

        使用多進(jìn)程的時(shí)候,可以將進(jìn)程部署在多個(gè)主機(jī)節(jié)點(diǎn)上,Python的multiprocessing模塊不但支持多進(jìn)程,其中managers子模塊還支持把多進(jìn)程部署到多個(gè)節(jié)點(diǎn)上。當(dāng)然,要部署分布式進(jìn)程,首先需要一個(gè)服務(wù)進(jìn)程作為調(diào)度者,進(jìn)程之間通過網(wǎng)絡(luò)進(jìn)行通信來實(shí)現(xiàn)對進(jìn)程的控制和調(diào)度,由于managers模塊已經(jīng)對這些做出了很好的封裝,因此在無需了解網(wǎng)絡(luò)通信細(xì)節(jié)的前提下,就可以編寫分布式多進(jìn)程應(yīng)用。具體的請參照本章分布式多進(jìn)程爬取“手機(jī)搜狐網(wǎng)”的實(shí)例代碼。

        協(xié)程和異步I/O

        協(xié)程的概念

        協(xié)程(coroutine)通常又稱之為微線程或纖程,它是相互協(xié)作的一組子程序(函數(shù))。所謂相互協(xié)作指的是在執(zhí)行函數(shù)A時(shí),可以隨時(shí)中斷去執(zhí)行函數(shù)B,然后又中斷繼續(xù)執(zhí)行函數(shù)A。注意,這一過程并不是函數(shù)調(diào)用(因?yàn)闆]有調(diào)用語句),整個(gè)過程看似像多線程,然而協(xié)程只有一個(gè)線程執(zhí)行。協(xié)程通過yield關(guān)鍵字和 send()操作來轉(zhuǎn)移執(zhí)行權(quán),協(xié)程之間不是調(diào)用者與被調(diào)用者的關(guān)系。

        協(xié)程的優(yōu)勢在于以下兩點(diǎn):

        1. 執(zhí)行效率極高,因?yàn)樽映绦颍ê瘮?shù))切換不是線程切換,由程序自身控制,沒有切換線程的開銷。
        2. 不需要多線程的鎖機(jī)制,因?yàn)橹挥幸粋€(gè)線程,也不存在競爭資源的問題,當(dāng)然也就不需要對資源加鎖保護(hù),因此執(zhí)行效率高很多。

        「說明」:協(xié)程適合處理的是I/O密集型任務(wù),處理CPU密集型任務(wù)并不是它擅長的,如果要提升CPU的利用率可以考慮“多進(jìn)程+多線程”或者“多進(jìn)程+協(xié)程”的工作模式。

        歷史回顧

        1. Python 2.2:第一次提出了生成器(最初稱之為迭代器)的概念(PEP 255)。
        2. Python 2.5:引入了將對象發(fā)送回暫停了的生成器這一特性即生成器的send()方法(PEP 342)。
        3. Python 3.3:添加了yield from特性,允許從迭代器中返回任何值(注意生成器本身也是迭代器),這樣我們就可以串聯(lián)生成器并且重構(gòu)出更好的生成器。
        4. Python 3.4:引入asyncio.coroutine裝飾器用來標(biāo)記作為協(xié)程的函數(shù),協(xié)程函數(shù)和asyncio及其事件循環(huán)一起使用,來實(shí)現(xiàn)異步I/O操作。
        5. Python 3.5:引入了asyncawait,可以使用async def來定義一個(gè)協(xié)程函數(shù),這個(gè)函數(shù)中不能包含任何形式的yield語句,但是可以使用returnawait從協(xié)程中返回值。

        協(xié)程實(shí)現(xiàn)了協(xié)作式并發(fā),通過提高CPU的利用率來達(dá)到改善性能的目的。著名的三方庫aiohttp就是通過協(xié)程的方式實(shí)現(xiàn)了HTTP客戶端和HTTP服務(wù)器的功能,較之requests有更好的獲取數(shù)據(jù)的性能,有興趣可以閱讀它的官方文檔。

        import?asyncio
        import?aiohttp


        async?def?download(url):
        ????print('Fetch:',?url)
        ????async?with?aiohttp.ClientSession()?as?session:
        ????????async?with?session.get(url,?ssl=False)?as?resp:
        ????????????print(url,?'--->',?resp.status)
        ????????????print(url,?'--->',?resp.headers)
        ????????????print('\n\n',?await?resp.text())


        def?main():
        ????loop?=?asyncio.get_event_loop()
        ????urls?=?[
        ????????'https://www.baidu.com',
        ????????'http://www.sohu.com/',
        ????????'http://www.sina.com.cn/',
        ????????'https://www.taobao.com/',
        ????????'http://jd.com/'
        ????]
        ????tasks?=?[download(url)?for?url?in?urls]
        ????loop.run_until_complete(asyncio.wait(tasks))
        ????loop.close()


        if?__name__?==?'__main__':
        ????main()

        實(shí)例 - 多線程爬取“手機(jī)搜狐網(wǎng)”所有頁面

        下面我們把之間講的所有知識(shí)結(jié)合起來,用面向?qū)ο蟮姆绞綄?shí)現(xiàn)一個(gè)爬取“手機(jī)搜狐網(wǎng)”的多線程爬蟲。

        import?pickle
        import?zlib
        from?enum?import?Enum,?unique
        from?hashlib?import?sha1
        from?random?import?random
        from?threading?import?Thread,?current_thread,?local
        from?time?import?sleep
        from?urllib.parse?import?urlparse

        import?pymongo
        import?redis
        import?requests
        from?bs4?import?BeautifulSoup
        from?bson?import?Binary


        @unique
        class?SpiderStatus(Enum):
        ????IDLE?=?0
        ????WORKING?=?1


        def?decode_page(page_bytes,?charsets=('utf-8',)):
        ????page_html?=?None
        ????for?charset?in?charsets:
        ????????try:
        ????????????page_html?=?page_bytes.decode(charset)
        ????????????break
        ????????except?UnicodeDecodeError:
        ????????????pass
        ????return?page_html


        class?Retry(object):

        ????def?__init__(self,?*,?retry_times=3,
        ?????????????????wait_secs=5,?errors=(Exception,?))
        :

        ????????self.retry_times?=?retry_times
        ????????self.wait_secs?=?wait_secs
        ????????self.errors?=?errors

        ????def?__call__(self,?fn):

        ????????def?wrapper(*args,?**kwargs):
        ????????????for?_?in?range(self.retry_times):
        ????????????????try:
        ????????????????????return?fn(*args,?**kwargs)
        ????????????????except?self.errors?as?e:
        ????????????????????print(e)
        ????????????????????sleep((random()?+?1)?*?self.wait_secs)
        ????????????return?None

        ????????return?wrapper


        class?Spider(object):

        ????def?__init__(self):
        ????????self.status?=?SpiderStatus.IDLE

        ????@Retry()
        ????def?fetch(self,?current_url,?*,?charsets=('utf-8',?),
        ??????????????user_agent=None,?proxies=None)
        :

        ????????thread_name?=?current_thread().name
        ????????print(f'[{thread_name}]:?{current_url}')
        ????????headers?=?{'user-agent':?user_agent}?if?user_agent?else?{}
        ????????resp?=?requests.get(current_url,
        ????????????????????????????headers=headers,?proxies=proxies)
        ????????return?decode_page(resp.content,?charsets)?\
        ????????????if?resp.status_code?==?200?else?None

        ????def?parse(self,?html_page,?*,?domain='m.sohu.com'):
        ????????soup?=?BeautifulSoup(html_page,?'lxml')
        ????????for?a_tag?in?soup.body.select('a[href]'):
        ????????????parser?=?urlparse(a_tag.attrs['href'])
        ????????????scheme?=?parser.scheme?or?'http'
        ????????????netloc?=?parser.netloc?or?domain
        ????????????if?scheme?!=?'javascript'?and?netloc?==?domain:
        ????????????????path?=?parser.path
        ????????????????query?=?'?'?+?parser.query?if?parser.query?else?''
        ????????????????full_url?=?f'{scheme}://{netloc}{path}{query}'
        ????????????????redis_client?=?thread_local.redis_client
        ????????????????if?not?redis_client.sismember('visited_urls',?full_url):
        ????????????????????redis_client.rpush('m_sohu_task',?full_url)

        ????def?extract(self,?html_page):
        ????????pass

        ????def?store(self,?data_dict):
        ????????#?redis_client?=?thread_local.redis_client
        ????????#?mongo_db?=?thread_local.mongo_db
        ????????pass


        class?SpiderThread(Thread):

        ????def?__init__(self,?name,?spider):
        ????????super().__init__(name=name,?daemon=True)
        ????????self.spider?=?spider

        ????def?run(self):
        ????????redis_client?=?redis.Redis(host='1.2.3.4',?port=6379,?password='1qaz2wsx')
        ????????mongo_client?=?pymongo.MongoClient(host='1.2.3.4',?port=27017)
        ????????thread_local.redis_client?=?redis_client
        ????????thread_local.mongo_db?=?mongo_client.msohu?
        ????????while?True:
        ????????????current_url?=?redis_client.lpop('m_sohu_task')
        ????????????while?not?current_url:
        ????????????????current_url?=?redis_client.lpop('m_sohu_task')
        ????????????self.spider.status?=?SpiderStatus.WORKING
        ????????????current_url?=?current_url.decode('utf-8')
        ????????????if?not?redis_client.sismember('visited_urls',?current_url):
        ????????????????redis_client.sadd('visited_urls',?current_url)
        ????????????????html_page?=?self.spider.fetch(current_url)
        ????????????????if?html_page?not?in?[None,?'']:
        ????????????????????hasher?=?hasher_proto.copy()
        ????????????????????hasher.update(current_url.encode('utf-8'))
        ????????????????????doc_id?=?hasher.hexdigest()
        ????????????????????sohu_data_coll?=?mongo_client.msohu.webpages
        ????????????????????if?not?sohu_data_coll.find_one({'_id':?doc_id}):
        ????????????????????????sohu_data_coll.insert_one({
        ????????????????????????????'_id':?doc_id,
        ????????????????????????????'url':?current_url,
        ????????????????????????????'page':?Binary(zlib.compress(pickle.dumps(html_page)))
        ????????????????????????})
        ????????????????????self.spider.parse(html_page)
        ????????????self.spider.status?=?SpiderStatus.IDLE


        def?is_any_alive(spider_threads):
        ????return?any([spider_thread.spider.status?==?SpiderStatus.WORKING
        ????????????????for?spider_thread?in?spider_threads])


        thread_local?=?local()
        hasher_proto?=?sha1()


        def?main():
        ????redis_client?=?redis.Redis(host='1.2.3.4',?port=6379,?password='1qaz2wsx')
        ????if?not?redis_client.exists('m_sohu_task'):
        ????????redis_client.rpush('m_sohu_task',?'http://m.sohu.com/')

        ????spider_threads?=?[SpiderThread('thread-%d'?%?i,?Spider())
        ??????????????????????for?i?in?range(10)]
        ????for?spider_thread?in?spider_threads:
        ????????spider_thread.start()

        ????while?redis_client.exists('m_sohu_task')?or?is_any_alive(spider_threads):
        ????????sleep(5)

        ????print('Over!')


        if?__name__?==?'__main__':
        ????main()


        瀏覽 40
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            我要操逼网站 | 女人18片毛片90分钟视频播放 | 自拍偷拍亚洲 | 在线播放中文字幕 | 国产又粗又大又长 | 豆花无码一区二区三区 | 国产男女乱淫视频高清免费 | 国模极品美軳人体销魂 | 久热精品在线观看 | 美女一级毛片老司机 |