SpringBoot+Redis实现延时队列,写得太好了!
来源:blog.csdn/qq330983778/article/details/99341671
⾸先我们分析下这个流程
1. ⽤户提交任务。⾸先将任务推送⾄延迟队列中。
2. 延迟队列接收到任务后,⾸先将任务推送⾄job pool中,然后计算其执⾏时间。
3. 然后⽣成延迟任务(仅仅包含任务id)放⼊某个桶中
4. 时间组件时刻轮询各个桶,当时间到达的时候从job pool中获得任务元信息。
5. 监测任务的合法性如果已经删除则pass。继续轮询。如果任务合法则再次计算时间
6. 如果合法则计算时间,如果时间合法:根据topic将任务放⼊对应的ready queue,然后从bucket中移除。如果时间不合法,则重新计算时间再次放⼊bucket,并移除之前的
bucket中的内容
7. 消费端轮询对应topic的ready queue。获取job后做⾃⼰的业务逻辑。与此同时,服务端将已经被消费端获取的job按照其设定的TTR,重新计算执⾏时间,并将其放⼊
bucket。
8. 完成消费后,发送finish消息,服务端根据job id删除对应信息。
我们现在可以了解到中间存在的⼏个组件
延迟队列,为Redis延迟队列。实现消息传递
Job pool 任务池保存job元信息。根据⽂章描述使⽤K/V的数据结构,key为ID,value为job
Delay Bucket ⽤来保存业务的延迟任务。⽂章中描述使⽤轮询⽅式放⼊某⼀个Bucket可以知道其并没有使⽤topic来区分,个⼈这⾥默认使⽤顺序插⼊
Timer 时间组件,负责扫描各个Bucket。根据⽂章描述存在多个Timer,但是同⼀个Timer同⼀时间只能扫描⼀个Bucket
Ready Queue 负责存放需要被完成的任务,但是根据描述根据Topic的不同存在多个Ready Queue。
其中Timer负责轮询,Job pool、Delay Bucket、Ready Queue都是不同职责的集合。
ready:可执⾏状态,
delay:不可执⾏状态,等待时钟周期。
reserved:已被消费者读取,但没有完成消费。
deleted:已被消费完成或者已被删除。
1. ⾸先根据状态状态描述,finish和delete操作都是将任务设置成deleted状态。
2. 根据⽂章描述的操作,在执⾏finish或者delete的操作的时候任务已经从元数据中移除,此时deleted状态可能只存在极短时间,所以实际实现中就直接删除了。
3. ⽂章中并没有说明响应超时后如何处理,所以个⼈现在将其重新投⼊了待处理队列。
4. ⽂章中因为使⽤了集,所以使⽤redis的setnx锁来保证多个时间循环处理多个桶的时候不会出现重复循环。这⾥因为是简单的实现,所以就很简单的每个桶设置⼀个时间
队列处理。也是为了⽅便简单处理。关于分布式锁可以看我之前的⽂章⾥⾯有描述。
现在我们根据设计内容完成设计。这⼀块设计我们分四步完成
任务对象
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Job implements Serializable {
/**
* 延迟任务的唯⼀标识,⽤于检索任务
*/
@JsonSerialize(using = ToStringSerializer.class)
private Long id;
/**
* 任务类型(具体业务类型)
*/
private String topic;
/**
* 任务的延迟时间
*/
private long delayTime;
/**
* 任务的执⾏超时时间
*/
private long ttrTime;
/**
* 任务具体的消息内容,⽤于处理具体业务逻辑⽤
*/
private String message;
/**
* 重试次数
*/
private int retryCount;
/**
* 任务状态
*/
private JobStatus status;
}
任务引⽤对象
public class DelayJob implements Serializable {
/**
* 延迟任务的唯⼀标识
*/
private long jodId;
/**
* 任务的执⾏时间
*/
private long delayDate;
/**
* 任务类型(具体业务类型)
*/
private String topic;
public DelayJob(Job job) {
this.jodId = Id();
this.delayDate = System.currentTimeMillis() + DelayTime();
}
public DelayJob(Object value, Double score) {
this.jodId = Long.parseLong(String.valueOf(value));
this.delayDate = System.currentTimeMillis() + score.longValue();
}
}
⽬前我们需要完成三个容器的创建,Job任务池、延迟任务容器、待完成任务容器job任务池,为普通的K/V结构,提供基础的操作
@Component
@Slf4j
public class JobPool {
@Autowired
private RedisTemplate redisTemplate;
private String NAME = "job.pool";
private BoundHashOperations getPool () {
BoundHashOperations ops = redisTemplate.boundHashOps(NAME);
return ops;
}
/**
* 添加任务
* @param job
*/
public void addJob (Job job) {
log.info("任务池添加任务:{}", JSONString(job));
getPool().Id(),job);
return ;
}
/**
* 获得任务
* @param jobId
* @return
*/
public Job getJob(Long jobId) {
Object o = getPool().get(jobId);
if (o instanceof Job) {
return (Job) o;
}
return null;
}
/**
* 移除任务
* @param jobId
*/
public void removeDelayJob (Long jobId) {
log.info("任务池移除任务:{}",jobId);
// 移除任务
getPool().delete(jobId);
}
}
延迟任务,使⽤可排序的ZSet保存数据,提供取出最⼩值等操作
@Slf4j
@Component
public class DelayBucket {
@Autowired
private RedisTemplate redisTemplate;
private static AtomicInteger index = new AtomicInteger(0);
@Value("${thread.size}")
private int bucketsSize;
private List <String> bucketNames = new ArrayList <>();
@Bean
public List <String> createBuckets() {
for (int i = 0; i < bucketsSize; i++) {
bucketNames.add("bucket" + i);
}
return bucketNames;
}
/**
* 获得桶的名称
* @return
*/
private String getThisBucketName() {
int thisIndex = index.addAndGet(1);
int i1 = thisIndex % bucketsSize;
(i1);
}
* @param bucketName
* @return
*/
private BoundZSetOperations getBucket(String bucketName) {
return redisTemplate.boundZSetOps(bucketName);
}
/**
* 放⼊延时任务
* @param job
*/
public void addDelayJob(DelayJob job) {
log.info("添加延迟任务:{}", JSONString(job));
String thisBucketName = getThisBucketName();
BoundZSetOperations bucket = getBucket(thisBucketName);
bucket.add(DelayDate());
}
/**
* 获得最新的延期任务
* @return
*/
public DelayJob getFirstDelayTime(Integer index) {
String name = (index);
BoundZSetOperations bucket = getBucket(name);
Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 1);
if (CollectionUtils.isEmpty(set)) {
return null;
}
ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) Array()[0];        Object value = Value();
if (value instanceof DelayJob) {
return (DelayJob) value;
}
return null;
}
/**
* 移除延时任务
* @param index
* @param delayJob
*/
public void removeDelayTime(Integer index,DelayJob delayJob) {
String name = (index);
BoundZSetOperations bucket = getBucket(name);
}
}
待完成任务,内部使⽤topic进⾏细分,每个topic对应⼀个list集合
@Component
@Slf4j
public class ReadyQueue {
@Autowired
private RedisTemplate redisTemplate;
private String NAME = "process.queue";
private String getKey(String topic) {
return NAME + topic;
}
/**
* 获得队列
* @param topic
* @return
*/
private BoundListOperations getQueue (String topic) {
BoundListOperations ops = redisTemplate.boundListOps(getKey(topic));
return ops;
}
/**
* 设置任务
* @param delayJob
*/
public void pushJob(DelayJob delayJob) {
log.info("执⾏队列添加任务:{}",delayJob);
BoundListOperations listOperations = Topic());
listOperations.leftPush(delayJob);
}
/**
* 移除并获得任务
* @param topic
* @return
*/
public DelayJob popJob(String topic) {
BoundListOperations listOperations = getQueue(topic);
Object o = listOperations.leftPop();
if (o instanceof DelayJob) {
log.info("执⾏队列取出任务:{}", JSONString((DelayJob) o));
return (DelayJob) o;
}
return null;
}
}
设置了线程池为每个bucket设置⼀个轮询操作
@Component
public class DelayTimer implements ApplicationListener <ContextRefreshedEvent> {
@Autowired
private DelayBucket delayBucket;
@Autowired
private JobPool    jobPool;
@Autowired
private ReadyQueue  readyQueue;
@Value("${thread.size}")
private int length;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
ExecutorService executorService = new ThreadPoolExecutor(
length,
length,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue <Runnable>());
for (int i = 0; i < length; i++) {
springboot原理流程
new DelayJobHandler(
delayBucket,
jobPool,
readyQueue,
i));
}
}
}
/**
* 测试⽤请求
* @author daify
**/
@RestController
@RequestMapping("delay")
public class DelayController {
@Autowired
private JobService jobService;
/**
* 添加
* @param request
* @return
*/
@RequestMapping(value = "add",method = RequestMethod.POST)
public String addDefJob(Job request) {
DelayJob delayJob = jobService.addDefJob(request);
JSONString(delayJob);
}
/**
* 获取
* @return
*/
@RequestMapping(value = "pop",method = RequestMethod.GET)
public String getProcessJob(String topic) {
Job process = ProcessJob(topic);
JSONString(process);
}
/**
* 完成⼀个执⾏的任务
* @param jobId
* @return
*/
@RequestMapping(value = "finish",method = RequestMethod.DELETE)
public String finishJob(Long jobId) {
jobService.finishJob(jobId);
return "success";
}
@RequestMapping(value = "delete",method = RequestMethod.DELETE)
public String deleteJob(Long jobId) {
jobService.deleteJob(jobId);
return "success";
}
}
通过postman请求:localhost:8000/delay/add
此时这条延时任务被添加进了线程池中
2019-08-12 21:21:36.589  INFO 21444 --- [nio-8000-exec-6] ainer.JobPool  : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000 2019-08-12 21:21:36.609  INFO 21444 --- [nio-8000-exec-6] ainer.DelayBucket    : 添加延迟任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
根据设置10秒钟之后任务会被添加⾄ReadyQueue中
2019-08-12 21:21:46.744  INFO 21444 --- [pool-1-thread-4] ainer.ReadyQueue    : 执⾏队列添加任务:DelayJob(jodId=3, delayDate=1565616106609, topic=test)
这时候我们请求localhost:8000/delay/pop
这个时候任务被响应,修改状态的同时设置其超时时间,然后放置在DelayBucket中
2019-08-09 19:36:02.342  INFO 58456 --- [nio-8000-exec-3] ainer.ReadyQueue    : 执⾏队列取出任务:{"delayDate":1565321728704,"jodId":1,"topic":"测试"}
2019-08-09 19:36:02.364  INFO 58456 --- [nio-8000-exec-3] ainer.JobPool  : 任务池添加任务:{"delayTime":10000,"id":1,"message":"延迟10秒,超时30秒","retryCount":0,"status":"RESERVED","topic":"测试" 2019-08-09 19:36:02.384  INFO 58456 --- [nio-8000-exec-3] ainer.DelayBucket    : 添加延迟任务:{"delayDate":1565321792364,"jodId":1,"topic":"测试"}
按照设计在30秒后,任务假如没有被消费将会重新放置在ReadyQueue中
2019-08-12 21:21:48.239  INFO 21444 --- [nio-8000-exec-7] ainer.ReadyQueue    : 执⾏队列取出任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
2019-08-12 21:21:48.261  INFO 21444 --- [nio-8000-exec-7] ainer.JobPool  : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime"
任务的删除/
现在我们请求:localhost:8000/delay/delete
此时在Job pool中此任务将会被移除,此时元数据已经不存在,但任务还在DelayBucket中循环,然⽽在循环中当检测到元数据已经不存的话此延时任务会被移除。
2019-08-12 21:21:54.880  INFO 21444 --- [nio-8000-exec-8] ainer.JobPool  : 任务池移除任务:3
2019-08-12 21:21:59.104  INFO 21444 --- [pool-1-thread-5] dis.delay.handler.DelayJobHandler  : 移除不存在任务:{"delayDate":1565616118261,"jodId":3,"topic":"test"}
近期热⽂推荐:
1.
2.
3.
4.
5.
觉得不错,别忘了随⼿点赞+转发哦!

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。