www.久久久久|狼友网站av天堂|精品国产无码a片|一级av色欲av|91在线播放视频|亚洲无码主播在线|国产精品草久在线|明星AV网站在线|污污内射久久一区|婷婷综合视频网站

當(dāng)前位置:首頁(yè) > 公眾號(hào)精選 > 架構(gòu)師社區(qū)
[導(dǎo)讀]-???消息隊(duì)列的核心價(jià)值??-?解耦合。異步處理例如電商平臺(tái),秒殺活動(dòng)。一般流程會(huì)分為:1:?風(fēng)險(xiǎn)控制、2:庫(kù)存鎖定、3:生成訂單、4:短信通知、5:更新數(shù)據(jù)。通過(guò)消息系統(tǒng)將秒殺活動(dòng)業(yè)務(wù)拆分開(kāi),將不急需處理的業(yè)務(wù)放在后面慢慢處理;流程改為:1:風(fēng)險(xiǎn)控制、2:庫(kù)存鎖定、3:消息系...


-? ? ?消息隊(duì)列的核心價(jià)值? ? -?


  1. 解耦合。

  2. 異步處理 例如電商平臺(tái),秒殺活動(dòng)。一般流程會(huì)分為:1:?風(fēng)險(xiǎn)控制、2:庫(kù)存鎖定、3:生成訂單、4:短信通知、5:更新數(shù)據(jù)。

  3. 通過(guò)消息系統(tǒng)將秒殺活動(dòng)業(yè)務(wù)拆分開(kāi),將不急需處理的業(yè)務(wù)放在后面慢慢處理;流程改為:1:風(fēng)險(xiǎn)控制、2:庫(kù)存鎖定、3:消息系統(tǒng)、4:生成訂單、5:短信通知、6:更新數(shù)據(jù)。

  4. 流量的控制 1. 網(wǎng)關(guān)在接受到請(qǐng)求后,就把請(qǐng)求放入到消息隊(duì)列里面 2.后端的服務(wù)從消息隊(duì)列里面獲取到請(qǐng)求,完成后續(xù)的秒殺處理流程。然后再給用戶(hù)返回結(jié)果。優(yōu)點(diǎn):控制了流量 缺點(diǎn):會(huì)讓流程變慢。



-? ? ?Kafka 核心概念? ? -?


生產(chǎn)者:Producer 往Kafka集群生成數(shù)據(jù)消費(fèi)者:Consumer 往Kafka里面去獲取數(shù)據(jù),處理數(shù)據(jù)、消費(fèi)數(shù)據(jù)Kafka的數(shù)據(jù)是由消費(fèi)者自己去拉去Kafka里面的數(shù)據(jù)主題:topic分區(qū):partition 默認(rèn)一個(gè)topic有一個(gè)分區(qū)(partition),自己可設(shè)置多個(gè)分區(qū)(分區(qū)分散存儲(chǔ)在服務(wù)器不同節(jié)點(diǎn)上)。

-? ? ?集群架構(gòu)? ? -?


Kafka集群中,一個(gè)kafka服務(wù)器就是一個(gè)broker Topic只是邏輯上的概念,partition在磁盤(pán)上就體現(xiàn)為一個(gè)目錄Consumer Group:消費(fèi)組 消費(fèi)數(shù)據(jù)的時(shí)候,都必須指定一個(gè)group id,指定一個(gè)組的id假定程序A和程序B指定的group id號(hào)一樣,那么兩個(gè)程序就屬于同一個(gè)消費(fèi)組特殊。
比如,有一個(gè)主題topicA程序A去消費(fèi)了這個(gè)topicA,那么程序B就不能再去消費(fèi)topicA(程序A和程序B屬于一個(gè)消費(fèi)組) 再比如程序A已經(jīng)消費(fèi)了topicA里面的數(shù)據(jù),現(xiàn)在還是重新再次消費(fèi)topicA的數(shù)據(jù),是不可以的,但是重新指定一個(gè)group id號(hào)以后,可以消費(fèi)。不同消費(fèi)組之間沒(méi)有影響。
消費(fèi)組需自定義,消費(fèi)者名稱(chēng)程序自動(dòng)生成(獨(dú)一無(wú)二)。Controller:Kafka節(jié)點(diǎn)里面的一個(gè)主節(jié)點(diǎn)。

-? ? ?數(shù)據(jù)性能? ? -?


kafka寫(xiě)數(shù)據(jù):順序?qū)懀疟P(pán)上寫(xiě)數(shù)據(jù)時(shí),就是追加數(shù)據(jù),沒(méi)有隨機(jī)寫(xiě)的操作。經(jīng)驗(yàn): 如果一個(gè)服務(wù)器磁盤(pán)達(dá)到一定的個(gè)數(shù),磁盤(pán)也達(dá)到一定轉(zhuǎn)數(shù),往磁盤(pán)里面順序?qū)懀ㄗ芳訉?xiě))數(shù)據(jù)的速度和寫(xiě)內(nèi)存的速度差不多生產(chǎn)者生產(chǎn)消息,經(jīng)過(guò)kafka服務(wù)先寫(xiě)到os cache 內(nèi)存中,然后經(jīng)過(guò)sync順序?qū)懙酱疟P(pán)上。

-? ? ?零拷貝數(shù)據(jù)高性能??? -?


消費(fèi)者讀取數(shù)據(jù)流程:
  1. 消費(fèi)者發(fā)送請(qǐng)求給kafka服務(wù);
  2. kafka服務(wù)去os cache緩存讀取數(shù)據(jù)(緩存沒(méi)有就去磁盤(pán)讀取數(shù)據(jù));
  3. 從磁盤(pán)讀取了數(shù)據(jù)到os cache緩存中;
  4. os cache復(fù)制數(shù)據(jù)到kafka應(yīng)用程序中;
  5. kafka將數(shù)據(jù)(復(fù)制)發(fā)送到socket cache中;
  6. socket cache通過(guò)網(wǎng)卡傳輸給消費(fèi)者。2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透

kafka linux sendfile技術(shù) — 零拷貝
1.消費(fèi)者發(fā)送請(qǐng)求給kafka服務(wù) ;2.kafka服務(wù)去os cache緩存讀取數(shù)據(jù)(緩存沒(méi)有就去磁盤(pán)讀取數(shù)據(jù)) ;3.從磁盤(pán)讀取了數(shù)據(jù)到os cache緩存中 ;4.os cache直接將數(shù)據(jù)發(fā)送給網(wǎng)卡 ;5.通過(guò)網(wǎng)卡將數(shù)據(jù)傳輸給消費(fèi)者。
2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透



-? ? ?Kafka 日志分段保存? ? -?


