This article walks through the source code of BlockManagerMaster in Spark 2.x: how it is created and what its main member functions do.

1. Creating BlockManagerMaster
BlockManagerMaster manages and maintains block metadata for the whole application while it runs, and sends commands to the slave nodes for execution. It is created when SparkEnv is constructed; on the Driver side, SparkEnv is created while SparkContext is being created. The corresponding initialization code in SparkEnv is:
    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)
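As the code shows, constructing blockManagerMaster goes through registerOrLookupEndpoint: on the Driver a BlockManagerMasterEndpoint instance is created and registered in rpcEnv, while on an Executor only a reference to that endpoint is looked up. The BlockManager in each Executor communicates with blockManagerMaster through this reference to the Driver-side BlockManagerMasterEndpoint (BlockManagerMasterRef).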
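The register-or-look-up idea can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Spark's implementation: Endpoint, EndpointRef, Registry and registerOrLookup are hypothetical names invented for this example, and the registry is a local in-memory map rather than a real RPC environment. The point it shows is that the endpoint creator is passed by name, so the endpoint is only constructed on the driver side.

```scala
import scala.collection.mutable

object EndpointRegistrySketch {
  // Hypothetical stand-in for an RPC endpoint (not Spark's RpcEndpoint).
  trait Endpoint { def receive(msg: String): String }

  // Hypothetical stand-in for an endpoint reference: it only knows the
  // endpoint name and forwards messages through the registry.
  final class EndpointRef(val name: String, registry: Registry) {
    def ask(msg: String): Option[String] = registry.dispatch(name, msg)
  }

  // Toy in-memory registry standing in for the RPC environment.
  final class Registry {
    private val endpoints = mutable.Map.empty[String, Endpoint]

    // Registers the endpoint under the given name and returns a reference.
    def setup(name: String, endpoint: Endpoint): EndpointRef = {
      endpoints(name) = endpoint
      new EndpointRef(name, this)
    }

    // Returns a reference by name without creating anything.
    def lookup(name: String): EndpointRef = new EndpointRef(name, this)

    def dispatch(name: String, msg: String): Option[String] =
      endpoints.get(name).map(_.receive(msg))
  }

  // `creator` is a by-name parameter: it is evaluated only when isDriver is true.
  def registerOrLookup(registry: Registry, isDriver: Boolean, name: String)(
      creator: => Endpoint): EndpointRef =
    if (isDriver) registry.setup(name, creator) else registry.lookup(name)
}
```

In this model, calling registerOrLookup once with isDriver = true and once with isDriver = false yields two references to the same single endpoint instance.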
2. BlockManagerMaster member functions:
1). The removeExecutor() function:
    // Sends a RemoveExecutor message to BlockManagerMasterEndpoint to remove a dead Executor.
    // This function is only called on the driver side.
    def removeExecutor(execId: String) {
      tell(RemoveExecutor(execId))
      logInfo("Removed " + execId + " successfully in removeExecutor")
    }
2). The removeExecutorAsync() function:
    // Does much the same as 1): removes a dead Executor. This is the non-blocking,
    // asynchronous variant; the Future returned by ask is not awaited.
    def removeExecutorAsync(execId: String) {
      driverEndpoint.ask[Boolean](RemoveExecutor(execId))
      logInfo("Removal of executor " + execId + " requested")
    }
3). The registerBlockManager() function:
    // When an Executor-side BlockManager starts, it registers itself with BlockManagerMaster.
    // The master records it in its blockManagerInfo.
    def registerBlockManager(
        blockManagerId: BlockManagerId,
        maxOnHeapMemSize: Long,
        maxOffHeapMemSize: Long,
        slaveEndpoint: RpcEndpointRef): BlockManagerId = {
      logInfo(s"Registering BlockManager $blockManagerId")
      val updatedId = driverEndpoint.askSync[BlockManagerId](
        RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
      logInfo(s"Registered BlockManager $updatedId")
      updatedId
    }
4). The updateBlockInfo() function:
    // Updates the information recorded for a block.
    def updateBlockInfo(
        blockManagerId: BlockManagerId,
        blockId: BlockId,
        storageLevel: StorageLevel,
        memSize: Long,
        diskSize: Long): Boolean = {
      // Sends an UpdateBlockInfo message to BlockManagerMasterEndpoint and returns the result.
      val res = driverEndpoint.askSync[Boolean](
        UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
      logDebug(s"Updated info of block $blockId")
      res
    }
5). The getLocations() function:
    // Gets the BlockManager nodes that hold the block. The result is a Seq because,
    // when the block's replication is greater than 1, the same block may exist on
    // several BlockManager nodes.
    def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
      // Sends a GetLocations message to BlockManagerMasterEndpoint.
      driverEndpoint.askSync[Seq[BlockManagerId]](GetLocations(blockId))
    }
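To make the bookkeeping behind registration, block updates and location queries more concrete, here is a minimal in-memory sketch. It is a simplified model written for this article, not Spark's data structures: ManagerId, LocationRegistry and their methods are hypothetical names, block IDs are plain strings, and no RPC is involved.

```scala
import scala.collection.mutable

object BlockLocationSketch {
  // Hypothetical stand-in for Spark's BlockManagerId.
  final case class ManagerId(executorId: String, host: String)

  // Toy registry: which managers exist, and which managers hold each block.
  final class LocationRegistry {
    private val managers = mutable.LinkedHashSet.empty[ManagerId]
    private val locations =
      mutable.Map.empty[String, mutable.LinkedHashSet[ManagerId]]

    // Records a newly started block manager.
    def register(id: ManagerId): Unit = {
      managers += id
    }

    // Records that `id` now holds `blockId`; returns false for unknown managers.
    def updateBlock(id: ManagerId, blockId: String): Boolean = {
      if (!managers.contains(id)) {
        false
      } else {
        locations.getOrElseUpdate(blockId, mutable.LinkedHashSet.empty[ManagerId]) += id
        true
      }
    }

    // A replicated block can live on several managers, hence a Seq.
    def getLocations(blockId: String): Seq[ManagerId] =
      locations.get(blockId).map(_.toSeq).getOrElse(Seq.empty)

    // Every registered manager except the one asking.
    def getPeers(id: ManagerId): Seq[ManagerId] =
      managers.filterNot(_ == id).toSeq
  }
}
```

In this model a block reported by two managers is returned by getLocations with both of them, in registration order of the reports.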
6). The getPeers() function:
    // Gets the BlockManagerIds other than the given blockManagerId.
    // As noted above, a block may exist on several BlockManager nodes.
    def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
      // Sends a GetPeers message to BlockManagerMasterEndpoint.
      driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
    }
7). The getExecutorEndpointRef() function:
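    // Looks up the endpoint reference (slaveEndpoint) of the BlockManager registered
    // for the given executor, so that it can be contacted. The blockManagerIdByExecutor
    // and blockManagerInfo maps it reads are maintained by BlockManagerMasterEndpoint.
    private def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
      for (
        blockManagerId <- blockManagerIdByExecutor.get(executorId);
        info <- blockManagerInfo.get(blockManagerId)
      ) yield {
        info.slaveEndpoint
      }
    }
8). The getBlockStatus() function: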
    // Gets the status of a block: where it is stored and how much memory and disk it uses.
    def getBlockStatus(
        blockId: BlockId,
        askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {
      val msg = GetBlockStatus(blockId, askSlaves)
      val response = driverEndpoint.
        askSync[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
      val (blockManagerIds, futures) = response.unzip
      implicit val sameThread = ThreadUtils.sameThread
      val cbf =
        implicitly[
          CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
          Option[BlockStatus],
          Iterable[Option[BlockStatus]]]]
      val blockStatus = timeout.awaitResult(
        Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread))
      if (blockStatus == null) {
        throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
      }
      blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
        status.map { s => (blockManagerId, s) }
      }.toMap
    }
The real work behind these BlockManagerMaster functions is done in the BlockManagerMasterEndpoint instance. A later article will go through how the BlockManagerMasterEndpoint class handles each of these messages in detail.
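The gathering step in getBlockStatus() (one Future per block manager, combined with Future.sequence, then entries without a status dropped) can be tried in isolation with the standard library alone. The sketch below is an illustration under assumptions: ManagerId, Status and collectStatuses are hypothetical names, the global execution context and a fixed 5-second timeout replace Spark's ThreadUtils.sameThread and its RPC timeout, and the map is converted to a Seq first so the code does not need an explicit CanBuildFrom.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockStatusSketch {
  // Hypothetical stand-ins for Spark's BlockManagerId and BlockStatus.
  final case class ManagerId(executorId: String)
  final case class Status(memSize: Long, diskSize: Long)

  // Waits for every per-manager future, then keeps only the managers
  // that actually reported a status for the block.
  def collectStatuses(
      response: Map[ManagerId, Future[Option[Status]]]): Map[ManagerId, Status] = {
    // Split into two sequences that stay aligned by position.
    val (ids, futures) = response.toSeq.unzip
    // Turn Seq[Future[...]] into Future[Seq[...]] and block for the result.
    val statuses = Await.result(Future.sequence(futures), 5.seconds)
    // Pair each id with its result again and drop the None entries.
    ids.zip(statuses).flatMap { case (id, status) =>
      status.map(s => (id, s))
    }.toMap
  }
}
```

A manager whose future completes with None simply does not appear in the resulting map, which mirrors the flatMap at the end of getBlockStatus().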