spark多线程删数据库数据_使⽤SparkSQL实现多线程分页查
询并写⼊⽂件
⼀、由于具有多张宽表且字段较多,每个宽表数据⼤概为4000万条,根据业务逻辑拼接别名,并每张宽表的固定字段进⾏left join 拼接SQL。这样就能根据每个宽表的主列,根据每个宽表的不同字段关联出⼀张新的集合。由于下来要进⾏分页查询,如果要使⽤SparkSQL进⾏分页查询,需要增加序号列,那么就在刚才的Sql之前增加⼀句 create table tableName as SELECT  ROW_NUMBER() OVER() as id,* from (拼接的SQL) 就可创建⼀张带⾃增序列的,业务需要字段的⼏张宽表的关联集合,⽅便下来分页。
for(int i=0;i
SiCustomerLabelInfoModel Column = (i);
List ciMdaSysTable = ColumnName());
String alias = "t_" + (0).getTableId();
String aliasColumn = alias + "." + ColumnName();
String aliasTable = (0).getTableName() +" "+ alias;
if(mainTable == null){
mainTable = aliasTable;
}
(0).getUpdateCycle() == 1){
mainTable = aliasTable;
}
ColumnNameList.add(aliasColumn);
tableNameList.add(aliasTable);
}
String[] keyAlias = mainTable.split(" ");
String mainKeyColumn = keyAlias[1] + "." + keyColumn;
selectResult.append("select ").append(mainKeyColumn);
if(StringUtil.isNotEmpty(mainTable)){
fromTableName.append(" from ").append(mainTable);
}
Iterator table = tableNameList.iterator();
while(table.hasNext()){
String tableName = ();
String[] tableAlias = tableName.split(" ");
String[] mainAlias = mainTable.split(" ");
String alias = tableAlias[1];
String mAlias = mainAlias[1];
if(!mainTable.equals(tableName)){
fromTableName.append(" left join ").append(tableName).append(" on ").append(mAlias).append(".").append(keyColumn)
.append(" = ").append(alias).append(".").append(keyColumn).append(" ");
}
}
fromTableName.append(" ) a");
Iterator column = ColumnNameList.iterator();
while(column.hasNext()){
String columnName = ();
selectResult.append(",").append(columnName);
}
selectResult.append(fromTableName);
Createtable.append("create table ").append(cocDwName).append(" as SELECT ROW_NUMBER() OVER() as id,*
from").append(" (").append(selectResult);
⼆、由于业务场景,需要将4000万条数据最终写⼊10个⽂件,这⾥通过声明线程池pool,使⽤多线程的⽅法执⾏,有些⼈会担⼼那不会数据错乱吗,不会。因为后⾯要⽤分页sql,根据循环传⼊的 i 的值进⾏处理。
private ExecutorService pools = wFixedThreadPool(15);
if(result = true){
String queryCount = "select count(*) from "+cocDwName;
int count = DwTotolCount(queryCount);
log.info(""+keyColumn);
try {
for(int i=0;i<10;i++){
CreateDwFileThread jd = new CreateDwFileThread(jndiName,keyColumn,num,cocDwName,count,sysId,i);
Future fu = pools.submit(jd);
fus.add(fu);
}
long start = System.currentTimeMillis();
while (true) {
boolean done = true;
for (Future f : fus) {
if (!f.isDone()) {
done = false;
break;
}
}
if (!done) {
try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
<("sleep error", e);
e.printStackTrace();
}
continue;
} else {
break;
}
}
log.debug("wait tasks finish cost:" + (System.currentTimeMillis() - start));
}catch(Exception e){
result = false;
<("error", e);
}
}
jdbctemplate查询一条数据
三、根据第⼀步创建的表中的⾃增序列ID进⾏分页,由于要多线程并发执⾏,所以不能使⽤传统分页的begin与end,根据步骤⼆中传⼊的 i (这⾥参数为partNumber)进⾏处理,根据循环,每条线程执⾏的开始数据必定以上条数据结束的条数为开始,每次将查询出来的结果集通过list2File写⼊⽂件。这⾥还有个while循环,因为分成10份还是有400万条数据啊,还是觉得⼤,于是就⼜分成了10次~就是说每次查询出40万条写⼊⽂件,直到新加⼊400万条flag返回true退出循环。
while(flag == false){
pager.setPageSize(bufferedRowSize);
pager.setPageNum(pageNumber);
int begin = (PageNum() - 1) * PageSize()+createFileCount*partNumber;
int end = begin + PageSize();
if(end >= createFileCount*(partNumber+1)){
end = createFileCount*(partNumber+1);
}
StringBuffer sql = new StringBuffer() ;
sql.append(" select ").append(columns).append(" from ").append(cocDwName).append(" where id > ").append(begin).append(" and ").append(" id < ").append(end+1);
JdbcBaseDao jdbcBaseDao = (JdbcBaseDao) Instance().getService("jdbcBaseDao");
String BackjndiName = Properties("JNDI_CI_BACK");
final String file = fileLocalPath + File.separator + dwName+ "_" + String.valueOf(partNumber)+ ".csv";
Log.info("---------sql;:"+ sql + "-------fileName:"+file);
List> dataList = BackSimpleJdbcTemplate().String());
if (dataList.size() > 0) {
list2File(dataList, title, columns, file, encode, null, null);
pageNumber++;
}
if(end == createFileCount * partNumber + createFileCount){
flag = true;
}
有⼈会问你为啥不⽤ResultSet 直接放⼊400万条数据 为啥还要分开每40万条数据再分页写~ 我想说 我就是想这么⼲~ 啊哈哈。。。不过程序中貌似是有问题的 没有考虑到的情景,所以还在推敲。。(Resultset 查出来400万条不还是放在内存中,还是有可能内存溢出的,分页写⼤不了通过thriftserver多连接⼏次spark嘛~ 不过代码写的很烂,还在提⾼哈~)

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。