Kafka中一個(gè)主題,一般會(huì)設(shè)置分區(qū);比如創(chuàng)建了一個(gè)topic_a,然后創(chuàng)建的時(shí)候指定了這個(gè)主題有三個(gè)分區(qū)。其實(shí)在三臺(tái)服務(wù)器上,會(huì)創(chuàng)建三個(gè)目錄。服務(wù)器1(kafka1)創(chuàng)建目錄topic_a-0:。
目錄下面是我們文件(存儲(chǔ)數(shù)據(jù)),kafka數(shù)據(jù)就是message,數(shù)據(jù)存儲(chǔ)在log文件里。.log結(jié)尾的就是日志文件,在kafka中把數(shù)據(jù)文件就叫做日志文件 。一個(gè)分區(qū)下面默認(rèn)有n多個(gè)日志文件(分段存儲(chǔ)),一個(gè)日志文件默認(rèn)1G。
2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透


服務(wù)器2(kafka2):創(chuàng)建目錄topic_a-1: 服務(wù)器3(kafka3):創(chuàng)建目錄topic_a-2。

-? ? ?二分查找定位數(shù)據(jù)? ? -?


Kafka里面每一條消息,都有自己的offset(相對(duì)偏移量),存在物理磁盤(pán)上面,在position Position:物理位置(磁盤(pán)上面哪個(gè)地方)也就是說(shuō)一條消息就有兩個(gè)位置:offset:相對(duì)偏移量(相對(duì)位置)position:磁盤(pán)物理位置稀疏索引:???????? Kafka中采用了稀疏索引的方式讀取索引,kafka每當(dāng)寫(xiě)入了4k大小的日志(.log),就往index里寫(xiě)入一個(gè)記錄索引。其中會(huì)采用二分查找:

2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透


2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透

-? ? ?高并發(fā)網(wǎng)絡(luò)設(shè)計(jì) NIO? ? -?


網(wǎng)絡(luò)設(shè)計(jì)部分是kafka中設(shè)計(jì)最好的一個(gè)部分,這也是保證Kafka高并發(fā)、高性能的原因,對(duì)kafka進(jìn)行調(diào)優(yōu),就得對(duì)kafka原理比較了解,尤其是網(wǎng)絡(luò)設(shè)計(jì)部分。
Reactor 網(wǎng)絡(luò)設(shè)計(jì)模式1:
2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透


Reactor網(wǎng)絡(luò)設(shè)計(jì)模式2:
2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透


Reactor網(wǎng)絡(luò)設(shè)計(jì)模式3:
2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透


Kafka超高并發(fā)網(wǎng)絡(luò)設(shè)計(jì):
2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透


2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透



-? ? ?Kafka?冗余副本保證高可用? ? -?


在kafka里面分區(qū)是有副本的,注:0.8以前是沒(méi)有副本機(jī)制的。創(chuàng)建主題時(shí),可以指定分區(qū),也可以指定副本個(gè)數(shù)。副本是有角色的:leader partition:1、寫(xiě)數(shù)據(jù)、讀數(shù)據(jù)操作都是從leader partition去操作的。
它會(huì)維護(hù)一個(gè)ISR(in-sync- replica )列表,但是會(huì)根據(jù)一定的規(guī)則刪除ISR列表里面的值 生產(chǎn)者發(fā)送來(lái)一個(gè)消息,消息首先要寫(xiě)入到leader partition中 寫(xiě)完了以后,還要把消息寫(xiě)入到ISR列表里面的其它分區(qū),寫(xiě)完后才算這個(gè)消息提交 follower partition:從leader partition同步數(shù)據(jù)。


-? ? ?優(yōu)秀架構(gòu)思考? ? -?


Kafka — 高并發(fā)、高可用、高性能 高可用:多副本機(jī)制 高并發(fā):網(wǎng)絡(luò)架構(gòu)設(shè)計(jì) 三層架構(gòu):多selector -> 多線(xiàn)程 -> 隊(duì)列的設(shè)計(jì)(NIO) 高性能:寫(xiě)數(shù)據(jù):
  1. 把數(shù)據(jù)先寫(xiě)入到OS Cache
  2. 寫(xiě)到磁盤(pán)上面是順序?qū)?,性能很?/span>

讀數(shù)據(jù):
  1. 根據(jù)稀疏索引,快速定位到要消費(fèi)的數(shù)據(jù)
  2. 零拷貝機(jī)制 減少數(shù)據(jù)的拷貝 減少了應(yīng)用程序與操作系統(tǒng)上下文切換



-? ? ?Kafka?生產(chǎn)環(huán)境搭建? ? -?


需求場(chǎng)景分析

電商平臺(tái),需要每天10億請(qǐng)求都要發(fā)送到Kafka集群上面。二八反正,一般評(píng)估出來(lái)問(wèn)題都不大。10億請(qǐng)求 -> 24 過(guò)來(lái)的,一般情況下,每天的12:00 到早上8:00 這段時(shí)間其實(shí)是沒(méi)有多大的數(shù)據(jù)量的。80%的請(qǐng)求是用的另外16小時(shí)的處理的。16個(gè)小時(shí)處理 -> 8億的請(qǐng)求。16 * 0.2 = 3個(gè)小時(shí) 處理了8億請(qǐng)求的80%的數(shù)據(jù)。

也就是說(shuō)6億的數(shù)據(jù)是靠3個(gè)小時(shí)處理完的。我們簡(jiǎn)單的算一下高峰期時(shí)候的qps6億/3小時(shí) =5.5萬(wàn)/s qps=5.5萬(wàn)。
10億請(qǐng)求 * 50kb = 46T 每天需要存儲(chǔ)46T的數(shù)據(jù)。

一般情況下,我們都會(huì)設(shè)置兩個(gè)副本 46T * 2 = 92T ?Kafka里面的數(shù)據(jù)是有保留的時(shí)間周期,保留最近3天的數(shù)據(jù)。92T * 3天 = 276T我這兒說(shuō)的是50kb不是說(shuō)一條消息就是50kb不是(把日志合并了,多條日志合并在一起),通常情況下,一條消息就幾b,也有可能就是幾百字節(jié)。

-? ? ?物理機(jī)數(shù)量評(píng)估? ? -?


(1)首先分析一下是需要虛擬機(jī)還是物理機(jī) 像Kafka mysql hadoop這些集群搭建的時(shí)候,我們生產(chǎn)里面都是使用物理機(jī)。
(2)高峰期需要處理的請(qǐng)求總的請(qǐng)求每秒5.5萬(wàn)個(gè),其實(shí)一兩臺(tái)物理機(jī)絕對(duì)是可以抗住的。一般情況下,我們?cè)u(píng)估機(jī)器的時(shí)候,是按照高峰期的4倍的去評(píng)估。如果是4倍的話(huà),大概我們集群的能力要準(zhǔn)備到 20萬(wàn)qps。這樣子的集群才是比較安全的集群。大概就需要5臺(tái)物理機(jī)。每臺(tái)承受4萬(wàn)請(qǐng)求。
場(chǎng)景總結(jié):搞定10億請(qǐng)求,高峰期5.5萬(wàn)的qps,276T的數(shù)據(jù),需要5臺(tái)物理機(jī)。

