python任务编排_基于DAG实现的任务编排框架平台
最近在做的⼯作⽐较需要⼀个⽀持任务编排⼯作流的框架或者平台,这⾥记录下实现上的⼀些思路。
任务编排⼯作流
任务编排是什么意思呢,顾名思义就是可以把"任务"这个原⼦单位按照⾃⼰的⽅式进⾏编排,任务之间可能互相依赖。复杂⼀点的编排之后就能形成⼀个 workflow ⼯作流了。我们希望这个⼯作流按照我们编排的⽅式去执⾏每个原⼦ task 任务。如下图所⽰,我们希望先并发运⾏ Task A 和 Task C,Task A 执⾏完后串⾏运⾏ Task B,在并发等待 Task B 和 C 都结束后运⾏ Task D,这样就完成了⼀个典型的任务编排⼯作流。
DAG 有向⽆环图
⾸先我们了解图这个数据结构,每个元素称为顶点 vertex,顶点之间的连线称为边 edge。像我们画的这种带箭头关系的称为有向图,箭头关系之间能形成⼀个环的成为有环图,反之称为⽆环图。显然运⽤在我们任务编排⼯作流上,最合适的是 DAG 有向⽆环图。
我们在代码⾥怎么存储图呢,有两种数据结构:邻接矩阵和邻接表。
下图表⽰⼀个有向图的邻接矩阵,例如 x->y 的边,只需将 Array[x][y]标识为 1 即可。
此外我们也可以使⽤邻接表来存储,这种存储⽅式较好地弥补了邻接矩阵浪费空间的缺点,但相对来说邻接矩阵能更快地判断连通性。
⼀般在代码实现上,我们会选择邻接矩阵,这样我们在判断两点之间是否有边更⽅便点。
⼀个任务编排框架
了解了 DAG 的基本知识后我们可以来简单实现⼀下。⾸先是存储结构,我们的 Dag 表⽰⼀整个图,Node 表⽰各个顶点,每个顶点有其parents 和 children://Dagpublic final class DefaultDag implements Dag {private Map> nodes = new HashMap>();
...
}//Nodepublic final class Node {/**
* incoming dependencies for this node
*/private Set> parents = new LinkedHashSet>();/**
* outgoing dependencies for this node
*/private Set> children = new LinkedHashSet>();
...
}复制代码
画两个顶点,以及为这两个顶点连边操作如下:public void addDependency(final T evalFirstNode, final T evalLaterNode) {
Node firstNode = createNode(evalFirstNode);
Node afterNode = createNode(evalLaterNode);
addEdges(firstNode, afterNode);
}  private Node createNode(final T value) {
Node node = new Node(value);return node;
}private void addEdges(final Node firstNode, final Node afterNode) {if (!firstNode.equals(afterNode)) {
}
}复制代码
到现在我们其实已经把基础数据结构写好了,但我们作为⼀个任务编排框架最终是需要线程去执⾏的,我们把它和线程池⼀起给包装⼀下。
//任务编排线程池public class DefaultDexecutor  {//执⾏线程,和2种重试线程
private final ExecutorService executionEngine;private final ExecutorService immediatelyRetryExecutor;private final ScheduledExecu 执⾏状态private final ExecutorState state;
...
}//执⾏状态public class DefaultExecutorState {//底层图数据结构private final Dag graph;//已完成
private final Collection> processedNodes;//未完成private final Collection> unProcessedNodes;//错误
taskprivate final Collection> erroredTasks;//执⾏结果private final Collection> executionResults;
}复制代码
可以看到我们的线程包括执⾏线程池,2 种重试线程池。我们使⽤ ExecutorState 来保存⼀些整个任务⼯作流执⾏过程中的⼀些状态记
录,包括已完成和未完成的 task,每个 task 执⾏的结果等。同时它也依赖我们底层的图数据结构 DAG。
接下来我们要做的事其实很简单,就是 BFS 这整个 DAG 数据结构,然后提交到线程池中去执⾏就可以了,过程中注意⼀些节点状态的保
持,结果的保存即可。
还是以上图为例,值得说的⼀点是在 Task D 这个点需要有⼀个并发等待的操作,即 Task D 需要依赖 Task B 和 Task C 执⾏结束后再往下执⾏。这⾥有很多办法,我选择了共享变量的⽅式来完成并发等待。遍历⼯作流中被递归的⽅法的伪代码如下:
private void doProcessNodes(final Set> nodes) {for (Node node : nodes) {//共享变量 并发等待
if (!ains(node) && Parents())) {
Task task = newTask(node);utionEngine.submit(task);
...
ExecutionResult executionResult = utionEngine.proce***esult();if (executionResult.isSuccess()) {
state.markProcessingDone(processedNode);
}//继续执⾏孩⼦节点Children());
.
..
}
}
}复制代码
这样我们基本完成了这个任务编排框架的⼯作,现在我们可以如下来进⾏⽰例图中的任务编排以及执⾏:
DefaultExecutor executor = newTaskExecutor();
executor.addDependency("A", "B");
executor.addDependency("B", "D");
executor.addDependency("C", "D");
任务编排平台化
好了现在我们已经有⼀款任务编排框架了,但很多时候我们想要可视化、平台化,让使⽤者更加⽆脑。
框架与平台最⼤的区别在哪⾥?是可拖拽的可视化输⼊么?我觉得这个的复杂度更多在前端。⽽对于后端平台来讲,与框架最⼤的区别是数据的持久化。
对于 DAG 的顶点来说,我们需要将每个节点 Task 的信息给持久化到关系数据库中,包括 Task 的状态、输出结果等。⽽对于 DAG 的边来说,我们也得⽤数据库来存储各 Task 之间的⽅向关系。此外,在遍历执⾏ DAG 的整个过程中的中间状态数据,我们也得搬运到数据库中。
⾸先我们可以设计⼀个 workflow 表,来表⽰⼀个⼯作流。接着我们设计⼀个 task 表,来表⽰⼀个执⾏单元。task 表主要字段如下,这⾥主要是 task_parents 的设计,它是⼀个 string,存储 parents 的 taskId,多个由分隔符分隔。task_id
workflow_id
task_name
task_status
result
task_parents复制代码
依赖是上图这个例⼦,对⽐框架来说,我们⾸先得将其存储到数据库中去,最终可能得到如下数据:task_id  workflow_id  task_name  task_status  result  task_parents
1          1          A          0                    -1
2          1          B          0                    1
3          1          C          0                    -1
4          1          D          0                    2,3复制代码
快速排序python实现可以看到,这样也能很好地存储 DAG 数据,和框架中代码的输⼊⽅式差别并不是很⼤。
接下来我们要做的是遍历执⾏整个 workflow,这边和框架的差别也不⼤。⾸先我们可以利⽤select * from task where workflow_id = 1 and task_parents = -1来获取初始化节点 Task A 和 Task C,将其提交到我们的线程池中。
接着对应框架代码中的Children());,我们使⽤select * from task where
task_parents like %3%,就可以得到 Task C 的孩⼦节点 Task D,这⾥使⽤了模糊查询是因为我们的 task_parents 可能是由多个⽗亲的 taskId 与分隔号组合⽽成的字符串。查询到孩⼦节点后,继续提交到线程池即可。
别忘了我们在 Task D 这边还有⼀个并发等待的操作,对应框架代码中的if (!ains(node) &&
另外值得注意的是 task 的重试。在框架中,失败 task 的重试可以是⽴即使⽤当前线程重试或者放到⼀个定时线程池中去重试。⽽在平台上,我们的重试基本上来⾃于⽤户在界⾯上的点击,即主线程。
⾄此,我们已经将任务编排框架的功能基本平台化了。作为⼀个任务编排平台,可拖拽编排的可视化输⼊、整个⼯作流状态的可视化展⽰、任务的可⼈⼯重试都是其优点。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。