Java?8?Stream原理解析
時(shí)間:2021-08-19 15:31:02
手機(jī)看文章
掃描二維碼
隨時(shí)隨地手機(jī)看文章
[導(dǎo)讀]說起Java8,我們知道Java8大改動(dòng)之一就是增加函數(shù)式編程,而StreamAPI便是函數(shù)編程的主角,StreamAPI是一種流式的處理數(shù)據(jù)風(fēng)格,也就是將要處理的數(shù)據(jù)當(dāng)作流,在管道中進(jìn)行傳輸,并在管道中的每個(gè)節(jié)點(diǎn)對數(shù)據(jù)進(jìn)行處理,如過濾、排序、轉(zhuǎn)換等。首先我們先看一個(gè)使用Stre...
說起 Java 8,我們知道 Java 8 大改動(dòng)之一就是增加函數(shù)式編程,而 Stream API 便是函數(shù)編程的主角,Stream API 是一種流式的處理數(shù)據(jù)風(fēng)格,也就是將要處理的數(shù)據(jù)當(dāng)作流,在管道中進(jìn)行傳輸,并在管道中的每個(gè)節(jié)點(diǎn)對數(shù)據(jù)進(jìn)行處理,如過濾、排序、轉(zhuǎn)換等。
首先我們先看一個(gè)使用Stream API的示例,具體代碼如下:
code1 Stream example
這是個(gè)很簡單的一個(gè)Stream使用例子,我們過濾掉空字符串后,轉(zhuǎn)成int類型并計(jì)算出最大值,這其中包括了三個(gè)操作:filter、mapToInt、sum。相信大多數(shù)人再剛使用Stream API的時(shí)候都會(huì)有個(gè)疑問,Stream是指怎么實(shí)現(xiàn)的,是每一次函數(shù)調(diào)用就執(zhí)行一次迭代嗎?答案肯定是否,因?yàn)槿绻娴氖敲恳淮魏瘮?shù)調(diào)用就執(zhí)行一次迭代,這個(gè)效率是很難接受的,Stream也不會(huì)那么受歡迎。
其實(shí)Stream內(nèi)部是通過流水線(Pipeline)的方式來實(shí)現(xiàn)的,基本思想是在迭代的時(shí)候順著流水線盡可能的執(zhí)行更多的操作,從而避免多次迭代。為了對Stream的操作有更清晰的認(rèn)識,我們匯總了Stream的所有操作。

從上表可以看出Stream將所有操作分為兩類:中間操作和終止操作。其中中間操作分為無狀態(tài)和有狀態(tài),終止操作分為非短路操作和短路操作,下面是針對這幾個(gè)操作的含義說明:
1、中間操作:中間操作只是一種標(biāo)記,只有結(jié)束操作才會(huì)觸發(fā)實(shí)際計(jì)算
2、終止操作:顧名思義,就是得出最后計(jì)算結(jié)果的操作
Stream流水線解決方案
?通過上面的介紹,我們了解到Stream在執(zhí)行中間操作時(shí)僅僅是記錄,當(dāng)用戶調(diào)用終止操作時(shí),會(huì)在一個(gè)迭代里將已經(jīng)記錄的操作順著流水線全部執(zhí)行掉。沿著這個(gè)思路,有幾個(gè)問題需要解決:
1、操作如何記錄?
圖1-1
關(guān)于操作如何記錄,在JDK源碼注釋中多次用(操作)stage來標(biāo)識用戶的每一次操作,而通常情況下Stream的操作又需要一個(gè)回調(diào)函數(shù),所以一個(gè)完整的操作是由數(shù)據(jù)來源、操作、回調(diào)函數(shù)組成的三元組來表示。而在具體實(shí)現(xiàn)中,使用實(shí)例化的ReferencePipeline來表示,即圖1-1中的Head、StatelessOp、StatefulOp的實(shí)例。接下來我們來看下Stream幾個(gè)常用方法的源碼。
?code2 Collection.Stream()
code3?StreamSupport.stream()
code4 ReferencePipeline.map()
從上面源碼中可以看出來,我們調(diào)用stream()方法時(shí)最終會(huì)創(chuàng)建一個(gè)Head實(shí)例來表示流操作的頭,當(dāng)調(diào)用map()方法時(shí)則會(huì)創(chuàng)建無狀態(tài)的中間操作實(shí)例StatelessOp,同樣調(diào)用其他操作對應(yīng)的方法也會(huì)生成一個(gè)ReferencePipeline實(shí)例,在這里就不一一列舉。在用戶調(diào)用一系列操作后,最終會(huì)形成一個(gè)雙向鏈表,如下圖所示:
圖1-2
2、操作如何疊加
上面我們說明了Stream是通過stage記錄操作,但stage只保存當(dāng)前操作,它并不知道下個(gè)stage如何操作,需要什么操作。所以要執(zhí)行的話還需要某種協(xié)議將各個(gè)stage關(guān)聯(lián)起來。jdk中就是使用Slink接口來實(shí)現(xiàn)的,Slink接口定義begin()、end()、cancellationRequested()、accept()四個(gè)方法,如下表所示。