-? ? ?磁盤(pán)選擇? ? -?


搞定10億請(qǐng)求,高峰期5.5萬(wàn)的qps,276T的數(shù)據(jù),需要5臺(tái)物理機(jī)。
(1)SSD固態(tài)硬盤(pán),還是需要普通的機(jī)械硬盤(pán)SSD硬盤(pán):性能比較好,但是價(jià)格貴 SAS盤(pán):某方面性能不是很好,但是比較便宜。SSD硬盤(pán)性能比較好,指的是它隨機(jī)讀寫(xiě)的性能比較好。適合MySQL這樣集群。但是其實(shí)他的順序?qū)懙男阅芨鶶AS盤(pán)差不多。
kafka的理解:就是用的順序?qū)?。所以我們就用普通的【機(jī)械硬盤(pán)】就可以了。
(2)需要我們?cè)u(píng)估每臺(tái)服務(wù)器需要多少塊磁盤(pán) 5臺(tái)服務(wù)器,一共需要276T ,大約每臺(tái)服務(wù)器 需要存儲(chǔ)60T的數(shù)據(jù)。我們公司里面服務(wù)器的配置用的是 11塊硬盤(pán),每個(gè)硬盤(pán) 7T。11 * 7T = 77T。
77T * 5 臺(tái)服務(wù)器 = 385T。

場(chǎng)景總結(jié):搞定10億請(qǐng)求,需要5臺(tái)物理機(jī),11(SAS) * 7T。

-? ? ?內(nèi)存評(píng)估? ? -?


搞定10億請(qǐng)求,需要5臺(tái)物理機(jī),11(SAS) * 7T。
我們發(fā)現(xiàn)kafka讀寫(xiě)數(shù)據(jù)的流程 都是基于os cache,換句話(huà)說(shuō)假設(shè)咱們的os cashe無(wú)限大那么整個(gè)kafka是不是相當(dāng)于就是基于內(nèi)存去操作,如果是基于內(nèi)存去操作,性能肯定很好。內(nèi)存是有限的。
(1)盡可能多的內(nèi)存資源要給 os cache。(2)Kafka的代碼用 核心的代碼用的是scala寫(xiě)的,客戶(hù)端的代碼java寫(xiě)的。都是基于jvm。所以我們還要給一部分的內(nèi)存給jvm。Kafka的設(shè)計(jì),沒(méi)有把很多數(shù)據(jù)結(jié)構(gòu)都放在jvm里面。所以我們的這個(gè)jvm不需要太大的內(nèi)存。根據(jù)經(jīng)驗(yàn),給個(gè)10G就可以了。
NameNode: jvm里面還放了元數(shù)據(jù)(幾十G),JVM一定要給得很大。比如給個(gè)100G。

假設(shè)我們這個(gè)10請(qǐng)求的這個(gè)項(xiàng)目,一共會(huì)有100個(gè)topic。100 topic * 5 partition * 2 = 1000 partition 一個(gè)partition其實(shí)就是物理機(jī)上面的一個(gè)目錄,這個(gè)目錄下面會(huì)有很多個(gè).log的文件。
.log就是存儲(chǔ)數(shù)據(jù)文件,默認(rèn)情況下一個(gè).log文件的大小是1G。我們?nèi)绻WC 1000個(gè)partition 的最新的.log 文件的數(shù)據(jù) 如果都在內(nèi)存里面,這個(gè)時(shí)候性能就是最好。1000 * 1G = 1000G內(nèi)存. 我們只需要把當(dāng)前最新的這個(gè)log 保證里面的25%的最新的數(shù)據(jù)在內(nèi)存里面。250M * 1000 = 0.25 G* 1000 =250G的內(nèi)存。
250內(nèi)存 / 5 = 50G內(nèi)存 50G 10G = 60G內(nèi)存。
64G的內(nèi)存,另外的4G,操作系統(tǒng)本生是不是也需要內(nèi)存。其實(shí)Kafka的jvm也可以不用給到10G這么多。評(píng)估出來(lái)64G是可以的。當(dāng)然如果能給到128G的內(nèi)存的服務(wù)器,那就最好。
我剛剛評(píng)估的時(shí)候用的都是一個(gè)topic是5個(gè)partition,但是如果是數(shù)據(jù)量比較大的topic,可能會(huì)有10個(gè)partition。
總結(jié):搞定10億請(qǐng)求,需要5臺(tái)物理機(jī),11(SAS) * 7T ,需要64G的內(nèi)存(128G更好)

-? ? ?CPU 壓力評(píng)估? ? -?


評(píng)估一下每臺(tái)服務(wù)器需要多少cpu core(資源很有限)。
我們?cè)u(píng)估需要多少個(gè)cpu ,依據(jù)就是看我們的服務(wù)里面有多少線(xiàn)程去跑。線(xiàn)程就是依托cpu 去運(yùn)行的。如果我們的線(xiàn)程比較多,但是cpu core比較少,這樣的話(huà),我們的機(jī)器負(fù)載就會(huì)很高,性能不就不好。
評(píng)估一下,kafka的一臺(tái)服務(wù)器 啟動(dòng)以后會(huì)有多少線(xiàn)程?
Acceptor線(xiàn)程 1 processor線(xiàn)程 3 6~9個(gè)線(xiàn)程 處理請(qǐng)求線(xiàn)程 8個(gè) 32個(gè)線(xiàn)程 定時(shí)清理的線(xiàn)程,拉取數(shù)據(jù)的線(xiàn)程,定時(shí)檢查ISR列表的機(jī)制 等等。所以大概一個(gè)Kafka的服務(wù)啟動(dòng)起來(lái)以后,會(huì)有一百多個(gè)線(xiàn)程。
cpu core = 4個(gè),一遍來(lái)說(shuō),幾十個(gè)線(xiàn)程,就肯定把cpu 打滿(mǎn)了。cpu core = 8個(gè),應(yīng)該很輕松的能支持幾十個(gè)線(xiàn)程。如果我們的線(xiàn)程是100多個(gè),或者差不多200個(gè),那么8 個(gè) cpu core是搞不定的。所以我們這兒建議:CPU core = 16個(gè)。如果可以的話(huà),能有32個(gè)cpu core 那就最好。
結(jié)論:kafka集群,最低也要給16個(gè)cpu core,如果能給到32 cpu core那就更好。2cpu * 8 =16 cpu core 4cpu * 8 = 32 cpu core。
總結(jié):搞定10億請(qǐng)求,需要5臺(tái)物理機(jī),11(SAS) * 7T ,需要64G的內(nèi)存(128G更好),需要16個(gè)cpu core(32個(gè)更好)。

