-
出身:誕生于金融行業(yè)的消息隊(duì)列 -
語(yǔ)言:Erlang -
協(xié)議:AMQP(Advanced Message Queuing Protocol 高級(jí)消息隊(duì)列協(xié)議) -
關(guān)鍵詞:內(nèi)存隊(duì)列,高可用,一條消息
隊(duì)列結(jié)構(gòu)

-
Producer/Consumer:生產(chǎn)者消費(fèi)者 -
Exchange:交換器,可以理解為隊(duì)列的路由邏輯,交換器主要有三種,圖中是Direct交換器 -
Queue:隊(duì)列 -
Binding:綁定關(guān)系,實(shí)際是交換器上映射隊(duì)列的規(guī)則
發(fā)送和消費(fèi)一條消息
在上圖的模式下,交換器的類型為Direct,偽代碼表示消息的生產(chǎn)和消費(fèi)
消息生產(chǎn)
#消息發(fā)送方法
#messageBody?消息體
#exchangeName?交換器名稱
#routingKey?路由鍵
publishMsg(messageBody,exchangeName,routingKey){
?......
}
#消息發(fā)送
publishMsg("This?is?a?warning?log","exchange","log.warning");

RoutingKey=log.warning,和隊(duì)列A與交換器的綁定一致,所以消息被路由到了隊(duì)列A上。
消息消費(fèi)
對(duì)于消息消費(fèi)而言,消費(fèi)者直接指定要消費(fèi)的隊(duì)列即可,比如指定消費(fèi)隊(duì)列A的數(shù)據(jù)。
需要注意的是,在消費(fèi)者消費(fèi)完成數(shù)據(jù)后,返回給RabbitMq ACK消息,RabbitMq會(huì)刪掉隊(duì)列中的該條信息。

多種消息路由模式
在Exchange這個(gè)模塊上,RabbitMq主要支持了Direct,F(xiàn)anout,Topic三種路由模式,RabbitMq在路由模式上下功夫,也說(shuō)明了他在設(shè)計(jì)上想要滿足多樣化的需求。

