www.久久久久|狼友网站av天堂|精品国产无码a片|一级av色欲av|91在线播放视频|亚洲无码主播在线|国产精品草久在线|明星AV网站在线|污污内射久久一区|婷婷综合视频网站

當前位置:首頁 > 公眾號精選 > 架構師社區(qū)
[導讀]我們使用MQ作為消息中間件,傳輸一些消息的時候,必須考慮到消息丟失的可能。因為有的時候消息丟失了,會產生很嚴重的后果,比如消息計費數據,跟錢有關的消息。這篇文章我們以RocketMQ為例來講解,如何設計一套全鏈路消息不丟失的方案。接下來我們分別講下生產者、broker、消費者,如...

我們使用MQ作為消息中間件,傳輸一些消息的時候,必須考慮到消息丟失的可能。因為有的時候消息丟失了,會產生很嚴重的后果,比如消息計費數據,跟錢有關的消息。



這篇文章我們以RocketMQ為例來講解,如何設計一套全鏈路消息不丟失的方案。



接下來我們分別講下生產者、broker、消費者,如何確保消息不丟失的。



1、生產者如何確保消息不丟失?



發(fā)送消息的時候,可能存在消息的丟失,就是說可能消息根本就沒有進入到MQ就丟了,我們看下面的圖。



圖1 生產者丟失消息



解決生產者丟失消息,一般有兩種方法。



(1)重試發(fā)消



RocketMQ生產者發(fā)送消息一般有三種api:
  • 同步發(fā)送
  • 異步發(fā)送
  • OneWay 發(fā)送


同步發(fā)送,就是生產者向broker發(fā)送消息,阻塞當前線程等待broker響應發(fā)送結果。



異步發(fā)送,就是生產者首先創(chuàng)建一個向broker發(fā)送消息的任務,把該任務提交給線程池,等執(zhí)行完該任務時,回調用戶自定義的回調函數,執(zhí)行處理結果。



Oneway發(fā)送,就是生產者只負責發(fā)送請求,不等待應答,生產者只負責把請求發(fā)出去,而不處理響應結果。



為了確保消息一定發(fā)送到了broker,我們可以采用同步發(fā)送的方式,然后等待發(fā)送的結果。一直等待,如果消息發(fā)送失敗,或者MQ內部異常,我們肯定會收到一個異常,比如請求超時,或者網絡錯誤。



如果我們在收到異常之后,就認為消息到MQ發(fā)送失敗了,然后再次重試嘗試發(fā)送消息到MQ,接著再次同步等待MQ返回響應給我們,這樣反復重試,是否可以確保消息一定會到達MQ?




理論上一些短暫網絡異常的場景下,我們是可以通過不停的重試去保證消息到達MQ的,因為如果短時間網絡異常了消息一直沒法發(fā)送,我們只要不停的重試,網絡一旦恢復了,消息就可以發(fā)送到MQ了。



如果要是反復重試多次還是沒法把消息投遞到MQ,此時我們就可以直接當作消息發(fā)送失敗了。



其代碼就像是這樣的:


try { doSomething(); // 發(fā)送消息到RocketMQ producer.sendMessage();} catch (Exception e) { for (int i = 0; i < 3; i) { // 重試發(fā)消息 producer.sendMessage(); } // 如果重試3次還是發(fā)送失敗,那么此次消息就發(fā)送失敗了。} 另外,如果你是本地先執(zhí)行一些數據庫操作,再把消息發(fā)送到RocketMQ,那么就需要注意把本地事務與發(fā)送消息到RocketMQ放在一個事務里,保證執(zhí)行本地事務和發(fā)送消息要么一起成功,要么一起失敗。


@Transactional(rollbackFor = Exception.class)public void payOrderSuccess() // 執(zhí)行本地事務 try { doSomething(); // 發(fā)送消息到RocketMQ producer.sendMessage(); } catch (Exception e) { for (int i = 0; i < 3; i) { // 重試發(fā)消息 producer.sendMessage(); } // 如果重試3次還是發(fā)送失敗,那么此次消息就發(fā)送失敗了。 throw new Exception(); }} 不過使用這種方式,要考慮到接口耗時問題,如果網絡異常,發(fā)送消息到RocketMQ的請求每次都到超時才返回,那么多次重試可能耗時很久,導致調用payOrderSuccess方法的接口超時異常。




(2)RocketMQ事務



RocketMQ支持事務消息機制,用事務機制保證生產者消息發(fā)送成功,這個方案在業(yè)內還是比較常用的。這個方案落地之后,他可以保證你的本地事務一旦成功,那么消息必然會被投遞到MQ中去,業(yè)務系統(tǒng)的數據也是一致的。



MQ事務機制原理還是有一點復雜的,放著這里講,文章篇幅會過長,所以會單獨起一篇文章講解MQ事務機制。



不管是重試發(fā)消息的方法,還是事務機制,都會大大影響系統(tǒng)的吞吐量。



2、broker如何確保消息不丟失?



假如現在消息提交到MQ里去了,就一定不會丟失嗎?



消息進入MQ后會先落到磁盤上,但寫磁盤的過程,并不是一下子就寫到磁盤上的,而是先進入os cache,再由操作系統(tǒng)的線程不定時刷到磁盤上去。



假如此時這臺機器突然宕機了,os cache里的數據就全部丟失了,此時必然導致你的消息丟失。





那怎么去確保消息寫入MQ之后,MQ自己不要隨便丟失數據呢?



解決這個問題的第一個關鍵點,就是要知道broker的刷盤策略。broker的刷盤策略有兩種:異步刷盤,同步刷盤。



異步刷盤,就是你的消息即使成功寫入了MQ,它也就在機器的os cache中,沒有進入磁盤里,要過一會兒等操作系統(tǒng)自己把os cache里的數據實際刷入磁盤文件中去。



所以異步刷盤模式,寫入消息的吞吐量肯定是非常高的,畢竟消息只需要進入os cache就可以返回了,但是追求了性能,就降低了可用性,消息就有丟失的風險。



所以如果一定要確保數據零丟失的話,可以調整MQ的刷盤策略為同步刷盤。



RocketMQ broker的默認刷盤策略為異步刷盤,即ASYNC_FLUSH??梢詫roker的配置文件中的flushDiskType配置設置為:SYNC_FLUSH同步刷盤。



同步刷盤之后,我們寫入MQ的每條消息,只要MQ告訴我們寫入成功了,那么就表示已經進入了磁盤文件了。



同步刷盤,broker就一定不會丟失數據嗎?如果broker磁盤損壞了呢?



接著我們就要講下,如何避免磁盤故障導致數據丟失。



其實也很簡單,我們必須要對Broker使用主從架構的模式



也就是說,必須讓一個Master Broker有一個Slave Broker去同步它的數據,而且你一條消息寫入成功,必須是讓slave Broker也寫入成功,保證數據有多個冗余的副本。





這樣一來,你一條消息只要寫入成功了,此時主從master Broker和slave broker上都有這條數據了,此時如果你的Master Broker的磁盤損壞了,但是Slave Broker上至少還是有數據的,數據是不會因為磁盤故障而丟失的。



RocketMQ從4.5.0版本開始使用Dledger技術和基于Raft協(xié)議實現,自動故障轉移,有興趣的同學可以自行去查閱相關資料。



3 如何保證消費者消息不丟失?



假如消費者拿到了消息,就一定可以成功處理嗎?




如果消費者從broker拿到一條信息了,但是消息目前還在它的內存里,還沒執(zhí)行具體的業(yè)務邏輯,此時他就直接提交了這條消息的offset到broker去說自己已經處理過了。



接著消費者系統(tǒng)就直接崩潰了,內存里的消息就沒了,業(yè)務邏輯也沒執(zhí)行,結果Broker已經收到他提交的消息offset了,還以為他已經處理完這條消息了。



等消費者系統(tǒng)重啟的時候,就不會再次消費這條消息了,因為已經提交過offset,broker認為你已經成功消費過這條消息了。



所以我們在這里,我們要明確一點,即使你保證發(fā)送消息到MQ的時候絕對不會丟失,而且MQ收到消息之后一定不會把消息搞丟失,但是你的消費者系統(tǒng)在獲取到消息之后還是可能會搞丟。



一般RocketMQ的消費者中會注冊一個監(jiān)聽器,當你的消費者獲取到一批消息之后,就會回調你的這個監(jiān)聽器函數,讓你來處理這一批消息。


consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List ConsumeConcurrentlyContext context) { // 執(zhí)行業(yè)務邏輯 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); 處理完畢后,才會返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS作為消費成功的標志,告訴RocketMQ,這批消息我已經處理完畢了。



所以對于RocketMQ而言,只要你的消費者系統(tǒng)是在這個監(jiān)聽器的函數中先處理一批消息,基于這批消息都執(zhí)行完了業(yè)務邏輯,然后返回了那個消費成功的狀態(tài),接著才會去提交這批消息的offset到broker去。



所以在這個情況下,如果你對一批消息都處理完畢了,然后再提交消息的offset給broker,接著消費者系統(tǒng)崩潰了,此時是不會丟失消息的。



但是,如果是消費者系統(tǒng)獲取到一批消息之后,還沒處理完,也就是還沒返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS這個狀態(tài),自然沒提交這批消息的offset給broker呢,此時消費者系統(tǒng)突然掛了,會怎么樣?



在這種情況下,你對一批消息都沒提交他的offset給broker,broker不會認為你已經處理完了這批消息,此時你的消費者系統(tǒng)的一臺機器宕機了,它其實會感知到你的消費者系統(tǒng)的一臺機器作為一個Consumer掛了,它會把你沒處理完的那批消息交給生產者系統(tǒng)的其他機器去進行處理,所以在這種情況下,消息也絕對是不會丟失的。



在默認的Consumer的消費模式之下,必須是你處理完一批消息了,才會返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS這個狀態(tài),表示消息都處理結束了,去提交offset到broker去。在這種情況下,一般來說是不會丟失消息的,即使你一個Consumer宕機了,他會把你沒處理完的消息交給其他Consumer去處理。



但是這里我們要注意一點,就是我們不能在代碼中對消息進行異步的處理,假如我們開啟了一個線程去處理這批消息,然后啟動線程之后,就直接返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)了。


consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List ConsumeConcurrentlyContext context) { new Thread() { public void run() { // 執(zhí)行業(yè)務邏輯 } }.start(); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); 如果要是用這種方式來處理消息的話,那可能就會出現你開啟的線程還沒處理完消息呢,已經返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)了,就可能提交這批消息的offset給broker了,認為已經處理結束了。



然后此時你消費者系統(tǒng)突然宕機,必然會導致你的消息丟失了!



因此在使用RocketMQ的場景下,我們如果要保證消費數據的時候別丟消息,你就老老實實的在回調函數里處理消息,處理完了你再返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)表明你處理完畢了。



總結:



基于RocketMQ設計一套全鏈路消息不丟失方案,需要確保生產者、broker、消費者三者都不丟失數據。



(1)生產者不丟失消息



方案1:同步發(fā)送消息  失敗重試;
方案2:事務消息機制;
(2)broker不丟失消息,開啟同步刷盤策略 主從架構同步機制。



只要讓一個master Broker收到消息之后同步寫入磁盤,同時同步復制給其他slave Broker,再返回成功響應給生產者,此時就可以保證MQ自己不會弄丟消息



(3)消費者不丟失消息,采用RocketMQ的消費者天然就可以保證你處理完消息之后,才會提交消息的offset到broker去,不過別采用多線程異步處理消息的方式。



雖然這一整套消息不丟失方案,可以確保消息流轉過程中不丟失。但顯而易見的是,你用了這套方案之后,會讓你整個從頭到尾的消息流轉鏈路的性能大幅度下降,讓你的MQ的吞吐量大幅度的下降。



所以一般大家不要隨便一個業(yè)務里就上如此重的一套方案,要明白這背后的成本!



一般我們建議,對于跟金錢、交易以及核心數據相關的系統(tǒng)和核心鏈路,可以上這套消息零丟失方案。



而對于其他大部分沒那么核心的場景和系統(tǒng),其實即使丟失一些數據,也不會導致太大的問題,此時可以不采取這些方案,或者說你可以在其他的場景里做一些簡化。


ckname="架構師社區(qū)" data-alias="devabc" data-signature="架構師社區(qū),專注分享架構師技術干貨,架構師行業(yè)秘聞,匯集各類奇妙好玩的架構師話題和流行的架構師動向!" data-from="0">


本站聲明: 本文章由作者或相關機構授權發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點,本站亦不保證或承諾內容真實性等。需要轉載請聯(lián)系該專欄作者,如若文章內容侵犯您的權益,請及時聯(lián)系本站刪除。
換一批
延伸閱讀

廣告科技領導者Kira LeBlanc晉升為全球首席營銷官  蒙特利爾和多倫多2022年4月1日 /美通社/ -- 全球最大的獨立程序化數字戶外(DOOH)廣告技術公司之一Hivestack今天宣布...

關鍵字: ck

(全球TMT2022年4月1日訊)獨立程序化數字戶外(DOOH)廣告技術公司Hivestack宣布任命Kira LeBlanc為全球首席營銷官。LeBlanc于2021年初Hivestack宣布其全球擴張計劃時加入該公...

關鍵字: ck

2021年全年多項業(yè)績指標再創(chuàng)新高; “企業(yè)數字化運營解決方案”全年收入持續(xù)三位數同比增長; “SaaS+X”商業(yè)模式為“企業(yè)數字化運營解決方案”的迅猛增長...

關鍵字: ic ck

(全球TMT2022年3月24日訊)Shutterstock, Inc.是一個全球領先的創(chuàng)意平臺,為眾多品牌、企業(yè)和媒體公司提供全方位服務解決方案、高質量內容及創(chuàng)意工作流程解決方案。該公司宣布在其已有十年傳統(tǒng)的年度奧斯...

關鍵字: ck

在其推出年度“奧斯卡流行藝術!”活動系列10周年之際,Shutterstock內部創(chuàng)意團隊立足其平臺逾4億創(chuàng)意資產,創(chuàng)作原創(chuàng)波普藝術風格作品...

關鍵字: ck

倫敦2022年3月15日 /美通社/ -- Warwick Investment Group在貝爾格拉維亞的伊布里大道(Ebury Street)收購了五處相毗鄰的永久產權房產,共25套公寓,由此完成了該公司迄今為止規(guī)模...

關鍵字: ic ck

Hivestack 任命前三星廣告 AdTech 資深人士 Mina Naguib 擔任首席技術官 加拿大蒙特利爾2022年2月7 日 /美通社/ -- Hivestack——全球領先的獨...

關鍵字: ck

(全球TMT2022年2月7日訊)獨立程序化數字戶外?(DOOH) 廣告技術公司Hivestack,宣布聘請前三星廣告技術資深人士?Mina Naguib 擔任首席技術官。Naguib 將直接向首席執(zhí)行官?Andrea...

關鍵字: 三星 ck

新加坡、菲律賓馬尼拉和曼谷2022年1月26日 /美通社/ -- 東南亞技術驅動型物流平臺 Inteluck 宣布,今天已獲得 1500 萬美元的&n...

關鍵字: ck

- 這家全球支付處理商的估值達到400億美元,迄今已累計籌得18億美元 - 主要投資者包括奧特米特(Altimeter)、德龍集團(Dragoneer)、富蘭克林鄧普頓(F...

關鍵字: ck

架構師社區(qū)

1736 篇文章

關注

發(fā)布文章

編輯精選

技術子站

關閉