Java中的Stream API学习笔记

转自:https://www.ibm.com/developerworks/cn/java/j-lo-java8streamapi/index.html

什么是流

流(Stream)是Java 8引入的新的概念,它和输入输出流不是一个概念。Java 8 中的 Stream 是对集合(Collection)对象功能的增强,它专注于对集合对象进行各种非常便利、高效的聚合操作(aggregate operation),或者大批量数据操作 (bulk data operation)。Stream API 借助于同样新出现的 Lambda 表达式,极大的提高编程效率和程序可读性。同时它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势,使用 fork/join 并行方式来拆分任务和加速处理过程。

Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。而和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。

流的使用

我们使用流的时候一般都有三个过程:流的创建、转化和终结。

流的创建

有多种方法可以创建流:

从 Collection 和 Array:

  • Collection.stream();
  • Collection.parallelStream()
  • Arrays.stream(T array) or Stream.of(array)

用 Stream 的 generate 接口:

  • Stream.generate(Supplier< T >);

从其它的接口:

  • java.io.BufferedReader.lines();
  • Files.lines();

下面提供常见的流的构造方法:

1
2
3
4
5
6
7
8
9
// 1. Individual values
Stream stream = Stream.of("a", "b", "c");
// 2. Arrays
String [] strArray = new String[] {"a", "b", "c"};
stream = Stream.of(strArray);
stream = Arrays.stream(strArray);
// 3. Collections
List<String> list = Arrays.asList(strArray);
stream = list.stream();

对于基本数值型,目前有三种对应的包装类型 Stream:
IntStream、LongStream、DoubleStream。当然我们也可以用 Stream、Stream 、Stream,但是 boxing 和 unboxing 会很耗时,所以特别为这三种基本数值型提供了对应的 Stream。

流的转化

流的操作类型分为两种:

  • Intermediate:一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。

  • Terminal:一个流只能有一个 terminal 操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个 side effect。

在对于一个 Stream 进行多次转换操作 (Intermediate 操作),每次都对 Stream 的每个元素进行转换,而且是执行多次,这样时间复杂度就是 N(转换次数)个 for 循环里把所有操作都做掉的总和吗?其实不是这样的,转换操作都是 lazy 的,多个转换操作只会在 Terminal 操作的时候融合起来,一次循环完成。我们可以这样简单的理解,Stream 里有个操作函数的集合,每次转换操作就是把转换函数放入这个集合中,在 Terminal 操作的时候循环 Stream 对应的集合,然后对每个元素执行所有的函数。

  • Intermediate 操作:

    1. map/flatMap:

    map的作用就是把 input Stream 的每一个元素,映射成 output Stream 的另外一个元素。

    1
    2
    3
    4
    > List<String> output = wordList.stream().
    > map(String::toUpperCase).
    > collect(Collectors.toList());
    >

从上面例子可以看出,map 生成的是个 1:1 映射,每个输入元素,都按照规则转换成为另外一个元素。还有一些场景,是一对多映射关系的,这时需要 flatMap。

1
2
3
4
5
6
7
> Stream<List<Integer>> inputStream = Stream.of(
> Arrays.asList(1),
> Arrays.asList(2, 3),
> Arrays.asList(4, 5, 6));
> Stream<Integer> outputStream = inputStream.
> flatMap((childList) -> childList.stream());
>

flatMap 把 input Stream 中的层级结构扁平化,就是将最底层元素抽出来放到一起,最终 output 的新 Stream 里面已经没有 List 了,都是直接的数字。

2. filter:

filter 对原始 Stream 进行某项测试,通过测试的元素被留下来生成一个新 Stream。

1
2
3
4
> Integer[] sixNums = {1, 2, 3, 4, 5, 6};
> Integer[] evens =
> Stream.of(sixNums).filter(n -> n%2 == 0).toArray(Integer[]::new);
>

取出所有偶数,结果为{2,4,6}。

3. limit/skip

limit 返回 Stream 的前面 n 个元素;skip 则是扔掉前 n 个元素(它是由一个叫 subStream 的方法改名而来)。

4. distinct

distinct 返回 stream 中不重复的元素。

5. sorted

对 Stream 的排序通过 sorted 进行,它比数组的排序更强之处在于你可以首先对 Stream 进行各类 map、filter、limit、skip 甚至 distinct 来减少元素数量后,再排序,这能帮助程序明显缩短执行时间。

6. peek

peek 对 stream 中的每个元素执行操作然后返回一个新的流。

流的终结

  • Terminal 操作:

1. min/max/

min 和 max 的功能也可以通过对 Stream 元素先排序,再 findFirst 来实现,但前者的性能会更好,为 O(n),而 sorted 的成本是 O(n log n)。同时它们作为特殊的 reduce 方法被独立出来也是因为求最大最小值是很常见的操作。

2. forEach

forEach 方法接收一个 Lambda 表达式,然后在 Stream 的每一个元素上执行该表达式。

