什么是Stream
生產線
Stream就像處理生產流水線一樣去工作,傳送帶就是Stream的管道,每個工廠關注直接的生產,將上游產品加工成下游需要的產品。為什么Stream比傳統的處理方式好呢?我們都知道,傳統的處理中,每一步我們都需要通過循環控制,邏輯控制,解包,重新裝箱這些工作。
非生產線示意處理圖
這些步驟讓我們的程序的業務邏輯支離破碎,經常處理數據類的小伙伴尤為痛苦。幸運的是,Java8為我們引入了Stream,使用Stream后我們只關注數據處理邏輯,其他的事情交給流處理對應的方法來完成。
創建數據流
指北君先為大家介紹如何創建Stream,這里有非常多的方式,需要注意一點就是:流一旦創建后,修改創建的源不會影響已經創建的Stream中的數據。
- 空流 為了避免出現空指針異常,系統提供一個靜態方法提供空流。
public void createStream() {
Stream< String > myStream = Stream.empty();
}
- 通過數組對象創建流
public void createStream() {
Integer[] arr = new Integer[]{1,2,3};
Stream< Integer > stream1 = Arrays.stream(arr);
Stream< Integer > stream2 = Arrays.stream(arr, 0, 2);
}
- 通過集合對象創建流
public void createStream() {
Collection< String > collection = Arrays.asList("a", "b", "c");
Stream< String > stream1 = collection.stream();
}
支持多種集合:List,Set,Map等實現了Collection接口的集合對象。
- 通過builder創建
public void createStream() {
Stream< Long > stream1 =
Stream.< Long >builder().add(1L).add(2L).add(3L).build();
}
- 通過generate生成
public void createStream() {
Random r = new Random();
Stream< Long > stream =
Stream.generate(() - > r.nextLong()).limit(10);
}
按照提供給generate的Supplier邏輯生成數據,通過limit限制生成的數據量
- 通過Stream.iterate創建
public void createStream() {
Stream< Integer > stream = Stream.iterate(1, n - > n * n).limit(20);
}
iterate提供兩種方法來滿足我們比較常用的迭代生成邏輯
- iterate(final T seed, final UnaryOperatorf)
- iterate(T seed, Predicate hasNext, UnaryOperatornext)
- 原生類型生成 通過對應的IntStream,LongStream,DoubleStream類中提供的方法來獲取,包含常用的方法
- builder()
- empty()
- of()
- iterate()
- generate()
- range()
- concat()
- 其他地方 這里介紹兩處:字符分割匹配和文件行數據
String.chars()返回IntStream
Files.lines()返回通過行分割的字符內容
流的使用機制(重要事項)
我們通過上面的方法創建好流后,就可以對流進行相關的業務邏輯處理了,需要注意:如果我們重復對一個流進行操作,就會出錯,系統會爆出IllegalStateException異常,這是因為Stream設計為不可重用的模式。流的下一個環節都是對當前環節處理后新生成流的處理。
流的執行順序
采用Stream方式進行多個邏輯處理時,他們之間的執行順序是什么樣的呢?指北君為了展示效果,寫了一段測試代碼:
public void exeOrder() {
List< String > list = Arrays.asList("data_1","data_2", "data_3", "data_12");
list.stream().filter(x - > {
System.out.println("filter() was called: " + x);
return x.contains("2");
}).map(x - > {
System.out.println("map() was called: " + x);
return x.toUpperCase();
}).forEach(x- >System.out.println("forEach() was called: " + x));
}
執行結果如下:
filter() was called: data_1
filter() was called: data_2
map() was called: data_2
forEach() was called: data_2
filter() was called: data_3
filter() was called: data_12
map() was called: data_12
forEach() was called: data_12
從示例代碼的打印的順序中我們可以發現:流處理的順序不是以代碼順序(執行完一步再到下一步),而是按照數據處理完一個單位數據的所有環節再處理下一個數據,見下面的動態示意圖:
Stream處理順序
既然我們了解流的處理順序,也能理解某些流操作會提前結束流處理的,比如findFirst(),在處理完第一個符合條件的數據后,后續的數據不會參與任何一個環節的處理。
轉換處理
轉換處理時最常用的邏輯處理方式,介紹轉換處理的文章較多,這里不再一一詳細描述只是簡單列一下,轉換處理對應大數據MapReduce中的Map處理
- distinct剔重
- filter過濾
- map轉換映射
- peek
- limit
- skip
合并處理(reduce)
對于Map-Reduce模型的reduce操作,國內對這個詞翻譯不太統一,指北君就先稱之為合并處理吧。這里介紹兩個方法reduce和collect
- reduce 先來看一個reduce的示例
public void reduce() {
int sum = IntStream.range(1, 100).reduce(0, (a, b) - > a + b);
System.out.print(sum);
}
合并Stream中的所有值,合并的初始值為0,如果初始為0還可以省略初始值。reduce函數包含三部分關鍵信息:
- 初始值,指定合并操作的初始值
- 合并函數
- 合路器(函數),在并行(多線程)運算時需要用到
下面是一個使用合路器的示例,在并行運算時使用。
public void parallelReduce() {
int sum = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).parallelStream()
.reduce(0, (a, b) - > a + b, (a, b) - > {
return a + b;
});
System.out.println(sum);
}
這里指北君留一道思考題給大家,如果這里初始值0修改為10,最終的結果是多少?為什么是這種結果呢?
- collect 現在我們再來看collect,collect嚴格上說是reduce有些牽強,因為是否reduce在于collect中的執行邏輯 比如這段:
List String > collector = list.stream().map(Product::getName).collect(Collectors.toList());
然后再看下面的例子:
String mergString = list.stream().map(Product::getName).collect(Collectors.joining(", ", "[", "]"));
還有其他對應的方法:
- Collectors.averagingInt
- Collectors.summingInt
- Collectors.groupingBy
- Collectors.partitioningBy
各位小伙伴可以查看Collectors對應的API,這里就不一一列舉了,總之,collect通過Collectors對象的API類完成合并處理。
-
接口
+關注
關注
33文章
8600瀏覽量
151166 -
JAVA
+關注
關注
19文章
2967瀏覽量
104759 -
字符
+關注
關注
0文章
233瀏覽量
25210 -
數據處理
+關注
關注
0文章
599瀏覽量
28568 -
Stream
+關注
關注
0文章
20瀏覽量
7983
發布評論請先 登錄
相關推薦
評論