往回看code3 ReferencePipeline.map()的方法,我們會(huì)發(fā)現(xiàn)我們在創(chuàng)建一個(gè)ReferencePipeline實(shí)例的時(shí)候,需要重寫opWrapSink方法來生成對應(yīng)Sink實(shí)例。而且通過閱讀源碼會(huì)發(fā)現(xiàn)常用的操作都會(huì)創(chuàng)建一個(gè)ChainedReference實(shí)例。我們可以看下code5 ChainedReference抽象類的源碼實(shí)現(xiàn),因?yàn)镃hainedReference只是個(gè)抽象實(shí)現(xiàn),不攜帶具體操作的特性,所以是更能體現(xiàn)作者的設(shè)計(jì)理念。
通過查看源碼可以發(fā)現(xiàn)ChainedReference會(huì)持有下一個(gè)操作的Slink,并在調(diào)用begin、end、cancellationRequested方法會(huì)調(diào)用下一個(gè)操作的Slink的相應(yīng)方法,以此來達(dá)到疊加的效果。
code5?ChainedReference
3、疊加之后的操作如何執(zhí)行
Sink完美封裝了Stream每一步操作,并給出了[處理->轉(zhuǎn)發(fā)]的模式來疊加操作。這一連串的齒輪已經(jīng)咬合,就差最后一步撥動(dòng)齒輪啟動(dòng)執(zhí)行。是什么啟動(dòng)這一連串的操作呢?也許你已經(jīng)想到了啟動(dòng)的原始動(dòng)力就是結(jié)束操作(Terminal Operation),一旦調(diào)用某個(gè)結(jié)束操作,就會(huì)觸發(fā)整個(gè)流水線的執(zhí)行。
結(jié)束操作之后不能再有別的操作,所以結(jié)束操作不會(huì)創(chuàng)建新的流水線階段(Stage),直觀的說就是流水線的鏈表不會(huì)在往后延伸了。結(jié)束操作會(huì)創(chuàng)建一個(gè)包裝了自己操作的Sink,這也是流水線中最后一個(gè)Sink,這個(gè)Sink只需要處理數(shù)據(jù)而不需要將結(jié)果傳遞給下游的Sink(因?yàn)闆]有下游)。對于Sink的[處理->轉(zhuǎn)發(fā)]模型,結(jié)束操作的Sink就是調(diào)用鏈的出口。
我們再來考察一下上游的Sink是如何找到下游Sink的。一種可選的方案是在PipelineHelper中設(shè)置一個(gè)Sink字段,在流水線中找到下游Stage并訪問Sink字段即可。但Stream類庫的設(shè)計(jì)者沒有這么做,而是設(shè)置了一個(gè)Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法來得到Sink,該方法的作用是返回一個(gè)新的包含了當(dāng)前Stage代表的操作以及能夠?qū)⒔Y(jié)果傳遞給downstream的Sink對象。為什么要產(chǎn)生一個(gè)新對象而不是返回一個(gè)Sink字段?這是因?yàn)槭褂胦pWrapSink()可以將當(dāng)前操作與下游Sink(上文中的downstream參數(shù))結(jié)合成新Sink。試想只要從流水線的最后一個(gè)Stage開始,不斷調(diào)用上一個(gè)Stage的opWrapSink()方法直到最開始(不包括stage0,因?yàn)閟tage0代表數(shù)據(jù)源,不包含操作),就可以得到一個(gè)代表了流水線上所有操作的Sink,用代碼表示就是這樣:
code6?AbstractPipeline.wrapSink
現(xiàn)在流水線上從開始到結(jié)束的所有的操作都被包裝到了一個(gè)Sink里,執(zhí)行這個(gè)Sink就相當(dāng)于執(zhí)行整個(gè)流水線,執(zhí)行Sink的代碼如下:
code7?AbstractPipeline.copyInto
上述代碼首先調(diào)用wrappedSink.begin()方法告訴Sink數(shù)據(jù)即將到來,然后調(diào)用spliterator.forEachRemaining()方法對數(shù)據(jù)進(jìn)行迭代,最后調(diào)用wrappedSink.end()方法通知Sink數(shù)據(jù)處理結(jié)束。邏輯如此清晰。
首先我們先看一個(gè)使用Stream API的示例,具體代碼如下:

這是個(gè)很簡單的一個(gè)Stream使用例子,我們過濾掉空字符串后,轉(zhuǎn)成int類型并計(jì)算出最大值,這其中包括了三個(gè)操作:filter、mapToInt、sum。相信大多數(shù)人再剛使用Stream API的時(shí)候都會(huì)有個(gè)疑問,Stream是指怎么實(shí)現(xiàn)的,是每一次函數(shù)調(diào)用就執(zhí)行一次迭代嗎?答案肯定是否,因?yàn)槿绻娴氖敲恳淮魏瘮?shù)調(diào)用就執(zhí)行一次迭代,這個(gè)效率是很難接受的,Stream也不會(huì)那么受歡迎。
其實(shí)Stream內(nèi)部是通過流水線(Pipeline)的方式來實(shí)現(xiàn)的,基本思想是在迭代的時(shí)候順著流水線盡可能的執(zhí)行更多的操作,從而避免多次迭代。為了對Stream的操作有更清晰的認(rèn)識,我們匯總了Stream的所有操作。

從上表可以看出Stream將所有操作分為兩類:中間操作和終止操作。其中中間操作分為無狀態(tài)和有狀態(tài),終止操作分為非短路操作和短路操作,下面是針對這幾個(gè)操作的含義說明:
1、中間操作:中間操作只是一種標(biāo)記,只有結(jié)束操作才會(huì)觸發(fā)實(shí)際計(jì)算
- 無狀態(tài):指元素的處理不受前面元素的影響;
- 有狀態(tài):有狀態(tài)的中間操作必須等到所有元素處理之后才知道最終結(jié)果,比如排序是有狀態(tài)操作,在讀取所有元素之前并不能確定排序結(jié)果。
2、終止操作:顧名思義,就是得出最后計(jì)算結(jié)果的操作
- 短路操作:指不用處理全部元素就可以返回結(jié)果;
- 非短路操作:指必須處理所有元素才能得到最終結(jié)果。
Stream流水線解決方案
?通過上面的介紹,我們了解到Stream在執(zhí)行中間操作時(shí)僅僅是記錄,當(dāng)用戶調(diào)用終止操作時(shí),會(huì)在一個(gè)迭代里將已經(jīng)記錄的操作順著流水線全部執(zhí)行掉。沿著這個(gè)思路,有幾個(gè)問題需要解決:
- 用戶的操作如何記錄?
- 操作如何疊加?
- 疊加之后的操作如何執(zhí)行?
1、操作如何記錄?

關(guān)于操作如何記錄,在JDK源碼注釋中多次用(操作)stage來標(biāo)識用戶的每一次操作,而通常情況下Stream的操作又需要一個(gè)回調(diào)函數(shù),所以一個(gè)完整的操作是由數(shù)據(jù)來源、操作、回調(diào)函數(shù)組成的三元組來表示。而在具體實(shí)現(xiàn)中,使用實(shí)例化的ReferencePipeline來表示,即圖1-1中的Head、StatelessOp、StatefulOp的實(shí)例。接下來我們來看下Stream幾個(gè)常用方法的源碼。



從上面源碼中可以看出來,我們調(diào)用stream()方法時(shí)最終會(huì)創(chuàng)建一個(gè)Head實(shí)例來表示流操作的頭,當(dāng)調(diào)用map()方法時(shí)則會(huì)創(chuàng)建無狀態(tài)的中間操作實(shí)例StatelessOp,同樣調(diào)用其他操作對應(yīng)的方法也會(huì)生成一個(gè)ReferencePipeline實(shí)例,在這里就不一一列舉。在用戶調(diào)用一系列操作后,最終會(huì)形成一個(gè)雙向鏈表,如下圖所示:

