专注于业务编排的⼯作流引擎Temporal框架技术Java实践
(SpringBoot)
⽬录
Temporal
Temporal 是⼀个微服务编排平台,使开发⼈员能够在不牺牲⽣产⼒或可靠性的情况下构建可扩展的应⽤程序。临时服务器以弹性⽅式执⾏应⽤程序逻辑单元⼯作流,⾃动处理间歇性故障并重试失败的操作。
Temporal 是⼀项成熟的技术,是 Uber 的 Cadence 的⼀个分⽀。Temporal由 Cadence 的创建者初创公司Temporal
Technologies开发。
若你还不了解Temporal基础结构,。本⽂专为实践受众,以提供实践思路为⽬的书写。
想象下,你的业务系统存在数⼗种算法服务,⾜够复杂,⽐如⾯部3D识别、局部投影、五官单部位识别,动作识别等等。处理请求量⼤频繁,处理过程采⽤算法先后不尽相同,并且这些算法处理时间不⼀,⽽你需要对这些算法进⾏编排,来实现具体的业务。
为此你可能需要选择⼀款框架来帮助你,解决因不同算法服务的不确定性带来的⿇烦,⽐如⽹络不确定性问题,业务复杂度问题等。
也许你跟我⼀样,曾在RabbitMq和Temporal中徘徊过,看完下⽂但愿你有所得,我对两者区别分析如下:
RabbitMq Temporal
平台跨平台,多语⾔,分布式集跨平台,多语⾔,时空集
关注点关注消息本⾝,不关⼼消息被谁消费,关注
的是消息是否送达。
关注任务执⾏进度和结果,是否需要重
试等等。让程序员专注于业务的编排。
性能基于内存的设计,性能出众。⽀持多种存储⽅式,性能各有差异。MySQL存储100w级轻松应对。
复杂度上⼿快,基于AMQP模型。难度较⼤,有独特的并发编程规范,会与基础语⾔有所冲突。
Web-UI⼈性化的web-ui提供⾼效资源监控。web-ui仍有较⼤优化空间。
部署单机、集、云环境部署简单。⽀持单机、时空集、⽀持多种云环境部署。
对Spring⽀持原⽣提供对Spring的多种⽀持实例模型。原⽣框架暂未提供对Spring的集成⽅案,因此你可能需要考虑进⾏代码层次的兼容。
依据上表,你应该可以做出合理的架构计数技术选型。很荣幸选择Temporal的你可以从下⽂获取实践经验。
业务系统结构
可以看出,Temporal仍有较⼤优化空间,但不妨碍它成为⼀款优秀框架的潜质。编程时会遇到不少问题,为此我根据经验总结出如下代码分层结构:
关键层级定义
AlgorithmWorker
封装后的⼯⼈客户端,负责启动Workflow,以及向队列投递消息。
Workflow 每个Workflow代表⼀种特定的⼯作流程,代表着对算法的⼀种特定编排逻辑。
Activity 每个Activity代表⼀种特定执⾏逻辑,⽐如3D识别算法逻辑、⾯部识别算法逻辑、所有涉及IO或第三⽅服务调⽤逻辑等。每个Activity之间是松耦合的,应当仅包含⼀种特定⾏为。
Manager业务逻辑层,⽤于解耦Workflow和Activity业务代码。
使⽤确定的层级关系,这意味着你的代码执⾏逻辑应当是:
1. 使⽤守护线程启动⼀个的Workflow客户端,作为消费队列消息的主体。
2. 提供同步/异步的消息投递接⼝,异步接⼝你可能会需要使⽤到,通过⽗Workflow管理⼦Workflow的并发执⾏,以及进度监听等。
3. 在WorkflowImpl进⾏具体的业务逻辑编排,你可以将它看成Service,不同的是,你必须遵循,否则将⾯临代码健壮性问题,相信我
你会为此头疼不已。
4. 将所有不确定性代码抽象成Activity。IO、RPC调⽤、DB、Redis等等第三⽅或服务调⽤代码。
5. 。
6. WorkflowImpl中单⾏代码执⾏时间不应该超过1s,Activity调⽤除外。否则,因事件超时导致Workflow进⼊执⾏逻辑问题,异常显
⽰如下:
2022-02-21 18:20:11.036 ERROR 26416 --- [ce="default": 1] i.play.ReplayWorkflowTaskHandler : Workflow task failure. startedEventId=3, WorkflowI
poral.internal.statemachines.WorkflowStateMachines.handleEvent(WorkflowStateMachines.java:175) ~[temporal-sdk-1.5.0.jar:na]
play.ReplayWorkflowRunTaskHandler.handleWorkflowTaskImpl(ReplayWorkflowRunTaskHandler.java:177) ~[temporal-sdk-1.5.0.jar:na] play.ReplayWorkflowRunTaskHandler.handleWorkflowTask(ReplayWorkflowRunTaskHandler.java:146) ~[temporal-sdk-1.5.0.jar:na]
play.ReplayWorkflowTaskHandler.handleWorkflowTaskWithEmbeddedQuery(ReplayWorkflowTaskHandler.java:201) ~[temporal-sdk-1.5 play.ReplayWorkflowTaskHandler.handleWorkflowTask(ReplayWorkflowTaskHandler.java:114) ~[temporal-sdk-1.5.0.jar:na]
poral.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:319) [temporal-sdk-1.5.0.jar:na]
poral.internal.worker.WorkflowWorker$TaskHandlerImpl.handle(WorkflowWorker.java:279) [temporal-sdk-1.5.0.jar:na]
poral.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:73) [tempo
ral-sdk-1.5.0.jar:na]
at urrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_302]
at urrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_302]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_302]
Caused by: java.lang.RuntimeException: WorkflowTask: failure executing SCHEDULED->WORKFLOW_TASK_STARTED, transition history is [CREATED->WORK poral.internal.uteTransition(StateMachine.java:151) ~[temporal-sdk-1.5.0.jar:na]
poral.internal.statemachines.StateMachine.handleHistoryEvent(StateMachine.java:101) ~[temporal-sdk-1.5.0.jar:na]
poral.internal.statemachines.EntityStateMachineBase.handleEvent(EntityStateMachineBase.java:67) ~
[temporal-sdk-1.5.0.jar:na]
poral.internal.statemachines.WorkflowStateMachines.handleSingleEvent(WorkflowStateMachines.java:233) ~[temporal-sdk-1.5.0.jar:na]
poral.internal.statemachines.WorkflowStateMachines.handleEventsBatch(WorkflowStateMachines.java:199) ~[temporal-sdk-1.5.0.jar:na]
... 11 common frames omitted
Caused by: io.temporal.internal.sync.PotentialDeadlockException: Potential deadlock detected: workflow thread "workflow-method" didn't yield control for over a s
当你了解Temporal基础,了解项⽬结构后,你应该能对简单业务进⾏编码,若不⾏也许下⾯会对你有些许帮助。
⼼跳以及重试机制
当Workflow或Activity出现异常,就会触发。Temporal通过⼼跳上报的⽅式检测Activity存活状态,⾯对复杂业务或⽹络抖动因素,
同步的⼼跳上报⽆法满⾜需求,为此提供两种解决⽅案:
通过线程池管控全局⼼跳,Activity执⾏之初将上下⽂存⼊池中,再通过多线程⽅式进⾏上报;
每个Activity运⾏之初启动⼀个⼦线程进⾏⼼跳上报
两种⽅式各有优劣,在这⾥我们选择⼦线程⽅式。
package com.bootmon;
poral.activity.ActivityExecutionContext;
slf4j.Slf4j;
/**
* 功能描述:
*
* @program: melt-voiceai-server
* @author: 代号007
* @create: 2021-12-28 09:39
**/
@Slf4j
public class HeartbeatThread implements Runnable {
private Integer heartbeatInterval;
private ActivityExecutionContext context;
private Object value;
private volatile boolean status = true;
public HeartbeatThread(Integer heartbeatInterval, ActivityExecutionContext context, Object value) {
this.heartbeatInterval = heartbeatInterval;
this.value = value;
}
springboot结构@Override
public void run() {
log.info("启动【{}】⼼跳包发送任务", value);
while (status) {
context.heartbeat(1);
log.info("发送【{}】⼼跳包成功。", value);
try {
Thread.sleep(heartbeatInterval * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public synchronized void stop() {
status = false;
}
}
长耗时复杂业务⼯作流设计
对于复杂IO或者⼤批量数据任务,简单⼯作流可能⽆法满⾜需求。这类任务尝使⽤异步⽅式执⾏,这包括Workflow和Activity异步,并且会⾯临任务拆分问题,在这⾥提供⼀种:
我们有⼀个简单的⼯作流程,看起来与此类似:
⼯作流从⼀个⽂件开始,其中可能有数百万条记录需要处理。我可以看到⼏个可⽤于处理此问题的选项:
1. 在处理⽂件的每个阶段使⽤活动创建单个⼯作流。每个 Activity 都会引⽤⼀个⽂件,处理该⽂件,然后返回⼀个对新创建的已
处理⽂件的引⽤。然后将其传递给下⼀个活动,依此类推。如果在处理过程中出现任何问题,每个 Activity 都可以检查⼼跳以从特定点恢复。
2. 打开原始⽂件并循环处理每个对象。然后,每个 Activity 将只处理⼀个对象,因此该对象可能作为参数传递,⽽不是在⽂件中
引⽤。
3. 打开原始⽂件,并为⽂件中的每个对象⽣成⼀个单独的⼯作流。然后,每个 Activity 将只处理⼀个对象,因此这可能会作为参
数传递,⽽不是在⽂件中引⽤。需要跟踪每个⼦⼯作流,以便在所有⼦⼯作流完成后完成⽗⼯作流。
选项 2 有可能在单个⼯作流中创建数百万个事件,因此我认为每个⼯作流 10 万个事件的内部限制将使该选项不可⾏。
选项 1 和选项 3 看起来它们都可以⼯作。
选项 1 优点和缺点
1. 优点:不触发多个⼯作流程
2. 缺点:⽆法并⾏处理多个对象
选项 3 优点和缺点
1. 优点:可以并⾏处理多个对象
2. 缺点:必须创建很多⼯作流程。
3. 缺点:不能利⽤任何批量操作,例如批量保存对象。
在这种情况下,是否有关于应该采⽤哪种⽅法或⼀种⽅法可能⽐另⼀种⽅法存在的陷阱的任何建议?
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论