> 文章列表 > JAVA-- 带你重温Java8 Stream的流式操作及注意事项

JAVA-- 带你重温Java8 Stream的流式操作及注意事项

JAVA-- 带你重温Java8 Stream的流式操作及注意事项

Stream API是一组功能强大但易于理解的处理元素序列的工具。如果使用得当,它可以让程序员减少大量的样板代码,创建更具可读性的程序,并提高应用程序的生产力。

流使用的注意事项

  • 流实例是不能重用的 Java 8 streams can't be reused.,否则会抛出异常:java.lang.IllegalStateException: stream has already been operated upon or closed. 这种行为是合乎逻辑的,设计流的目的是:以函数式风格对元素源的有限序列进行操作,而不是存储元素。
    @Testpublic void testStream(){Stream<String> stream =Stream.of("a", "abc","b", "c").filter(element -> element.contains("b"));Optional<String> anyElement = stream.findAny();//再次使用System.out.println(stream.findAny().get());}
    
  • Intermediate operations are lazy. they will be invoked only if it is necessary for the terminal operation execution. 中间操作是懒惰的。这意味着只有在执行终端操作时才会调用它们。如list.stream().skip(1).map(element -> element.substring(0, 3)).sorted().collect(Collectors.joining(";")) 只有在执行collect操作时,前面的skip、map、sorted这些操作才会执行。
  • 执行顺序 Order of Execution: 从性能的角度来看,正确的顺序是流管道中链接操作最重要的方面之一:
    • 减少流大小的中间操作应该放在应用于每个元素的操作之前,【 intermediate operations which reduce the size of the stream should be placed before operations which are applying to each element】
    • 如要先执行skip,filter,distinct等操作,再执行map等转换操作
  • 在真正的应用程序中,不要让实例化的流未被使用,因为这会导致内存泄漏

流的创建

有许多方法可以创建不同源的流实例。一旦创建,实例将不会修改其源,因此允许从单个源创建多个实例。

创建Stream of Generics(泛型流)

//Empty Stream
Stream<String> streamEmpty = Stream.empty();//Stream of Collection
Collection<String> collection = Arrays.asList("a", "b", "c");
Stream<String> streamOfCollection = collection.stream();//Stream of Array
Stream<String> streamOfArray = Stream.of("a", "b", "c");
String[] arr = new String[]{"a", "b", "c"};
Stream<String> streamOfArrayFull = Arrays.stream(arr);
Stream<String> streamOfArrayPart = Arrays.stream(arr, 1, 3);//Stream.builder()
Stream<String> streamBuilder =
Stream.<String>builder().add("a").add("b").add("c").build();//Stream.generate() or iterate()
Stream<String> streamGenerated =
Stream.generate(() -> "element").limit(10);Stream<Integer> streamIterated = Stream.iterate(40, n -> n + 2).limit(20);

创建Stream of Primitives(基本类型【原语流】)

Java 8提供了用三种基本类型创建流的可能性:int、long和double。由于Stream是一个泛型接口,并且无法将原语【基本类型】用作泛型的类型参数,因此创建了三个新的特殊接口:IntStream、LongStream、DoubleStream。

  • range(int startInclusive, int endExclusive)
  • rangeClosed(int startInclusive, int endInclusive)
  • Random类提供了大量生成原语流的方法。例如,如代码random.doubles(3)创建了一个DoubleStream,它有三个随机产生的元素
IntStream intStream = IntStream.range(1, 3);
LongStream longStream = LongStream.rangeClosed(1, 3);Random random = new Random();
DoubleStream doubleStream = random.doubles(3);

创建Stream of String

IntStream streamOfChars = "abc".chars();Stream<String> streamOfString = Pattern.compile(", ").splitAsStream("a, b, c");

创建Stream of File

Path path = Paths.get("C:\\\\file.txt");
Stream<String> streamOfStrings = Files.lines(path);
Stream<String> streamWithCharset = Files.lines(path, Charset.forName("UTF-8"));

流的引用 Referencing a Stream

实例化一个流,并有一个对它的可访问引用,只要只调用中间转化操作【 intermediate operations】。执行结束操作【terminal operation】将使流不可访问。

如何正确的引用流实例?

Java 8 streams can’t be reused.流实例不能重用,所以正确引用流实例的做法如下:

List<String> elements =Stream.of("a", "b", "c").filter(element -> element.contains("b")).collect(Collectors.toList());
Optional<String> anyElement = elements.stream().findAny();
Optional<String> firstElement = elements.stream().findFirst();

Stream Pipeline操作

要对数据源的元素执行一系列操作并聚合其结果,我们需要三个部分:
source, intermediate operation(s) and a terminal operation. 示例代码如下:

List<String> list = Arrays.asList("abc1", "abc2", "abc3");
long size = list.stream().skip(1).map(element -> element.substring(0, 3)).sorted().count();//or
list.stream().skip(1).map(element -> element.substring(0, 3)).sorted().collect(Collectors.joining(";"))

隋性调用 Lazy Invocation

结果日志显示程序调用了两次filter()方法,调用了一次map()方法。这是因为管道是垂直执行的。
在下面的例子中,流的第一个元素不满足过滤器的谓词。然后程序为第二个元素调用filter()方法,它通过了过滤器。但程序没有为第三个元素调用filter(),而是通过管道向下访问map()方法。
findFirst()操作只满足一个元素,因此,在这个特定的示例中,隋性调用允许程序避免两个方法调用,一个用于filter(),另一个用于map()。

public class StreamTest {@Testpublic void testStream(){List<String> list = Arrays.asList("abc1","abc2","abc3");Optional<String> stream = list.stream().filter(element -> {print("filter() was called");return element.contains("2");}).map(element -> {print("map() was called");return element.toUpperCase();}).findFirst();}private static void print(String msg){System.out.println(msg);}
}

程序的输出结果是:

filter() was called
filter() was called
map() was called

流聚合操作 Stream Reduction

内置的聚合操作

API有许多终端操作,可以将流聚合为类型或原语:count()、max()、min()和sum()。

如何自定义聚合行为?

The reduce() Method

  • identity: 累加器的初始值,如果流为空且没有需要累加,则为默认值
  • accumulator: 元素聚合逻辑的函数。
  • combiner:accumulator结果聚合逻辑的函数
public class StreamTest {@Testpublic void testStream(){OptionalInt reduced = IntStream.range(1, 4).reduce((a, b) -> a + b);print(reduced.getAsInt()+"");int reducedTwoParams = IntStream.range(1, 4).reduce(10, (a, b) -> a + b);print(reducedTwoParams+"");int reducedParams = Arrays.asList(1, 2, 3).parallelStream().reduce(10,(a, b) -> a + b,(a, b) -> {print("combiner was called");return a + b;});print(reducedParams+"");}private static void print(String msg){System.out.println(msg);}
}

输出结果是:

6
16
combiner was called
combiner was called
36

The collect() Method

方法averagingXX(), summingXX()和summarizingXX()可以使用原语(int, long, double)和它们的包装类(Integer, Long, Double)。
这些方法的一个更强大的特性是提供映射。因此,开发人员不需要在collect()方法之前使用额外的map()操作。

  • 通过collect操作,转化为List、String、 统计结果
    public class StreamTest {@Testpublic void testStream(){List<Product> productList = Arrays.asList(new Product(23, "potatoes"),new Product(14, "orange"), new Product(13, "lemon"),new Product(23, "bread"), new Product(13, "sugar"));List<String> collectorCollection =productList.stream().map(Product::getName).collect(Collectors.toList());String listToString = productList.stream().map(Product::getName).collect(Collectors.joining(", ", "[", "]"));print(listToString);double averagePrice = productList.stream().collect(Collectors.averagingInt(Product::getPrice));print(averagePrice+"");int summingPrice = productList.stream().collect(Collectors.summingInt(Product::getPrice));IntSummaryStatistics statistics = productList.stream().collect(Collectors.summarizingInt(Product::getPrice));print(statistics.toString());}private static void print(String msg){System.out.println(msg);}
    }
    @Data
    @AllArgsConstructor
    class Product {private int price;private String name;
    }
    
  • 通过collect的groupby,转化为Map、Set
    public void testStream(){List<Product> productList = Arrays.asList(new Product(23, "potatoes"),new Product(14, "orange"), new Product(13, "lemon"),new Product(23, "bread"), new Product(13, "sugar"));Map<Integer, List<Product>> collectorMapOfLists = productList.stream().collect(Collectors.groupingBy(Product::getPrice));Map<Boolean, List<Product>> mapPartioned = productList.stream().collect(Collectors.partitioningBy(element -> element.getPrice() > 15));Set<Product> unmodifiableSet = productList.stream().collect(Collectors.collectingAndThen(Collectors.toSet(),Collections::unmodifiableSet));}   
    
  • 自定义Collector
    @Testpublic void testStream(){List<Product> productList = Arrays.asList(new Product(23, "potatoes"),new Product(14, "orange"), new Product(13, "lemon"),new Product(23, "bread"), new Product(13, "sugar"));Collector<Product, LinkedList<Product>, LinkedList<Product>> toLinkedList =Collector.of(LinkedList::new, LinkedList::add,(first, second) -> {first.addAll(second);return first;});LinkedList<Product> linkedListOfPersons =productList.stream().collect(toLinkedList);print(linkedListOfPersons.toString());}
    

Parallel Streams 并行流

在Java 8之前,并行化非常复杂。ExecutorService和ForkJoin的出现略微简化了开发人员的工作,但仍然需要记住如何创建特定的executor及如何运行它等等。Java 8引入了一种以函数式风格实现并行的方法。

API允许程序创建并行流,以并行模式执行操作。

  • 当流的源是一个Collection或数组时,可以通过parallelStream()方法来实现:
  • 当流的源不是Collection或数组,则应该使用parallel()方法:
  • 并行模式的流可以通过使用sequential()方法转换回顺序模式:
Stream<Product> streamOfCollection = productList.parallelStream();
boolean isParallel = streamOfCollection.isParallel();
boolean bigPrice = streamOfCollection.map(product -> product.getPrice() * 12).anyMatch(price -> price > 200);IntStream intStreamParallel = IntStream.range(1, 150).parallel();
boolean isParallel = intStreamParallel.isParallel();IntStream intStreamSequential = intStreamParallel.sequential();
boolean isParallel = intStreamSequential.isParallel();

在底层,Stream API自动使用ForkJoin框架并行执行操作。默认情况下,将使用公共线程池,并且没有办法(至少目前)将一些自定义线程池分配给它。这可以通过使用一组自定义的并行收集器来解决。

当以并行模式使用流时,避免阻塞操作。当任务需要相同的时间来执行时,最好使用并行模式。如果一个任务比另一个任务持续的时间长得多,它会减慢整个应用程序的工作流程。

有用的代码片段

如何向流中添加单个元素

  • 添加到最前面
  • 添加到最后面
  • 添加到指定位置
@Test
public void givenStream_whenPrependingObject_thenPrepended() {Stream<Integer> anStream = Stream.of(1, 2, 3, 4, 5);Stream<Integer> newStream = Stream.concat(Stream.of(99), anStream);assertEquals(newStream.findFirst().get(), (Integer) 99);
}@Test
public void givenStream_whenAppendingObject_thenAppended() {Stream<String> anStream = Stream.of("a", "b", "c", "d", "e");Stream<String> newStream = Stream.concat(anStream, Stream.of("A"));List<String> resultList = newStream.collect(Collectors.toList());assertEquals(resultList.get(resultList.size() - 1), "A");
}private static <T> Stream insertInStream(Stream stream, T elem, int index) {Spliterator spliterator = stream.spliterator();Iterator iterator = Spliterators.iterator(spliterator);return Stream.concat(Stream.concat(Stream.generate(iterator::next).limit(index), Stream.of(elem)), StreamSupport.stream(spliterator, false));}@Testpublic void givenStream_whenInsertingObject_thenInserted() {Stream<Double> anStream = Stream.of(1.1, 2.2, 3.3);Stream<Double> newStream = insertInStream(anStream, 9.9D, 3);List<Double> resultList = newStream.collect(Collectors.toList());assertEquals(resultList.get(3), (Double) 9.9);}

如何将List

转化为Map

    @Testpublic void testStream(){List<Map<String,Object>> rows = Lists.list(Map.of("name","张三","addr","广州"),Map.of("name","李四","addr","上海"));Map<String, String> map = rows.stream().map(r->{return Pair.of(r.get("name").toString(),r.get("addr").toString());}).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));print(map.toString());}

key是对象中的某个属性值,value是对象本身,当key冲突时选择第二个key值覆盖第一个key值。

Map<String, User> userMap4 = userList.stream().collect(Collectors.toMap(User::getId, Function.identity(), (oldValue, newValue) -> newValue));

参考

The Java 8 Stream API Tutorial
core-java-streams-2