Direct和Fanout模式比較好理解,類似于單播和廣播模式,Topic模式比較有意思,它支持自定義匹配規(guī)則,按照規(guī)則把所有滿足條件的消息路由到指定隊(duì)列,能夠幫助開發(fā)者靈活應(yīng)對(duì)各類需求。
消息的存儲(chǔ)
RabbitMQ的消息默認(rèn)是在內(nèi)存里的,實(shí)際上不光是消息,Exchange路由等信息實(shí)際都在內(nèi)存中。內(nèi)存的優(yōu)點(diǎn)是高性能,問題在于故障后無(wú)法恢復(fù)。所以RabbitMQ也支持持久化的存儲(chǔ),也就是寫磁盤。
要在RabbitMQ中持久化消息,要同時(shí)滿足三個(gè)條件:
-
消息投體時(shí)使用持久化投遞模式 -
目標(biāo)交換器是配置為持久化的 -
目標(biāo)隊(duì)列是配置為持久化的
RabbitMQ持久化消息的方式是常見的寫日志方式:
-
當(dāng)一條持久化消息發(fā)送到持久化的Exchange上時(shí),RabbitMQ會(huì)在消息提交到日志文件后,才發(fā)送響應(yīng)。 -
一旦這條消息被消費(fèi)后,RabbitMQ會(huì)將會(huì)把日志中該條消息標(biāo)記為等待垃圾收集,之后會(huì)從日志中清除。 -
如果出現(xiàn)故障,自動(dòng)重建Exchange,Bindings和Queue,同時(shí)通過重播持久化日志來(lái)恢復(fù)消息。
消息持久化的優(yōu)缺點(diǎn)很明顯,擁有故障恢復(fù)能力的同時(shí),也帶來(lái)了性能的急劇下降。同時(shí),由于RabbitMQ默認(rèn)情況下是沒有冗余的,假設(shè)一個(gè)持久化節(jié)點(diǎn)崩潰,一致到該節(jié)點(diǎn)恢復(fù)前,消息和隊(duì)列都無(wú)法恢復(fù)。
消息投遞模式
1.發(fā)后即忘
RabbitMQ默認(rèn)發(fā)布消息是不會(huì)返回任何結(jié)果給生產(chǎn)者的,所以存在發(fā)送過程中丟失數(shù)據(jù)的風(fēng)險(xiǎn)。
2.AMQP事務(wù)
AMQP事務(wù)保證RabbitMQ不僅收到了消息,并成功將消息路由到了所有匹配的訂閱隊(duì)列,AMQP事務(wù)將使得生產(chǎn)者和RabbitMQ產(chǎn)生同步。
雖然事務(wù)使得生產(chǎn)者可以確定消息已經(jīng)到達(dá)RabbitMQ中的對(duì)應(yīng)隊(duì)列,但是卻會(huì)降低2~10倍的消息吞吐量。
3.發(fā)送方確認(rèn)
開啟發(fā)送方確認(rèn)模式后,消息會(huì)有一個(gè)唯一的ID,一旦消息被投遞給所有匹配的隊(duì)列后,會(huì)回調(diào)給發(fā)送方應(yīng)用程序(包含消息的唯一ID),使得生產(chǎn)者知道消息已經(jīng)安全到達(dá)隊(duì)列了。
如果消息和隊(duì)列是配置成了持久化,這個(gè)確認(rèn)消息只會(huì)在隊(duì)列將消息寫入磁盤后才會(huì)返回。如果RabbitMQ內(nèi)部發(fā)生了錯(cuò)誤導(dǎo)致這條消息丟失,那么RabbitMQ會(huì)發(fā)送一條nack消息,當(dāng)然我理解這個(gè)是不能保證的。
這種模式由于不存在事務(wù)回滾,同時(shí)整體仍然是一個(gè)異步過程,所以更加輕量級(jí),對(duì)服務(wù)器性能的影響很小。
RabbitMQ RPC
一般的異步服務(wù)間,可能會(huì)用兩組隊(duì)列實(shí)現(xiàn)兩個(gè)服務(wù)模塊之前的異步通信,有趣的是RabbitMQ就內(nèi)建了這個(gè)功能。
RabbitMQ支持消息應(yīng)答功能,每個(gè)AMQP消息頭中有一個(gè)Reply_to字段,通過該字段指定消息返回到的隊(duì)列名稱(這是一個(gè)私有隊(duì)列)消息的生產(chǎn)者可以監(jiān)聽該字段對(duì)應(yīng)的隊(duì)列。

RabbitMQ集群
RabbitMQ集群的設(shè)計(jì)目標(biāo):
-
允許消費(fèi)者和生產(chǎn)者在RabbitMQ節(jié)點(diǎn)崩潰的情況下繼續(xù)運(yùn)行 -
能過通過添加節(jié)點(diǎn)來(lái)線性擴(kuò)展消息通信吞吐量
從實(shí)際結(jié)果看,RabbitMQ完成設(shè)計(jì)目標(biāo)上并不十分出色,主要原因在于默認(rèn)的模式下,RabbitMQ的隊(duì)列實(shí)例子只存在在一個(gè)節(jié)點(diǎn)上(雖然后續(xù)也支持了鏡像隊(duì)列),既不能保證該節(jié)點(diǎn)崩潰的情況下隊(duì)列還可以繼續(xù)運(yùn)行,也不能線性擴(kuò)展該隊(duì)列的吞吐量。
集群結(jié)構(gòu)
RabbitMQ內(nèi)部的元數(shù)據(jù)主要有:
-
隊(duì)列元數(shù)據(jù)-隊(duì)列名稱和屬性 -
交換器元數(shù)據(jù)-交換器名稱,類型和屬性 -
綁定元數(shù)據(jù)-路由信息
雖然RabbitMQ的隊(duì)列實(shí)際只會(huì)在一個(gè)節(jié)點(diǎn)上,但元數(shù)據(jù)可以存在各個(gè)節(jié)點(diǎn)上。舉個(gè)例子來(lái)說(shuō),當(dāng)創(chuàng)建一個(gè)新的交換器時(shí),RabbitMQ會(huì)把該信息同步到所有節(jié)點(diǎn)上,這個(gè)時(shí)候客戶端不管連接的那個(gè)RabbitMQ節(jié)點(diǎn),都可以訪問到這個(gè)新的交換器,也就能找到交換器下的隊(duì)列。

