十年網(wǎng)站開(kāi)發(fā)經(jīng)驗(yàn) + 多家企業(yè)客戶 + 靠譜的建站團(tuán)隊(duì)
量身定制 + 運(yùn)營(yíng)維護(hù)+專(zhuān)業(yè)推廣+無(wú)憂售后,網(wǎng)站問(wèn)題一站解決
說(shuō)明Giraph如何借助ZooKeeper來(lái)實(shí)現(xiàn)Master與Workers間的同步(不太確定)。
創(chuàng)新互聯(lián)專(zhuān)注于企業(yè)全網(wǎng)營(yíng)銷(xiāo)推廣、網(wǎng)站重做改版、交口網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、H5頁(yè)面制作、成都做商城網(wǎng)站、集團(tuán)公司官網(wǎng)建設(shè)、成都外貿(mào)網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁(yè)設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性價(jià)比高,為交口等各大城市提供網(wǎng)站開(kāi)發(fā)制作服務(wù)。
在單機(jī)上(機(jī)器名:giraphx)啟動(dòng)了2個(gè)workers。
Giraph遵從單Master多Workers結(jié)構(gòu),BSPServiceMaster使用MasterThread線程來(lái)進(jìn)行全局的同步。每個(gè)Worker啟動(dòng)成功后,會(huì)向Master匯報(bào)自身的健康狀況,那么Master是如何檢測(cè)Workers是否都成功啟動(dòng)了?
Master在ZooKeeper上創(chuàng)兩個(gè)目錄,_workerHealthyDir和 _workerUnhealthyDir,分別用來(lái)記錄Healthy Workers和UnHealthy Workers。
主要在BspServiceMaster類(lèi)中的getAllWorkerInfos()方法來(lái)完成,其調(diào)用關(guān)系如下,注意下getAllWorkerInfos()到MasterThread.run()方法調(diào)用關(guān)系,比較難找。
cdn.xitu.io/2019/7/26/16c2c19b1f13cc4f?w=640&h=147&f=png&s=108416">
創(chuàng)建的兩個(gè)目錄如下:
/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir /_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerUnhealthyDir
每個(gè)Worker在setup()中,調(diào)用registerHealth()方法來(lái)注冊(cè)自身的狀態(tài)。
若自身是Healthy的,則在_workerHealthyDir目錄下添加子節(jié)點(diǎn) /wokerInfo.getHostNameId(),否則在workerUnhealthyDir目錄下添加。wokerInfo.getHostNameId()為:Hostname+“”+TaskId。 Task1和Task2 (Task 0是master) 創(chuàng)建的子節(jié)點(diǎn)如下:
/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir/giraphx_1
/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir/giraphx_2
Master 在checkWorkers()方法中,在While死循環(huán)中(實(shí)際有超時(shí)限制),通過(guò)調(diào)用getAllWorkerInfos()方法來(lái)獲取_workerHealthyDir目錄下的子節(jié)點(diǎn),然后比較子節(jié)點(diǎn)數(shù)目是否達(dá)到maxWorkers(啟動(dòng)job時(shí)定義的,-w參數(shù))。
若小于maxWorkers,則繼續(xù)調(diào)用getAllWorkerInfos()方法進(jìn)行下一輪檢測(cè);若等于maxWorker,退出While循環(huán),然后返回healthyWorkersInfoList:[Worker(hostname=giraphx, MRtaskID=1, port=30001), Worker(hostname=giraphx, MRtaskID=2, port=30002)] 。
問(wèn)題:由于在分布式環(huán)境中,每個(gè)Worker和Maste都是并行運(yùn)行,彼此不知道對(duì)方的運(yùn)行情況。上述第3步驟中,若還有子節(jié)點(diǎn)還沒(méi)有創(chuàng)建,就一直在while死循環(huán)中調(diào)用來(lái)檢測(cè)getAllWorkerInfos()方法檢測(cè),效率比較低下,當(dāng)然也比較笨!
Giraph借用ZooKeeper來(lái)高效的進(jìn)行檢測(cè)。設(shè)計(jì)理念如下:
若某個(gè)task創(chuàng)建了子節(jié)點(diǎn)后,就會(huì)觸發(fā)Watcher事件。
若子節(jié)點(diǎn)數(shù)目小于maxWorkers,就調(diào)用 workerHealthRegistrationChanged的await()方法釋放當(dāng)前線程的鎖,陷入等待狀態(tài)。不會(huì)進(jìn)行無(wú)用的檢測(cè)。
說(shuō)明:workerHealthRegistrationChanged為PredicateLock類(lèi)型(implements BspEvent接口),PredicateLock里面使用可重入鎖 ReentrantLock和Condition進(jìn)行線程的控制。
當(dāng)某個(gè)task創(chuàng)建了子節(jié)點(diǎn)后,觸發(fā)Watcher事件。
調(diào)用BspService中的public final void Process(WatchedEvent event)事件,該方法根據(jù)事件的路徑來(lái)激活相應(yīng)的BspEvent事件。此處對(duì)應(yīng)的是:
實(shí)驗(yàn)運(yùn)行如下:
s(926)) - process: Got a new event, path = /_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir, type = NodeChildrenChanged, state = SyncConnected INFO bsp.BspService (BspService.java:process(960)) - process: workerHealthRegistrationChanged (worker health reported - healthy/unhealthy )
這樣就會(huì)激活master線程,開(kāi)始下一輪檢測(cè)。
子節(jié)點(diǎn)數(shù)目等于maxWorkers時(shí),就停止。
總結(jié):每創(chuàng)建一個(gè)子節(jié)點(diǎn)時(shí),才會(huì)進(jìn)行一次檢測(cè),效率較高!