紫影基地

 找回密码
 立即注册
查看: 396|回复: 0

基于DAG实现的任务编排框架&平台

[复制链接]
阅读字号:

598

主题

635

帖子

9069

积分

审核员

Rank: 7Rank: 7Rank: 7

积分
9069
发表于 2021-10-24 14:26:09 | 显示全部楼层 |阅读模式
本帖最后由 天之衡 于 2021-10-24 14:31 编辑

最近在做的工作比较需要一个支持任务编排工作流的框架或者平台,这里记录下实现上的一些思路。
任务编排工作流

任务编排是什么意思呢,顾名思义就是可以把"任务"这个原子单位按照自己的方式进行编排,任务之间可能互相依赖。复杂一点的编排之后就能形成一个 workflow 工作流了。我们希望这个工作流按照我们编排的方式去执行每个原子 task 任务。如下图所示,我们希望先并发运行 Task A 和 Task C,Task A 执行完后串行运行 Task B,在并发等待 Task B 和 C 都结束后运行 Task D,这样就完成了一个典型的任务编排工作流。

142702dfv1zv95151zl2vg.png
DAG 有向无环图

首先我们了解图这个数据结构,每个元素称为顶点 vertex,顶点之间的连线称为边 edge。像我们画的这种带箭头关系的称为有向图,箭头关系之间能形成一个环的成为有环图,反之称为无环图。显然运用在我们任务编排工作流上,最合适的是 DAG 有向无环图。
我们在代码里怎么存储图呢,有两种数据结构:邻接矩阵和邻接表。
下图表示一个有向图的邻接矩阵,例如 x->y 的边,只需将 Array[x][y]标识为 1 即可。
142703mgq09avuvueodhdb.jpg
此外我们也可以使用邻接表来存储,这种存储方式较好地弥补了邻接矩阵浪费空间的缺点,但相对来说邻接矩阵能更快地判断连通性。
142704ezcn9t9dwhfzdc9w.jpg
一般在代码实现上,我们会选择邻接矩阵,这样我们在判断两点之间是否有边更方便点。
一个任务编排框架

了解了 DAG 的基本知识后我们可以来简单实现一下。首先是存储结构,我们的 Dag 表示一整个图,Node 表示各个顶点,每个顶点有其 parents 和 children:
  1. //Dag
  2. public final class DefaultDag<T, R> implements Dag<T, R> {

  3.     private Map<T, Node<T, R>> nodes = new HashMap<T, Node<T, R>>();
  4.     ...
  5. }

  6. //Node
  7. public final class Node<T, R> {
  8.     /**
  9.      * incoming dependencies for this node
  10.      */
  11.     private Set<Node<T, R>> parents = new LinkedHashSet<Node<T, R>>();
  12.     /**
  13.      * outgoing dependencies for this node
  14.      */
  15.     private Set<Node<T, R>> children = new LinkedHashSet<Node<T, R>>();
  16.     ...
  17. }
复制代码

画两个顶点,以及为这两个顶点连边操作如下:
  1.     public void addDependency(final T evalFirstNode, final T evalLaterNode) {
  2.         Node<T, R> firstNode = createNode(evalFirstNode);
  3.         Node<T, R> afterNode = createNode(evalLaterNode);

  4.         addEdges(firstNode, afterNode);
  5.     }

  6.    private Node<T, R> createNode(final T value) {
  7.         Node<T, R> node = new Node<T, R>(value);
  8.         return node;
  9.     }

  10.     private void addEdges(final Node<T, R> firstNode, final Node<T, R> afterNode) {
  11.         if (!firstNode.equals(afterNode)) {
  12.             firstNode.getChildren().add(afterNode);
  13.             afterNode.getParents().add(firstNode);
  14.         }
  15.     }
复制代码

到现在我们其实已经把基础数据结构写好了,但我们作为一个任务编排框架最终是需要线程去执行的,我们把它和线程池一起给包装一下。

  1. //任务编排线程池
  2. public class DefaultDexecutor <T, R> {

  3.     //执行线程,和2种重试线程
  4.     private final ExecutorService<T, R> executionEngine;
  5.     private final ExecutorService immediatelyRetryExecutor;
  6.     private final ScheduledExecutorService scheduledRetryExecutor;
  7.     //执行状态
  8.     private final ExecutorState<T, R> state;
  9.     ...
  10. }
  11. //执行状态
  12. public class DefaultExecutorState<T, R> {
  13.     //底层图数据结构
  14.     private final Dag<T, R> graph;
  15.     //已完成
  16.     private final Collection<Node<T, R>> processedNodes;
  17.     //未完成
  18.     private final Collection<Node<T, R>> unProcessedNodes;
  19.     //错误task
  20.     private final Collection<ExecutionResult<T, R>> erroredTasks;
  21.     //执行结果
  22.     private final Collection<ExecutionResult<T, R>> executionResults;
  23. }
复制代码

可以看到我们的线程包括执行线程池,2 种重试线程池。我们使用 ExecutorState 来保存一些整个任务工作流执行过程中的一些状态记录,包括已完成和未完成的 task,每个 task 执行的结果等。同时它也依赖我们底层的图数据结构 DAG。
接下来我们要做的事其实很简单,就是 BFS 这整个 DAG 数据结构,然后提交到线程池中去执行就可以了,过程中注意一些节点状态的保持,结果的保存即可。


1581984-7356cc40f1c92d80.png



天之所衡,道之所倚
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|Archiver|手机版|小黑屋|紫影基地

GMT+8, 2025-1-12 12:17 , Processed in 0.108592 second(s), 21 queries .

Powered by Discuz! X3.4

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表