2021年7月21日星期三

Flink源码解析(三)——从RM与TM的心跳交互分析Flink心跳机制

0. 说明

基于Flink 1.12

1. 背景知识

1.1 Actor模型

Flink底层RPC是通过AKKA实现的,AKKA是基于Actor模型实现的框架。下面,将大致介绍一下actor模型。
在Actor模型中,一切事物都是actor,一个actor是一个基本的计算单元,每个actor是完全隔离的,不会共享内存,也就不会有共享数据带来的并发问题;它们是自己维护自身的状态,该状态不会被其他actor直接修改。
整体模型大致是:多个actor同时运行,每个actor接收消息,并根据消息做出相应的反应。消息本身是通过异步的形式发送给actor的,消息会被存储在一个叫做"邮箱(mailbox)"的地方,actor会顺序的处理收到的信息,避免锁的使用。从描述可以了解到actor模型中,消息的发送者和已发送消息解耦,是以并发的形式处理数据的。

1.2 RPC

RPC作用是让远程调用像本地调用,封装调用的细节。
Flink定义了各个组件的Gateway,通过回调的方式隐藏实现细节,将业务本身和通信解绑了,方便RPC调用。目前,Flink的RPC请求的底层通信是通过AKKA的实现的。

1.2.1 RPC相关的接口

  • RPCGateway
    所有Rpc组件的网关,定义了各组件的Rpc接口,提供了获取地址和主机名的功能;
  • RPCEndpoint
    RPCEndpoint是Flink RPC调用的基类,所有具有分布式调用能力的组件都需要继承该接口。
  • RpcService
    RPC服务提供者,提供开始、停止服务等功能,以及提供远程功能;
    三者的关系如下:
    public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsyn{//启动sever和获取RPC Gateway/** RPC service to be used to start the RPC server and to obtain rpc gateways. */private final RpcService rpcService;//RpcServer用于启动和连接到RpcEndpoint, 连接到rpc服务器将返回一个RpcGateway,为RpcService提供RPC服务/连接远程Server/** Interface to access the underlying rpc server. */protected final RpcServer rpcServer;}

2. Flink心跳机制

2.1 核心接口

2.1.1 HeartbeatTarget

是可以发送心跳和请求心跳相应组件接口,是对具备心跳能力对象的一种抽象。
HeartbeatTarget的函数具备以下两种动作:

  • receiveHeartbeat 向某个节点发送心跳响应,其参数heartbeatOrigin就是该节点;
  • requestHeartbeat
    要求某个节点发送心跳信息,其参数requestOrigin就是心跳信息上报的节点。

2.1.2 HeartbeatMonitor

HeartbeatMonitor管理HeartbeatTarget的心跳状态。当在指定时间内未收到心跳信息时,monitor将会通知对应的HeartbeatListener,收到心跳信息后会重置其定时器。其工厂接口如下:

  HeartbeatMonitor<O> createHeartbeatMonitor(    ResourceID resourceID,    HeartbeatTarget<O> heartbeatTarget,    ScheduledExecutor mainThreadExecutor,    HeartbeatListener<?, O> heartbeatListener, //用于处理心跳信息    long heartbeatTimeoutIntervalMs);

2.1.3 HeartbeatListener

HeartbeatListener是和HeartbeatManager交互的接口,Flink的业务的处理逻辑需要继承该接口以处理心跳结果,其三个回调函数如下:

  • notifyHeartbeatTimeout :通知心跳超时;
  • reportPayload:处理节点发来的Payload载荷;
  • retrievePayLoad:获取对某节点发下一次心跳请求的Payload载荷

2.1.4 HeartbeatManager

心跳的管理者,用于开始/停止对HeartbeatTarget的心跳监控,以及会处理某个节点的心跳超时。
HeartbeatManager继承了HeartbeatTarget,其具有了HeartbeatTarget的函数功能以外,该接口还有以下四种函数:

  • monitorTarget
    开始监控HeartbeatTarget,HeartbeatTarget的心跳超时后,将会通知HeartbeatListener;
  • unmonitorTarget 停止监控某节点;
  • stop 停止HeartbeatManager;
  • getLastHeartbeatFrom 返回特定节点最近一次心跳信息;

核心接口交互的大致过程:HeartbeatManager将HeartbeatTarget放入到监控列表中,当心跳超时时,HeartbeatMonitor回通知HeartbeatListener处理,通过对HeartbeatListener的实现,完成相关处理心跳超时的逻辑。

2.2. 核心接口的实现

下面通过分析1.3.1中核心接口的实现类,来具体分析心跳处理的过程。

2.2.1 HearbeatManagerImpl

该manager维护了一个heartbeat 的监控对象(HeartbeatMonitor)和资源ID信息,当收到新的心跳信息是,monitor对象将会被更新;心跳超时时,将会通知HeartbeatListenter对象。

public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> { //心跳间隔 /** Heartbeat timeout interval in milli seconds. */ private final long heartbeatTimeoutIntervalMs; //心跳 /** Heartbeat listener with which the heartbeat manager has been associated. */ private final HeartbeatListener<I, O> heartbeatListener; //使用一个map存放资源-心跳的monitor信息,其monitorTarget方法就是将对应信息放入该map中 /** Map containing the heartbeat monitors associated with the respective resource ID. */ private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets; /** Running state of the heartbeat manager. */ protected volatile boolean stopped;

HearbeatManagerImpl实现的主要函数有:

  • monitorTarget将一个节点加入监控列表中
    该方法会根据ResourceID和HeartbeatTarget生成一个HeartbeatMonitor对象,然后将resourceID和该对象组成KV的形式放入heartbeatTargets中。
  • requestHeartbeat
    心跳请求方调用requestHeartbeat要求上报一个心跳信息,该请求会通过RPC异步调用到心跳的上报方(HearbeatManagerImpl的创建者)的requestHeartbeat,以要求上报方向requestOrigin节点发起一个心跳响应。具体过程如下:
    • requestHeartbeat会记录下这个请求时间点,然后取消超时,重新创建一个ScheduleFuture去判断requestOrigin的心跳是否超时。后续若是超时了,则将heartbeatMonitor的state置为timeout状态,若是特定时间内requestOrigin响应了,则ScheduleFuture取消,monitor的状态依旧为RUNNING。
    • 调用heartbeatListener#reportPayload处理心跳信息,其具体过程依据具体的实现。
    • 最后调用receiveHearbeat函数,响应一个心跳给请求方。

2.2.2 HeartbeatManagerSenderImpl

继承于HearbeatManagerImpl,HeartbeatManagerSenderImpl向其监控的heartbeatTarget对象请求心跳的响应,属于主动触发心跳请求。实现了Runnable接口,在其run方法中,会遍历heartbeatMonitor,通过requestHeartbeat()方法向节点获取心跳信息。

 public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable { @Override public void run() {  if (!stopped) {   log.debug("Trigger heartbeat request.");   for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {    requestHeartbeat(heartbeatMonitor);   }   // 周期性调度,事件周期可配   getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);  } } // 主动发起心跳请求 private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {  O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());  final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();  // 调用Target的 requestHeartbeat函数  heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload); }}

2.2.3 HeartbeatMonitorImpl

HeartbeatMonitor管理心跳目标,它在初始化会启动一个ScheduledExecutor。

  • timeout超时会通知heartbeatListener执行响应的超时逻辑;
  • 在规定时间内收到心跳信息,会重置ScheduledExecutor,重新开始;
public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable { /** Resource ID of the monitored heartbeat target. */ private final ResourceID resourceID; //监控的资源ID /** Associated heartbeat target. */ private final HeartbeatTarget<O> heartbeatTarget; //心跳目标 private final ScheduledExecutor scheduledExecutor; /** Listener which is notified about heartbeat timeouts. */ private final HeartbeatListener<?, ?> heartbeatListener; HeartbeatMonitorImpl(   ResourceID resourceID,   HeartbeatTarget<O> heartbeatTarget,   ScheduledExecutor scheduledExecutor,   HeartbeatListener<?, O> heartbeatListener,   long heartbeatTimeoutIntervalMs) {  this.resourceID = Preconditions.checkNotNull(resourceID);  this.heartbeatTarget = Preconditions.checkNotNull(heartbeatTarget);  this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);  this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener);  Preconditions.checkArgument(    heartbeatTimeoutIntervalMs > 0L,    "The heartbeat timeout interval has to be larger than 0.");  this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;  lastHeartbeat = 0L;  //初始化的时候,就启动一个定时任务  resetHeartbeatTimeout(heartbeatTimeoutIntervalMs); }  @Override public void run() {  // The heartbeat has timed out if we're in state running  if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {   //通知heartbeatListener处理   heartbeatListener.notifyHeartbeatTimeout(resourceID);  } } void resetHeartbeatTimeout(long heartbeatTimeout) {  if (state.get() == State.RUNNING) {   cancelTimeout();   //重新开启新的定时任务   futureTimeout =     scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);   // Double check for concurrent accesses (e.g. a firing of the scheduled future)   if (state.get() != State.RUNNING) {    cancelTimeout();   }  } } }

2.2.4 HeartbeatServices

HeartbeatServices为所有需要心跳服务的创建heartbeat receivers and heartbeat senders。

 public class HeartbeatServices {  /**  * 创建 heartbeat receivers  * Creates a heartbeat manager which does not actively send heartbeats.  */  public <I, O> HeartbeatManager<I, O> createHeartbeatManager(   ResourceID resourceId,   HeartbeatListener<I, O> heartbeatListener,   ScheduledExecutor mainThreadExecutor,   Logger log) {  return new HeartbeatManagerImpl<>(    heartbeatTimeout, resourceId, heartbeatListener, mainThreadExecutor, log); }  /**  * 创建 heartbeat sender  * Creates a heartbeat manager which actively sends heartbeats to monitoring targets.  */  public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(   ResourceID resourceId,   HeartbeatListener<I, O> heartbeatListener,   ScheduledExecutor mainThreadExecutor,   Logger log) {  return new HeartbeatManagerSenderImpl<>(    heartbeatInterval,    heartbeatTimeout,    resourceId,    heartbeatListener,    mainThreadExecutor,    log); }  // 从配置文件配置心跳间隔时间和心跳超时时间 //两者的关系 0 < 心跳间隔时间 < 心跳超时时间  public static HeartbeatServices fromConfiguration(Configuration configuration) {  long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);  long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);  return new HeartbeatServices(heartbeatInterval, heartbeatTimeout); } }

3. RM和TM的交互

3.1. 总述

在一个Flink集群中只有一个ResourceManager(RM),和一个或多个TaskManager(TM)。两者的交互过程为:TM启动时会向RM注册,注册成功之后,RM会主动要求TM上报心跳信息。通过RM和TM的心跳信息,双方知道对方是否存活。
在2.2.4小节总,我们知道HeartbeatManagerSenderImpl属于Sender,HeartbeatManagerImpl属于Receiver。sender要对心跳目标上报心跳信息,receiver收到信息请求后返回一个response。

3.2. 初始化过程

3.2.1 ResourceManager

  • RM启动
 public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>  extends FencedRpcEndpoint<ResourceManagerId>  implements ResourceManagerGateway, LeaderContender {   // RM启动时运行的方法 @Override public final void onStart() throws Exception {  try {   // 启动RMServices   startResourceManagerServices();  } catch (Throwable t) {   final ResourceManagerException exception =     new ResourceManagerException(       String.format("Could not start the ResourceManager %s", getAddress()),       t);   onFatalError(exception);   throw exception;  } } }
  • leaderElectionService#start方法
    leaderElectionService#start方法有多个实现,其中,主要是DefaultLeaderElectionService和StandaloneLeaderElectionService,前者是依赖外部组价的,这里我们以standalone模式分析。
    在standalone模式下,Flink集群中的leader是通过配置文件配置,所以在调用启动leader选举方法时,会直接将leadership赋给指定的节点,在赋予leadership角色过程会初始化心跳服务,大致的流程如下:
    StandaloneLeaderElectionService#start|ResourceManager#grantLeadership|ResourceManager#tryAcceptLeadership|ResourceManager#startServicesOnLeadership //其具体实现如下private void startServicesOnLeadership() { //启动心跳服务 startHeartbeatServices(); //slotManager是RM中管理slot的组件,其具体过程后续博客分析 slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl()); //周期性判断是否存在未满足的slot请求 onLeadership();}
    启动心跳服务,就是创建分别创建了taskManagerHeartbeatManager和jobManagerHeartbeatManager用于RM和TM、RM和JM的心跳服务
  private void startHeartbeatServices() {  taskManagerHeartbeatManager =    heartbeatServices.createHeartbeatManagerSender(      resourceId,      new TaskManagerHeartbeatListener(),      getMainThreadExecutor(),      log);  jobManagerHeartbeatManager =    heartbeatServices.createHeartbeatManagerSender(      resourceId,      new JobManagerHeartbeatListener(),      getMainThreadExecutor(),      log); }

结合2.2.2小节,RM在心跳服务在和TM与JM的心跳过程中,充当的是请求心跳请求的发起方,即RM是主动去拉取心跳信息的。

3.2.2 TaskExecutor

TaskExecutor在创建时,就初始化了心跳组件。

 public TaskExecutor(   RpcService rpcService,   TaskManagerConfiguration taskManagerConfiguration,   HighAvailabilityServices haServices,   TaskManagerServices taskExecutorServices,   ExternalResourceInfoProvider externalResourceInfoProvider,   HeartbeatServices heartbeatServices,   TaskManagerMetricGroup taskManagerMetricGroup,   @Nullable String metricQueryServiceAddress,   BlobCacheService blobCacheService,   FatalErrorHandler fatalErrorHandler,   TaskExecutorPartitionTracker partitionTracker,   BackPressureSampleService backPressureSampleService) {  //创建HeartbeatManagerImpl,对JM的心跳进行相应  this.jobManagerHeartbeatManager =    createJobManagerHeartbeatManager(heartbeatServices, resourceId);  // 创建HeartbeatManagerImpl,对RM的心跳进行相应  this.resourceManagerHeartbeatManager =    createResourceManagerHeartbeatManager(heartbeatServices, resourceId); }

3.3 TM向RM注册过程

3.3.1 流程图

  • TaskExecutor的启动过程如下:
 TaskExecutor#onStart | TaskExecutor#startTaskExecutorServices | StandaloneLeaderRetrievalService#start //以standalone模式分析 | |//在standalone模式下,已知晓JobManager的地址,会直接去链接RM TaskExecutor.ResourceManagerLeaderListener#notifyLeaderAddress | TaskExecutor#notifyOfNewResourceManagerLeader | TaskExecutor#reconnectToResourceManager | |//在该方法中会主动调用TaskExecutorToResourceManagerConnection类的start方法去链接RM TaskExecutor#connectToResourceManager | | //在该函数的createNewRegistration方法中的回调函数,处理注册成功后的逻辑 RegisteredRpcConnection#start | |//z在该方法中会先链接RM,然后连接成功后发起注册请求 RetryingRegistration#startRegistration | RetryingRegistration#register | TaskExecutorToResourceManagerConnection#invokeRegistration

到此,TM向RM发起了注册,通过AKKA RPC,请求来到了RM中。

  • ResourceManager处理逻辑
 Res......

原文转载:http://www.shaoqun.com/a/890789.html

跨境电商:https://www.ikjzd.com/

欧麦:https://www.ikjzd.com/w/2085

跨境通:https://www.ikjzd.com/w/1329

海豚村:https://www.ikjzd.com/w/1779


0.说明基于Flink1.121.背景知识1.1Actor模型Flink底层RPC是通过AKKA实现的,AKKA是基于Actor模型实现的框架。下面,将大致介绍一下actor模型。在Actor模型中,一切事物都是actor,一个actor是一个基本的计算单元,每个actor是完全隔离的,不会共享内存,也就不会有共享数据带来的并发问题;它们是自己维护自身的状态,该状态不会被其他actor直接修改。整
家得宝:https://www.ikjzd.com/w/1570
泰安方特游乐园特色是什么?泰安方特项目介绍?:http://www.30bags.com/a/428350.html
泰安高铁站到太阳部落景区有直达车吗?泰安高铁站到太阳部落发车时间?:http://www.30bags.com/a/428436.html
泰安民宿排行,泰安住宿推荐,泰安住宿攻略:http://www.30bags.com/a/425243.html
泰安太阳部落交通路线?太阳部落旅游专线在哪坐?:http://www.30bags.com/a/428439.html
吧深一点老师今晚随你怎么弄 老师今天晚上让你桶个够:http://lady.shaoqun.com/a/247574.html
男朋友在车里㖭比过程 放在里面一整天:http://www.30bags.com/m/a/249824.html
少妇口述:夫妻4P换妻真实经历:http://www.30bags.com/m/a/249628.html
深圳到港珠澳大桥一日游多少钱:http://www.30bags.com/a/516026.html
深圳罗湖美术馆怎么坐地铁去(几号线+哪个站下):http://www.30bags.com/a/516027.html
罗湖美术馆要身份证吗:http://www.30bags.com/a/516028.html
深圳罗湖美术馆近期展览2021(更新中):http://www.30bags.com/a/516029.html

没有评论:

发表评论