Java/MySQL: batch inserts and streaming reads for large data volumes
A summary of some MySQL driver usage problems (and their solutions) that came up this week while helping a customer with a report-generation job. The report logic reads a large amount of data from the database, processes it in memory, generates a large amount of summary data, and then writes that back to the database. The basic flow is read -> process -> write.
1 Reads. The first problem was that when the SQL query returned a large amount of data, the application could barely read anything back. At first the server seemed to be the bottleneck, but the same query returned immediately in the MySQL console. Capturing packets on the application side showed that data started flowing back right after the SQL was sent, yet ResultSet.next() was still blocking. After checking the documentation and the driver source, it turned out that the MySQL driver's default behavior is to read the entire result set into memory before letting the application consume any of it. That is clearly not the desired behavior here: the expectation was a streaming read, where rows are processed as soon as the MySQL server returns them, so the application does not need a large amount of memory to hold the whole result set. The correct streaming-read code looks like this:
PreparedStatement ps = connection.prepareStatement("select .. from ..",
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
//TYPE_FORWARD_ONLY and CONCUR_READ_ONLY are the MySQL driver's defaults, so they can be omitted, e.g.: PreparedStatement ps = connection.prepareStatement("select .. from ..");
ps.setFetchSize(Integer.MIN_VALUE); //can also be set via the defaultFetchSize parameter on the JDBC URL, so that all results are read as streams by default (see the sketch after this code block)
ResultSet rs = ps.executeQuery();
while (rs.next()) {
  System.out.println(rs.getString("fieldName"));
}
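As noted in the comment above, the fetch size can also be configured once for the whole connection through the defaultFetchSize parameter on the JDBC URL. A minimal sketch (host, database and credentials are placeholders, and it assumes the parameter accepts Integer.MIN_VALUE, i.e. -2147483648, the same way setFetchSize() does):
String url = "jdbc:mysql://ip:3306/test?defaultFetchSize=-2147483648";
Connection connection = DriverManager.getConnection(url, "name", "password"); //every statement created on this connection then defaults to streaming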
Code analysis: below is the method the MySQL driver uses to decide whether to stream a result set. There are three conditions: forward-only, read-only, and a fetch size of Integer.MIN_VALUE.
/**
* We only stream result sets when they are forward-only, read-only, and the
* fetch size has been set to Integer.MIN_VALUE
*
* @return true if this result set should be streamed row at-a-time, rather
* than read all at once.
*/
protected boolean createStreamingResultSet() {
    try {
        synchronized (checkClosed().getConnectionMutex()) {
            return ((this.resultSetType == java.sql.ResultSet.TYPE_FORWARD_ONLY)
                    && (this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY)
                    && (this.fetchSize == Integer.MIN_VALUE));
        }
    } catch (SQLException e) {
        // we can't break the interface, having this be no-op in case of error is ok
        return false;
    }
}
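A practical caveat with streaming result sets: while one is open, the connection is dedicated to it, so the remaining rows must be fully read (or the result set closed) before any other statement can be issued on the same connection. A minimal sketch with try-with-resources (table name, column name and the process() handler are placeholders, not from the original code):
try (PreparedStatement ps = connection.prepareStatement("select c1 from tb1",
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
    ps.setFetchSize(Integer.MIN_VALUE); //enable row-by-row streaming
    try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
            process(rs.getString("c1")); //hypothetical per-row handler; don't reuse the connection for other queries here
        }
    }
}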
2 Batch writes. Initially the application executed one insert per row to write the report results, and writing was also slow. The main reason is that single-row inserts require a large number of request/response round trips between the application and the database, and each request is committed as an independent transaction; when network latency is high, those many requests lose a lot of time to the latency. The second reason is that for every transaction the database flushes the transaction log to disk to guarantee durability; since each transaction writes only one row, disk I/O utilization is low, because disk I/O works in blocks and writing a lot of data contiguously is far more efficient. So the writes had to be changed to batch inserts to reduce the number of requests and transactions. Below is a batch-insert example; note that the JDBC connection string must also include rewriteBatchedStatements=true.
int batchSize = 1000;
PreparedStatement ps = connection.prepareStatement("insert into tb1 (c1,c2,c3) values (?,?,?)");
for (int i = 0; i < list.size(); i++) {
    ps.setObject(1, list.get(i).getC1());
    ps.setObject(2, list.get(i).getC2());
    ps.setObject(3, list.get(i).getC3());
    ps.addBatch();
    if ((i + 1) % batchSize == 0) {
        ps.executeBatch(); //send the accumulated batch every batchSize rows
    }
}
if (list.size() % batchSize != 0) {
    ps.executeBatch(); //flush the remaining rows
}
The example above sends one request per 1000 rows. Internally, on the client side, the MySQL driver merges the parameters from multiple addBatch() calls into a single multi-value insert statement and sends it to the database, e.g. insert into tb1(c1,c2,c3) values (v1,v2,v3),(v4,v5,v6),(v7,v8,v9)...
This needs far fewer requests than one insert per row, cutting the time lost to network latency and disk I/O and thereby raising TPS.
Code analysis: from the driver code below we can see that:
1 With rewriteBatchedStatements=true, if the insert is a parameterized statement and is not an insert ... select or an on duplicate key update with an id=last_insert_id(...), the driver runs executeBatchedInserts, i.e. the multi-value form.
2 With rewriteBatchedStatements=true, if all the statements are parameterized (none were added via addBatch(sql)), the MySQL server is version 4.1 or above, and there are more than three statements, the driver runs executePreparedBatchAsMultiStatement, which joins multiple statements with ';' and submits them as one request, e.g. "insert into tb1(c1,c2,c3) values (v1,v2,v3);insert into tb1(c1,c2,c3) values (v1,v2,v3)..."
3 Everything else goes through executeBatchSerially, i.e. the statements are still processed one at a time.
public void addBatch(String sql) throws SQLException {
    synchronized (checkClosed().getConnectionMutex()) {
        this.batchHasPlainStatements = true;
        super.addBatch(sql);
    }
}
public int[] executeBatch() throws SQLException {
    //...
    if (!this.batchHasPlainStatements
            && this.connection.getRewriteBatchedStatements()) {
        if (canRewriteAsMultiValueInsertAtSqlLevel()) {
            return executeBatchedInserts(batchTimeout);
        }
        if (this.connection.versionMeetsMinimum(4, 1, 0)
                && !this.batchHasPlainStatements
                && this.batchedArgs != null
                && this.batchedArgs.size() > 3 /* cost of option setting rt-wise */) {
            return executePreparedBatchAsMultiStatement(batchTimeout);
        }
    }
    return executeBatchSerially(batchTimeout);
    //.....
}
executeBatchedInserts transfers data more efficiently than executePreparedBatchAsMultiStatement, because the leading insert into tb1 (c1,c2,c3) is repeated only once per request.
The MySQL server limits the maximum length of a request packet. If the batch size is so large that a request would exceed that limit, the MySQL driver internally splits it into multiple packets to stay under the limit. So to really cut down the number of submissions, also check the server's max_allowed_packet; otherwise an ever-larger batch size won't help.
mysql> show VARIABLES like '%max_allowed_packet%';
+--------------------+-----------+
| Variable_name      | Value     |
+--------------------+-----------+
| max_allowed_packet | 167772160 |
+--------------------+-----------+
1 row in set (0.00 sec)
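If the application should sanity-check its batch size against this limit at runtime, a small JDBC sketch like the following can read the value (it assumes an open java.sql.Connection named connection):
try (Statement st = connection.createStatement();
     ResultSet rs = st.executeQuery("SHOW VARIABLES LIKE 'max_allowed_packet'")) {
    if (rs.next()) {
        long maxAllowedPacket = rs.getLong("Value"); //value is in bytes
        System.out.println("max_allowed_packet = " + maxAllowedPacket);
    }
}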
There are a few ways to verify that the driver is sending the expected SQL:
1 Capture packets, e.g. with wireshark on the application side, and inspect the MySQL protocol packets.
2 Enable the general log on the MySQL server to see every SQL statement the server receives.
3 Add traceProtocol=true, profileSQL=true, or autoGenerateTestcaseScript=true to the JDBC URL (see the example below).
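As an illustration of option 3 only (host and database are placeholders): profileSQL=true makes the driver log the SQL it actually sends, which is a quick way to confirm that the batch really was rewritten into a single multi-value insert.
String url = "jdbc:mysql://ip:3306/test?rewriteBatchedStatements=true&profileSQL=true";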
Performance test comparison
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import com.alibaba.druid.pool.DruidDataSource;
public class BatchInsert {
    public static void main(String[] args) throws SQLException {
        int batchSize = 1000;
        int insertCount = 1000;
        testDefault(batchSize, insertCount);
        testRewriteBatchedStatements(batchSize, insertCount);
    }

    private static void testDefault(int batchSize, int insertCount) throws SQLException {
        long start = System.currentTimeMillis();
        doBatchedInsert(batchSize, insertCount, "");
        long end = System.currentTimeMillis();
        System.out.println("default:" + (end - start) + "ms");
    }

    private static void testRewriteBatchedStatements(int batchSize, int insertCount) throws SQLException {
        long start = System.currentTimeMillis();
        doBatchedInsert(batchSize, insertCount, "rewriteBatchedStatements=true");
        long end = System.currentTimeMillis();
        System.out.println("rewriteBatchedStatements:" + (end - start) + "ms");
    }

    private static void doBatchedInsert(int batchSize, int insertCount, String mysqlProperties) throws SQLException {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl("jdbc:mysql://ip:3306/test?" + mysqlProperties);
        dataSource.setUsername("name");
        dataSource.setPassword("password");
        dataSource.init();
        Connection connection = dataSource.getConnection();
        PreparedStatement preparedStatement = connection.prepareStatement(
                "insert into Test (name,gmt_created,gmt_modified) values (?,now(),now())");
        for (int i = 0; i < insertCount; i++) {
            preparedStatement.setString(1, i + " ");
            preparedStatement.addBatch();
            if ((i + 1) % batchSize == 0) {
                preparedStatement.executeBatch(); //insertCount is a multiple of batchSize here, so no trailing flush is needed
            }
        }
        connection.close();
        dataSource.close();
    }
}
The ping latency of the test network was 35ms. Results: default: 75525ms, rewriteBatchedStatements: 914ms. At ~35ms per round trip, the 1000 single-statement requests of the default mode account for roughly 35 seconds of network latency alone, so it is dominated by round trips, while the rewritten batch needs only one large request per 1000 rows.
