【Python私活案例】200元:Python實現(xiàn)批量文件的檢測和解壓縮
來活了!來活了!在學習群中灌水時發(fā)現(xiàn)螞蟻老師發(fā)布的任務(wù)需求中有個200大洋的大活沒人接單,還等什么,趕緊盤它!??!

一、需求分析
接活后的第一步肯定都是進行需求分析,來看看金主的需求是什么樣子的吧,
簡單梳理一下關(guān)鍵點:
輸入兩個信息:一個文件夾路徑、一個TXT文件。文件夾中是7z壓縮包文件,TXT中是需要解壓的文件名稱; 找到TXT中的7z文件,解壓并刪除壓縮包,同時需要使用多線程/多進程,提高解壓效率; TXT文件中沒有找到壓縮包的文件名,需要輸出一個TXT。 打包成exe可執(zhí)行文件
二、思路整理
由于需要解壓的文件非常多,非常大,需求方暫時還沒辦法提供測試用例,那么先按自己的理解把主要功能給實現(xiàn)一下先:
雖說需求中要求用遞歸處理子文件夾下的文件,但是,如果先用os模塊的walk功能,先遍歷一遍文件夾下所有的文件(包括子文件夾),似乎效果也是一樣的,所以,先考慮用遍歷,至于要不要遞歸,等有了具體測試用例了再說; 要輸出沒有找到壓縮包的文件名,那就把需要解壓的文件名與遍歷后文件夾后獲取的壓縮包文件名進行匹配,分成兩個list,找到壓縮包的和沒找到壓縮包的; 對找到壓縮包的文件的進行并發(fā)解壓(這里必須贊一下螞蟻老師,在需求里面直接把思路給出來了,后面就一碼平川了^_^) 最后,將沒找到壓縮包的文件輸出到TXT中。 最后的最后,再打包成exe可執(zhí)行文件,搞定?。?!
不過,思路是理好了,但是不會解壓縮7z文件、不會打包exe,多線程用的少,只有半桶水腫么辦?沒關(guān)系?。?!有螞蟻老師做后盾,把老師的教學視頻(Python并發(fā)編程,用多線程加速程序運行)拿出來復習復習,再加上萬能的百度,沒有搞不定的事情,加油奧利給!帶薪學習我們的動力是杠杠滴~~~
三、代碼實現(xiàn)
初始化類
class?UnZipFiles:
????def?__init__(self):
????????self.root_path?=?input('請輸入壓縮文件所在的路徑')??#?根目錄
????????self.zip_file_count?=?0??#?初始化zip文件的個數(shù)
????????self.zip_file_dict?=?{}??#?根目錄中所有的7z文件字典,其中key為去除擴展名后的文件名,value為list格式,為7z文件的完整路徑
????????self.txt_file?=?input('請輸入TXT文件的完整路徑')??#?需要解壓的TXT文件完整路徑
????????self.unzip_files?=?None??#?讀取txt_file后解析的需要解壓縮的文件list
????????self.not_found?=?[]??#?沒有找到的需要解壓的文件list
????????self.found_files?=?[]??#?有找到的需要解壓的文件list
先初始化一個類,把后續(xù)需要用到的屬性進行定義。分步執(zhí)行情況:

實現(xiàn)讀取TXT文件創(chuàng)建需要解壓文件名列表的方法
????def?read_txt_file(self,?code='utf-8'):
????????"""
????????讀取TXT文件,生成需要解壓的文件list
????????:param?code:?讀取文件使用的編碼格式
????????"""
????????with?open(self.txt_file,?'r',?encoding=code)?as?fin:
????????????self.unzip_files?=?fin.read().split('\n')
分步執(zhí)行情況:

實現(xiàn)讀取根目錄下所有文件并創(chuàng)建字典的方法
????def?get_all_zip_files(self):
????????"""
????????使用os.walk模塊遍歷根目錄下的所有7z文件,生成zip_file_dict
????????"""
????????for?path,?dirnames,?filenames?in?os.walk(self.root_path):
????????????for?filename?in?filenames:
????????????????if?filename.endswith('.7z')?and?not?filename.startswith('~'):
????????????????????self.zip_file_count?+=?1
????????????????????full_path?=?os.path.join(path,?filename)
????????????????????filename?=?filename.replace('.7z',?'')
????????????????????if?filename?in?self.zip_file_dict.keys():
????????????????????????self.zip_file_dict[filename].append(full_path)
????????????????????else:
????????????????????????self.zip_file_dict[filename]?=?[full_path]
這里使用os.walk方式,遍歷根目錄下所有文件,創(chuàng)建一個壓縮文件的字典,key使用去除擴展名后的文件名以便于TXT里的文件名進行匹配。這里特別說一下,第一稿的時候,在往字典值的list里append值時,居然犯了低級錯誤,寫成了“self.zip_file_dict[filename] = self.zip_file_dict[filename].append(full_path)”,要知道append是直接修改了list,并沒有返回值,導致出現(xiàn)重名文件的時候出錯;然后腦抽筋一直沒有反應(yīng)過來,調(diào)試了半天......(寶寶心里苦)
分步執(zhí)行情況:

檢查文件,分成找到的和沒找到的兩份
????def?check_files(self):
????????"""
????????檢查文件,遍歷unzip_files,與zip_file_dict的key匹配,有找到的放入found_files,沒找到的放入not_found
????????"""
????????for?file?in?set(self.unzip_files):??#?為預防TXT文件中存在重復文件名,用set集合去重一下
????????????if?file?in?self.zip_file_dict.keys():
????????????????for?x?in?self.zip_file_dict[file]:
????????????????????self.found_files.append(x)
????????????else:
????????????????self.not_found.append(file)
這里主要需要考慮存在不同文件夾中出現(xiàn)同名壓縮包的情形。分步執(zhí)行情況:

實現(xiàn)一個單獨解壓7z文件的方法
????def?upzip_file(self,?file_full_path):
????????"""
????????對單個7z文件進行解壓縮,解壓縮后的文件,放置在原7z文件所在路徑下,解壓成功后,刪除7z文件
????????:param?file_full_path:?需要傳入完整的文件路徑
????????:return:?True?or?False
????????"""
????????if?is_7zfile(file_full_path):
????????????try:
????????????????start_time?=?time.time()
????????????????with?SevenZipFile(file_full_path,?mode='r')?as?sevenZ_f:
????????????????????sevenZ_f.extractall(os.path.split(file_full_path)[0])??#?解壓到與壓縮包相同的目錄下
????????????????os.remove(file_full_path)??#?解壓成功后刪除文件
????????????????end_time?=?time.time()
????????????????print(f'解壓縮{file_full_path}文件成功,用時{round(end_time-start_time,?4)}秒')
????????????except?Exception?as?e:
????????????????print('Error?when?uncompress?file!?info:?',?e)
????????????????return?False
????????????else:
????????????????return?True
????????else:
????????????print('This?is?not?a?true?7z?file!')
????????????return?False
這一段是帶薪學習的成果了,主要Ctrl C + Ctrl V(^_^感謝萬能的百度)。不過其實理解起來也不難,使用了py7zr這個庫來解壓7z壓縮包,先判斷是否7z文件,再進行解壓,成功返回True,否則False。
進行批量并發(fā)解壓縮
????def?run_unzip(self,?max_workers=10):
????????"""
????????開啟多線程進行解壓縮
????????:param?max_workers:?開啟的最大線程數(shù),默認10個
????????"""
????????with?ThreadPoolExecutor(max_workers=max_workers)?as?pool:
????????????futures?=?[pool.submit(self.upzip_file,?file)?for?file?in?self.found_files]
螞蟻老師的教程(Python并發(fā)編程,用多線程加速程序運行)里學(抄)來的。
分步執(zhí)行情況:

