作者:_BKing
來源:www.cnblogs.com/xiaowei123/p/13222710.html
最近,又重新學習了下Redis,深深被Redis的魅力所折服,Redis不僅能快還能慢(我想也這么優(yōu)秀o(╥﹏╥)o),簡直利器呀


?咳咳咳,大家不要誤會,本文很正經(jīng)的啦!伙伴們跟我一起沖呀,我們一起去爬爬這座延時隊列的山峰,探一探它究竟到底有高。

?那接下來開始我們的旅行啦~,我們都知道Redis是一種基于內(nèi)存的單進程單線程數(shù)據(jù)庫(Redis6.0開始之后支持多線程啦?。?,處理速度都非???。那么為何Redis又能慢呢?原來,這里說的慢是指Redis可以設置一些參數(shù)達到慢處理的結果。(這就是為什么Redis既能快又能慢啦?。?
那接下來開始講講我們的Redis在隊列中如何實現(xiàn)延時的情況:
在我們?nèi)粘I钪?,我們可以發(fā)現(xiàn):
在淘寶、京東等購物平臺上下單,超過一定時間未付款,訂單會自動取消。
打車的時候,在規(guī)定時間沒有車主接單,平臺會取消你的單并提醒你暫時沒有車主接單。
點外賣的時候,如果商家在10分鐘還沒接單,就會自動取消訂單。
收快遞的時候,如果我們沒有點確認收貨,在一段時間后程序會自動完成訂單。
在平臺完成訂單后,如果我們沒有在規(guī)定時間評論商品,會自動默認買家不評論。
.......
這時,我們可以想想為什么要這樣做?
因為這樣可以保證商品的庫存可以釋放給其他人購買,你可以不用一直等待打車卻得不到回復,你可以及時換一家店點到外賣。
那么這些情況都是如何實現(xiàn)的呢?
這時我們可以看看這個圖,來看看消息延遲是如何處理的:

?當用戶發(fā)送一個消息請求給服務器后臺的時候,服務器會檢測這條消息是否需要進行延時處理,如果需要就放入到延時隊列中,由延時任務檢測器進行檢測和處理,對于不需要進行延時處理的任務,服務器會立馬對消息進行處理,并把處理后的結果返會給用戶。

?對于在延時任務檢測器內(nèi)部的話,有查詢延遲任務和執(zhí)行延時任務兩個職能,任務檢測器會先去延時任務隊列進行隊列中信息讀取,判斷當前隊列中哪些任務已經(jīng)時間到期并將已經(jīng)到期的任務輸出執(zhí)行(設置一個定時任務)。
這時,我們可以想一想在Redis的數(shù)據(jù)結構中有哪些能進行時間設置標志的命令?
是不是想到的 zset 這個命令,具有去重有序(分數(shù)排序)的功能。沒錯,你想對了呀!
我們可以使用 zset(sortedset)這個命令,用設置好的時間戳作為score進行排序,使用 zadd score1 value1 ....命令就可以一直往內(nèi)存中生產(chǎn)消息。再利用 zrangebysocre 查詢符合條件的所有待處理的任務,通過循環(huán)執(zhí)行隊列任務即可。也可以通過 zrangebyscore key min max withscores limit 0 1 查詢最早的一條任務,來進行消費。

總的來說,你可以通過以下兩種方式來實現(xiàn)((*^▽^*)如果你想到其他方法,也可以告訴我下呀~):
(1)使用zrangebyscore來查詢當前延時隊列中所有任務,找出所有需要進行處理的延時任務,在依次進行操作。
(2)查找當前最早的一條任務,通過score值來判斷任務執(zhí)行的時候是否大于了當前系統(tǒng)的時候,比如說:最早的任務執(zhí)行時間在3點,系統(tǒng)時間在2點58分),表示這個應該需要立馬被執(zhí)行啦,時間快到了(沖沖沖,他來了他來了,他帶著死神的步伐來了)。
我們可以想一想Redis來實現(xiàn)延時隊列有何優(yōu)勢呢?
其實,Redis用來進行實現(xiàn)延時隊列是具有這些優(yōu)勢的:
-
Redis zset支持高性能的 score 排序。
-
Redis是在內(nèi)存上進行操作的,速度非??臁?
-
Redis可以搭建集群,當消息很多時候,我們可以用集群來提高消息處理的速度,提高可用性。
-
Redis具有持久化機制,當出現(xiàn)故障的時候,可以通過AOF和RDB方式來對數(shù)據(jù)進行恢復,保證了數(shù)據(jù)的可靠性
這時候,會有小伙伴問了還有沒有其他實現(xiàn)延時隊列的方式呀!emmm....當然有的,只有想不到的沒有做不到。
一、用消息中間件實現(xiàn)延時隊列
(1)通過 RabbitMQ 來實現(xiàn)延時隊列
方法一:在MQ中我們可以對Queue設置 x-expires 過期時間或者對 Message設置超時時間x-message-ttl。
(這里要注意下:延時相同的消息我們要扔到同一個隊列中,對于每一個延時要建立一個與之對應的隊列—這是由于MQ的過期檢測是惰性檢測的。)
方法二:我們可以用RabbitMQ的插件rabbitmq-delayed-message-exchange插件來實現(xiàn)延時隊列。達到可投遞時間時并將其通過 x-delayed-type 類型標記的交換機類型投遞至目標隊列。

