十年網(wǎng)站開發(fā)經(jīng)驗 + 多家企業(yè)客戶 + 靠譜的建站團隊
量身定制 + 運營維護+專業(yè)推廣+無憂售后,網(wǎng)站問題一站解決
Kafka重復消費場景及解決方案是什么,針對這個問題,這篇文章詳細介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
十年專注成都網(wǎng)站制作,企業(yè)網(wǎng)站設(shè)計,個人網(wǎng)站制作服務(wù),為大家分享網(wǎng)站制作知識、方案,網(wǎng)站設(shè)計流程、步驟,成功服務(wù)上千家企業(yè)。為您提供網(wǎng)站建設(shè),網(wǎng)站制作,網(wǎng)頁設(shè)計及定制高端網(wǎng)站建設(shè)服務(wù),專注于企業(yè)網(wǎng)站設(shè)計,高端網(wǎng)頁制作,對成都橡塑保溫等多個行業(yè),擁有豐富設(shè)計經(jīng)驗。
Kafka消費者以消費者組(Consumer Group)的形式消費一個topic,發(fā)布到topic中的每個記錄將傳遞到每個訂閱消費者者組中的一個消費者實例。Consumer Group 之間彼此獨立,互不影響,它們能夠訂閱相同的一組主題而互不干涉。生產(chǎn)環(huán)境中消費者在消費消息的時候若不考慮消費者的相關(guān)特性可能會出現(xiàn)重復消費的問題。
在討論重復消費之前,首先來看一下kafka中跟消費者有關(guān)的幾個重要配置參數(shù)。
enable.auto.commit默認值true,表示消費者會周期性自動提交消費的offset
auto.commit.interval.ms在enable.auto.commit為true的情況下, 自動提交的間隔,默認值5000ms
max.poll.records 單次消費者拉取的最大數(shù)據(jù)條數(shù),默認值
500 max.poll.interval.ms默認值5分鐘,表示若5分鐘之內(nèi)消費者沒有消費完上一次poll的消息,那么consumer會主動發(fā)起離開group的請求
在常見的使用場景下,我們的消費者配置比較簡單,特別是集成Spring組件進行消息的消費,通常情況下我們僅需通過一個注解就可以實現(xiàn)消息的消費。例如如下代碼:
這段代碼中我們配置了一個kafka消費注解,制定消費名為"test1"的topic,這個消費者屬于"group1"消費組。開發(fā)者只需要對得到的消息進行處理即可。那么這段 代碼中的消費者在這個過程中是如何拉取消息的呢,消費者消費消息之后又是如何提交對應(yīng)消息的位移(offset)的呢?
實際上在auto.commit=true時,當上一次poll方法拉取的消息消費完時會進行下一次poll,在經(jīng)過auto.commit.interval.ms間隔后,下一次調(diào)用poll時會提交所有已消費消息的offset。
為了驗證consumer自動提交的時機,配置消費者參數(shù)如下:
為了便于獲取消費者消費進度,以下代碼通過kafka提供的相關(guān)接口定時每隔5s獲取一次消費者的消費進度信息,并將獲取到的信息打印到控制臺。
對于topic test1,為了便于觀察消費情況,我們僅設(shè)置了一個partition。對于消費者組group1的配置參數(shù),消費者會單次拉取消息數(shù)20條,消費每條消息耗費1s,部分記錄日志打印結(jié)果如下:
從日志中可以看出,消費組的offset每40s更新一次,因為每次poll會拉取20條消息,每個消息消費1s,在第一次poll之后,下一次poll因為沒有達到auto.commit.interval.ms=30s,所以不會提交offset。第二次poll時,已經(jīng)經(jīng)過40s,因此這次poll會提交之前兩次消費的消息,offset增加40。也就是說只有在經(jīng)過auto.commit.interval.ms間隔后,并且在下一次調(diào)用poll時才會提交所有 已消費消息的offset。
考慮到以上消費者消費消息的特點,在配置自動提交enable.auto.commit 默認值true情況下,出現(xiàn)重復消費的場景有以下幾種:
Consumer 在消費過程中,應(yīng)用進程被強制kill掉或發(fā)生異常退出。
例如在一次poll500條消息后,消費到200條時,進程被強制kill消費導致offset 未提交,或出現(xiàn)異常退出導致消費到offset未提交。下次重啟時,依然會重新拉取這500消息,這樣就造成之前消費到200條消息重復消費了兩次。因此在有消費者線程的應(yīng)用中,應(yīng)盡量避免使用kill -9這樣強制殺進程的命令。
消費者消費時間過長
max.poll.interval.ms參數(shù)定義了兩次poll的最大間隔,它的默認值是 5 分鐘,表示你的 Consumer 程序如果在 5 分鐘之內(nèi)無法消費完 poll 方法返回的消息,那么 Consumer 會主動發(fā)起“離開組”的請求,Coordinator 也會開啟新一輪 Rebalance。若消費者消費的消息比較耗時,那么這種情況可能就會出現(xiàn)。
為了復現(xiàn)這種場景,我們對消費者重新進行了配置,消費者參數(shù)如下:
在消費過程中消費者單次會拉取11條消息,每條消息耗時30s,11條消息耗時 5分鐘30秒,由于max.poll.interval.ms默認值5分鐘,所以理論上消費者無法在5分鐘內(nèi)消費完,consumer會離開組,導致rebalance。
實際運行日志如下:
可以看到在消費完第11條消息后,因為消費時間超出max.poll.interval.ms默認值5分鐘,這時consumer已經(jīng)離開消費組了,開始rebalance,因此提交offset失敗。之后重新rebalance,消費者再次分配partition后,再次poll拉取消息依然從之前消費過的消息處開始消費,這樣就造成重復消費。而且若不解決消費單次消費時間過長的問題,這部分消息可能會一直重復消費。
對于上述重復消費的場景,若不進行相應(yīng)的處理,那么有可能造成一些線上問題。為了避免因重復消費導致的問題,以下提供了兩種解決重復消費的思路。
第一種思路是提高消費能力,提高單條消息的處理速度,例如對消息處理中比 較耗時的步驟可通過異步的方式進行處理、利用多線程處理等。在縮短單條消息消費時常的同時,根據(jù)實際場景可將max.poll.interval.ms值設(shè)置大一點,避免不 必要的rebalance,此外可適當減小max.poll.records的值,默認值是500,可根 據(jù)實際消息速率適當調(diào)小。這種思路可解決因消費時間過長導致的重復消費問題, 對代碼改動較小,但無法絕對避免重復消費問題。
第二種思路是引入單獨去重機制,例如生成消息時,在消息中加入唯一標識符如消息id等。在消費端,我們可以保存最近的1000條消息id到redis或MySQL表中,配置max.poll.records的值小于1000。在消費消息時先通過前置表去重后再進行消息的處理。
此外,在一些消費場景中,我們可以將消費的接口冪等處理,例如數(shù)據(jù)庫的查 詢操作天然具有冪等性,這時候可不用考慮重復消費的問題。對于例如新增數(shù)據(jù)的操作,可通過設(shè)置唯一鍵等方式以達到單次與多次操作對系統(tǒng)的影響相同,從而使接口具有冪等性。
關(guān)于 Kafka重復消費場景及解決方案是什么問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識。