如何編碼檢查依賴關(guān)系是否有循環(huán)依賴
什么是永不過(guò)時(shí)的技能:算法思維。
之前做數(shù)據(jù)倉(cāng)庫(kù)的運(yùn)維,上線部署時(shí)需要處理很多任務(wù)的依賴關(guān)系,所謂任務(wù),就是一個(gè)一個(gè) shell 腳本或者存儲(chǔ)過(guò)程等批處理任務(wù),他們之間是有依賴關(guān)系的,由于數(shù)據(jù)倉(cāng)庫(kù)的任務(wù)超級(jí)多,約 3000 多個(gè)任務(wù),這么多的任務(wù)是無(wú)法使用一張有向無(wú)環(huán)圖來(lái)表示,因此依賴關(guān)系除了使用直觀的有向連線來(lái)配置,還使用了隱藏式的配置,就是依賴關(guān)系無(wú)法使用有向線條來(lái)直觀的看到。
既然看不到,就有可能出現(xiàn)循環(huán)依賴而不自知,只要有可能,就一定會(huì)有人犯錯(cuò),不是你就是他,不是今天就是未來(lái)某一天,這就是墨菲定律。這不,我就經(jīng)歷過(guò)。
調(diào)度平臺(tái)用的是先進(jìn)數(shù)通的 MoiaControl V5,這是我用過(guò)的最好的調(diào)度平臺(tái)了,之前用過(guò) ETlplus,Airflow。但 MoiaControl 中出現(xiàn)循環(huán)依賴并不提示,會(huì)導(dǎo)致第二天的任務(wù)不會(huì)跑批,影響數(shù)據(jù)的時(shí)效性。假如你準(zhǔn)備面試先進(jìn)數(shù)通這家公司,說(shuō)你可以為該產(chǎn)品增加一項(xiàng)檢查否有循環(huán)依賴的功能,我想這一定是個(gè)加分項(xiàng)。
那問(wèn)題來(lái)了,如何編碼檢查任務(wù)依賴關(guān)系是否有循環(huán)依賴?
答案很簡(jiǎn)單,就是構(gòu)造一個(gè)有向圖,進(jìn)行拓?fù)渑判颍绻負(fù)渑判蚝鬀](méi)有未訪問(wèn)的點(diǎn),那就沒(méi)有環(huán),否則就有環(huán)。
下面,我用 Python 來(lái)演示這一解決過(guò)程,帶你徹底掌握拓?fù)渑判颉?/p>
首先,我們需要借助一種數(shù)據(jù)結(jié)構(gòu)來(lái)表示有向圖,使用方便即可,這里,我使用字典來(lái)表示,比如表達(dá) a->b, a->c, c->d 這樣的依賴關(guān)系,我們可以構(gòu)造字典 edges = { 'a':{'b','c'},'c':{'d'} } 來(lái)表示。字典的鍵表示前驅(qū)任務(wù),字典的值是一個(gè)集合,表示依賴前驅(qū)的任務(wù)集合。這樣的字典可以借助于標(biāo)準(zhǔn)庫(kù)的 collections 來(lái)快速初始化:
edges?=?collections.defaultdict(set)
僅保存邊是不夠的,我們還需要保存頂點(diǎn),這可以借助一個(gè)集合,它可以自動(dòng)去重,后面看是否所有的任務(wù)節(jié)點(diǎn)都參與了拓?fù)渑判?,就靠它了?/p>
self.edges?=?collections.defaultdict(set)
vertex?=?set()
接下來(lái)就是拓?fù)渑判虻拇a實(shí)現(xiàn)了。
拓?fù)渑判蛞话銇?lái)說(shuō)有兩種思路,一種是廣度優(yōu)先遍歷,借助于先進(jìn)先出的隊(duì)列,一種是深度優(yōu)先遍歷,借助于后進(jìn)先出的棧。無(wú)論哪一種思路,都與入度和出度有關(guān)。下面分別進(jìn)行分析。
廣度優(yōu)先遍歷比較符合人的習(xí)慣思維,從前到后逐層推進(jìn)。它首先找出不被任何任務(wù)依賴的任務(wù)進(jìn)入隊(duì)列,哪一種任務(wù)不被任何任務(wù)依賴呢?比如 a->b->c ,a 就是不被任何任務(wù)依賴的任務(wù),這樣的任務(wù)有個(gè)特點(diǎn),就是入度為 0,沒(méi)有箭頭指向的任務(wù)的入度就是 0。
首先,我們計(jì)算所有節(jié)點(diǎn)的入度,把所有入度為 0 的任務(wù)依次放入隊(duì)列,然后開(kāi)始循環(huán)遍歷隊(duì)列,取出第一個(gè)任務(wù),記為 a,標(biāo)記為已訪問(wèn),同時(shí)將依賴于 a 的任務(wù)的入度都減少 1,如果減少 1 后入度為 0 的任務(wù)放入隊(duì)列。繼續(xù)循環(huán),直到所有的節(jié)點(diǎn)都被訪問(wèn)。如果循環(huán)結(jié)束,仍有節(jié)點(diǎn)未被遍歷,說(shuō)明存在循環(huán)依賴,無(wú)論如何他們的入度也不可能為 0。
以上的思路,翻譯成代碼,如下所示:
import?collections
class?CheckCycle(object):
????def?__init__(self):
????????self.vertex?=?set()??#?頂點(diǎn)集合
????????self.edges?=?collections.defaultdict(
????????????set
????????)??#?使用字典表示有向邊?如 a ->?{b,c,e}?表示 b,c,e 均依賴 a
????????self.indegree?=?collections.defaultdict(int)??#?計(jì)算每個(gè)頂點(diǎn)的入度
????def?add_edge(self,?from_job:?str,?to_job:?str)?->?bool:
????????"""
????????添加一條邊
????????"""
????????if?from_job?==?to_job:
????????????return?False
????????if?from_job:
????????????self.vertex.add(from_job)
????????????if?not?from_job?in?self.indegree:
????????????????self.indegree[from_job]?=?0??#?初始化入度為?0
????????if?to_job:
????????????self.vertex.add(to_job)
????????????if?not?to_job?in?self.indegree:??#?初始化入度為0
????????????????self.indegree[to_job]?=?0
????????if?from_job?and?to_job:
????????????if?to_job?not?in?self.edges[from_job]:??#?防止充分添加相同的邊
????????????????self.indegree[to_job]?+=?1??#?入度加 1
????????????????self.edges[from_job].add(to_job)??#?防止充分添加相同的邊
????????return?True
????def?can_finish(self)?->?bool:
????????"""
????????Returns:
????????????True:?表示沒(méi)有環(huán),任務(wù)可以完成
????????????False:?表示有環(huán),任務(wù)不可以完成
????????"""
????????q?=?collections.deque([u?for?u?in?self.indegree?if?self.indegree[u]?==?0])
????????visited?=?0
????????possible_sequence?=?[]
????????while?q:
????????????visited?+=?1
????????????u?=?q.popleft()
????????????possible_sequence.append(u)
????????????for?v?in?self.edges[u]:
????????????????self.indegree[v]?-=?1
????????????????if?self.indegree[v]?==?0:
????????????????????q.append(v)
????????print(f'possible?sequence:?{"->".join(possible_sequence)}')
????????return?visited?==?len(self.vertex)
if?__name__?==?"__main__":
????"""
????a->b->c
???????b->d
???????True
????a->b->c
???????b->d->a
???????False
????"""
????check_cycle?=?CheckCycle()
????check_cycle.add_edge(from_job="a",?to_job="b")
????check_cycle.add_edge(from_job="b",?to_job="c")
????check_cycle.add_edge(from_job="b",?to_job="d")
????print(check_cycle.can_finish())
????check_cycle?=?CheckCycle()
????check_cycle.add_edge(from_job="a",?to_job="b")
????check_cycle.add_edge(from_job="b",?to_job="c")
????check_cycle.add_edge(from_job="b",?to_job="d")
????check_cycle.add_edge(from_job="d",?to_job="a")
????print(check_cycle.can_finish())
時(shí)間復(fù)雜度和空間復(fù)雜度的分析同廣度優(yōu)先遍歷算法,都是 O(m+n), m 是頂點(diǎn)數(shù),n 是邊數(shù),不在贅述。
另一種方法就是深度優(yōu)先遍歷, 深度優(yōu)先遍歷則是一種逆向思維,為了簡(jiǎn)單的理解,先考慮沒(méi)有環(huán)的情況,如 a->b->c-d :從任意任務(wù)節(jié)點(diǎn)出發(fā),假如是 b,一條路走到黑,遍歷到最后一個(gè)節(jié)點(diǎn) d ,將其入棧,同時(shí)將已經(jīng)訪問(wèn)過(guò)的節(jié)點(diǎn)標(biāo)記為已訪問(wèn)(b,c 已訪問(wèn)),將已入棧的節(jié)點(diǎn)標(biāo)記為已完成(d 已完成),還沒(méi)有訪問(wèn)過(guò)的節(jié)點(diǎn)標(biāo)記為未訪問(wèn) (a 未訪問(wèn))。也就是說(shuō)任何一個(gè)節(jié)點(diǎn),只會(huì)有以下三種狀態(tài):
「未訪問(wèn)」:我們還沒(méi)有訪問(wèn)到這個(gè)節(jié)點(diǎn),使用 0 來(lái)表示。
「已訪問(wèn)」:我們?cè)L問(wèn)過(guò)這個(gè)節(jié)點(diǎn),但還沒(méi)有回溯到該節(jié)點(diǎn),即該節(jié)點(diǎn)還沒(méi)有入棧,還有相鄰的節(jié)點(diǎn)沒(méi)有完成,使用 1 來(lái)表示。
「已完成」:我們?cè)L問(wèn)過(guò)且回溯過(guò)這個(gè)節(jié)點(diǎn),即該節(jié)點(diǎn)已經(jīng)入棧,并且所有該節(jié)點(diǎn)的后續(xù)節(jié)點(diǎn)都出現(xiàn)在棧的更底部的位置,滿足拓?fù)渑判虻囊?,使?2 來(lái)表示。
現(xiàn)在回溯到 c,發(fā)現(xiàn) c 已訪問(wèn),且 c 的后續(xù)節(jié)點(diǎn) d 已經(jīng)完成,因此將 c 入棧,標(biāo)記為已完成,依次類推,現(xiàn)在,棧底到棧頂依次為 d,c,b。然后從剩余節(jié)點(diǎn) a 出發(fā),執(zhí)行同樣的邏輯,a 也入棧,標(biāo)記為完成,最終從棧底到棧頂為 d,c,b,a,將這些節(jié)點(diǎn)依次出棧,即為拓?fù)渑判颉?/p>
現(xiàn)在考慮有環(huán)的情況 a->b->c->d->b,訪問(wèn)到 d 時(shí),繼續(xù)訪問(wèn) b 發(fā)現(xiàn) b 已經(jīng)被訪問(wèn),說(shuō)明有環(huán),退出即可。根據(jù)上面的分析,不難寫出以下深度遍歷的代碼:
def?can_finish2(self)?->?bool:
????"""
????深度優(yōu)先遍歷
????Returns:
????????True:?表示沒(méi)有環(huán),任務(wù)可以完成
????????False:?表示有環(huán),任務(wù)不可以完成
????"""
????visited?=?collections.defaultdict(int)??#?保存每個(gè)頂點(diǎn)是否被訪問(wèn)過(guò)
????for?job?in?self.vertex:
????????visited[job]?=?0??#?初始化,均未被訪問(wèn)過(guò)
????result?=?list()??#?模擬棧
????valid?=?True
????def?dfs(from_job:?str):
????????nonlocal?valid
????????visited[from_job]?=?1
????????for?to_job?in?self.edges[from_job]:
????????????if?visited[to_job]?==?0:
????????????????dfs(to_job)
????????????????if?not?valid:
????????????????????return
????????????elif?visited[to_job]?==?1:
????????????????valid?=?False
????????????????return
????????visited[from_job]?=?2
????????result.append(from_job)
????for?job?in?self.vertex:
????????if?valid?and?not?visited[job]:
????????????dfs(job)
????#?print(result)
????return?valid
時(shí)間復(fù)雜度即為深度優(yōu)先遍歷或廣度優(yōu)先遍歷的時(shí)間復(fù)雜度,都為 O(m+n) ,其中 m 是頂點(diǎn)數(shù),n 是邊數(shù),對(duì)應(yīng)著任務(wù)數(shù)和任務(wù)的依賴數(shù)。
其實(shí)即使寫不出深度優(yōu)先或廣度優(yōu)先的代碼關(guān)系也不大,只有會(huì)靈活使用就行,網(wǎng)上都是現(xiàn)成的代碼,最重要的是要理解這些代碼,為我所用。
想使用代碼時(shí)不必辛苦的復(fù)制,回復(fù)「拓?fù)渑判颉公@取可執(zhí)行代碼。
感謝你的點(diǎn)贊支持。
