由淺入深體驗 Stream 流
由淺入深體驗 Stream 流
Stream 流的分類、接口、相關(guān) API 操作以及并行流的使用
????Stream 流是 Java 8 新提供給開發(fā)者的一組操作集合的 API,將要處理的元素集合看作一種流,流在管道中傳輸,并且可以在管道的節(jié)點上進行處理,比如篩選、排序、聚合等。元素流在管道中經(jīng)過中間操作(intermediate operation)的處理,最后由終端操作 (terminal operation) 得到前面處理的結(jié)果。Stream 流可以極大的提高開發(fā)效率,也可以使用它寫出更加簡潔明了的代碼。我自從接觸過 Stream 流之后,可以說對它愛不釋手。本文將由淺及深帶您體驗 Stream 流的使用。那么就讓我們從流的簡單使用來開啟體驗之旅。
流的簡單使用
????本節(jié)將通過實際的例子帶您一起了解 Stream 流:創(chuàng)建流以及簡單的使用,并且將其與 Java 8 之前的實現(xiàn)方式做一下對比。
????我們將創(chuàng)建一個學(xué)生類?student,它包含姓名?name?和分?jǐn)?shù)?score?兩個屬性。并且初始化一個學(xué)生的集合,然后分別通過 Stream 流和 Java 7 的集合操作實現(xiàn)篩選未及格(分?jǐn)?shù)<60 分)的學(xué)生名單。
創(chuàng)建流
????有以下兩種創(chuàng)建流的方式,第一種方式我們使用的會相對較多。
調(diào)用集合的? stream()?方法或者?parallelStream()?方法創(chuàng)建流。Stream 類的靜態(tài)? of()?方法創(chuàng)建流。
清單 1. 創(chuàng)建 Stream 流
List?createStream?=?new?ArrayList();
//?順序流
Stream?stream?=?createStream.stream();
//?并行流
Stream?parallelStream?=?createStream.parallelStream();
//?of()方法創(chuàng)建
Stream?stringStream?=?Stream.of(
???????createStream.toArray(new?String[createStream.size()]));
使用流
????清單2展示了如何使用 Stream 流篩選未及格學(xué)生名單:
清單 2. 使用 Stream 流篩選未及格學(xué)生名單
public?static?void?streamImpl(List?students)?{
??List?filterStudent?=?students.stream()
???????.filter(one?->?one.getScore()??60).collect(Collectors.toList());
??System.out.println(filterStudent);
}
????而使用 Java 7 實現(xiàn)篩選未及格學(xué)生名單所需代碼相對冗長,如清單3所示:
清單 3. Java 7 實現(xiàn)篩選未及格學(xué)生名單
public?static?void?java7Impl(List?students)?{
??List?filterStudent?=?new?ArrayList<>();
??for?(Student?student?:?students)?{
?????if?(student.getScore()?60)?{
????????filterStudent.add(student);
?????}
??}
??System.out.println(filterStudent);
}
????對比兩段代碼,我們很容易看出來 Stream 流可以讓我操作集合的代碼更加簡潔,而且可以很清晰地體現(xiàn)出來我們是在做一個篩選的動作,在某些情況下可以讓我們的代碼更加易讀。
流的基礎(chǔ)知識
????接下來您將了解 Stream 流的基礎(chǔ)知識,這部分的內(nèi)容將有助于您理解流的相關(guān)操作。
流的分類
????Stream?流分為順序流和并行流,所謂順序流就是按照順序?qū)现械脑剡M行處理,而并行流則是使用多線程同時對集合中多個元素進行處理,所以在使用并行流的時候就要注意線程安全的問題了。
終端操作和中間操作
????終端操作會消費 Stream 流,并且會產(chǎn)生一個結(jié)果,比如?iterator()?和?spliterator()。如果一個 Stream 流被消費過了,那它就不能被重用的。
????中間操作會產(chǎn)生另一個流。需要注意的是中間操作不是立即發(fā)生的。而是當(dāng)在中間操作創(chuàng)建的新流上執(zhí)行完終端操作后,中間操作指定的操作才會發(fā)生。流的中間操作還分無狀態(tài)操作和有狀態(tài)操作兩種。
在無狀態(tài)操作中,在處理流中的元素時,會對當(dāng)前的元素進行單獨處理。比如,過濾操作,因為每個元素都是被單獨進行處理的,所有它和流中的其它元素?zé)o關(guān)。 在有狀態(tài)操作中,某個元素的處理可能依賴于其他元素。比如查找最小值,最大值,和排序,因為他們都依賴于其他的元素。
流接口
????下面是一張 Stream 的 UML (統(tǒng)一建模語言) 類圖,后文會講解其中的一些關(guān)鍵方法。
圖 1. Stream UML 類圖

