java并⾏_Java并⾏编程!
多核处理器现在已⼴泛应⽤于服务器、台式机和便携机硬件。它们还扩展到到更⼩的设备,如智能电话和平板电脑。由于进程的线程可以在多个内核上并⾏执⾏,因此多核处理器为并发编程打开了⼀扇扇新的⼤门。为实现应⽤程序的最⼤性能,⼀项重要的技术就是将密集型任务拆分成可以并⾏执⾏的若⼲⼩块,以便最⼤程度利⽤计算能⼒。
传统上,处理并发(并⾏)编程⼀直很困难,因为您不得不处理线程同步和共享数据的问题。Groovy (GPar)、Scala 和 Clojure 社区的努⼒已经证明,⼈们对 Java 平台上并发编程的语⾔级⽀持的兴趣⼗分强烈。这些社区都尝试提供全⾯的编程模型和⾼效的实现,以屏蔽与多线程和分布式应⽤程序相关的难点。但不应认为 Java 语⾔本⾝在这⽅⾯逊⾊。Java Platform, Standard Edition (Java SE) 5 及后来的Java SE 6 引⼊了⼀组程序包,可以提供强⼤的并发构建块。Java SE 7 通过添加并⾏⽀持进⼀步增强了这些构建块。
下⽂⾸先简单回顾了 Java 并发编程,从早期版本以来已经存在的低级机制开始。然后在介绍 Java SE 7 中由分解/合并框架提供的新增基本功能分解/合并任务之前,先介绍 urrent 程序包添加的丰富基元。⽂中给出了这些新 API 的⽰例⽤法。最后,在结束之前对⽅法进⾏了讨论。
下⾯,我们假定读者拥有 Java SE 5 或 Java SE 6 编程背景。在此过程中,我们还将介绍 Java SE 7 ⼀
些实⽤的语⾔发展。
Java 并发编程
传统线程
过去,Java 并发编程包括通过 java.lang.Thread 类和 java.lang.Runnable 接⼝编写线程,然后确保其代码以正确、⼀致的⽅式对共享可变对象进⾏操作并避免错误的读/写操作,同时不会产⽣由于锁争⽤条件所导致的死锁。以下是基本线程操作的⽰例:
Thread thread = new Thread() {
@Override public void run() {
System.out.println(">>> I am running in a separate thread!");
}
};
thread.start();
thread.join();
本⽰例中的代码所做的只是创建⼀个线程,该线程将⼀个字符串打印到标准输出流。主线程通过调⽤ join() 等待所创建的(⼦)线程完成。
这样直接操作线程对于简单⽰例来说是不错,但对于并发编程,这种代码很快就容易产⽣错误,尤其是当多个线程需要合作执⾏⼀个⼤型任务时。在这样的情况下,需要协调其控制流。
例如,某个线程执⾏的完成可能依赖于其他线程执⾏完成。通常⼈们熟知的⽰例是⽣产者/使⽤者的例⼦,如果使⽤者的队列已满则⽣产者应等待使⽤者,当队列为空时使⽤者应等待⽣产者。这⼀要求可通过共享状态和条件队列得到满⾜,但您仍需要通过对共享状态对象使⽤java.ify() 和 java.lang.Object.wait() 来使⽤同步,这很容易出错。
最后,⼀个常见的问题是对⼤段代码甚⾄是整个⽅法使⽤同步和提供互斥。尽管此⽅法可产⽣线程安全的代码,但由于排除实际上过长所引起的有限并⾏度,该⽅法通常导致性能变差。
正如计算中经常发⽣的那样,操作低级基元以实现复杂操作会打开错误之门,因此开发⼈员应想办法将复杂性封装在⾼效的⾼级库中。Java SE 5 正好为我们提供了这种能⼒。
urrent 程序包的丰富基元
Java SE 5 引⼊了⼀个名为 urrent 的程序包系列,Java SE 6 对其进⾏了进⼀步的增强。该程序包系列提供了以下并发编程基元、集合和特性:
执⾏器是对传统线程的增强,因为它们是从线程池管理抽象⽽来的。它们执⾏与传递到线程的任务类似的任务(实际上,可封装实现
java.lang.Runnable 的实例)。有些实现提供了线程池和调度策略。⽽且,可以通过同步和异步⽅式获取执⾏结果。
线程安全队列允许在并发任务之间传递数据。底层数据结构和并发⾏为有着丰富的实现,底层数据结构的实现包括数组列表、链接列表或双端队列等,并发⾏为的实现包括阻塞、⽀持优先级或延迟等。
细粒度的超时延迟规范,因为 urrent 程序包中的⼤部分类均⽀持超时延迟。例如,如果任务在规定时间范围内⽆法完成,执⾏器将中断任务执⾏。
丰富的同步模式,不仅仅是 Java 中低级同步块所提供的互斥。这些模式包括信号或同步障碍等常⽤语法。
⾼效、并发的数据集合(映射、列表和集),通过使⽤写时复制和细粒度锁通常可在多线程上下⽂中产⽣出⾊的性能。
原⼦变量,可以使开发⼈员免于亲⾃执⾏同步访问。这些变量封装了常⽤的基元类型,如整型或布尔值,以及对其他对象的引⽤。
超出固有锁所提供的锁定/通知功能范围的多种锁,例如,⽀持重新进⼊、读/写锁定、超时或基于轮询的锁定尝试。
例如,考虑以下程序:
注意:由于 Java SE 7 引⼊的新的整数⽂本,可以在任意位置插⼊下划线以提⾼可读性(例如,1_000_000)。
import java.util.*;
import urrent.*;
import static java.util.Arrays.asList;
public class Sums {
static class Sum implements Callable {
private final long from;
private final long to;
Sum(long from, long to) {
this.from = from;
< = to;
}
@Override
public Long call() {
long acc = 0;
for (long i = from; i <= to; i++) {
acc = acc + i;
}
return acc;
}
}
public static void main(String[] args) throws Exception {
ExecutorService executor = wFixedThreadPool(2);
List > results = executor.invokeAll(asList(
new Sum(0, 10), new Sum(100, 1_000), new Sum(10_000, 1_000_000)
));
executor.shutdown();
for (Future result : results) {
System.out.());
}
}
}
该⽰例程序利⽤执⾏器来计算多个长整型的和。内部 Sum 类实现了执⾏器⽤于计算结果的 Callable 接⼝,并发⼯作在 call() ⽅法内执⾏。urrent.Executors 类提供了多种实⽤⽅法,如提供预配置执⾏器或将传统 java.lang.Runnable 对象封装到 Callable 实例中。与 Runnable 相⽐,使⽤ Callable 的优势在于 Callable 能够显式返回⼀个值。
本⽰例使⽤⼀个执⾏器将⼯作分派给两个线程。ExecutorService.invokeAll() ⽅法接受 Callable 实例的集合,并在返回之前等待所有这些实例完成。它会返回 Future 对象的列表,这些对象全都表⽰计算的“未来”结果。如果我们以异步⽅式⼯作,就可以测试每个 Future 对象来检查其对应的 Callable 是否已完成⼯作,并检查其是否引发了异常,甚⾄可以取消其⼯作。相反,当使⽤普通传统线程时,必须通过共享的可变布尔值对取消逻辑进⾏编码,并由于定期检查此布尔值⽽减缓代码的执⾏。因为 invokeAll() 容易产⽣阻塞,我们可以直接对Future 实例进⾏遍历并读取其计算和。
还需注意,必须关闭执⾏器服务。如果未关闭,则在主⽅法退出时 Java 虚拟机将不会退出,因为环境中还有活动线程。
分解/合并任务
概述
与传统线程相⽐,执⾏器是⼀⼤进步,因为可以简化并发任务的管理。有些类型的算法要求任务创建⼦任务并与其他任务互相通信以完成任务。这些是“分⽽治之”的算法,也称为“映射归约”,类似函数语⾔中的齐名函数。其思路是将算法要处理的数据空间拆分成较⼩的独⽴块。这是“映射”阶段。⼀旦块集处理完毕之后,就可以将部分结果收集起来形成最终结果。这是“归约”阶段。
⼀个简单的⽰例是您希望计算⼀个⼤型整数数组的总和(参见图 1)。假定加法是可交换的,可以将数组划分为较⼩的部分,并发线程对这些部分计算部分和。然后将部分和相加,计算总和。因为对于此算法,线程可以在数组的不同区域上独⽴运⾏,所以与对数组中每个整数循环执⾏的单线程算法相⽐,此算法在多核架构上可以看到明显的性能提升。
图 1:整数数组的部分和
使⽤执⾏器解决以上问题很简单:将数组分为 n 个可⽤物理处理单元,创建 Callable 实例以计算每个部分和,将部分和提交给管理 n 个线程的线程池的执⾏器,然后收集结果以计算最终和。
但对于其他类型的算法和数据结构,执⾏计划通常不会如此简单。尤其是,标识“⾜够⼩”可通过⾼效⽅式独⽴处理的数据块的“映射”阶段预先不知道数据空间拓扑结构。对基于图形和基于树的数据结构来说尤为如此。在这些情况下,算法应创建“各部分”的层次结构,在返回部分结果之前等待⼦任务完成。尽管类似图 1 中的数组并⾮最优,但可以使⽤多级并发部分和计算(例如,在双核处理器上将数组分为 4个⼦任务)。
⽤于实现分⽽治之算法的执⾏器的问题与创建⼦任务⽆关,因为 Callable 可⾃由向其执⾏器提交新的⼦任务,然后以同步或异步⽅式等待其结果。问题出在并⾏上:当 Callable 等待另⼀个 Callable 的结果时,它被置于等待状态,因此浪费了处理排队等待执⾏的另⼀java中split的用法
个 Callable 的机会。
通过 Doug Lea 的努⼒,在 Java SE 7 中添加到 urrent 程序包的分解/合并框架填补了这⼀空⽩。Java SE 5 和 Java SE 6 版本的 urrent 帮助处理并发,Java SE 7 中另外增加了⼀些功能帮助处理并⾏。
⽤于⽀持并⾏的新增功能
核⼼新增功能是专⽤于运⾏实现 ForkJoinTask 实例的新的 ForkJoinPool 执⾏器。ForkJoinTask 对象⽀持创建⼦任务并等待⼦任务完成。通过这些明确的语义,执⾏器能够通过在任务等待另⼀任务完成并且有待处理任务要运⾏时“窃取”作业,从⽽在其内部线程池中分派任务。
ForkJoinTask 对象有两种特定⽅法:
fork() ⽅法允许计划 ForkJoinTask 异步执⾏。这允许从现有 ForkJoinTask 启动新的 ForkJoinTask。
⽽ join() ⽅法允许 ForkJoinTask 等待另⼀个 ForkJoinTask 完成。
任务之间的合作通过 fork() 和 join() 来实现,如图 2 所⽰。请注意,fork() 和 join() ⽅法名不应与其 POSIX 对应项(进程可通过它复制⾃⾝)混淆。其中,fork() 仅在 ForkJoinPool 中调度⼀个新任务,但不创建⼦ Java 虚拟机。
图 2:Fork 和 Join 任务之间的合作
有两种类型的 ForkJoinTask 实现:
RecursiveAction 的实例表⽰不产⽣返回值的执⾏。
相反,RecursiveTask 的实例会产⽣返回值。
通常,优先选择 RecursiveTask,因为⼤多数的分⽽治之算法返回数据集的计算值。对于任务的执⾏,提供了不同的同步和异步选项,从⽽有可能实现细致的模式。
⽰例:计算某个单词在⽂档中出现的次数
为了说明新的分解/合并框架的⽤法,我们举⼀个简单⽰例:计算某个单词在⼀组⽂档中出现的次数。⾸先,分解/合并任务应作为“纯”内存中算法运⾏,其中不涉及 I/O 操作。同时,应尽可能避免任务之间通过共享状态的通信,因为这意味着可能必须执⾏锁定。理想情况下,仅当⼀个任务分出另⼀个任务或⼀个任务并⼊另⼀个任务时,任务之间才进⾏通信。
我们的应⽤程序运⾏在⽂件⽬录结构上,将每个⽂件的内容加载到内存中。因此,需要以下类来表⽰该模型。⽂档表⽰为⼀系列⾏:
class Document {
private final List lines;
Document(List lines) {
this.lines = lines;
}
List getLines() {
return this.lines;
}
static Document fromFile(File file) throws IOException {
List lines = new LinkedList<>();
try(BufferedReader reader = new BufferedReader(new FileReader(file))) {
String line = adLine();
while (line != null) {
lines.add(line);
line = adLine();
}
}
return new Document(lines);
}
}
注意:如果您是初次接触 Java SE7,fromFile() ⽅法有两点会使您感到惊讶:
LinkedList 使⽤尖括号语法 (<>) 告知编译器推断通⽤类型参数。由于⾏是 List,LinkedList<> 扩展为L
inkedList。使⽤尖括号运算符,对于那些能在编译时轻松推断的类型就不必再重复,从⽽使得通⽤类型的处理更轻松。
try 块使⽤新的⾃动资源管理语⾔特性。在 try 块的开头可以使⽤实现 java.lang.AutoCloseable 的任何类。⽆论是否引发异常,当执⾏离开 try 块时,在此声明的任何资源都将正常关闭。在 Java SE 7 之前,正常关闭多个资源很快会变成⼀场嵌套if/try/catch/finally 块的梦魇,这种嵌套块通常很难正确编写。
于是⽂件夹成为⼀个简单的基于树的结构:
class Folder {
private final List subFolders;
private final List documents;
Folder(List subFolders, List documents) {
this.subFolders = subFolders;
this.documents = documents;
}
List getSubFolders() {
return this.subFolders;
}
List getDocuments() {
return this.documents;
}
static Folder fromDirectory(File dir) throws IOException {
List documents = new LinkedList<>();
List subFolders = new LinkedList<>();
for (File entry : dir.listFiles()) {
if (entry.isDirectory()) {
subFolders.add(Folder.fromDirectory(entry));
} else {
documents.add(Document.fromFile(entry));
}
}
return new Folder(subFolders, documents);
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论