1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Lambda表達式Stream Pipelines詳解

        共 14204字,需瀏覽 29分鐘

         ·

        2022-08-03 06:04

        你知道的越多,不知道的就越多,業(yè)余的像一棵小草!

        你來,我們一起精進!你不來,我和你的競爭對手一起精進!

        編輯:業(yè)余草

        blog.csdn.net/listeningsea

        推薦:https://www.xttblog.com/?p=5353

        Java 的 Stream API 用起來真的很爽,但簡潔的方法下面似乎隱藏著無盡的秘密,如此強大的API是如何實現(xiàn)的呢?比如Pipeline是怎么執(zhí)行的,每次方法調(diào)用都會導(dǎo)致一次迭代嗎?自動并行又是怎么做到的,線程個數(shù)是多少?本節(jié)我們學(xué)習(xí)Stream流水線的原理,這是Stream實現(xiàn)的關(guān)鍵所在。

        首先回顧一下容器執(zhí)行Lambda表達式的方式,以ArrayList.forEach()方法為例,具體代碼如下:

        // ArrayList.forEach()
        public void forEach(Consumer<? super E> action) {
            ...
            for (int i=0; modCount == expectedModCount && i < size; i++) {
                action.accept(elementData[i]);// 回調(diào)方法
            }
            ...
        }

        我們看到ArrayList.forEach()方法的主要邏輯就是一個for循環(huán),在該for循環(huán)里不斷調(diào)用action.accept()回調(diào)方法完成對元素的遍歷。這完全沒有什么新奇之處,回調(diào)方法在Java GUI的監(jiān)聽器中廣泛使用。Lambda表達式的作用就是相當(dāng)于一個回調(diào)方法,這很好理解。

        Stream API中大量使用Lambda表達式作為回調(diào)方法,但這并不是關(guān)鍵。理解Stream我們更關(guān)心的是另外兩個問題:流水線和自動并行。使用Stream或許很容易寫入如下形式的代碼

        int longestStringLengthStartingWithA
                = strings.stream()
                      .filter(s -> s.startsWith("A"))
                      .mapToInt(String::length)
                      .max();

        上述代碼求出以字母A開頭的字符串的最大長度,一種直白的方式是為每一次函數(shù)調(diào)用都執(zhí)一次迭代,這樣做能夠?qū)崿F(xiàn)功能,但效率上肯定是無法接受的。類庫的實現(xiàn)著使用流水線(Pipeline)的方式巧妙的避免了多次迭代,其基本思想是在一次迭代中盡可能多的執(zhí)行用戶指定的操作。為講解方便我們匯總了Stream的所有操作。

        Stream操作分類
        中間操作(Intermediate operations)無狀態(tài)(Stateless)unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek()
        有狀態(tài)(Stateful)distinct() sorted() sorted() limit() skip()
        結(jié)束操作(Terminal operations)非短路操作forEach() forEachOrdered() toArray() reduce() collect() max() min() count()
        短路操作(short-circuiting)anyMatch() allMatch() noneMatch() findFirst() findAny()

        Stream上的所有操作分為兩類:中間操作和結(jié)束操作,中間操作只是一種標(biāo)記,只有結(jié)束操作才會觸發(fā)實際計算。中間操作又可以分為無狀態(tài)的(Stateless)和有狀態(tài)的(Stateful),無狀態(tài)中間操作是指元素的處理不受前面元素的影響,而有狀態(tài)的中間操作必須等到所有元素處理之后才知道最終結(jié)果,比如排序是有狀態(tài)操作,在讀取所有元素之前并不能確定排序結(jié)果;結(jié)束操作又可以分為短路操作和非短路操作,短路操作是指不用處理全部元素就可以返回結(jié)果,比如找到第一個滿足條件的元素。之所以要進行如此精細(xì)的劃分,是因為底層對每一種情況的處理方式不同。

        一種直白的實現(xiàn)方式

        仍然考慮上述求最長字符串的程序,一種直白的流水線實現(xiàn)方式是為每一次函數(shù)調(diào)用都執(zhí)一次迭代,并將處理中間結(jié)果放到某種數(shù)據(jù)結(jié)構(gòu)中(比如數(shù)組,容器等)。具體說來,就是調(diào)用filter()方法后立即執(zhí)行,選出所有以A開頭的字符串并放到一個列表list1中,之后讓list1傳遞給mapToInt()方法并立即執(zhí)行,生成的結(jié)果放到list2中,最后遍歷list2找出最大的數(shù)字作為最終結(jié)果。程序的執(zhí)行流程如如所示:

        這樣做實現(xiàn)起來非常簡單直觀,但有兩個明顯的弊端:

        1. 迭代次數(shù)多。迭代次數(shù)跟函數(shù)調(diào)用的次數(shù)相等。

        2. 頻繁產(chǎn)生中間結(jié)果。每次函數(shù)調(diào)用都產(chǎn)生一次中間結(jié)果,存儲開銷無法接受。

        這些弊端使得效率底下,根本無法接受。如果不使用Stream API我們都知道上述代碼該如何在一次迭代中完成,大致是如下形式:

        int longest = 0;
        for(String str : strings){
            if(str.startsWith("A")){// 1. filter(), 保留以A開頭的字符串
                int len = str.length();// 2. mapToInt(), 轉(zhuǎn)換成長度
                longest = Math.max(len, longest);// 3. max(), 保留最長的長度
            }
        }

        采用這種方式我們不但減少了迭代次數(shù),也避免了存儲中間結(jié)果,顯然這就是流水線,因為我們把三個操作放在了一次迭代當(dāng)中。只要我們事先知道用戶意圖,總是能夠采用上述方式實現(xiàn)跟Stream API等價的功能,但問題是Stream類庫的設(shè)計者并不知道用戶的意圖是什么。如何在無法假設(shè)用戶行為的前提下實現(xiàn)流水線,是類庫的設(shè)計者要考慮的問題。

        Stream流水線解決方案

        我們大致能夠想到,應(yīng)該采用某種方式記錄用戶每一步的操作,當(dāng)用戶調(diào)用結(jié)束操作時將之前記錄的操作疊加到一起在一次迭代中全部執(zhí)行掉。沿著這個思路,有幾個問題需要解決:

        1. 用戶的操作如何記錄?

        2. 操作如何疊加?

        3. 疊加之后的操作如何執(zhí)行?

        4. 執(zhí)行后的結(jié)果(如果有)在哪里?

        操作如何記錄?

        注意這里使用的是“操作(operation)”一詞,指的是“Stream中間操作”的操作,很多Stream操作會需要一個回調(diào)函數(shù)(Lambda表達式),因此一個完整的操作是<數(shù)據(jù)來源,操作,回調(diào)函數(shù)>構(gòu)成的三元組。Stream中使用Stage的概念來描述一個完整的操作,并用某種實例化后的PipelineHelper來代表Stage,將具有先后順序的各個Stage連到一起,就構(gòu)成了整個流水線。跟Stream相關(guān)類和接口的繼承關(guān)系圖示。

        還有IntPipeline, LongPipeline, DoublePipeline沒在圖中畫出,這三個類專門為三種基本類型(不是包裝類型)而定制的,跟ReferencePipeline是并列關(guān)系。圖中Head用于表示第一個Stage,即調(diào)用調(diào)用諸如Collection.stream()方法產(chǎn)生的Stage,很顯然這個Stage里不包含任何操作;StatelessOpStatefulOp分別表示無狀態(tài)和有狀態(tài)的Stage,對應(yīng)于無狀態(tài)和有狀態(tài)的中間操作。

        Stream流水線組織結(jié)構(gòu)示意圖如下:

        圖中通過Collection.stream()方法得到Head也就是stage0,緊接著調(diào)用一系列的中間操作,不斷產(chǎn)生新的Stream。「這些Stream對象以雙向鏈表的形式組織在一起,構(gòu)成整個流水線,由于每個Stage都記錄了前一個Stage和本次的操作以及回調(diào)函數(shù),依靠這種結(jié)構(gòu)就能建立起對數(shù)據(jù)源的所有操作」。這就是Stream記錄操作的方式。

        操作如何疊加?

        以上只是解決了操作記錄的問題,要想讓流水線起到應(yīng)有的作用我們需要一種將所有操作疊加到一起的方案。你可能會覺得這很簡單,只需要從流水線的head開始依次執(zhí)行每一步的操作(包括回調(diào)函數(shù))就行了。這聽起來似乎是可行的,但是你忽略了前面的Stage并不知道后面Stage到底執(zhí)行了哪種操作,以及回調(diào)函數(shù)是哪種形式。換句話說,只有當(dāng)前Stage本身才知道該如何執(zhí)行自己包含的動作。這就需要有某種協(xié)議來協(xié)調(diào)相鄰Stage之間的調(diào)用關(guān)系。

        這種協(xié)議由Sink接口完成,Sink接口包含的方法如下表所示:

        方法名作用
        void begin(long size)開始遍歷元素之前調(diào)用該方法,通知Sink做好準(zhǔn)備。
        void end()所有元素遍歷完成之后調(diào)用,通知Sink沒有更多的元素了。
        boolean cancellationRequested()是否可以結(jié)束操作,可以讓短路操作盡早結(jié)束。
        void accept(T t)遍歷元素時調(diào)用,接受一個待處理元素,并對元素進行處理。Stage把自己包含的操作和回調(diào)方法封裝到該方法里,前一個Stage只需要調(diào)用當(dāng)前Stage.accept(T t)方法就行了。

        有了上面的協(xié)議,相鄰Stage之間調(diào)用就很方便了,每個Stage都會將自己的操作封裝到一個Sink里,前一個Stage只需調(diào)用后一個Stage的accept()方法即可,并不需要知道其內(nèi)部是如何處理的。當(dāng)然對于有狀態(tài)的操作,Sink的begin()end()方法也是必須實現(xiàn)的。比如Stream.sorted()是一個有狀態(tài)的中間操作,其對應(yīng)的Sink.begin()方法可能創(chuàng)建一個乘放結(jié)果的容器,而accept()方法負(fù)責(zé)將元素添加到該容器,最后end()負(fù)責(zé)對容器進行排序。對于短路操作,Sink.cancellationRequested()也是必須實現(xiàn)的,比如Stream.findFirst()是短路操作,只要找到一個元素,cancellationRequested()就應(yīng)該返回true,以便調(diào)用者盡快結(jié)束查找。Sink的四個接口方法常常相互協(xié)作,共同完成計算任務(wù)。「實際上Stream API內(nèi)部實現(xiàn)的的本質(zhì),就是如何重載Sink的這四個接口方法」。

        有了Sink對操作的包裝,Stage之間的調(diào)用問題就解決了,執(zhí)行時只需要從流水線的head開始對數(shù)據(jù)源依次調(diào)用每個Stage對應(yīng)的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。一種可能的Sink.accept()方法流程是這樣的:

        void accept(U u){
            // 1. 使用當(dāng)前Sink包裝的回調(diào)函數(shù)處理u
            // 2. 將處理結(jié)果傳遞給流水線下游的Sink
        }

        Sink接口的其他幾個方法也是按照這種[處理->轉(zhuǎn)發(fā)]的模型實現(xiàn)。下面我們結(jié)合具體例子看看Stream的中間操作是如何將自身的操作包裝成Sink以及Sink是如何將處理結(jié)果轉(zhuǎn)發(fā)給下一個Sink的。先看Stream.map()方法:

        // Stream.map(),調(diào)用該方法將產(chǎn)生一個新的Stream
        public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
            ...
            return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
                @Override /*opWripSink()方法返回由回調(diào)函數(shù)包裝而成Sink*/
                Sink<P_OUT> opWrapSink(int flags, Sink<R> downstream) {
                    return new Sink.ChainedReference<P_OUT, R>(downstream) {
                        @Override
                        public void accept(P_OUT u) {
                            R r = mapper.apply(u);// 1. 使用當(dāng)前Sink包裝的回調(diào)函數(shù)mapper處理u
                            downstream.accept(r);// 2. 將處理結(jié)果傳遞給流水線下游的Sink
                        }
                    };
                }
            };
        }

        上述代碼看似復(fù)雜,其實邏輯很簡單,就是將回調(diào)函數(shù)mapper包裝到一個Sink當(dāng)中。由于Stream.map()是一個無狀態(tài)的中間操作,所以map()方法返回了一個StatelessOp內(nèi)部類對象(一個新的Stream),調(diào)用這個新Stream的opWripSink()方法將得到一個包裝了當(dāng)前回調(diào)函數(shù)的Sink。

        再來看一個復(fù)雜一點的例子。Stream.sorted()方法將對Stream中的元素進行排序,顯然這是一個有狀態(tài)的中間操作,因為讀取所有元素之前是沒法得到最終順序的。拋開模板代碼直接進入問題本質(zhì),sorted()方法是如何將操作封裝成Sink的呢?sorted()一種可能封裝的Sink代碼如下:

        // Stream.sort()方法用到的Sink實現(xiàn)
        class RefSortingSink<Textends AbstractRefSortingSink<T{
            private ArrayList<T> list;// 存放用于排序的元素
            RefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
                super(downstream, comparator);
            }
            @Override
            public void begin(long size) {
                ...
                // 創(chuàng)建一個存放排序元素的列表
                list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
            }
            @Override
            public void end() {
                list.sort(comparator);// 只有元素全部接收之后才能開始排序
                downstream.begin(list.size());
                if (!cancellationWasRequested) {// 下游Sink不包含短路操作
                    list.forEach(downstream::accept);// 2. 將處理結(jié)果傳遞給流水線下游的Sink
                }
                else {// 下游Sink包含短路操作
                    for (T t : list) {// 每次都調(diào)用cancellationRequested()詢問是否可以結(jié)束處理。
                        if (downstream.cancellationRequested()) break;
                        downstream.accept(t);// 2. 將處理結(jié)果傳遞給流水線下游的Sink
                    }
                }
                downstream.end();
                list = null;
            }
            @Override
            public void accept(T t) {
                list.add(t);// 1. 使用當(dāng)前Sink包裝動作處理t,只是簡單的將元素添加到中間列表當(dāng)中
            }
        }

        上述代碼完美的展現(xiàn)了Sink的四個接口方法是如何協(xié)同工作的:

        1. 首先beging()方法告訴Sink參與排序的元素個數(shù),方便確定中間結(jié)果容器的的大小;

        2. 之后通過accept()方法將元素添加到中間結(jié)果當(dāng)中,最終執(zhí)行時調(diào)用者會不斷調(diào)用該方法,直到遍歷所有元素;

        3. 最后end()方法告訴Sink所有元素遍歷完畢,啟動排序步驟,排序完成后將結(jié)果傳遞給下游的Sink;

        4. 如果下游的Sink是短路操作,將結(jié)果傳遞給下游時不斷詢問下游cancellationRequested()是否可以結(jié)束處理。

        疊加之后的操作如何執(zhí)行?

        Sink完美封裝了Stream每一步操作,并給出了[處理->轉(zhuǎn)發(fā)]的模式來疊加操作。這一連串的齒輪已經(jīng)咬合,就差最后一步撥動齒輪啟動執(zhí)行。是什么啟動這一連串的操作呢?也許你已經(jīng)想到了啟動的原始動力就是結(jié)束操作(Terminal Operation),一旦調(diào)用某個結(jié)束操作,就會觸發(fā)整個流水線的執(zhí)行。

        結(jié)束操作之后不能再有別的操作,所以結(jié)束操作不會創(chuàng)建新的流水線階段(Stage),直觀的說就是流水線的鏈表不會在往后延伸了。結(jié)束操作會創(chuàng)建一個包裝了自己操作的Sink,這也是流水線中最后一個Sink,這個Sink只需要處理數(shù)據(jù)而不需要將結(jié)果傳遞給下游的Sink(因為沒有下游)。對于Sink的[處理->轉(zhuǎn)發(fā)]模型,結(jié)束操作的Sink就是調(diào)用鏈的出口。

        我們再來考察一下上游的Sink是如何找到下游Sink的。一種可選的方案是在PipelineHelper中設(shè)置一個Sink字段,在流水線中找到下游Stage并訪問Sink字段即可。但Stream類庫的設(shè)計者沒有這么做,而是設(shè)置了一個Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法來得到Sink,該方法的作用是返回一個新的包含了當(dāng)前Stage代表的操作以及能夠?qū)⒔Y(jié)果傳遞給downstream的Sink對象。為什么要產(chǎn)生一個新對象而不是返回一個Sink字段?這是因為使用opWrapSink()可以將當(dāng)前操作與下游Sink(上文中的downstream參數(shù))結(jié)合成新Sink。試想只要從流水線的最后一個Stage開始,不斷調(diào)用上一個Stage的opWrapSink()方法直到最開始(不包括stage0,因為stage0代表數(shù)據(jù)源,不包含操作),就可以得到一個代表了流水線上所有操作的Sink,用代碼表示就是這樣:

        // AbstractPipeline.wrapSink()
        // 從下游向上游不斷包裝Sink。如果最初傳入的sink代表結(jié)束操作,
        // 函數(shù)返回時就可以得到一個代表了流水線上所有操作的Sink。
        final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
            ...
            for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
                sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
            }
            return (Sink<P_IN>) sink;
        }

        現(xiàn)在流水線上從開始到結(jié)束的所有的操作都被包裝到了一個Sink里,執(zhí)行這個Sink就相當(dāng)于執(zhí)行整個流水線,執(zhí)行Sink的代碼如下:

        // AbstractPipeline.copyInto(), 對spliterator代表的數(shù)據(jù)執(zhí)行wrappedSink代表的操作。
        final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
            ...
            if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
                wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知開始遍歷
                spliterator.forEachRemaining(wrappedSink);// 迭代
                wrappedSink.end();// 通知遍歷結(jié)束
            }
            ...
        }

        上述代碼首先調(diào)用wrappedSink.begin()方法告訴Sink數(shù)據(jù)即將到來,然后調(diào)用spliterator.forEachRemaining()方法對數(shù)據(jù)進行迭代(Spliterator是容器的一種迭代器,參閱),最后調(diào)用wrappedSink.end()方法通知Sink數(shù)據(jù)處理結(jié)束。邏輯如此清晰。

        執(zhí)行后的結(jié)果在哪里?

        最后一個問題是流水線上所有操作都執(zhí)行后,用戶所需要的結(jié)果(如果有)在哪里?首先要說明的是不是所有的Stream結(jié)束操作都需要返回結(jié)果,有些操作只是為了使用其副作用(Side-effects),比如使用Stream.forEach()方法將結(jié)果打印出來就是常見的使用副作用的場景(事實上,除了打印之外其他場景都應(yīng)避免使用副作用),對于真正需要返回結(jié)果的結(jié)束操作結(jié)果存在哪里呢?

        ?

        特別說明:副作用不應(yīng)該被濫用,也許你會覺得在Stream.forEach()里進行元素收集是個不錯的選擇,就像下面代碼中那樣,但遺憾的是這樣使用的正確性和效率都無法保證,因為Stream可能會并行執(zhí)行。大多數(shù)使用副作用的地方都可以使用歸約操作更安全和有效的完成。

        ?
        // 錯誤的收集方式
        ArrayList<String> results = new ArrayList<>();
        stream.filter(s -> pattern.matcher(s).matches())
              .forEach(s -> results.add(s));  // Unnecessary use of side-effects!
        // 正確的收集方式
        List<String>results =
             stream.filter(s -> pattern.matcher(s).matches())
                     .collect(Collectors.toList());  // No side-effects!

        回到流水線執(zhí)行結(jié)果的問題上來,需要返回結(jié)果的流水線結(jié)果存在哪里呢?這要分不同的情況討論,下表給出了各種有返回結(jié)果的Stream結(jié)束操作。

        返回類型對應(yīng)的結(jié)束操作
        booleananyMatch() allMatch() noneMatch()
        OptionalfindFirst() findAny()
        歸約結(jié)果reduce() collect()
        數(shù)組toArray()
        1. 對于表中返回boolean或者Optional的操作(Optional是存放 一個 值的容器)的操作,由于值返回一個值,只需要在對應(yīng)的Sink中記錄這個值,等到執(zhí)行結(jié)束時返回就可以了。

        2. 對于歸約操作,最終結(jié)果放在用戶調(diào)用時指定的容器中(容器類型通過收集器指定)。collect(), reduce(), max(), min()都是歸約操作,雖然max()和min()也是返回一個Optional,但事實上底層是通過調(diào)用reduce()方法實現(xiàn)的。

        3. 對于返回是數(shù)組的情況,毫無疑問的結(jié)果會放在數(shù)組當(dāng)中。這么說當(dāng)然是對的,但在最終返回數(shù)組之前,結(jié)果其實是存儲在一種叫做Node的數(shù)據(jù)結(jié)構(gòu)中的。Node是一種多叉樹結(jié)構(gòu),元素存儲在樹的葉子當(dāng)中,并且一個葉子節(jié)點可以存放多個元素。這樣做是為了并行執(zhí)行方便。關(guān)于Node的具體結(jié)構(gòu),我們會在下一節(jié)探究Stream如何并行執(zhí)行時給出詳細(xì)說明。

        本文詳細(xì)介紹了Stream流水線的組織方式和執(zhí)行過程,學(xué)習(xí)本文將有助于理解原理并寫出正確的Stream代碼,同時打消你對Stream API效率方面的顧慮。如你所見,Stream API實現(xiàn)如此巧妙,即使我們使用外部迭代手動編寫等價代碼,也未必更加高效。

        瀏覽 70
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            五月婷婷啪 | 美女被艹的网站 | 天天操天天干天天爱 | 日韩A级毛片免费视频 | 淫荡留学生激情 | 好逼天天操 | 午夜伊人| 国产高潮高清生活片 | 国产不卡一二三区 | 亚洲日逼|