多线程分批处理集合(可扩展为分批从数据库中读取数据)的测试⼀例⼦【我】
任务类:
import java.util.List;
import java.util.Map;
public class MyTask implements Runnable {
//当前待处理数据集合
private List dataList;
//其他参数Map
private Map paramMap;
public MyTask() {
super();
}
public MyTask(List dataList, Map paramMap) {
super();
this.dataList = dataList;
生活中数据库系统的实际例子this.paramMap = paramMap;
}
public List getDataList() {
return dataList;
}
public void setDataList(List dataList) {
this.dataList = dataList;
}
public Map getParamMap() {
return paramMap;
}
public void setParamMap(Map paramMap) {
this.paramMap = paramMap;
}
@Override
public void run() {
try {
long threadStartTime = System.currentTimeMillis();
/
/ System.out.println("--T--线程: {"+Thread.currentThread().getName()+"} -- 开始执⾏,当前批次数据: {"+dataList.size()+"} 条,线程数:{"+("threadNum")+"},批次数:{"+("batchNum")+"},当前模值: {"+par System.out.println("--T--线程: {"+Thread.currentThread().getName()+"} -- 开始执⾏,当前批次数据: {"+dataList.size()+"} 条,当前模值: "+("mod"));
for (int y = 0; y < dataList.size(); y++) {
Object object = (y);
try {
long st = System.currentTimeMillis();
// System.out.println("--T--线程: {"+Thread.currentThread().getName()+"正在处理的数据是:"+object);
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("--T--线程: {"+Thread.currentThread().getName()+"} -- 结束执⾏,当前批次数据: {"+dataList.size()+"} 条,当前模值: {"+("mod")+"},当前线程总耗时:"+(System.currentTimeMillis() - threadStartTime)) } catch (Exception e) {
e.printStackTrace();
}
}
}
测试类:
import java.util.ArrayList;
import java.util.HashMap;
import urrent.ExecutorService;
import urrent.Executors;
public class T2 {
@SuppressWarnings("unchecked")
public static void main(String[] args) {
ArrayList<Object> dataList = new ArrayList<>();
for (int i = 0; i < 193570; i++) {
dataList.add(i);
}
long t1 = System.currentTimeMillis();
/
/ 创建线程服务
ExecutorService exec = null;
//数据总数
int dataNum = dataList.size();
// 线程数默认为1,数据⼩于等于100时使⽤
int threadNum = 1;
// 分隔数据的批次数
int batchNum = 1;
// 系统能承受最⼤线程数 batch.maxThreadNum 取⾃配置⽂件
// int maxThreadNum = Integer.Config("maxThreadNum"));
int maxThreadNum = 100;
/
/ 默认⼀次处理100条左右
int onceNum = 100;
if (dataNum <= 100) {
batchNum = 1;
exec = wCachedThreadPool();
} else if (100 < dataNum && dataNum <= 10000) {
// 批次数不会⼤于100
batchNum = dataNum / onceNum;
if (batchNum > maxThreadNum) {
// 设置固定线程数100
threadNum = maxThreadNum;
} else {
// 线程数等于批次数
threadNum = batchNum;
}
// 开启缓存线程池
exec = wCachedThreadPool();
} else if (dataNum > 10000) {
// 计划每批次500条左右
onceNum = 500;
// 批次数计算
batchNum = dataNum / onceNum; // bathNum 范围在20到400之间
if (batchNum > maxThreadNum) {
// 设置固定线程数100
threadNum = maxThreadNum;
} else {
// 线程数等于批次数
threadNum = batchNum;
}
// 开启固定线程池
exec = wFixedThreadPool(threadNum);
}
System.out.println("--B--预计线程数为:{"+threadNum+"},预计批次数:{"+batchNum+"},总待处理数量为:{"+dataNum+"}");
// 定义多线程相关
// final Semaphore semaphore = new Semaphore(10);
// ExecutorService exec = wCachedThreadPool();
// 处理的⽂件总数(查询出的)
int sumHandler = 0;
// 根据批次数启动线程
for (int i = 0; i < batchNum; i++) {
// 根据线程数和当前模值查出⼀批数据
ArrayList onceList = new ArrayList();
//根据对分批数量的模值切割数据
for (int j = 0; j < dataNum; j++) {
/
/⽤数据的id(这⾥是⽤数据集合的⾓标模拟)对批次数量取模,进⾏切分
//【实际项⽬中这步是⽤sql语句从数据库中按照相同的条件查询数据】
if (j%batchNum==i) {
onceList.(j));
}
}
// System.out.println("-----主线程中的i:"+i);
//每个线程⽤⼀个参数Map【注意:这⾥必须在循环内部new参数map,如果在循环外,会出现问题】
HashMap paramMap = new HashMap();
paramMap.put("dataNum", dataNum);
paramMap.put("batchNum", batchNum);
paramMap.put("threadNum", threadNum);
//当前模值
paramMap.put("mod", i);
// 开启线程
Runnable task = new MyTask(onceList, paramMap);
exec.submit(task);
//计数
sumHandler += onceList.size();
}
exec.shutdown();
// exec.awaitTermination(1, TimeUnit.HOURS);
while (true) {
if (exec.isTerminated()) {
System.out.println("--B--所有⼦线程都结束了,共计校验记录:{"+sumHandler+"}");
break;
}
}
System.out.println("--B--总耗时:"+(System.currentTimeMillis()-t1));
}
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论