Flink - ResultPartition
Published: 2019-06-29


Data is usually emitted through collector.collect:

public interface Collector<T> {

    /**
     * Emits a record.
     *
     * @param record The record to collect.
     */
    void collect(T record);

    /**
     * Closes the collector. If any data was buffered, that data will be flushed.
     */
    void close();
}
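The Collector contract is small enough to illustrate directly. The sketch below re-declares a Collector-shaped interface locally as SimpleCollector (a hypothetical name, not Flink's interface) and implements it with a hypothetical ListCollector that buffers records and only flushes them on close(), in line with the close() Javadoc above.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for the Collector contract above (not Flink's own interface).
interface SimpleCollector<T> {
    void collect(T record);

    void close();
}

// Hypothetical collector: buffers records and flushes them to a sink list on close().
class ListCollector<T> implements SimpleCollector<T> {
    private final List<T> pending = new ArrayList<>();
    private final List<T> sink;

    ListCollector(List<T> sink) {
        this.sink = sink;
    }

    @Override
    public void collect(T record) {
        pending.add(record); // buffer the record
    }

    @Override
    public void close() {
        sink.addAll(pending); // flush buffered data, as the close() Javadoc requires
        pending.clear();
    }
}
```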

Output extends it:

public interface Output<T> extends Collector<T> {

    /**
     * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
     * operators.
     *
     * A watermark specifies that no element with a timestamp lower or equal to the watermark
     * timestamp will be emitted in the future.
     */
    void emitWatermark(Watermark mark);
}

RecordWriterOutput

public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {

    private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;

    private SerializationDelegate<StreamElement> serializationDelegate;

    @Override
    public void collect(StreamRecord<OUT> record) {
        serializationDelegate.setInstance(record);

        try {
            recordWriter.emit(serializationDelegate);
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    // ...
}

 

RecordWriter

public class RecordWriter<T extends IOReadableWritable> {

    protected final ResultPartitionWriter writer; // the writer that writes into the ResultPartition

    private final ChannelSelector<T> channelSelector; // picks the target channel(s); RoundRobinChannelSelector by default

    private final int numChannels;

    /** {@link RecordSerializer} per outgoing channel */
    private final RecordSerializer<T>[] serializers;

    public RecordWriter(ResultPartitionWriter writer) {
        this(writer, new RoundRobinChannelSelector<T>());
    }

    @SuppressWarnings("unchecked")
    public RecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector) {
        this.writer = writer;
        this.channelSelector = channelSelector;

        this.numChannels = writer.getNumberOfOutputChannels(); // number of channels

        /**
         * The runtime exposes a channel abstraction for the produced results
         * (see {@link ChannelSelector}). Every channel has an independent
         * serializer.
         */
        this.serializers = new SpanningRecordSerializer[numChannels];
        for (int i = 0; i < numChannels; i++) {
            serializers[i] = new SpanningRecordSerializer<T>(); // one serializer per channel
        }
    }

    public void emit(T record) throws IOException, InterruptedException {
        for (int targetChannel : channelSelector.selectChannels(record, numChannels)) { // for every selected channel
            // serialize with corresponding serializer and send full buffer
            RecordSerializer<T> serializer = serializers[targetChannel];

            synchronized (serializer) { // lock: a channel's serializer must not be written concurrently
                SerializationResult result = serializer.addRecord(record);
                while (result.isFullBuffer()) { // the buffer (i.e. its MemorySegment) is full
                    Buffer buffer = serializer.getCurrentBuffer(); // take the full buffer out

                    if (buffer != null) {
                        writeBuffer(buffer, targetChannel, serializer); // write it to the partition
                    }

                    buffer = writer.getBufferProvider().requestBufferBlocking(); // request a fresh buffer
                    result = serializer.setNextBuffer(buffer); // hand the new buffer to the serializer
                }
            }
        }
    }

    // ...
}
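The emit loop above boils down to: pick a channel, serialize into that channel's current buffer, and whenever the buffer fills up, hand it to the partition and continue in a fresh buffer. Below is a minimal sketch of that idea; it is a simplified model, not Flink's SpanningRecordSerializer. The class MiniRecordWriter, the 4-byte length prefix, heap ByteBuffers standing in for MemorySegments, and a per-channel list standing in for ResultPartition.add are all assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Simplified model of RecordWriter.emit() (hypothetical, not Flink code): one current
// buffer per channel, round-robin channel selection, records that may span buffers,
// and every full buffer handed off before a fresh one is "requested".
class MiniRecordWriter {
    private final int bufferSize;
    private final ByteBuffer[] current;          // current buffer per channel
    private final List<List<byte[]>> written;    // stands in for ResultPartition.add, per channel
    private int lastChannel = -1;                // round-robin state

    MiniRecordWriter(int numChannels, int bufferSize) {
        this.bufferSize = bufferSize;
        this.current = new ByteBuffer[numChannels];
        this.written = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            current[i] = ByteBuffer.allocate(bufferSize); // like requestBufferBlocking()
            written.add(new ArrayList<>());
        }
    }

    // Coarse lock for simplicity; Flink locks each channel's serializer instead.
    synchronized void emit(byte[] record) {
        lastChannel = (lastChannel + 1) % current.length; // round-robin selection
        ByteBuffer header = ByteBuffer.allocate(4);
        header.putInt(record.length);                     // assumed 4-byte length prefix
        header.flip();
        copySpanning(lastChannel, header);
        copySpanning(lastChannel, ByteBuffer.wrap(record));
    }

    private void copySpanning(int channel, ByteBuffer src) {
        while (src.hasRemaining()) {
            ByteBuffer target = current[channel];
            int n = Math.min(target.remaining(), src.remaining());
            ByteBuffer chunk = src.duplicate();
            chunk.limit(chunk.position() + n);
            target.put(chunk);                             // copy as much as fits
            src.position(src.position() + n);
            if (!target.hasRemaining()) {                  // buffer full: write it, get a new one
                written.get(channel).add(target.array());
                current[channel] = ByteBuffer.allocate(bufferSize);
            }
        }
    }

    List<byte[]> writtenBuffers(int channel) {
        return written.get(channel);
    }
}
```

With an 8-byte buffer, a 10-byte record (14 bytes including its length prefix) fills one buffer and leaves 6 bytes pending in the next one, which is exactly the case where a record spans buffers.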

writeBuffer

private void writeBuffer(
        Buffer buffer,
        int targetChannel,
        RecordSerializer<T> serializer) throws IOException {

    try {
        writer.writeBuffer(buffer, targetChannel);
    }
    finally {
        serializer.clearCurrentBuffer();
    }
}

Both writing buffers and requesting new ones go through ResultPartitionWriter:

public final class ResultPartitionWriter implements EventListener<TaskEvent> {

    private final ResultPartition partition; // the ResultPartition being written

    private final TaskEventHandler taskEventHandler = new TaskEventHandler();

    public ResultPartitionWriter(ResultPartition partition) {
        this.partition = partition;
    }

    // ------------------------------------------------------------------------
    // Attributes
    // ------------------------------------------------------------------------

    public ResultPartitionID getPartitionId() {
        return partition.getPartitionId();
    }

    public BufferProvider getBufferProvider() {
        return partition.getBufferProvider();
    }

    public int getNumberOfOutputChannels() {
        return partition.getNumberOfSubpartitions();
    }

    // ------------------------------------------------------------------------
    // Data processing
    // ------------------------------------------------------------------------

    public void writeBuffer(Buffer buffer, int targetChannel) throws IOException {
        partition.add(buffer, targetChannel);
    }
}

ResultPartitionWriter in turn delegates everything to ResultPartition; writeBuffer simply adds the buffer to the partition.

 

ResultPartition

Initialization

The Task creates its ResultPartitions, each wrapped in a ResultPartitionWriter:

// Produced intermediate result partitions
this.producedPartitions = new ResultPartition[partitions.size()];
this.writers = new ResultPartitionWriter[partitions.size()];

for (int i = 0; i < this.producedPartitions.length; i++) {
    ResultPartitionDeploymentDescriptor desc = partitions.get(i);
    ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId);

    this.producedPartitions[i] = new ResultPartition(
            taskNameWithSubtaskAndId,
            jobId,
            partitionId,
            desc.getPartitionType(),
            desc.getEagerlyDeployConsumers(),
            desc.getNumberOfSubpartitions(),
            networkEnvironment.getPartitionManager(),
            networkEnvironment.getPartitionConsumableNotifier(),
            ioManager,
            networkEnvironment.getDefaultIOMode());

    this.writers[i] = new ResultPartitionWriter(this.producedPartitions[i]);
}

In Task.run, the task first registers itself with the NetworkEnvironment:

network.registerTask(this);

The main work here is to create a LocalBufferPool whose required number of segments equals the number of subpartitions, and to register it with the ResultPartition:

// create a LocalBufferPool; the number of required segments equals the number of
// subpartitions, i.e. one segment per subpartition
bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false);

// register the LocalBufferPool with the ResultPartition
partition.registerBufferPool(bufferPool);

 

So the call

writer.getBufferProvider().requestBufferBlocking();

ends up calling LocalBufferPool.requestBuffer.

If availableMemorySegments is not empty, a segment is taken from it directly.

Otherwise:

if (numberOfRequestedMemorySegments < currentPoolSize) {
    // still allowed to request more: get a segment from the NetworkBufferPool
    final MemorySegment segment = networkBufferPool.requestMemorySegment();

    if (segment != null) {
        numberOfRequestedMemorySegments++;
        availableMemorySegments.add(segment);

        continue;
    }
}

if (askToRecycle) { // cannot request a new segment: ask the owner to try to release memory
    owner.releaseMemory(1);
}

if (isBlocking) { // as a last resort, block and wait for up to 2 seconds
    availableMemorySegments.wait(2000);
}

// ResultPartition.releaseMemory
public void releaseMemory(int toRelease) throws IOException {
    for (ResultSubpartition subpartition : subpartitions) {
        toRelease -= subpartition.releaseMemory(); // ask each subpartition to release memory

        // Only release as much memory as needed
        if (toRelease <= 0) {
            break;
        }
    }
}

So if no segment is available during emit, the writer blocks and waits.

For a PipelinedSubpartition, releaseMemory does nothing, so if buffers are not sent out and recycled in time, the writer keeps blocking:

public int releaseMemory() {
    // The pipelined subpartition does not react to memory release requests. The buffers will be
    // recycled by the consuming task.
    return 0;
}
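The request path described above (reuse an available segment, otherwise take one from the global pool while under the pool size, otherwise block in 2-second waits) can be sketched with plain wait/notify. This is a simplified model under stated assumptions: MiniNetworkPool and MiniLocalPool are hypothetical names, segments are plain byte arrays, and the owner.releaseMemory step is skipped because, as noted, a pipelined subpartition releases nothing.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for the global NetworkBufferPool: a fixed set of free segments.
class MiniNetworkPool {
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();

    MiniNetworkPool(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            free.add(new byte[segmentSize]);
        }
    }

    synchronized byte[] requestMemorySegment() {
        return free.poll(); // null once the global pool is exhausted
    }
}

// Simplified model of the LocalBufferPool request loop (not Flink's code).
class MiniLocalPool {
    private final MiniNetworkPool networkBufferPool;
    private final int currentPoolSize;
    private final ArrayDeque<byte[]> availableMemorySegments = new ArrayDeque<>();
    private int numberOfRequestedMemorySegments;

    MiniLocalPool(MiniNetworkPool networkBufferPool, int currentPoolSize) {
        this.networkBufferPool = networkBufferPool;
        this.currentPoolSize = currentPoolSize;
    }

    byte[] requestBuffer(boolean isBlocking) throws InterruptedException {
        synchronized (availableMemorySegments) {
            while (availableMemorySegments.isEmpty()) {
                if (numberOfRequestedMemorySegments < currentPoolSize) {
                    byte[] segment = networkBufferPool.requestMemorySegment(); // try the global pool
                    if (segment != null) {
                        numberOfRequestedMemorySegments++;
                        availableMemorySegments.add(segment);
                        continue;
                    }
                }
                // Flink would call owner.releaseMemory(1) here; skipped (pipelined releases nothing).
                if (!isBlocking) {
                    return null;
                }
                availableMemorySegments.wait(2000); // wait up to 2 s, then check again
            }
            return availableMemorySegments.poll();
        }
    }

    void recycle(byte[] segment) {
        synchronized (availableMemorySegments) {
            availableMemorySegments.add(segment);
            availableMemorySegments.notify(); // wake one blocked requester
        }
    }
}
```

A blocked requester can only be woken by a recycle; if the consumer never returns buffers, the producer keeps looping in wait(2000), which is the blocking behavior described above.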

 

ResultPartition.add
public void add(Buffer buffer, int subpartitionIndex) throws IOException {
    boolean success = false;

    try {
        checkInProduceState();

        final ResultSubpartition subpartition = subpartitions[subpartitionIndex]; // look up the ResultSubpartition for this index

        synchronized (subpartition) {
            success = subpartition.add(buffer); // add the buffer to the ResultSubpartition

            // Update statistics
            totalNumberOfBuffers++;
            totalNumberOfBytes += buffer.getSize();
        }
    }
    finally {
        if (success) {
            notifyPipelinedConsumers(); // via the ResultPartitionConsumableNotifier, trigger notifyPartitionConsumable
        }
        else {
            buffer.recycle(); // on failure, recycle this buffer
        }
    }
}
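The try/finally in add() settles the ownership of every buffer: either a subpartition accepts it, or it is recycled, even if an exception is thrown. A minimal sketch of that pattern with hypothetical TrackedBuffer and MiniPartition classes; statistics are omitted and notifyPipelinedConsumers() is reduced to a counter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer that remembers whether it was recycled.
class TrackedBuffer {
    boolean recycled;

    void recycle() {
        recycled = true;
    }
}

// Simplified model of ResultPartition.add() (not Flink code): the finally block
// recycles the buffer unless a subpartition has taken ownership of it.
class MiniPartition {
    private final List<List<TrackedBuffer>> subpartitions = new ArrayList<>();
    private final boolean[] finished;
    int notifications; // stands in for notifyPipelinedConsumers()

    MiniPartition(int numSubpartitions) {
        finished = new boolean[numSubpartitions];
        for (int i = 0; i < numSubpartitions; i++) {
            subpartitions.add(new ArrayList<>());
        }
    }

    void finish(int index) {
        finished[index] = true; // a finished subpartition rejects further buffers
    }

    void add(TrackedBuffer buffer, int subpartitionIndex) {
        boolean success = false;
        try {
            List<TrackedBuffer> subpartition = subpartitions.get(subpartitionIndex); // may throw
            synchronized (subpartition) {
                if (!finished[subpartitionIndex]) {
                    success = subpartition.add(buffer);
                }
            }
        } finally {
            if (success) {
                notifications++;
            } else {
                buffer.recycle(); // rejected or failed: return the memory
            }
        }
    }
}
```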

 

For PipelinedSubpartition, add simply appends the buffer to its queue:

/**
 * A pipelined in-memory only subpartition, which can be consumed once.
 */
class PipelinedSubpartition extends ResultSubpartition {

    /**
     * A data availability listener. Registered, when the consuming task is faster than the
     * producing task.
     */
    private NotificationListener registeredListener; // notifies the consumer when new data arrives

    /** The read view to consume this subpartition. */
    private PipelinedSubpartitionView readView; // read view

    /** All buffers of this subpartition. Access to the buffers is synchronized on this object. */
    final ArrayDeque<Buffer> buffers = new ArrayDeque<Buffer>(); // buffer queue

    PipelinedSubpartition(int index, ResultPartition parent) {
        super(index, parent);
    }

    @Override
    public boolean add(Buffer buffer) {
        checkNotNull(buffer);

        final NotificationListener listener;

        synchronized (buffers) {
            if (isReleased || isFinished) {
                return false;
            }

            // Add the buffer and update the stats
            buffers.add(buffer); // append to the buffer queue
            updateStatistics(buffer);

            // Get the listener...
            listener = registeredListener;
            registeredListener = null;
        }

        // Notify the listener outside of the synchronized block
        if (listener != null) {
            listener.onNotification(); // fire the listener
        }

        return true;
    }

    // ...
}
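add() follows a common pattern: mutate state and take the one-shot listener while holding the lock, then invoke the callback after releasing it, so the callback can safely touch the queue. The sketch below models that. DataListener and MiniPipelinedSubpartition are hypothetical names, strings stand in for buffers, and the registerListener rule (register only when the queue is empty) is an assumption based on the Javadoc saying the listener is registered when the consumer is faster than the producer.

```java
import java.util.ArrayDeque;

// Hypothetical one-shot callback, modeled on NotificationListener.onNotification().
interface DataListener {
    void onNotification();
}

// Simplified model of PipelinedSubpartition.add() (not Flink code).
class MiniPipelinedSubpartition {
    private final ArrayDeque<String> buffers = new ArrayDeque<>();
    private DataListener registeredListener;
    private boolean isFinished;

    boolean add(String buffer) {
        final DataListener listener;
        synchronized (buffers) {
            if (isFinished) {
                return false;
            }
            buffers.add(buffer);
            listener = registeredListener; // one-shot: consume the registration
            registeredListener = null;
        }
        if (listener != null) {
            listener.onNotification(); // outside the lock, so the callback may call poll()
        }
        return true;
    }

    // Assumed rule: only register when there is nothing to read yet.
    boolean registerListener(DataListener listener) {
        synchronized (buffers) {
            if (!buffers.isEmpty() || isFinished) {
                return false; // data already available: the caller should just read it
            }
            registeredListener = listener;
            return true;
        }
    }

    String poll() {
        synchronized (buffers) {
            return buffers.poll();
        }
    }

    void finish() {
        synchronized (buffers) {
            isFinished = true;
        }
    }
}
```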

 

 

NettyConnectionManager

@Override
public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool)
        throws IOException {
    PartitionRequestProtocol partitionRequestProtocol =
            new PartitionRequestProtocol(partitionProvider, taskEventDispatcher, networkbufferPool);

    client.init(partitionRequestProtocol, bufferPool);
    server.init(partitionRequestProtocol, bufferPool);
}

 

PartitionRequestProtocol

// +-------------------------------------------------------------------+
// |                        SERVER CHANNEL PIPELINE                    |
// |                                                                   |
// |    +----------+----------+ (3) write  +----------------------+    |
// |    | Queue of queues     +----------->| Message encoder      |    |
// |    +----------+----------+            +-----------+----------+    |
// |              /|\                                 \|/              |
// |               | (2) enqueue                       |               |
// |    +----------+----------+                        |               |
// |    | Request handler     |                        |               |
// |    +----------+----------+                        |               |
// |              /|\                                  |               |
// |               |                                   |               |
// |    +----------+----------+                        |               |
// |    | Message decoder     |                        |               |
// |    +----------+----------+                        |               |
// |              /|\                                  |               |
// |               |                                   |               |
// |    +----------+----------+                        |               |
// |    | Frame decoder       |                        |               |
// |    +----------+----------+                        |               |
// |              /|\                                  |               |
// +---------------+-----------------------------------+---------------+
// |               | (1) client request               \|/
// +---------------+-----------------------------------+---------------+
// |               |                                   |               |
// |       [ Socket.read() ]                    [ Socket.write() ]     |
// |                                                                   |
// |  Netty Internal I/O Threads (Transport Implementation)            |
// +-------------------------------------------------------------------+

@Override
public ChannelHandler[] getServerChannelHandlers() {
    PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
    PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
            partitionProvider, taskEventDispatcher, queueOfPartitionQueues, networkbufferPool);

    return new ChannelHandler[] {
            messageEncoder,
            createFrameLengthDecoder(),
            messageDecoder,
            serverHandler,
            queueOfPartitionQueues
    };
}

 

PartitionRequestServerHandler

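The server handler creates a buffer pool with a minimum of 1 segment. The second argument is false, so the pool is not fixed-size: if the NetworkBufferPool has spare segments, they are distributed into it as well.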

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    super.channelRegistered(ctx);

    bufferPool = networkBufferPool.createBufferPool(1, false);
}

 

protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
    // ... (excerpt)
    PartitionRequest request = (PartitionRequest) msg;

    LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);

    try {
        ResultSubpartitionView subpartition =
                partitionProvider.createSubpartitionView(
                        request.partitionId,
                        request.queueIndex,
                        bufferPool);

        outboundQueue.enqueue(subpartition, request.receiverId); // hand the view to PartitionRequestQueue for sending
    }
    // ...
}

 

ResultPartitionManager implements ResultPartitionProvider, so the partitionProvider.createSubpartitionView call above ends up in ResultPartitionManager.createSubpartitionView:

synchronized (registeredPartitions) {
    final ResultPartition partition = registeredPartitions.get(partitionId.getProducerId(),
            partitionId.getPartitionId());

    return partition.createSubpartitionView(subpartitionIndex, bufferProvider);
}

 

 

ResultPartition

public ResultSubpartitionView createSubpartitionView(int index, BufferProvider bufferProvider) throws IOException {
    int refCnt = pendingReferences.get();

    checkState(refCnt != -1, "Partition released.");
    checkState(refCnt > 0, "Partition not pinned.");

    ResultSubpartitionView readView = subpartitions[index].createReadView(bufferProvider);

    return readView;
}

The definition of pendingReferences:

/**
 * The total number of references to subpartitions of this result. The result partition can be
 * safely released, iff the reference count is zero. A reference count of -1 denotes that the
 * result partition has been released.
 */
private final AtomicInteger pendingReferences = new AtomicInteger();

 

PipelinedSubpartitionView

class PipelinedSubpartitionView implements ResultSubpartitionView {

    /** The subpartition this view belongs to. */
    private final PipelinedSubpartition parent;

    /** Flag indicating whether this view has been released. */
    private AtomicBoolean isReleased = new AtomicBoolean();

    PipelinedSubpartitionView(PipelinedSubpartition parent) {
        this.parent = checkNotNull(parent);
    }

    @Override
    public Buffer getNextBuffer() {
        synchronized (parent.buffers) {
            return parent.buffers.poll(); // poll directly from the parent PipelinedSubpartition's buffers
        }
    }

    @Override
    public boolean registerListener(NotificationListener listener) {
        return !isReleased.get() && parent.registerListener(listener);
    }

    @Override
    public void notifySubpartitionConsumed() { // release this subpartition once it has been fully consumed
        releaseAllResources();
    }

    @Override
    public void releaseAllResources() {
        if (isReleased.compareAndSet(false, true)) {
            // The view doesn't hold any resources and the parent cannot be restarted. Therefore,
            // it's OK to notify about consumption as well.
            parent.onConsumedSubpartition();
        }
    }

    // ...
}
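When PartitionRequestQueue reaches the end of a partition, it calls both notifySubpartitionConsumed() and releaseAllResources() on the same view. The compareAndSet(false, true) guard is what keeps the parent from being notified twice. A small sketch of that guard, with a hypothetical MiniView class, an ArrayDeque of strings standing in for the parent's buffers, and a counter standing in for parent.onConsumedSubpartition():

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of PipelinedSubpartitionView (not Flink code): reads poll the
// parent's queue, and release is idempotent thanks to compareAndSet.
class MiniView {
    private final ArrayDeque<String> parentBuffers;
    private final AtomicInteger consumedNotifications; // stands in for parent.onConsumedSubpartition()
    private final AtomicBoolean isReleased = new AtomicBoolean();

    MiniView(ArrayDeque<String> parentBuffers, AtomicInteger consumedNotifications) {
        this.parentBuffers = parentBuffers;
        this.consumedNotifications = consumedNotifications;
    }

    String getNextBuffer() {
        synchronized (parentBuffers) {
            return parentBuffers.poll(); // null when nothing is queued
        }
    }

    void notifySubpartitionConsumed() {
        releaseAllResources();
    }

    void releaseAllResources() {
        if (isReleased.compareAndSet(false, true)) { // only the first caller gets through
            consumedNotifications.incrementAndGet();
        }
    }
}
```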

 

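When PartitionRequestQueue sends data, it calls getNextBuffer to fetch it.

Once everything has been sent, i.e. when the EndOfPartitionEvent is reached, it calls notifySubpartitionConsumed.

The release then goes through PipelinedSubpartition.onConsumedSubpartition -> ResultPartition.onConsumedSubpartition: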

void onConsumedSubpartition(int subpartitionIndex) {
    if (isReleased.get()) { // already released
        return;
    }

    int refCnt = pendingReferences.decrementAndGet(); // one subpartition fully consumed: decrement

    if (refCnt == 0) { // all subpartitions have been consumed
        partitionManager.onConsumedPartition(this); // tell the partitionManager to release this partition
    }
}
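pendingReferences works as a reference count: it must be positive for views to be created, each consumed subpartition decrements it, and reaching zero triggers the release. A sketch with a hypothetical MiniRefCountedPartition; pin() adding one reference per subpartition and flipping the count to -1 after release are assumptions made so the sketch matches the field's Javadoc, not a copy of Flink's code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of ResultPartition's reference counting (hypothetical helpers).
class MiniRefCountedPartition {
    private final int numSubpartitions;
    private final AtomicInteger pendingReferences = new AtomicInteger();
    private final AtomicInteger releaseCalls = new AtomicInteger(); // stands in for partitionManager.onConsumedPartition(this)

    MiniRefCountedPartition(int numSubpartitions) {
        this.numSubpartitions = numSubpartitions;
    }

    // Hypothetical: pin the partition with one reference per subpartition.
    void pin() {
        pendingReferences.addAndGet(numSubpartitions);
    }

    // Mirrors the two checkState calls in createSubpartitionView.
    void checkCanCreateView() {
        int refCnt = pendingReferences.get();
        if (refCnt == -1) {
            throw new IllegalStateException("Partition released.");
        }
        if (refCnt <= 0) {
            throw new IllegalStateException("Partition not pinned.");
        }
    }

    void onConsumedSubpartition() {
        if (pendingReferences.get() == -1) {
            return; // already released
        }
        int refCnt = pendingReferences.decrementAndGet();
        if (refCnt == 0 && pendingReferences.compareAndSet(0, -1)) { // hypothetical: mark released
            releaseCalls.incrementAndGet();
        }
    }

    int releaseCalls() {
        return releaseCalls.get();
    }
}
```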

 

So when are the buffers in a PipelinedSubpartition released and returned to the LocalBufferPool?

Let's look at how PartitionRequestQueue sends data:

writeAndFlushNextMessageIfPossible
buffer = currentPartitionQueue.getNextBuffer();

// wrap the buffer in a BufferResponse
BufferResponse resp = new BufferResponse(buffer, currentPartitionQueue.getSequenceNumber(), currentPartitionQueue.getReceiverId());

if (!buffer.isBuffer() &&
        EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class) { // the end-of-partition event
    currentPartitionQueue.notifySubpartitionConsumed(); // notify that the subpartition has been consumed
    currentPartitionQueue.releaseAllResources(); // release all resources
    markAsReleased(currentPartitionQueue.getReceiverId());

    currentPartitionQueue = null;
}

// the actual send; WriteAndFlushNextMessageIfPossibleListener calls
// writeAndFlushNextMessageIfPossible again when the write completes, so buffers are read repeatedly
channel.writeAndFlush(resp).addListener(writeListener);
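Stripped of Netty, the send loop is: poll the next buffer, wrap it with a sequence number, detect the end-of-partition marker to release the view, write, and let the write-completion listener trigger the next iteration. A single-threaded sketch under these assumptions: strings stand in for buffers, "EOP" stands in for a serialized EndOfPartitionEvent, a list stands in for the Netty channel, and the caller's while loop stands in for WriteAndFlushNextMessageIfPossibleListener. MiniSendLoop is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Simplified, single-threaded model of writeAndFlushNextMessageIfPossible (not Flink code).
class MiniSendLoop {
    static final String END_OF_PARTITION = "EOP"; // stands in for EndOfPartitionEvent

    private ArrayDeque<String> currentQueue;
    final List<String> wire = new ArrayList<>(); // stands in for the Netty channel
    boolean consumedNotified;
    private int sequenceNumber;

    MiniSendLoop(ArrayDeque<String> queue) {
        this.currentQueue = queue;
    }

    // Returns true if a message was written; the caller re-invokes it after each write,
    // as the write listener does in Flink.
    boolean writeNextMessageIfPossible() {
        if (currentQueue == null) {
            return false; // partition already finished
        }
        String buffer = currentQueue.poll();
        if (buffer == null) {
            return false; // nothing yet: wait for a data-available notification
        }
        String resp = sequenceNumber++ + ":" + buffer; // wrap with a sequence number
        if (END_OF_PARTITION.equals(buffer)) {
            consumedNotified = true; // notifySubpartitionConsumed() + releaseAllResources()
            currentQueue = null;
        }
        wire.add(resp); // writeAndFlush(resp)
        return true;
    }
}
```

Usage: `while (loop.writeNextMessageIfPossible()) { }` drains the queue in order and stops after the end-of-partition marker.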
 

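In BufferResponse.write, the buffer's contents are copied into a Netty ByteBuf, and the buffer itself is recycled in the finally block: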

@Override
ByteBuf write(ByteBufAllocator allocator) throws IOException {
    int length = 16 + 4 + 1 + 4 + buffer.getSize();

    ByteBuf result = null;
    try {
        result = allocateBuffer(allocator, ID, length);

        receiverId.writeTo(result);
        result.writeInt(sequenceNumber);
        result.writeBoolean(buffer.isBuffer());
        result.writeInt(buffer.getSize());
        result.writeBytes(buffer.getNioBuffer());

        return result;
    }
    catch (Throwable t) {
        if (result != null) {
            result.release();
        }

        throw new IOException(t);
    }
    finally {
        if (buffer != null) {
            buffer.recycle(); // recycle the buffer
        }
    }
}
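The length computed in write(), 16 + 4 + 1 + 4 + buffer.getSize(), matches the fields written right after it: a 16-byte receiverId, the int sequenceNumber, a 1-byte isBuffer flag, the int payload size, and the payload. The sketch below encodes and decodes that body with java.nio.ByteBuffer. Representing the receiverId as two longs is an assumption, BufferResponseCodec is a hypothetical name, and the frame header added by allocateBuffer is not modeled.

```java
import java.nio.ByteBuffer;

// Sketch of the BufferResponse body layout (not Flink code):
// receiverId (16 bytes) | sequenceNumber (int) | isBuffer (1 byte) | size (int) | payload
final class BufferResponseCodec {
    static final int HEADER_LENGTH = 16 + 4 + 1 + 4;

    private BufferResponseCodec() {
    }

    static ByteBuffer encode(long idLower, long idUpper, int sequenceNumber, boolean isBuffer, byte[] payload) {
        ByteBuffer out = ByteBuffer.allocate(HEADER_LENGTH + payload.length);
        out.putLong(idLower).putLong(idUpper); // assumed: 16-byte id as two longs
        out.putInt(sequenceNumber);
        out.put((byte) (isBuffer ? 1 : 0));
        out.putInt(payload.length);
        out.put(payload);
        out.flip(); // make the written bytes readable
        return out;
    }

    // Reads the body back, checking the sequence number, and returns the payload.
    static byte[] decodePayload(ByteBuffer in, int expectedSequenceNumber) {
        in.getLong();
        in.getLong(); // receiverId, ignored here
        int seq = in.getInt();
        if (seq != expectedSequenceNumber) {
            throw new IllegalStateException("Unexpected sequence number: " + seq);
        }
        in.get(); // isBuffer flag
        byte[] payload = new byte[in.getInt()];
        in.get(payload);
        return payload;
    }
}
```

For a 3-byte payload the body is 25 + 3 = 28 bytes.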

 

Buffer

public void recycle() {
    synchronized (recycleLock) {
        if (--referenceCount == 0) {
            recycler.recycle(memorySegment);
        }
    }
}
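Buffer.recycle() only returns the segment once the last reference goes away. A sketch of that reference counting with hypothetical SegmentRecycler and RefCountedBuffer types; byte arrays stand in for MemorySegment, and the retain() helper is an assumption added to show a second reference.

```java
// Hypothetical recycler, modeled on the recycler.recycle(memorySegment) call above.
interface SegmentRecycler {
    void recycle(byte[] segment);
}

// Simplified model of Buffer's reference counting (not Flink code).
class RefCountedBuffer {
    private final Object recycleLock = new Object();
    private final byte[] memorySegment;
    private final SegmentRecycler recycler;
    private int referenceCount = 1; // the creator holds the first reference

    RefCountedBuffer(byte[] memorySegment, SegmentRecycler recycler) {
        this.memorySegment = memorySegment;
        this.recycler = recycler;
    }

    // Hypothetical helper: take an additional reference.
    RefCountedBuffer retain() {
        synchronized (recycleLock) {
            if (referenceCount <= 0) {
                throw new IllegalStateException("Buffer has already been recycled.");
            }
            referenceCount++;
        }
        return this;
    }

    void recycle() {
        synchronized (recycleLock) {
            if (--referenceCount == 0) {
                recycler.recycle(memorySegment); // last reference gone: hand the segment back
            }
        }
    }
}
```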

 

LocalBufferPool

@Override
public void recycle(MemorySegment segment) {
    synchronized (availableMemorySegments) {
        if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
            returnMemorySegment(segment);
        }
        else {
            EventListener<Buffer> listener = registeredListeners.poll();

            if (listener == null) {
                availableMemorySegments.add(segment); // no listener: put it back into availableMemorySegments for reuse
                availableMemorySegments.notify();
            }
            else {
                try {
                    listener.onEvent(new Buffer(segment, this)); // a listener is waiting: hand the buffer to it directly
                }
                catch (Throwable ignored) {
                    availableMemorySegments.add(segment);
                    availableMemorySegments.notify();
                }
            }
        }
    }
}
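LocalBufferPool.recycle() has three outcomes: give the segment back to the global pool if the local pool is over its size, hand it directly to a waiting listener, or queue it and wake a blocked requester. A simplified sketch with a hypothetical MiniRecyclingPool: the isDestroyed check is omitted, a Consumer<byte[]> stands in for EventListener<Buffer>, only RuntimeExceptions from the listener are caught, and decrementing numberOfRequestedMemorySegments when returning a segment is an assumption about what returnMemorySegment does.

```java
import java.util.ArrayDeque;
import java.util.function.Consumer;

// Simplified model of LocalBufferPool.recycle() (not Flink code).
class MiniRecyclingPool {
    final ArrayDeque<byte[]> availableMemorySegments = new ArrayDeque<>();
    final ArrayDeque<Consumer<byte[]>> registeredListeners = new ArrayDeque<>();
    final ArrayDeque<byte[]> globalPool = new ArrayDeque<>(); // stands in for NetworkBufferPool
    int numberOfRequestedMemorySegments;
    int currentPoolSize;

    MiniRecyclingPool(int numberOfRequestedMemorySegments, int currentPoolSize) {
        this.numberOfRequestedMemorySegments = numberOfRequestedMemorySegments;
        this.currentPoolSize = currentPoolSize;
    }

    void recycle(byte[] segment) {
        synchronized (availableMemorySegments) {
            if (numberOfRequestedMemorySegments > currentPoolSize) {
                numberOfRequestedMemorySegments--; // pool was shrunk: return the segment globally
                globalPool.add(segment);
                return;
            }
            Consumer<byte[]> listener = registeredListeners.poll();
            if (listener == null) {
                availableMemorySegments.add(segment);
                availableMemorySegments.notify(); // wake a thread blocked in requestBuffer
            } else {
                try {
                    listener.accept(segment); // hand the segment straight to the waiter
                } catch (RuntimeException ignored) {
                    availableMemorySegments.add(segment); // listener failed: keep the segment
                    availableMemorySegments.notify();
                }
            }
        }
    }
}
```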

Reposted from: http://lxybm.baihongyu.com/
