借鑒AQS的CHL思路解決消息多線程消費(fèi)順序ACK問(wèn)題
背景
我們的支付場(chǎng)景下,要求消費(fèi)的業(yè)務(wù)消息絕不能丟失,且能充分利用高規(guī)格的服務(wù)器的性能,比如用線程池對(duì)業(yè)務(wù)消息進(jìn)行快速處理。有同學(xué)可能沒太理解這個(gè)問(wèn)題有啥不好處理,讓我一步步分析下。
MQ的優(yōu)勢(shì)和缺點(diǎn)
MQ是我們?cè)趹?yīng)對(duì)高并發(fā)場(chǎng)景最常用的一種措施,它可以幫我們對(duì)業(yè)務(wù)解耦、對(duì)流程異步化以及削峰填谷的妙用。
但是,由于引入了這一額外的中間件,也增加了系統(tǒng)的復(fù)雜度和不穩(wěn)定因素。
消息可靠性的應(yīng)對(duì)
消息的可靠性保證需要從消息流轉(zhuǎn)的每個(gè)環(huán)節(jié)進(jìn)行保障,比如生產(chǎn)端的事務(wù)型消息,broker的實(shí)時(shí)刷盤持久化,消費(fèi)端的手動(dòng)ACK 。
這里,我們對(duì)生產(chǎn)端和存儲(chǔ)端的保障措施不作討論,重點(diǎn)關(guān)注消費(fèi)端的手動(dòng)ACK機(jī)制。
手動(dòng)ACK的問(wèn)題
手動(dòng)ACK可以保證消息一定被消費(fèi),但是需要確保手動(dòng)ACK的順序和消息順序一致,為什么?
消息隊(duì)列之所以性能高處理快,是因?yàn)椴捎昧宋募樞蜃x寫方式,系統(tǒng)在拉取消息進(jìn)行消費(fèi)時(shí),是按順序文件的offset進(jìn)行拉取的,如果commit offset的順序錯(cuò)亂,會(huì)使得服務(wù)端的消息狀態(tài)錯(cuò)亂,比如消息重發(fā)。
因此,如果我們?cè)诒镜貑?dòng)了線程池,對(duì)消息進(jìn)行拉取處理,由于各線程的處理速度不一定一致,所以無(wú)法保證各線程處理完之后對(duì)各自消息的ACK操作是順序的,怎么辦,難道只能同步拉消費(fèi)取然后ACK么。
解決方案
最不濟(jì),可以提交一批任務(wù),批量等待統(tǒng)一提交。不過(guò)總覺得不優(yōu)雅。
某次看JUC中的AQS的時(shí)候,啟發(fā)了我。
我們平時(shí)用的類似CountDownLauch這些并發(fā)工具類,不也是處理的多線程協(xié)作的問(wèn)題么。
我們的場(chǎng)景完全沒有AQS復(fù)雜,借鑒它的思路,應(yīng)該是沒有問(wèn)題的。
- 創(chuàng)建雙端隊(duì)列,隊(duì)列節(jié)點(diǎn)中需要維護(hù)自身處理狀態(tài)state,和對(duì)應(yīng)msg的offset。
- 服務(wù)從消息中心拉取消息,在提交本地線程池執(zhí)行之前,先入隊(duì)列。
- 消息消費(fèi)完之后,通知隊(duì)列中對(duì)應(yīng)的節(jié)點(diǎn),更新狀態(tài)為完成。
- 隊(duì)列頭被更新后出隊(duì)列,提交offset,并判斷新的隊(duì)列頭的狀態(tài),直到遇到state是未完成的head時(shí)阻塞。undefined
方案解析
該方案可以有效利用本地線程的資源,并行的處理,并通過(guò)隊(duì)列和異步通知機(jī)制保證最終commit offset時(shí)有序。
在最差情況下(即head節(jié)點(diǎn)對(duì)應(yīng)的msg最后一個(gè)被處理完),相當(dāng)于等待一批線程處理完成后統(tǒng)一提交。除此之外等待性能都要更優(yōu)。
異步通知的實(shí)現(xiàn)
public class MSGFuture { /*全局變量,存放msg對(duì)應(yīng)的future對(duì)象*/ private static final MapFUTURES = new ConcurrentHashMap (); /*全局不變唯一標(biāo)識(shí)*/ private final long id; /*最長(zhǎng)等待時(shí)間*/ private final int timeout; /*并發(fā)鎖*/ private final Lock lock = new ReentrantLock(); /*通知條件*/ private final Condition done = lock.newCondition(); /*開始時(shí)間*/ private final long start = System.currentTimeMillis(); /*業(yè)務(wù)結(jié)果*/ private volatile Object response; }
//構(gòu)造函數(shù) public MSGFuture(Request request, int timeout) { /*全局自增ID*/ this.id = request.getrId(); /*超時(shí)時(shí)間*/ this.timeout = timeout > 0 ? timeout : 1000; /*放入全局變量*/ FUTURES.put(id, this); }
//業(yè)務(wù)處理結(jié)果更新 public static void received(long id, Object response) { MSGFuture future = FUTURES.remove(id); if (future != null) { future.doReceived(response); } else { logger.warn("response return timeout,id:"+id); } }
//結(jié)果更新,通知等待條件 private void doReceived(Object res) { lock.lock(); try { response = res; done.signal(); } finally { lock.unlock(); } }
//異步等待獲取結(jié)果 public Object get(int timeout) throws TimeoutException { if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (!isDone()) { throw new TimeoutException(); } } return returnFromResponse(); }
總結(jié)
看到這里,有同學(xué)會(huì)說(shuō),這個(gè)和AQS有啥關(guān)系呀~
其實(shí),只是處理思路的一種借鑒,比如state狀態(tài),比如鎖機(jī)制和通知等待。既然都是多線程任務(wù)協(xié)調(diào),那總有相似之處。
總之一句話,別說(shuō)背八股文沒用,多多了解會(huì)有大幫助~
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。文章僅代表作者個(gè)人觀點(diǎn),不代表本平臺(tái)立場(chǎng),如有問(wèn)題,請(qǐng)聯(lián)系我們,謝謝!