十年網(wǎng)站開(kāi)發(fā)經(jīng)驗(yàn) + 多家企業(yè)客戶 + 靠譜的建站團(tuán)隊(duì)
量身定制 + 運(yùn)營(yíng)維護(hù)+專業(yè)推廣+無(wú)憂售后,網(wǎng)站問(wèn)題一站解決
上一章節(jié),我們從全局了解了一下Nacos項(xiàng)目的模塊架構(gòu),做到了心中有數(shù),現(xiàn)在,我們?nèi)ブ鸩饺ネ诰蚶锩娴拇a細(xì)節(jié),很多人在學(xué)習(xí)開(kāi)源的時(shí)候,無(wú)從下手,代碼那么多,從哪個(gè)地方開(kāi)始看呢?我們可以從一個(gè)接口開(kāi)始入手,這個(gè)接口是你使用過(guò)的,知道它大概做什么事,有體感的,大家還記得第一章時(shí),我們寫的HelloWorld嗎,對(duì),就從里面的接口開(kāi)始剝洋蔥。
創(chuàng)新互聯(lián)成立于2013年,我們提供高端網(wǎng)站建設(shè)、網(wǎng)站制作公司、成都網(wǎng)站設(shè)計(jì)、網(wǎng)站定制、成都營(yíng)銷網(wǎng)站建設(shè)、小程序制作、微信公眾號(hào)開(kāi)發(fā)、成都網(wǎng)站推廣服務(wù),提供專業(yè)營(yíng)銷思路、內(nèi)容策劃、視覺(jué)設(shè)計(jì)、程序開(kāi)發(fā)來(lái)完成項(xiàng)目落地,為成都戶外休閑椅企業(yè)提供源源不斷的流量和訂單咨詢。
https://github.com/alibaba/nacos,這個(gè)是Nacos的github代碼地址,開(kāi)始之前先start關(guān)注一下,加上watch,后續(xù)Nacos的郵件列表也會(huì)通知到你,可以關(guān)注到Nacos的最新實(shí)時(shí)消息,及各大牛之間的精彩討論。
下面這段代碼,是第一章節(jié)發(fā)布一個(gè)服務(wù)的代碼:
public static void main(String[] args) throws NacosException, InterruptedException { //發(fā)布的服務(wù)名 String serviceName = "helloworld.services"; //構(gòu)造一個(gè)Nacos實(shí)例,入?yún)⑹荖acos server的ip和服務(wù)端口 NamingService naming = NacosFactory.createNamingService("100.81.0.34:8080"); //發(fā)布一個(gè)服務(wù),該服務(wù)對(duì)外提供的ip為127.0.0.1,端口為8888 naming.registerInstance(serviceName, "100.81.0.35", 8080); Thread.sleep(Integer.MAX_VALUE);}
其中,第一步,是構(gòu)造一個(gè)Nacos服務(wù)實(shí)例,構(gòu)造實(shí)例的入?yún)?,是一個(gè)String,值的規(guī)范為ip:port,這個(gè)ip,就是我們?nèi)我庖慌_(tái)Nacos server的地址,我們點(diǎn)進(jìn)去看這個(gè)方法:
public static NamingService createNamingService(String serverAddr) throws NacosException { return NamingFactory.createNamingService(serverAddr); }
同時(shí)我們看下創(chuàng)建配置服務(wù)實(shí)例的代碼:
public static ConfigService createConfigService(String serverAddr) throws NacosException { return ConfigFactory.createConfigService(serverAddr);}
我們可以看到,NacosFactory實(shí)際上是一個(gè)服務(wù)發(fā)現(xiàn)和配置管理接口的統(tǒng)一入口,再由它不通的方法,創(chuàng)建不同服務(wù)的實(shí)例,我們可以直接使用NamingFactory,或者ConfigFactory直接創(chuàng)建Nacos的服務(wù)實(shí)例,也能work
cdn.nlark.com/lark/0/2018/png/4232/1537627839251-445ff4a4-d8ec-4cb0-abf5-ffd86d771bec.png">
接下來(lái),看一下,是如何構(gòu)造出這個(gè)Nacos naming實(shí)例的:
public static NamingService createNamingService(String serverList) throws NacosException { try { Class> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService"); Constructor constructor = driverImplClass.getConstructor(String.class); NamingService vendorImpl = (NamingService) constructor.newInstance(serverList); return vendorImpl; } catch (Throwable e) { throw new NacosException(-400, e.getMessage()); }}
通過(guò)反射實(shí)例化出了一個(gè)NamingService的實(shí)例NacosNamingService,構(gòu)造器是一個(gè)帶String入?yún)⒌?,我們順著往下看,?gòu)造函數(shù)里面做了哪些事情:
public NacosNamingService(String serverList) { this.serverList = serverList; init(); eventDispatcher = new EventDispatcher(); serverProxy = new NamingProxy(namespace, endpoint, serverList); beatReactor = new BeatReactor(serverProxy); hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir);}
入?yún)erverList就是我們剛才傳入的服務(wù)端地址,值賦給了實(shí)例的serverList字段,接下來(lái)調(diào)用了一個(gè)init方法,這個(gè)方法里面如下:
private void init() { namespace = System.getProperty(PropertyKeyConst.NAMESPACE); if (StringUtils.isEmpty(namespace)) { namespace = UtilAndComs.DEFAULT_NAMESPACE_ID; } logName = System.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME); if (StringUtils.isEmpty(logName)) { logName = "naming.log"; } cacheDir = System.getProperty("com.alibaba.nacos.naming.cache.dir"); if (StringUtils.isEmpty(cacheDir)) { cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace; }}
這面做了3件事,給namespace,logName,cacheDir賦值,namespace我們么有傳入,默認(rèn)是default,namespace在Nacos里面的作用,是用來(lái)進(jìn)行本地緩存隔離的,一臺(tái)機(jī)器上,啟動(dòng)一個(gè)Nacos的客戶端進(jìn)程,默認(rèn)的本地緩存路徑是default,如果再啟動(dòng)一個(gè),需要重新設(shè)置一個(gè)namespace,否則就會(huì)復(fù)用之前的緩存,造成沖突;logName和cacheDir,這2個(gè)字段就不解釋了,字面理解。這里多說(shuō)一句,這些值的設(shè)置,可以在java啟動(dòng)時(shí),通過(guò)系統(tǒng)參數(shù)的形式傳入,并且是第一優(yōu)先級(jí)的。
init方法執(zhí)行完之后,接下來(lái)是實(shí)例化一些框架組件,EventDispatcher,這個(gè)是一個(gè)經(jīng)典的事件分發(fā)組件,它的工作模式如下:
會(huì)有一個(gè)單獨(dú)線程從blockQueue中獲取事件,這個(gè)事件在Nacos這里, 就是服務(wù)端推送下來(lái)的數(shù)據(jù),listener在我們訂閱一條數(shù)據(jù)時(shí),會(huì)從生成一個(gè)listener實(shí)例,在事件到了隊(duì)列中,找到對(duì)應(yīng)的listener,去執(zhí)行里面listener的回調(diào)函數(shù)onEvent。如果對(duì)這個(gè)模式不熟悉的同學(xué),可以再翻看下EventDispatcher的代碼,這個(gè)屬于基礎(chǔ)知識(shí)了,和業(yè)務(wù)沒(méi)有關(guān)系,這里就不過(guò)多詳細(xì)講解,篇幅太長(zhǎng)。
接下來(lái),實(shí)例化了一個(gè)NameProxy的組件,這個(gè)東西是干嘛的呢?我們看下里面代碼:
public NamingProxy(String namespace, String endpoint, String serverList) { this.namespace = namespace; this.endpoint = endpoint; if (StringUtils.isNotEmpty(serverList)) { this.serverList = Arrays.asList(serverList.split(",")); if (this.serverList.size() == 1) { this.nacosDomain = serverList; } } executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.taobao.vipserver.serverlist.updater"); t.setDaemon(true); return t; } }); executorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { refreshSrvIfNeed(); } }, 0, vipSrvRefInterMillis, TimeUnit.MILLISECONDS); refreshSrvIfNeed();}
這里面邏輯有些多,我總結(jié)下,主要是啟動(dòng)了一個(gè)線程,每隔30s,去執(zhí)行refreshSrvIfNeed()這個(gè)方法,
refreshSrvIfNeed()這個(gè)方法里面,做的事情,是通過(guò)一個(gè)http請(qǐng)求,去Nacos server獲取一串Nacos server集群的地址列表,具體代碼如下:
private void refreshSrvIfNeed() { try { if (!CollectionUtils.isEmpty(serverList)) { LogUtils.LOG.info("server list provided by user: " + serverList); return; } if (System.currentTimeMillis() - lastSrvRefTime < vipSrvRefInterMillis) { return; } Listlist = getServerListFromEndpoint(); if (list.isEmpty()) { throw new Exception("Can not acquire vipserver list"); } if (!CollectionUtils.isEqualCollection(list, serversFromEndpoint)) { LogUtils.LOG.info("SERVER-LIST", "server list is updated: " + list); } serversFromEndpoint = list; lastSrvRefTime = System.currentTimeMillis(); } catch (Throwable e) { LogUtils.LOG.warn("failed to update server list", e); } }
獲取完地址列表后,賦值給serversFromEndpoint,并且記錄當(dāng)前更新時(shí)間,在下一次更新時(shí),小于30s,就不更新,避免頻繁更新,總的來(lái)說(shuō),NameProxy的目的就是定時(shí)在客戶端維護(hù)Nacos服務(wù)端的最新地址列表。
我們繼續(xù)往下看,接下來(lái)初始化了BeatReactor這個(gè)組件,從名字可以猜測(cè),應(yīng)該是和心跳相關(guān)的事情,它初始化的代碼如下:
public BeatReactor(NamingProxy serverProxy) { this.serverProxy = serverProxy; executorService.scheduleAtFixedRate(new BeatProcessor(), 0, clientBeatInterval, TimeUnit.MILLISECONDS);}
起了一個(gè)定時(shí)間隔為10s的任務(wù),去執(zhí)行BeatProcessor里面的邏輯,BeatProcessor的代碼里面,是循環(huán)的去取當(dāng)前客戶端注冊(cè)好的實(shí)例,然后向服務(wù)端發(fā)送一個(gè)http的心跳通知請(qǐng)求,告訴客戶端,這個(gè)服務(wù)的健康狀態(tài),具體代碼如下:
class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } @Override public void run() { Mapparams = new HashMap (2); params.put("beat", JSON.toJSONString(beatInfo)); params.put("dom", beatInfo.getDom()); try { String result = serverProxy.callAllServers(UtilAndComs.NACOS_URL_BASE + "/api/clientBeat", params); JSONObject jsonObject = JSON.parseObject(result); if (jsonObject != null) { clientBeatInterval = jsonObject.getLong("clientBeatInterval"); } } catch (Exception e) { LogUtils.LOG.error("CLIENT-BEAT", "failed to send beat: " + JSON.toJSONString(beatInfo), e); } }}
這里就是naocs的客戶端主動(dòng)上報(bào)服務(wù)健康狀況的邏輯了,是服務(wù)發(fā)現(xiàn)功能,比較重要的一個(gè)概念,服務(wù)健康檢查機(jī)制,常用的還有服務(wù)端主動(dòng)去探測(cè)客戶端的接口返回。
最后一步,就是初始化了一個(gè)叫HostReactor的實(shí)例,我們來(lái)看下,它干了些啥:
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir) { this.eventDispatcher = eventDispatcher; this.serverProxy = serverProxy; this.cacheDir = cacheDir; this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir)); this.failoverReactor = new FailoverReactor(this, cacheDir); this.pushRecver = new PushRecver(this);}
第五行,是從緩存文件中加載數(shù)據(jù)到serviceInfoMap的內(nèi)存map中,接下來(lái),初始化了一個(gè)FailoverReactor的組件,這個(gè)是Nacos客戶端緩存容災(zāi)相關(guān)的,它里面的初始化代碼如下:
public void init() { executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS); executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES); // backup file on startup if failover directory is empty. executorService.schedule(new Runnable() { @Override public void run() { try { File cacheDir = new File(failoverDir); if (!cacheDir.exists() && !cacheDir.mkdirs()) { throw new IllegalStateException("failed to create cache dir: " + failoverDir); } File[] files = cacheDir.listFiles(); if (files == null || files.length <= 0) { new DiskFileWriter().run(); } } catch (Throwable e) { LogUtils.LOG.error("NA", "failed to backup file on startup.", e); } } }, 10000L, TimeUnit.MILLISECONDS);}
初始化了3個(gè)定時(shí)任務(wù),第一個(gè)任務(wù)的代碼如下:
class SwitchRefresher implements Runnable { long lastModifiedMillis = 0L; @Override public void run() { try { File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH); if (!switchFile.exists()) { switchParams.put("failover-mode", "false"); LogUtils.LOG.debug("failover switch is not found, " + switchFile.getName()); return; } long modified = switchFile.lastModified(); if (lastModifiedMillis < modified) { lastModifiedMillis = modified; String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH, Charset.defaultCharset().toString()); if (!StringUtils.isEmpty(failover)) { Listlines = Arrays.asList(failover.split(DiskCache.getLineSeperator())); for (String line : lines) { String line1 = line.trim(); if ("1".equals(line1)) { switchParams.put("failover-mode", "true"); LogUtils.LOG.info("failover-mode is on"); new FailoverFileReader().run(); } else if ("0".equals(line1)) { switchParams.put("failover-mode", "false"); LogUtils.LOG.info("failover-mode is off"); } } } else { switchParams.put("failover-mode", "false"); } } } catch (Throwable e) { LogUtils.LOG.error("NA", "failed to read failover switch.", e); } }}
首先判定下容災(zāi)開(kāi)關(guān)是否有,容災(zāi)開(kāi)關(guān)是一個(gè)磁盤文件的形式存在,通過(guò)容災(zāi)開(kāi)關(guān)文件名字,判定容災(zāi)開(kāi)關(guān)是否打開(kāi),1表示打開(kāi),0為關(guān)閉,讀取到容災(zāi)開(kāi)關(guān)后,將值更新到內(nèi)存中,后續(xù)解析地址列表時(shí),首先會(huì)判定一下容災(zāi)開(kāi)關(guān)是否打開(kāi),如果打開(kāi)了,就讀緩存的數(shù)據(jù),否則從服務(wù)端獲取最新數(shù)據(jù)。
第二個(gè)定時(shí)任務(wù),做的事情如下:
class DiskFileWriter extends TimerTask { public void run() { Mapmap = hostReactor.getServiceInfoMap(); for (Map.Entry entry : map.entrySet()) { ServiceInfo serviceInfo = entry.getValue(); if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils.equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY) || StringUtils.equals(serviceInfo.getName(), "00-00---000-ENV_CONFIGS-000---00-00") || StringUtils.equals(serviceInfo.getName(), "vipclient.properties") || StringUtils.equals(serviceInfo.getName(), "00-00---000-ALL_HOSTS-000---00-00")) { continue; } DiskCache.write(serviceInfo, failoverDir); } }}
每隔24小時(shí),把內(nèi)存中所有的服務(wù)數(shù)據(jù),寫一遍到磁盤中,其中需要過(guò)濾掉一些非域名數(shù)據(jù)的特殊數(shù)據(jù),具體可看代碼中的描述。最后一個(gè)定時(shí)任務(wù),是每隔10s,是檢查緩存目錄是否存在,同時(shí)如果緩存里面值沒(méi)有的話,主動(dòng)觸發(fā)一次緩存寫磁盤的操作。
以上就是客戶端構(gòu)造一個(gè)Nacos實(shí)例的初始化全部流程,大部分都是在初始化多個(gè)線程池或者定時(shí)任務(wù),各司其職,這個(gè)也是我們寫后端程序的一些基本套路,提高系統(tǒng)的并發(fā)能力,同時(shí)在對(duì)任務(wù)的分發(fā)和執(zhí)行,引入一些常用的異步編程模型如隊(duì)列模型的事件分發(fā),這些都是異步和并發(fā)的很好學(xué)習(xí)素材,這2點(diǎn)也是寫高性能程序的基本要求。
這一章節(jié),我們通過(guò)Nacos的NacosFactory構(gòu)造一個(gè)nacos服務(wù)實(shí)例作為切入點(diǎn),把客戶端的初始化流程給串了一遍,概述下客戶端初始化流程做的幾件事:
初始化事件分發(fā)組件,用于處理服務(wù)端主動(dòng)通知下來(lái)的變更數(shù)據(jù)
初始化Nacos服務(wù)集群地址列表更新組件,用于客戶端維護(hù)Nacos服務(wù)端的最新地址列表
初始化服務(wù)健康檢查模塊,主動(dòng)給服務(wù)端上報(bào)服務(wù)的健康情況
初始化客戶端的緩存,10s檢查一次,如果沒(méi)有,則創(chuàng)建
24小時(shí)備份一次客戶端的緩存文件
5s檢查一次容災(zāi)開(kāi)關(guān),更新到內(nèi)存中,容災(zāi)模式情況下,服務(wù)地址讀的都是緩存
以上就是Nacos客戶端實(shí)例初始化的整體流程,是不是感覺(jué)做的事情挺多的,還有一些代碼的細(xì)節(jié)點(diǎn),大家自己多精讀一下,如果有什么不明白的,可以留言,或者在社區(qū)找@超哥幫你解答,如果能發(fā)現(xiàn)bug或者其他建議,可以在社區(qū)提issue。