輸出沒有匹配到壓縮包的文件名,格式為TXT文件
????def?write_not_found_file(self):
????????"""
????????當存在沒有找到的解壓文件時,寫入文件,文件存放在與txt_file相同的目錄下,命名為not_found_fifle.txt
????????"""
????????if?len(self.not_found)?>?0:
????????????txt_path?=?self.txt_file.replace('.txt',?'_not_found.txt')??#?將原TXT文件的完整路徑進行修改,增加“_not_found”部分作為輸出TXT文件的文件名
????????????with?open(txt_path,?'w',?encoding='utf-8')?as?fout:
????????????????fout.write('\n'.join(self.not_found))
按順序執(zhí)行各個方法
????def?run_program(self,?code='utf-8',?max_workers=10):
????????"""
????????按順序運行程序
????????:param?code:?讀取txt_file的編碼格式,默認utf-8
????????:param?max_workers:?開啟的線程數(shù),默認10線程
????????"""
????????start?=?time.time()
????????self.read_txt_file(code)??#?讀取txt文件,生成unzip_files
????????self.get_all_zip_files()??#?獲取根目錄下所有的7z文件字典
????????self.check_files()??#?檢查upzip_files是否都在字典中
????????self.run_unzip(max_workers)??#?開啟多線程進行解壓縮
????????self.write_not_found_file()??#?寫入未找到的文件
????????end?=?time.time()
????????cost_time?=?end?-?start
????????print(f'txt文件中共有{len(self.unzip_files)}個文件名,其中{len(self.unzip_files)-len(self.not_found)}個在文件夾中找到{len(self.found_files)}個7z文件,{len(self.not_found)}個沒有找到7z文件,注文件夾中共有{self.zip_file_count}個7z文件,不重復文件名共{len(self.zip_file_dict.keys())}個')
????????print(f'成功解壓{len(self.found_files)}個7z文件,有{len(self.not_found)}個文件沒有找到')
????????input(f'cost_time:{cost_time}秒,按回車鍵退出窗口')
執(zhí)行情況:

