Flink Learning Notes (Scala): Transformation Operators

Table of Contents

I. Basic Transformation Operators
  1. map
  2. filter
  3. flatMap
  4. Aggregation Operators
    (1) keyBy
    (2) Simple aggregations: sum, min, minBy, max, maxBy
    (3) Reduction aggregation: reduce
II. UDFs (User-Defined Functions)
III. Rich Function Classes
IV. Physical Partitioning
  1. Random partitioning (shuffle)
  2. Round-robin partitioning (rebalance)
  3. Rescale partitioning (rescale)
  4. Broadcast (broadcast), rarely used
  5. Global partitioning (global), rarely used
  6. Custom partitioning (custom)


I. Basic Transformation Operators
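All of the examples in this post import a case class Event from com.atguigu.chapter05, but its definition is never shown. A minimal sketch, inferred from how the user, url and timestamp fields are used below (an assumption, not the original source file), would be:

package com.atguigu.chapter05

// Sketch of the Event class assumed by the examples below (inferred from usage)
case class Event(user: String, url: String, timestamp: Long)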

1. map

package com.atguigu.transform

import com.atguigu.chapter05.Event
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

object MapTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.fromElements(
      Event("Mary", "./home", 1000L),
      Event("zhangsan", "./cart", 2000L)
    )

    // TODO Extract the user name from every click event
    // 1. Using an anonymous function
    stream.map(_.user).print("1:")
    // 2. Implementing the MapFunction interface with a dedicated class
    stream.map(new UserExtractor).print("2:")

    env.execute()
  }

  class UserExtractor extends MapFunction[Event, String] {
    override def map(t: Event): String = t.user
  }
}

// Output:
1:> Mary
2:> Mary
1:> zhangsan
2:> zhangsan

2. filter

import com.atguigu.chapter05.Event
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._

object FilterTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.fromElements(
      Event("Mary", "./home", 1000L),
      Event("zhangsan", "./cart", 2000L)
    )

    // TODO Filter the click events of a given user
    // 1. Using an anonymous function: keep Mary's events
    stream.filter(_.user == "Mary").print("1:")
    // 2. Implementing the FilterFunction interface with a dedicated class: keep zhangsan's events
    stream.filter(new UserFilter).print("2:")

    env.execute()
  }

  class UserFilter extends FilterFunction[Event] {
    override def filter(t: Event): Boolean = t.user == "zhangsan"
  }
}

// Output:
1:> Event(Mary,./home,1000)
2:> Event(zhangsan,./cart,2000)

3. flatMap

import com.atguigu.chapter05.Event
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object FlatMapTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.fromElements(
      Event("Mary", "./home", 1000L),
      Event("zhangsan", "./cart", 2000L),
      Event("lisi", "./cart", 3000L)
    )

    // TODO Keep only the click events of Mary and zhangsan
    val value: DataStream[Event] = stream.flatMap(new FlatMapFunction[Event, Event] {
      override def flatMap(t: Event, collector: Collector[Event]): Unit = {
        if (t.user.equals("Mary")) {
          collector.collect(t)
        } else if (t.user.equals("zhangsan")) {
          collector.collect(t)
        }
      }
    })
    value.print("1:")
    // Output:
    // 1:> Event(Mary,./home,1000)
    // 1:> Event(zhangsan,./cart,2000)

    // TODO The same with a custom class
    stream.flatMap(new MyFlatMap).print("2:")
    // Output:
    // 2:> Mary
    // 2:> zhangsan
    // 2:> ./cart

    env.execute()
  }

  // Custom implementation of FlatMapFunction
  class MyFlatMap extends FlatMapFunction[Event, String] {
    override def flatMap(t: Event, collector: Collector[String]): Unit = {
      // TODO For Mary's click events emit only the user name; for zhangsan's emit the user name and the url
      if (t.user == "Mary") {
        collector.collect(t.user)
      } else if (t.user == "zhangsan") {
        collector.collect(t.user)
        collector.collect(t.url)
      }
    }
  }
}
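flatMap also accepts a plain Scala function that returns a collection, in which case every element of the returned collection is emitted as its own record. A minimal self-contained sketch (the sample strings are made up for illustration):

import org.apache.flink.streaming.api.scala._

object FlatMapLambdaSketch {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Split each line into words; each word becomes a separate record downstream
    env.fromElements("hello flink", "hello scala")
      .flatMap(line => line.split(" "))
      .print()

    env.execute()
  }
}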

4. Aggregation Operators

(1) keyBy

import com.atguigu.chapter05.Event
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._

object KayByTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)

    val stream: DataStream[Event] = env.fromElements(
      Event("Mary", "./home", 1000L),
      Event("wangwu", "./home", 1000L),
      Event("zhaoliu", "./home", 1000L),
      Event("xiaoming", "./home", 1000L),
      Event("Mary", "./home", 1000L),
      Event("zhangsan", "./cart", 2000L),
      Event("zhangsan", "./home", 2000L),
      Event("lisi", "./cart", 3000L)
    )

    // TODO Use the user field of Event as the key
    // Option 1:
    stream.keyBy(_.user).print()
    // Option 2:
    stream.keyBy(new MyKeySelector).print()
    // Output: records with the same key always go to the same partition
    // 3> Event(wangwu,./home,1000)
    // 1> Event(xiaoming,./home,1000)
    // 4> Event(lisi,./cart,3000)
    // 2> Event(Mary,./home,1000)
    // 3> Event(zhangsan,./cart,2000)
    // 2> Event(zhaoliu,./home,1000)
    // 3> Event(zhangsan,./home,2000)
    // 2> Event(Mary,./home,1000)

    env.execute()
  }

  class MyKeySelector extends KeySelector[Event, String] {
    override def getKey(in: Event): String = in.user
  }
}

(2) Simple aggregations: sum, min, minBy, max, maxBy

import com.atguigu.chapter05.Event
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._

object KayByTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.fromElements(
      Event("Mary", "./home", 9000L),
      Event("wangwu", "./id", 8000L),
      Event("zhaoliu", "./home", 6000L),
      Event("xiaoming", "./home", 1000L),
      Event("Mary", "./cart", 10000L),
      Event("zhangsan", "./cart", 2000L),
      Event("zhangsan", "./home", 8000L),
      Event("zhangsan", "./home", 6000L),
      Event("lisi", "./cart", 3000L)
    )

    // TODO Use the user field of Event as the key, then apply the simple aggregations

    // TODO sum(): accumulates the given field per key
    stream.keyBy(_.user).sum("timestamp").print()
    /*
    Event(Mary,./home,9000)
    Event(wangwu,./home,8000)
    Event(zhaoliu,./home,6000)
    Event(xiaoming,./home,1000)
    Event(Mary,./home,10000)
    Event(zhangsan,./cart,8000)
    Event(zhangsan,./cart,10000)
    Event(lisi,./cart,3000)
    */

    // TODO minBy(): returns the whole record that contains the minimum value of the field
    stream.keyBy(_.user).minBy("timestamp").print()
    /*
    Event(Mary,./home,9000)
    Event(wangwu,./id,8000)
    Event(zhaoliu,./home,6000)
    Event(xiaoming,./home,1000)
    Event(Mary,./home,9000)
    Event(zhangsan,./cart,2000)
    Event(zhangsan,./cart,2000)
    Event(zhangsan,./cart,2000)
    Event(lisi,./cart,3000)
    */

    // TODO min(): only computes the minimum of the given field; the other fields keep the values of the first record for that key
    stream.keyBy(_.user).min("timestamp").print()
    /*
    Event(Mary,./home,9000)
    Event(wangwu,./id,8000)
    Event(zhaoliu,./home,6000)
    Event(xiaoming,./home,1000)
    Event(Mary,./home,9000)
    Event(zhangsan,./cart,2000)
    Event(zhangsan,./cart,2000)
    Event(zhangsan,./cart,2000)
    Event(lisi,./cart,3000)
    */

    // TODO maxBy(): returns the whole record that contains the maximum value of the field
    stream.keyBy(_.user).maxBy("timestamp").print()
    /*
    Event(Mary,./home,9000)
    Event(wangwu,./id,8000)
    Event(zhaoliu,./home,6000)
    Event(xiaoming,./home,1000)
    Event(Mary,./cart,10000)
    Event(zhangsan,./cart,2000)
    Event(zhangsan,./home,8000)
    Event(zhangsan,./home,8000)
    Event(lisi,./cart,3000)
    */

    // TODO max(): only computes the maximum of the given field; the other fields keep the values of the first record for that key
    // Note how Mary's url stays ./home (taken from her first record) while the timestamp is updated to the larger value 10000
    stream.keyBy(_.user).max("timestamp").print()
    /*
    Event(Mary,./home,9000)
    Event(wangwu,./id,8000)
    Event(zhaoliu,./home,6000)
    Event(xiaoming,./home,1000)
    Event(Mary,./home,10000)
    Event(zhangsan,./cart,2000)
    Event(zhangsan,./cart,8000)
    Event(zhangsan,./cart,8000)
    Event(lisi,./cart,3000)
    */

    env.execute()
  }

  class MyKeySelector extends KeySelector[Event, String] {
    override def getKey(in: Event): String = in.user
  }
}
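When the stream holds tuples instead of a case class, the field passed to sum/min/max and friends can also be a position index. A short sketch with made-up data, counting clicks per user with sum on position 1:

import org.apache.flink.streaming.api.scala._

object TupleSumSketch {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // (user, count) tuples; sum(1) accumulates the second tuple field per key
    env.fromElements(("Mary", 1L), ("zhangsan", 1L), ("Mary", 1L))
      .keyBy(_._1)
      .sum(1)
      .print()
    // expected output: (Mary,1) (zhangsan,1) (Mary,2)

    env.execute()
  }
}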

(3) Reduction aggregation: reduce

import com.atguigu.chapter05.Event
import org.apache.flink.streaming.api.scala._

object ReduceTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.fromElements(
      Event("Mary", "./home", 9000L),
      Event("zhaoliu", "./home", 6000L),
      Event("Mary", "./cart", 10000L),
      Event("zhangsan", "./cart", 2000L),
      Event("zhangsan", "./home", 8000L),
      Event("zhangsan", "./home", 6000L)
    )

    // TODO reduce: find the most active user
    stream.map(x => (x.user, 1L)) // count with a Long
      .keyBy(_._1)
      .reduce((x, y) => (x._1, x._2 + y._2)) // count clicks per user
      .keyBy(_ => true) // send every record to the same group using a constant key
      .reduce((x, y) => if (x._2 > y._2) x else y) // keep the currently most active user
      .print()
    /*
    (Mary,1)
    (zhaoliu,1)
    (Mary,2)
    (Mary,2)
    (zhangsan,2)
    (zhangsan,3)
    */

    env.execute()
  }
}
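Like map and filter, reduce can also take a full function class implementing the ReduceFunction interface instead of a lambda. A sketch of the per-user counting step above written that way (the class name is illustrative, not from the original):

import org.apache.flink.api.common.functions.ReduceFunction

// Adds the click counts of two (user, count) records that share the same key
class ClickCountSum extends ReduceFunction[(String, Long)] {
  override def reduce(a: (String, Long), b: (String, Long)): (String, Long) =
    (a._1, a._2 + b._2)
}

// Usage: stream.map(x => (x.user, 1L)).keyBy(_._1).reduce(new ClickCountSum)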

II. UDFs (User-Defined Functions)

For most operations we need to pass in a user-defined function (UDF) that implements the corresponding interface and defines the processing logic. Flink exposes interfaces (or abstract classes) for all of its UDFs, such as MapFunction, FilterFunction and ReduceFunction. The most direct approach is therefore to write our own function class implementing the matching interface.

import com.atguigu.chapter05.Event
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.streaming.api.scala._

object UdfTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.fromElements(
      Event("Mary", "./home", 9000L),
      Event("zhaoliu", "./home", 6000L),
      Event("Mary", "./cart", 10000L),
      Event("zhangsan", "./cart", 2000L),
      Event("zhangsan", "./home", 8000L),
      Event("zhangsan", "./home", 6000L)
    )

    // TODO UDF usage: keep the Event records whose url contains the keyword "home"
    // TODO Option 1: a custom function class
    stream.filter(new MyFilterFunction).print()
    // TODO Option 2: an anonymous inner class
    stream.filter(new FilterFunction[Event] {
      override def filter(t: Event): Boolean = t.url.contains("home")
    }).print()
    // TODO Option 3: a function class with a constructor parameter
    stream.filter(new MyFilterFunction2("home")).print()

    env.execute()
  }

  class MyFilterFunction() extends FilterFunction[Event] {
    override def filter(t: Event): Boolean = t.url.contains("home")
  }

  class MyFilterFunction2(url: String) extends FilterFunction[Event] {
    override def filter(t: Event): Boolean = t.url.contains(url)
  }
}
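For completeness, the same condition can also be written as a plain Scala lambda inside the same main method, for example stream.filter(_.url.contains("home")).print(), which covers most everyday cases without a dedicated function class.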

III. Rich Function Classes

Every regular function interface has a "rich" variant (RichMapFunction, RichFlatMapFunction, RichFilterFunction, and so on). Rich functions add lifecycle methods, open() and close(), and give access to the runtime context via getRuntimeContext, for example the index of the current parallel subtask.

import com.atguigu.chapter05.Event
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

object RichFunctionTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // env.setParallelism(1)
    env.setParallelism(2)

    val stream: DataStream[Event] = env.fromElements(
      Event("Mary", "./home", 9000L),
      Event("zhaoliu", "./home", 6000L),
      Event("Mary", "./cart", 10000L),
      Event("zhangsan", "./cart", 2000L),
      Event("zhangsan", "./home", 8000L),
      Event("zhangsan", "./home", 6000L)
    )

    // TODO A custom RichMapFunction to demonstrate the rich function lifecycle
    stream.map(new MyRichMapFunction).print()
    // With parallelism 1:
    /*
    Subtask with index 0 starts
    9000
    6000
    10000
    2000
    8000
    6000
    Subtask with index 0 ends
    */
    // With parallelism 2:
    /*
    Subtask with index 0 starts
    Subtask with index 1 starts
    2> 9000
    2> 10000
    1> 6000
    2> 8000
    1> 2000
    1> 6000
    Subtask with index 1 ends
    Subtask with index 0 ends
    */

    env.execute()
  }

  class MyRichMapFunction extends RichMapFunction[Event, Long] {
    override def open(parameters: Configuration): Unit =
      println("Subtask with index " + getRuntimeContext.getIndexOfThisSubtask + " starts")

    override def close(): Unit =
      println("Subtask with index " + getRuntimeContext.getIndexOfThisSubtask + " ends")

    override def map(in: Event): Long = in.timestamp
  }
}
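A typical use of the lifecycle methods is to set up and tear down per-subtask resources or counters. A minimal sketch over the same Event type (the class name and the counting logic are illustrative, not from the original):

import com.atguigu.chapter05.Event
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Hypothetical rich function: counts how many events each parallel subtask processes
class CountingFlatMap extends RichFlatMapFunction[Event, String] {
  private var seen: Long = 0L

  override def open(parameters: Configuration): Unit = {
    // open() runs once per parallel subtask before any record is processed
    seen = 0L
  }

  override def flatMap(in: Event, out: Collector[String]): Unit = {
    seen += 1
    out.collect(in.user)
  }

  override def close(): Unit = {
    // close() runs once per subtask after the (bounded) input has been fully processed
    println("Subtask " + getRuntimeContext.getIndexOfThisSubtask + " processed " + seen + " events")
  }
}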

IV. Physical Partitioning

keyBy() is itself a repartitioning operation: it redistributes records according to the hash of the key. But it only guarantees that records are separated by key; how evenly the data is spread, and which partition a particular key ends up in, are completely out of our control. When this leads to data skew, we have to repartition the data by hand, which is what physical partitioning is for.

Physical partitioning means taking control of the partitioning strategy ourselves: deciding exactly where each record should go, rebalancing the load, and sending the stream roughly evenly to the downstream task partitions.

1. Random partitioning (shuffle)

The simplest way to repartition is to "shuffle the deck". Calling shuffle() on a DataStream distributes the records randomly across the parallel tasks of the downstream operator. The random partitioning follows a uniform distribution, so the stream is scrambled and spread evenly over the downstream partitions.

shuffle example:

import com.atguigu.chapter05.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._

object ShuffleTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // TODO Read from the custom source
    val stream: DataStream[Event] = env.addSource(new ClickSource)

    // TODO Shuffle, then print with 4 parallel sink tasks
    stream.shuffle.print().setParallelism(4)

    env.execute()
  }
}

// Output:
2> Event(Cary,./fav,1681456335725)
2> Event(Bob,./cart,1681456336737)
3> Event(Bob,./pord?id=1,1681456337746)
3> Event(Cary,./pord?id=2,1681456338754)
1> Event(Cary,./cart,1681456339760)
2> Event(Alice,./pord?id=2,1681456340767)
3> Event(Alice,./cart,1681456341773)
2> Event(Alice,./home,1681456342781)
4> Event(Marry,./home,1681456343789)
1> Event(Bob,./fav,1681456344796)
1> Event(Marry,./pord?id=1,1681456345805)

The ClickSource class:

import org.apache.flink.streaming.api.functions.source.SourceFunction

import java.util.Calendar
import scala.util.Random

class ClickSource extends SourceFunction[Event] {
  // Flag used to stop the source
  var running = true

  override def run(sourceContext: SourceFunction.SourceContext[Event]): Unit = {
    // Random number generator
    val random = new Random()
    val users: Array[String] = Array("Marry", "Alice", "Bob", "Cary")
    val urls: Array[String] = Array("./home", "./cart", "./fav", "./pord?id=1", "./pord?id=2", "./pord?id=3")

    // Keep emitting records while the flag is true
    while (running) {
      val event: Event = Event(
        users(random.nextInt(users.length)),
        urls(random.nextInt(urls.length)),
        Calendar.getInstance.getTimeInMillis
      )
      // Emit the record downstream via the source context
      sourceContext.collect(event)
      // Emit one record per second
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}

2. Round-robin partitioning (rebalance)

Round-robin is another common repartitioning strategy. Put simply, it "deals out cards": records are handed to the downstream tasks one after another, in order. Calling rebalance() on a DataStream performs this round-robin repartitioning; it uses a Round-Robin load-balancing algorithm to distribute the input evenly across the downstream parallel tasks.

import com.atguigu.chapter05.{ClickSource, Event}
import org.apache.flink.streaming.api.scala._

object RebalanceTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // TODO Read from the custom source
    val stream: DataStream[Event] = env.addSource(new ClickSource)

    // TODO Both of the following are round-robin repartitioning
    // (when the parallelism changes and no partitioner is specified, rebalance is the default)
    stream.print().setParallelism(4)
    stream.rebalance.print("rebalance").setParallelism(4)
    /*
    rebalance:1> Event(Cary,./pord?id=1,1681456987398)
    rebalance:2> Event(Cary,./pord?id=1,1681456988408)
    rebalance:3> Event(Cary,./home,1681456989417)
    rebalance:4> Event(Alice,./pord?id=3,1681456990424)
    rebalance:1> Event(Bob,./cart,1681456991437)
    rebalance:2> Event(Cary,./pord?id=3,1681456992438)
    rebalance:3> Event(Bob,./cart,1681456993445)
    rebalance:4> Event(Cary,./cart,1681456994456)
    rebalance:1> Event(Marry,./home,1681456995468)
    rebalance:2> Event(Cary,./home,1681456996480)
    rebalance:3> Event(Marry,./pord?id=2,1681456997488)
    */

    env.execute()
  }
}

3. Rescale partitioning (rescale)

Rescale partitioning is very similar to round-robin. Under the hood rescale() also deals records out in a round-robin way, but only to a subset of the downstream parallel tasks. In other words, if there are several "dealers", rebalance() makes every dealer deal to every player, whereas rescale() splits the players into groups and each dealer only deals, in turn, to the players in its own group.

package com.atguigu.partition

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object RescaleTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // TODO A custom parallel source with 2 subtasks ("2 dealers"), feeding 4 downstream partitions
    val stream: DataStream[Int] = env.addSource(new RichParallelSourceFunction[Int] {
      override def run(sourceContext: SourceFunction.SourceContext[Int]): Unit = {
        for (i <- 0 to 7) {
          // Subtask 1 emits the odd numbers, subtask 0 emits the even numbers
          if (getRuntimeContext.getIndexOfThisSubtask == (i + 1) % 2) {
            sourceContext.collect(i + 1)
          }
        }
      }

      override def cancel(): Unit = {}
    }).setParallelism(2)

    // TODO Round-robin: every source subtask deals to all 4 downstream partitions
    stream.rebalance.print("rebalance").setParallelism(4)
    /*
    rebalance:1> 4
    rebalance:2> 6
    rebalance:4> 2
    rebalance:3> 8
    rebalance:4> 5
    rebalance:1> 7
    rebalance:3> 3
    rebalance:2> 1
    */

    // TODO Rescale: every source subtask only deals to its own group of downstream partitions
    // (here the odd numbers end up in partitions 3 and 4, the even numbers in partitions 1 and 2)
    stream.rescale.print("rescale").setParallelism(4)
    /*
    rescale:4> 3
    rescale:3> 1
    rescale:1> 2
    rescale:2> 4
    rescale:1> 6
    rescale:3> 5
    rescale:4> 7
    rescale:2> 8
    */

    env.execute()
  }
}

Note the difference between rebalance and rescale: rebalance connects every upstream subtask to every downstream subtask, so a record from any source subtask may end up in any downstream partition, while rescale only connects each upstream subtask to a fixed subset of downstream subtasks, so the data is redistributed inside smaller groups with fewer network channels.

4. Broadcast (broadcast), rarely used

After a broadcast, every partition keeps a copy of the data, so records may be processed multiple times. Calling broadcast() on a DataStream copies the input and sends it to all parallel tasks of the downstream operator.

    // TODO Read from the custom source
    val stream: DataStream[Event] = env.addSource(new ClickSource)

    // TODO Broadcast: every second all 4 parallel subtasks print the same record; only useful in special scenarios
    stream.broadcast.print("broadcast").setParallelism(4)

// Output:
broadcast:3> Event(Cary,./pord?id=1,1681461381359)
broadcast:2> Event(Cary,./pord?id=1,1681461381359)
broadcast:1> Event(Cary,./pord?id=1,1681461381359)
broadcast:4> Event(Cary,./pord?id=1,1681461381359)
broadcast:1> Event(Alice,./fav,1681461382364)
broadcast:2> Event(Alice,./fav,1681461382364)
broadcast:3> Event(Alice,./fav,1681461382364)
broadcast:4> Event(Alice,./fav,1681461382364)

5. Global partitioning (global), rarely used

Global partitioning is another special, and rather extreme, strategy: calling global() sends all input records to the first parallel subtask of the downstream operator. This effectively forces the downstream parallelism down to 1, so it must be used with great care, as it can put heavy pressure on that single task.

    // TODO Read from the custom source
    val stream: DataStream[Event] = env.addSource(new ClickSource)

    // TODO Global partitioning: one record per second, and every record lands in partition 1
    stream.global.print().setParallelism(4)

// Output:
1> Event(Cary,./home,1681461414206)
1> Event(Bob,./home,1681461415222)
1> Event(Alice,./fav,1681461416225)
1> Event(Alice,./home,1681461417236)
1> Event(Bob,./fav,1681461418242)

6. Custom partitioning (custom)

When none of the partitioning strategies Flink provides fits the requirement, we can define our own with partitionCustom().

partitionCustom() takes two arguments: a custom Partitioner object, and the field the partitioner is applied to. The field is specified much like the key in keyBy: by field name, by position index, or with a KeySelector.

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

object CustomTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // TODO A simple in-memory source
    val stream: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)

    // TODO Partition by parity: odd numbers go to partition 2, even numbers to partition 1
    val ds: DataStream[Int] = stream.partitionCustom(new Partitioner[Int] {
      override def partition(k: Int, i: Int): Int = k % 2
    }, x => x)

    ds.print("partitionCustom").setParallelism(4)
    /*
    partitionCustom:1> 2
    partitionCustom:2> 1
    partitionCustom:1> 4
    partitionCustom:2> 3
    partitionCustom:1> 6
    partitionCustom:2> 5
    partitionCustom:1> 8
    partitionCustom:2> 7
    */

    env.execute()
  }
}
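The key can also be extracted from a richer type with a function, just like the key selector in keyBy. As a sketch (the class and object names here are illustrative, not from the original), the Event stream from the earlier sections could be routed by a hash of the user name:

import com.atguigu.chapter05.{ClickSource, Event}
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

// Hypothetical partitioner: pick a partition from the hash of the user name
class UserPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    math.abs(key.hashCode) % numPartitions
}

object CustomUserPartitionTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream: DataStream[Event] = env.addSource(new ClickSource)

    // The second argument extracts the key handed to the partitioner, just like the key selector in keyBy
    stream.partitionCustom(new UserPartitioner, (e: Event) => e.user)
      .print("custom")
      .setParallelism(4)

    env.execute()
  }
}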