如上圖所示,隊(duì)列A的實(shí)例實(shí)際只在一個(gè)RabbitMQ節(jié)點(diǎn)上,其它節(jié)點(diǎn)實(shí)際存儲(chǔ)的是只想該隊(duì)列的指針。
為什么RabbitMQ不在各個(gè)節(jié)點(diǎn)間做復(fù)制了,《RabbitMQ實(shí)戰(zhàn)》給出了兩個(gè)原因:
-
存儲(chǔ)成本-RabbitMQ作為內(nèi)存隊(duì)列,復(fù)制對(duì)存儲(chǔ)空間的影響,畢竟內(nèi)存是昂貴而有限的 -
性能損耗-發(fā)布消息需要將消息復(fù)制到所有節(jié)點(diǎn),特別是對(duì)于持久化隊(duì)列而言,性能的影響會(huì)很大
我理解成本這個(gè)原因并不完全成立,復(fù)制并不一定要復(fù)制到所有節(jié)點(diǎn),比如一個(gè)隊(duì)列可以只做兩個(gè)副本,復(fù)制帶來(lái)的內(nèi)存成本可以交給使用方來(lái)評(píng)估,畢竟在內(nèi)存中沒有堆積的情況下,實(shí)際上隊(duì)列是不會(huì)占用多大內(nèi)存的。
還有一點(diǎn)是RabbitMQ本身并沒有保證消息消費(fèi)的有序性,所以實(shí)際上隊(duì)列被Partition到各個(gè)節(jié)點(diǎn)上,這樣才能真正達(dá)到線性擴(kuò)容的目的(以RabbitMQ的現(xiàn)狀來(lái)說(shuō),單隊(duì)列實(shí)際是無(wú)法擴(kuò)容的,只有在業(yè)務(wù)層做切分)。
注:RabbitMQ集群中的節(jié)點(diǎn)可以是內(nèi)存節(jié)點(diǎn)也可以是磁盤節(jié)點(diǎn),但要求至少有一個(gè)磁盤節(jié)點(diǎn),這樣出現(xiàn)故障時(shí)才能恢復(fù)數(shù)據(jù)。
鏡像隊(duì)列
鏡像隊(duì)列架構(gòu)
RabbitMQ自己也考慮到了我們之前分析的單節(jié)點(diǎn)長(zhǎng)時(shí)間故障無(wú)法恢復(fù)的問題,所以RabbitMQ 2.6.0之后它也支持了鏡像隊(duì)列,換個(gè)說(shuō)法也就是副本。

除了發(fā)送消息,所有的操作實(shí)際都在主拷貝上,從拷貝實(shí)際只是個(gè)冷備(默認(rèn)的情況下所有RabbitMQ節(jié)點(diǎn)上都會(huì)有鏡像隊(duì)列的拷貝),如果使用消息確認(rèn)模式,RabbitMQ會(huì)在主拷貝和從拷貝都安全的接受到消息時(shí)才通知生產(chǎn)者。
從這個(gè)結(jié)構(gòu)上來(lái)看,如果從拷貝的節(jié)點(diǎn)掛了,實(shí)際沒有任何影響,如果主拷貝掛了,那么會(huì)有一個(gè)從新選主的過程,這也是鏡像隊(duì)列的優(yōu)點(diǎn),除非所有節(jié)點(diǎn)都掛了,才會(huì)導(dǎo)致消息丟失。重新選主后,RabbitMQ會(huì)給消費(fèi)者一個(gè)消費(fèi)者取消通知(Consumer Cancellation),讓消費(fèi)者重連新的主拷貝。
鏡像隊(duì)列原理
1.RabbitMQ結(jié)構(gòu)