-? ? ?網(wǎng)絡(luò)需求評(píng)估? ? -?


評(píng)估我們需要什么樣網(wǎng)卡?一般要么是千兆的網(wǎng)卡(1G/s),還有的就是萬(wàn)兆的網(wǎng)卡(10G/s)。
高峰期的時(shí)候?每秒會(huì)有5.5萬(wàn)的請(qǐng)求涌入,5.5/5 =?大約是每臺(tái)服務(wù)器會(huì)有1萬(wàn)個(gè)請(qǐng)求涌入。
我們之前說(shuō)的,
10000?* 50kb = 488M ?也就是每條服務(wù)器,每秒要接受488M的數(shù)據(jù)。數(shù)據(jù)還要有副本,副本之間的同步
也是走的網(wǎng)絡(luò)的請(qǐng)求。488 * 2 = 976m/s
說(shuō)明一下:
???很多公司的數(shù)據(jù),一個(gè)請(qǐng)求里面是沒(méi)有50kb這么大的,我們公司是因?yàn)橹鳈C(jī)在生產(chǎn)端封裝了數(shù)據(jù)
???然后把多條數(shù)據(jù)合并在一起了,所以我們的一個(gè)請(qǐng)求才會(huì)有這么大。
???
說(shuō)明一下:
???一般情況下,網(wǎng)卡的帶寬是達(dá)不到極限的,如果是千兆的網(wǎng)卡,我們能用的一般就是700M左右。
???但是如果最好的情況,我們還是使用萬(wàn)兆的網(wǎng)卡。
???如果使用的是萬(wàn)兆的,那就是很輕松。



-? ? ?集群規(guī)劃? ? -?


請(qǐng)求量規(guī)劃物理機(jī)的個(gè)數(shù) 分析磁盤(pán)的個(gè)數(shù),選擇使用什么樣的磁盤(pán) 內(nèi)存 cpu core 網(wǎng)卡就是告訴大家,以后要是公司里面有什么需求,進(jìn)行資源的評(píng)估,服務(wù)器的評(píng)估,大家按照我的思路去評(píng)估
一條消息的大小 50kb -> 1kb 500byte 1Mip 主機(jī)名 192.168.0.100 hadoop1 192.168.0.101 hadoop2 192.168.0.102 hadoop3。
主機(jī)的規(guī)劃:kafka集群架構(gòu)的時(shí)候:主從式的架構(gòu):controller -> 通過(guò)zk集群來(lái)管理整個(gè)集群的元數(shù)據(jù)。
  1. zookeeper集群 hadoop1 hadoop2 hadoop3;
  2. kafka集群 理論上來(lái)講,我們不應(yīng)該把kafka的服務(wù)于zk的服務(wù)安裝在一起。但是我們這兒服務(wù)器有限。所以我們kafka集群也是安裝在hadoop1 haadoop2 hadoop3。



-? ? ?Kafka?運(yùn)維工具與命令? ? -?


KafkaManager — 頁(yè)面管理工具。


場(chǎng)景一:topic數(shù)據(jù)量太大,要增加topic數(shù)。
一開(kāi)始創(chuàng)建主題的時(shí)候,數(shù)據(jù)量不大,給的分區(qū)數(shù)不多。
kafka-topics.sh?--create?--zookeeper?hadoop1:2181,hadoop2:2181,hadoop3:2181?--replication-factor?1?--partitions?1?--topic?test6
kafka-topics.sh?--alter?--zookeeper?hadoop1:2181,hadoop2:2181,ha

broker id:hadoop1:0 hadoop2:1 hadoop3:2 假設(shè)一個(gè)partition有三個(gè)副本:partition0:a,b,ca:leader partition b,c:follower partition
ISR:{a,b,c}如果一個(gè)follower分區(qū) 超過(guò)10秒 沒(méi)有向leader partition去拉取數(shù)據(jù),那么這個(gè)分區(qū)就從ISR列表里面移除。
場(chǎng)景二:核心topic增加副本因子
如果對(duì)核心業(yè)務(wù)數(shù)據(jù)需要增加副本因子 vim test.json腳本,將下面一行json腳本保存:
{“version”:1,“partitions”:[{“topic”:“test6”,“partition”:0,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:1,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:2,“replicas”:[0,1,2]}]}

執(zhí)行上面json腳本:
kafka-reassign-partitions.sh?--zookeeper?hadoop1:2181,hadoop2:2181,hadoop3:2181?--reassignment-json-file?test.json?--execute

場(chǎng)景三:負(fù)載不均衡的topic,手動(dòng)遷移vi topics-to-move.json
{“topics”:?[{“topic”:?“test01”},?{“topic”:?“test02”}],?“version”:?1}?//?把你所有的topic都寫(xiě)在這里
kafka-reassgin-partitions.sh?--zookeeper?hadoop1:2181,hadoop2:2181,hadoop3:2181?--topics-to-move-json-file?topics-to-move.json?--broker-list?“5,6”?--generate

把你所有的包括新加入的broker機(jī)器都寫(xiě)在這里,就會(huì)說(shuō)是把所有的partition均勻的分散在各個(gè)broker上,包括新進(jìn)來(lái)的broker此時(shí)會(huì)生成一個(gè)遷移方案,可以保存到一個(gè)文件里去:expand-cluster-reassignment.json
kafka-reassign-partitions.sh?--zookeeper?hadoop01:2181,hadoop02:2181,hadoop03:2181?--reassignment-json-file?expand-cluster-reassignment.json?--execute

kafka-reassign-partitions.sh?--zookeeper?hadoop01:2181,hadoop02:2181,hadoop03:2181?--reassignment-json-file?expand-cluster-reassignment.json?--verify

