从ActiveMQ异步消息异步调⽤的区别到线程池的总结
ActiveMQ的异步消息
MQ的异步消息指的不是必须每⽣产⼀次就消费⼀次。⽣产⽅只要放到消息通道⾥。消费⽅不断地去监听就可以了。异步消息可以使⽤JMS来编码操作。JMS本⾝就是异步的。直接标注上代码即可。
1. ⾸先引⼊maven依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2. 其次加上MQ的url、username、password
spring:
activemq:
#broker-url: tcp://localhost:61616
broker-url: tcp://117.50.14.59:61616
user: admin
password: admin
3. 在启动类中加上@EnableJms 注解。标明启⽤JMS注⼊启⽤。
@EnableJms
@SpringBootApplication
public class SpringbootActiveMqApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootActiveMqApplication.class, args);
}
}
4. ⽣产⽅注⼊JmsTemplate
import javax.jms.Destination;
import org.springframework.beans.factory.annotation.Autowired;
import org.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
@Service("producer")
public class Producer {
@Autowired // 也可以注⼊JmsTemplate,JmsMessagingTemplate对JmsTemplate进⾏了封装
private JmsMessagingTemplate jmsTemplate;
// 发送消息,destination是发送到的队列,message是待发送的消息
public void sendMessage(Destination destination, final String message) {
}
}
5. 消费⽅加上注解@JmsListener
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
// 使⽤JmsListener配置消费者监听的队列,其中text是接收到的消息
@JmsListener(destination = "mytest.queue")
public void receiveQueue(String text) {
System.out.println("Consumer收到的报⽂为:"+text);
System.out.println("=================");
}
}
这样。⼀个简单的MQ异步消息就可以正常运作了。这属于MQ的异步消息
异步消息、异步调⽤、异步请求是不同的编码模式。他们应⽤的范畴不是⼀样的
异步请求与异步调⽤的区别
两者的使⽤场景不同,异步请求⽤来解决并发请求对服务器造成的压⼒,从⽽提⾼对请求的吞吐量;⽽异步调⽤是⽤来做⼀些⾮主线流程且不需要实时计算和响应的任务,⽐如同步⽇志到kafka中做⽇志分析等。
异步请求是会⼀直等待response相应的,需要返回结果给客户端的;⽽异步调⽤我们往往会马上返回
给客户端响应,完成这次整个的请求,⾄于异步调⽤的任务后台⾃⼰慢慢跑就⾏,客户端不会关⼼。
异步请求可以直接使⽤多线程来做。涉及到前端后端联合编码。这⾥不做详细说明。主要说明异步调⽤的使⽤。
通常在开发过程中,会遇到⼀个⽅法是和实际业务⽆关的,没有紧密性的。⽐如记录⽇志信息等业务。这个时候正常就是启⼀个新线程去做⼀些业务处理,让主线程异步的执⾏其他业务。
这⾥说⼀下springboot下的异步调⽤:
1. 需要在启动类加⼊@EnableAsync使异步调⽤@Async注解⽣效
@EnableAsync //异步线程执⾏启⽤
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
2. 在需要异步执⾏的⽅法上加⼊此注解即可@Async(“threadPool”),threadPool为⾃定义线程池
3. 在默认情况下,未设置TaskExecutor时,默认是使⽤SimpleAsyncTaskExecutor这个线程池,但此线程不是真正意义上的线程池,
因为线程不重⽤,每次调⽤都会创建⼀个新的线程。可通过控制台⽇志输出可以看出,每次输出线程名都是递增的。所以最好我们来⾃定义⼀个线程池。
4. 那如何⾃定义⼀个线程池呢。下⾯会有讲述。
4.1. ⾸先⾃定义配置类
@Configuration
@EnableAsync
@ConfigurationProperties(prefix = "spring.task.pool")
public class ExecutorConfig {
/**
* 线程池维护的核⼼数量
*/
private int corePoolSize ;
/**
* 线程池维护线程的最⼤数量
*/
private int maxPoolSize ;
private int maxPoolSize ;
/**
* 线程池所使⽤的缓冲队列
*/
private int queueCapacity;
/**
* 线程池维护线程所允许的空闲时间
*/
private int keepAliveSeconds;
/**
* 此bean⽤于埋点线程池。由于埋点数据的可丢失性。使⽤DiscardPolicy任务拒绝策略。
* @return
*/
@Bean
public Executor KITSimpleAsync() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
//任务拒绝策略。⽤于被拒绝任务的处理程序,DiscardPolicy 默认情况下它将丢弃被拒绝的任务。 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.setThreadNamePrefix("KITSimpleAsync---");
executor.initialize();
return executor;
springboot aop}
public int getCorePoolSize() {
return corePoolSize;
}
public int getKeepAliveSeconds() {
return keepAliveSeconds;
}
public void setKeepAliveSeconds(int keepAliveSeconds) {
this.keepAliveSeconds = keepAliveSeconds;
}
public void setCorePoolSize(int corePoolSize) {
}
public int getMaxPoolSize() {
return maxPoolSize;
}
public void setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}
public int getQueueCapacity() {
return queueCapacity;
}
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
}
4.2. 配置⽂件对应配置如下:
spring:
task:
pool:
corePoolSize: 5
maxPoolSize: 300
queueCapacity: 100
keepAliveSeconds: 30000
4.3. 使⽤时直接标注注解就可以使⽤
// 发送消息,destination是发送到的队列,message是待发送的消息
@Async("KITSimpleAsync")异步发送。使⽤⾃定义线程池详见ExecutorConfig
public void sendMessage(Destination destination,final String message){
}
5. 调⽤的异步⽅法,不能为同⼀个类的⽅法(包括同⼀个类的内部类),简单来说,因为Spring在启动扫描时会为其创建⼀个代理类,
⽽同类调⽤时,还是调⽤本⾝的代理类的,所以和平常调⽤是⼀样的。
其他的注解如@Cache等也是⼀样的道理,说⽩了,就是Spring的代理机制造成的。所以在开发中,最好把异步服务单独抽出⼀个类来管理。下⾯会重点讲述。
6. 什么情况下会导致@Async异步⽅法会失效?
a. 调⽤同⼀个类下注有@Async异步⽅法:在spring中像@Async和@Transactional、cache等注解本质使⽤的是动态代理,其实
Spring容器在初始化的时候Spring容器会将含有AOP注解的类对象“替换”为代理对象(简单这么理解),那么注解失效的原因就很明显了,就是因为调⽤⽅法的是对象本⾝⽽不是代理对象,因为没有经过Spring容器,那么解决⽅法也会沿着这个思路来解决。
b. 调⽤的是静态(static )⽅法
c. 调⽤(private)私有化⽅法
7. 解决同⼀个类中调⽤异步⽅法不起作⽤的问题
将要异步执⾏的⽅法单独抽取成⼀个类,原理就是当你把执⾏异步的⽅法单独抽取成⼀个类的时候,这个类肯定是被Spring管理的,其他Spring组件需要调⽤的时候肯定会注⼊进去,这时候实际上注⼊进去的就是代理类了。
其实我们的注⼊对象都是从Spring容器中给当前Spring组件进⾏成员变量的赋值,由于某些类使⽤了AOP注解,那么实际上在Spring容器中实际存在的是它的代理对象。那么我们就可以通过上下⽂获取⾃⼰的代理对象调⽤异步⽅法。
7.1
@Controller
@RequestMapping("/app")
public class EmailController {
//获取ApplicationContext对象⽅式有多种,这种最简单,其它的⼤家⾃⾏了解⼀下
@Autowired
private ApplicationContext applicationContext;
@RequestMapping(value = "/email/asyncCall", method = GET)
@ResponseBody
public Map<String, Object> asyncCall () {
Map<String, Object> resMap = new HashMap<String, Object>();
try{
//这样调⽤同类下的异步⽅法是不起作⽤的
//stAsyncTask();
//通过上下⽂获取⾃⼰的代理对象调⽤异步⽅法
EmailController emailController = (Bean(EmailController.class); stAsyncTask();
resMap.put("code",200);
}catch (Exception e) {
resMap.put("code",400);
<("error!",e);
}
return resMap;
}
//注意⼀定是public,且是⾮static⽅法
@Async
public void testAsyncTask() throws InterruptedException {
Thread.sleep(10000);
System.out.println("异步任务执⾏完成!");
}
}
7.2 另⼀种⽅式:开启cglib代理,⼿动获取Spring代理类,从⽽调⽤同类下的异步⽅法。
⾸先,在启动类上加上@EnableAspectJAutoProxy(exposeProxy = true)注解。
@EnableAspectJAutoProxy(exposeProxy = true) //开启cglib代理。使得能够调⽤同类中的异步⽅法
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
代码使⽤如下:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论