BaseStream 接口
????從上面的 UML 圖可以看出來?BaseStream?接口是 Stream 流最基礎(chǔ)的接口,它提供了所有流都可以使用的基本功能。BaseStream?是一個泛型接口,它有兩個類型參數(shù)?T?和?S?, 其中?T?指定了流中的元素的類型,?S?指定了具體流的類型,由??可以知道?>S?必須為?BaseStream?或?BaseStream?子類,換句話說,就是?S?必須是擴展自?BaseStream?的。BaseStream?繼承了?AutoCloseable?接口,簡化了關(guān)閉資源的操作,但是像平時我們操作的集合或數(shù)組,基本上都不會出現(xiàn)關(guān)閉流的情況。下面是?BaseStream?接口下定義的方法的相關(guān)解釋:
Iterator?:獲取流的迭代器。iterator() Spliterator spliterator()?:獲取流的?spliterator?。boolean isParallel()?:判斷一個流是否是并行流,如果是則返回?true?,否則返回?false?。S sequential()?:基于調(diào)用流返回一個順序流,如果調(diào)用流本身就是順序流,則返回其本身。S parallel()?:基于調(diào)用流,返回一個并行流,如果調(diào)用流本身就是并行流,則返回其本身。S unordered()?:基于調(diào)用流,返回一個無序流。S onClose(Runnable closeHandler)?:返回一個新流,?closeHandler?指定了該流的關(guān)閉處理程序,當(dāng)關(guān)閉該流時,將調(diào)用這個處理程序。void close()?:從?AutoCloseable?繼承來的,調(diào)用注冊關(guān)閉處理程序,關(guān)閉調(diào)用流(很少會被使用到)。
????清單 4 列舉了由 BaseStream 接口派生出來的流接口,包括了?IntStream,LongStream,?Stream?以及?DoubleStream。其中 Stream 接口最為通用,本文的主要講解對象也是它。
清單 4. 由?BaseStream?接口派生出的流接口
public?interface?IntStream?extends?BaseStream<、Intege、r,?IntStream>?{}
public?interface?LongStream?extends?BaseStream?{}
public?interface?DoubleStream?extends?BaseStream?{}
public?interface?Stream?extends?BaseStream>?{}
Stream 接口
Stream filter(Predicate predicate)?:產(chǎn)生一個新流,其中包含調(diào)用流中滿足?predicate?指定的謂詞元素,即篩選符合條件的元素后重新生成一個新的流。(中間操作)Stream map(Function mapper)?,產(chǎn)生一個新流,對調(diào)用流中的元素應(yīng)用?mapper?,新?Stream?流中包含這些元素。(中間操作)IntStream mapToInt(ToIntFunction mapper)?:對調(diào)用流中元素應(yīng)用?mapper?,產(chǎn)生包含這些元素的一個新?IntStream?流。(中間操作)Stream sorted(Comparator comparator)?:產(chǎn)生一個自然順序排序或者指定排序條件的新流。(中間操作)void forEach(Consumer action)?:遍歷了流中的元素。(終端操作)Optional min(Comparator comparator)?和?Optional max(Comparator comparator)?:獲得流中最大最小值,比較器可以由自己定義。(終端操作)boolean anyMatch(Predicate super T> predicate)?:判斷?Stream?流中是否有任何符合要求的元素,如果有則返回?ture,沒有返回?false?。(終端操作)Stream?,去重操作,將?distinct() Stream?流中的元素去重后,返回一個新的流。(中間操作)
流的 API 操作
縮減操作
????什么是縮減操作呢?最終將流縮減為一個值的終端操作,我們稱之為縮減操作。在上一節(jié)中提到的?min(),max()?方法返回的是流中的最小或者最大值,這兩個方法屬于特例縮減操作。而通用的縮減操作就是指的我們的?reduce()?方法了,在 Stream 類中?reduce?方法有三種簽名方法,如下所示:
清單 5.?reduce()?方法的三種實現(xiàn)
public?interface?Stream?extends?BaseStream>?{
?...
Optional?reduce(BinaryOperator?accumulator);
T?reduce(T?identity,?BinaryOperator?accumulator);
?U?reduce(U?identity,
?????????????????BiFunction?accumulator,
?????????????????BinaryOperator?combiner);
?...
}
????由上面的代碼可以看出,在 Stream API 中?reduce()?方法一共存在著三種簽名,而這三種簽名則分別會適用在不同的場景,我們下面就一起來看一下如何使用。
第一種簽名
在下面的代碼中我們將對一個 Integer 類型的集合做求和操作。
清單 6. 第一種簽名的?reduce()?的使用
public?static?void?reduceFirstSign()?{
??List?list?=?Arrays.asList(1,2,3,4,5,6);
??ptional?count?=?list.stream().reduce((a,?b)?->?(a?+?b));
??System.out.println(count.get());?//?21
}
第二種簽名
????與第一種簽名不同的是多接收了一個參數(shù)?identity?,在首次執(zhí)行?accumulator?表達(dá)式的時候它的第一個參數(shù)并不是 Stream 流的第一個元素,而是?identity?。比如下面的例子最終輸出的結(jié)果是 Stream 流中所有元素乘積的 2 倍。
清單 7. 第二種簽名的?reduce()?的使用
public?static?void?reduceSecondSign()?{
??List?list?=?Arrays.asList(1,2,3,4,5,6);
??Integer?count?=?list.stream().reduce(2,?(a,?b)?->?(a?*?b));
??System.out.println(count);??//?1440
}
第三種簽名
????前面兩種前面的一個缺點在于返回的數(shù)據(jù)都只能和 Stream 流中元素類型一致,但這在某些情況下是無法滿足我們的需求的,比如 Stream 流中元素都是?Integer?類型,但是求和之后數(shù)值超過了?Integer?能夠表示的范圍,需要使用?Long?類型接受,這就用到了我們第三種簽名的?reduce()?方法。
清單 8. 第三種簽名的?reduce()?的使用
public?static?void?reduceThirdSign()?{
??List?list?=?Arrays.asList(Integer.MAX_VALUE,?Integer.MAX_VALUE);
??long?count?=?list.stream().reduce(0L,?(a,?b)?->?(a?+?b),?(a,b)?->?0L);
??System.out.println(count);
}
????總的來說縮減操作有兩個特點,一是他只返回一個值,二是它是一個終端操作。在這里順便給大家留一個縮減操作的題目,統(tǒng)計一個班上所有及格同學(xué)的分?jǐn)?shù)總。
映射
????可能在我們的日常開發(fā)過程中經(jīng)常會遇到將一個集合轉(zhuǎn)換成另外一個對象的集合,那么這種操作放到 Stream 流中就是映射操作。映射操作主要就是將一個 Stream 流轉(zhuǎn)換成另外一個對象的 Stream 流或者將一個 Stream 流中符合條件的元素放到一個新的 Stream 流里面。
????在 Stream API 庫中也提供了豐富的 API 來支持我們的映射操作,清單 9 中的方法都是我們所講的映射操作。
清單 9. 映射操作相關(guān)方法定義
public?interface?Stream?extends?BaseStream>?{
???...
???Stream?map(Function?super?T,???extends?R>?mapper);
??IntStream?mapToInt(ToIntFunction?super?T>?mapper);
??LongStream?mapToLong(ToLongFunction?super?T>?mapper);
??DoubleStream?mapToDouble(ToDoubleFunction?super?T>?mapper);
???Stream?flatMap(Function?super?T,???extends?Stream?extends?R>>>?mapper);
??IntStream?flatMapToInt(Function?super?T,???extends?IntStream>?mapper);
??LongStream?flatMapToLong(Function?super?T,???extends?LongStream>?mapper);
??DoubleStream?flatMapToDouble(Function?super?T,???extends?DoubleStream>?mapper);
???...
}
????其中最通用的應(yīng)該就屬?mapv?和?flatMap?兩個方法了,下面將以不同的例子分別來講解著兩個方法。
map()
????map()?方法可以將一個流轉(zhuǎn)換成另外一種對象的流,其中的?T?是原始流中元素的類型,而?R?則是轉(zhuǎn)換之后的流中元素的類型。通過下面的代碼我們將一個學(xué)生對象的 Stream 流轉(zhuǎn)換成一個?Double?類型(學(xué)生的分?jǐn)?shù))的 Stream 流并求和后輸出。
清單 10.?map()?方法的使用示例
public?static?void?useMap()?{
??List?students?=?initData();
??double?scoreCount?=?students.stream()
????????????.map(Student::getScore)
????????????.reduce(0.0,?(a,b)?->?(a?+?b));
??System.out.println(scoreCount);
}
????當(dāng)然上面這種情況用?mapToDouble()?會更加方便,使用?map()?是為了展示一下?map?的使用方式,那么使用?mapToDouble()?方法的代碼如下:
清單 11.?mapToDouble()?方法的使用示例
double?scoreCount?=?students.stream()
????????????????.mapToDouble(Student::getScore)
????????????????.sum();
flatMap()
????flatMap()?操作能把原始流中的元素進行一對多的轉(zhuǎn)換,并且將新生成的元素全都合并到它返回的流里面。假如現(xiàn)每個班的學(xué)生都學(xué)了不同的課程,現(xiàn)在需要統(tǒng)計班里所有學(xué)生所學(xué)的課程列表,該如何實現(xiàn)呢?
清單 12.?flatMap ()方法的使用示例
public?static?void?useFlatMap()?{
??List?students?=?initData();
??List?course?=?students.stream().flatMap(one?->?one.getCourse().stream()).distinct()
????????????????.collect(Collectors.toList());
??System.out.println(course);
}
????如上代碼中?flatMap()?中返回的是一個一個的?String?類型的 Stream 流,它們會被合并到最終返回的 Stream 流(String 類型)中。而后面的?distinct()?則是一個去重的操作,?collect()?是收集操作。
收集操作
????很多時候我們需要從流中收集起一些元素,并以集合的方式返回,我們把這種反向操作稱為收集操作。對于收集操作,Stream API 也提供了相應(yīng)的方法。
清單 13. 收集操作相關(guān) API
public?interface?Stream?extends?BaseStream>?{
?...
?R?collect(Collector?super?T,?A,?R>?collector);
?...
}
????其中?R?指定結(jié)果的類型,?T?指定了調(diào)用流的元素類型。內(nèi)部積累的類型由?A?指定。collector?是一個收集器,指定收集過程如何執(zhí)行,?collect()?方法是一個終端方法。一般情況我們只需要借助?Collectors?中的方法就可以完成收集操作。
????Collectors?類是一個最終類,里面提供了大量的靜態(tài)的收集器方法,借助他,我們基本可以實現(xiàn)各種復(fù)雜的功能了。
清單 14. Collectors
public?final?class?Collectors?{
...
public?static??Collector>?toList()?{
...
}
public?static??Collector>?toMap(
Function?super?T,???extends?K>?keyMapper,
Function?super?T,???extends?U>?valueMapper)?{
??...
}
...
}
????Collectors?給我們提供了非常豐富的收集器,這里只列出來了?toList?和?toMap?兩種,其他的可以參考?Collectors?類的源碼。toList()?相信您在清單 14 中已經(jīng)見到了,那么下面將展示如何將一個使用收集操作將一個?List?集合轉(zhuǎn)為?Map?。
清單 15. 使收集操作將 List 轉(zhuǎn) Map
public?static?void?list2Map()?{
??List?students?=?initData();
??Map?collect?=?students.stream()
?????????.collect(Collectors.toMap(one?->?one.getName(),
one?->?one.getScore()));
??System.out.println(collect);
}
????可以看到通過 Stream API 可以很方便地將一個?List?轉(zhuǎn)成了?Map?,但是這里有一個地方需要注意。那就是在通過 Stream API 將?List?轉(zhuǎn)成?Map?的時候我們需要確保?key?不會重復(fù),否則轉(zhuǎn)換的過程將會直接拋出異常。
并行流的使用
????我們處于一個多核處理器的時代,在日常的開發(fā)過程中也經(jīng)常會接觸到多線程。Stream API 也提供了相應(yīng)的并行流來支持我們并行地操作數(shù)組和集合框架,從而高速地執(zhí)行我們對數(shù)組或者集合的一些操作。
????其實創(chuàng)建一個并行流非常簡單,在創(chuàng)建流部分已經(jīng)提到過如何創(chuàng)建一個并行流,我們只需要調(diào)用集合的?parallelStream()?方法就可以輕松的得到一個并行流。相信大家也知道多線程編程非常容易出錯,所以使用并行流也有一些限制,一般來說,應(yīng)用到并行流的任何操作都必須符合三個約束條件:無狀態(tài)、不干預(yù)、關(guān)聯(lián)性。因為這三大約束確保在并行流上執(zhí)行操作的結(jié)果和在順序流上執(zhí)行的結(jié)果是相同的。
????在縮減操作部分我們一共提到了三種簽名的?reduce()?方法,其中第三種簽名的?reduce()?方法最適合與并行流結(jié)合使用。
清單 16. 第三種簽名方式的?reduce()?方法與并行流結(jié)合使用
public?interface?Stream?extends?BaseStream>?{
?...
?U?reduce(U?identity,
?????????????????BiFunction?accumulator,
?????????????????BinaryOperator?combiner);
?...
}
????其中?accumulator?被為累加器,?combiner?為合成器。combiner?定義的函數(shù)將?accumulator?提到的兩個值合并起來,在之前的例子中我們沒有為合并器設(shè)置具體的表達(dá)式,因為在那個場景下我們不會使用到合并器。下面我們來看一個例子,并且分析其執(zhí)行的步驟:
清單 17. 并行流使用場景
public?static?void?main(String[]?args)?{
??List?list?=?Arrays.asList(2,2);
??Integer?result?=?list.stream().parallel().reduce(2,?(a,?b)?->?(a?+?b),?(a,?b)?->?(a?+?b));
??System.out.println(result);
}
????上面的代碼實際上是先使用累加器把 Stream 流中的兩個元素都加?2?后,然后再使用合并器將兩部分的結(jié)果相加。最終得到的結(jié)果也就是?8?。并行流的使用場景也不光是在這中縮減操作上,比如我會經(jīng)常使用并行流處理一些復(fù)雜的對象集合轉(zhuǎn)換,或者是一些必須循環(huán)調(diào)用的網(wǎng)絡(luò)請求等等,當(dāng)然在使用的過程中最需要注意的還是線程安全問題。
參考答案
????在流的 API 操作 章節(jié)給大家留了一個統(tǒng)計一個班上所有及格同學(xué)的分?jǐn)?shù)總和的題目,此處給出我的實現(xiàn)方式,第一種方式是使用?reduce()?方法實現(xiàn),也就是我們留題目的地方所講解的 API 方法:
清單 18. 第一種實現(xiàn)方式
public?static?void?answer()?{
????List?students?=?initData();
????Double?result?=?students.stream()
????????????.filter(one?->?one.getScore()?>=?60).map(o?->?o.getScore())
????????????.reduce(0d,?(a,b)?->?(a?+?b));
????System.out.println(result);
}
????第二種實現(xiàn)方法是通過?sum()?方法實現(xiàn),?sum()?也是一個終端操作,它可以對一個數(shù)字類型的流進行求和操作并返回結(jié)果:
清單 19. 第二種實現(xiàn)方式
public?static?void?answerSecondImpl()?{
????????List?students?=?initData();
????????Double?result?=?students.stream()
????????????????.filter(one?->?one.getScore()?>=?60).mapToDouble(o?->?o.getScore()).sum();
????????System.out.println(result);
}
????以上是我提供的兩種解題方式,如果您有更好的解決方法歡迎以評論的方式共享給大家。
結(jié)束語
????在本教程中,我們主要了解了 Java 8 Stream 流的基礎(chǔ)知識及使用,涵蓋 Stream 流的分類、接口、相關(guān) API 操作以及并行流的使用。
