|
本帖最后由 天之衡 于 2021-10-24 14:31 编辑
最近在做的工作比较需要一个支持任务编排工作流的框架或者平台,这里记录下实现上的一些思路。
任务编排工作流
任务编排是什么意思呢,顾名思义就是可以把"任务"这个原子单位按照自己的方式进行编排,任务之间可能互相依赖。复杂一点的编排之后就能形成一个 workflow 工作流了。我们希望这个工作流按照我们编排的方式去执行每个原子 task 任务。如下图所示,我们希望先并发运行 Task A 和 Task C,Task A 执行完后串行运行 Task B,在并发等待 Task B 和 C 都结束后运行 Task D,这样就完成了一个典型的任务编排工作流。
DAG 有向无环图
首先我们了解图这个数据结构,每个元素称为顶点 vertex,顶点之间的连线称为边 edge。像我们画的这种带箭头关系的称为有向图,箭头关系之间能形成一个环的成为有环图,反之称为无环图。显然运用在我们任务编排工作流上,最合适的是 DAG 有向无环图。
我们在代码里怎么存储图呢,有两种数据结构:邻接矩阵和邻接表。
下图表示一个有向图的邻接矩阵,例如 x->y 的边,只需将 Array[x][y]标识为 1 即可。
此外我们也可以使用邻接表来存储,这种存储方式较好地弥补了邻接矩阵浪费空间的缺点,但相对来说邻接矩阵能更快地判断连通性。
一般在代码实现上,我们会选择邻接矩阵,这样我们在判断两点之间是否有边更方便点。
一个任务编排框架
了解了 DAG 的基本知识后我们可以来简单实现一下。首先是存储结构,我们的 Dag 表示一整个图,Node 表示各个顶点,每个顶点有其 parents 和 children:
- //Dag
- public final class DefaultDag<T, R> implements Dag<T, R> {
- private Map<T, Node<T, R>> nodes = new HashMap<T, Node<T, R>>();
- ...
- }
- //Node
- public final class Node<T, R> {
- /**
- * incoming dependencies for this node
- */
- private Set<Node<T, R>> parents = new LinkedHashSet<Node<T, R>>();
- /**
- * outgoing dependencies for this node
- */
- private Set<Node<T, R>> children = new LinkedHashSet<Node<T, R>>();
- ...
- }
复制代码
画两个顶点,以及为这两个顶点连边操作如下:
- public void addDependency(final T evalFirstNode, final T evalLaterNode) {
- Node<T, R> firstNode = createNode(evalFirstNode);
- Node<T, R> afterNode = createNode(evalLaterNode);
- addEdges(firstNode, afterNode);
- }
- private Node<T, R> createNode(final T value) {
- Node<T, R> node = new Node<T, R>(value);
- return node;
- }
- private void addEdges(final Node<T, R> firstNode, final Node<T, R> afterNode) {
- if (!firstNode.equals(afterNode)) {
- firstNode.getChildren().add(afterNode);
- afterNode.getParents().add(firstNode);
- }
- }
复制代码
到现在我们其实已经把基础数据结构写好了,但我们作为一个任务编排框架最终是需要线程去执行的,我们把它和线程池一起给包装一下。
- //任务编排线程池
- public class DefaultDexecutor <T, R> {
- //执行线程,和2种重试线程
- private final ExecutorService<T, R> executionEngine;
- private final ExecutorService immediatelyRetryExecutor;
- private final ScheduledExecutorService scheduledRetryExecutor;
- //执行状态
- private final ExecutorState<T, R> state;
- ...
- }
- //执行状态
- public class DefaultExecutorState<T, R> {
- //底层图数据结构
- private final Dag<T, R> graph;
- //已完成
- private final Collection<Node<T, R>> processedNodes;
- //未完成
- private final Collection<Node<T, R>> unProcessedNodes;
- //错误task
- private final Collection<ExecutionResult<T, R>> erroredTasks;
- //执行结果
- private final Collection<ExecutionResult<T, R>> executionResults;
- }
复制代码
可以看到我们的线程包括执行线程池,2 种重试线程池。我们使用 ExecutorState 来保存一些整个任务工作流执行过程中的一些状态记录,包括已完成和未完成的 task,每个 task 执行的结果等。同时它也依赖我们底层的图数据结构 DAG。
接下来我们要做的事其实很简单,就是 BFS 这整个 DAG 数据结构,然后提交到线程池中去执行就可以了,过程中注意一些节点状态的保持,结果的保存即可。
|
|