Mybatis Select…for update⽤法
最近有需求批量处理⼤量数据,由于数据量很⼤,如果加分布式锁让⼀个线程跑需要太长时间,所以考虑集中⼆⼗⼏台机器并⾏执⾏,每次取1000条数据处理。
选择了使⽤select…for update悲观锁,每次把取出来的1000条数据加锁之后更改状态字段再commit,从⽽保证所有线程不重复取数据。
很容易想到的⽤法就是把select for upate和之后的更新语句放在⼀个事务中:
SqlSession sqlSession = sqlSessionFactory.openSession(false);
try{
List<TestObject> records=sqlSession.selectList("testForUpdate");//testForUpdate的sql语句为: select * from test_table where status='0' and rownum<10 00 order by create_date desc for update
if(records!=null && records.size>0){
Map<String,Object> updateParam=new HashMap<>();
updateParam.put("records", records);
updateParam.put("status","01");
sqlSession.update("batchUpdate", updateParam);
}druid连接池配置详解
}finally{
sqlSessionmit(true);
sqlSession.close();
}
⾸先开⼀个session,参数传false表⽰autocommit=false,不⾃动提交事务
执⾏select for update
更新取出来的数据
commit并且close session
commit的参数必须为true,这样在没有数据更新时也可以commit。否则commit时会判断⼀个isDirty的参数,这个参数只有在更新或者插⼊是会为true,如果不为true就不会commit,
sqlSessionmit(true)
具体的源码如下,我们传⼊的true就是这个force参数。
private boolean isCommitOrRollbackRequired(boolean force){
return!this.autoCommit &&this.dirty || force;
}
这种做法看起来没有什么问题,但是当select for update查不到数据时,也会对表加⾏锁,在我这个情况下就是给status=0的数据加⾏锁,即使这些数据并不存在,这个锁会⼀直存在,如果此时插⼊status=0的数据则会报错。这个时候问题就暴露出来了,当查出的数据为空时,就不会执⾏下⾯的update语句,然后就发现这个事务不会被提交,⾏锁⼀直不会被释放。
按理说finally语句块⾥的commit是⽆论如何都会执⾏的,为什么最后事务没有提交呢?
Debug进去才发现了事情并不是我们⼀开始想象的这样。
太长不看版本:
由于我们使⽤的数据源是阿⾥的DruidPooledDataSource,这个数据源的autoCommit配置默认为true,导致我们开启session时设置的false并不起作⽤,实际上我们的事务都是⾃动提交的,最后finally块中的commit会判断如果⾃动提交已经开启它就不会执⾏。所以当select for update的数据为空时,由于不会执⾏update语句, 所以没有被⾃动commt,当我们想要⼿动commit时,由于已经开启了⾃动commit,所以⼿动commit也乜嘢执⾏,最红导致事务不能提交。
详解
以DefaultSqlSession为例,Session实例中包含的属性有:
private Configuration configuration;//配置信息
private Executor executor;//实际执⾏select的对象
private boolean autoCommit;//openSession时传⼊的参数,select之前为false
private boolean dirty;//表⽰是否有脏数据,执⾏update或者insert时会为true
当sqlSession执⾏selectList⽅法时:
public<E> List<E>selectList(String statement, Object parameter, RowBounds rowBounds){
List var5;
try{
MappedStatement ms =MappedStatement(statement);
var5 =utor.query(ms,this.wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
}catch(Exception var9){
throw ExceptionFactory.wrapException("Error querying database.  Cause: "+ var9, var9);
}finally{
ErrorContext.instance().reset();
}
return var5;
}
可以看到实际执⾏query⽅法进⾏查询的是Executor对象。
以BaseExecutor为例,属性有
protected Transaction transaction;
protected Executor wrapper;
protected ConcurrentLinkedQueue<BaseExecutor.DeferredLoad> deferredLoads;
protected PerpetualCache localCache;
protected PerpetualCache localOutputParameterCache;
protected Configuration configuration;
protected int queryStack =0;
private boolean closed;
可以看到transaction实际是在Executor对象中,是在sqlSessionFactory openSession的时候调⽤TransactionFactory创建的,再看transaction的属性,SpringManagedTransaction为例:
protected Connection connection;
protected DataSource dataSource;
protected TransactionIsolationLevel level;
protected boolean autoCommmit;
可以看到connection实际是在transaction对象中获取的,这⾥也有⼀个autoCommit属性,这个属性的值在打开session时和我们传⼊的参数值相等,是false。
再看connection对象,以我们⽤的DruidPooledConnection为例
public static final int MAX_RECORD_SQL_COUNT =10;
protected Connection conn;
protected volatile DruidConnectionHolder holder;
protected TransactionInfo transactionInfo;
private final boolean dupCloseLogEnable;
private volatile boolean traceEnable =false;
private boolean disable =false;
private boolean closed =false;
private final Thread ownerThread;
private long connectedTimeNano;
private volatile boolean running =false;
private volatile boolean abandoned =false;
private StackTraceElement[] connectStackTrace;
private Throwable disableError = null;
其中有get和set autoCommit的⽅法,然⽽这个类⾥⾯并没有autoCommit属性,可以看到这⾥还有⼀个conn属性,这个get和set的autoCommit的值都是从这个conn⾥来的。
现在把主要的类⼤致屡清楚了,那么再看⼀下selectList的执⾏过程,以及我们openSession时配置的autoCommit是怎么被覆盖的。
前⾯说到了执⾏sqlSession的select⽅法实际执⾏的是Executor的query⽅法。执⾏时会先试图从缓存中,如果没有对应的key则会执⾏queryFromDatabase⽅法,从DB中查询。
在从DB查询之前会执⾏prepareStatement⽅法⽣成⼀个Statement,在这个⽅法⾥
private Statement prepareStatement(StatementHandler handler, Log statementLog)throws SQLException {
Connection connection =Connection(statementLog);
Statement stmt = handler.prepare(connection);
handler.parameterize(stmt);
return stmt;
}
可以看到⾸先是获取⼀个连接,最终调⽤的是Transaction的getConnection⽅法:
public Connection getConnection()throws SQLException {
tion == null){
this.openConnection();
}
tion;
}
如果当前没有连接则open⼀个,注意,这个时候transaction对象的autoCommit还是我们设置的false。
接下来再看openConnection⽅法
private void openConnection()throws SQLException {
this.autoCommit =AutoCommit();
this.isConnectionTransactional = DataSourceUtils.tion,this.dataSource);
if(LOGGER.isDebugEnabled()){
LOGGER.debug("JDBC Connection ["+tion +"] will"+(this.isConnectionTransactional ?" ":" not ")+"be managed by Spring");
}
}
⾸先由datasource获取connection,⼤致过程是dataSource从⾃⼰的连接池⾥⾯取⼀个连接返回过来,
这⾥的连接都是根据dataSource 的配置提前⽣成的,也就是说由于我们dataSource默认配置的autoCommit为true,所以这⾥拿到的所有connection的autoCommit属性都是true。
拿到connection后执⾏了this.autoCommit = AutoCommit();这样transaction的autoCommit属性就被设置成了true。
最后sqlSession执⾏commit时,实际执⾏commit的是Executor的commit,实际⼜是执⾏的transaction对象的commit:
public void commit()throws SQLException {
tion != null &&!this.isConnectionTransactional &&!this.autoCommit){
if(LOGGER.isDebugEnabled()){
LOGGER.debug("Committing JDBC Connection ["+tion +"]");
}
}
}
可以看到执⾏commit的条件是connection不为空,并且这个连接是事务性的,并且autoCommit=false,由于在获取连接时transaction 的autoCommit属性已经被替换成了true,所以这⾥的commit不会被执⾏。
终于解释清楚了为什么commit没有执⾏,事务没有被提交。
实际上如果在openSession(false)之后执⾏⼀个update⽅法,在sessionmit()之前这个事务就已经被⾃动提交了。由于select for update并不会触发事务⾃动提交所以它的锁不会被释放。
解决⽅法
这⾥整个过程中都是⽤的connection中的autoCommit属性,⽽不是sqlSession的这个属性,因此解决⽅法有:⽤Connection().setAutoCommit(false);来设置autoCommit属性为false
或者提交时⽤直接调⽤connection的commit⽅法:Connection()mit();
啊!好长啊,终于写完了。可能有理解不对的地⽅,还请多多指教哈!

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。