找回密码
 立即注册
首页 业界区 业界 Flink源码阅读:集群启动

Flink源码阅读:集群启动

锦惺 10 小时前
前文中,我们已经了解了 Flink 的三种执行图是怎么生成的。今天继续看一下 Flink 集群是如何启动的。
启动脚本

集群启动脚本的位置在:
  1. flink-dist/src/main/flink-bin/bin/start-cluster.sh
复制代码
脚本会负责启动 JobManager 和 TaskManager,我们主要关注 standalone 启动模式,具体的流程见下图。
1.png

从图中可以看出 JobManager 是通过 jobmanager.sh 文件启动的,TaskManager 是通过taskmanager.sh 启动的,两者都调用了 flink-daemon.sh,通过传递不同的参数,最终运行不同的 Java 类。
  1. case $DAEMON in
  2.     (taskexecutor)
  3.         CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
  4.     ;;
  5.     (zookeeper)
  6.         CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
  7.     ;;
  8.     (historyserver)
  9.         CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
  10.     ;;
  11.     (standalonesession)
  12.         CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
  13.     ;;
  14.     (standalonejob)
  15.         CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
  16.     ;;
  17.     (sql-gateway)
  18.         CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
  19.         SQL_GATEWAY_CLASSPATH="`findSqlGatewayJar`":"`findFlinkPythonJar`"
  20.     ;;
  21.     (*)
  22.         echo "Unknown daemon '${DAEMON}'. $USAGE."
  23.         exit 1
  24.     ;;
  25. esac
复制代码
JobManager 启动流程

在 StandaloneSessionClusterEntrypoint 的 main 方法中,主要就是加载各种配置和环境变量,然后调用 ClusterEntrypoint.runClusterEntrypoint 来启动集群。跟着调用链一直找到 ClusterEntrypoint.runCluster 方法,这里会启动 ResourceManager、DispatcherRunner 等组件。
  1. private void runCluster(Configuration configuration, PluginManager pluginManager)
  2.         throws Exception {
  3.     synchronized (lock) {
  4.         // 初始化各种服务
  5.         initializeServices(configuration, pluginManager);
  6.         // 创建 DispatcherResourceManagerComponentFactory,
  7.         // 包含了三个核心组件的 Factory
  8.         // DispatcherRunnerFactory、ResourceManagerFactory、RestEndpointFactory
  9.         final DispatcherResourceManagerComponentFactory
  10.                 dispatcherResourceManagerComponentFactory =
  11.                         createDispatcherResourceManagerComponentFactory(configuration);
  12.         // 启动 ResourceManager、DispatcherRunner、WebMonitorEndpoint
  13.         clusterComponent =
  14.                 dispatcherResourceManagerComponentFactory.create(
  15.                         configuration,
  16.                         resourceId.unwrap(),
  17.                         ioExecutor,
  18.                         commonRpcService,
  19.                         haServices,
  20.                         blobServer,
  21.                         heartbeatServices,
  22.                         delegationTokenManager,
  23.                         metricRegistry,
  24.                         executionGraphInfoStore,
  25.                         new RpcMetricQueryServiceRetriever(
  26.                                 metricRegistry.getMetricQueryServiceRpcService()),
  27.                         failureEnrichers,
  28.                         this);
  29.         // 关闭服务
  30.         clusterComponent
  31.                 .getShutDownFuture()
  32.                 .whenComplete(
  33.                         (ApplicationStatus applicationStatus, Throwable throwable) -> {
  34.                             if (throwable != null) {
  35.                                 shutDownAsync(
  36.                                         ApplicationStatus.UNKNOWN,
  37.                                         ShutdownBehaviour.GRACEFUL_SHUTDOWN,
  38.                                         ExceptionUtils.stringifyException(throwable),
  39.                                         false);
  40.                             } else {
  41.                                 // This is the general shutdown path. If a separate more
  42.                                 // specific shutdown was
  43.                                 // already triggered, this will do nothing
  44.                                 shutDownAsync(
  45.                                         applicationStatus,
  46.                                         ShutdownBehaviour.GRACEFUL_SHUTDOWN,
  47.                                         null,
  48.                                         true);
  49.                             }
  50.                         });
  51.     }
  52. }
复制代码
下面来详细看一下这几个方法, initializeServices 就是负责初始化各种服务,有几个比较重要的可以着重关注下:
  1. // 初始化并启动一个通用的 RPC Service
  2. commonRpcService = RpcUtils.createRemoteRpcService(...);
  3. // 创建一个 IO 线程池,线程数量位 CPU 核数 * 4
  4. ioExecutor = Executors.newFixedThreadPool(...);
  5. // 创建 HA 服务组件,根据配置初始化 Standalone、ZK、K8S 三种
  6. haServices = createHaServices(configuration, ioExecutor, rpcSystem);
  7. // 创建并启动 blobServer,blobServer 可以理解为是 Flink 内部的
  8. blobServer = BlobUtils.createBlobServer(...);
  9. blobServer.start();
  10. // 创建心跳服务
  11. heartbeatServices = createHeartbeatServices(configuration);
  12. // 创建一个监控服务
  13. processMetricGroup = MetricUtils.instantiateProcessMetricGroup(...);
复制代码
createDispatcherResourceManagerComponentFactory 这个方法就是创建了三个工厂类,不需要过多介绍。我们重点关注 dispatcherResourceManagerComponentFactory.create 方法,即 ResourceManager、DispatcherRunner、WebMonitorEndpoint 是如何启动的。
WebMonitorEndpoint