這種數(shù)據(jù)遷移操作一定要在晚上低峰的時(shí)候來(lái)做,因?yàn)樗麜?huì)在機(jī)器之間遷移數(shù)據(jù),非常的占用帶寬資源–generate: 根據(jù)給予的Topic列表和Broker列表生成遷移計(jì)劃。generate并不會(huì)真正進(jìn)行消息遷移,而是將消息遷移計(jì)劃計(jì)算出來(lái),供execute命令使用。–execute: 根據(jù)給予的消息遷移計(jì)劃進(jìn)行遷移。–verify: 檢查消息是否已經(jīng)遷移完成。
場(chǎng)景四:如果某個(gè)broker leader partition過(guò)多
正常情況下,我們的leader partition在服務(wù)器之間是負(fù)載均衡。hadoop1 4 hadoop2 1 hadoop3 1。
現(xiàn)在各個(gè)業(yè)務(wù)方可以自行申請(qǐng)創(chuàng)建topic,分區(qū)數(shù)量都是自動(dòng)分配和后續(xù)動(dòng)態(tài)調(diào)整的, kafka本身會(huì)自動(dòng)把leader partition均勻分散在各個(gè)機(jī)器上,這樣可以保證每臺(tái)機(jī)器的讀寫(xiě)吞吐量都是均勻的,但是也有例外。
那就是如果某些broker宕機(jī),會(huì)導(dǎo)致leader partition過(guò)于集中在其他少部分幾臺(tái)broker上, 這會(huì)導(dǎo)致少數(shù)幾臺(tái)broker的讀寫(xiě)請(qǐng)求壓力過(guò)高,其他宕機(jī)的broker重啟之后都是folloer partition,讀寫(xiě)請(qǐng)求很低,造成集群負(fù)載不均衡有一個(gè)參數(shù),auto.leader.rebalance.enable。
默認(rèn)是true,每隔300秒(leader.imbalance.check.interval.seconds)檢查leader負(fù)載是否平衡 如果一臺(tái)broker上的不均衡的leader超過(guò)了10%,leader.imbalance.per.broker.percentage, 就會(huì)對(duì)這個(gè)broker進(jìn)行選舉 配置參數(shù):auto.leader.rebalance.enable 默認(rèn)是true leader.imbalance.per.broker.percentage: 每個(gè)broker允許的不平衡的leader的比率。如果每個(gè)broker超過(guò)了這個(gè)值,控制器會(huì)觸發(fā)leader的平衡。
這個(gè)值表示百分比。10% leader.imbalance.check.interval.seconds:默認(rèn)值300秒。

-? ? ?Kafka 生產(chǎn)者發(fā)消息原理? ? -?


2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透


生產(chǎn)者發(fā)送消息原理—基礎(chǔ)案例演示:


2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透



-? ? ?如何提升吞吐量 ? ?-?


如何提升吞吐量:參數(shù)一:buffer.memory:設(shè)置發(fā)送消息的緩沖區(qū),默認(rèn)值是33554432,就是32MB 參數(shù)二:compression.type:默認(rèn)是none,不壓縮,但是也可以使用lz4壓縮,效率還是不錯(cuò)的,壓縮之后可以減小數(shù)據(jù)量,提升吞吐量,但是會(huì)加大producer端的cpu開(kāi)銷(xiāo) 參數(shù)三:batch.size:設(shè)置batch的大小,如果batch太小,會(huì)導(dǎo)致頻繁網(wǎng)絡(luò)請(qǐng)求,吞吐量下降。
如果batch太大,會(huì)導(dǎo)致一條消息需要等待很久才能被發(fā)送出去,而且會(huì)讓內(nèi)存緩沖區(qū)有很大壓力,過(guò)多數(shù)據(jù)緩沖在內(nèi)存里,默認(rèn)值是:16384,就是16kb,也就是一個(gè)batch滿(mǎn)了16kb就發(fā)送出去,一般在實(shí)際生產(chǎn)環(huán)境,這個(gè)batch的值可以增大一些來(lái)提升吞吐量,如果一個(gè)批次設(shè)置大了,會(huì)有延遲。
一般根據(jù)一條消息大小來(lái)設(shè)置。如果我們消息比較少。配合使用的參數(shù)linger.ms,這個(gè)值默認(rèn)是0,意思就是消息必須立即被發(fā)送,但是這是不對(duì)的,一般設(shè)置一個(gè)100毫秒之類(lèi)的,這樣的話(huà)就是說(shuō),這個(gè)消息被發(fā)送出去后進(jìn)入一個(gè)batch,如果100毫秒內(nèi),這個(gè)batch滿(mǎn)了16kb,自然就會(huì)發(fā)送出去。

-? ? ?如何處理異常 ? ?-?


  1. LeaderNotAvailableException:這個(gè)就是如果某臺(tái)機(jī)器掛了,此時(shí)leader副本不可用,會(huì)導(dǎo)致你寫(xiě)入失敗,要等待其他follower副本切換為leader副本之后,才能繼續(xù)寫(xiě)入,此時(shí)可以重試發(fā)送即可;如果說(shuō)你平時(shí)重啟kafka的broker進(jìn)程,肯定會(huì)導(dǎo)致leader切換,一定會(huì)導(dǎo)致你寫(xiě)入報(bào)錯(cuò),是LeaderNotAvailableException。
  2. NotControllerException:這個(gè)也是同理,如果說(shuō)Controller所在Broker掛了,那么此時(shí)會(huì)有問(wèn)題,需要等待Controller重新選舉,此時(shí)也是一樣就是重試即可。
  3. NetworkException:網(wǎng)絡(luò)異常 timeout a. 配置retries參數(shù),他會(huì)自動(dòng)重試的 b. 但是如果重試幾次之后還是不行,就會(huì)提供Exception給我們來(lái)處理了,我們獲取到異常以后,再對(duì)這個(gè)消息進(jìn)行單獨(dú)處理。我們會(huì)有備用的鏈路。發(fā)送不成功的消息發(fā)送到Redis或者寫(xiě)到文件系統(tǒng)中,甚至是丟棄。



-? ? ?重試機(jī)制 ? ?-?


重試會(huì)帶來(lái)一些問(wèn)題:
  1. 消息會(huì)重復(fù)有的時(shí)候一些leader切換之類(lèi)的問(wèn)題,需要進(jìn)行重試,設(shè)置retries即可,但是消息重試會(huì)導(dǎo)致,重復(fù)發(fā)送的問(wèn)題,比如說(shuō)網(wǎng)絡(luò)抖動(dòng)一下導(dǎo)致他以為沒(méi)成功,就重試了,其實(shí)人家都成功了。
  2. 消息亂序消息重試是可能導(dǎo)致消息的亂序的,因?yàn)榭赡芘旁谀愫竺娴南⒍及l(fā)送出去了。所以可以使用"max.in.flight.requests.per.connection"參數(shù)設(shè)置為1, 這樣可以保證producer同一時(shí)間只能發(fā)送一條消息。兩次重試的間隔默認(rèn)是100毫秒,用"retry.backoff.ms"來(lái)進(jìn)行設(shè)置 基本上在開(kāi)發(fā)過(guò)程中,靠重試機(jī)制基本就可以搞定95%的異常問(wèn)題。



-? ? ?ACK 參數(shù)詳情? ??-?


