我們使用MQ作為消息中間件,傳輸一些消息的時候,必須考慮到消息丟失的可能。因為有的時候消息丟失了,會產生很嚴重的后果,比如消息計費數據,跟錢有關的消息。
這篇文章我們以Ro
cketMQ為例來講解,如何設計一套全鏈路消息不丟失的方案。
接下來我們分別講下生產者、broker、消費者,如何確保消息不丟失的。
1、生產者如何確保消息不丟失?
發(fā)送消息的時候,可能存在消息的丟失,就是說可能消息根本就沒有進入到MQ就丟了,我們看下面的圖。
圖1 生產者丟失消息
解決生產者丟失消息,一般有兩種方法。
(1)重試發(fā)消息
RocketMQ生產者發(fā)送消息一般有三種api:
-
同步發(fā)送
-
異步發(fā)送
-
OneWay 發(fā)送
同步發(fā)送,就是生產者向broker發(fā)送消息,阻塞當前線程等待broker響應發(fā)送結果。
異步發(fā)送,就是生產者首先創(chuàng)建一個向broker發(fā)送消息的任務,把該任務提交給線程池,等執(zhí)行完該任務時,回調用戶自定義的回調函數,執(zhí)行處理結果。
Oneway發(fā)送,就是生產者只負責發(fā)送請求,不等待應答,生產者只負責把請求發(fā)出去,而不處理響應結果。
為了確保消息一定發(fā)送到了broker,我們可以采用同步發(fā)送的方式,然后等待發(fā)送的結果。一直等待,如果消息發(fā)送失敗,或者MQ內部異常,我們肯定會收到一個異常,比如請求超時,或者網絡錯誤。
如果我們在收到異常之后,就認為消息到MQ發(fā)送失敗了,然后再次重試嘗試發(fā)送消息到MQ,接著再次同步等待MQ返回響應給我們,這樣反復重試,是否可以確保消息一定會到達MQ?
理論上一些短暫網絡異常的場景下,我們是可以通過不停的重試去保證消息到達MQ的,因為如果短時間網絡異常了消息一直沒法發(fā)送,我們只要不停的重試,網絡一旦恢復了,消息就可以發(fā)送到MQ了。
如果要是反復重試多次還是沒法把消息投遞到MQ,此時我們就可以直接當作消息發(fā)送失敗了。
其代碼就像是這樣的:
try { doSomething(); // 發(fā)送消息到RocketMQ producer.sendMessage();} catch (Exception e) { for (int i = 0; i < 3; i) { // 重試發(fā)消息 producer.sendMessage(); } // 如果重試3次還是發(fā)送失敗,那么此次消息就發(fā)送失敗了。} 另外,如果你是本地先執(zhí)行一些數據庫操作,再把消息發(fā)送到RocketMQ,那么就需要注意把本地事務與發(fā)送消息到RocketMQ放在一個事務里,保證執(zhí)行本地事務和發(fā)送消息要么一起成功,要么一起失敗。
@Transactional(rollbackFor = Exception.class)public void payOrderSuccess() // 執(zhí)行本地事務 try { doSomething(); // 發(fā)送消息到RocketMQ producer.sendMessage(); } catch (Exception e) { for (int i = 0; i < 3; i) { // 重試發(fā)消息 producer.sendMessage(); } // 如果重試3次還是發(fā)送失敗,那么此次消息就發(fā)送失敗了。 throw new Exception(); }} 不過使用這種方式,要考慮到接口耗時問題,如果網絡異常,發(fā)送消息到RocketMQ的請求每次都到超時才返回,那么多次重試可能耗時很久,導致調用payOrderSuccess方法的接口超時異常。
(2)RocketMQ事務
RocketMQ支持事務消息機制,用事務機制保證生產者消息發(fā)送成功,這個方案在業(yè)內還是比較常用的。這個方案落地之后,他可以保證你的本地事務一旦成功,那么消息必然會被投遞到MQ中去,業(yè)務系統(tǒng)的數據也是一致的。
MQ事務機制原理還是有一點復雜的,放著這里講,文章篇幅會過長,所以會單獨起一篇文章講解MQ事務機制。
不管是重試發(fā)消息的方法,還是事務機制,都會大大影響系統(tǒng)的吞吐量。
2、broker如何確保消息不丟失?
假如現在消息提交到MQ里去了,就一定不會丟失嗎?
消息進入MQ后會先落到磁盤上,但寫磁盤的過程,并不是一下子就寫到磁盤上的,而是先進入os cache,再由操作系統(tǒng)的線程不定時刷到磁盤上去。
假如此時這臺機器突然宕機了,os cache里的數據就全部丟失了,此時必然導致你的消息丟失。
那怎么去確保消息寫入MQ之后,MQ自己不要隨便丟失數據呢?
解決這個問題的第一個關鍵點,就是要知道broker的刷盤策略。broker的刷盤策略有兩種:異步刷盤,同步刷盤。
異步刷盤,就是你的消息即使成功寫入了MQ,它也就在機器的os cache中,沒有進入磁盤里,要過一會兒等操作系統(tǒng)自己把os cache里的數據實際刷入磁盤文件中去。
所以異步刷盤模式,寫入消息的吞吐量肯定是非常高的,畢竟消息只需要進入os cache就可以返回了,但是追求了性能,就降低了可用性,消息就有丟失的風險。
所以如果一定要確保數據零丟失的話,可以調整MQ的刷盤策略為同步刷盤。
RocketMQ broker的默認刷盤策略為異步刷盤,即ASYNC_FLUSH??梢詫roker的配置文件中的flushDiskType配置設置為:SYNC_FLUSH同步刷盤。
同步刷盤之后,我們寫入MQ的每條消息,只要MQ告訴我們寫入成功了,那么就表示已經進入了磁盤文件了。
同步刷盤,broker就一定不會丟失數據嗎?如果broker磁盤損壞了呢?
接著我們就要講下,如何避免磁盤故障導致數據丟失。
其實也很簡單,我們必須要對Broker使用主從架構的模式
也就是說,必須讓一個Master Broker有一個Slave Broker去同步它的數據,而且你一條消息寫入成功,必須是讓slave Broker也寫入成功,保證數據有多個冗余的副本。
這樣一來,你一條消息只要寫入成功了,此時主從master Broker和slave broker上都有這條數據了,此時如果你的Master Broker的磁盤損壞了,但是Slave Broker上至少還是有數據的,數據是不會因為磁盤故障而丟失的。
RocketMQ從4.5.0版本開始使用Dledger技術和基于Raft協(xié)議實現,自動故障轉移,有興趣的同學可以自行去查閱相關資料。
3 如何保證消費者消息不丟失?
假如消費者拿到了消息,就一定可以成功處理嗎?
如果消費者從broker拿到一條信息了,但是消息目前還在它的內存里,還沒執(zhí)行具體的業(yè)務邏輯,此時他就直接提交了這條消息的offset到broker去說自己已經處理過了。
接著消費者系統(tǒng)就直接崩潰了,內存里的消息就沒了,業(yè)務邏輯也沒執(zhí)行,結果Broker已經收到他提交的消息offset了,還以為他已經處理完這條消息了。
等消費者系統(tǒng)重啟的時候,就不會再次消費這條消息了,因為已經提交過offset,broker認為你已經成功消費過這條消息了。
所以我們在這里,我們要明確一點,即使你保證發(fā)送消息到MQ的時候絕對不會丟失,而且MQ收到消息之后一定不會把消息搞丟失,但是你的消費者系統(tǒng)在獲取到消息之后還是可能會搞丟。
一般RocketMQ的消費者中會注冊一個監(jiān)聽器,當你的消費者獲取到一批消息之后,就會回調你的這個監(jiān)聽器函數,讓你來處理這一批消息。
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List ConsumeConcurrentlyContext context) { // 執(zhí)行業(yè)務邏輯 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); 處理完畢后,才會返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS作為消費成功的標志,告訴RocketMQ,這批消息我已經處理完畢了。
所以對于RocketMQ而言,只要你的消費者系統(tǒng)是在這個監(jiān)聽器的函數中先處理一批消息,基于這批消息都執(zhí)行完了業(yè)務邏輯,然后返回了那個消費成功的狀態(tài),接著才會去提交這批消息的offset到broker去。
所以在這個情況下,如果你對一批消息都處理完畢了,然后再提交消息的offset給broker,接著消費者系統(tǒng)崩潰了,此時是不會丟失消息的。
但是,如果是消費者系統(tǒng)獲取到一批消息之后,還沒處理完,也就是還沒返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS這個狀態(tài),自然沒提交這批消息的offset給broker呢,此時消費者系統(tǒng)突然掛了,會怎么樣?
在這種情況下,你對一批消息都沒提交他的offset給broker,broker不會認為你已經處理完了這批消息,此時你的消費者系統(tǒng)的一臺機器宕機了,它其實會感知到你的消費者系統(tǒng)的一臺機器作為一個Consumer掛了,它會把你沒處理完的那批消息交給生產者系統(tǒng)的其他機器去進行處理,所以在這種情況下,消息也絕對是不會丟失的。
在默認的Consumer的消費模式之下,必須是你處理完一批消息了,才會返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS這個狀態(tài),表示消息都處理結束了,去提交offset到broker去。在這種情況下,一般來說是不會丟失消息的,即使你一個Consumer宕機了,他會把你沒處理完的消息交給其他Consumer去處理。
但是這里我們要注意一點,就是我們不能在代碼中對消息進行異步的處理,假如我們開啟了一個線程去處理這批消息,然后啟動線程之后,就直接返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)了。
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List ConsumeConcurrentlyContext context) { new Thread() { public void run() { // 執(zhí)行業(yè)務邏輯 } }.start(); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); 如果要是用這種方式來處理消息的話,那可能就會出現你開啟的線程還沒處理完消息呢,已經返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)了,就可能提交這批消息的offset給broker了,認為已經處理結束了。
然后此時你消費者系統(tǒng)突然宕機,必然會導致你的消息丟失了!
因此在使用RocketMQ的場景下,我們如果要保證消費數據的時候別丟消息,你就老老實實的在回調函數里處理消息,處理完了你再返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)表明你處理完畢了。
總結:
基于RocketMQ設計一套全鏈路消息不丟失方案,需要確保生產者、broker、消費者三者都不丟失數據。
(1)生產者不丟失消息
方案1:同步發(fā)送消息 失敗重試;
方案2:事務消息機制;
(2)broker不丟失消息,開啟同步刷盤策略 主從架構同步機制。
只要讓一個master Broker收到消息之后同步寫入磁盤,同時同步復制給其他slave Broker,再返回成功響應給生產者,此時就可以保證MQ自己不會弄丟消息
(3)消費者不丟失消息,采用RocketMQ的消費者天然就可以保證你處理完消息之后,才會提交消息的offset到broker去,不過別采用多線程異步處理消息的方式。
雖然這一整套消息不丟失方案,可以確保消息流轉過程中不丟失。但顯而易見的是,你用了這套方案之后,會讓你整個從頭到尾的消息流轉鏈路的性能大幅度下降,讓你的MQ的吞吐量大幅度的下降。
所以一般大家不要隨便一個業(yè)務里就上如此重的一套方案,要明白這背后的成本!
一般我們建議,對于跟金錢、交易以及核心數據相關的系統(tǒng)和核心鏈路,可以上這套消息零丟失方案。
而對于其他大部分沒那么核心的場景和系統(tǒng),其實即使丟失一些數據,也不會導致太大的問題,此時可以不采取這些方案,或者說你可以在其他的場景里做一些簡化。
ckname="架構師社區(qū)" data-alias="devabc" data-signature="架構師社區(qū),專注分享架構師技術干貨,架構師行業(yè)秘聞,匯集各類奇妙好玩的架構師話題和流行的架構師動向!" data-from="0">
本站聲明: 本文章由作者或相關機構授權發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點,本站亦不保證或承諾內容真實性等。需要轉載請聯(lián)系該專欄作者,如若文章內容侵犯您的權益,請及時聯(lián)系本站刪除。