Azkaban 是一个非常简单实用,而且开源的作业调度系统。在 2.x 版本中不支持集群模式部署,在 3.x 版本中支持集群模式部署,适用于作业量比较大一些的应用场景。有关 Azkaban 更多详细信息,如特点、功能、特性、作业定义等,可以参考官方文档,这里不再详述。
Azkaban 集群架构
下面我们看一下 Azkaban 集群模式的架构,如下图所示:
从上图可见,Azkaban 集群部署模式,主要有 3 个核心的组件:
- Azkaban WebServer
Azkaban WebServer,是整个调度集群的核心,负责所有作业的管理和调度。
- Azkaban ExecutorServer
Azkaban ExecutorServer,整个调度集群中实际运行作业的节点,该类节点可能是作为一个作业提交的客户端,比如 Spark on YARN 部署模式下,cluster 运行模式时只作为客户端使用,client 运行模式时会有部分计算逻辑;比如普通的 Java 程序需要处理量级较小的数据作业,这时Executor Server 节点可能有较大的工作负载,占用较多节点资源(内存、CPU)。
- DB
DB,是集群中所有节点运行共用的数据存储,包含作业信息、各种调度元数据等等。
核心调度概述
Azkaban WebServer 需要根据 Executor Server 的运行状态信息,选择一个合适的 Executor Server 来运行 WorkFlow,然后会将提交到队列中的 WorkFlow 调度到选定的 Executor Server 上运行。我们整理了与核心调度相关的各个组件,主要包括 Azkaban WebServer 端和 Azkaban ExecutorServer 端,他们之间的关系如下图所示:
其实,从调度层面来看,Azkaban WebServer 与 Executor Server 之间的交互方式非常简单,是通过 REST API 的方式来进行交互,基本的模式是,Azkaban WebServer 根据调度的需要,主动调用 Executor Server 暴露的 REST API 来获取相应的资源信息,比如 Executor Server 的状态信息、分配 WorkFlow 到指定 Executor Server 上运行,等等。
我们可以在 QueueProcessorThread.selectExecutorAndDispatchFlow() 方法中看到,选择 Executor Server 并进行调度的实现,代码片段如下所示:
final Executor selectedExecutor = selectExecutor(exflow, availableExecutors); if (selectedExecutor != null) { try { dispatch(reference, exflow, selectedExecutor); ExecutorManager.this.commonMetrics.markDispatchSuccess(); } catch (final ExecutorManagerException e) { ExecutorManager.this.commonMetrics.markDispatchFail(); logger.warn(String.format( "Executor %s responded with exception for exec: %d", selectedExecutor, exflow.getExecutionId()), e); handleDispatchExceptionCase(reference, exflow, selectedExecutor, availableExecutors); } }
QueueProcessorThread 是运行在 Azkaban WebServer 端的一个线程,它在 ExecutorManager 中定义,是内部调度中最核心的线程。selectExecutor() 方法处理如何选择一个合适的 Executor Server,然后通过 dispatch() 方法将需要运行的 WorkFlow 调度到该 Executor Server 上运行。
选择 Executor Server
Azkaban WebServer 选择 Executor,调用 selectExecutor() 方法,实现如下所示:
private Executor selectExecutor(final ExecutableFlow exflow, final Set<Executor> availableExecutors) { Executor choosenExecutor = getUserSpecifiedExecutor(exflow.getExecutionOptions(), exflow.getExecutionId()); // If no executor was specified by admin if (choosenExecutor == null) { logger.info("Using dispatcher for execution id :" + exflow.getExecutionId()); final ExecutorSelector selector = new ExecutorSelector(ExecutorManager.this.filterList, ExecutorManager.this.comparatorWeightsMap); choosenExecutor = selector.getBest(availableExecutors, exflow); } return choosenExecutor; }
首先,查看当前 exflow 的配置中,是否要求将该 exflow 调度到指定的 Executor Server 上运行,如果是的话,则会返回该指定的 Executor Server 的信息,后续直接调度到该 Executor Server 上;否则会按照一定的计算规则去选出一个 Executor Server。
在创建 ExecutorSelector 时,传入参数值 ExecutorManager.this.filterList,该 filterList 是从 azkanban.properties 文件中读取 azkaban.executorselector.filters 的配置值,并创建了一个 ExecutorFilter 对象,而该对象中包含了一组 FactorFilter,后面我们会说明。
使用 ExecutorSelector 来选出一个 Executor Server,具体选择的逻辑,我们可以查看 ExecutorSelector.getBest() 方法。
首先通过定义的 CandidateFilter(它是一个抽象类,具体实现类为 ExecutorFilter)进行预筛选:
for (final K candidateInfo : candidateList) { if (this.filter.filterTarget(candidateInfo, dispatchingObject)) { filteredList.add(candidateInfo); } }
上面的 filter 就是 FactorFilter 类的实例,Azkaban 内部定义了如下 3 种:
private static final String STATICREMAININGFLOWSIZE_FILTER_NAME = "StaticRemainingFlowSize"; private static final String MINIMUMFREEMEMORY_FILTER_NAME = "MinimumFreeMemory"; private static final String CPUSTATUS_FILTER_NAME = "CpuStatus";
目前 3.40.0 版本不支持自定义,只能使用内建实现的,如果需要增加新的 FactorFilter,可以在此基础上做一个简单改造,配置使用自己定义的 FactorFilter 实现。FactorFilter 是一个泛型类:FactorFilter<Executor, ExecutableFlow>,根据上面定义的 3 种指标对 Executor Server 进行一个预过滤,满足要求的会进行后面的比较,加入到调度 WorkFlow 执行的 Executor Server 的候选集中。
然后,通过如下方式进行比较排序,选择合适的 Executor Server:
// final work - find the best candidate from the filtered list. final K executor = Collections.max(filteredList, this.comparator); logger.debug(String.format("candidate selected %s", null == executor ? "(null)" : executor.toString())); return executor;
这里关键的就是 this.comparator,它有一个实现类 ExecutorComparator,该类中给出了需要对两个 Executor Server 的哪些指标进行综合比较,亦即一组比较器的定义,可以看到目前考虑了 4 种比较器:
private static final String NUMOFASSIGNEDFLOW_COMPARATOR_NAME = "NumberOfAssignedFlowComparator"; private static final String MEMORY_COMPARATOR_NAME = "Memory"; private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched"; private static final String CPUUSAGE_COMPARATOR_NAME = "CpuUsage";
通过上面代码可以看出,在选择调度一个 WorkFlow 到 Azkaban 集群中的某个 Executor Server 时,需要比较 Executor Server 的如下 4 个指标:
- 能够运行 WorkFlow 的剩余容量,数值越大越优先
- 剩余内存用量,数值越大越优先
- 最近分配 Flow 的时间,数值越大越优先
- CPU 使用用量,数值越小越优先
基于上面 4 个指标,创建了 4 个比较器,使用 FactorComparator 来表示,对需要比较的一组 Executor Server,使用这 4 个比较器进行比较,通过加权后得到一个得分值,根据该得分值选定 Executor Server,核心逻辑如下所示:
final Collection<FactorComparator<T>> comparatorList = this.factorComparatorList.values(); for (final FactorComparator<T> comparator : comparatorList) { final int result = comparator.compare(object1, object2); result1 = result1 + (result > 0 ? comparator.getWeight() : 0); result2 = result2 + (result < 0 ? comparator.getWeight() : 0); logger.debug(String.format("[Factor: %s] compare result : %s (current score %s vs %s)", comparator.getFactorName(), result, result1, result2)); }
上面选取了待比较的两个 Executor Server 都不为空的情况,分别遍历每个 FactorComparator 进行比较,在分别对每个 Executor Server 的比较结果值进行累加求和,加权得到一个分数值。从一组 Executor Server 中,根据最终比较的分数值,分数值最大的 Executor Server 为最终选定的 Executor Server。
获取 Executor Server 的运行统计信息
在 Azkaban WebServer 内部,会维护集群中每个 Executor Server 的运行状态信息,该信息的获取是在 QueueProcessorThread 线程中实现的,定期去更新所维护的 Executor Server 的运行状态信息,如下所示:
if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) { // Refresh executorInfo for all activeExecutors refreshExecutors(); lastExecutorRefreshTime = currentTime; currentContinuousFlowProcessed = 0; }
上面 refreshExecutors() 方法遍历内存中维护的所有的 Executor Server,调用每个 Executor Server 的 /serverStatistics 接口,拉取 Executor Server 的运行状态信息。
另外,Azkaban WebServer 还需要能够获取到各个 Executor Server 上运行的 WorkFlow 的状态信息,可以在 ExecutorManager.ExecutingManagerUpdaterThread 中看到实现,代码片段如下所示:
results = ExecutorManager.this.apiGateway.callWithExecutionId(executor.getHost(), executor.getPort(), ConnectorParams.UPDATE_ACTION, null, null, executionIds, updateTimes);
上面调用 Executor Server 的 /executor?action=update 接口来拉取 WorkFlow 的状态信息,然后更新内存中维护的状态信息数据结构。其中,有些 WorkFlow 可能已经运行完成,需要释放资源;有些 WorkFlow 状态发生变更,也需要更新 Azkaban WebServer 端内存中维护的数据结构。
调度 WorkFlow 到 Executor Server 上执行
上面已经选定 Executor Server,结合前面代码,是通过调用 ExecutorManager.dispatch() 方法来实现,调度 WorkFlow 到该选定的 Executor Server 运行,代码片段如下所示:
try { this.apiGateway.callWithExecutable(exflow, choosenExecutor, ConnectorParams.EXECUTE_ACTION); } catch (final ExecutorManagerException ex) { logger.error("Rolling back executor assignment for execution id:" + exflow.getExecutionId(), ex); this.executorLoader.unassignExecutor(exflow.getExecutionId()); throw new ExecutorManagerException(ex); }
通过跟踪查看 apiGateway.callWithExecutable() 实现,可以看到,最终是调用了 Executor Server 端的一个 REST API 接口:/executor,然后带上相关的请求参数,如 action=execute、execId 等。
Executor Server 执行 WorkFlow
很显然,Azkaban WebServer 调度 WorkFlow 后,Executor Server 在 ExecutorServlet 中接收到对应的请求,核心方法如下所示:
private void handleAjaxExecute(final HttpServletRequest req, final Map<String, Object> respMap, final int execId) throws ServletException { try { this.flowRunnerManager.submitFlow(execId); } catch (final ExecutorManagerException e) { e.printStackTrace(); logger.error(e.getMessage(), e); respMap.put(RESPONSE_ERROR, e.getMessage()); } }
在收到 Azkaban WebServer 的调度请求后,Executor Server 使用内部的 FlowRunnerManager 来提交 WorkFlow 执行。在这个过程中,首先使用 ExecutorLoader 从数据库中读取 WorkFlow 对应的信息;然后使用 FlowPreparer 进行初始化,创建对应的数据目录等;最后创建 FlowRunner 来执行 WorkFlow,并跟踪其执行状态。
参考资源
- http://azkaban.github.io/azkaban/docs/latest/
- https://github.com/azkaban/azkaban/releases/tag/3.40.0
本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。
不错。