四、完整代碼
import?os
from?py7zr?import?is_7zfile,?SevenZipFile
from?concurrent.futures?import?ThreadPoolExecutor
import?time
class?UnZipFiles:
????def?__init__(self):
????????self.root_path?=?input('請輸入壓縮文件所在的路徑')??#?根目錄
????????self.zip_file_count?=?0??#?初始化zip文件的個數(shù)
????????self.zip_file_dict?=?{}??#?根目錄中所有的7z文件字典,其中key為去除擴展名后的文件名,value為list格式,為7z文件的完整路徑
????????self.txt_file?=?input('請輸入TXT文件的完整路徑')??#?需要解壓的TXT文件完整路徑
????????self.unzip_files?=?None??#?讀取txt_file后解析的需要解壓縮的文件list
????????self.not_found?=?[]??#?沒有找到的需要解壓的文件list
????????self.found_files?=?[]??#?有找到的需要解壓的文件list
????def?read_txt_file(self,?code='utf-8'):
????????"""
????????讀取TXT文件,生成需要解壓的文件list
????????:param?code:?讀取文件使用的編碼格式
????????"""
????????with?open(self.txt_file,?'r',?encoding=code)?as?fin:
????????????self.unzip_files?=?fin.read().split('\n')
????def?get_all_zip_files(self):
????????"""
????????使用os.walk模塊遍歷根目錄下的所有7z文件,生成zip_file_dict
????????"""
????????for?path,?dirnames,?filenames?in?os.walk(self.root_path):
????????????for?filename?in?filenames:
????????????????if?filename.endswith('.7z')?and?not?filename.startswith('~'):
????????????????????self.zip_file_count?+=?1
????????????????????full_path?=?os.path.join(path,?filename)
????????????????????filename?=?filename.replace('.7z',?'')
????????????????????if?filename?in?self.zip_file_dict.keys():
????????????????????????self.zip_file_dict[filename].append(full_path)
????????????????????else:
????????????????????????self.zip_file_dict[filename]?=?[full_path]
????def?check_files(self):
????????"""
????????檢查文件,遍歷unzip_files,與zip_file_dict的key匹配,有找到的放入found_files,沒找到的放入not_found
????????"""
????????for?file?in?set(self.unzip_files):??#為預防TXT文件中存在重復的文件名,加個set集合去重
????????????if?file?in?self.zip_file_dict.keys():
????????????????for?x?in?self.zip_file_dict[file]:
????????????????????self.found_files.append(x)
????????????else:
????????????????self.not_found.append(file)
????def?upzip_file(self,?file_full_path):
????????"""
????????對單個7z文件進行解壓縮,解壓縮后的文件,放置在原7z文件所在路徑下,解壓成功后,刪除7z文件
????????:param?file_full_path:?需要傳入完整的文件路徑
????????:return:?True?or?False
????????"""
????????if?is_7zfile(file_full_path):
????????????try:
????????????????start_time?=?time.time()
????????????????with?SevenZipFile(file_full_path,?mode='r')?as?sevenZ_f:
????????????????????sevenZ_f.extractall(os.path.split(file_full_path)[0])
????????????????os.remove(file_full_path)
????????????????end_time?=?time.time()
????????????????print(f'解壓縮{file_full_path}文件成功,用時{round(end_time-start_time,?4)}秒')
????????????except?Exception?as?e:
????????????????print('Error?when?uncompress?file!?info:?',?e)
????????????????return?False
????????????else:
????????????????return?True
????????else:
????????????print('This?is?not?a?true?7z?file!')
????????????return?False
????def?run_unzip(self,?max_workers=10):
????????"""
????????開啟多線程進行解壓縮
????????:param?max_workers:?開啟的最大線程數(shù),默認10個
????????"""
????????with?ThreadPoolExecutor(max_workers=max_workers)?as?pool:
????????????futures?=?[pool.submit(self.upzip_file,?file)?for?file?in?self.found_files]
????def?write_not_found_file(self):
????????"""
????????當存在沒有找到的解壓文件時,寫入文件,文件存放在與txt_file相同的目錄下,命名為not_found_fifle.txt
????????"""
????????if?len(self.not_found)?>?0:
????????????#?txt_path?=?os.path.split(self.txt_file)[0]
????????????txt_path?=?self.txt_file.replace('.txt',?'_not_found.txt')
????????????with?open(txt_path,?'w',?encoding='utf-8')?as?fout:
????????????????fout.write('\n'.join(self.not_found))
????def?run_program(self,?code='utf-8',?max_workers=10):
????????"""
????????按順序運行程序
????????:param?code:?讀取txt_file的編碼格式,默認utf-8
????????:param?max_workers:?開啟的線程數(shù),默認10線程
????????"""
????????start?=?time.time()
????????self.read_txt_file(code)??#?讀取txt文件,生成unzip_files
????????self.get_all_zip_files()??#?獲取根目錄下所有的7z文件字典
????????self.check_files()??#?檢查upzip_files是否都在字典中
????????self.run_unzip(max_workers)??#?開啟多線程進行解壓縮
????????self.write_not_found_file()??#?寫入未找到的文件
????????end?=?time.time()
????????cost_time?=?end?-?start
????????print(f'txt文件中共有{len(self.unzip_files)}個文件名,其中{len(self.unzip_files)-len(self.not_found)}個在文件夾中找到{len(self.found_files)}個7z文件,{len(self.not_found)}個沒有找到7z文件,注文件夾中共有{self.zip_file_count}個7z文件,不重復文件名共{len(self.zip_file_dict.keys())}個')
????????print(f'成功解壓{len(self.found_files)}個7z文件,有{len(self.not_found)}個文件沒有找到')
????????input(f'cost_time:{cost_time}秒,按回車鍵退出窗口')
if?__name__?==?'__main__':
????obj?=?UnZipFiles()
????obj.run_program()
使用代碼跑了一個需求方給的一個示例文件夾,里面上千個7z文件,下載并解壓了其中735個,已經(jīng)解壓出53.5G的文件,幾乎撐爆我的筆記本硬盤,只好放棄后續(xù)測試了。


五、打包成exe可執(zhí)行文件
功能成功實現(xiàn)之后就可以打包成exe可執(zhí)行文件了,這個部分使用到了pyinstaller,雖然一開始不會,但是查閱了一些文檔后,還是很簡單的,網(wǎng)絡(luò)上有很多教程就不重復寫了,唯一需要注意的是:如果希望打包出來的exe執(zhí)行文件盡可能的小,那么就需要創(chuàng)建虛擬環(huán)境,引入最少的包。個人試了一下,在anaconda環(huán)境下打包出來215M,在虛擬環(huán)境下只安裝需要的包打包只有9M。
交稿
等待回復的空余時間,給螞蟻老師投個稿,掙點稿費改善生活......不不不,我這是為了總結(jié)經(jīng)驗、加深印象、穩(wěn)固知識......
曬單
最后還是要曬一下單,小金庫又充實了,距離財富自由又邁進了一小步^_^