WebMonitorEndpoint 的启动流程图如下,图中细箭头代表同一个方法中顺序调用,粗箭头代表进入上一个方法内部的调用。
2.png

WebMonitorEndpoint 创建和启动步骤如下:

  • 通过工厂创建出了 WebMonitorEndpoint,这里就是比较常规的初始化操作。
  • 调用 WebMonitorEndpoint 的 start 方法开始启动,start 方法内部先是创建了一个 Router 并调用 initializeHandlers 创建了一大堆 handler(是真的一大堆,这个方法有接近一千行,都是在创建 handler),创建完成之后,对 handler 进行排序和去重,再把它们都注册到 Router 中。这里排序是为了确保路由匹配的正确性,排序规则是先静态路径(/jobs/overview),后动态路径(/jobs/:jobid),假如我们没有排序,先注册了 /jobs/:jobid ,后注册 /jobs/overview ,这时当我们请求 /jobs/overview 时,就会被错误的路由到 /jobs/:jobid 上去。
  • 是调用 startInternal 方法,在 startInternal 方法内部只有 leader 选举和启动缓存清理任务两个步骤。
ResourceManager

3.png

ResourceManager 创建和启动步骤如下:

  • 调用 ResourceManagerServiceImpl.create 方法创建 ResourceManagerService,这里只是创建 ResourceManager 服务,实际创建 ResourceManager 在后面的步骤中。
  • 调用 resourceManagerService.start 方法启动服务,这里就是启动选主服务,standalne 模式直接调用 grantLeadership 成为 leader。
  • 成为 leader 后,就会调用 startNewLeaderResourceManager 方法,这个方法中会调用 resourceManagerFactory.createResourceManager 正式创建 resourceManager。创建完成后,就会调用 resourceManager.start 来启动它。
  • 启动后会回调 ResourceManager.onStart 方法。这里调用 startHeartbeatServices 启动了两个心跳服务,一个是 ResourceManager 和 TaskManager 之间的心跳,一个是 ResourceManager 和 JobManager 之间的心跳,然后会启动 SlotManager。SlotManager 可以被当作 Flink 集群的资源调度中心。它会负责管理集群中的所有 Slot 资源,也需要响应 JobManager 的资源请求。
DispatcherRunner

4.png


  • 先创建工厂,创建完成后调用 DefaultDispatcherRunner.create 创建出 DispatcherRunner,接着是调用 start 启动选主流程。
  • 选主完成后就调用 startNewDispatcherLeaderProcess 启动新的流程。启动新的流程需要先关闭旧流程,然后创建新的 dispatcherLeaderProcess,并调用 start 启动。
  • 启动时,会回调 onStart 方法。
  • 回调方法中,先启动 executionPlanStore,它主要是用于持久化 JobGraph。然后恢复执行计划,重建状态(如果是从失败中恢复),实例化 Dispatcher,完成作业启动。
TaskManager 启动流程

5.png

TaskManager 是 Flink 的执行节点,其最小执行单元是 slot。TaskManager 启动流程也主要是和资源管理相关,包括 slot 列表的管理和与 ResourceManager 的通信。
TaskManager 启动流程大体分为以下几部分:

  • 构建并启动 TaskManagerRunner(蓝色部分)
  • 启动 TaskExecutor(红色部分)
  • 完成与 ResourceManager 的连接(橙色部分)
启动 TaskManagerRunner

在 TaskManagerRunner 的 start 方法中,有两个步骤:
第一步是调用 startTaskManagerRunnerServices 创建和启动了很多服务,这一点和 JobManager 的启动流程比较像。这些服务包括了高可用服务、心跳服务、监控指标服务等,这里也创建了 taskExecutorService,它的启动在第二步。
第二步是调用 taskExecutorService.start 方法,启动 TaskExecutorService,它内部主要负责启动 TaskExecutor。
启动 TaskExecutor

TaskExecutor 是 TaskManager 内部的一个核心组件,负责帮助 TaskManager 完成 task 的部署和执行等核心操作。
在上一步调用 taskExecutor 的 start 方法后,会回调 onStart 方法,这里主要是三个步骤

  • 连接 ResourceManager 以及注册监听
  • 启动 taskSlotTable
  • 连接 JobMaster 以及注册监听
第一步我们在下面详细解释。第二步启动的 TaskSlotTable 是 TaskManager 中负责资源的核心组件,它维护了一个 Slot 列表,管理每个 Slot 的状态,负责 Slot 的分配和释放。第三步主要是和 JobMaster 建立连接并保持心跳,同时也会接收 Slot 申请的请求。
连接 ResourceManager

TaskExecutor 注册完监听之后,会收到 ResourceManagerLeaderListener.notifyLeaderAddress 方法回调。回调方法中,会创建一个 TaskExecutorToResourceManagerConnection 实例并启动它。这个类是用来将 TaskExecutor 注册到 ResourceManager,注册成功会回调 onRegistrationSuccess 方法。回调成功的方法中,TaskManager 会调用 resourceManagerGateway.sendSlotReport 将 Slot 的状态进行上报。
总结

本文介绍了 Flink 集群在 Standalone 模式下的启动过程,其中 JobManager 重点介绍了 WebMonitorEndpoint、ResourceManager 和 DispatcherRunner 这三个组件的启动过程。TaskManager 主要介绍了启动 TaskExecutor 和连接 ResourceManager 的过程。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册