白菜Java自习室 涵盖核心知识
1. MyBatis-Plus 批处理性能问题
MyBatis-Plus(简称 MP)是一个 MyBatis 的增强工具,在 MyBatis 的基础上只做增强不做改变,为简化开发、提高效率而生。
MyBatis-Plus 虽然简化了开发,但是在真正业务临界点需要抉择底层 SQL 实现方案的时候,发现它默认的实现方式并不是最好得,尤其是批处理部分,我们来看他的源码(版本是3.3.2):
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.2</version>
</dependency>
首先我们来看 批量新增部分:
public interface IService<T> {
/** * 默认批次提交数量 */
int DEFAULT_BATCH_SIZE = 1000;
/** * 插入(批量) * * @param entityList 实体对象集合 */
@Transactional(rollbackFor = Exception.class)
default boolean saveBatch(Collection<T> entityList) {
return saveBatch(entityList, DEFAULT_BATCH_SIZE);
}
/** * 插入(批量) * * @param entityList 实体对象集合 * @param batchSize 插入批次数量 */
boolean saveBatch(Collection<T> entityList, int batchSize);
}
MyBatis 源码相关的前置知识建议阅读作者的文章: Java工程师的进阶之路 MyBatis篇
public class ServiceImpl<M extends BaseMapper<T>, T> implements IService<T> {
/** * 批量插入 * * @param entityList ignore * @param batchSize ignore * @return ignore */
@Transactional(rollbackFor = Exception.class)
@Override
public boolean saveBatch(Collection<T> entityList, int batchSize) {
String sqlStatement = sqlStatement(SqlMethod.INSERT_ONE);
return executeBatch(entityList, batchSize, (sqlSession, entity) -> sqlSession.insert(sqlStatement, entity));
}
/** * 执行批量操作 * * @param list 数据集合 * @param batchSize 批量大小 * @param consumer 执行方法 * @param <E> 泛型 * @return 操作结果 * @since 3.3.1 */
protected <E> boolean executeBatch(Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) {
Assert.isFalse(batchSize < 1, "batchSize must not be less than one");
return !CollectionUtils.isEmpty(list) && executeBatch(sqlSession -> {
int size = list.size();
int i = 1;
for (E element : list) {
consumer.accept(sqlSession, element);
if ((i % batchSize == 0) || i == size) {
sqlSession.flushStatements();
}
i++;
}
});
}
}
其实看到这里,我就已经发现 MyBatis-Plus 批量新增是开启事务 insert 语句按 单条批量提交 的,不妨再看的深入点。
/** * 执行批量操作 * * @param consumer consumer * @since 3.3.0 * @deprecated 3.3.1 后面我打算移除掉 {@link #executeBatch(Collection, int, BiConsumer)} }. */
@Deprecated
protected boolean executeBatch(Consumer<SqlSession> consumer) {
SqlSessionFactory sqlSessionFactory = SqlHelper.sqlSessionFactory(entityClass);
SqlSessionHolder sqlSessionHolder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sqlSessionFactory);
boolean transaction = TransactionSynchronizationManager.isSynchronizationActive();
if (sqlSessionHolder != null) {
SqlSession sqlSession = sqlSessionHolder.getSqlSession();
//原生无法支持执行器切换,当存在批量操作时,会嵌套两个session的,优先commit上一个session
//按道理来说,这里的值应该一直为false。
sqlSession.commit(!transaction);
}
SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);
if (!transaction) {
log.warn("SqlSession [" + sqlSession + "] was not registered for synchronization because DataSource is not transactional");
}
try {
consumer.accept(sqlSession);
//非事物情况下,强制commit。
sqlSession.commit(!transaction);
return true;
} catch (Throwable t) {
sqlSession.rollback();
Throwable unwrapped = ExceptionUtil.unwrapThrowable(t);
if (unwrapped instanceof RuntimeException) {
MyBatisExceptionTranslator myBatisExceptionTranslator
= new MyBatisExceptionTranslator(sqlSessionFactory.getConfiguration().getEnvironment().getDataSource(), true);
throw Objects.requireNonNull(myBatisExceptionTranslator.translateExceptionIfPossible((RuntimeException) unwrapped));
}
throw ExceptionUtils.mpe(unwrapped);
} finally {
sqlSession.close();
}
}
然后我们再来看 批量更新部分,发现处理方式一样:
public interface IService<T> {
/** * 默认批次提交数量 */
int DEFAULT_BATCH_SIZE = 1000;
/** * 根据ID 批量更新 * * @param entityList 实体对象集合 */
@Transactional(rollbackFor = Exception.class)
default boolean updateBatchById(Collection<T> entityList) {
return updateBatchById(entityList, DEFAULT_BATCH_SIZE);
}
/** * 根据ID 批量更新 * * @param entityList 实体对象集合 * @param batchSize 更新批次数量 */
boolean updateBatchById(Collection<T> entityList, int batchSize);
}
public class ServiceImpl<M extends BaseMapper<T>, T> implements IService<T> {
@Transactional(rollbackFor = Exception.class)
@Override
public boolean updateBatchById(Collection<T> entityList, int batchSize) {
String sqlStatement = sqlStatement(SqlMethod.UPDATE_BY_ID);
return executeBatch(entityList, batchSize, (sqlSession, entity) -> {
MapperMethod.ParamMap<T> param = new MapperMethod.ParamMap<>();
param.put(Constants.ENTITY, entity);
sqlSession.update(sqlStatement, param);
});
}
/** * 执行批量操作 * * @param list 数据集合 * @param batchSize 批量大小 * @param consumer 执行方法 * @param <E> 泛型 * @return 操作结果 * @since 3.3.1 */
protected <E> boolean executeBatch(Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) {
Assert.isFalse(batchSize < 1, "batchSize must not be less than one");
return !CollectionUtils.isEmpty(list) && executeBatch(sqlSession -> {
int size = list.size();
int i = 1;
for (E element : list) {
consumer.accept(sqlSession, element);
if ((i % batchSize == 0) || i == size) {
sqlSession.flushStatements();
}
i++;
}
});
}
}
MyBatis-Plus 不管是批量 insert 还是批量 update,最终默认都会去调用 executeBatch(Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer)
方法,并且在同一个事务中使用 for (E element : list) {}
每1000条一个批次来批量提交实现。
2. Mybatis 选择更优的批处理方案
方案一(Insert 和 Update 同理):
pulic boolean bathInsert(String statementId, List<Map> params) {
SqlSession sqlSession = null;
try {
sqlSession = SqlsessionUtil.getSqlSession();
for (Map param : params) {
sqlSession.insert(statementId, param);
}
sqlSession.commit();
return true;
} catch (Exception e) {
sqlSession.rollback();
e.printStackTrace();
} finally {
SqlsessionUtil.closeSession(sqlSession);
}
return false;
}
方案二(Insert 和 Update 同理):
<insert id="batchInsert">
INSERT INTO table
(
business_id,
element_id,
business_value
)
VALUES
<foreach collection="list" item="item" index="index" separator=",">
(#{item.business_id, jdbcType=VARCHAR},
#{item.element_id, jdbcType=VARCHAR},
#{item.business_value, jdbcType=VARCHAR})
</foreach>
</insert>
比较结论:
当集合数据条数比较多时候,方案二 效率提升明显!
方式 | 50条 | 100条 | 500条 | 1000条 |
---|---|---|---|---|
方案一 | 178ms | 266ms | 841ms | 1863ms |
方案二 | 156ms | 211ms | 395ms | 456ms |
原因分析:
执行效率高的主要原因是合并后日志量(MySQL的binlog和innodb的事务让日志)减少了,降低日志刷盘的数据量和频率,从而提高效率。通过合并SQL语句,同时也能减少SQL语句解析的次数,减少网络传输的IO。
注意事项:
- SQL语句是有长度限制,在进行数据合并在同一SQL中务必不能超过SQL长度限制,通过max_allowed_packet配置可以修改,默认是1M,测试时修改为8M。
- 事务需要控制大小,事务太大可能会影响执行的效率。MySQL有innodb_log_buffer_size配置项,超过这个值会把innodb的数据刷到磁盘中,这时,效率会有所下降。所以比较好的做法是,在数据达到这个这个值前进行事务提交。
3. 改造 MyBatis-Plus 批处理实现
注意:这种实现方式要 特别注意数据库SQL语句的长度限制,在进行数据合并在同一SQL中务必不能超过SQL长度限制,通过 max_allowed_packet 配置可以修改,默认是1M,测试时修改为8M。
- 添加 InsertBatchMethod 和 UpdateBatchMethod 类:
@Slf4j
public class InsertBatchMethod extends AbstractMethod {
@Override
public MappedStatement injectMappedStatement(Class<?> mapperClass, Class<?> modelClass, TableInfo tableInfo) {
final String sql = "<script>insert into %s %s values %s</script>";
final String fieldSql = prepareFieldSql(tableInfo);
final String valueSql = prepareValuesSql(tableInfo);
final String sqlResult = String.format(sql, tableInfo.getTableName(), fieldSql, valueSql);
// log.debug("sqlResult----->{}", sqlResult);
SqlSource sqlSource = languageDriver.createSqlSource(configuration, sqlResult, modelClass);
return this.addInsertMappedStatement(mapperClass, modelClass, "insertBatch", sqlSource, new NoKeyGenerator(), null, null);
}
private String prepareFieldSql(TableInfo tableInfo) {
StringBuilder fieldSql = new StringBuilder();
fieldSql.append(tableInfo.getKeyColumn()).append(",");
tableInfo.getFieldList().forEach(x -> fieldSql.append(x.getColumn()).append(","));
fieldSql.delete(fieldSql.length() - 1, fieldSql.length());
fieldSql.insert(0, "(");
fieldSql.append(")");
return fieldSql.toString();
}
private String prepareValuesSql(TableInfo tableInfo) {
final StringBuilder valueSql = new StringBuilder();
valueSql.append("<foreach collection=\"list\" item=\"item\" index=\"index\" open=\"(\" separator=\"),(\" close=\")\">");
valueSql.append("#{item.").append(tableInfo.getKeyProperty()).append("},");
tableInfo.getFieldList().forEach(x -> valueSql.append("#{item.").append(x.getProperty()).append("},"));
valueSql.delete(valueSql.length() - 1, valueSql.length());
valueSql.append("</foreach>");
return valueSql.toString();
}
}
@Slf4j
public class UpdateBatchMethod extends AbstractMethod {
@Override
public MappedStatement injectMappedStatement(Class<?> mapperClass, Class<?> modelClass, TableInfo tableInfo) {
String sql = "<script>\n<foreach collection=\"list\" item=\"item\" separator=\";\">\nupdate %s %s where %s=#{%s} %s\n</foreach>\n</script>";
String additional = tableInfo.isWithVersion() ? tableInfo.getVersionFieldInfo().getVersionOli("item", "item.") : "" + tableInfo.getLogicDeleteSql(true, true);
String setSql = sqlSet(tableInfo.isLogicDelete(), false, tableInfo, false, "item", "item.");
String sqlResult = String.format(sql, tableInfo.getTableName(), setSql, tableInfo.getKeyColumn(), "item." + tableInfo.getKeyProperty(), additional);
// log.debug("sqlResult----->{}", sqlResult);
SqlSource sqlSource = languageDriver.createSqlSource(configuration, sqlResult, modelClass);
return this.addUpdateMappedStatement(mapperClass, modelClass, "updateBatch", sqlSource);
}
}
- 添加自定义方法SQL注入器 CustomizedSqlInjector:
public class CustomizedSqlInjector extends DefaultSqlInjector {
/** * 如果只需增加方法,保留mybatis plus自带方法, * 可以先获取super.getMethodList(),再添加add */
@Override
public List<AbstractMethod> getMethodList(Class<?> mapperClass) {
List<AbstractMethod> methodList = super.getMethodList(mapperClass);
methodList.add(new InsertBatchMethod());
methodList.add(new UpdateBatchMethod());
return methodList;
}
}
@Configuration
@EnableTransactionManagement
@MapperScan("com.xxx.xxx.mapper")
public class MybatisPlusConfig {
@Bean
public CustomizedSqlInjector customizedSqlInjector() {
return new CustomizedSqlInjector();
}
}
- 添加通用 mapper 和 service:
public interface BasicMapper<T> extends BaseMapper<T> {
/** * 自定义批量插入 */
int insertBatch(@Param("list") List<T> list);
/** * 自定义批量更新,条件为主键 */
int updateBatch(@Param("list") List<T> list);
}
public interface BasicService<T> extends IService<T> {
int insertBatch(List<T> list);
int updateBatch(List<T> list);
}
public class BasicServiceImpl<M extends BasicMapper<T>, T> extends ServiceImpl<M, T> implements BasicService<T> {
@Override
public int insertBatch(List<T> list) {
return baseMapper.insertBatch(list);
}
@Override
public int updateBatch(List<T> list) {
return baseMapper.updateBatch(list);
}
}
忙活了半天,其实就是为了达到下边的效果(上文中的 方案二):
<insert id="batchInsert">
INSERT INTO table
(
business_id,
element_id,
business_value
)
VALUES
<foreach collection="list" item="item" index="index" separator=",">
(#{item.business_id, jdbcType=VARCHAR},
#{item.element_id, jdbcType=VARCHAR},
#{item.business_value, jdbcType=VARCHAR})
</foreach>
</insert>
总结来看,代码很粗糙,只是表达一种方法,当真正数据量达到一定量级(超过 1000 就有必要)的时候,需要非常关注数据库 SQL 语句的长度限制,单句 SQL 拼接优化方案并不是万能的,只能在特定数量区间之内安全的提升性能。MyBatis-Plus 默认方案的选择也必然有一定的道理。
今天的文章MyBatis-Plus 批处理有坑,我教你改造分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:http://bianchenghao.cn/14311.html