-
AMQPQueue:負(fù)責(zé)AMQP協(xié)議相關(guān)的消息處理,包括接收消息,投遞消息,Confirm消息等 -
BackingQueue:提供AMQQueue調(diào)用的接口,完成消息的存儲(chǔ)和持久化工作
BackingQueue由Q1,Q2,Delta,Q3,Q4五個(gè)子隊(duì)列構(gòu)成,在Backing中,消息的生命周期有四個(gè)狀態(tài):
-
Alpha:消息的內(nèi)容和消息索引都在RAM中。(Q1,Q4) -
Beta:消息的內(nèi)容保存在Disk上,消息索引保存在RAM中。(Q2,Q3) -
Gamma:消息的內(nèi)容保存在Disk上,消息索引在DISK和RAM上都有。(Q2,Q3) -
Delta:消息內(nèi)容和索引都在Disk上。(Delta)
這里以持久化消息為例(可以看到非持久化消息的生命周期會(huì)簡(jiǎn)單很多),從Q1到Q4,消息實(shí)際經(jīng)歷了一個(gè)RAM->DISK->RAM這樣的過程,BackingQueue這么設(shè)計(jì)的目的有點(diǎn)類似于Linux的Swap,當(dāng)隊(duì)列負(fù)載很高時(shí),通過將部分消息放到磁盤上來(lái)節(jié)省內(nèi)存空間,當(dāng)負(fù)載降低時(shí),消息又從磁盤回到內(nèi)存中,讓整個(gè)隊(duì)列有很好的彈性。因此觸發(fā)消息流動(dòng)的主要因素是:1.消息被消費(fèi);2.內(nèi)存不足。
RabbitMQ會(huì)更具消息的傳輸速度來(lái)計(jì)算當(dāng)前內(nèi)存中允許保存的最大消息數(shù)量(Traget_RAM_Count),當(dāng):內(nèi)存中保存的消息數(shù)量+等待ACK的消息數(shù)量>Target_RAM_Count時(shí),RabbitMQ才會(huì)把消息寫到磁盤上,所以說(shuō)雖然理論上消息會(huì)按照Q1->Q2->Delta->Q3->Q4的順序流動(dòng),但是并不是每條消息都會(huì)經(jīng)歷所有的子隊(duì)列以及對(duì)應(yīng)的生命周期。
從RabbitMQ的Backing Queue結(jié)構(gòu)來(lái)看,當(dāng)內(nèi)部不足時(shí),消息要經(jīng)歷多個(gè)生命周期,在Disk和RAM之間置換,者實(shí)際會(huì)降低RabbitMQ的處理性能(后續(xù)的流控就是關(guān)聯(lián)的解決方法)。
2.鏡像隊(duì)列結(jié)構(gòu)

所有對(duì)鏡像隊(duì)列主拷貝的操作,都會(huì)通過Guarented Multicasting(GM)同步到各個(gè)Salve節(jié)點(diǎn),Coodinator負(fù)責(zé)組播結(jié)果的確認(rèn)。
GM是一種可靠的組播通信協(xié)議,保證組組內(nèi)的存活節(jié)點(diǎn)都收到消息。

