RecordWriter核心设计实现
文章目录
-
- 1.准备RecordWriter
- 2.RecordSerializer序列化
- 3.拷贝到MemorySegment
-
- 获取BufferBuilder
- 数据拷贝到MemorySegment中
StreamTask所对应的OperatorChain内的最后一个StreamOperator处理后的数据,是通过RecordWriterOutput输出到网络的,而RecordWriterOutput的底层正是借助了RecordWriter组件实现了此功能。
1.准备RecordWriter
在StreamTask的beforeInvoke阶段,构建OperatorChain时会用到RecordWriter,因为OperatorChain内的最后一个StreamOperator计算出的Intermediate Result会被RecordWriterOutput组件输出到网络中,而RecordWriterOutput底层就是借助RecordWriter实现了该功能。
/* 阶段1:准备当前Task执行所需要的环境信息*/
private void beforeInvoke() throws Exception {// 省略部分代码...// 创建OperatorChain,需要指定RecordWriter实例作为参数传入(RecordWriter用于将OperatorChain处理完的数据输出到网络、交给下一个Task实例)operatorChain = new OperatorChain<>(this, recordWriter);// 省略部分代码...
}
在StreamTask的构造方法中,会创建RecordWriter的代理类,用来输出当前StreamTask产生的Intermediate Result。
protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor actionExecutor,TaskMailbox mailbox) {super(environment);// 省略部分代码.../* 核心:创建RecordWriter(用于将OperatorChain处理完的数据输出到网络、交给下一个Task实例)的代理类* 这个StreamTask对应的OperatorChain有几个“最终输出边”,就会创建几个RecordWriter实例。根据创建出的RecordWriter的数量,* 最终确定RecordWriter组件的代理类是哪个:NonRecordWriter or SingleRecordWriter or MultipleRecordWriters*/this.recordWriter = createRecordWriterDelegate(configuration, environment);// 省略部分代码...
}
创建RecordWriter组件的代理,当前StreamTask对应的OperatorChain有几个“最终输出边”,就会创建出几个RecordWriter。根据创建得到的RecordWriter组件的数量,构建出RecordWriter代理(到底是哪个)
/* 创建RecordWriter组件的代理,它为RecordWriterOutput组件提供了向外输出的能力*/
@VisibleForTesting
public static <OUT> RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(StreamConfig configuration,Environment environment) {/* 核心:当前StreamTask对应的OperatorChain有几个“最终输出边”,在此就会创建几个RecordWriter*/List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites = createRecordWriters(configuration,environment);/* 根据刚刚创建好的RecordWriter组件的数量,最终确定RecordWriter组件的代理类到底是谁?*/if (recordWrites.size() == 1) {// 如果为1,就创建SingleRecordWriter代理类;return new SingleRecordWriter<>(recordWrites.get(0));} else if (recordWrites.size() == 0) {// 如果为0,就创建NonRecordWriter代理类;return new NonRecordWriter<>();} else {// 其他情况创建MultipleRecordWriters代理类return new MultipleRecordWriters<>(recordWrites);}
}
当前StreamTask对应的OperatorChain有几个“最终输出边”,就会创建几个RecordWriter实例,并保存到List集合中
/* 当前StreamTask对应的OperatorChain有几个最终输出边,就创建几个RecordWriter实例*/
private static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(StreamConfig configuration,Environment environment) {// 构建空的List集合,用来存储创建的RecordWriter实例List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();// 获取当前StreamTask对应的OperatorChain的所有“最终输出边”List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());// 遍历OperatorChain的所有“最终输出边”,创建RecordWriter实例for (int i = 0; i < outEdgesInOrder.size(); i++) {// 遍历到1个StreamEdgeStreamEdge edge = outEdgesInOrder.get(i);recordWriters.add(/* 核心:创建RecordWriter,并保存到List中*/createRecordWriter(edge,i,environment,environment.getTaskInfo().getTaskName(),edge.getBufferTimeout()));}return recordWriters;
}
创建RecordWriter组件的具体实现逻辑如下:
/* 创建RecordWriter组件:* RecordWriter组件利用RecordSerializer将数据元素序列化成二进制的Buffer数据,利用ResultPartitionWriter将其写入指定的ResultPartition,* 下游算子可以到ResultPartition中消费Buffer数据。(在创建OperatorChain时,会在构造方法中以参数的形式指定RecordWriter)*/
@SuppressWarnings("unchecked")
private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(StreamEdge edge,int outputIndex,Environment environment,String taskName,long bufferTimeout) {// StreamPartitioner作为ChannelSelector,会决定将StreamRecord发送到下游InputGate的哪些InputChannel,也就是数据分发模式StreamPartitioner<OUT> outputPartitioner = null;try {/* 为了避免多个StreamEdge共用同一个StreamPartitioner,这里要克隆StreamPartitioner。* DataStream的“物理分区操作”所创建的StreamPartitioner分区策略会被应用在RecordWriter组件中。* 例如rabalance()操作就会创建RebalancePartitioner作为StreamPartitioner的实现类,并通过RebalancePartitioner选择下游InputChannel,* 实现数据元素按照指定的分区策略下发*/outputPartitioner = InstantiationUtil.clone(// 获取StreamEdge内部对应的StreamPartitioner(StreamPartitioner<OUT>) edge.getPartitioner(),environment.getUserClassLoader());} catch (Exception e) {ExceptionUtils.rethrow(e);}LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);/* RecordWriter的核心组件之一,ResultPartitionWriter组件:* 它会将单个任务产生的中间结果数据(序列化后而成的二进制数据),写入到ResultPartition中。下游节点会从它消费*/ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);// 如果StreamPartitioner实现类的类型为ConfigurableStreamPartitioner,那就“加持”KeyGroupif (outputPartitioner instanceof ConfigurableStreamPartitioner) {// 从ResultPartition中获取KeyGroupint numKeyGroups = bufferWriter.getNumTargetKeyGroups();if (0 < numKeyGroups) {// 将KeyGroup的数量交给StreamPartitioner,完成StreamPartitioner的初始化((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);}}/* 核心:创建RecordWriter*/RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output = new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>()// StreamPartitioner作为ChannelSelector,会被保存到RecordWriter的成员变量中.setChannelSelector(outputPartitioner).setTimeout(bufferTimeout).setTaskName(taskName)// ResultPartitionWriter作为RecordWriter的核心组件之一,会被保存到RecordWriter的全局变量中。// 初始化RecordWriter组件时,会在构造方法中创建另一个核心组件-- RecordSerializer(将数据元素序列化成二进制).build(bufferWriter);// 为RecordWriter设置“监控指标的采集和输出”output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());// 返回创建好的RecordWriterreturn output;
}
构建RecordWriter组件的核心:基于建造者模式创建RecordWriter组件。本质就是将RecordWriter的核心组件之一的ResultPartitionWriter(负责写出),保存到RecordWriter的全局变量中。同时在build过程中会初始化RecordWriter的另一个核心组件–RecordSerializer(负责序列化)。
StreamPartitioner有点特殊,它会决定ChannelSelector选择下游InputGate的哪些InputChannel,也就是数据分发模式。作为RecordWriter的抽象实现子类,只有ChannelSelectorRecordWriter才需要选择,因此StreamPartitioner会保存到ChannelSelectorRecordWriter的全局变量中;反观BroadcastRecordWriter,它全都要!因此它并不需要StreamPartitioner。
基于建造者模式构造RecordWriter组件:
/* 基于建造者模式创建RecordWriter:* BroadcastRecordWriter,数据元素会被发送到下游节点的所有InputChannel中* ChannelSelectorRecordWriter,根据ChannelSelector(内部基于StreamPartitioner选择不同的分发策略)选择下游节点的InputChannel*/
public RecordWriter<T> build(ResultPartitionWriter writer) {// 根据ChannelSelector类型,对应创建具体的RecordWriter抽象子类if (selector.isBroadcast()) {// 只需要ResultPartitionWriter,不需要ChannelSelector去选择InputChannel,因为会all inreturn new BroadcastRecordWriter<>(writer, timeout, taskName);} else {// 需要ResultPartitionWriter和StreamPartitioner(决定将StreamRecord发送到下游InputGate的哪个InputChannel中)return new ChannelSelectorRecordWriter<>(writer, selector, timeout, taskName);}
}
至此,RecordWriter就准备完毕了。与此同时,RecordWriter同时拥有了2个核心组件:RecordSerializer 和 ResultPartitionWriter。
2.RecordSerializer序列化
RecordWriter有2个重要的核心组件:
/* 重要组件RecordSerializer:对“输出到网络中的数据”,序列化成二进制格式。*/
protected final RecordSerializer<T> serializer;/* 重要组件ResultPartitionWriter:是ResultPartition要实现的接口,提供了将数据元素(序列化后的Buffer数据)写入ResultPartition的方法*/
protected final ResultPartitionWriter targetPartition;
RecordWriter会将StreamRecord序列化成二进制后,再向外写出。
/* ChannelSelector会决定将这个StreamRecord发送到下游的哪个InputChannel中* @param record 即将被写入到网络中的StreamRecord* @param targetChannel ChannelSelector决定将数据元素写入到哪个InputChannel中*/
protected void emit(T record, int targetChannel) throws IOException, InterruptedException {checkErroneous();/* 核心:使用RecordSerializer(RecordWriter组件的核心之一)的唯一实现子类SpanningRecordSerializer,* 将StreamRecord(数据元素)序列化成二进制格式后,临时暂存到ByteBuffer中。(等合适时机再做实际IO,有效减少了实际的物理读写次数)* 相当于就是把“砖”变成“沙子”,临时装到“拖拉机”上*/serializer.serializeRecord(record);/* 将ByteBuffer(缓冲区,也就是理解意义上的“拖拉机”)“临时暂存”的二进制数据,拷贝到指定InputChannel对应的BufferBuilder所持有的MemorySegment中。* 如果拷贝完成,就会clear序列化器保存的中间数据,以此保证序列化器中累积的数据不会太多*/if (copyFromSerializerToTargetChannel(targetChannel)) {// 在完成整个序列化过程后,包括数据元素序列化成二进制、拷贝到BufferBuilder持有的MemorySegment中,// 就要清空RecordSerializer内的DataOutputSerializer和ByteBuffer,下一次可以继续这样搞。serializer.prune();}
}
第一步要做的,就是“把砖变成粒度更细的沙子,装到拖拉机上”。
RecordSerializer是RecordWriter组件的核心之一,负责将StreamRecord(数据元素,也就是理解意义上的“砖”)序列化成二进制(粒度更细的“沙子”),临时暂存到ByteBuffer(缓冲区,也就是理解意义上的“拖拉机”)上。
RecordSerializer接口的唯一实现子类是SpanningRecordSerializer,它可以将StreamRecord序列化成二进制后,“临时暂存”到ByteBuffer上。
// 实现了java.io.DataOutput接口,为RecordSerializer提供了序列化能力,可以将数据转换成二进制后,保存到byte[]数组中
private final DataOutputSerializer serializationBuffer;// 对数据进行序列化时,(临时存储)用到的中间缓冲区。通过ByteBuffer提供的方法,可以轻松操作二进制数据
private ByteBuffer dataBuffer;/* RecordSerializer会将完整的StreamRecord序列化二进制数据后,临时暂存到ByteBuffer(缓冲区)中。* 相当于就是把“砖”变成“沙子”,临时装到“拖拉机”上*/
@Override
public void serializeRecord(T record) throws IOException {if (CHECKED) {if (dataBuffer.hasRemaining()) {throw new IllegalStateException("Pending serialization of previous record.");}}// 清理DataOutputSerializer的中间数据:将byte[]数组的position置为0serializationBuffer.clear();// 设定DataOutputSerializer的初始化容量:不小于4serializationBuffer.skipBytesToWrite(4);// write data and length/* 将StreamRecord(数据元素)写入到指定的DataOutputSerializer的byte[]数组中(数据元素可以直接转换成二进制格式)*/record.write(serializationBuffer);// DataOutputSerializer的长度int len = serializationBuffer.length() - 4;// 本次二进制数据写入完成后,DataOutputSerializer会记录相关元数据serializationBuffer.setPosition(0);serializationBuffer.writeInt(len);serializationBuffer.skipBytesToWrite(len);/* 将刚刚由DataOutputSerializer序列化所产生的byte[]数组内的二进制数据,封装成java.io.ByteBuffer数据结构。* ByteBuffer是NIO Buffer的其中一个表现形式,它提供了对二进制数据进行操作的各种方法。这里相当于就是将“砖”变成“沙子”,临时装到“拖拉机”上*/dataBuffer = serializationBuffer.wrapAsByteBuffer();
}
3.拷贝到MemorySegment
StreamRecord已经被序列化成二进制格式,并临时暂存到ByteBuffer上了。现在要做的就是将其拷贝到指定InputChannel对应的BufferBuilder所持有的MemorySegment中。
/* 将ByteBuffer(缓冲区)上“临时暂存”的二进制数据,拷贝到BufferBuilder内的MemorySegment上*/
protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {// 在复制之前,必须重置RecordSerializer,也就是将SpanningRecordSerializer中的ByteBuffer的position重置为0serializer.reset();boolean pruneTriggered = false;/* (找RecordWriter)获取指定Channel对应的BufferBuilder,也就是理解意义上的“目的地”。BufferBuilder可以基于ByteBuffer,构建出完整的Buffer数据。* 不管是ChannelSelectorRecordWriter还是BroadcastRecordWriter,都会将BufferConsumer添加到ResultSubPartition中,只是实现细节不同*/BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);/* 将ByteBuffer(缓冲区,也就是理解意义上的“拖拉机”)上“临时暂存”的二进制数据(也就是“拖拉机上的沙子”),* 拷贝到BufferBuilder内的MemorySegment(也就是“拖拉机的目的地”)中。* BufferBuilder可以基于ByteBuffer,构建出完整的Buffer数据。*/SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);/* 上述拷贝结果,会被用来判断当前BufferBuilder是否已经构建出了完整的Buffer数据*/while (result.isFullBuffer()) {// 如果BufferBuilder已经构建了完整的Buffer数据,那就完成BufferBuilder中的Buffer的构建finishBufferBuilder(bufferBuilder);// 安全检查:写入结果中是否包含有完整的StreamRecord(数据元素)?if (result.isFullRecord()) {pruneTriggered = true;// 如果是,则需要清空ByteBuffer,并跳出循环emptyCurrentBufferBuilder(targetChannel);break;}/* 创建新的ByteBuffer,也就是再买一辆新的“拖拉机”,以便能继续将DataOutputSerializer(把StreamRecord)序列化后的二进制数据,拷贝到BufferBuilder(也就是“拖拉机的目的地”)*/bufferBuilder = requestNewBufferBuilder(targetChannel);// 继续将DataOutputSerializer(把StreamRecord)序列化后的二进制数据,写入到BufferBuilderresult = serializer.copyToBufferBuilder(bufferBuilder);}checkState(!serializer.hasSerializedData(), "All data should be written at once");if (flushAlways) {/* 手动触发一个指定ResultSubPartition内的ArrayDeque<BufferConsumer> 队列中排队的BufferConsumers的消费。* 为了防止过度频繁的写入,RecordWriter中会有一个独立的线程,周期性(默认100ms)的将BufferBuilder构建出来的Buffer数据推送到ResultPartition的本地队列中存储起来*/flushTargetPartition(targetChannel);}// 返回拷贝结果return pruneTriggered;
}
获取BufferBuilder
RecordWriter定义了获取指定InputChannel所对应的BufferBuilder的抽象方法,而它的2个抽象实现子类为其提供了具体的实现逻辑。
以ChannelSelectorRecordWriter为例,它维护了1个BufferBuilder[]数组,保存了所有已创建好的BufferBuilder对象。
/* 获取指定Channel的BufferBuilder,也就是理解意义上的“目的地”。* targetChannel和下游InputGate中的InputChannel ID是对应的*/
@Override
public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {// ChannelSelectorRecordWriter维护了1个BufferBuilder[]数组,(对应每个InputChannel)保存了所有已创建好的BufferBuilder对象if (bufferBuilders[targetChannel] != null) {return bufferBuilders[targetChannel];} else {/* 核心:创建新的BufferBuilder*/return requestNewBufferBuilder(targetChannel);}
}
只有当数组中不存在对应的BufferBuilder时,才会去新建。创建好的BufferBuilder会被保存到BufferBuilder[]数组中,以待复用。最关键的是,会利用BufferBuilder构建出1个BufferConsumer,并将其添加到ResultSubPartition的BufferConsumer队列中缓存起来,供下游InputGate消费BufferConsumer对象。
/* 创建新的BufferBuilder(1个targetChannel,对应下游InputGate中的1个InputChannel ID)*/
@Override
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {// 检查BufferBuilder[]数组状态,确保数组中真的没有targetChannel对应的BufferBuilder对象checkState(bufferBuilders[targetChannel] == null || bufferBuilders[targetChannel].isFinished());/* 获取新的BufferBuilder:向LocalBufferPool申请MemorySegment(即Buffer内存空间),LocalBufferPool向NetworkBufferPool(作为BufferPoolFactory)* 要MemorySegment。最后得到的MemorySegment会交给BufferBuilder的成员变量持有,这样才能利用BufferBuilder将ByteBuffer(缓冲区,也就是“拖拉机”上装的“沙子”)数据写入MemorySegment中。*/BufferBuilder bufferBuilder = targetPartition.getBufferBuilder();/* BufferBuilder和BufferConsumer的内部维护了同一个Buffer数据,BufferConsumer会被存储到指定targetChannel对应的ResultSubPartition的BufferConsumer队列中。* BufferConsumer会读取写入到BufferBuilder内MemorySegment上的数据(也就是“拖拉机”),1个线程将二进制数据写入BufferBuilder内的MemorySegment,另1个线程使用BufferConsumer从中读取。*/targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);// 将创建好的BufferBuilder保存到BufferBuilder[]数组中,以待复用bufferBuilders[targetChannel] = bufferBuilder;// 返回刚刚创建好的BufferBuilderreturn bufferBuilder;
}
step 1:获取BufferBuilder
首先会向LocalBufferPool申请MemorySegment,并包装成BufferBuilder。让BufferBuilder持有MemorySegment的引用,才能将ByteBuffer中的二进制数据顺利写入到MemorySegment中
/* 获取新的BufferBuilder(用来构建Buffer数据):找ResultPartition中的LocalBufferPool申请MemorySegment(会被封装成BufferBuilder)。* 如果没有可用的缓冲区,则本次调用将会被阻塞,直到缓冲区再次可用。*/
@Override
public BufferBuilder getBufferBuilder() throws IOException, InterruptedException {checkInProduceState();/* 向LocalBufferPool申请MemorySegment,并包装成BufferBuilder。让BufferBuilder持有MemorySegment的引用,* 才能将ByteBuffer(缓冲区,也就是“拖拉机”)中的二进制数据,顺利写入到MemorySegment中。*/return bufferPool.requestBufferBuilderBlocking();
}
LocalBufferPool会向NetworkBufferPool(作为BufferPoolFactory)申请MemorySegment,并将其转换成BufferBuilder
/* 找LocalBufferPool要MemorySegment,LocalBufferPool找NetworkBufferPool(作为BufferPoolFactory)要MemorySegment。* 最终申请到的MemorySegment会被转换成BufferBuilder。*/
@Override
public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {// 将得到的MemorySegment包装成BufferBuilder,让BufferBuilder持有MemorySegment的引用,才能将数据写入其中return toBufferBuilder(requestMemorySegmentBlocking());
}
ResultPartitionWriter会找LocalBufferPool要BufferBuilder,LocalBufferPool首先会在自己的MemorySegment队列中寻找合适的MemorySegment,没有的话才会去向NetworkBufferPool(作为BufferPoolFactory)申请新的MemorySegment。如果无法从NetworkBufferPool申请到MemorySegment,会再去MemorySegment队列碰碰运气。(如果还不行,只能将这个LocalBufferPool置为“不可用”状态)
最后,LocalBufferPool会构建出BufferBuilder,并让它以成员变量的方式持有刚刚申请得到的MemorySegment。
step 2:向ResultSubPartition的BufferConsumer队列中添加BufferConsumer
BufferBuilder会被用来向MemorySegment中写入ByteBuffer二进制数据,而BufferConsumer会被添加到指定InputChannel对应的ResultSubPartition内的BufferConsumer队列中,供下游InputGate消费BufferConsumer对象。
/* 得到BufferBuilder后,将创建出BufferConsumer,并将其添加到ResultSubPartition的ArrayDeque<BufferConsumer>队列中*/
@Override
public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {checkNotNull(bufferConsumer);ResultSubpartition subpartition;try {checkInProduceState();// 取出指定InputChannel所对应的ResultSubPartitionsubpartition = subpartitions[subpartitionIndex];}catch (Exception ex) {bufferConsumer.close();throw ex;}/* 将BufferConsumer添加到ResultSubPartition内的ArrayDeque<BufferConsumer>队列中,* 等待消费ResultSubPartition中的数据*/return subpartition.add(bufferConsumer);
}
BufferBuilder创建BufferConsumer的实现如下:
/* 从当前写入的offset处,开始创建BufferConsumer。在创建BufferConsumer之前,写入到BufferBuilder的数据对BufferConsumer不可见。*/
public BufferConsumer createBufferConsumer() {checkState(!bufferConsumerCreated, "Two BufferConsumer shouldn't exist for one BufferBuilder");bufferConsumerCreated = true;// 从MemorySegment指定的offset处,构建BufferConsumerreturn new BufferConsumer(// BufferBuilder和BufferConsumer持有同一个MemorySegmentmemorySegment,recycler,positionMarker,positionMarker.cachedPosition);
}
创建好的BufferConsumer会被添加到指定InputChannel所对应的ResultSubPartition的BufferConsumer队列中,具体实现逻辑如下:
/* 将BufferConsumer添加到ResultSubPartition内的ArrayDeque<BufferConsumer>队列中。* RecordWriter将数据写入ResultPartition时,并没有直接将Buffer数据发送给下游Task的InputChannel,而是先将其缓存在BufferConsumer本地。* 下游发来PartitionRequest(也就是理解意义上的“我现在可以消费”了的消息)后,才会从BufferConsumer队列里读取Buffer数据,并经网络发送至下游Task*/
private boolean add(BufferConsumer bufferConsumer, boolean finish) {checkNotNull(bufferConsumer);final boolean notifyDataAvailable;// 往BufferConsumer队列里写入新BufferConsumer时,会上锁,直至写入完毕synchronized (buffers) {if (isFinished || isReleased) {bufferConsumer.close();return false;}/* 核心:将BufferConsumer添加到指定InputChannel所对应的ResultSubPartition的ArrayDeque<BufferConsumer>队列中*/buffers.add(bufferConsumer);// 更新BufferConsumer队列的统计信息updateStatistics(bufferConsumer);/* 更新ResultSubPartition的Backlog值:用于控制上下游数据的生产和消费速率,实现基于Credit的反压机制*/increaseBuffersInBacklog(bufferConsumer);// 获取ResultSubPartition目前能否被ResultSubPartitionView消费。// 如果True,就通知ResultSubPartitionView的持有者,开始消费ResultSubPartition中的Buffer数据notifyDataAvailable = shouldNotifyDataAvailable() || finish;isFinished |= finish;}if (notifyDataAvailable) {notifyDataAvailable();}return true;
}
数据拷贝到MemorySegment中
现在BufferBuilder、BufferConsumer都有了,是时候将ByteBuffer内的二进制数据拷贝到BufferBuilder持有的MemorySegment中了。
RecordSerializer接口定义了将ByteBuffer缓冲区内的二进制数据写入到BufferBuilder的接口方法,SpanningRecordSerializer作为唯一的实现子类,提供了具体的实现逻辑:
@Override
public SerializationResult copyToBufferBuilder(BufferBuilder targetBuffer) {// 将ByteBuffer(缓冲区,也就是“拖拉机”)中的二进制数据,拷贝到BufferBuilder持有的MemorySegment中targetBuffer.append(dataBuffer);targetBuffer.commit();// 返回(序列化后的)拷贝结果return getSerializationResult(targetBuffer);
}
BufferBuilder往MemorySegment里写入指定ByteBuffer数据,BufferConsumer从MemorySegment里消费。BufferConsumer被add到哪些ResultSubPartition中,就能体现对应的Selector策略。既可以将Buffer下发到指定Channel对应的ResultSubPartition,也能广播到所有的ResultSubPartition。
对下游Task实例而言,主要依靠自己的InputGate组件读取上游数据,且InputGate中的InputChannel和ResultPartition的ResultSubPartition是一一对应的。因此向ResultSubPartition写入Buffer数据,就是向下游的InputChannel写入Buffer数据(从ResultSubPartition的BufferConsumer队列中读取Buffer数据,在经过TCP网络连接将其发送到对应的Channel中)