十年網(wǎng)站開(kāi)發(fā)經(jīng)驗(yàn) + 多家企業(yè)客戶 + 靠譜的建站團(tuán)隊(duì)
量身定制 + 運(yùn)營(yíng)維護(hù)+專(zhuān)業(yè)推廣+無(wú)憂售后,網(wǎng)站問(wèn)題一站解決
本篇文章為大家展示了如何進(jìn)行Spark Shuffle實(shí)現(xiàn),內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。
創(chuàng)新互聯(lián)建站專(zhuān)注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于做網(wǎng)站、成都做網(wǎng)站、梅縣網(wǎng)絡(luò)推廣、微信小程序開(kāi)發(fā)、梅縣網(wǎng)絡(luò)營(yíng)銷(xiāo)、梅縣企業(yè)策劃、梅縣品牌公關(guān)、搜索引擎seo、人物專(zhuān)訪、企業(yè)宣傳片、企業(yè)代運(yùn)營(yíng)等,從售前售中售后,我們都將竭誠(chéng)為您服務(wù),您的肯定,是我們最大的嘉獎(jiǎng);創(chuàng)新互聯(lián)建站為所有大學(xué)生創(chuàng)業(yè)者提供梅縣建站搭建服務(wù),24小時(shí)服務(wù)熱線:18980820575,官方網(wǎng)址:www.cdcxhl.com
對(duì)于大數(shù)據(jù)計(jì)算框架而言,Shuffle階段的設(shè)計(jì)優(yōu)劣是決定性能好壞的關(guān)鍵因素之一。小編將介紹目前Spark的shuffle實(shí)現(xiàn),并將之與MapReduce進(jìn)行簡(jiǎn)單對(duì)比。
(1) shuffle基本概念與常見(jiàn)實(shí)現(xiàn)方式
shuffle,是一個(gè)算子,表達(dá)的是多對(duì)多的依賴關(guān)系,在類(lèi)MapReduce計(jì)算框架中,是連接Map階段和Reduce階段的紐帶,即每個(gè)Reduce Task從每個(gè)Map Task產(chǎn)生數(shù)的據(jù)中讀取一片數(shù)據(jù),極限情況下可能觸發(fā)M*R個(gè)數(shù)據(jù)拷貝通道(M是Map Task數(shù)目,R是Reduce Task數(shù)目)。通常shuffle分為兩部分:Map階段的數(shù)據(jù)準(zhǔn)備和Reduce階段的數(shù)據(jù)拷貝。首先,Map階段需根據(jù)Reduce階段的Task數(shù)量決定每個(gè)Map Task輸出的數(shù)據(jù)分片數(shù)目,有多種方式存放這些數(shù)據(jù)分片:
1) 保存在內(nèi)存中或者磁盤(pán)上(Spark和MapReduce都存放在磁盤(pán)上);
2) 每個(gè)分片一個(gè)文件(現(xiàn)在Spark采用的方式,若干年前MapReduce采用的方式),或者所有分片放到一個(gè)數(shù)據(jù)文件中,外加一個(gè)索引文件記錄每個(gè)分片在數(shù)據(jù)文件中的偏移量(現(xiàn)在MapReduce采用的方式)。
在Map端,不同的數(shù)據(jù)存放方式各有優(yōu)缺點(diǎn)和適用場(chǎng)景。一般而言,shuffle在Map端的數(shù)據(jù)要存儲(chǔ)到磁盤(pán)上,以防止容錯(cuò)觸發(fā)重算帶來(lái)的龐大開(kāi)銷(xiāo)(如果保存到Reduce端內(nèi)存中,一旦Reduce Task掛掉了,所有Map Task需要重算)。但數(shù)據(jù)在磁盤(pán)上存放方式有多種可選方案,在MapReduce前期設(shè)計(jì)中,采用了現(xiàn)在Spark的方案(目前一直在改進(jìn)),每個(gè)Map Task為每個(gè)Reduce Task產(chǎn)生一個(gè)文件,該文件只保存特定Reduce Task需處理的數(shù)據(jù),這樣會(huì)產(chǎn)生M*R個(gè)文件,如果M和R非常龐大,比如均為1000,則會(huì)產(chǎn)生100w個(gè)文件,產(chǎn)生和讀取這些文件會(huì)產(chǎn)生大量的隨機(jī)IO,效率非常低下。解決這個(gè)問(wèn)題的一種直觀方法是減少文件數(shù)目,常用的方法有:
1) 將一個(gè)節(jié)點(diǎn)上所有Map產(chǎn)生的文件合并成一個(gè)大文件(MapReduce現(xiàn)在采用的方案),
2) 每個(gè)節(jié)點(diǎn)產(chǎn)生{(slot數(shù)目)*R}個(gè)文件(Spark優(yōu)化后的方案)。對(duì)后面這種方案簡(jiǎn)單解釋一下:不管是MapReduce 1.0還是Spark,每個(gè)節(jié)點(diǎn)的資源會(huì)被抽象成若干個(gè)slot,由于一個(gè)Task占用一個(gè)slot,因此slot數(shù)目可看成是最多同時(shí)運(yùn)行的Task數(shù)目。如果一個(gè)Job的Task數(shù)目非常多,限于slot數(shù)目有限,可能需要運(yùn)行若干輪。這樣,只需要由第一輪產(chǎn)生{(slot數(shù)目)*R}個(gè)文件,后續(xù)幾輪產(chǎn)生的數(shù)據(jù)追加到這些文件末尾即可。
因此,后一種方案可減少大作業(yè)產(chǎn)生的文件數(shù)目。
在Reduce端,各個(gè)Task會(huì)并發(fā)啟動(dòng)多個(gè)線程同時(shí)從多個(gè)Map Task端拉取數(shù)據(jù)。由于Reduce階段的主要任務(wù)是對(duì)數(shù)據(jù)進(jìn)行按組規(guī)約。
也就是說(shuō),需要將數(shù)據(jù)分成若干組,以便以組為單位進(jìn)行處理。大家知道,分組的方式非常多,常見(jiàn)的有:Map/HashTable(key相同的,放到同一個(gè)value list中)和Sort(按key進(jìn)行排序,key相同的一組,經(jīng)排序后會(huì)挨在一起),這兩種方式各有優(yōu)缺點(diǎn),第一種復(fù)雜度低,效率高,但是需要將數(shù)據(jù)全部放到內(nèi)存中,第二種方案復(fù)雜度高,但能夠借助磁盤(pán)(外部排序)處理龐大的數(shù)據(jù)集。Spark前期采用了第一種方案,而在最新的版本中加入了第二種方案, MapReduce則從一開(kāi)始就選用了基于sort的方案。
(2) MapReduce Shuffle發(fā)展史
【階段1】:MapReduce Shuffle的發(fā)展也并不是一馬平川的,剛開(kāi)始(0.10.0版本之前)采用了“每個(gè)Map Task產(chǎn)生R個(gè)文件”的方案,前面提到,該方案會(huì)產(chǎn)生大量的隨機(jī)讀寫(xiě)IO,對(duì)于大數(shù)據(jù)處理而言,非常不利。
【階段2】:為了避免Map Task產(chǎn)生大量文件,HADOOP-331嘗試對(duì)該方案進(jìn)行優(yōu)化,優(yōu)化方法:為每個(gè)Map Task提供一個(gè)環(huán)形buffer,一旦buffer滿了后,則將內(nèi)存數(shù)據(jù)spill到磁盤(pán)上(外加一個(gè)索引文件,保存每個(gè)partition的偏移量),最終合并產(chǎn)生的這些spill文件,同時(shí)創(chuàng)建一個(gè)索引文件,保存每個(gè)partition的偏移量。
(階段2):這個(gè)階段并沒(méi)有對(duì)shuffle架構(gòu)做調(diào)成,只是對(duì)shuffle的環(huán)形buffer進(jìn)行了優(yōu)化。在Hadoop 2.0版本之前,對(duì)MapReduce作業(yè)進(jìn)行參數(shù)調(diào)優(yōu)時(shí),Map階段的buffer調(diào)優(yōu)非常復(fù)雜的,涉及到多個(gè)參數(shù),這是由于buffer被切分成兩部分使用:一部分保存索引(比如parition、key和value偏移量和長(zhǎng)度),一部分保存實(shí)際的數(shù)據(jù),這兩段buffer均會(huì)影響spill文件數(shù)目,因此,需要根據(jù)數(shù)據(jù)特點(diǎn)對(duì)多個(gè)參數(shù)進(jìn)行調(diào)優(yōu),非常繁瑣。而MAPREDUCE-64則解決了該問(wèn)題,該方案讓索引和數(shù)據(jù)共享一個(gè)環(huán)形緩沖區(qū),不再將其分成兩部分獨(dú)立使用,這樣只需設(shè)置一個(gè)參數(shù)控制spill頻率。
【階段3(進(jìn)行中)】:目前shuffle被當(dāng)做一個(gè)子階段被嵌到Reduce階段中的。由于MapReduce模型中,Map Task和Reduce Task可以同時(shí)運(yùn)行,因此一個(gè)作業(yè)前期啟動(dòng)的Reduce Task將一直處于shuffle階段,直到所有Map Task運(yùn)行完成,而在這個(gè)過(guò)程中,Reduce Task占用著資源,但這部分資源利用率非常低,基本上只使用了IO資源。為了提高資源利用率,一種非常好的方法是將shuffle從Reduce階段中獨(dú)立處理,變成一個(gè)獨(dú)立的階段/服務(wù),由專(zhuān)門(mén)的shuffler service負(fù)責(zé)數(shù)據(jù)拷貝,目前百度已經(jīng)實(shí)現(xiàn)了該功能(準(zhǔn)備開(kāi)源?),且收益明顯,具體參考:MAPREDUCE-2354。
(3) Spark Shuffle發(fā)展史
目前看來(lái),Spark Shuffle的發(fā)展史與MapReduce發(fā)展史非常類(lèi)似。初期Spark在Map階段采用了“每個(gè)Map Task產(chǎn)生R個(gè)文件”的方法,在Reduce階段采用了map分組方法,但隨Spark變得流行,用戶逐漸發(fā)現(xiàn)這種方案在處理大數(shù)據(jù)時(shí)存在嚴(yán)重瓶頸問(wèn)題,因此嘗試對(duì)Spark進(jìn)行優(yōu)化和改進(jìn),相關(guān)鏈接有:External Sorting for Aggregator and CoGroupedRDDs,“Optimizing Shuffle Performance in Spark”,“Consolidating Shuffle Files in Spark”,優(yōu)化動(dòng)機(jī)和思路與MapReduce非常類(lèi)似。
Spark在前期設(shè)計(jì)中過(guò)多依賴于內(nèi)存,使得一些運(yùn)行在MapReduce之上的大作業(yè)難以直接運(yùn)行在Spark之上(可能遇到OOM問(wèn)題)。目前Spark在處理大數(shù)據(jù)集方面尚不完善,用戶需根據(jù)作業(yè)特點(diǎn)選擇性的將一部分作業(yè)遷移到Spark上,而不是整體遷移。隨著Spark的完善,很多內(nèi)部關(guān)鍵模塊的設(shè)計(jì)思路將變得與MapReduce升級(jí)版Tez非常類(lèi)似。
上述內(nèi)容就是如何進(jìn)行Spark Shuffle實(shí)現(xiàn),你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。