十年網(wǎng)站開發(fā)經(jīng)驗(yàn) + 多家企業(yè)客戶 + 靠譜的建站團(tuán)隊(duì)
量身定制 + 運(yùn)營(yíng)維護(hù)+專業(yè)推廣+無憂售后,網(wǎng)站問題一站解決
Kafka官網(wǎng)自己的介紹是:一個(gè)可支持分布式的流平臺(tái)。
kafka官網(wǎng)介紹
創(chuàng)新互聯(lián)建站長(zhǎng)期為近千家客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺(tái),與合作伙伴共同營(yíng)造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為北侖企業(yè)提供專業(yè)的成都網(wǎng)站建設(shè)、做網(wǎng)站,北侖網(wǎng)站改版等技術(shù)服務(wù)。擁有十多年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開發(fā)。
kafka三個(gè)關(guān)鍵能力:
1.發(fā)布訂閱記錄流,類似于消息隊(duì)列與企業(yè)信息系統(tǒng)
2.以容錯(cuò)的持久方式存儲(chǔ)記錄流
3.對(duì)流進(jìn)行處理
kafka通常應(yīng)用再兩大類應(yīng)用中:
1.構(gòu)建實(shí)時(shí)流數(shù)據(jù)管道,在系統(tǒng)或應(yīng)用程序之間可靠地獲取數(shù)據(jù)
2.構(gòu)建轉(zhuǎn)換或響應(yīng)數(shù)據(jù)流的實(shí)時(shí)流應(yīng)用程序
kafka的一些基本概念:
1.Kafka作為一個(gè)集群運(yùn)行在一個(gè)或多個(gè)服務(wù)器上,這些服務(wù)器可以跨越多個(gè)數(shù)據(jù)中心。
2.Kafka集群將記錄流存儲(chǔ)在稱為topic的類別中。
3.每個(gè)記錄由一個(gè)鍵、一個(gè)值和一個(gè)時(shí)間戳組成。
kafka核心API:
1.Producer API:允許應(yīng)用程序?qū)⒂涗浟靼l(fā)布到一個(gè)或多個(gè)topic。
2.Consumer API:允許應(yīng)用程序訂閱一個(gè)或多個(gè)topic并處理生成給它們的記錄流。
3.Streams API:允許應(yīng)用程序充當(dāng)流處理器,使用來自一個(gè)或多個(gè)topic的輸入流,
并生成一個(gè)或多個(gè)輸出topic的輸出流,從而有效地將輸入流轉(zhuǎn)換為輸出流。
4.Connector API:允許構(gòu)建和運(yùn)行可重用的生產(chǎn)者或消費(fèi)者,將topic連接到現(xiàn)有的應(yīng)用程序或數(shù)據(jù)系統(tǒng)。
例如,到關(guān)系數(shù)據(jù)庫(kù)的連接器可能捕獲對(duì)表的每個(gè)更改。
傳統(tǒng)消息傳遞有兩類模型:消息隊(duì)列、發(fā)布訂閱。在消息隊(duì)列中,一個(gè)消費(fèi)者池可以從一個(gè)服務(wù)器讀取數(shù)據(jù),而每個(gè)記錄都將被發(fā)送到其中一個(gè)服務(wù)器;在發(fā)布-訂閱中,記錄被廣播給所有消費(fèi)者。這兩種模型各有優(yōu)缺點(diǎn):
消息隊(duì)列優(yōu)缺點(diǎn):
它允許您在多個(gè)使用者實(shí)例上劃分?jǐn)?shù)據(jù)處理,這使您可以擴(kuò)展處理。
隊(duì)列不是多訂閱者的—一旦一個(gè)進(jìn)程讀取了它丟失的數(shù)據(jù)。
發(fā)布訂閱優(yōu)缺點(diǎn):
Publish-subscribe允許您將數(shù)據(jù)廣播到多個(gè)進(jìn)程,
但是由于每個(gè)消息都傳遞到每個(gè)訂閱者,因此無法擴(kuò)展處理。
作為消息傳遞系統(tǒng),那么跟mq有什么區(qū)別呢?(RabbitMq\redis\RocketMq\ActiveMq)
RabbitMQ:
遵循AMQP協(xié)議,由內(nèi)在高并發(fā)的erlang語言開發(fā),用在實(shí)時(shí)的對(duì)可靠性要求比較高的消息傳遞上.
萬級(jí)數(shù)據(jù)量,社區(qū)活躍度極高,可視化操作界面豐富。
提供了全面的核心功能,是消息隊(duì)列的優(yōu)秀產(chǎn)品。
因?yàn)槭莈rlang語言開發(fā),難以維護(hù)并且開發(fā)者很難二次開發(fā)。
Redis:
redis的主要場(chǎng)景是內(nèi)存數(shù)據(jù)庫(kù),作為消息隊(duì)列來說可靠性太差,而且速度太依賴網(wǎng)絡(luò)IO。
在服務(wù)器本機(jī)上的速度較快,且容易出現(xiàn)數(shù)據(jù)堆積的問題,在比較輕量的場(chǎng)合下能夠適用。
RocketMq:
rocketMq幾十萬級(jí)別數(shù)據(jù)量,基于Java開發(fā)。是阿里巴巴開源的一個(gè)消息產(chǎn)品。
應(yīng)對(duì)了淘寶雙十一考驗(yàn),并且文檔十分的完善,擁有一些其他消息隊(duì)列不具備的高級(jí)特性,
如定時(shí)推送,其他消息隊(duì)列是延遲推送,如rabbitMq通過設(shè)置expire字段設(shè)置延遲推送時(shí)間。
又比如rocketmq實(shí)現(xiàn)分布式事務(wù),比較可靠的。RocketMq也是用過的唯一支持分布式事務(wù)的一款產(chǎn)品。
Kafka:
kafka原本設(shè)計(jì)的初衷是日志統(tǒng)計(jì)分析,現(xiàn)在基于大數(shù)據(jù)的背景下也可以做運(yùn)營(yíng)數(shù)據(jù)的分析統(tǒng)計(jì)。
kafka真正的大規(guī)模分布式消息隊(duì)列,提供的核心功能比較少?;趜ookeeper實(shí)現(xiàn)的分布式消息訂閱。
幾十萬級(jí)數(shù)據(jù)量級(jí),比RokectMq更強(qiáng)。
客戶端和服務(wù)器之間的通信是通過一個(gè)簡(jiǎn)單的、高性能的、語言無關(guān)的TCP協(xié)議來完成的。
ActiveMq:
Apache ActiveMQ?是最流行的開源、多協(xié)議、基于java的消息服務(wù)器。它支持行業(yè)標(biāo)準(zhǔn)協(xié)議,
因此用戶可以在各種語言和平臺(tái)上選擇客戶端??梢允褂脕碜訡、c++、Python、. net等的連接性。
使用通用的AMQP協(xié)議集成您的多平臺(tái)應(yīng)用程序。使用STOMP在websockets上交換web應(yīng)用程序之間的消息。
使用MQTT管理物聯(lián)網(wǎng)設(shè)備。支持您現(xiàn)有的JMS基礎(chǔ)結(jié)構(gòu)及其他。ActiveMQ提供了支持任何messagi的強(qiáng)大功能
和靈活性。
備注:因?yàn)樵撐恼轮饕榻Bkafka,所以上述只是簡(jiǎn)單羅列了一些特點(diǎn),如果有興趣的同學(xué)可以詳細(xì)的分析一下,這些產(chǎn)品我后續(xù)都會(huì)專門寫文章來歸納總結(jié)分析,在這里先簡(jiǎn)單帶過。
該部分是擴(kuò)展內(nèi)容,很多人包括我剛畢業(yè)那年使用消息隊(duì)列,但別人問道我為啥用消息隊(duì)列,我都沒有一個(gè)很清晰的認(rèn)識(shí),所以在這里也說一下。希望給有需要的同學(xué)一些幫助。
那么為什么要使用消息隊(duì)列呢?首先我們來回顧一下消息傳遞。前端而言,傳統(tǒng)方式是通過全局變量來傳遞,后面有了數(shù)據(jù)總線的概念,再后來有相應(yīng)的解決方案產(chǎn)品比如說vuex、redux、store等。對(duì)于后端來說,最先系統(tǒng)之間的通信,消息傳遞都非常依賴于通信對(duì)象彼此,高度耦合,后面有了一些產(chǎn)品來解決這些問題,比如說webservice.但這樣的方式極其不友好,而且維護(hù)繁瑣,職責(zé)難以分清,工作量增加,所以mq誕生后,基本解決了這些問題。
消息隊(duì)列的引入是為了:
1.解耦:
比如:A系統(tǒng)操作p,需要將消息傳遞給B、C兩個(gè)系統(tǒng),如果沒有消息隊(duì)列,那么A系統(tǒng)中需要給B發(fā)一條消息,
又得給C發(fā)一條消息,然后有一天D、E、F系統(tǒng)說:A系統(tǒng)你也要給我發(fā)p的消息,這個(gè)時(shí)候A又得修改代碼,
發(fā)布上線,DEF才能正常接收消息。然后過了n天,C又說,不要給我發(fā)消息了,把給我發(fā)消息的部分去掉吧。
A系統(tǒng)的開發(fā)人員又得哐哧哐哧的去掉,發(fā)布上線。這樣日復(fù)一日,隨著系統(tǒng)增多,接入和退出的操作增多,
那么A系統(tǒng)需要頻繁發(fā)布上線,降低了穩(wěn)定性、可用時(shí)間、同時(shí)每次上線都需要測(cè)試跟蹤測(cè)試,這里面的成本
與風(fēng)險(xiǎn)不言而喻。而消息隊(duì)列一旦引入,A不需要關(guān)心誰消費(fèi),誰退出消費(fèi),A只負(fù)責(zé)將消息放入隊(duì)列即可,
而其他系統(tǒng)只需要監(jiān)聽這個(gè)隊(duì)列,就算其他系統(tǒng)退出,對(duì)A而言也是沒有任何影響的,能夠一直持續(xù)不斷的
提供服務(wù),這難道不香嗎?
2.異步
比如說:傳統(tǒng)方式發(fā)送消息給B、C、D,需要120ms,那么如果采用了消息隊(duì)列,就可以大大降低耗時(shí)。但
這些對(duì)于那些非必要的同步業(yè)務(wù)邏輯適用。
3.削峰
傳統(tǒng)模式下,請(qǐng)求直接進(jìn)入到數(shù)據(jù)庫(kù),當(dāng)峰值到達(dá)一定時(shí),必然會(huì)掛掉。如果適用了中間件消息隊(duì)列,那么就可以很好的保證系統(tǒng)正常提供服務(wù),這也是秒殺系統(tǒng)中會(huì)常常談到的限流、這樣可以防止系統(tǒng)崩潰,提供系統(tǒng)可用性。
org.apache.kafka
kafka_2.12
1.0.0
provided
org.apache.kafka
kafka-clients
1.0.0
org.apache.kafka
kafka-streams
1.0.0
/**
* @author chandlerHuang
* @description @TODO
* @date 2020/1/15
*/
public class KafkaProducerService implements Runnable {
private final KafkaProducer producer;
private final String topic;
public KafkaProducerService(String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", "綁定的外網(wǎng)IP:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
this.producer = new KafkaProducer(props);
this.topic = topic;
}
@Override
public void run() {
int messageNo = 1;
try {
for(;;) {
String messageStr="["+messageNo+"]:hello,boys!";
producer.send(new ProducerRecord(topic, "Message", messageStr));
//生產(chǎn)了100條就打印
if(messageNo%100==0){
System.out.println("sendMessages:" + messageStr);
}
//生產(chǎn)1000條就退出
if(messageNo%1000==0){
System.out.println("successCount:"+messageNo);
break;
}
messageNo++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
public static void main(String args[]) {
KafkaProducerService test = new KafkaProducerService(TopicConstant.CHART_TOPIC);
Thread thread = new Thread(test);
thread.start();
}
}
/**
* @author chandlerHuang
* @description @TODO
* @date 2020/1/15
*/
public class KafkaConsumerService implements Runnable{
private final KafkaConsumer consumer;
private ConsumerRecords msgList;
private final String topic;
private static final String GROUPID = "groupA";
public KafkaConsumerService(String topicName) {
Properties props = new Properties();
props.put("bootstrap.servers", "綁定的外網(wǎng)IP:9092");
props.put("group.id", GROUPID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer(props);
this.topic = topicName;
this.consumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
int messageNo = 1;
System.out.println("---------開始消費(fèi)---------");
try {
for (;;) {
msgList = consumer.poll(1000);
if(null!=msgList&&msgList.count()>0){
for (ConsumerRecord record : msgList) {
//消費(fèi)100條就打印 ,但打印的數(shù)據(jù)不一定是這個(gè)規(guī)律的
if(messageNo%100==0){
System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
}
//當(dāng)消費(fèi)了1000條就退出
if(messageNo%1000==0){
break;
}
messageNo++;
}
}else{
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
public static void main(String args[]) {
KafkaConsumerService test1 = new KafkaConsumerService(TopicConstant.CHART_TOPIC);
Thread thread1 = new Thread(test1);
thread1.start();
}
}
備注:上述demo編寫過程中,發(fā)現(xiàn)報(bào)了一個(gè)Exception:Kafka java client 連接異常(org.apache.kafka.common.errors.TimeoutException: Failed to update metadata )...
kafka中需要配置server.文件:
advertised.listeners=PLAINTEXT://外網(wǎng)地址:9092
zookeeper.connect=內(nèi)網(wǎng)地址:2181
如果你是云服務(wù)器的話需要,在安全組設(shè)置對(duì)應(yīng)端口開放,否則無法訪問響應(yīng)接口!