十年網(wǎng)站開(kāi)發(fā)經(jīng)驗(yàn) + 多家企業(yè)客戶(hù) + 靠譜的建站團(tuán)隊(duì)
量身定制 + 運(yùn)營(yíng)維護(hù)+專(zhuān)業(yè)推廣+無(wú)憂(yōu)售后,網(wǎng)站問(wèn)題一站解決
概念:
半消息: 在原有隊(duì)列消息執(zhí)行后的邏輯,如果后面的本地邏輯出錯(cuò),則不發(fā)送該消息,如果通過(guò)則告知rocketmq發(fā)送
操作步驟 :
1.(生產(chǎn)者)發(fā)送-【半消息】
2.(生產(chǎn)者)本地監(jiān)聽(tīng)-【半消息】處理結(jié)果
3.(消費(fèi)者)處理-【半消息】
1.(生產(chǎn)者)發(fā)送-【半消息】
10年的宣化網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開(kāi)發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。成都全網(wǎng)營(yíng)銷(xiāo)推廣的優(yōu)勢(shì)是能夠根據(jù)用戶(hù)設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整宣化建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無(wú)論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。成都創(chuàng)新互聯(lián)公司從事“宣化網(wǎng)站設(shè)計(jì)”,“宣化網(wǎng)站推廣”以來(lái),每個(gè)客戶(hù)項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。
// 消息體
@Data
@Builder
@ToString
public class UserMoneyParams {
int userId;
String act;
double money;
String info;
String infoParams;
}
// 發(fā)送消息
// 發(fā)送-隊(duì)列半消息: rocketMQ
@RequestMapping("rocketMQHalf")
public ApiResult rocketMQHalf() {
int orderId = 2;
double money = 10;
// 用戶(hù)余額變更-參數(shù)體
UserMoneyParams userMoneyParams = UserMoneyParams.builder()
.act("pay-order")
.userId(orderId)
.money(money)
.build();
// 用戶(hù)數(shù)據(jù)變更-參數(shù)
UserOrder userOrder = this.userOrderMapper.selectByPrimaryKey(1);
log.info("發(fā)送前參數(shù): "+userMoneyParams.toString());
rocketMQTemplate.sendMessageInTransaction(
// 半消息-分組
"tsca-group-half",
// 半消息-topic
"member-change-money-half-topic",
// 半消息-數(shù)據(jù)體
MessageBuilder
.withPayload(userMoneyParams)
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID())
.build(),
userOrder
);
return ApiResult.success("發(fā)送隊(duì)列-半消息");
}
2.(生產(chǎn)者)本地監(jiān)聽(tīng)-【半消息】處理結(jié)果
@RocketMQTransactionListener(txProducerGroup = "tsca-group-half")
@RequiredArgsConstructor
@Slf4j
public class UserMoneyHalfListener implements RocketMQLocalTransactionListener {
@Autowired
redisUtil redisUtil;
@Autowired
UserOrderService userOrderService;
// 生產(chǎn)者-消息處理完畢,繼續(xù)執(zhí)行本地方法(含事務(wù))
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
try {
Object userMoneyParams=message.getPayload();
log.info("消息-args:"+arg);
// 消息主體加密無(wú)法獲取
log.info("消息-主體:"+ JSON.toJSONString(userMoneyParams));
log.info("消息-主體-頭部:"+message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID));
log.info("半消息-本地-處理完成");
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.warn("半消息-本地-發(fā)生異常,回滾: "+e.getMessage());
return RocketMQLocalTransactionState.ROLLBACK;
}
}
// 生產(chǎn)者-消息處理超時(shí)
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
// 查詢(xún)消息是否已經(jīng)處理
String messageID = String.valueOf(message.getHeaders().get("tsca-half-message-id"));
Object messageData = this.redisUtil.getValue(messageID, String.class);
if (messageData != null && messageData.equals("ok")) {
// 超時(shí)且消息已經(jīng)處理完畢
log.info("半消息-本地消息超時(shí)-且已經(jīng)處理完畢");
return RocketMQLocalTransactionState.COMMIT;
} else {
log.info("半消息-本地消息超時(shí)-且未處理完畢");
// 超時(shí)且消息未處理完畢
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
3.(消費(fèi)者)處理-【半消息】
@Service
@RocketMQMessageListener(consumerGroup = "tsca-group-half", topic = "member-change-money-half-topic")
@Slf4j
public class UserMoneyHalfListener implements RocketMQListener {
// @Autowired
// UserMoneyService memberOrderService;
@Override
public void onMessage(UserMoneyParams memberMoneyMessage) {
log.info("收到-用戶(hù)余額變動(dòng)-半消息");
try {
} catch (Exception e) {
log.info("更改余額錯(cuò)誤: "+e.getMessage());
e.printStackTrace();
}
log.info(JSON.toJSONString(memberMoneyMessage));
}
}