producer端設(shè)置的 request.required.acks=0;只要請(qǐng)求已發(fā)送出去,就算是發(fā)送完了,不關(guān)心有沒(méi)有寫(xiě)成功。性能很好,如果是對(duì)一些日志進(jìn)行分析,可以承受丟數(shù)據(jù)的情況,用這個(gè)參數(shù),性能會(huì)很好。request.required.acks=1;發(fā)送一條消息,當(dāng)leader partition寫(xiě)入成功以后,才算寫(xiě)入成功。
不過(guò)這種方式也有丟數(shù)據(jù)的可能。request.required.acks=-1;需要ISR列表里面,所有副本都寫(xiě)完以后,這條消息才算寫(xiě)入成功。ISR:1個(gè)副本。1 leader partition 1 follower partition kafka服務(wù)端:min.insync.replicas:1, 如果我們不設(shè)置的話(huà),默認(rèn)這個(gè)值是1 一個(gè)leader partition會(huì)維護(hù)一個(gè)ISR列表,這個(gè)值就是限制ISR列表里面 至少得有幾個(gè)副本,比如這個(gè)值是2,那么當(dāng)ISR列表里面只有一個(gè)副本的時(shí)候。
往這個(gè)分區(qū)插入數(shù)據(jù)的時(shí)候會(huì)報(bào)錯(cuò)。設(shè)計(jì)一個(gè)不丟數(shù)據(jù)的方案:數(shù)據(jù)不丟失的方案:1)分區(qū)副本 >=2 2)acks = -1 3)min.insync.replicas >=2 還有可能就是發(fā)送有異常:對(duì)異常進(jìn)行處理。

-? ? ?自定義分區(qū)? ? -?


分區(qū):1、沒(méi)有設(shè)置key我們的消息就會(huì)被輪訓(xùn)的發(fā)送到不同的分區(qū)。2、設(shè)置了keykafka自帶的分區(qū)器,會(huì)根據(jù)key計(jì)算出來(lái)一個(gè)hash值,這個(gè)hash值會(huì)對(duì)應(yīng)某一個(gè)分區(qū)。如果key相同的,那么hash值必然相同,key相同的值,必然是會(huì)被發(fā)送到同一個(gè)分區(qū)。但是有些比較特殊的時(shí)候,我們就需要自定義分區(qū)了:
public?class?HotDataPartitioner?implements?Partitioner?{
private?Random?random;
@Override
public?void?configure(Map?configs)?{
random?=?new?Random();
}
@Override
public?int?partition(String?topic,?Object?keyObj,?byte[]?keyBytes,?Object?value,?byte[]?valueBytes,?Cluster?cluster)?{
String?key?=?(String)keyObj;
List?partitionInfoList?=?cluster.availablePartitionsForTopic(topic);
//獲取到分區(qū)的個(gè)數(shù)?0,1,2
int?partitionCount?=?partitionInfoList.size();
//最后一個(gè)分區(qū)
int?hotDataPartition?=?partitionCount?-?1;
return?!key.contains(“hot_data”)???random.nextInt(partitionCount?-?1)?:?hotDataPartition;
}
}

如何使用:配置上這個(gè)類(lèi)即可:props.put(”partitioner.class”, “com.zhss.HotDataPartitioner”);



-? ? ?綜合案例演示? ? -?


消費(fèi)組概念:groupid相同就屬于同一個(gè)消費(fèi)組。
1)每個(gè)consumer都要屬于一個(gè)consumer.group,就是一個(gè)消費(fèi)組,topic的一個(gè)分區(qū)只會(huì)分配給 一個(gè)消費(fèi)組下的一個(gè)consumer來(lái)處理,每個(gè)consumer可能會(huì)分配多個(gè)分區(qū),也有可能某個(gè)consumer沒(méi)有分配到任何分區(qū)。
(2)如果想要實(shí)現(xiàn)一個(gè)廣播的效果,那只需要使用不同的group id去消費(fèi)就可以。topicA: partition0、partition1 groupA:consumer1:消費(fèi) partition0 consuemr2:消費(fèi) partition1 consuemr3:消費(fèi)不到數(shù)據(jù) groupB: consuemr3:消費(fèi)到partition0和partition1 3)如果consumer group中某個(gè)消費(fèi)者掛了,此時(shí)會(huì)自動(dòng)把分配給他的分區(qū)交給其他的消費(fèi)者,如果他又重啟了,那么又會(huì)把一些分區(qū)重新交還給他。

-? ? ?Kafka 消費(fèi)組概念? ? -?


groupid 相同就屬于同一個(gè)消費(fèi)組。
(1)每個(gè)consumer都要屬于一個(gè)consumer.group,就是一個(gè)消費(fèi)組,topic的一個(gè)分區(qū)只會(huì)分配給 一個(gè)消費(fèi)組下的一個(gè)consumer來(lái)處理,每個(gè)consumer可能會(huì)分配多個(gè)分區(qū),也有可能某個(gè)consumer沒(méi)有分配到任何分區(qū)。
(2)如果想要實(shí)現(xiàn)一個(gè)廣播的效果,那只需要使用不同的group id去消費(fèi)就可以。topicA: partition0、partition1 groupA:consumer1:消費(fèi) partition0 consuemr2:消費(fèi) partition1 consuemr3:消費(fèi)不到數(shù)據(jù) groupB: consuemr3:消費(fèi)到partition0和partition1 3)如果consumer group中某個(gè)消費(fèi)者掛了,此時(shí)會(huì)自動(dòng)把分配給他的分區(qū)交給其他的消費(fèi)者,如果他又重啟了,那么又會(huì)把一些分區(qū)重新交還給他。
基礎(chǔ)案例演示:
2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透



-? ? ?偏移量管理? ? -?


  1. 每個(gè)consumer內(nèi)存里數(shù)據(jù)結(jié)構(gòu)保存對(duì)每個(gè)topic的每個(gè)分區(qū)的消費(fèi)offset,定期會(huì)提交offset,老版本是寫(xiě)入zk,但是那樣高并發(fā)請(qǐng)求zk是不合理的架構(gòu)設(shè)計(jì),zk是做分布式系統(tǒng)的協(xié)調(diào)的,輕量級(jí)的元數(shù)據(jù)存儲(chǔ),不能負(fù)責(zé)高并發(fā)讀寫(xiě),作為數(shù)據(jù)存儲(chǔ)。
  2. 現(xiàn)在新的版本提交offset發(fā)送給kafka內(nèi)部topic:__consumer_offsets,提交過(guò)去的時(shí)候, key是group.id topic 分區(qū)號(hào),value就是當(dāng)前offset的值,每隔一段時(shí)間,kafka內(nèi)部會(huì)對(duì)這個(gè)topic進(jìn)行compact(合并),也就是每個(gè)group.id topic 分區(qū)號(hào)就保留最新數(shù)據(jù)。
  3. __consumer_offsets可能會(huì)接收高并發(fā)的請(qǐng)求,所以默認(rèn)分區(qū)50個(gè)(leader partitiron -> 50 kafka),這樣如果你的kafka部署了一個(gè)大的集群,比如有50臺(tái)機(jī)器,就可以用50臺(tái)機(jī)器來(lái)抗offset提交的請(qǐng)求壓力. 消費(fèi)者 -> broker端的數(shù)據(jù) message -> 磁盤(pán) -> offset 順序遞增 從哪兒開(kāi)始消費(fèi)?-> offset 消費(fèi)者(offset)。