(2)RocketMQ實現(xiàn)延時隊列
rocketmq在發(fā)送延時消息時,是先把消息按照延遲時間段發(fā)送到指定的隊列中(把延時時間段相同的消息放到同一個隊列中,保證了消息處理的順序性,可以讓同一個隊列中消息延時時間是相同的,整個RocketMQ中延時消息時按照遞增順序排序,保證信息處理的先后順序性。)。之后,通過一個定時器來輪詢處理這些隊列里的信息,判斷是否到期。對于到期的消息會發(fā)送到相應的處理隊列中,進行處理。
注意 :目前RocketMQ只支持特定的延時時間段,1s,5s,10s,...2h,不能支持任意時間段的延時設置。有興趣的小伙伴可以去了解下它是相關知識呀~
二、Kafka實現(xiàn)延時隊
Kafka基于時間輪自定義了一個用于實現(xiàn)延遲功能的定時器(SystemTimer),Kafka中的時間輪(TimingWheel)是一個存儲定時任務的環(huán)形隊列,可以進行相關的延時隊列設置。

三、Netty實現(xiàn)延時隊列
Netty也有基于時間輪算法來實現(xiàn)延時隊列。Netty在構建延時隊列主要用HashedWheelTimer,HashedWheelTimer底層數(shù)據(jù)結構是使用DelayedQueue,采用時間輪的算法來實現(xiàn)。

四、DelayQueue來實現(xiàn)延時隊列
Java中有自帶的DelayQueue數(shù)據(jù)類型,我們可以用這個來實現(xiàn)延時隊列。DelayQueue是封裝了一個PriorityQueue(優(yōu)先隊列),在向DelayQueue隊列中添加元素時,會給元素一個Delay(延遲時間)作為排序條件,隊列中最小的元素會優(yōu)先放在隊首,對于隊列中的元素只有到了Delay時間才允許從隊列中取出。這種實現(xiàn)方式是數(shù)據(jù)保存在內(nèi)存中,可能面臨數(shù)據(jù)丟失的情況,同時它是無法支持分布式系統(tǒng)的。

沖沖沖,小伙伴們!又是美好的一天。
特別推薦一個分享架構+算法的優(yōu)質(zhì)內(nèi)容,還沒關注的小伙伴,可以長按關注一下:



長按訂閱更多精彩▼

如有收獲,點個在看,誠摯感謝