> 文章列表 > Flink-CountWindow/CountWindowAll

Flink-CountWindow/CountWindowAll

Flink-CountWindow/CountWindowAll

在这里我们已经知道这两者之间的区别,本文将用代码和控制台打印的方式演示二者

CountWindow

CountWindow是基于key的窗口,所以必须在keyBy方法之后才能调用,再演示之前,我们先建立两个类

public class WordOnce {/*表示输入的一个单词/private String word;/*表示这个单词出现的次数,默认是1,注意1表示当前值(累加),也就是说本次又出现了一次,至于之前* 已经出现过多少次,由flink赋值,反正我们自己new WordOnce()的时候,表示的是该单词又出现了* 一次,所以此处默认是1/private int times=1;@Overridepublic String toString() {return "WordWithCount{" +"word='" + word + '\\'' +", times=" + times +'}';}// 省略get/set方法,但是并不影响我认为lombok是垃圾
}
public class StreamingFlatFunction implements FlatMapFunction<String, WordOnce> {@Overridepublic void flatMap(String value, Collector<WordOnce> out) throws Exception {WordOnce wo = new WordOnce();wo.setWord(value);out.collect(wo);}
}

(1)countWindow(long size)
该方法属于滚动窗口(TumblingWindow),countWindow(2)表示相同的key攒满两条数据之后,再对这两条数据进行计算,下面的代码表示nc -lp命令输入两次yc之后,控制台才打印,而输入一次yc是不会打印的

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> text = env.socketTextStream("localhost", 9000);DataStream<WordOnce> windowCounts = text.flatMap(new StreamingFlatFunction()).keyBy("word").countWindow(2).sum("times");windowCounts.print().setParallelism(1);env.execute("Flink Streaming Java API Skeleton");
}

下面的内容分别是使用nc输入的命令和控制台打印的内容

C:\\Users\\admin>nc -lp 9000
yc
yc
wh
yc
yc

WordOnce{word=‘yc’, times=2}
WordOnce{word=‘yc’, times=2}
// 并没有打印wh,因为wh只出现1次,只有出现2次的时候才打印,因为countWindow(2)

(2)countWindow(long size, long slide)
该方法属于滑动窗口(SlidingWindow),countWindow(3,2)表示相同的key攒满两条数据之后,才会对最近的3条数据进行计算,注意,这3条数据中是包含这攒满的这两条的,下面的代码表示nc -lp命令输入两次yc之后,控制台才打印,而输入一次yc是不会打印的
NOTE:其实这个例子并不能很好的演示滑动count窗口,目前我并没有想到什么好的例子来演示基于key的窗口,不过下文中的CountWindowAll更好的解释了滑动count窗口,虽然CountWindowAll并不是基于key的,但并不影响对滑动count的解释

public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> text = env.socketTextStream("localhost", 9000);DataStream<WordOnce> windowCounts = text.flatMap(new StreamingFlatFunction()).keyBy("word").countWindow(3,2)// 与示例1相比,只有本行有所改变,其他地方相同.sum("times");windowCounts.print().setParallelism(1);env.execute("Flink Streaming Java API Skeleton");
}

下面的两段代码分别是使用nc输入的命令和控制台打印的内容

C:\\Users\\admin>nc -lp 9000
a
b
c
d
a<--------这个a输入之后,才会触发控制台打印,因为这个a此时出现2次

WordOnce{word=‘a’, times=2}

CountWindowAll

CountWindowAll并不是基于key的窗口,所以可以在keyBy方法之前就可以使用,在演示之前,我们先建立两个类

public class MyACC {private List<String> list = new ArrayList<>();public List<String> getList() {return list;}public void add(String value) {list.add(value);}
}
public class MyAggregateFunction implements AggregateFunction<String,MyACC,Integer> {@Overridepublic MyACC createAccumulator() {// 每次开始统计一个窗口内的数据的时候,就会调用一次该方法System.out.println("createAccumulator");return new MyACC();}// 每次来数据的时候,都会调用一次该方法// 假设在一个窗口周期之内来了三次数据,那么就会调用三次该方法@Overridepublic MyACC add(String value, MyACC accumulator) {System.out.println("add:"+value);accumulator.add(value);return accumulator;}// 在getResult之前会调用一次这个方法,将多个slot中的ACC相加到一起,// 准备用来统计@Overridepublic Integer getResult(MyACC accumulator) {System.out.println("getResult");return accumulator.getList().size();}// 每次即将统计的时候,调用该方法,因为该方法就是用来统计@Overridepublic MyACC merge(MyACC a, MyACC b) {System.out.println("merge");for (String s : b.getList()) {a.add(s);}return a;}
}

(1)countWindowAll(long size, long slide)
该方法属于滑动窗口(SlidingWindow),下面的例子中,countWindowAll(3,2)表示攒满两条数据之后,才会对最近的3条数据进行计算

public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> text = env.socketTextStream("localhost", 9000);text.flatMap(new StreamingFlatFunction()).countWindowAll(3,2).aggregate(new MyAggregateFunction());env.execute("Flink Streaming Java API Skeleton");
}

输入

C:\\Users\\admin>nc -lp 9000
a
b

控制台打印

createAccumulator
add:a
add:b
getResult

继续输入字母c和字母d

C:\\Users\\admin>nc -lp 9000
a<----这是刚才输入的
b<----这是刚才输入的
c<----这才是本次输入的
d<----这才是本次输入的

控制台打印

createAccumulator
add:b
add:c
add:d
getResult

(2)countWindowAll(long size)
同countWindow(long size),演示效果是一样的,不再赘述