> 文章列表 > Flink Stream 处理数据倾斜

Flink Stream 处理数据倾斜

Flink Stream 处理数据倾斜

数据倾斜的场景

  1. 在数据源发生的数据倾斜。例如,Kafka 的分区,有的分区数据量特别的少,有的特别的多,这样在消费数据后,各个 subtask 拿到的数据量就有了差异。
  2. 在 keyBy 之后,产生的数据倾斜。例如,wordcount 的场景中,可能有的单词特别的多,有的特别的少,那么就造成 keyBy 之后的聚合算子中,有的接收到的数据特表的大,有的特别的少。

如何处理数据倾斜

数据源造成的倾斜

Flink 为我们提供了重分区的 8 个算子,shuffle、rebalance、rescale、broadcast、global、forward、keyBy、partitionCustom ,我们可以使用 shuffle、rebalance、rescale 三个算子,做重分区。它们的功能是:

  1. shuffle 是 random().nextInt()%parallelism 随机的分发到下游的算子。
  2. rebalance 是 nextPartition = (nextPartition + 1)%numberOfChannel
  3. rescale 是 nextPartition = if ++nextPartition > numberOfChannel then 0 else nextPartition
    从上面的三个分区算法的查看,他们都能解决数据倾斜的问题。通过这三个算子之后,根据后续算子的并发度了重新分区,而且各个分区中的数据量是相同的。例如,在算子中,我们只有转化类型的算子,并没有分组聚合的需求,此时就可以使用这三个算子来解决问题。

keyBy 造成的倾斜

keyBy 造成的倾斜,通常的做法是 combiner 的做法,做本地预聚合,减少 keyBy 之后的数据量。具体的思路是,通过给每条数据指定一个随机值,然后分发到不同分区,这样相同的 word 会均匀的分发到分区中,然后使用算子来做第一次聚合,最后使用 keyBy + 聚合算子做第二次聚合。实际的实现有两种。

第一种,通过 shuffle、rebalance、rescale 实现将 word 随机落到分区中,然后可以使用 flatMap 将分区中的 word 做第一次聚合,最后使用 keyBy + 聚合算子做第二次聚合。

第二种, Tuple2(word,1) 转换为 Tuple3(word, UUID%10 , 1) ,然后对 uuid%10 做 keyBy ,这样也可以实现第一次随机聚合的步骤。第二次聚合和第一种实现方式相同。

第一种实现方式的代码:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Tuple3<String, String, Integer>> src = env.socketTextStream("127.0.0.1", 6666).flatMap(new RichFlatMapFunction<String, Tuple3<String, String, Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple3<String, String, Integer>> collector) throws Exception {Arrays.stream(s.split("\\\\s+")).forEach(x -> {collector.collect(new Tuple3<String, String, Integer>(x, UUID.randomUUID().toString(), 1));});}}).setParallelism(1);src.rebalance().flatMap(new LocalCombiner()).keyBy(new KeySelector<Tuple2<String,Integer> , String>(){@Overridepublic String getKey(Tuple2<String, Integer> record) throws Exception {return record.f0;}}).sum(1).print("--------");env.execute();

第二种实现方式:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<Tuple2<String, Integer>> firstPhase = env.socketTextStream("127.0.0.1", 6666).flatMap(new RichFlatMapFunction<String, Tuple3<String, String, Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple3<String, String, Integer>> collector) throws Exception {Arrays.stream(s.split("\\\\s+")).forEach(x -> {collector.collect(new Tuple3<String, String, Integer>(x, UUID.randomUUID().toString(), 1));});}}).keyBy(new KeySelector<Tuple3<String, String, Integer>, Integer>() {@Overridepublic Integer getKey(Tuple3<String, String, Integer> t3) throws Exception {return t3.f1.hashCode() % 10;}}).timeWindow(Time.seconds(10)).process(new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, Integer, TimeWindow>() {@Overridepublic void process(Integer integer, ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, Integer, TimeWindow>.Context context, Iterable<Tuple3<String, String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {Map<String, Integer> wordCnt = new HashMap<>();iterable.forEach(x -> {wordCnt.computeIfPresent(x.f0, (k, oldValue) -> oldValue + 1);wordCnt.computeIfAbsent(x.f0, k -> 1);});wordCnt.entrySet().forEach(x -> {collector.collect(new Tuple2<String, Integer>(x.getKey(), x.getValue()));});}});firstPhase.keyBy(new KeySelector<Tuple2<String,Integer> , String>(){@Overridepublic String getKey(Tuple2<String, Integer> record) throws Exception {return record.f0;}}).sum(1).print("--------");env.execute();