jdk8-28-30-自定义收集器&坑

收集器中有很5个抽象方法,每个抽象方法都有自己特殊的作用,如果说我们要自己实现要给收集器的话,我们就需要分别实现下面五个方法。

1 简单自定义一个收集器

这个收集器的目的是将一个 list 转换成一个 set

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class MySetCollector<T> implements Collector<T, Set<T>, Set<T>> {
@Override
public Supplier<Set<T>> supplier() {
System.out.println("supplier invoked!!!");
return HashSet::new;
}
@Override
public BiConsumer<Set<T>, T> accumulator() {
System.out.println("accumulator invoked!!!");
return Set<T>::add;
}
/**
* 并行性流才会调用,将两个分段的集合
* @return
*/
@Override
public BinaryOperator<Set<T>> combiner() {
System.out.println("combiner invoked!!!");
return (t1, t2) -> {
t1.addAll(t2);
return t1;
};
}
@Override
public Function<Set<T>, Set<T>> finisher() {
System.out.println("finisher invoked!!!");
return Function.identity();
}
/**
* 描述这个集合的特性
* IDENTITY_FINISH 表示结果容器和中间容器是一致的,这个时候 JDK 会在返回的时候自动帮助我们做类型转换。而不用再去调用 finisher
* @return
*/
@Override
public Set<Characteristics> characteristics() {
System.out.println("characteristics invoked!!!");
return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH,UNORDERED));
}
public static void main(String[] args) {
List<String> words = Arrays.asList("hello" , "world", "welcome", "hello");
Set<String> stringSet = words.stream().collect(new MySetCollector<>());
System.out.println(stringSet);
}
}

实现类里,我们分别实现了5个方法。 其中比较重要的两个方式 finishercharacteristics 方法

这俩方法是相互影响的。首先看 characteristics 方法,这个方法要求返回一个 Characteristics 枚举类型对的 Set。而 Characteristics 一共有三个枚举值。分别有不同的含义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 当我们采用并行流的时候,同时设置了 CONCURRENT 作为这个流的特性
* 那么,操作的是同一个集合,而不是多个
* 同样的道理,我们采用了并行流的方式,但是没有这个特性,我们就会生成多个
* 集合
*/
CONCURRENT,
/**
* 代表集合是否是有序的
*/
UNORDERED,
/**
* 表示 finisher function 可以被省略,因为中间结果和最后的返回的结果的类型是一致的。
* 所以当我们设置了这个属性之后,我们就必须了解到, 最后的返回的类型,可以由中间结果类型进行强制的返回。
*/
IDENTITY_FINISH

上面的例子中,我们为我们自己的 collector 设置了两个属性, IDENTITY_FINISH & UNORDERED ; 意思就是这个收集器收集的元素是无序的。 而同时,返回的类型和中间结果类型是可以完全强制转换的(如果不可以,则会报错,下面会说到);而 finisher() 函数是不会被执行的,即使你在函数内部直接抛出一个异常都没有问题,因为根本不会执行。
执行结果我们可以看一下:

14914603137015.jpg-102.1kB

这里需要注意两点:

  1. 虽然 combiner() 函数被调用了,但是只是返回了一个 BinaryOperator 而已,而这个 BinaryOperator 并不会被调用。
  2. finisher() 函数并没有被调用,原因上面已经说过
  3. characteristics() 函数被调用了两次,分别代表不同的意思

对于第3点,我们跟进 collector() 源代码里看一下, 便会有答案。

14914608156971.jpg-302.5kB

图中书说法其实不准确,其实是 evaluate 方法中的 ReduceOps.makeRef(collector) 中,会逐个调用相关方法, 包括 characteristics() 方法。

14914612652241.jpg-351.7kB

所以 characteristics() 被调用了两次,第一次用于判断 是否是无序集合 ;第二次用于判断 是否需要执行 finisher 用于中间结果和最终结果的类型转换

2 当中间结果与返回结果不一致

