十年網(wǎng)站開(kāi)發(fā)經(jīng)驗(yàn) + 多家企業(yè)客戶(hù) + 靠譜的建站團(tuán)隊(duì)
量身定制 + 運(yùn)營(yíng)維護(hù)+專(zhuān)業(yè)推廣+無(wú)憂售后,網(wǎng)站問(wèn)題一站解決
這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)?lái)有關(guān)如何進(jìn)行Spark Shuffle 原理分析,文章內(nèi)容豐富且以專(zhuān)業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
10年積累的網(wǎng)站制作、成都網(wǎng)站制作經(jīng)驗(yàn),可以快速應(yīng)對(duì)客戶(hù)對(duì)網(wǎng)站的新想法和需求。提供各種問(wèn)題對(duì)應(yīng)的解決方案。讓選擇我們的客戶(hù)得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識(shí)你,你也不認(rèn)識(shí)我。但先網(wǎng)站設(shè)計(jì)后付款的網(wǎng)站建設(shè)流程,更有江北免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。
Shuffle就是對(duì)數(shù)據(jù)進(jìn)行重組,由于分布式計(jì)算的特性和要求,在實(shí)現(xiàn)細(xì)節(jié)上更加繁瑣和復(fù)雜。 在MapReduce框架,Shuffle是連接Map和Reduce之間的橋梁,Map階段通過(guò)shuffle讀取數(shù)據(jù)并輸出到對(duì)應(yīng)的Reduce;而Reduce階段負(fù)責(zé)從Map端拉取數(shù)據(jù)并進(jìn)行計(jì)算。在整個(gè)shuffle過(guò)程中,往往伴隨著大量的磁盤(pán)和網(wǎng)絡(luò)I/O。所以shuffle性能的高低也直接決定了整個(gè)程序的性能高低。Spark也會(huì)有自己的shuffle實(shí)現(xiàn)過(guò)程。
在DAG調(diào)度的過(guò)程中,Stage階段的劃分是根據(jù)是否有shuffle過(guò)程,也就是存在wide Dependency寬依賴(lài)的時(shí)候,需要進(jìn)行shuffle,這時(shí)候會(huì)將作業(yè)job劃分成多個(gè)Stage,每一個(gè)stage內(nèi)部有很多可以并行運(yùn)行的task。 stage與stage之間的過(guò)程就是shuffle階段,在Spark的中,負(fù)責(zé)shuffle過(guò)程的執(zhí)行、計(jì)算和處理的組件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager隨著Spark的發(fā)展有兩種實(shí)現(xiàn)的方式,分別為HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle兩種。
在Spark 1.2以前,默認(rèn)的shuffle計(jì)算引擎是HashShuffleManager。 該ShuffleManager-HashShuffleManager有著一個(gè)非常嚴(yán)重的弊端,就是會(huì)產(chǎn)生大量的中間磁盤(pán)文件,進(jìn)而由大量的磁盤(pán)IO操作影響了性能。因此在Spark 1.2以后的版本中,默認(rèn)的ShuffleManager改成了SortShuffleManager。 SortShuffleManager相較于HashShuffleManager來(lái)說(shuō),有了一定的改進(jìn)。主要就在于每個(gè)Task在進(jìn)行shuffle操作時(shí),雖然也會(huì)產(chǎn)生較多的臨時(shí)磁盤(pán)文件,但是最后會(huì)將所有的臨時(shí)文件合并(merge)成一個(gè)磁盤(pán)文件,因此每個(gè)Task就只有一個(gè)磁盤(pán)文件。在下一個(gè)stage的shuffle read task拉取自己的數(shù)據(jù)時(shí),只要根據(jù)索引讀取每個(gè)磁盤(pán)文件中的部分?jǐn)?shù)據(jù)即可。
Hash shuffle
一種是普通運(yùn)行機(jī)制
另一種是合并的運(yùn)行機(jī)制。
HashShuffleManager的運(yùn)行機(jī)制主要分成兩種
合并機(jī)制主要是通過(guò)復(fù)用buffer來(lái)優(yōu)化Shuffle過(guò)程中產(chǎn)生的小文件的數(shù)量。
Hash shuffle是不具有排序的Shuffle。
這里我們先明確一個(gè)假設(shè)前提:每個(gè)Executor只有1個(gè)CPU core,也就是說(shuō),無(wú)論這個(gè)Executor上分配多少個(gè)task線程,同一時(shí)間都只能執(zhí)行一個(gè)task線程。 圖中有3個(gè)ReduceTask,從ShuffleMapTask 開(kāi)始那邊各自把自己進(jìn)行 Hash 計(jì)算(分區(qū)器:hash/numreduce取模),分類(lèi)出3個(gè)不同的類(lèi)別,每個(gè) ShuffleMapTask 都分成3種類(lèi)別的數(shù)據(jù),想把不同的數(shù)據(jù)匯聚然后計(jì)算出最終的結(jié)果,所以ReduceTask 會(huì)在屬于自己類(lèi)別的數(shù)據(jù)收集過(guò)來(lái),匯聚成一個(gè)同類(lèi)別的大集合,每1個(gè) ShuffleMapTask 輸出3份本地文件,這里有4個(gè) ShuffleMapTask,所以總共輸出了4 x 3個(gè)分類(lèi)文件 = 12個(gè)本地小文件。
主要就是在一個(gè)stage結(jié)束計(jì)算之后,為了下一個(gè)stage可以執(zhí)行shuffle類(lèi)的算子(比如reduceByKey,groupByKey),而將每個(gè)task處理的數(shù)據(jù)按key進(jìn)行“分區(qū)”。所謂“分區(qū)”,就是對(duì)相同的key執(zhí)行hash算法,從而將相同key都寫(xiě)入同一個(gè)磁盤(pán)文件中,而每一個(gè)磁盤(pán)文件都只屬于reduce端的stage的一個(gè)task。在將數(shù)據(jù)寫(xiě)入磁盤(pán)之前,會(huì)先將數(shù)據(jù)寫(xiě)入內(nèi)存緩沖中,當(dāng)內(nèi)存緩沖填滿(mǎn)之后,才會(huì)溢寫(xiě)到磁盤(pán)文件中去。 那么每個(gè)執(zhí)行shuffle write的task,要為下一個(gè)stage創(chuàng)建多少個(gè)磁盤(pán)文件呢? 很簡(jiǎn)單,下一個(gè)stage的task有多少個(gè),當(dāng)前stage的每個(gè)task就要?jiǎng)?chuàng)建多少份磁盤(pán)文件。比如下一個(gè)stage總共有100個(gè)task,那么當(dāng)前stage的每個(gè)task都要?jiǎng)?chuàng)建100份磁盤(pán)文件。如果當(dāng)前stage有50個(gè)task,總共有10個(gè)Executor,每個(gè)Executor執(zhí)行5個(gè)Task,那么每個(gè)Executor上總共就要?jiǎng)?chuàng)建500個(gè)磁盤(pán)文件,所有Executor上會(huì)創(chuàng)建5000個(gè)磁盤(pán)文件。由此可見(jiàn),未經(jīng)優(yōu)化的shuffle write操作所產(chǎn)生的磁盤(pán)文件的數(shù)量是極其驚人的。
shuffle read,通常就是一個(gè)stage剛開(kāi)始時(shí)要做的事情。此時(shí)該stage的每一個(gè)task就需要將上一個(gè)stage的計(jì)算結(jié)果中的所有相同key,從各個(gè)節(jié)點(diǎn)上通過(guò)網(wǎng)絡(luò)都拉取到自己所在的節(jié)點(diǎn)上,然后進(jìn)行key的聚合或連接等操作。由于shuffle write的過(guò)程中,task給Reduce端的stage的每個(gè)task都創(chuàng)建了一個(gè)磁盤(pán)文件,因此shuffle read的過(guò)程中,每個(gè)task只要從上游stage的所有task所在節(jié)點(diǎn)上,拉取屬于自己的那一個(gè)磁盤(pán)文件即可。 shuffle read的拉取過(guò)程是一邊拉取一邊進(jìn)行聚合的。每個(gè)shuffle read task都會(huì)有一個(gè)自己的buffer緩沖,每次都只能拉取與buffer緩沖相同大小的數(shù)據(jù),然后通過(guò)內(nèi)存中的一個(gè)Map進(jìn)行聚合等操作。聚合完一批數(shù)據(jù)后,再拉取下一批數(shù)據(jù),并放到buffer緩沖中進(jìn)行聚合操作。以此類(lèi)推,直到最后將所有數(shù)據(jù)到拉取完,并得到最終的結(jié)果。
(1)buffer起到的是緩存作用,緩存能夠加速寫(xiě)磁盤(pán),提高計(jì)算的效率,buffer的默認(rèn)大小32k。 (2)分區(qū)器:根據(jù)hash/numRedcue取模決定數(shù)據(jù)由幾個(gè)Reduce處理,也決定了寫(xiě)入幾個(gè)buffer中 (3)block file:磁盤(pán)小文件,從圖中我們可以知道磁盤(pán)小文件的個(gè)數(shù)計(jì)算公式: block file=M*R (4) M為map task的數(shù)量,R為Reduce的數(shù)量,一般Reduce的數(shù)量等于buffer的數(shù)量,都是由分區(qū)器決定的
(1).Shuffle階段在磁盤(pán)上會(huì)產(chǎn)生海量的小文件,建立通信和拉取數(shù)據(jù)的次數(shù)變多,此時(shí)會(huì)產(chǎn)生大量耗時(shí)低效的 IO 操作 (因?yàn)楫a(chǎn)生過(guò)多的小文件) (2).可能導(dǎo)致OOM,大量耗時(shí)低效的 IO 操作 ,導(dǎo)致寫(xiě)磁盤(pán)時(shí)的對(duì)象過(guò)多,讀磁盤(pán)時(shí)候的對(duì)象也過(guò)多,這些對(duì)象存儲(chǔ)在堆內(nèi)存中,會(huì)導(dǎo)致堆內(nèi)存不足,相應(yīng)會(huì)導(dǎo)致頻繁的GC,GC會(huì)導(dǎo)致OOM。由于內(nèi)存中需要保存海量文件操作句柄和臨時(shí)信息,如果數(shù)據(jù)處理的規(guī)模比較龐大的話,內(nèi)存不可承受,會(huì)出現(xiàn) OOM 等問(wèn)題
合并機(jī)制就是復(fù)用buffer緩沖區(qū),開(kāi)啟合并機(jī)制的配置是spark.shuffle.consolidateFiles。該參數(shù)默認(rèn)值為false,將其設(shè)置為true即可開(kāi)啟優(yōu)化機(jī)制。通常來(lái)說(shuō),如果我們使用HashShuffleManager,那么都建議開(kāi)啟這個(gè)選項(xiàng)。
這里有6個(gè)這里有6個(gè)shuffleMapTask,數(shù)據(jù)類(lèi)別還是分成3種類(lèi)型,因?yàn)镠ash算法會(huì)根據(jù)你的 Key 進(jìn)行分類(lèi),在同一個(gè)進(jìn)程中,無(wú)論是有多少過(guò)Task,都會(huì)把同樣的Key放在同一個(gè)Buffer里,然后把Buffer中的數(shù)據(jù)寫(xiě)入以Core數(shù)量為單位的本地文件中,(一個(gè)Core只有一種類(lèi)型的Key的數(shù)據(jù)),每1個(gè)Task所在的進(jìn)程中,分別寫(xiě)入共同進(jìn)程中的3份本地文件,這里有6個(gè)shuffleMapTasks,所以總共輸出是 2個(gè)Cores x 3個(gè)分類(lèi)文件 = 6個(gè)本地小文件。
(1).啟動(dòng)HashShuffle的合并機(jī)制ConsolidatedShuffle的配置 spark.shuffle.consolidateFiles=true(2).block file=Core*R Core為CPU的核數(shù),R為Reduce的數(shù)量
如果 Reducer 端的并行任務(wù)或者是數(shù)據(jù)分片過(guò)多的話則 Core * Reducer Task 依舊過(guò)大,也會(huì)產(chǎn)生很多小文件。
SortShuffleManager的運(yùn)行機(jī)制主要分成兩種,
一種是普通運(yùn)行機(jī)制
另一種是bypass運(yùn)行機(jī)制
在該模式下,數(shù)據(jù)會(huì)先寫(xiě)入一個(gè)數(shù)據(jù)結(jié)構(gòu),聚合算子寫(xiě)入Map,一邊通過(guò)Map局部聚合,一邊寫(xiě)入內(nèi)存。Join算子寫(xiě)入ArrayList直接寫(xiě)入內(nèi)存中。然后需要判斷是否達(dá)到閾值(5M),如果達(dá)到就會(huì)將內(nèi)存數(shù)據(jù)結(jié)構(gòu)的數(shù)據(jù)寫(xiě)入到磁盤(pán),清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)。 在溢寫(xiě)磁盤(pán)前,先根據(jù)key進(jìn)行排序,排序過(guò)后的數(shù)據(jù),會(huì)分批寫(xiě)入到磁盤(pán)文件中。默認(rèn)批次為10000條,數(shù)據(jù)會(huì)以每批一萬(wàn)條寫(xiě)入到磁盤(pán)文件。寫(xiě)入磁盤(pán)文件通過(guò)緩沖區(qū)溢寫(xiě)的方式,每次溢寫(xiě)都會(huì)產(chǎn)生一個(gè)磁盤(pán)文件,也就是說(shuō)一個(gè)task過(guò)程會(huì)產(chǎn)生多個(gè)臨時(shí)文件 。 最后在每個(gè)task中,將所有的臨時(shí)文件合并,這就是merge過(guò)程,此過(guò)程將所有臨時(shí)文件讀取出來(lái),一次寫(xiě)入到最終文件。意味著一個(gè)task的所有數(shù)據(jù)都在這一個(gè)文件中。同時(shí)單獨(dú)寫(xiě)一份索引文件,標(biāo)識(shí)下游各個(gè)task的數(shù)據(jù)在文件中的索引start offset和end offset。 這樣算來(lái)如果第一個(gè)stage 50個(gè)task,每個(gè)Executor執(zhí)行一個(gè)task,那么無(wú)論下游有幾個(gè)task,就需要50*2=100個(gè)磁盤(pán)文件。
1. 小文件明顯變少了,一個(gè)task只生成一個(gè)file文件 2. file文件整體有序,加上索引文件的輔助,查找變快,雖然排序浪費(fèi)一些性能,但是查找變快很多
shuffle map task數(shù)量小于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值
不是聚合類(lèi)的shuffle算子(比如reduceByKey)
該機(jī)制與sortshuffle的普通機(jī)制相比,在shuffleMapTask不多的情況下,首先寫(xiě)的機(jī)制是不同,其次不會(huì)進(jìn)行排序。這樣就可以節(jié)約一部分性能開(kāi)銷(xiāo)。
在shuffleMapTask數(shù)量小于默認(rèn)值200時(shí),啟用bypass模式的sortShuffle(原因是數(shù)據(jù)量本身比較少,沒(méi)必要進(jìn)行sort全排序,因?yàn)閿?shù)據(jù)量少本身查詢(xún)速度就快,正好省了sort的那部分性能開(kāi)銷(xiāo)。) 該機(jī)制與普通SortShuffleManager運(yùn)行機(jī)制的不同在于: 第一: 磁盤(pán)寫(xiě)機(jī)制不同; 第二: 不會(huì)進(jìn)行sort排序;
碰到ShuffleDenpendency就進(jìn)行stage的劃分,ShuffleMapStage: 為shuffle提供數(shù)據(jù)的中間stage,ResultStage: 為一個(gè)action操作計(jì)算結(jié)果的stage。
解決的一個(gè)問(wèn)題是resut task如何知道從哪個(gè)Executor去拉取Shuffle data
ShuffleWriter
(1)HashShuffleWriter
特點(diǎn):根據(jù)Hash分區(qū),分區(qū)數(shù)是m * n 個(gè)。
val counts: RDD[(String, Int)] = wordCount.reduceByKey(new HashPartitioner(2), (x, y) => x + y)
(2)SortShuffleWriter
特點(diǎn):
a、文件數(shù)量為m
b、如果需要排序或者需要combine,那么每一個(gè)partition數(shù)據(jù)排序要自己實(shí)現(xiàn)。(SortShuffleWriter里的sort指的是對(duì)partition的分區(qū)號(hào)進(jìn)行排序)
c、數(shù)據(jù)先放在內(nèi)存,內(nèi)存不夠則寫(xiě)到磁盤(pán)中,最后再全部寫(xiě)到磁盤(pán)。
(3)BypassMergeSortShuffleWriter
這種模式同時(shí)具有HashShuffleWriter和SortShuffleter的特點(diǎn)。因?yàn)槠鋵?shí)HashShufflerWriter的性能不錯(cuò),但是如果task數(shù)太多的話,性能話下降,所以Spark在task數(shù)較少的時(shí)候自動(dòng)使用了這種模式,一開(kāi)始還是像HashShufflerWriter那種生成多個(gè)文件,但是最后會(huì)把多個(gè)文件合并成一個(gè)文件。然后下游來(lái)讀取文件。默認(rèn)map的分區(qū)需要小于spark.shuffle.sort.bypassMergeThreshold(默認(rèn)是200),因?yàn)槿绾畏謪^(qū)數(shù)太多,產(chǎn)生的小文件就會(huì)很多性能就會(huì)下降。
默認(rèn)值:32k
參數(shù)說(shuō)明:該參數(shù)用于設(shè)置shuffle write task的BufferedOutputStream的buffer緩沖大小。將數(shù)據(jù)寫(xiě)到磁盤(pán)文件之前,會(huì)先寫(xiě)入buffer緩沖中,待緩沖寫(xiě)滿(mǎn)之后,才會(huì)溢寫(xiě)到磁盤(pán)。
調(diào)優(yōu)建議:如果作業(yè)可用的內(nèi)存資源較為充足的話,可以適當(dāng)增加這個(gè)參數(shù)的大小(比如64k),從而減少shuffle write過(guò)程中溢寫(xiě)磁盤(pán)文件的次數(shù),也就可以減少磁盤(pán)IO次數(shù),進(jìn)而提升性能。在實(shí)踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù),性能會(huì)有1%~5%的提升。
默認(rèn)值:48m
參數(shù)說(shuō)明:該參數(shù)用于設(shè)置shuffle read task的buffer緩沖大小,而這個(gè)buffer緩沖決定了每次能夠拉取多少數(shù)據(jù)。
調(diào)優(yōu)建議:如果作業(yè)可用的內(nèi)存資源較為充足的話,可以適當(dāng)增加這個(gè)參數(shù)的大小(比如96m),從而減少拉取數(shù)據(jù)的次數(shù),也就可以減少網(wǎng)絡(luò)傳輸?shù)拇螖?shù),進(jìn)而提升性能。在實(shí)踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù),性能會(huì)有1%~5%的提升。
默認(rèn)值:3
參數(shù)說(shuō)明:shuffle read task從shuffle write task所在節(jié)點(diǎn)拉取屬于自己的數(shù)據(jù)時(shí),如果因?yàn)榫W(wǎng)絡(luò)異常導(dǎo)致拉取失敗,是會(huì)自動(dòng)進(jìn)行重試的。該參數(shù)就代表了可以重試的最大次數(shù)。如果在指定次數(shù)之內(nèi)拉取還是沒(méi)有成功,就可能會(huì)導(dǎo)致作業(yè)執(zhí)行失敗。
調(diào)優(yōu)建議:對(duì)于那些包含了特別耗時(shí)的shuffle操作的作業(yè),建議增加重試最大次數(shù)(比如60次),以避免由于JVM的full gc或者網(wǎng)絡(luò)不穩(wěn)定等因素導(dǎo)致的數(shù)據(jù)拉取失敗。在實(shí)踐中發(fā)現(xiàn),對(duì)于針對(duì)超大數(shù)據(jù)量(數(shù)十億~上百億)的shuffle過(guò)程,調(diào)節(jié)該參數(shù)可以大幅度提升穩(wěn)定性。
默認(rèn)值:5s
參數(shù)說(shuō)明:具體解釋同上,該參數(shù)代表了每次重試?yán)?shù)據(jù)的等待間隔,默認(rèn)是5s。
調(diào)優(yōu)建議:建議加大間隔時(shí)長(zhǎng)(比如60s),以增加shuffle操作的穩(wěn)定性。
(Spark1.6是這個(gè)參數(shù),1.6以后參數(shù)變了,具體參考上一講Spark內(nèi)存模型知識(shí))
默認(rèn)值:0.2
參數(shù)說(shuō)明:該參數(shù)代表了Executor內(nèi)存中,分配給shuffle read task進(jìn)行聚合操作的內(nèi)存比例,默認(rèn)是20%。
調(diào)優(yōu)建議:在資源參數(shù)調(diào)優(yōu)中講解過(guò)這個(gè)參數(shù)。如果內(nèi)存充足,而且很少使用持久化操作,建議調(diào)高這個(gè)比例,給shuffle read的聚合操作更多內(nèi)存,以避免由于內(nèi)存不足導(dǎo)致聚合過(guò)程中頻繁讀寫(xiě)磁盤(pán)。在實(shí)踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù)可以將性能提升10%左右。
默認(rèn)值:sort
參數(shù)說(shuō)明:該參數(shù)用于設(shè)置ShuffleManager的類(lèi)型。Spark 1.5以后,有三個(gè)可選項(xiàng):hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默認(rèn)選項(xiàng),但是Spark 1.2以及之后的版本默認(rèn)都是SortShuffleManager了。Spark1.6以后把hash方式給移除了,tungsten-sort與sort類(lèi)似,但是使用了tungsten計(jì)劃中的堆外內(nèi)存管理機(jī)制,內(nèi)存使用效率更高。
調(diào)優(yōu)建議:由于SortShuffleManager默認(rèn)會(huì)對(duì)數(shù)據(jù)進(jìn)行排序,因此如果你的業(yè)務(wù)邏輯中需要該排序機(jī)制的話,則使用默認(rèn)的SortShuffleManager就可以;而如果你的業(yè)務(wù)邏輯不需要對(duì)數(shù)據(jù)進(jìn)行排序,那么建議參考后面的幾個(gè)參數(shù)調(diào)優(yōu),通過(guò)bypass機(jī)制或優(yōu)化的HashShuffleManager來(lái)避免排序操作,同時(shí)提供較好的磁盤(pán)讀寫(xiě)性能。這里要注意的是,tungsten-sort要慎用,因?yàn)橹鞍l(fā)現(xiàn)了一些相應(yīng)的bug。
默認(rèn)值:200
參數(shù)說(shuō)明:當(dāng)ShuffleManager為SortShuffleManager時(shí),如果shuffle read task的數(shù)量小于這個(gè)閾值(默認(rèn)是200),則shuffle write過(guò)程中不會(huì)進(jìn)行排序操作,而是直接按照未經(jīng)優(yōu)化的HashShuffleManager的方式去寫(xiě)數(shù)據(jù),但是最后會(huì)將每個(gè)task產(chǎn)生的所有臨時(shí)磁盤(pán)文件都合并成一個(gè)文件,并會(huì)創(chuàng)建單獨(dú)的索引文件。
調(diào)優(yōu)建議:當(dāng)你使用SortShuffleManager時(shí),如果的確不需要排序操作,那么建議將這個(gè)參數(shù)調(diào)大一些,大于shuffle read task的數(shù)量。那么此時(shí)就會(huì)自動(dòng)啟用bypass機(jī)制,map-side就不會(huì)進(jìn)行排序了,減少了排序的性能開(kāi)銷(xiāo)。但是這種方式下,依然會(huì)產(chǎn)生大量的磁盤(pán)文件,因此shuffle write性能有待提高。
上述就是小編為大家分享的如何進(jìn)行Spark Shuffle 原理分析了,如果剛好有類(lèi)似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。