Getting Started with elastic-job
Overview
Elastic-Job is a distributed scheduling solution composed of two independent sub-projects: Elastic-Job-Lite and Elastic-Job-Cloud.
Elastic-Job-Lite is positioned as a lightweight, decentralized solution that provides coordination services for distributed tasks in the form of a jar package. Elastic-Job-Cloud uses a self-developed Mesos Framework and additionally provides resource governance, application distribution, and process isolation.
Feature List
1. Task sharding
Splits an overall task into multiple subtasks
Elastically scales task-processing capacity by adding or removing servers
Distributed coordination, with fully automatic detection and handling of task servers going online or offline
2. Multiple task types
Time-driven tasks
Data-driven tasks (TBD)
Supports both resident and transient tasks
Multi-language task support
3. Cloud native
Integrates seamlessly with scheduling platforms such as Mesos and Kubernetes
Tasks do not depend on stateful components such as IP addresses, disks, or data
Reasonable resource scheduling, with resource allocation based on Netflix's Fenzo
4. Fault tolerance
Supports periodic self failure detection and automatic recovery
Guarantees the uniqueness of distributed task shards
Supports failover and re-triggering of missed tasks
5. Task aggregation
Aggregates identical tasks into the same executor for unified processing
Saves system resources and initialization overhead
Dynamically allocates additional resources to newly assigned tasks
6. Usability
A complete operations platform
Tracking of task execution history
One-click dump of registry-center data for backup and debugging
Next, let's implement a small example.
Build tool
Gradle
The project structure is as follows.
Adding the dependencies
In the build.gradle file:

// elastic-job
compile group: 'com.dangdang', name: 'elastic-job-lite-core', version: '2.1.5'
compile group: 'com.dangdang', name: 'elastic-job-lite-spring', version: '2.1.5'

SimpleJob (simple job)
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

public class MyElasticSimpleJob implements SimpleJob {

    @Override
    public void execute(ShardingContext context) {
        // Branch on the sharding item assigned to this server.
        switch (context.getShardingItem()) {
            case 0:
                System.out.println("do something by sharding item 0");
                break;
            case 1:
                System.out.println("do something by sharding item 1");
                break;
            case 2:
                System.out.println("do something by sharding item 2");
                break;
            // case n: ...
        }
    }
}
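To make the sharding idea concrete, here is a small self-contained sketch (plain Java, not the Elastic-Job API; `recordsForShard` is a hypothetical helper) of the common convention where each sharding item processes the records whose id modulo the total shard count equals its item number:

```java
import java.util.ArrayList;
import java.util.List;

public class ShardingDemo {

    // Illustrative helper (not part of Elastic-Job): pick the records
    // a given sharding item should process, using id % shardingTotalCount.
    static List<Integer> recordsForShard(List<Integer> allIds, int shardingItem, int shardingTotalCount) {
        List<Integer> mine = new ArrayList<>();
        for (int id : allIds) {
            if (id % shardingTotalCount == shardingItem) {
                mine.add(id);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        // With 3 shards, item 0 handles ids 0, 3, 6 and 9.
        System.out.println(recordsForShard(ids, 0, 3)); // prints [0, 3, 6, 9]
    }
}
```

With this convention, every record is processed by exactly one shard, which is what the "uniqueness of distributed task shards" guarantee above relies on.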
DataflowJob (dataflow job)
import java.util.ArrayList;
import java.util.List;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;

public class MyElasticDataflowJob implements DataflowJob<String> {

    @Override
    public List<String> fetchData(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0:
                // get data from database by sharding item 0
                List<String> data1 = new ArrayList<>();
                data1.add("get data from database by sharding item 0");
                return data1;
            case 1:
                // get data from database by sharding item 1
                List<String> data2 = new ArrayList<>();
                data2.add("get data from database by sharding item 1");
                return data2;
            case 2:
                // get data from database by sharding item 2
                List<String> data3 = new ArrayList<>();
                data3.add("get data from database by sharding item 2");
                return data3;
            // case n: ...
        }
        return null;
    }

    @Override
    public void processData(ShardingContext shardingContext, List<String> data) {
        int count = 0;
        // process data
        for (String string : data) {
            count++;
            System.out.println(string);
            if (count > 10) {
                return;
            }
        }
    }
}
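The dataflow contract is that the framework alternates calls to fetchData and processData, and in streaming mode keeps doing so until fetchData returns null or an empty list. A minimal self-contained simulation of that loop (plain Java, not Elastic-Job itself; the class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class DataflowLoopDemo {

    // Simulated data source holding five pending records.
    private final List<String> source = new ArrayList<>(List.of("a", "b", "c", "d", "e"));

    // Drain up to two records per call, like a paged fetchData.
    List<String> fetchData() {
        List<String> batch = new ArrayList<>(source.subList(0, Math.min(2, source.size())));
        source.subList(0, batch.size()).clear();
        return batch;
    }

    // Run the fetch/process cycle and count how many records it handles.
    int runLoop() {
        int processed = 0;
        List<String> batch;
        // Streaming semantics: keep fetching until an empty batch comes back.
        while (!(batch = fetchData()).isEmpty()) {
            processed += batch.size();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(new DataflowLoopDemo().runLoop()); // prints 5
    }
}
```

This is why fetchData above returns null for unknown sharding items: an empty or null result signals "nothing left to do" for the current cycle.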
Testing the two job types above
import java.net.InetAddress;
import java.net.UnknownHostException;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobRootConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.job.task.MyElasticDataflowJob;
import com.job.task.MyElasticSimpleJob;

public class JobDemo {

    public static void main(String[] args) throws UnknownHostException {
        System.out.println(InetAddress.getLocalHost());
        new JobScheduler(createRegistryCenter(), createSimpleJobConfiguration()).init();
        new JobScheduler(createRegistryCenter(), createDataflowJobConfiguration()).init();
    }

    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(
                new ZookeeperConfiguration("127.0.0.1:2181", "new-elastic-job-demo"));
        regCenter.init();
        return regCenter;
    }

    private static LiteJobConfiguration createSimpleJobConfiguration() {
        // Define the job core configuration
        JobCoreConfiguration simpleCoreConfig =
                JobCoreConfiguration.newBuilder("SimpleJobDemo", "0/15 * * * * ?", 10).build();
        // Define the SIMPLE job type configuration
        SimpleJobConfiguration simpleJobConfig =
                new SimpleJobConfiguration(simpleCoreConfig, MyElasticSimpleJob.class.getCanonicalName());
        // Define the Lite job root configuration
        JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
        return (LiteJobConfiguration) simpleJobRootConfig;
    }

    private static LiteJobConfiguration createDataflowJobConfiguration() {
        // Define the job core configuration
        JobCoreConfiguration dataflowCoreConfig =
                JobCoreConfiguration.newBuilder("DataflowJob", "0/30 * * * * ?", 10).build();
        // Define the DATAFLOW job type configuration (the last argument enables streaming processing)
        DataflowJobConfiguration dataflowJobConfig =
                new DataflowJobConfiguration(dataflowCoreConfig, MyElasticDataflowJob.class.getCanonicalName(), true);
        // Define the Lite job root configuration
        JobRootConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).build();
        return (LiteJobConfiguration) dataflowJobRootConfig;
    }
}
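The cron expressions above use Quartz syntax: "0/15 * * * * ?" fires every 15 seconds and "0/30 * * * * ?" every 30 seconds. As a sanity check, here is a small sketch (a hypothetical helper, handling only the simple "start/step" seconds field, not full Quartz parsing) that expands such a field into the seconds at which the trigger fires:

```java
import java.util.ArrayList;
import java.util.List;

public class CronSecondsDemo {

    // Hypothetical helper: expand a Quartz seconds field of the form
    // "start/step" (e.g. "0/15") into the within-minute firing seconds.
    static List<Integer> firingSeconds(String field) {
        String[] parts = field.split("/");
        int start = Integer.parseInt(parts[0]);
        int step = Integer.parseInt(parts[1]);
        List<Integer> seconds = new ArrayList<>();
        for (int s = start; s < 60; s += step) {
            seconds.add(s);
        }
        return seconds;
    }

    public static void main(String[] args) {
        System.out.println(firingSeconds("0/15")); // prints [0, 15, 30, 45]
        System.out.println(firingSeconds("0/30")); // prints [0, 30]
    }
}
```

So the simple job above runs four times per minute and the dataflow job twice per minute, each time fanned out across the 10 configured sharding items.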
Run results