-? ? ?偏移量監(jiān)控工具介紹? ? -?


  1. web頁(yè)面管理的一個(gè)管理軟件(kafka Manager) 修改bin/kafka-run-class.sh腳本,第一行增加JMX_PORT=9988 重啟kafka進(jìn)程。
  2. 另一個(gè)軟件:主要監(jiān)控的consumer的偏移量。就是一個(gè)jar包 java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb –offsetStorage kafka \(根據(jù)版本:偏移量存在kafka就填kafka,存在zookeeper就填zookeeper) –zk hadoop1:2181 –port 9004 –refresh 15.seconds –retain 2.days。



-? ? ?消費(fèi)異常感知? ? -?


heartbeat.interval.ms:consumer心跳時(shí)間間隔,必須得與coordinator保持心跳才能知道consumer是否故障了, 然后如果故障之后,就會(huì)通過(guò)心跳下發(fā)rebalance的指令給其他的consumer通知他們進(jìn)行rebalance的操作 session.timeout.ms:kafka多長(zhǎng)時(shí)間感知不到一個(gè)consumer就認(rèn)為他故障了,默認(rèn)是10秒 max.poll.interval.ms:如果在兩次poll操作之間,超過(guò)了這個(gè)時(shí)間,那么就會(huì)認(rèn)為這個(gè)consume處理能力太弱了,會(huì)被踢出消費(fèi)組,分區(qū)分配給別人去消費(fèi),一般來(lái)說(shuō)結(jié)合業(yè)務(wù)處理的性能來(lái)設(shè)置就可以了。

-? ? ?核心參數(shù)解釋? ? -?


fetch.max.bytes:獲取一條消息最大的字節(jié)數(shù),一般建議設(shè)置大一些,默認(rèn)是1M 其實(shí)我們?cè)谥岸鄠€(gè)地方都見(jiàn)到過(guò)這個(gè)類(lèi)似的參數(shù),意思就是說(shuō)一條信息最大能多大?
  1. Producer 發(fā)送的數(shù)據(jù),一條消息最大多大, -> 10M。
  2. Broker 存儲(chǔ)數(shù)據(jù),一條消息最大能接受多大 -> 10M。
  3. Consumer max.poll.records: 一次poll返回消息的最大條數(shù),默認(rèn)是500條 connection.max.idle.ms:consumer跟broker的socket連接如果空閑超過(guò)了一定的時(shí)間,此時(shí)就會(huì)自動(dòng)回收連接,但是下次消費(fèi)就要重新建立socket連接,這個(gè)建議設(shè)置為-1,不要去回收 enable.auto.commit: 開(kāi)啟自動(dòng)提交偏移量 auto.commit.interval.ms: 每隔多久提交一次偏移量,默認(rèn)值5000毫秒 _consumer_offset auto.offset.reset:earliest 當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),從頭開(kāi)始消費(fèi) topica -> partition0:1000 partitino1:2000 latest 當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù) none topic各分區(qū)都存在已提交的offset時(shí),從offset后開(kāi)始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset,則拋出異常。



-? ? ?綜合案例演示? ? -?


引入案例:二手電商平臺(tái)(歡樂(lè)送),根據(jù)用戶(hù)消費(fèi)的金額,對(duì)用戶(hù)星星進(jìn)行累計(jì)。訂單系統(tǒng)(生產(chǎn)者) -> Kafka集群里面發(fā)送了消息。會(huì)員系統(tǒng)(消費(fèi)者) -> Kafak集群里面消費(fèi)消息,對(duì)消息進(jìn)行處理。

group coordinator原理


面試題:消費(fèi)者是如何實(shí)現(xiàn)rebalance的?— 根據(jù)coordinator實(shí)現(xiàn):
  1. 什么是coordinator 每個(gè)consumer group都會(huì)選擇一個(gè)broker作為自己的coordinator,他是負(fù)責(zé)監(jiān)控這個(gè)消費(fèi)組里的各個(gè)消費(fèi)者的心跳,以及判斷是否宕機(jī),然后開(kāi)啟rebalance的。
  2. 如何選擇coordinator機(jī)器 首先對(duì)groupId進(jìn)行hash(數(shù)字),接著對(duì)__consumer_offsets的分區(qū)數(shù)量取模,默認(rèn)是50,_consumer_offsets的分區(qū)數(shù)可以通過(guò)offsets.topic.num.partitions來(lái)設(shè)置,找到分區(qū)以后,這個(gè)分區(qū)所在的broker機(jī)器就是coordinator機(jī)器。比如說(shuō):groupId,“myconsumer_group” -> hash值(數(shù)字)-> 對(duì)50取模 -> 8 __consumer_offsets 這個(gè)主題的8號(hào)分區(qū)在哪臺(tái)broker上面,那一臺(tái)就是coordinator 就知道這個(gè)consumer group下的所有的消費(fèi)者提交offset的時(shí)候是往哪個(gè)分區(qū)去提交offset。
  3. 運(yùn)行流程(1)每個(gè)consumer都發(fā)送JoinGroup請(qǐng)求到Coordinator,( 2)然后Coordinator從一個(gè)consumer group中選擇一個(gè)consumer作為leader,(3)把consumer group情況發(fā)送給這個(gè)leader,(4)接著這個(gè)leader會(huì)負(fù)責(zé)制定消費(fèi)方案,(5)通過(guò)SyncGroup發(fā)給Coordinator 6)接著Coordinator就把消費(fèi)方案下發(fā)給各個(gè)consumer,他們會(huì)從指定的分區(qū)的 leader broker開(kāi)始進(jìn)行socket連接以及消費(fèi)消息。

2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透



-? ? ?Rebalance 策略? ? -?


consumer group靠coordinator實(shí)現(xiàn)了Rebalance。
這里有三種rebalance的策略:range、round-robin、sticky。
比如我們消費(fèi)的一個(gè)主題有12個(gè)分區(qū):p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11 假設(shè)我們的消費(fèi)者組里面有三個(gè)消費(fèi)者:
  1. range策略 range策略就是按照partiton的序號(hào)范圍 p0~3 consumer1 p4~7 consumer2 p8~11 consumer3 默認(rèn)就是這個(gè)策略;
  2. round-robin策略 就是輪詢(xún)分配 consumer1:0,3,6,9 consumer2:1,4,7,10 consumer3:2,5,8,11 但是前面的這兩個(gè)方案有個(gè)問(wèn)題:12 -> 2 每個(gè)消費(fèi)者會(huì)消費(fèi)6個(gè)分區(qū)。假設(shè)consuemr1掛了:p0-5分配給consumer2,p6-11分配給consumer3 這樣的話(huà),原本在consumer2上的的p6,p7分區(qū)就被分配到了 consumer3上。
  3. sticky策略 最新的一個(gè)sticky策略,就是說(shuō)盡可能保證在rebalance的時(shí)候,讓原本屬于這個(gè)consumer 的分區(qū)還是屬于他們,然后把多余的分區(qū)再均勻分配過(guò)去,這樣盡可能維持原來(lái)的分區(qū)分配的策略。

