十年網(wǎng)站開發(fā)經(jīng)驗 + 多家企業(yè)客戶 + 靠譜的建站團(tuán)隊
量身定制 + 運(yùn)營維護(hù)+專業(yè)推廣+無憂售后,網(wǎng)站問題一站解決
RocketMQ支持消息消費(fèi)失敗后重新消費(fèi),具體代碼如下:
創(chuàng)新互聯(lián)主要從事網(wǎng)站制作、網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)遼陽,10年網(wǎng)站建設(shè)經(jīng)驗,價格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):028-86922220
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + "%n");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
也就是需要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,那這個怎么實現(xiàn)的呢?
以push 非同步消息為例,消息消費(fèi)過程可以參考https://blog.51cto.com/483181/2056301
我們從客戶端成功獲取到一條消息開始,也就是DefaultMQPushConsumerImpl.pullMessage
public void pullMessage(final PullRequest pullRequest) {
...
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
...
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispathToConsume);
....
break;
}
}
}
@Override
public void onException(Throwable e) {
...
}
};
...
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
}
}
pullKernelImpl獲取消息后,如果是異步請求,那么將會回調(diào)pullCallback,我們假設(shè)成功拿到消息,也就是FOUND分支。
那么就會調(diào)用submit 消費(fèi)請求:consumeMessageService.submitConsumeRequest
submitConsumeRequest有兩個實現(xiàn)類,一個是pull, 一個是push。
我們以push為例。
ConsumeMessageConcurrentlyService.java
public void submitConsumeRequest(
final List msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
...
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
}
}
}
}
初始化一個ConsumeRequest Runnable對象,然后提交到線程池consumeExecutor里面,那么我們繼續(xù)看ConsumeRequest。
class ConsumeRequest implements Runnable {
private final List msgs;
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
@Override
public void run() {
...
try {
...
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
....
}
....
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
}
首先看try-catch里面,這里面就是回調(diào)客戶端來消費(fèi)消息。
listener.consumeMessage(Collections.unmodifiableList(msgs), context);
就像我們的消息消費(fèi)重寫如下:
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + "%n");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
然后根據(jù)返回的status調(diào)用processConsumeResult來處理返回結(jié)果
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
...
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
log.info("CLUSTERING...");
List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
log.info("msgBackFailed.isEmpty() [{}]", msgBackFailed.isEmpty());
if (!msgBackFailed.isEmpty()) {
log.info("msgBackFailed [{}]", msgBackFailed);
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
...
}
首先會判斷消費(fèi)模式是集群還是廣播模式,如果廣播模式,就日志記錄下不處理了。
如果是集群模式,那么把調(diào)用sendMessageBack發(fā)送消息到broker。等待下一次broker重新消費(fèi)消息。
如果發(fā)送失敗,那么立即就會消費(fèi)消息。
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
int delayLevel = context.getDelayLevelWhenNextConsume();
try {
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
return true;
} catch (Exception e) {
...
}
return false;
}
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
....
}
}
public void consumerSendMessageBack(
final String addr,
final MessageExt msg,
final String consumerGroup,
final int delayLevel,
final long timeoutMillis,
final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
requestHeader.setGroup(consumerGroup);
requestHeader.setOriginTopic(msg.getTopic());
requestHeader.setOffset(msg.getCommitLogOffset());
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
log.info("addr [{}] request [{}] timeoutMillis [{}]", addr, request, timeoutMillis);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
簡單的調(diào)用api通過netty發(fā)到broker而已,請求碼是RequestCode.CONSUMER_SEND_MSG_BACK
public static final int CONSUMER_SEND_MSG_BACK = 36;
Broker會把消息存儲到文件中,當(dāng)然也會讓它的reconsume次數(shù)+1
具體可以參考SendMessageProcessor.proce***equest方法,這個后續(xù)再講