3. reduce

这个方法的主要作用是把 Stream 元素组合起来。它提供一个起始值(种子),然后依照运算规则(BinaryOperator),和前面 Stream 的第一个、第二个、第 n 个元素组合。从这个意义上说,字符串拼接、数值的 sum、min、max、average 都是特殊的 reduce。

4. collect

collect 方法将流中的元素收集到另一个目标中,接收一个 Collector 接口的实例。

5. min/max

min 和 max 的功能也可以通过对 Stream 元素先排序,再 findFirst 来实现,但前者的性能会更好,为 O(n),而 sorted 的成本是 O(n log n)。同时它们作为特殊的 reduce 方法被独立出来也是因为求最大最小值是很常见的操作。

6. count

count 返回当前流中元素的数量。

7. match

Stream 有三个 match 方法,从语义上说:

  • allMatch:Stream 中全部元素符合传入的 predicate,返回 true;
  • anyMatch:Stream 中只要有一个元素符合传入的 predicate,返回 true;
  • noneMatch:Stream 中没有一个元素符合传入的 predicate,返回 true;

它们都不是要遍历全部元素才能返回结果

8. findFirst

reduce 返回 Stream 的第一个元素,或者空。返回类型为Optional,作为一个容器,它可能含有某值,或者不包含。

流操作进阶

1. 自己生成流

Stream.generate()

通过实现 Supplier 接口,你可以自己来控制流的生成。这种情形通常用于随机数、常量的 Stream,或者需要前后元素间维持着某种状态信息的 Stream。把 Supplier 实例传递给 Stream.generate() 生成的 Stream,默认是串行(相对 parallel 而言)但无序的(相对 ordered 而言)。由于它是无限的,在管道中,必须利用 limit 之类的操作限制 Stream 大小。

1
2
3
4
5
6
7
//生成10个随机整数并输出
Random seed = new Random();
Supplier<Integer> random = seed::nextInt;
Stream.generate(random).limit(10).forEach(System.out::println);
//Another way
IntStream.generate(() -> (int) (System.nanoTime() % 100)).
limit(10).forEach(System.out::println);

Stream.iterate()

iterate 跟 reduce 操作很像,接受一个种子值,和一个 UnaryOperator(例如 f)。然后种子值成为 Stream 的第一个元素,f(seed) 为第二个,f(f(seed)) 第三个,以此类推。

1
2
3
4
5
6
7
8
9
10
11
12
13
Stream.generate(new PersonSupplier()).
limit(10).
forEach(p -> System.out.println(p.getName() + ", " + p.getAge()));

private class PersonSupplier implements Supplier<Person> {
private int index = 0;
private Random random = new Random();

@Override
public Person get() {
return new Person(index++, "StormTestUser" + index, random.nextInt(100));
}
}

2. 用 Collectors 来进行 reduction 操作

java.util.stream.Collectors 类的主要作用就是辅助进行各类有用的 reduction 操作,例如转变输出为 Collection,把 Stream 元素进行归组。

groupingBy

1
2
3
4
5
6
7
8
9
10
//按照年龄归组
Map<Integer, List<Person>> personGroups = Stream.generate(new PersonSupplier()).
limit(100).
collect(Collectors.groupingBy(Person::getAge));
Iterator it = personGroups.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Integer, List<Person>> persons = (Map.Entry) it.next();
System.out.println("Age " + persons.getKey() + " = " + persons.getValue()
.size()
}

上面的 code,首先生成 100 人的信息,然后按照年龄归组,相同年龄的人放到同一个 list 中,可以看到如下的输出:

1
2
3
4
5
6
7
Age 0 = 2
Age 1 = 2
Age 5 = 2
Age 8 = 1
Age 9 = 1
Age 11 = 2
……

结束语

总之,Stream 的特性可以归纳为:

  • 不是数据结构
  • 它没有内部存储,它只是用操作管道从 source(数据结构、数组、generator function、IO channel)抓取数据。
  • 它也绝不修改自己所封装的底层数据结构的数据。例如 Stream 的 filter 操作会产生一个不包含被过滤元素的新 Stream,而不是从 source 删除那些元素。
  • 所有 Stream 的操作必须以 lambda 表达式为参数
  • 不支持索引访问
  • 你可以请求第一个元素,但无法请求第二个,第三个,或最后一个。不过请参阅下一项。
  • 很容易生成数组或者 List
  • 惰性化
  • 很多 Stream 操作是向后延迟的,一直到它弄清楚了最后需要多少数据才会开始。
  • Intermediate 操作永远是惰性化的。
  • 并行能力
  • 当一个 Stream 是并行化的,就不需要再写多线程代码,所有对它的操作会自动并行进行的。
  • 可以是无限的
  • 集合有固定大小,Stream 则不必。limit(n) 和 findFirst() 这类的 short-circuiting 操作可以对无限的 Stream 进行运算并很快完成。