2、操作如何疊加
上面我們說明了Stream是通過stage記錄操作,但stage只保存當(dāng)前操作,它并不知道下個(gè)stage如何操作,需要什么操作。所以要執(zhí)行的話還需要某種協(xié)議將各個(gè)stage關(guān)聯(lián)起來。jdk中就是使用Slink接口來實(shí)現(xiàn)的,Slink接口定義begin()、end()、cancellationRequested()、accept()四個(gè)方法,如下表所示。

往回看code3 ReferencePipeline.map()的方法,我們會(huì)發(fā)現(xiàn)我們在創(chuàng)建一個(gè)ReferencePipeline實(shí)例的時(shí)候,需要重寫opWrapSink方法來生成對應(yīng)Sink實(shí)例。而且通過閱讀源碼會(huì)發(fā)現(xiàn)常用的操作都會(huì)創(chuàng)建一個(gè)ChainedReference實(shí)例。我們可以看下code5 ChainedReference抽象類的源碼實(shí)現(xiàn),因?yàn)镃hainedReference只是個(gè)抽象實(shí)現(xiàn),不攜帶具體操作的特性,所以是更能體現(xiàn)作者的設(shè)計(jì)理念。
通過查看源碼可以發(fā)現(xiàn)ChainedReference會(huì)持有下一個(gè)操作的Slink,并在調(diào)用begin、end、cancellationRequested方法會(huì)調(diào)用下一個(gè)操作的Slink的相應(yīng)方法,以此來達(dá)到疊加的效果。

3、疊加之后的操作如何執(zhí)行
Sink完美封裝了Stream每一步操作,并給出了[處理->轉(zhuǎn)發(fā)]的模式來疊加操作。這一連串的齒輪已經(jīng)咬合,就差最后一步撥動(dòng)齒輪啟動(dòng)執(zhí)行。是什么啟動(dòng)這一連串的操作呢?也許你已經(jīng)想到了啟動(dòng)的原始動(dòng)力就是結(jié)束操作(Terminal Operation),一旦調(diào)用某個(gè)結(jié)束操作,就會(huì)觸發(fā)整個(gè)流水線的執(zhí)行。
結(jié)束操作之后不能再有別的操作,所以結(jié)束操作不會(huì)創(chuàng)建新的流水線階段(Stage),直觀的說就是流水線的鏈表不會(huì)在往后延伸了。結(jié)束操作會(huì)創(chuàng)建一個(gè)包裝了自己操作的Sink,這也是流水線中最后一個(gè)Sink,這個(gè)Sink只需要處理數(shù)據(jù)而不需要將結(jié)果傳遞給下游的Sink(因?yàn)闆]有下游)。對于Sink的[處理->轉(zhuǎn)發(fā)]模型,結(jié)束操作的Sink就是調(diào)用鏈的出口。
我們再來考察一下上游的Sink是如何找到下游Sink的。一種可選的方案是在PipelineHelper中設(shè)置一個(gè)Sink字段,在流水線中找到下游Stage并訪問Sink字段即可。但Stream類庫的設(shè)計(jì)者沒有這么做,而是設(shè)置了一個(gè)Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法來得到Sink,該方法的作用是返回一個(gè)新的包含了當(dāng)前Stage代表的操作以及能夠?qū)⒔Y(jié)果傳遞給downstream的Sink對象。為什么要產(chǎn)生一個(gè)新對象而不是返回一個(gè)Sink字段?這是因?yàn)槭褂胦pWrapSink()可以將當(dāng)前操作與下游Sink(上文中的downstream參數(shù))結(jié)合成新Sink。試想只要從流水線的最后一個(gè)Stage開始,不斷調(diào)用上一個(gè)Stage的opWrapSink()方法直到最開始(不包括stage0,因?yàn)閟tage0代表數(shù)據(jù)源,不包含操作),就可以得到一個(gè)代表了流水線上所有操作的Sink,用代碼表示就是這樣:

現(xiàn)在流水線上從開始到結(jié)束的所有的操作都被包裝到了一個(gè)Sink里,執(zhí)行這個(gè)Sink就相當(dāng)于執(zhí)行整個(gè)流水線,執(zhí)行Sink的代碼如下:

上述代碼首先調(diào)用wrappedSink.begin()方法告訴Sink數(shù)據(jù)即將到來,然后調(diào)用spliterator.forEachRemaining()方法對數(shù)據(jù)進(jìn)行迭代,最后調(diào)用wrappedSink.end()方法通知Sink數(shù)據(jù)處理結(jié)束。邏輯如此清晰。