看下面这个改造的自定义收集器,中间结果是个 set 而返回的是个 map,也就是中间结果和最终最终结果是不一致的。这个时候,finisher 函数就会排上用场了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class MySetCollector2<T> implements Collector<T, Set<T>, Map<T,T>> {
@Override
public Supplier<Set<T>> supplier() {
System.out.println("supplier invoked!!!!!");
return HashSet::new;
}
@Override
public BiConsumer<Set<T>, T> accumulator() {
System.out.println("accumulator invoked!!!!!");
return (set, item) -> {
System.out.println(set);
System.out.println("threadName: " + Thread.currentThread().getName());
set.add(item);
};
// return Set<T>::add;
}
@Override
public BinaryOperator<Set<T>> combiner() {
System.out.println("combiner invoked!!!!!");
return (set1, set2) -> {
set1.addAll(set2);
return set1;
};
}
@Override
public Function<Set<T>, Map<T, T>> finisher() {
System.out.println("finisher invoked!!!!");
return set -> {
Map<T, T> map = new HashMap<>();
set.stream().forEach(item -> map.put(item, item));
return map;
};
}
/**
* 当有 Characteristics.CONCURRENT 意味着如果有并发,则是多个线程操作一个集合。
* 这个时候 accumulator 如果有 遍历操作,就有可能会抛出 ConcurrentModificationException
* @return
*/
@Override
public Set<Characteristics> characteristics() {
System.out.println("characteristics invoked!!!!");
return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED, Characteristics.CONCURRENT));
}
public static void main(String[] args) {
for(int i = 0; i < 100; i ++) {
List<String> words = Arrays.asList("hello", "word", "helloword", "hello", "a", "b", "c", "d", "e");
Set<String> set = new HashSet<>();
set.addAll(words);
Map<String, String> map = set.parallelStream().collect(new MySetCollector2<>());
System.out.println(map);
}
}
}

这里由于我输出与中间结果类型并不一致,所以如果我在 characteristics() 方法中,依旧使用 IDENTITY_FINISH ,也就是进行类型强转,则必然报错,类型转换异常。大家可以自己试一下。

3 并发与并行的区别-并行时候的一些坑

这里,有另外一个很要命的坑。

在说这个概念之前,先说一下收集器中 parallelconcurrent 的区别

  • parallel并行,会将产生多个集合,多个线程操作,最后合并,也就是会调用 combiner() 方法
  • concurrent并发, 是多个线程对同一个集合进行操作,首先 combiner() 函数不会被执行,而同时,如果你在 accumulator() 函数中进行累加操作,又进行遍历操作,就会抛出并发异常。

用例子说话,首先我们必须采用 并行流 , 在 accumulator() 函数中遍历 set, 同时将 characteristics() 函数中加入 CONCURRENT 这个属性,你执行100次,基本都会抛出异常

14914633115993.jpg-397.3kB

原因很简答也比较复杂: 程序采用了并行流 set.parallelStream(),同时设置了 CONCURRENT 属性;也就是说,多个线程操作同一个集合。

14914634695900.jpg-105.6kB

而在 accumulator() 函数中同时遍历了集合,也修改了集合。由于是多线程操作,很大的概率就会发生你一边遍历集合,一遍修改,就会报错。顺被大家可看看 ConcurrentModificationExceptionjavadoc.

14914635854475.jpg-77.1kB

而如果,你不添加 CONCURRENT 这个属性,就不会报错。因为会生成多个中间集合。这一点,我们可以改造 supplier() 方法,在其中答应一条语句,通过打印了几条语句,就可以看出产生了几个集合。

当然了,如果你在 accumulator() 中不去有那个遍历的操作,也是即使你添加了 CONCURRENT 属性,也是不会报错的。因为你没有并发的即遍历又修改一个集合

14914638813587.jpg-75.7kB

而一般产生多少集合,由产生多少个线程决定,而产生多少线程一般由机器经过 超线程 技术之后有多少个处理器而确定的。当然也是可以修改的,但是一般没有必要。

1
2
// 查看有多少处理器
Runtime.getRuntime().availableProcessors();
朱老师&敏哥 wechat
有惊喜,朋友🙄
我要拿铁不加糖.