consumer1:0-3 consumer2: 4-7 consumer3: 8-11 假設(shè)consumer3掛了 consumer1:0-3, 8,9 consumer2: 4-7, 10,11。

-? ? ?Broker 管理? ? -?


Leo、hw含義:
  1. Kafka的核心原理
  2. 如何去評(píng)估一個(gè)集群資源
  3. 搭建了一套kafka集群 -》 介紹了簡(jiǎn)單的一些運(yùn)維管理的操作。
  4. 生產(chǎn)者(使用,核心的參數(shù))
  5. 消費(fèi)者(原理,使用的,核心參數(shù))
  6. broker內(nèi)部的一些原理

核心的概念:LEO,HW LEO:是跟offset偏移量有關(guān)系。
LEO:在kafka里面,無(wú)論leader partition還是follower partition統(tǒng)一都稱(chēng)作副本(replica)。
每次partition接收到一條消息,都會(huì)更新自己的LEO,也就是log end offset,LEO其實(shí)就是最新的offset 1

HW:高水位 LEO有一個(gè)很重要的功能就是更新HW,如果follower和leader的LEO同步了,此時(shí)HW就可以更新 HW之前的數(shù)據(jù)對(duì)消費(fèi)者是可見(jiàn),消息屬于commit狀態(tài)。HW之后的消息消費(fèi)者消費(fèi)不到。

Leo更新:


2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透


hw更新:


2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透


controller如何管理整個(gè)集群

1: 競(jìng)爭(zhēng)controller的 /controller/id 2:controller服務(wù)監(jiān)聽(tīng)的目錄:/broker/ids/ 用來(lái)感知 broker上下線(xiàn) /broker/topics/ 創(chuàng)建主題,我們當(dāng)時(shí)創(chuàng)建主題命令,提供的參數(shù),ZK地址。/admin/reassign_partitions 分區(qū)重分配……
2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透


延時(shí)任務(wù)

kafka的延遲調(diào)度機(jī)制(擴(kuò)展知識(shí)) 我們先看一下kafka里面哪些地方需要有任務(wù)要進(jìn)行延遲調(diào)度。第一類(lèi)延時(shí)的任務(wù):比如說(shuō)producer的acks=-1,必須等待leader和follower都寫(xiě)完才能返回響應(yīng)。
有一個(gè)超時(shí)時(shí)間,默認(rèn)是30秒(request.timeout.ms)。所以需要在寫(xiě)入一條數(shù)據(jù)到leader磁盤(pán)之后,就必須有一個(gè)延時(shí)任務(wù),到期時(shí)間是30秒延時(shí)任務(wù) 放到DelayedOperationPurgatory(延時(shí)管理器)中。
假如在30秒之前如果所有follower都寫(xiě)入副本到本地磁盤(pán)了,那么這個(gè)任務(wù)就會(huì)被自動(dòng)觸發(fā)蘇醒,就可以返回響應(yīng)結(jié)果給客戶(hù)端了, 否則的話(huà),這個(gè)延時(shí)任務(wù)自己指定了最多是30秒到期,如果到了超時(shí)時(shí)間都沒(méi)等到,就直接超時(shí)返回異常。
第二類(lèi)延時(shí)的任務(wù):follower往leader拉取消息的時(shí)候,如果發(fā)現(xiàn)是空的,此時(shí)會(huì)創(chuàng)建一個(gè)延時(shí)拉取任務(wù) 延時(shí)時(shí)間到了之后(比如到了100ms),就給follower返回一個(gè)空的數(shù)據(jù),然后follower再次發(fā)送請(qǐng)求讀取消息, 但是如果延時(shí)的過(guò)程中(還沒(méi)到100ms),leader寫(xiě)入了消息,這個(gè)任務(wù)就會(huì)自動(dòng)蘇醒,自動(dòng)執(zhí)行拉取任務(wù)。
海量的延時(shí)任務(wù),需要去調(diào)度。

-? ? ?時(shí)間輪機(jī)制? ? -?


  1. 什么會(huì)有要設(shè)計(jì)時(shí)間輪?Kafka內(nèi)部有很多延時(shí)任務(wù),沒(méi)有基于JDK Timer來(lái)實(shí)現(xiàn),那個(gè)插入和刪除任務(wù)的時(shí)間復(fù)雜度是O(nlogn), 而是基于了自己寫(xiě)的時(shí)間輪來(lái)實(shí)現(xiàn)的,時(shí)間復(fù)雜度是O(1),依靠時(shí)間輪機(jī)制,延時(shí)任務(wù)插入和刪除,O(1)。
  2. 時(shí)間輪是什么?其實(shí)時(shí)間輪說(shuō)白其實(shí)就是一個(gè)數(shù)組。tickMs:時(shí)間輪間隔 1ms wheelSize:時(shí)間輪大小 20 interval:timckMS * whellSize,一個(gè)時(shí)間輪的總的時(shí)間跨度。20ms currentTime:當(dāng)時(shí)時(shí)間的指針。a:因?yàn)闀r(shí)間輪是一個(gè)數(shù)組,所以要獲取里面數(shù)據(jù)的時(shí)候,靠的是index,時(shí)間復(fù)雜度是O(1) b:數(shù)組某個(gè)位置上對(duì)應(yīng)的任務(wù),用的是雙向鏈表存儲(chǔ)的,往雙向鏈表里面插入,刪除任務(wù),時(shí)間復(fù)雜度也是O(1) 舉例:插入一個(gè)8ms以后要執(zhí)行的任務(wù) 19ms 3.多層級(jí)的時(shí)間輪 比如:要插入一個(gè)110毫秒以后運(yùn)行的任務(wù)。tickMs:時(shí)間輪間隔 20ms wheelSize:時(shí)間輪大小 20 interval:timckMS * whellSize,一個(gè)時(shí)間輪的總的時(shí)間跨度。20ms currentTime:當(dāng)時(shí)時(shí)間的指針。第一層時(shí)間輪:1ms * 20 第二層時(shí)間輪:20ms * 20 第三層時(shí)間輪:400ms * 20。

2?萬(wàn)字長(zhǎng)文深入詳解?Kafka,從源碼到架構(gòu)全部講透

:erainm

https://blog.csdn.net/eraining/article/details/115860664

本站聲明: 本文章由作者或相關(guān)機(jī)構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀(guān)點(diǎn),本站亦不保證或承諾內(nèi)容真實(shí)性等。需要轉(zhuǎn)載請(qǐng)聯(lián)系該專(zhuān)欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請(qǐng)及時(shí)聯(lián)系本站刪除。
關(guān)閉
關(guān)閉