分布式場景怎么Join?
程序員的成長之路互聯(lián)網(wǎng)/程序員/技術(shù)/資料共享 關(guān)注
閱讀本文大概需要 6 分鐘。
來自: blog.csdn.net/weixin_56270372/article/details/135936319
背景
最近在閱讀查詢優(yōu)化器的論文,發(fā)現(xiàn)System R中對(duì)于Join操作的定義一般分為了兩種,即嵌套循環(huán)、排序-合并聯(lián)接??紤]到我的領(lǐng)域是在處理分庫分表或者其他的分區(qū)模式,這讓我開始不由得聯(lián)想我們?cè)趺丛诜植际綀鼍皯?yīng)用這個(gè)Join邏輯,對(duì)于兩個(gè)不同庫里面的不同表我們是沒有辦法直接進(jìn)行Join操作的。查閱資料后發(fā)現(xiàn)原來早有定義,即分布式聯(lián)接算法。分布式聯(lián)接算法
跨界點(diǎn)處理數(shù)據(jù)即分布式聯(lián)接算法,常見的有四種模型:Shuffle Join(洗牌聯(lián)接)、Broadcast Join(廣播聯(lián)接)、MapReduce Join(MapReduce聯(lián)接)、Sort-Merge Join(排序-合并聯(lián)接)。接下來將進(jìn)行逐一了解與分析,以便后續(xù)開發(fā)的應(yīng)用。
Shuffle Join(洗牌聯(lián)接)
先上原理解釋:
“Shuffle Join的核心思想是將來自不同節(jié)點(diǎn)的數(shù)據(jù)重新分發(fā)(洗牌),使得可以聯(lián)接的數(shù)據(jù)行最終位于同一個(gè)節(jié)點(diǎn)上。
“通常,對(duì)于要聯(lián)接的兩個(gè)表,會(huì)對(duì)聯(lián)接鍵應(yīng)用相同的哈希函數(shù),哈希函數(shù)的結(jié)果決定了數(shù)據(jù)行應(yīng)該被發(fā)送到哪個(gè)節(jié)點(diǎn)。這樣,所有具有相同哈希值的行都會(huì)被送到同一個(gè)節(jié)點(diǎn),然后在該節(jié)點(diǎn)上執(zhí)行聯(lián)接操作。可能解釋完還是有點(diǎn)模糊,舉個(gè)例子,有兩張表,分別以id字段進(jìn)行分庫操作,且哈希算法相同(為了簡單,這里只介紹分庫場景,分庫分表同理。算法有很多種,這里舉例是hash算法),那么這兩張表的分片或許可以在同一個(gè)物理庫中,這樣我們不需要做大表維度的處理,我們可以直接下推Join操作到對(duì)應(yīng)的物理庫操作即可。在
ShardingSphere中,這種場景類似于綁定表的定義,如果兩張表的算法相同,可以直接配置綁定表的關(guān)系,進(jìn)行相同算法的連接查詢,避免復(fù)雜的笛卡爾積。這樣做的好處是可以盡量下推到數(shù)據(jù)庫操作,在中間件層面我們可以做并行處理,適合大規(guī)模的數(shù)據(jù)操作。但是,這很理想,有多少表會(huì)采用相同算法處理呢。
Broadcast Join(廣播聯(lián)接)
先上原理解釋:“當(dāng)一個(gè)表的大小相對(duì)較小時(shí),可以將這個(gè)小表的全部數(shù)據(jù)廣播到所有包含另一個(gè)表數(shù)據(jù)的節(jié)點(diǎn)上。
“每個(gè)節(jié)點(diǎn)上都有小表的完整副本,因此可以獨(dú)立地與本地的大表數(shù)據(jù)進(jìn)行聯(lián)接操作,而不需要跨節(jié)點(diǎn)通信。舉個(gè)例子,有一張非常小的表A,還有一張按照ID分片的表B,我們可以在每一個(gè)物理庫中復(fù)制一份表A,這樣我們的Join操作就可以直接下推到每一個(gè)數(shù)據(jù)庫操作了。這種情況比Shuffle Join甚至還有性能高效,這種類似于
ShardingSphere中的廣播表的定義,其存在類似于字典表,在每一個(gè)數(shù)據(jù)庫都同時(shí)存在一份,每次寫入會(huì)同步到多個(gè)節(jié)點(diǎn)。這種操作的好處顯而易見,不僅支持并行操作而且性能極佳。但是缺點(diǎn)也顯而易見,如果小表不夠小數(shù)據(jù)冗余不說,廣播可能會(huì)消耗大量的網(wǎng)絡(luò)帶寬和資源。
MapReduce Join(MapReduce聯(lián)接)
先上原理解釋:MapReduce是一種編程模型,用于處理和生成大數(shù)據(jù)集,其中的聯(lián)接操作可以分為兩個(gè)階段:Map階段和Reduce階段。Map階段:
- 每個(gè)節(jié)點(diǎn)讀取其數(shù)據(jù)分片,并對(duì)需要聯(lián)接的鍵值對(duì)應(yīng)用一個(gè)映射函數(shù),生成中間鍵值對(duì)。
Reduce階段:
- 中間鍵值對(duì)會(huì)根據(jù)鍵進(jìn)行排序(在某些實(shí)現(xiàn)中排序發(fā)生在Shuffle階段)和分組,然后發(fā)送到Reduce節(jié)點(diǎn)。
- 在Reduce節(jié)點(diǎn)上,具有相同鍵的所有值都會(huì)聚集在一起,這時(shí)就可以執(zhí)行聯(lián)接操作。
MapReduce Join不直接應(yīng)用于傳統(tǒng)數(shù)據(jù)庫邏輯,而是適用于Hadoop這樣的分布式處理系統(tǒng)中。但是為了方便理解,還是用SQL語言來分析,例如一條SQL:
SELECT orders.order_id, orders.date, customers.name
FROM orders
JOIN customers ON orders.customer_id = customers.customer_id;
SELECT customer_id, order_id, date FROM orders;
SELECT customer_id, name FROM customers;
customer_id,值是記錄的其余部分。下一個(gè)階段可有可無,即Shuffle階段。如果不在這里排序可能會(huì)在Map階段執(zhí)行SQL時(shí)候排序/分組或者在接下來的Reduce階段進(jìn)行額外排序/分組。在這個(gè)階段主要將收集到的數(shù)據(jù)按照customer_id排序分組,以確保相同的customer_id的數(shù)據(jù)達(dá)到Reduce階段。Reduce階段將每個(gè)對(duì)應(yīng)的customer_id進(jìn)行聯(lián)接操作,輸出并返回最后的結(jié)果。這種操作普遍應(yīng)用于兩個(gè)算法完全不相同的表單,也是一種標(biāo)準(zhǔn)的處理模型,在這個(gè)過程中,我們以一張邏輯表的維度進(jìn)行操作。這種算法可能會(huì)消耗大量內(nèi)存,甚至導(dǎo)致內(nèi)存溢出,并且在處理大數(shù)據(jù)量時(shí)會(huì)相當(dāng)耗時(shí),因此不適合需要低延遲的場景。
額外補(bǔ)充
內(nèi)存溢出場景普遍在如下場景:- 大鍵值對(duì)數(shù)量: 如果Map階段產(chǎn)生了大量的鍵值對(duì),這些數(shù)據(jù)需要在內(nèi)存中進(jìn)行緩存以進(jìn)行排序和傳輸,這可能會(huì)消耗大量內(nèi)存。
- 數(shù)據(jù)傾斜: 如果某個(gè)鍵非常常見,而其他鍵則不那么常見,那么處理這個(gè)鍵的Reducer可能會(huì)接收到大量的數(shù)據(jù),導(dǎo)致內(nèi)存不足。這種現(xiàn)象稱為數(shù)據(jù)傾斜。
- 大值列表: 在Reduce階段,如果某個(gè)鍵對(duì)應(yīng)的值列表非常長,處理這些值可能會(huì)需要很多內(nèi)存。
- 不合理的并行度: 如果Reduce任務(wù)的數(shù)量設(shè)置得不合適(太少或太多),可能會(huì)導(dǎo)致單個(gè)任務(wù)處理不均勻,從而導(dǎo)致內(nèi)存問題。
- 內(nèi)存到磁盤的溢寫:當(dāng)Map任務(wù)的輸出緩沖區(qū)滿了,它會(huì)將數(shù)據(jù)溢寫到磁盤。這有助于限制內(nèi)存使用,但會(huì)增加I/O開銷。
- 通過設(shè)置合適的Map和Reduce任務(wù)數(shù)量,可以更有效地分配資源,避免某些任務(wù)過載。具體操作可以將Map操作的分段比如1~100,100~200,Reduce階段開設(shè)較少的并發(fā)處理。
- 優(yōu)化數(shù)據(jù)分布,比如使用范圍分區(qū)(
range partitioning)或哈希分區(qū)(hash partitioning)來減少數(shù)據(jù)傾斜。
Sort-Merge Join(排序-合并聯(lián)接)
先上原理解釋:
“在分布式環(huán)境中,Sort-Merge Join首先在每個(gè)節(jié)點(diǎn)上對(duì)數(shù)據(jù)進(jìn)行局部排序,然后將排序后的數(shù)據(jù)合并起來,最后在合并的數(shù)據(jù)上執(zhí)行聯(lián)接操作。
“這通常涉及到多階段處理,包括局部排序、數(shù)據(jù)洗牌(重新分發(fā)),以及最終的排序和合并。舉個(gè)理解,還是上面的SQL。
SELECT orders.order_id, orders.date, customers.name
FROM orders
JOIN customers ON orders.customer_id = customers.customer_id;
- 對(duì)orders表按
customer_id進(jìn)行排序。 - 對(duì)customers表按
customer_id進(jìn)行排序。 - 同時(shí)遍歷兩個(gè)已排序的表,將具有相同
customer_id的行配對(duì)。
- 當(dāng)數(shù)據(jù)集太大而無法一次性加載到內(nèi)存中時(shí),可以使用外部排序算法。外部排序算法會(huì)將數(shù)據(jù)分割成多個(gè)批次,每個(gè)批次單獨(dú)排序,然后將排序后的批次合并。這種方法通常涉及到磁盤I/O操作,因此會(huì)比內(nèi)存中操作慢。
- 對(duì)于合并步驟,可以使用流式處理技術(shù),一次只處理數(shù)據(jù)的一小部分,并持續(xù)將結(jié)果輸出到下一個(gè)處理步驟或存儲(chǔ)系統(tǒng)。這樣可以避免一次性加載大量數(shù)據(jù)到內(nèi)存中。
- 當(dāng)內(nèi)存不足以處理數(shù)據(jù)時(shí),可以使用磁盤空間作為臨時(shí)存儲(chǔ)。數(shù)據(jù)庫管理系統(tǒng)通常有機(jī)制來處理內(nèi)存溢出,比如創(chuàng)建磁盤上的臨時(shí)文件來存儲(chǔ)過程中的數(shù)據(jù)。
- 在分布式系統(tǒng)中,可以將數(shù)據(jù)分散到多個(gè)節(jié)點(diǎn)上進(jìn)行處理,這樣每個(gè)節(jié)點(diǎn)只需要處理數(shù)據(jù)的一部分,從而減少單個(gè)節(jié)點(diǎn)上的內(nèi)存壓力。
推薦閱讀:
野心藏不住了!不滿CPU統(tǒng)治,英偉達(dá)決定徹底重寫軟件開發(fā)棧!黃仁勛:為什么還要用Python?命令行都不需要!GPU開發(fā)時(shí)代將至
互聯(lián)網(wǎng)初中高級(jí)大廠面試題(9個(gè)G)
內(nèi)容包含Java基礎(chǔ)、JavaWeb、MySQL性能優(yōu)化、JVM、鎖、百萬并發(fā)、消息隊(duì)列、高性能緩存、反射、Spring全家桶原理、微服務(wù)、Zookeeper......等技術(shù)棧!
?戳閱讀原文領(lǐng)?。?/span> 朕已閱 ![]()
評(píng)論
圖片
表情
