博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flink - InstanceManager
阅读量:6119 次
发布时间:2019-06-21

本文共 6132 字,大约阅读时间需要 20 分钟。

InstanceManager用于管理JobManager申请到的taskManager和slots资源

/** * Simple manager that keeps track of which TaskManager are available and alive. */public class InstanceManager {    // ------------------------------------------------------------------------    // Fields    // ------------------------------------------------------------------------    //分别以InstanceId和ResourceId来索引Instance    /** Set of hosts known to run a task manager that are thus able to execute tasks (by ID). */    private final Map
registeredHostsById; /** Set of hosts known to run a task manager that are thus able to execute tasks (by ResourceID). */ private final Map
registeredHostsByResource; /** Set of hosts that were present once and have died */ private final Set
deadHosts; /** Listeners that want to be notified about availability and disappearance of instances */ private final List
instanceListeners = new ArrayList<>(); //Instance资源发生变化时,需要通知谁,如Scheduler /** The total number of task slots that the system has */ private int totalNumberOfAliveTaskSlots;

 

关键的操作,

registerTaskManager

/** * Registers a task manager. Registration of a task manager makes it available to be used * for the job execution. * * @param taskManagerGateway gateway to the task manager * @param taskManagerLocation Location info of the TaskManager * @param resources Hardware description of the TaskManager * @param numberOfSlots Number of available slots on the TaskManager * @return The assigned InstanceID of the registered task manager */public InstanceID registerTaskManager(        TaskManagerGateway taskManagerGateway,        TaskManagerLocation taskManagerLocation,        HardwareDescription resources,        int numberOfSlots) {        synchronized (this.lock) {        InstanceID instanceID = new InstanceID();        Instance host = new Instance( //创建新的instance            taskManagerGateway,            taskManagerLocation,            instanceID,            resources,            numberOfSlots);        registeredHostsById.put(instanceID, host); //register        registeredHostsByResource.put(taskManagerLocation.getResourceID(), host);        totalNumberOfAliveTaskSlots += numberOfSlots;        host.reportHeartBeat();        // notify all listeners (for example the scheduler)        notifyNewInstance(host);        return instanceID;    }}

其中,notifyNewInstance

private void notifyNewInstance(Instance instance) {    synchronized (this.instanceListeners) {        for (InstanceListener listener : this.instanceListeners) {            try {                listener.newInstanceAvailable(instance); //调用listener的newInstanceAvailable            }            catch (Throwable t) {                LOG.error("Notification of new instance availability failed.", t);            }        }    }}

 

Instance

看注释,instance就是一种抽象

用于描述注册到JobManager,并准备接受work的TaskManager

/** * An instance represents a {
@link org.apache.flink.runtime.taskmanager.TaskManager} * registered at a JobManager and ready to receive work. */public class Instance implements SlotOwner { /** The instance gateway to communicate with the instance */ private final TaskManagerGateway taskManagerGateway; /** The instance connection information for the data transfer. */ private final TaskManagerLocation location; /** A description of the resources of the task manager */ private final HardwareDescription resources; /** The ID identifying the taskManager. */ private final InstanceID instanceId; /** The number of task slots available on the node */ private final int numberOfSlots; /** A list of available slot positions */ private final Queue
availableSlots; //注意这里记录的不是slot,而是position,因为slot是在用的时候创建的 /** Allocated slots on this taskManager */ private final Set
allocatedSlots = new HashSet
(); /** A listener to be notified upon new slot availability */ private SlotAvailabilityListener slotAvailabilityListener; //listener用于通知当slot状态发生变化 /** Time when last heat beat has been received from the task manager running on this taskManager. */ private volatile long lastReceivedHeartBeat = System.currentTimeMillis();

核心的操作,

申请slot

/** * Allocates a simple slot on this TaskManager instance. This method returns {
@code null}, if no slot * is available at the moment. * * @param jobID The ID of the job that the slot is allocated for. * * @return A simple slot that represents a task slot on this TaskManager instance, or null, if the * TaskManager instance has no more slots available. * * @throws InstanceDiedException Thrown if the instance is no longer alive by the time the * slot is allocated. */public SimpleSlot allocateSimpleSlot(JobID jobID) throws InstanceDiedException { synchronized (instanceLock) { Integer nextSlot = availableSlots.poll(); //看看有没有available的slot position if (nextSlot == null) { return null; } else { SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, taskManagerGateway); allocatedSlots.add(slot); return slot; } }}

 

归还slot

/** * Returns a slot that has been allocated from this instance. The slot needs have been canceled * prior to calling this method. *  * 

The method will transition the slot to the "released" state. If the slot is already in state * "released", this method will do nothing.

* * @param slot The slot to return. * @return True, if the slot was returned, false if not. */@Overridepublic boolean returnAllocatedSlot(Slot slot) { if (slot.markReleased()) { LOG.debug("Return allocated slot {}.", slot); synchronized (instanceLock) { if (this.allocatedSlots.remove(slot)) { this.availableSlots.add(slot.getSlotNumber()); if (this.slotAvailabilityListener != null) { this.slotAvailabilityListener.newSlotAvailable(this); //通知有个slot可以用 } return true; } } }}

转载地址:http://xpqka.baihongyu.com/

你可能感兴趣的文章
day6-if,while,for的快速掌握
查看>>
JavaWeb学习笔记(十四)--JSP语法
查看>>
【算法笔记】多线程斐波那契数列
查看>>
java8函数式编程实例
查看>>
jqgrid滚动条宽度/列显示不全问题
查看>>
在mac OS10.10下安装 cocoapods遇到的一些问题
查看>>
angularjs表达式中的HTML内容,如何不转义,直接表现为html元素
查看>>
css技巧
查看>>
Tyvj 1728 普通平衡树
查看>>
[Usaco2015 dec]Max Flow
查看>>
javascript性能优化
查看>>
多路归并排序之败者树
查看>>
java连接MySql数据库
查看>>
转:Vue keep-alive实践总结
查看>>
android studio修改新项目package名称
查看>>
深入python的set和dict
查看>>
C++ 11 lambda
查看>>
Hadoop2.5.0 搭建实录
查看>>
实验吧 recursive write up
查看>>
High-speed Charting Control--MFC绘制图表(折线图、饼图、柱形图)控件
查看>>