《JAVA并发编程的艺术》之Java并发编程实战
《JAVA并发编程的艺术》之 Java并发编程实战
⽂章⽬录
当你在进⾏并发编程时,看着程序的执⾏速度在⾃⼰的优化下运⾏得越来越快,你会觉得越来越有成就感,这就是并发编程的魅⼒。但与此同时,并发编程产⽣的问题和风险可能也 会随之⽽来。本章先介绍⼏个并发编程的实战案例,然后再介绍如何排查并发编程造成的问题。
⽣产者和消费者模式
在并发编程中使⽤⽣产者和消费者模式能够解决绝⼤多数并发问题。该模式通过平衡⽣产线程和消费线程的⼯作能⼒来提⾼程序整体处理数据的速度。
在线程世界⾥,⽣产者就是⽣产数据的线程,消费者就是消费数据的线程。在多线程开发中,如果⽣产者处理速度很快,⽽消费者处理速度很慢,那么⽣产者就必须等待消费者处理完,才能继续⽣产数据。同样的道理,如果消费者的处理能⼒⼤于⽣产者,那么消费者就必须等待⽣产者。为了解决这种⽣产消费能⼒不均衡的问题,便有了⽣产者和消费者模式。
**什么是⽣产者和消费者模式 **
⽣产者和消费者模式是通过⼀个容器来解决⽣产者和消费者的强耦合问题。⽣产者和消费者彼此之间不直接通信,⽽是通过阻塞队列来进⾏通信,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不⽣产者要数据,⽽是直接从阻塞队列⾥取,阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒。
这个阻塞队列就是⽤来给⽣产者和消费者解耦的。纵观⼤多数设计模式,都会⼀个第三者出来进⾏解耦,如⼯⼚模式的第三者是⼯⼚类,模板模式的第三者是模板类。在学习⼀些设计模式的过程中,先到这个模式的第三者,能帮助我们快速熟悉⼀个设计模式。
⽣产者消费者模式实战
我和同事⼀起利⽤业余时间开发的Yuna⼯具中使⽤了⽣产者和消费者模式。我先介绍下Yuna[1]⼯具,在阿⾥巴巴很多同事都喜欢通过邮件分享技术⽂章,因为通过邮件分享很⽅便,⼤家在⽹上看到好的技术⽂章,执⾏复制→粘贴→发送就完成了⼀次分享,但是我发现技术⽂章不能沉淀下来,新来的同事看不到以前分享的技术⽂章,⼤家也很难到以前分享过的技术⽂章。为了解决这个问题,我们开发了⼀个Yuna⼯具。
我们申请了⼀个专门⽤来收集分享邮件的邮箱,⽐如share@alibaba,⼤家将分享的⽂ 章发送到这个邮箱,让⼤家每次都抄送到这个邮箱肯定很⿇烦,所以我们的做法是将这个邮箱地址放在部门邮件列
表⾥,所以分享的同事只需要和以前⼀样向整个部门分享⽂章就⾏。
Yuna⼯具通过读取邮件服务器⾥该邮箱的邮件,把所有分享的邮件下载下来,包括邮件的附件、图⽚和邮件回复。因为我们可能会从这个邮箱⾥下载到⼀些⾮分享的⽂章,所以我们要求分享的邮件标题必须带有⼀个关键字,⽐如“内贸技术分享”。下载完邮件之后,通过confluence的Web Service接⼝,把⽂章插⼊到confluence⾥,这样新同事就可以在confluence⾥看以前分享过的⽂章了,并且Yuna⼯具还可以⾃动把⽂章进⾏分类和归档。
为了快速上线该功能,当时我们花了3天业余时间快速开发了Yuna 1.0版本。在1.0版本中并没有使⽤⽣产者消费模式,⽽是使⽤单线程来处理,因为当时只需要处理我们⼀个部门的 邮件,所以单线程明显够⽤,整个过程是串⾏执⾏的。在⼀个线程⾥,程序先抽取全部的邮件,转化为⽂章对象,然后添加全部的⽂章,最后删除抽取过的邮件。代码如下。
public void extract(){
logger.debug("开始"+getExtractorName()+"。。");// 抽取邮件
List<Article> articles =extractEmail();// 添加⽂章
for(Article article : articles){
addArticleOrComment(article);
}
// 清空邮件
cleanEmail();
logger.debug("完成"+getExtractorName()+"。。");
}
Yuna⼯具在推⼴后,越来越多的部门使⽤这个⼯具,处理的时间越来越慢,Yuna是每隔5分钟进⾏⼀次抽取的,⽽当邮件多的时候⼀次处理可能就花了⼏分钟,于是我在Yuna 2.0版本⾥使⽤了⽣产者消费者模式来处理邮件,⾸先⽣产者线程按⼀定的规则去邮件系统⾥抽取邮件,然后存放在阻塞队列⾥,消费者从阻塞队列⾥取出⽂章后插⼊到conflunce⾥。代码如下。
public class QuickEmailToWikiExtractor extends AbstractExtractor {
private ThreadPoolExecutor threadsPool;
private ArticleBlockingQueue<ExchangeEmailShallowDTO> emailQueue;
public QuickEmailToWikiExtractor(){
emailQueue=new ArticleBlockingQueue<ExchangeEmailShallowDTO>();
int corePoolSize = Runtime().availableProcessors()*2;
threadsPool =new ThreadPoolExecutor(corePoolSize, corePoolSize,10l, TimeUnit. SECONDS,new LinkedBlockingQueue<Runnable>(2000));
}
public void extract(){
logger.debug("开始"+getExtractorName()+"。。");
long start = System.currentTimeMillis();// 抽取所有邮件放到队列⾥
new ExtractEmailTask().start();
// 把队列⾥的⽂章插⼊到
Wiki insertToWiki();
long end = System.currentTimeMillis();
double cost =(end - start)/1000;
logger.debug("完成"+getExtractorName()+",花费时间:"+ cost +"秒");
}
/*** 把队列⾥的⽂章插⼊到Wiki */
private void insertToWiki(){
// 登录Wiki,每间隔⼀段时间需要登录⼀次
confluenceService.login(RuleFactory.USER_NAME, RuleFactory.PASSWORD);
while(true){
// 2秒内取不到就退出
ExchangeEmailShallowDTO email = emailQueue.poll(2, TimeUnit.SECONDS);
if(email == null){
break;
}
threadsPool.submit(new insertToWikiTask(email));
}
}
protected List<Article>extractEmail(){
List<ExchangeEmailShallowDTO> allEmails =getEmailService().queryAllEmails();
if(allEmails == null){
return null;
}
for(ExchangeEmailShallowDTO exchangeEmailShallowDTO : allEmails){
emailQueue.offer(exchangeEmailShallowDTO);
}
return null;
}
/*** 抽取邮件任务 ** @author tengfei.fangtf */
public class ExtractEmailTask extends Thread {
public void run(){
extractEmail();
}
}
}
代码的执⾏逻辑是,⽣产者启动⼀个线程把所有邮件全部抽取到队列中,消费者启动CPU*2个线程数处理邮件,从之前的单线程处理邮件变成了现在的多线程处理,并且抽取邮件的线程不需要等处理邮件的线程处理完再抽取新邮件,所以使⽤了⽣产者和消费者模式后,邮件的整体处理速度⽐以前要快了⼏倍。
[1] Yuna取名⾃我⾮常喜欢的⼀款RPG游戏《最终幻想》中⼥主⾓的名字。
多⽣产者和多消费者场景
在多核时代,多线程并发处理速度⽐单线程处理速度更快,所以可以使⽤多个线程来⽣产数据,同样可以使⽤多个消费线程来消费数据。⽽更复杂的情况是,消费者消费的数据,有可能需要继续处理,于是消费者处理完数据之后,它⼜要作为⽣产者把数据放在新的队列⾥,交给其他消费者继续处理,如图11-1所⽰。
我们在⼀个长连接服务器中使⽤了这种模式,⽣产者1负责将所有客户端发送的消息存放在阻塞队列1⾥,消费者1从队列⾥读消息,然后通过消息ID进⾏散列得到N个队列中的⼀个,然后根据编号将消息存放在到不同的队列⾥,每个阻塞队列会分配⼀个线程来消费阻塞队列⾥的数据。如果消费者2⽆法消费消息,就将消息再抛回到阻塞队列1中,交给其他消费者处理。
以下是消息总队列的代码。
/*** 总消息队列管理
*
* @author tengfei.fangtf
*/
public class MsgQueueManager implements IMsgQueue{
private static final Logger LOGGER = Logger(MsgQueueManager.class); /*** 消息总队列 */
public final BlockingQueue<Message> messageQueue;
private MsgQueueManager(){
messageQueue =new LinkedTransferQueue<Message>();
}
public void put(Message msg){
try{
messageQueue.put(msg);
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}
}
public Message take(){
try{
return messageQueue.take();
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}
return null;
}
}
启动⼀个消息分发线程。在这个线程⾥⼦队列⾃动去总队列⾥获取消息。
/**
* 分发消息,负责把消息从⼤队列塞到⼩队列⾥
*
* @author tengfei.fangtf
*/
static class DispatchMessageTask implements Runnable {
@Override
public void run(){
BlockingQueue<Message> subQueue;
for(;;){
// 如果没有数据,则阻塞在这⾥
Message msg = MessageQueue().take();
// 如果为空,则表⽰没有Session机器连接上来,
/
/ 需要等待,直到有Session机器连接上来
while((subQueue =getInstance().getSubQueue())== null){
try{
Thread.sleep(1000);
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}
}
// 把消息放到⼩队列⾥
try{
subQueue.put(msg);
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}
}
}
}
使⽤散列(hash)算法获取⼀个⼦队列,代码如下。
/**
* 均衡获取⼀个⼦队列。
*
* @return
*/
public BlockingQueue<Message>getSubQueue(){
int errorCount =0;
for(;;){
if(subMsgQueues.isEmpty()){
return null;
}
int index =(int)(System.nanoTime()% subMsgQueues.size());
try{
(index);
}catch(Exception e){
/
/ 出现错误表⽰,在获取队列⼤⼩之后,队列进⾏了⼀次删除操作
<("获取⼦队列出现错误", e);
if((++errorCount)<3){
continue;
}
}
}
}
使⽤的时候,只需要往总队列⾥发消息。
// 往消息队列⾥添加⼀条消息
IMsgQueue messageQueue = MessageQueue();
Packet msg = atePacket(Packet64FrameType. TYPE_DATA,"{}".getBytes(),(short)1);
messageQueue.put(msg);
线程池与⽣产消费者模式
Java中的线程池类其实就是⼀种⽣产者和消费者模式的实现⽅式,但是我觉得其实现⽅式更加⾼明。⽣产者把任务丢给线程池,线程池创建线程并处理任务,如果将要运⾏的任务数⼤于线程池的基本线程数就把任务扔到阻塞队列⾥,这种做法⽐只使⽤⼀个阻塞队列来实现⽣产者和消费者模式显然要⾼明很多,因为消费者能够处理直接就处理掉了,这样速度更快,⽽⽣产者先存,消费者再取这种⽅式显然慢⼀些。
我们的系统也可以使⽤线程池来实现多⽣产者和消费者模式。例如,创建N个不同规模的Java线程池来处理不同性质的任务,⽐如线程池1将数据读到内存之后,交给线程池2⾥的线程继续处理压缩数据。线程池1主要处理IO密集型任务,线程池2主要处理CPU密集型任务。
本节讲解了⽣产者和消费者模式,并给出了实例。读者可以在平时的⼯作中思考⼀下哪 些场景可以使⽤⽣产者消费者模式,我相信这种场景应该⾮常多,特别是需要处理任务时间⽐较长的场景,⽐如上传附件并处理,⽤户把⽂件上传到系统后,系统把⽂件丢到队列⾥,然后⽴刻返回告诉⽤户上传成功,最后
消费者再去队列⾥取出⽂件处理。再如,调⽤⼀个远程接⼝查询数据,如果远程服务接⼝查询时需要⼏⼗秒的时间,那么它可以提供⼀个申请查询的接⼝,这个接⼝把要申请查询任务放数据库中,然后该接⼝⽴刻返回。然后服务器端⽤线程轮询并获取申请任务进⾏处理,处理完之后发消息给调⽤⽅,让调⽤⽅再来调⽤另外⼀个接⼝取数据。
线上问题定位
有时候,有很多问题只有在线上或者预发环境才能发现,⽽线上⼜不能调试代码,所以线上问题定位就只能看⽇志、系统状态和dump线程,本节只是简单地介绍⼀些常⽤的⼯具,以帮助⼤家定位线上问题。
1. 在Linux命令⾏下使⽤TOP命令查看每个进程的情况,显⽰如下。
top - 22:27:25 up 463 days, 12:46, 1 user, load average: 11.80, 12.19, 11.79
Tasks: 113 total, 5 running, 108 sleeping, 0 stopped, 0 zombie
Cpu(s): 62.0%us, 2.8%sy, 0.0%ni, 34.3%id, 0.0%wa, 0.0%hi, 0.7%si, 0.2%st
Mem: 7680000k total, 7665504k used, 14496k free, 97268k buffers
Swap: 2096472k total, 14904k used, 2081568k free, 3033060k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
31177 admin 18 0 5351m 4.0g 49m S 301.4 54.0 935:02.08 java
31738 admin 15 0 36432 12m 1052 S 8.7 0.2 11:21.05 nginx-proxy
java中常用的设计模式有哪些
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论