GM的主播并不是由Master節(jié)點(diǎn)來(lái)負(fù)責(zé)通知所有Slave的(目的是為了避免Master壓力過大,同時(shí)避免Master失效導(dǎo)致消息無(wú)法最終Ack),RabbitMQ把一個(gè)鏡像隊(duì)列的所有節(jié)點(diǎn)組成一個(gè)鏈表,由主拷貝發(fā)起,由主拷貝最終確認(rèn)通知到了所有的Slave,而中間由Slave接力的方式進(jìn)行消息傳播。
從這個(gè)結(jié)構(gòu)來(lái)看,消息完成整個(gè)鏡像隊(duì)列的同步耗時(shí)理論上是不低的,但是由于RabbitMQ消息的消息確認(rèn)本身是異步的模式,所以整體的吞吐量并不會(huì)受到太大影響。
流控
當(dāng)RabbitMQ出現(xiàn)內(nèi)存(默認(rèn)是0.4)或者磁盤資源達(dá)到閾值時(shí),會(huì)觸發(fā)流控機(jī)制,阻塞Producer的Connection,讓生產(chǎn)者不能繼續(xù)發(fā)送消息,直到內(nèi)存或者磁盤資源得到釋放。
RabbitMQ基于Erlang/OTP開發(fā),一個(gè)消息的生命周期中,會(huì)涉及多個(gè)進(jìn)程間的轉(zhuǎn)發(fā),這些Erlang進(jìn)程之間不共享內(nèi)存,每個(gè)進(jìn)程都有自己獨(dú)立的內(nèi)存空間,如果沒有合適的流控機(jī)制,可能會(huì)導(dǎo)致某個(gè)進(jìn)程占用內(nèi)存過大,導(dǎo)致OOM。因此,要保證各個(gè)進(jìn)程占用的內(nèi)容在一個(gè)合理的范圍,RabbitMQ的流控采用了一種信用證機(jī)制(Credit),為每個(gè)進(jìn)程維護(hù)了四類鍵值對(duì):
-
{credit_from,From}-該值表示還能向消息接收進(jìn)程From發(fā)送多少條消息 -
{credit_to,To}-表示當(dāng)前進(jìn)程再接收多少條消息,就要向消息發(fā)送進(jìn)程增加Credit數(shù)量 -
credit_blocked-表示當(dāng)前進(jìn)程被哪些進(jìn)程block了,比如進(jìn)程A向B發(fā)送消息,那么當(dāng)A的進(jìn)程字典中{credit_from,B}的值為0是,那么A的credit_blocked值為[B] -
credit_deferred-消息接收進(jìn)程向消息發(fā)送進(jìn)程增加Credit的消息列表,當(dāng)進(jìn)程被Block時(shí)會(huì)記錄消息信息,Unblock后依次發(fā)送這些消息

如圖所示,A進(jìn)程當(dāng)前可以發(fā)送給B的消息有100條,每發(fā)一次,值減1,直到為0,A才會(huì)被Block住。B消費(fèi)消息后,會(huì)給A增加新的Credit,這樣A才可以持續(xù)的發(fā)送消息。這里只畫了兩個(gè)進(jìn)程,多進(jìn)程串聯(lián)的情況下,這中影響也就是從底向上傳遞的。
想學(xué)習(xí)Java工程化、分布式架構(gòu)、高并發(fā)、高性能、深入淺出、微服務(wù)架構(gòu)、Spring,MyBatis,Netty源碼分析等技術(shù)可以加群:479499375,群里有阿里大牛直播講解技術(shù),以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家,歡迎進(jìn)群一起深入交流學(xué)習(xí)。
總結(jié)
注:本文基于的RabbitMQ材料可能較為陳舊,新的RabbitMQ可能會(huì)有不同的功能特性
整體來(lái)看,RabbitMQ的功能比較豐富(可惜沒有看到延遲,優(yōu)先級(jí)等功能),更適用于偏實(shí)時(shí)的業(yè)務(wù)場(chǎng)景,與Kafka這樣的隊(duì)列定位上有明顯的區(qū)別。它本身應(yīng)該是一個(gè)簡(jiǎn)單健壯的組件,但如果要應(yīng)用在一個(gè)大規(guī)模的分布式系統(tǒng)中,實(shí)際還是需要做一些外部的再次開發(fā),以解決我們前面提到的隊(duì)列存儲(chǔ)單點(diǎn),流控等問題。直觀上看它的運(yùn)維成本是會(huì)比較高的,需要使用方有一定的經(jīng)驗(yàn)。
特別推薦一個(gè)分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒關(guān)注的小伙伴,可以長(zhǎng)按關(guān)注一下:
長(zhǎng)按訂閱更多精彩▼
如有收獲,點(diǎn)個(gè)在看,誠(chéng)摯感謝
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。文章僅代表作者個(gè)人觀點(diǎn),不代表本平臺(tái)立場(chǎng),如有問題,請(qǐng)聯(lián)系我們,謝謝!