In this post:
1. Dynamic resource allocation in Spark Streaming
2. Dynamic control of the consumption rate in Spark Streaming
Why do we need dynamic adjustment?
a) By default, Spark allocates resources in a coarse-grained way: resources are allocated up front, before the computation runs. A Spark Streaming application, however, has peak and off-peak periods that need different amounts of resources. If we provision for the peak, a large amount of resources sits wasted during off-peak periods.
b) A Spark Streaming application runs continuously, so resource consumption and resource management are also factors we have to consider.
Dynamically adjusting resources for Spark Streaming faces a challenge:
Spark Streaming runs batch by batch, according to the Batch Duration. One batch may need a lot of resources while the next batch needs far fewer, and a batch can already have finished before the resource adjustment made for it completes, leaving the adjustment stale. In that case, the knob to tune is the batch interval itself; see the sketch below.
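For reference, the batch interval is fixed when the StreamingContext is created, so changing it means recreating the context. A minimal sketch, assuming an illustrative 30-second interval:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("BatchDurationDemo")
// The batch interval is chosen once, here; 30 seconds is an illustrative value.
val ssc = new StreamingContext(conf, Seconds(30))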
Dynamic resource requests in Spark Streaming
1. SparkContext does not enable dynamic resource allocation by default, but you can turn it on manually through SparkConf.
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
}

_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    Some(new ExecutorAllocationManager(this, listenerBus, _conf))
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())
That is, set the spark.dynamicAllocation.enabled parameter to true; a configuration sketch follows below.
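A minimal SparkConf sketch, assuming the standard dynamic-allocation settings (the executor bounds are illustrative values; on most cluster managers dynamic allocation also needs the external shuffle service):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("DynamicAllocationDemo")
  .set("spark.dynamicAllocation.enabled", "true")
  // Required by dynamic allocation on most cluster managers.
  .set("spark.shuffle.service.enabled", "true")
  // Illustrative bounds; tune them to your workload's peaks and troughs.
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "10")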
When enabled, an ExecutorAllocationManager instance is created to allocate resources dynamically. Internally it runs a timer that keeps scanning the state of the executors and calls schedule() through a scheduled thread pool to carry out the actual adjustment.
/**
 * Register for scheduler callbacks to decide when to add and remove executors, and start
 * the scheduling task.
 */
def start(): Unit = {
  listenerBus.addListener(listener)

  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule() // dynamically adjust the number of allocated executors
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
private def schedule(): Unit = synchronized {
  val now = clock.getTimeMillis

  updateAndSyncNumExecutorsTarget(now) // update the target number of executors

  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false
      removeExecutor(executorId)
    }
    !expired
  }
}
/**
 * Updates our target number of executors and syncs the result with the cluster manager.
 *
 * Check to see whether our existing allocation and the requests we've made previously exceed our
 * current needs. If so, truncate our target and let the cluster manager know so that it can
 * cancel pending requests that are unneeded.
 *
 * If not, and the add time has expired, see if we can request new executors and refresh the add
 * time.
 *
 * @return the delta in the target number of executors.
 */
private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
  val maxNeeded = maxNumExecutorsNeeded

  if (initializing) {
    // Do not change our target while we are still initializing,
    // Otherwise the first job may have to ramp up unnecessarily
    0
  } else if (maxNeeded < numExecutorsTarget) {
    // The target number exceeds the number we actually need, so stop adding new
    // executors and inform the cluster manager to cancel the extra pending requests
    val oldNumExecutorsTarget = numExecutorsTarget
    numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
    numExecutorsToAdd = 1

    // If the new target has not changed, avoid sending a message to the cluster manager
    if (numExecutorsTarget < oldNumExecutorsTarget) {
      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
      logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
        s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
    }
    numExecutorsTarget - oldNumExecutorsTarget
  } else if (addTime != NOT_SET && now >= addTime) {
    val delta = addExecutors(maxNeeded)
    logDebug(s"Starting timer to add more executors (to " +
      s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
    addTime += sustainedSchedulerBacklogTimeoutS * 1000
    delta
  } else {
    0
  }
}
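To make the truncation branch above concrete, here is a tiny standalone sketch (a hypothetical helper of mine, not Spark's API) of the clamping rule numExecutorsTarget = math.max(maxNeeded, minNumExecutors):

// Standalone illustration (hypothetical helper, not Spark code): when the number
// of executors actually needed drops below the current target, the target is
// clamped down, but never below the configured minimum.
object TargetTruncationDemo {
  def truncate(maxNeeded: Int, currentTarget: Int, minNumExecutors: Int): Int =
    if (maxNeeded < currentTarget) math.max(maxNeeded, minNumExecutors)
    else currentTarget

  def main(args: Array[String]): Unit = {
    println(truncate(maxNeeded = 4, currentTarget = 10, minNumExecutors = 2)) // 4
    println(truncate(maxNeeded = 1, currentTarget = 10, minNumExecutors = 2)) // 2
  }
}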
Dynamic control of the consumption rate:
Spark Streaming provides an elastic mechanism that tracks the relationship between the rate at which data flows in and the rate at which it is processed, i.e. whether processing keeps up with ingestion. If it cannot keep up, Spark Streaming automatically throttles the rate at which data is ingested. This is controlled by the spark.streaming.backpressure.enabled parameter; a configuration sketch follows below.
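A minimal configuration sketch, assuming a receiver-based source and the direct Kafka API (the rate values are illustrative; the maxRate settings act as ceilings while the backpressure algorithm ramps up):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("BackpressureDemo")
  // Let Spark Streaming adapt the ingestion rate to the processing rate.
  .set("spark.streaming.backpressure.enabled", "true")
  // Illustrative ceilings: records/sec per receiver, and per Kafka partition
  // for the direct API.
  .set("spark.streaming.receiver.maxRate", "1000")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")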