前言
本章是HikariCP的最后一章,学习HikariCP如何设计归还连接和关闭连接。
一、归还连接
因为暴露给用户使用的是Connection的代理对象HikariProxyConnection
,用户调用close
方法关闭连接,实际执行的是ProxyConnection#close
归还连接。
// 实际数据库连接
protected Connection delegate;
// 关联的PoolEntry
private final PoolEntry poolEntry;
@Override
public final void close() throws SQLException {
// 关闭所有打开的Statement
closeStatements();
if (delegate != ClosedConnection.CLOSED_CONNECTION) {
leakTask.cancel();
try {
// 是否需要回滚
if (isCommitStateDirty && !isAutoCommit) {
delegate.rollback();
lastAccess = currentTime();
}
// 用户是否设置过readOnly等连接属性
if (dirtyBits != 0) {
// 如果设置过,则恢复连接属性到创建时的状态
poolEntry.resetConnectionState(this, dirtyBits);
lastAccess = currentTime();
}
// 清除警告信息
delegate.clearWarnings();
} catch (SQLException e) {
if (!poolEntry.isMarkedEvicted()) {
// 发生异常可能会关闭连接
throw checkException(e);
}
} finally {
// 把deletgate改为CLOSED_CONNECTION
delegate = ClosedConnection.CLOSED_CONNECTION;
// 归还PoolEntry
poolEntry.recycle(lastAccess);
}
}
}
PoolEntry#recycle
final class PoolEntry implements IConcurrentBagEntry {
long lastAccessed;
private final HikariPool hikariPool;
void recycle(final long lastAccessed) {
if (connection != null) {
this.lastAccessed = lastAccessed;
hikariPool.recycle(this);
}
}
}
HikariPool#recycle
将PoolEntry归还给ConcurrentBag。
private final ConcurrentBag<PoolEntry> connectionBag;
@Override
void recycle(final PoolEntry poolEntry) {
connectionBag.requite(poolEntry);
}
ConcurrentBag#requite
修改Entry状态。
public void requite(final T bagEntry) {
// 修改entry状态为未使用,此时已经可以被别的线程借走了,因为entry在shareList里
bagEntry.setState(STATE_NOT_IN_USE);
// 先尝试通过交接队列给等候线程发放entry
for (int i = 0; waiters.get() > 0; i++) {
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
} else if ((i & 0xff) == 0xff) {
parkNanos(MICROSECONDS.toNanos(10));
} else {
Thread.yield();
}
}
// 如果没有线程等候,或发放失败,放入threadList
final List<Object> threadLocalList = threadList.get();
if (threadLocalList.size() < 50) {
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
}
Hikari连接池如何实现借走和归还?
- PoolEntry包装Connection被放入ConcurrentBag
- 用户借走通过PoolEntry构造的ProxyConnection
- 使用中…
- ProxyConnection.close修改ProxyConnection里的委托对象为CLOSED_CONNECTION,拒绝用户后续操作
- PoolEntry修改状态,归还给ConcurrentBag,PoolEntry就可以被其他人借走了
二、关闭连接
HouseKeeper
The house keeping task to retire and maintain minimum idle connections.
HouseKeeper
实现Runnable接口,负责执行连接过期并维持最小空闲连接。HikariCP启动的过程中,HouseKeeper是在HikariPool构造时提交到线程池定期执行的。默认30秒执行一次,可以通过配置com.zaxxer.hikari.housekeeping.periodMs
系统参数修改。
private final long housekeepingPeriodMs = Long.getLong("com.zaxxer.hikari.housekeeping.periodMs", SECONDS.toMillis(30));
private final ScheduledExecutorService houseKeepingExecutorService;
public HikariPool(final HikariConfig config) {
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(
new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);
}
HouseKeeper
一个定时任务执行了很多事情。
1. JMX运行时修改配置,更新HikariPool的一些成员变量
@Override public void run() {
// 获取连接超时时间
connectionTimeout = config.getConnectionTimeout();
// 校验连接是否存活的超时时间
validationTimeout = config.getValidationTimeout();
// 连接泄露检测阈值
leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());
// 连接空闲超时时间
final long idleTimeout = config.getIdleTimeout();
// ...
}
2. 检测时钟倒退
private volatile long previous = plusMillis(currentTime(), -housekeepingPeriodMs);
@Override
public void run() {
// ...
final long now = currentTime();
// Detect retrograde time, allowing +128ms as per NTP spec.
if (plusMillis(now, 128) < plusMillis(previous, housekeepingPeriodMs)) {
softEvictConnections();
return;
}
previous = now;
// ...
}
}
previous
代表上次执行的时间戳,housekeepingPeriodMs
默认30秒。正常情况下now = previous + housekeepingPeriodMs
,如果previous + housekeepingPeriodMs - now > 128
发生时钟倒退超过128ms,则执行softEvictConnections
软驱逐所有连接,并直接返回。
HikariPool#softEvictConnections
@Override public void softEvictConnections() {
connectionBag
.values()
.forEach(poolEntry -> softEvictConnection(poolEntry, "(connection evicted)", false));
}
private boolean softEvictConnection(final PoolEntry poolEntry, final String reason, final boolean owner) {
poolEntry.markEvicted();
if (owner || connectionBag.reserve(poolEntry)) {
closeConnection(poolEntry, reason);
return true;
}
return false;
}
首先poolEntry.markEvicted()
将PoolEntry标记为被驱逐,如果之后的操作没成功,这个连接将在下次getConnection的时候被检测到驱逐,从而关闭。
判断调用者是否是owner
,即是否是用户。如果是用户自己操作HikariPool#evictConnection
方法驱逐连接,关闭连接(见3)。
执行connectionBag.reserve(poolEntry)
,将PoolEntry的状态通过CAS从STATE_NOT_IN_USE
修改为STATE_RESERVED
状态,如果执行成功,关闭连接(见3)。
3. 关闭空闲连接
@Override
public void run() {
// ...
if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE);
int toRemove = notInUse.size() - config.getMinimumIdle();
for (PoolEntry entry : notInUse) {
if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
closeConnection(entry, "(connection has passed idleTimeout)");
toRemove--;
}
}
}
// ...
}
首先判断当前配置idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()
是否支持空闲连接超时关闭。要设置idleTimeout
并且MinimumIdle
要小于MaximumPoolSize
。
接下来计算总共有多少连接需要关闭。待关闭数量(toRemove
)= STATE_NOT_IN_USE
的PoolEntry总数 – 配置的MinimumIdle
数量。
循环所有未使用的PoolEntry,判断 当前时间 – entry的上次使用时间 是否大于 idleTimeout
,并且执行reserve
修改PoolEntry状态成功。如果修改成功,执行关闭连接。
HikariPool#closeConnection
是关闭连接的公共入口。
void closeConnection(final PoolEntry poolEntry, final String closureReason)
{
if (connectionBag.remove(poolEntry)) {
final Connection connection = poolEntry.close();
closeConnectionExecutor.execute(() -> {
quietlyCloseConnection(connection, closureReason);
if (poolState == POOL_NORMAL) {
fillPool();
}
});
}
}
connectionBag.remove(poolEntry)
修改PoolEntry状态,从shareList和threadList中移除这个元素。
public boolean remove(final T bagEntry) {
if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED)
&& !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
return false;
}
final boolean removed = sharedList.remove(bagEntry);
threadList.get().remove(bagEntry);
return removed;
}
poolEntry.close()
关闭MaxLifeTime超时检测任务,一些变量赋空值帮助GC。
Connection close()
{
ScheduledFuture<?> eol = endOfLife;
if (eol != null && !eol.isDone() && !eol.cancel(false)) {
LOGGER.warn("{} - maxLifeTime expiration task cancellation unexpectedly returned false for connection {}", getPoolName(), connection);
}
Connection con = connection;
connection = null;
endOfLife = null;
return con;
}
准备工作都做好以后,把关闭任务提交到线程池,执行关闭连接。如果连接池仍然是正常状态(POOL_NORMAL
),尝试维持最小空闲连接(fillPool()
见4)。
void closeConnection(final PoolEntry poolEntry, final String closureReason)
{
// ...
closeConnectionExecutor.execute(() -> {
quietlyCloseConnection(connection, closureReason);
if (poolState == POOL_NORMAL) {
fillPool();
}
});
}
4. 维持最小空闲连接
业务上往往我们称Hikari连接池的MinimumIdle
为最小连接数,其实这是一个简称,全称是最小空闲连接数
。它是保证连接池中至少有x个连接是空闲的,拿来即可用,但是需要保证总连接数不会超过MaximumPoolSize
。
@Override
public void run() {
// ...
fillPool();
}
private synchronized void fillPool()
{
final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections())
- addConnectionQueueReadOnlyView.size();
if (connectionsToAdd <= 0) logger.debug("{} - Fill pool skipped, pool is at sufficient level.", poolName);
for (int i = 0; i < connectionsToAdd; i++) {
addConnectionExecutor.submit((i < connectionsToAdd - 1) ? poolEntryCreator : postFillPoolEntryCreator);
}
}
首先计算需要添加多少连接。缺少最大连接和缺少空闲连接取小,扣除当前正在准备创建的连接(addConnectionQueueReadOnlyView.size()
是线程池addConnectionExecutor
的等待队列视图),就是connectionsToAdd
需要创建的连接。
例如:配置MaximumPoolSize=10,MinimumIdle=5;此时总连接数=7,空闲连接数=4,等待创建连接线程数=0。根据公式计算:Math.min(10 - 7, 5 - 4) - 0 = 1
,还需要创建一个连接,因为空闲连接数不够5。如果按照最小连接数去理解,这里应该不需要创建连接了,因为连接数已经有7个了。
最后把创建连接的任务提交到addConnectionExecutor
执行。
HikariPool#getConnection
getConnection
方法在第四章介绍过,主要是获取PoolEntry,并创建ProxyConnection给用户使用。
public Connection getConnection(final long hardTimeout) throws SQLException
{
suspendResumeLock.acquire();
final long startTime = currentTime();
try {
long timeout = hardTimeout;
do {
PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
if (poolEntry == null) {
break;
}
final long now = currentTime();
if (poolEntry.isMarkedEvicted()
|| (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {
// 关闭连接
closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
timeout = hardTimeout - elapsedMillis(startTime);
} else {
return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
}
} while (timeout > 0L);
throw createTimeoutException(startTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
} finally {
suspendResumeLock.release();
}
}
注意当poolEntry.isMarkedEvicted()
PoolEntry被标记未驱逐,或!isConnectionAlive(poolEntry.connection)
连接存活检测不通过时,会执行closeConnection
关闭连接,关闭连接方法在上面讲过。
MaxLifetime到期软驱逐
当一个连接被创建之后,就会开启一个延迟任务,检测连接如果超过MaxLifetime则进行软驱逐(softEvictConnection
)。代码见HikariPool#createPoolEntry
方法。
private PoolEntry createPoolEntry() {
// 连接被创建
final PoolEntry poolEntry = newPoolEntry();
final long maxLifetime = config.getMaxLifetime();
if (maxLifetime > 0) {
final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
final long lifetime = maxLifetime - variance;
// 延迟maxLifetime,如果这个task没被取消将被执行
poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
() -> {
// 执行软驱逐
if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false)) {
// 通知HikariPool增加 等待获取连接的线程数 个PoolEntry
addBagItem(connectionBag.getWaitingThreadCount());
}
},
lifetime, MILLISECONDS));
}
return poolEntry;
}
连接池关闭
HikariDataSource#close
修改isShutdown状态。
@Override
public void close() {
if (isShutdown.getAndSet(true)) {
return;
}
HikariPool p = pool;
if (p != null) {
try {
p.shutdown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
HikariPool#shutdown
关闭所有线程池,取消所有任务,关闭所有数据库连接。
public synchronized void shutdown() throws InterruptedException {
// 状态修改
poolState = POOL_SHUTDOWN;
if (addConnectionExecutor == null) { // pool never started
return;
}
// 取消HouseKeeper任务
if (houseKeeperTask != null) {
houseKeeperTask.cancel(false);
houseKeeperTask = null;
}
// 首次软驱逐
softEvictConnections();
// 停止添加连接线程池
addConnectionExecutor.shutdown();
addConnectionExecutor.awaitTermination(getLoginTimeout(), SECONDS);
// 关闭HouseKeeper线程池
destroyHouseKeepingExecutorService();
// 关闭connectionBag
connectionBag.close();
// 开启一个线程池,负责中断使用中的连接
final ExecutorService assassinExecutor = createThreadPoolExecutor(...);
try {
final long start = currentTime();
do {
// 打断所有正在使用的连接
abortActiveConnections(assassinExecutor);
// 再次执行软驱逐
softEvictConnections();
} while (getTotalConnections() > 0 && elapsedMillis(start) < SECONDS.toMillis(10));
} finally {
assassinExecutor.shutdown();
assassinExecutor.awaitTermination(10L, SECONDS);
}
// 关闭设置网络超时时间线程池
shutdownNetworkTimeoutExecutor();
// 关闭 关闭连接线程池
closeConnectionExecutor.shutdown();
closeConnectionExecutor.awaitTermination(10L, SECONDS);
}
abortActiveConnections
中断所有正在使用的连接,并将PoolEntry从connectionBag移除。
private void abortActiveConnections(final ExecutorService assassinExecutor) {
for (PoolEntry poolEntry : connectionBag.values(STATE_IN_USE)) {
Connection connection = poolEntry.close();
try {
connection.abort(assassinExecutor);
} catch (Throwable e) {
quietlyCloseConnection(connection, "(connection aborted during shutdown)");
} finally {
connectionBag.remove(poolEntry);
}
}
}
为什么需要多次执行abortActiveConnections
和softEvictConnections
,直到getTotalConnections<0
?
因为PoolEntry的状态时刻发生变化,不多次尝试可能导致少关闭连接。abortActiveConnections
只负责STATE_IN_USE
的PoolEntry;softEvictConnections
只负责STATE_NOT_IN_USE
的PoolEntry。
总结
关于归还连接,对于HikariCP如何设计连接借走和归还有一个整体的理解,主要利用了代理和ConcurrentBag。
对于关闭连接,触发点很多,一般都是通过软驱逐或强制关闭两种方式关闭。重点关注HouseKeeper这个30秒定时任务的作用。
原创不易,欢迎评论、点赞和关注。欢迎关注公众号:程序猿阿越。
今天的文章HikariCP源码阅读(五)归还与关闭连接分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:http://bianchenghao.cn/15975.html