1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        用 Python 高效處理大文件

        共 5298字,需瀏覽 11分鐘

         ·

        2022-07-27 05:00


        為了進(jìn)行并行處理,我們將任務(wù)劃分為子單元。它增加了程序處理的作業(yè)數(shù)量,減少了整體處理時間。

        例如,如果你正在處理一個大的CSV文件,你想修改一個單列。我們將把數(shù)據(jù)以數(shù)組的形式輸入函數(shù),它將根據(jù)可用的進(jìn)程數(shù)量,一次并行處理多個值。這些進(jìn)程是基于你的處理器內(nèi)核的數(shù)量。

        在這篇文章中,我們將學(xué)習(xí)如何使用multiprocessing、joblibtqdm Python包減少大文件的處理時間。這是一個簡單的教程,可以適用于任何文件、數(shù)據(jù)庫、圖像、視頻和音頻。

        開始

        我們將使用來自 Kaggle 的 US Accidents (2016 - 2021) 數(shù)據(jù)集,它包括280萬條記錄和47個列。

        https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents

        我們將導(dǎo)入multiprocessing、joblibtqdm用于并行處理,pandas用于數(shù)據(jù)導(dǎo)入,re、nltkstring用于文本處理。

        # Parallel Computing import multiprocessing as mp
        from joblib import Parallel, delayed
        from tqdm.notebook import tqdm
        # Data Ingestion
        import pandas as pd
        # Text Processing
        import re
        from nltk.corpus import stopwords
        import string

        在我們開始之前,讓我們通過加倍cpu_count()來設(shè)置n_workers。正如你所看到的,我們有8個workers

        n_workers = 2 * mp.cpu_count()
        print(f"{n_workers} workers are available")
        >>> 8 workers are available

        下一步,我們將使用pandas read_csv函數(shù)讀取大型CSV文件。然后打印出dataframe的形狀、列的名稱和處理時間。

        %%timefile_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"df = pd.read_csv(file_name)
        print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")

        輸出:

        Shape:(2845342, 47)
        Column Names:
        Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng','End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street','Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone','Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)','Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction','Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity','Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway','Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal','Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight','Astronomical_Twilight'],dtype='object')
        CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 sWall time: 46.9 s

        處理文本

        clean_text是一個用于處理文本的簡單函數(shù)。我們將使用nltk.copus獲得英語停止詞,并使用它來過濾掉文本行中的停止詞。之后,我們將刪除句子中的特殊字符和多余的空格。它將成為確定串行、并行和批處理的處理時間的基準(zhǔn)函數(shù)。

        def clean_text(text):   # Remove stop words  stops = stopwords.words("english")  text = " ".join([word for word in text.split() if word  not in stops])  # Remove Special Characters  text = text.translate(str.maketrans('', '', string.punctuation))  # removing the extra spaces  text = re.sub(' +',' ', text)  return text

        串行處理

        對于串行處理,我們可以使用pandas的.apply()函數(shù),但是如果你想看到進(jìn)度條,你需要為pandas激活tqdm,然后使用.progress_apply()函數(shù)。

        我們將處理280萬條記錄,并將結(jié)果保存回 “Description” 列中。

        %%timetqdm.pandas()
        df['Description'] = df['Description'].progress_apply(clean_text)

        輸出

        高端處理器串行處理280萬行花了9分5秒。

        100% ???????? 2845342/2845342 [09:05<00:00, 5724.25it/s]
        CPU times: user 8min 14s, sys: 53.6 s, total: 9min 7sWall time: 9min 5s

        多進(jìn)程處理

        有多種方法可以對文件進(jìn)行并行處理,我們將了解所有這些方法。multiprocessing是一個內(nèi)置的python包,通常用于并行處理大型文件。

        我們將創(chuàng)建一個有8個workers的多處理池,并使用map函數(shù)來啟動進(jìn)程。為了顯示進(jìn)度條,我們將使用tqdm

        map函數(shù)由兩部分組成。第一個部分需要函數(shù),第二個部分需要一個參數(shù)或參數(shù)列表。

        %%timep = mp.Pool(n_workers) 
        df['Description'] = p.map(clean_text,tqdm(df['Description']))

        輸出

        我們的處理時間幾乎提高了3倍。處理時間從9分5秒下降到3分51秒。

        100% ???????? 2845342/2845342 [02:58<00:00, 135646.12it/s]
        CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 sWall time: 3min 51s

        并行處理

        我們現(xiàn)在將學(xué)習(xí)另一個Python包來執(zhí)行并行處理。在本節(jié)中,我們將使用joblibParalleldelayed來復(fù)制map函數(shù)。

        • Parallel需要兩個參數(shù):n_job = 8backend = multiprocessing。

        • 然后,我們將在delayed函數(shù)中加入clean_text。

        • 創(chuàng)建一個循環(huán),每次輸入一個值。

        下面的過程是相當(dāng)通用的,你可以根據(jù)你的需要修改你的函數(shù)和數(shù)組。我曾用它來處理成千上萬的音頻和視頻文件,沒有任何問題。

        建議:使用 "try: ""except: "添加異常處理。

        def text_parallel_clean(array):  result = Parallel(n_jobs=n_workers,backend="multiprocessing")(  delayed(clean_text)  (text)   for text in tqdm(array)  )  return result

        text_parallel_clean()中添加“Description”列。

        %%timedf['Description'] = text_parallel_clean(df['Description'])

        輸出

        我們的函數(shù)比多進(jìn)程處理Pool多花了13秒。即使如此,并行處理也比串行處理快4分59秒。

        100% ???????? 2845342/2845342 [04:03<00:00, 10514.98it/s]
        CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 sWall time: 4min 4s

        并行批量處理

        有一個更好的方法來處理大文件,就是把它們分成若干批,然后并行處理。讓我們從創(chuàng)建一個批處理函數(shù)開始,該函數(shù)將在單一批次的值上運(yùn)行clean_function。

        批量處理函數(shù)

        def proc_batch(batch):  return [  clean_text(text)  for text in batch  ]

        將文件分割成批

        下面的函數(shù)將根據(jù)workers的數(shù)量把文件分成多個批次。在我們的例子中,我們得到8個批次。

        def batch_file(array,n_workers):  file_len = len(array)  batch_size = round(file_len / n_workers)  batches = [  array[ix:ix+batch_size]  for ix in tqdm(range(0, file_len, batch_size))  ]  return batches
        batches = batch_file(df['Description'],n_workers)
        >>> 100% 8/8 [00:00<00:00, 280.01it/s]

        運(yùn)行并行批處理

        最后,我們將使用Paralleldelayed來處理批次。

        %%timebatch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")(  delayed(proc_batch)  (batch)   for batch in tqdm(batches)  )
        df['Description'] = [j for i in batch_output for j in i]

        輸出

        我們已經(jīng)改善了處理時間。這種技術(shù)在處理復(fù)雜數(shù)據(jù)和訓(xùn)練深度學(xué)習(xí)模型方面非常有名。

        100% ???????? 8/8 [00:00<00:00, 2.19it/s]
        CPU times: user 3.39 s, sys: 1.42 s, total: 4.81 sWall time: 3min 56s

        tqdm 并發(fā)

        tqdm將多處理帶到了一個新的水平。它簡單而強(qiáng)大。

        process_map需要:

        • 函數(shù)名稱
        • Dataframe 列名
        • max_workers
        • chucksize與批次大小類似。我們將用workers的數(shù)量來計算批處理的大小,或者你可以根據(jù)你的喜好來添加這個數(shù)字。
        %%timefrom tqdm.contrib.concurrent import process_mapbatch = round(len(df)/n_workers)
        df['Description'] = process_map(clean_text,df['Description'], max_workers=n_workers, chunksize=batch)

        輸出

        通過一行代碼,我們得到了最好的結(jié)果:

        100% ???????? 2845342/2845342 [03:48<00:00, 1426320.93it/s]
        CPU times: user 7.32 s, sys: 1.97 s, total: 9.29 sWall time: 3min 51s

        結(jié)論

        我們需要找到一個平衡點(diǎn),它可以是串行處理,并行處理,或批處理。如果你正在處理一個較小的、不太復(fù)雜的數(shù)據(jù)集,并行處理可能會適得其反。

        在這個教程中,我們已經(jīng)了解了各種處理大文件的Python包,它們允許我們對數(shù)據(jù)函數(shù)進(jìn)行并行處理。

        如果你只處理一個表格數(shù)據(jù)集,并且想提高處理性能,那么建議你嘗試DaskdatatableRAPIDS。

        - 點(diǎn)擊下方閱讀原文加入社區(qū)會員 

        瀏覽 69
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            九九黄色 | 美女网黄射 | 欧美精品中文字幕在线观看 | 男女肉粗暴进来动态图 | 日本无套内射视频 | 国产成人精品无码高潮 | 一到本在线视频无码 | 国产suv一区二区三区 | 9ⅰ精品久久久久久久久中文字幕 | 色戒未删减版在线观看免费播放 |