Java8新特性Stream流詳解
一、Stream 流是如何工作的?
流表示包含著一系列元素的集合,我們可以對其做不同類型的操作,用來對這些元素執(zhí)行計算。
List<String> myList =
Arrays.asList("a1", "a2", "b1", "c2", "c1");
myList
.stream() // 創(chuàng)建流
.filter(s -> s.startsWith("c")) // 執(zhí)行過濾,過濾出以 c 為前綴的字符串
.map(String::toUpperCase) // 轉(zhuǎn)換成大寫
.sorted() // 排序
.forEach(System.out::println); // for 循環(huán)打印
12345678910我們可以對流進行中間操作或者終端操作。兄弟們可能會疑問?什么是中間操作?什么又是終端操作?
中間操作:①:中間操作會再次返回一個流,所以,我們可以鏈接多個中間操作,注意這里是不用加分號的。上圖中的filter 過濾,map 對象轉(zhuǎn)換,sorted 排序,就屬于中間操作。
終端操作:②:終端操作是對流操作的一個結(jié)束動作,一般返回 void 或者一個非流的結(jié)果。上圖中的 forEach循環(huán) 就是一個終止操作。
看完上面的操作,感覺是不是很像一個流水線式操作呢。實際上,大部分流操作都支持 lambda 表達式作為參數(shù),正確理解,應該說是接受一個函數(shù)式接口的實現(xiàn)作為參數(shù)。
二、不同類型的 Stream 流
我們可以從各種數(shù)據(jù)源中創(chuàng)建 Stream 流,其中以 Collection 集合最為常見。如 List 和 Set 均支持 stream() 方法來創(chuàng)建順序流或者是并行流。
1.Arrays.asList()
并行流是通過多線程的方式來執(zhí)行的,它能夠充分發(fā)揮多核 CPU 的優(yōu)勢來提升性能。本文在最后再來介紹并行流,我們先討論順序流:
Arrays.asList("a1", "a2", "a3")
.stream() // 創(chuàng)建流
.findFirst() // 找到第一個元素
.ifPresent(System.out::println); // 如果存在,即輸出
12342.Stream.of()
在集合上調(diào)用stream()方法會返回一個普通的 Stream 流。但是, 大可不必刻意地創(chuàng)建一個集合,再通過集合來獲取 Stream 流,還可以通過如下這種方式:
Stream.of("a1", "a2", "a3")
.findFirst()
.ifPresent(System.out::println);
123例如上面這樣,我們可以通過 Stream.of() 從一堆對象中創(chuàng)建 Stream 流。
注: 除了常規(guī)對象流之外,Java 8還附帶了一些特殊類型的流,用于處理原始數(shù)據(jù)類型int,long以及double。說道這里,你可能已經(jīng)猜到了它們就是IntStream,LongStream還有DoubleStream。
3.IntStream.range()
IntStreams.range()方法還可以被用來取代常規(guī)的 for 循環(huán), 如下所示:
IntStream.range(1, 4)
.forEach(System.out::println); // 相當于 for (int i = 1; i < 4; i++) {}
123注: 上面這些原始類型流的工作方式與常規(guī)對象流基本是一樣的,但還是略微存在一些區(qū)別:
原始類型流使用其獨有的函數(shù)式接口,例如IntFunction代替Function,IntPredicate代替Predicate。
4.average()
原始類型流支持額外的終端聚合操作,sum()以及average(),如下所示:
Arrays.stream(new int[] {1, 2, 3})
.map(n -> 2 * n + 1) // 對數(shù)值中的每個對象執(zhí)行 2*n + 1 操作
.average() // 求平均值
.ifPresent(System.out::println); // 如果值不為空,則輸出
12345.mapToInt(),mapToLong() ,mapToDouble()
但是,偶爾我們也有這種需求,需要將常規(guī)對象流轉(zhuǎn)換為原始類型流,這個時候,中間操作 mapToInt(),mapToLong() 以及mapToDouble就派上用場了:
Stream.of("a1", "a2", "a3")
.map(s -> s.substring(1)) // 對每個字符串元素從下標1位置開始截取
.mapToInt(Integer::parseInt) // 轉(zhuǎn)成 int 基礎(chǔ)類型類型流
.max() // 取最大值
.ifPresent(System.out::println); // 不為空則輸出
123456.mapToObj()
如果說,需要將原始類型流裝換成對象流,您可以使用 mapToObj()來達到目的:
IntStream.range(1, 4)
.mapToObj(i -> "a" + i) // for 循環(huán) 1->4, 拼接前綴 a
.forEach(System.out::println); // for 循環(huán)打印
123下面是一個組合示例,我們將雙精度流首先轉(zhuǎn)換成 int 類型流,然后再將其裝換成對象流:
Stream.of(1.0, 2.0, 3.0)
.mapToInt(Double::intValue) // double 類型轉(zhuǎn) int
.mapToObj(i -> "a" + i) // 對值拼接前綴 a
.forEach(System.out::println); // for 循環(huán)打印
1234三、Stream 流的處理順序
上小節(jié)中,我們已經(jīng)學會了如何創(chuàng)建不同類型的 Stream 流,接下來我們再深入了解下數(shù)據(jù)流的執(zhí)行順序。
在討論處理順序之前,您需要明確一點,那就是中間操作的有個重要特性 —— 延遲性。觀察下面這個沒有終端操作的示例代碼:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
});
12345注: 執(zhí)行此代碼段時,您可能會認為,將依次打印 “d2”, “a2”, “b1”, “b3”, “c” 元素。然而當你實際去執(zhí)行的時候,它不會打印任何內(nèi)容。
出現(xiàn)這樣的原因是:當且僅當存在終端操作時,中間操作操作才會被執(zhí)行。
接下來,對上面的代碼添加 forEach終端操作:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
})
.forEach(s -> System.out.println("forEach: " + s));
123456再次執(zhí)行,我們會看到輸出如下:
filter: d2
forEach: d2
filter: a2
forEach: a2
filter: b1
forEach: b1
filter: b3
forEach: b3
filter: c
forEach: c12345678910
思考: 輸出的順序可能會讓你很驚訝!你腦海里肯定會想,應該是先將所有 filter 前綴的字符串打印出來,接著才會打印 forEach 前綴的字符串。
事實上,輸出的結(jié)果卻是隨著鏈條垂直移動的。比如說,當 Stream 開始處理 d2 元素時,它實際上會在執(zhí)行完 filter 操作后,再執(zhí)行 forEach 操作,接著才會處理第二個元素。
是不是很神奇?為什么要設(shè)計成這樣呢?
原因是出于性能的考慮。這樣設(shè)計可以減少對每個元素的實際操作數(shù),看完下面代碼你就明白了:
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase(); // 轉(zhuǎn)大寫
})
.anyMatch(s -> {
System.out.println("anyMatch: " + s);
return s.startsWith("A"); // 過濾出以 A 為前綴的元素
});
// map: d2
// anyMatch: D2
// map: a2
// anyMatch: A2
1234567891011121314說明: 終端操作 anyMatch()表示任何一個元素以 A 為前綴,返回為 true,就停止循環(huán)。所以它會從 d2 開始匹配,接著循環(huán)到 a2 的時候,返回為 true ,于是停止循環(huán)。
由于數(shù)據(jù)流的鏈式調(diào)用是垂直執(zhí)行的,map這里只需要執(zhí)行兩次。相對于水平執(zhí)行來說,map會執(zhí)行盡可能少的次數(shù),而不是把所有元素都 map 轉(zhuǎn)換一遍。
四、中間操作順序這么重要?
1.map和filter垂直執(zhí)行
下面的例子由兩個中間操作map和filter,以及一個終端操作forEach組成。讓我們再來看看這些操作是如何執(zhí)行的:
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase(); // 轉(zhuǎn)大寫
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("A"); // 過濾出以 A 為前綴的元素
})
.forEach(s -> System.out.println("forEach: " + s)); // for 循環(huán)輸出
// map: d2
// filter: D2
// map: a2
// filter: A2
// forEach: A2
// map: b1
// filter: B1
// map: b3
// filter: B3
// map: c
// filter: C
12345678910111213141516171819202122注: 學習了上面一小節(jié),您應該已經(jīng)知道了,map和filter會對集合中的每個字符串調(diào)用五次,而forEach卻只會調(diào)用一次,因為只有 “a2” 滿足過濾條件,滿足條件才會放行
如果我們改變中間操作的順序,將filter移動到鏈頭的最開始,就可以大大減少實際的執(zhí)行次數(shù):
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s)
return s.startsWith("a"); // 過濾出以 a 為前綴的元素
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase(); // 轉(zhuǎn)大寫
})
.forEach(s -> System.out.println("forEach: " + s)); // for 循環(huán)輸出
// filter: d2
// filter: a2
// map: a2
// forEach: A2
// filter: b1
// filter: b3
// filter: c
123456789101112131415161718現(xiàn)在,map僅僅只需調(diào)用一次,性能得到了提升,這種小技巧對于流中存在大量元素來說,是非常很有用的。
2.sorted水平執(zhí)行
接下來,讓我們對上面的代碼再添加一個中間操作sorted:
Stream.of("d2", "a2", "b1", "b3", "c")
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2); // 排序
})
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a"); // 過濾出以 a 為前綴的元素
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase(); // 轉(zhuǎn)大寫
})
.forEach(s -> System.out.println("forEach: " + s)); // for 循環(huán)輸出
1234567891011121314sorted 是一個有狀態(tài)的操作,因為它需要在處理的過程中,保存狀態(tài)以對集合中的元素進行排序。
執(zhí)行上面代碼,輸出如下:
sort: a2; d2
sort: b1; a2
sort: b1; d2
sort: b1; a2
sort: b3; b1
sort: b3; d2
sort: c; b3
sort: c; d2
filter: a2
map: a2
forEach: A2
filter: b1
filter: b3
filter: c
filter: d2123456789101112131415
sorted是水平執(zhí)行的。因此,在這種情況下,sorted會對集合中的元素組合調(diào)用八次。這里,我們也可以利用上面說道的優(yōu)化技巧
將 filter 過濾中間操作移動到開頭部分:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
// filter: d2
// filter: a2
// filter: b1
// filter: b3
// filter: c
// map: a2
// forEach: A2
12345678910111213141516171819202122從上面的輸出中,我們看到了 sorted從未被調(diào)用過,因為經(jīng)過filter過后的元素已經(jīng)減少到只有一個,這種情況下,是不用執(zhí)行排序操作的。因此性能被大大提高了。
五、數(shù)據(jù)流復用問題
Java8 Stream 流是不能被復用的,一旦你調(diào)用任何終端操作,流就會關(guān)閉:
Stream<String> stream =
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true); // ok
stream.noneMatch(s -> true); // exception
123456當我們對 stream 調(diào)用了 anyMatch 終端操作以后,流即關(guān)閉了,再調(diào)用 noneMatch 就會拋出異常:
java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
at com.winterbe.java8.Streams5.test7(Streams5.java:38)
at com.winterbe.java8.Streams5.main(Streams5.java:28)
12345為了克服這個限制,我們必須為我們想要執(zhí)行的每個終端操作創(chuàng)建一個新的流鏈,例如,我們可以通過 Supplier 來包裝一下流,通過 get() 方法來構(gòu)建一個新的 Stream 流,如下所示:
Supplier<Stream<String>> streamSupplier =
() -> Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));
streamSupplier.get().anyMatch(s -> true); // ok
streamSupplier.get().noneMatch(s -> true); // ok
123456通過構(gòu)造一個新的流,來避開流不能被復用的限制, 這也是取巧的一種方式。
六、高級操作
Streams 支持的操作很豐富,除了上面介紹的這些比較常用的中間操作,如filter或map(參見Stream Javadoc)外。還有一些更復雜的操作,如collect,flatMap以及reduce。接下來,就讓我們學習一下:
本小節(jié)中的大多數(shù)代碼示例均會使用以下 List進行演示:
class Person {
String name;
int age;
Person(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return name;
}
}
1234567891011121314// 構(gòu)建一個 Person 集合
List<Person> persons =
Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));
12345671.Collect
collect 是一個非常有用的終端操作,它可以將流中的元素轉(zhuǎn)變成另外一個不同的對象,例如一個List,Set或Map。collect 接受入?yún)镃ollector(收集器),它由四個不同的操作組成:供應器(supplier)、累加器(accumulator)、組合器(combiner)和終止器(finisher)。
感覺復雜其實很簡單,其實并不需要自己去實現(xiàn)收集器。因為 Java 8通過Collectors類內(nèi)置了各種常用的收集器,你直接拿來用就行了。
2.Collectors.toList()
讓我們先從一個非常常見的用例開始:
List<Person> filtered =
persons
.stream() // 構(gòu)建流
.filter(p -> p.name.startsWith("P")) // 過濾出名字以 P 開頭的
.collect(Collectors.toList()); // 生成一個新的 List
System.out.println(filtered); // [Peter, Pamela]
1234567你也看到了,從流中構(gòu)造一個 List 異常簡單。如果說你需要構(gòu)造一個 Set 集合,只需要使用Collectors.toSet()就可以了。
3.Collectors.groupingBy
接下來這個示例,將會按年齡對所有人進行分組:
Map<Integer, List<Person>> personsByAge = persons
.stream()
.collect(Collectors.groupingBy(p -> p.age)); // 以年齡為 key,進行分組
personsByAge
.forEach((age, p) -> System.out.format("age %s: %s\n", age, p));
// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]
123456789105.Collectors.summarizingInt
如果您還想得到一個更全面的統(tǒng)計信息,摘要收集器可以返回一個特殊的內(nèi)置統(tǒng)計對象。通過它,我們可以簡單地計算出最小年齡、最大年齡、平均年齡、總和以及總數(shù)量。
IntSummaryStatistics ageSummary =
persons
.stream()
.collect(Collectors.summarizingInt(p -> p.age)); // 生成摘要統(tǒng)計
System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}
12345676.Collectors.joining
下一個這個示例,可以將所有人名連接成一個字符串:
String phrase = persons
.stream()
.filter(p -> p.age >= 18) // 過濾出年齡大于等于18的
.map(p -> p.name) // 提取名字
.collect(Collectors.joining(" and ", "In Germany ", " are of legal age.")); // 以 In Germany 開頭,and 連接各元素,再以 are of legal age. 結(jié)束
System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.
12345678連接收集器的入?yún)⒔邮芊指舴?,以及可選的前綴以及后綴。
7.對于如何將流轉(zhuǎn)換為 Map集合
我們必須指定 Map 的鍵和值。這里需要注意,Map 的鍵必須是唯一的,否則會拋出IllegalStateException 異常。
你可以選擇傳遞一個合并函數(shù)作為額外的參數(shù)來避免發(fā)生這個異常:
Map<Integer, String> map = persons
.stream()
.collect(Collectors.toMap(
p -> p.age,
p -> p.name,
(name1, name2) -> name1 + ";" + name2)); // 對于同樣 key 的,將值拼接
System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}
1234567898.構(gòu)建自定義收集器
既然我們已經(jīng)知道了這些強大的內(nèi)置收集器,接下來就讓我們嘗試構(gòu)建自定義收集器吧。
比如說,我們希望將流中的所有人轉(zhuǎn)換成一個字符串,包含所有大寫的名稱,并以|分割。為了達到這種效果,我們需要通過Collector.of()創(chuàng)建一個新的收集器。同時,我們還需要傳入收集器的四個組成部分:供應器、累加器、組合器和終止器。
Collector<Person, StringJoiner, String> personNameCollector =
Collector.of(
() -> new StringJoiner(" | "), // supplier 供應器
(j, p) -> j.add(p.name.toUpperCase()), // accumulator 累加器
(j1, j2) -> j1.merge(j2), // combiner 組合器
StringJoiner::toString); // finisher 終止器
String names = persons
.stream()
.collect(personNameCollector); // 傳入自定義的收集器
System.out.println(names); // MAX | PETER | PAMELA | DAVID
123456789101112由于Java 中的字符串是 final 類型的,我們需要借助輔助類StringJoiner,來幫我們構(gòu)造字符串。
最開始供應器使用分隔符構(gòu)造了一個StringJointer。
累加器用于將每個人的人名轉(zhuǎn)大寫,然后加到StringJointer中。
組合器將兩個StringJointer合并為一個。
最終,終結(jié)器從StringJointer構(gòu)造出預期的字符串。
9.FlatMap
上面我們已經(jīng)學會了如通過map操作, 將流中的對象轉(zhuǎn)換為另一種類型。但是,Map只能將每個對象映射到另一個對象。
如果說,我們想要將一個對象轉(zhuǎn)換為多個其他對象或者根本不做轉(zhuǎn)換操作呢?這個時候,flatMap就派上用場了。
FlatMap 能夠?qū)⒘鞯拿總€元素, 轉(zhuǎn)換為其他對象的流。因此,每個對象可以被轉(zhuǎn)換為零個,一個或多個其他對象,并以流的方式返回。之后,這些流的內(nèi)容會被放入flatMap返回的流中。
在學習如何實際操作flatMap之前,我們先新建兩個類,用來測試:
class Foo {
String name;
List<Bar> bars = new ArrayList<>();
Foo(String name) {
this.name = name;
}
}
class Bar {
String name;
Bar(String name) {
this.name = name;
}
}
12345678910111213141516接下來,通過我們上面學習到的流知識,來實例化一些對象:
List<Foo> foos = new ArrayList<>();
// 創(chuàng)建 foos 集合
IntStream
.range(1, 4)
.forEach(i -> foos.add(new Foo("Foo" + i)));
// 創(chuàng)建 bars 集合
foos.forEach(f ->
IntStream
.range(1, 4)
.forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));
123456789101112我們創(chuàng)建了包含三個foo的集合,每個foo中又包含三個 bar。
flatMap 的入?yún)⒔邮芤粋€返回對象流的函數(shù)。為了處理每個foo中的bar,我們需要傳入相應 stream 流:
foos.stream()
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));
// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3
12345678910111213如上所示,我們已成功將三個 foo對象的流轉(zhuǎn)換為九個bar對象的流。
最后,上面的這段代碼可以簡化為單一的流式操作:
IntStream.range(1, 4)
.mapToObj(i -> new Foo("Foo" + i))
.peek(f -> IntStream.range(1, 4)
.mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
.forEach(f.bars::add))
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));
1234567flatMap也可用于Java8引入的Optional類。Optional的flatMap操作返回一個Optional或其他類型的對象。所以它可以用于避免繁瑣的null檢查。
接下來,讓我們創(chuàng)建層次更深的對象:
class Outer {
Nested nested;
}
class Nested {
Inner inner;
}
class Inner {
String foo;
}
1234567891011我們還可以使用Optional的flatMap操作,來完成上述相同功能的判斷,且更加優(yōu)雅:
Optional.of(new Outer())
.flatMap(o -> Optional.ofNullable(o.nested))
.flatMap(n -> Optional.ofNullable(n.inner))
.flatMap(i -> Optional.ofNullable(i.foo))
.ifPresent(System.out::println);
12345注: 如果不為空的話,每個flatMap的調(diào)用都會返回預期對象的Optional包裝,否則返回為null的Optional包裝類。
10.Reduce
規(guī)約操作可以將流的所有元素組合成一個結(jié)果。Java 8 支持三種不同的reduce方法。第一種將流中的元素規(guī)約成流中的一個元素。
讓我們看看如何使用這種方法,來篩選出年齡最大的那個人:
persons
.stream()
.reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
.ifPresent(System.out::println); // Pamela
1234reduce方法接受BinaryOperator積累函數(shù)。該函數(shù)實際上是兩個操作數(shù)類型相同的BiFunction。BiFunction功能和Function一樣,但是它接受兩個參數(shù)。示例代碼中,我們比較兩個人的年齡,來返回年齡較大的人。
第二種reduce方法接受標識值和BinaryOperator累加器。此方法可用于構(gòu)造一個新的 Person,其中包含來自流中所有其他人的聚合名稱和年齡:
Person result =
persons
.stream()
.reduce(new Person("", 0), (p1, p2) -> {
p1.age += p2.age;
p1.name += p2.name;
return p1;
});
System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76
1234567891011第三種reduce方法接受三個參數(shù):標識值,BiFunction累加器和類型的組合器函數(shù)BinaryOperator。由于初始值的類型不一定為Person,我們可以使用這個歸約函數(shù)來計算所有人的年齡總和:
Integer ageSum = persons
.stream()
.reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);
System.out.println(ageSum); // 76
12345結(jié)果為76,但是內(nèi)部究竟發(fā)生了什么呢?讓我們再打印一些調(diào)試日志:
Integer ageSum = persons
.stream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David
12345678910111213141516你可以看到,累加器函數(shù)完成了所有工作。它首先使用初始值0和第一個人年齡相加。接下來的三步中sum會持續(xù)增加,直到76。
我們以并行流的方式運行上面的代碼,看看日志輸出:
Integer ageSum = persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35
12345678910111213141516171819注: 并行流的執(zhí)行方式完全不同。這里組合器被調(diào)用了。實際上,由于累加器被并行調(diào)用,組合器需要被用于計算部分累加值的總和。
七、并行流
流是可以并行執(zhí)行的,當流中存在大量元素時,可以顯著提升性能。并行流底層使用的ForkJoinPool, 它由ForkJoinPool.commonPool()方法提供。底層線程池的大小最多為五個 - 具體取決于 CPU 可用核心數(shù):
ForkJoinPool commonPool = ForkJoinPool.commonPool(); System.out.println(commonPool.getParallelism()); // 3 12
在我的機器上,公共池初始化默認值為 3。你也可以通過設(shè)置以下JVM參數(shù)可以減小或增加此值:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5 1
集合支持parallelStream()方法來創(chuàng)建元素的并行流。或者你可以在已存在的數(shù)據(jù)流上調(diào)用中間方法parallel(),將串行流轉(zhuǎn)換為并行流,這也是可以的。
為了詳細了解并行流的執(zhí)行行為,我們在下面的示例代碼中,打印當前線程的信息:
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
1234567891011121314通過日志輸出,我們可以對哪個線程被用于執(zhí)行流式操作,有個更深入的理解:
filter: b1 [main]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: c2 [ForkJoinPool.commonPool-worker-3]
map: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map: b1 [main]
forEach: B1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-3]
map: a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]123456789101112131415
注: 所見,并行流使用了所有的ForkJoinPool中的可用線程來執(zhí)行流式操作。在持續(xù)的運行中,輸出結(jié)果可能有所不同,因為所使用的特定線程是非特定的。
讓我們通過添加中間操作sort來擴展上面示例:
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.sorted((s1, s2) -> {
System.out.format("sort: %s <> %s [%s]\n",
s1, s2, Thread.currentThread().getName());
return s1.compareTo(s2);
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
12345678910111213141516171819運行代碼,輸出結(jié)果看上去有些奇怪:
filter: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: b1 [main]
map: b1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-2]
map: a1 [ForkJoinPool.commonPool-worker-2]
map: c2 [ForkJoinPool.commonPool-worker-3]
sort: A2 <> A1 [main]
sort: B1 <> A2 [main]
sort: C2 <> B1 [main]
sort: C1 <> C2 [main]
sort: C1 <> B1 [main]
sort: C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]123456789101112131415161718192021
貌似sort只在主線程上串行執(zhí)行。但是實際上,并行流中的sort在底層使用了Java8中新的方法Arrays.parallelSort()。如 javadoc官方文檔解釋的,這個方法會按照數(shù)據(jù)長度來決定以串行方式,或者以并行的方式來執(zhí)行。
如果指定數(shù)據(jù)的長度小于最小數(shù)值,它則使用相應的Arrays.sort方法來進行排序。
回到上小節(jié) reduce的例子。我們已經(jīng)發(fā)現(xiàn)了組合器函數(shù)只在并行流中調(diào)用,而不不會在串行流中被調(diào)用。
讓我們來實際觀察一下涉及到哪個線程:
List<Person> persons = Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));
persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s [%s]\n",
sum, p, Thread.currentThread().getName());
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
sum1, sum2, Thread.currentThread().getName());
return sum1 + sum2;
});
12345678910111213141516171819通過控制臺日志輸出,累加器和組合器均在所有可用的線程上并行執(zhí)行:
accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2]
combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]1234567
總之,你需要記住的是,并行流對含有大量元素的數(shù)據(jù)流提升性能極大。但是你也需要記住并行流的一些操作,例如reduce和collect操作,需要額外的計算(如組合操作),這在串行執(zhí)行時是并不需要。
此外,我們也了解了,所有并行流操作都共享相同的 JVM 相關(guān)的公共ForkJoinPool。所以你可能需要避免寫出一些又慢又卡的流式操作,這很有可能會拖慢你應用中,嚴重依賴并行流的其它部分代碼的性能。
以上就是Java8新特性Stream流詳解的詳細內(nèi)容,更多關(guān)于Java8 Stream流的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java中SpringBoot的@Transactional原理
這篇文章主要介紹了Java中SpringBoot的@Transactional原理,面向元數(shù)據(jù)遍歷已經(jīng)成為越來越多開發(fā)者的偏好,因此原理從Springboot的EnableTransactionManagement注解說起,需要的朋友可以參考下2023-07-07
SpringBoot如何讀取xml配置bean(@ImportResource)
這篇文章主要介紹了SpringBoot如何讀取xml配置bean(@ImportResource),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01
@CacheEvict 清除多個key的實現(xiàn)方式
這篇文章主要介紹了@CacheEvict 清除多個key的實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02
自定義的Troop<T>泛型類( c++, java和c#)的實現(xiàn)代碼
這篇文章主要介紹了自定義的Troop<T>泛型類( c++, java和c#)的實現(xiàn)代碼的相關(guān)資料,需要的朋友可以參考下2017-05-05
Spring中的AutowireCandidateResolver的具體使用詳解
這篇文章主要介紹了Spring中的AutowireCandidateResolver的具體使用詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-04-04

