Understanding watermarks in Flink
Contents
I. The concept of the watermark
1. What is a watermark
2. A watermark example
II. Code practice
1. Simple version
2. Custom watermark class
I. The concept of the watermark
1. What is a watermark
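A watermark is a special timestamped marker that flows through a DataStream alongside the data records. It addresses out-of-order data in real-time computation. Window evaluation is delayed so that records arriving late can still be included. A window fires once the watermark reaches the window's end time.
The watermark is computed as:
watermark = largest event time seen so far (by the watermark generator) - maximum allowed delay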
Watermarks only matter when event time is used. In this article they drive event-time windows.
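The simplified rule above matches what Flink does internally. Flink's built-in bounded-out-of-orderness generator emits "max event time - delay - 1 ms". Its event-time trigger fires a window [start, end) once the watermark reaches window.maxTimestamp(), which is end - 1 ms. The two extra milliseconds cancel. The plain-Java sketch below has no Flink dependency, and its class and method names are hypothetical. It checks both rules against the event times of records a to e from the example in section 2, measured in milliseconds after 10:00:00.

```java
import java.util.Arrays;

// Plain-Java check of the watermark arithmetic (no Flink dependency).
// Class and method names are hypothetical, chosen for this illustration.
final class WatermarkFireCheck {

    // Largest event time seen so far; assumes at least one event has arrived.
    static long maxSoFar(long... eventTimes) {
        long max = Long.MIN_VALUE;
        for (long t : eventTimes) {
            max = Math.max(max, t);
        }
        return max;
    }

    // Article's rule: watermark = max - delay; fire once watermark >= window end.
    static boolean firesSimple(long maxEventTime, long delay, long windowEnd) {
        return maxEventTime - delay >= windowEnd;
    }

    // Flink's arithmetic: watermark = max - delay - 1; fire once
    // watermark >= window.maxTimestamp(), which is windowEnd - 1.
    static boolean firesFlink(long maxEventTime, long delay, long windowEnd) {
        long watermark = maxEventTime - delay - 1;
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long delay = 5 * minute;
        long windowEnd = 10 * minute;  // 10:10:00, measured from 10:00:00
        // Event times of records a to e, in arrival order, measured from 10:00:00
        long[] arrivals = {1 * minute, 11 * minute, 8 * minute, 14 * minute, 15 * minute};
        for (int i = 1; i <= arrivals.length; i++) {
            long max = maxSoFar(Arrays.copyOf(arrivals, i));
            boolean simple = firesSimple(max, delay, windowEnd);
            boolean flink = firesFlink(max, delay, windowEnd);
            System.out.println("after " + i + " records: watermark offset = "
                    + (max - delay) / minute + " min, fires = " + simple
                    + ", rules agree = " + (simple == flink));
        }
    }
}
```

Because the generator uses the maximum seen so far, an out-of-order record such as c never moves the watermark backwards.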
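Flink distinguishes three notions of time:
Event time (EventTime): the time at which the event actually occurred
Ingestion time (IngestionTime): the time at which the event arrives in Flink
Processing time (ProcessingTime): the time at which the event is actually processed/computed
Event time matters most. Once an event has occurred, its event time never changes, so it reflects the event itself most faithfully.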
2. A watermark example
Six records arrive one after another with the following event times:
a 10:01:00
b 10:11:00
c 10:08:00
d 10:14:00
e 10:15:00
f 10:09:00
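Records a to f arrive in this order, and the maximum allowed delay is 5 minutes. We want the statistics for the window [10:00:00, 10:10:00) (end exclusive).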
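Without a watermark
a arrives: it is counted normally.
b arrives: its event time 10:11:00 is past the window end, so the window is considered complete and fires. Only a is counted, and nothing that arrives later is included. For example, when c arrives, the active window has already moved on to [10:10:00, 10:20:00). c does not belong to it, so c is discarded.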
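With a watermark and a maximum delay of 5 minutes
a arrives: watermark = max(10:01:00) - 5 min = 09:56:00 < window end 10:10:00, not fired
b arrives: watermark = max(10:01:00, 10:11:00) - 5 min = 10:06:00 < 10:10:00, not fired
c arrives: watermark = max(10:01:00, 10:11:00, 10:08:00) - 5 min = 10:06:00 < 10:10:00, not fired (the maximum is still b's 10:11:00)
d arrives: watermark = max(10:01:00, 10:11:00, 10:08:00, 10:14:00) - 5 min = 10:09:00 < 10:10:00, not fired
e arrives: watermark = max(10:01:00, 10:11:00, 10:08:00, 10:14:00, 10:15:00) - 5 min = 10:10:00 = window end 10:10:00, fired. The window is computed, and a and c are counted in [10:00:00, 10:10:00). b (10:11:00) belongs to the next window.
The watermark mechanism can absorb a certain amount of out-of-order, late data, but not lateness beyond the allowed delay. For example, when f (10:09:00) arrives, the window [10:00:00, 10:10:00) has already been computed, so f is still lost.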
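The plain-Java sketch below replays records a to f against the single window [10:00:00, 10:10:00), making both walkthroughs above reproducible. It follows the article's simplified model, not Flink's actual source:
- A record is dropped if its window has already fired.
- The window fires as soon as "max event time - delay" reaches the window end.

A delay of 0 reproduces the "without a watermark" case. Class and method names are hypothetical.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Replays the a-f example against one tumbling window (simplified model, no Flink).
// Class and method names are hypothetical.
final class LateDataSimulation {

    static final class Event {
        final String name;
        final long second;  // event time as seconds of the day

        Event(String name, String hhmmss) {
            this.name = name;
            this.second = LocalTime.parse(hhmmss).toSecondOfDay();
        }
    }

    static final class Result {
        final List<String> counted = new ArrayList<>();  // records included when the window fired
        final List<String> dropped = new ArrayList<>();  // window records that arrived after it fired
    }

    static List<Event> exampleArrivals() {
        return Arrays.asList(
                new Event("a", "10:01:00"), new Event("b", "10:11:00"), new Event("c", "10:08:00"),
                new Event("d", "10:14:00"), new Event("e", "10:15:00"), new Event("f", "10:09:00"));
    }

    // One window [start, end); delaySeconds = 0 models "no watermark delay".
    static Result run(List<Event> arrivals, String start, String end, long delaySeconds) {
        long windowStart = LocalTime.parse(start).toSecondOfDay();
        long windowEnd = LocalTime.parse(end).toSecondOfDay();
        Result result = new Result();
        List<String> buffer = new ArrayList<>();
        long maxEventTime = Long.MIN_VALUE;
        boolean fired = false;
        for (Event e : arrivals) {
            if (e.second >= windowStart && e.second < windowEnd) {
                if (fired) {
                    result.dropped.add(e.name);  // its window was already computed
                } else {
                    buffer.add(e.name);
                }
            }
            maxEventTime = Math.max(maxEventTime, e.second);
            long watermark = maxEventTime - delaySeconds;  // max event time so far - delay
            if (!fired && watermark >= windowEnd) {
                fired = true;
                result.counted.addAll(buffer);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (long delay : new long[]{0, 300}) {
            Result r = run(exampleArrivals(), "10:00:00", "10:10:00", delay);
            System.out.println("delay " + delay + " s: counted " + r.counted + ", dropped " + r.dropped);
        }
    }
}
```

With a delay of 0 the window counts only a and drops c and f. With 300 seconds it counts a and c and drops only f, matching the walkthrough.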
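II. Code practice
To use event time, Flink must be told that timestamps come from the data itself rather than from the machine clock:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Since Flink 1.12 this call is deprecated because event time is already the default. The examples below still call it, and they assign the timestamps themselves through a WatermarkStrategy.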
Required dependencies (Maven):
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.10</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <groupId>com.google.code.findbugs</groupId>
            <artifactId>jsr305</artifactId>
        </exclusion>
    </exclusions>
</dependency>
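1. Simple version
The program below generates simulated real-time order data and computes each user's total spending in 5-second tumbling windows.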
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class check_waterMaker1 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // deprecated (and unnecessary) since Flink 1.12
        env.getConfig().setAutoWatermarkInterval(100); // emit a watermark every 100 ms
        // These two settings only take effect once checkpointing is enabled with env.enableCheckpointing(...)
        env.getCheckpointConfig().setCheckpointTimeout(60 * 60 * 1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 2. Source: simulated real-time orders (with delayed, out-of-order event times)
        DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private volatile boolean flag = true; // volatile: cancel() runs on another thread

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    // Simulate delay and disorder: event time lies 0 to 4 s in the past
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    System.out.println("orderId:" + orderId + "\tuserId:" + userId + "\tmoney:" + money + "\teventTime:" + eventTime);
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        }).setParallelism(1);

        // 3. Timestamps and watermarks: tolerate up to 3 s of out-of-orderness
        DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((event, timestamp) -> event.getEventTime()))
                .setParallelism(1);
        // From here on the stream carries watermarks, so event-time windows can be computed.

        // 4. Every 5 s, compute each user's total order amount for those 5 s (event-time tumbling window)
        SingleOutputStreamOperator<Order> money = watermakerDS
                .keyBy(order -> order.userId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money")
                .setParallelism(1);

        // 5. Sink
        money.print();

        // 6. Execute
        env.execute();
    }

    // Lombok generates the getters/setters and both constructors used above
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        public String orderId;
        public Integer userId;
        public Integer money;
        public Long eventTime;
    }
}
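The Order class relies on the Lombok annotations @Data, @AllArgsConstructor and @NoArgsConstructor, which is why the lombok dependency is required.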
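The plain-Java sketch below shows what keyBy(userId), a 5-second TumblingEventTimeWindows and sum("money") compute together:
- Each record goes to the window that starts at the largest multiple of 5000 ms not after its event time. This is Flink's default, zero-offset alignment.
- Money is summed per (window, user).

The sketch assumes no record is late, so it shows the result when every record arrives before its window fires. The Order class here is a simplified stand-in, and all names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of keyBy(userId) + 5 s tumbling event-time window + sum("money").
// Assumes no late records; Order here is a simplified, hypothetical stand-in.
final class TumblingSumSketch {

    static final long WINDOW_MS = 5_000L;  // corresponds to Time.seconds(5)

    static final class Order {
        final int userId;
        final int money;
        final long eventTime;  // epoch millis

        Order(int userId, int money, long eventTime) {
            this.userId = userId;
            this.money = money;
            this.eventTime = eventTime;
        }
    }

    // Start of the window [start, start + 5000) containing ts (ts >= 0, zero offset).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    // windowStart -> (userId -> total money), both levels sorted for readable output.
    static Map<Long, Map<Integer, Integer>> sumPerUserPerWindow(List<Order> orders) {
        Map<Long, Map<Integer, Integer>> result = new TreeMap<>();
        for (Order o : orders) {
            result.computeIfAbsent(windowStart(o.eventTime), k -> new TreeMap<>())
                  .merge(o.userId, o.money, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
                new Order(0, 10, 1_000L), new Order(1, 20, 2_500L), new Order(0, 5, 4_999L),  // window [0, 5000)
                new Order(0, 7, 5_000L), new Order(1, 1, 9_000L));                              // window [5000, 10000)
        System.out.println(sumPerUserPerWindow(orders));  // prints {0={0=15, 1=20}, 5000={0=7, 1=1}}
    }
}
```

A record at 4999 ms still lands in [0, 5000), while one at exactly 5000 ms opens the next window, because window ends are exclusive.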
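2. Custom watermark class
Here the watermark logic is written by hand as a WatermarkStrategy whose WatermarkGenerator logs every watermark it emits. A WindowFunction prints the contents of each window when it fires: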
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.*;
import java.util.concurrent.TimeUnit;

public class check_waterMaker2 {
    public static void main(String[] args) throws Exception {
        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");

        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Watermark emission interval. At 1000 ms, onPeriodicEmit below prints about once per record
        // (the source emits one record per second); at 100 ms it prints about ten times per record.
        env.getConfig().setAutoWatermarkInterval(1000);
        env.setParallelism(1);
        // env.enableCheckpointing(checkpointInteval); // while commented out, no checkpoints run, so no offsets are committed
        env.getCheckpointConfig().setCheckpointTimeout(60 * 60 * 1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 2. Source: simulated real-time orders (with delayed, out-of-order event times)
        DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private volatile boolean flag = true; // volatile: cancel() runs on another thread

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    // Simulate delay and disorder: event time lies 0 to 4 s in the past
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    System.out.println("Sent: " + orderId + "\tuserId:" + userId + "\t" + money + "\t" + df.format(eventTime));
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });
        // orderDS.print();

        // 3. Custom watermark strategy
        DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(
                new WatermarkStrategy<Order>() {
                    @Override
                    public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<Order>() {
                            private int userId = 0;       // last key seen, for logging only
                            private long eventTime = 0L;  // last event time seen, for logging only
                            private final long outOfOrdernessMillis = 3000;
                            // Offset the start value so maxTimestamp - outOfOrdernessMillis - 1 cannot underflow
                            private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;

                            @Override
                            public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                                userId = event.userId;
                                eventTime = event.eventTime;
                                maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                // watermark = largest event time so far - maximum allowed delay (out-of-orderness)
                                Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1);
                                System.out.println("key:" + userId + ", system time:" + df.format(System.currentTimeMillis())
                                        + ", event time:" + df.format(eventTime) + ", watermark:" + df.format(watermark.getTimestamp()));
                                output.emitWatermark(watermark);
                            }
                        };
                    }
                }.withTimestampAssigner((event, timestamp) -> event.getEventTime()))
                .setParallelism(1);

        // 4. For learning/testing: when a window fires, print the event times of its records and its total
        SingleOutputStreamOperator<String> result = watermakerDS
                .keyBy(order -> order.userId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // WindowFunction<IN, OUT, KEY, W extends Window> is applied to the records of each window
                .apply(new WindowFunction<Order, String, Integer, TimeWindow>() {
                    @Override
                    public void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception {
                        // Event times of the records that belong to this window
                        List<String> eventTimeList = new ArrayList<>();
                        int sum_n = 0;
                        for (Order order : input) {
                            eventTimeList.add(df.format(order.eventTime));
                            sum_n += order.money;
                        }
                        String outStr = String.format("key:%s, window [%s~%s), event times in window:%s, userid:%s, total_money:%d",
                                key, df.format(window.getStart()), df.format(window.getEnd()), eventTimeList, key, sum_n);
                        out.collect(outStr);
                    }
                }).setParallelism(1);

        // 5. Sink
        result.print();

        // 6. Execute
        env.execute();
    }

    // Lombok generates the getters/setters and both constructors used above
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        public String orderId;
        public Integer userId;
        public Integer money;
        public Long eventTime;
    }
}
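The generator above only tracks the largest event time in onEvent and derives the watermark in onPeriodicEmit. Flink calls onPeriodicEmit every setAutoWatermarkInterval milliseconds. The plain-Java sketch below mirrors that logic without Flink classes. It replays the generator under two simplifying assumptions:
- One record arrives per second.
- Emissions happen exactly on interval ticks.

This shows the effect described in the code comment: one watermark per record at a 1000 ms interval, about ten at 100 ms, and no movement between records. Class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java mirror of the custom WatermarkGenerator above (no Flink classes).
// Class and method names are hypothetical.
final class PeriodicGeneratorSketch {

    private final long outOfOrdernessMillis;
    // Offset start value so that maxTimestamp - outOfOrdernessMillis - 1 cannot underflow.
    private long maxTimestamp;

    PeriodicGeneratorSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Like onEvent: only remember the largest event time seen.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Like onPeriodicEmit: derive the current watermark.
    long onPeriodicEmit() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    // Assumption: record i arrives at processing time i * 1000 ms, and onPeriodicEmit
    // runs exactly every intervalMillis (the role of setAutoWatermarkInterval).
    static List<Long> replay(long[] eventTimes, long intervalMillis, long outOfOrdernessMillis) {
        PeriodicGeneratorSketch gen = new PeriodicGeneratorSketch(outOfOrdernessMillis);
        List<Long> emitted = new ArrayList<>();
        long lastTick = eventTimes.length * 1000L;
        int next = 0;
        for (long now = intervalMillis; now <= lastTick; now += intervalMillis) {
            while (next < eventTimes.length && next * 1000L < now) {
                gen.onEvent(eventTimes[next++]);  // deliver records that arrived before this tick
            }
            emitted.add(gen.onPeriodicEmit());
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] eventTimes = {10_000L, 8_000L, 14_000L};  // 8000 arrives after 10000: out of order
        System.out.println("interval 1000 ms: " + replay(eventTimes, 1000L, 3000L));
        System.out.println("interval 100 ms: " + replay(eventTimes, 100L, 3000L));
    }
}
```

The 1000 ms replay yields [6999, 6999, 10999]. The out-of-order record at 8000 does not lower the watermark. The 100 ms replay emits 30 values, ten per record, repeating the same watermark until the next record arrives.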