HikariCP源码阅读(五)归还与关闭连接

HikariCP源码阅读(五)归还与关闭连接本章是HikariCP的最后一章,学习HikariCP如何设计归还连接和关闭连接。 因为暴露给用户使用的是Connection的代理对象HikariProxyConnection,用户调用close方法关闭连接,实际执行的是ProxyConnection#close归还连接。 …

前言

本章是HikariCP的最后一章,学习HikariCP如何设计归还连接和关闭连接。

一、归还连接

HikariCP源码阅读(五)归还与关闭连接 因为暴露给用户使用的是Connection的代理对象HikariProxyConnection,用户调用close方法关闭连接,实际执行的是ProxyConnection#close归还连接。

// 实际数据库连接
protected Connection delegate;
// 关联的PoolEntry
private final PoolEntry poolEntry;
@Override
public final void close() throws SQLException {
   // 关闭所有打开的Statement
   closeStatements();
   if (delegate != ClosedConnection.CLOSED_CONNECTION) {
      leakTask.cancel();
      try {
         // 是否需要回滚
         if (isCommitStateDirty && !isAutoCommit) {
            delegate.rollback();
            lastAccess = currentTime();
         }
         // 用户是否设置过readOnly等连接属性
         if (dirtyBits != 0) {
            // 如果设置过,则恢复连接属性到创建时的状态
            poolEntry.resetConnectionState(this, dirtyBits);
            lastAccess = currentTime();
         }
         // 清除警告信息
         delegate.clearWarnings();
      } catch (SQLException e) {
         if (!poolEntry.isMarkedEvicted()) {
            // 发生异常可能会关闭连接
            throw checkException(e);
         }
      } finally {
         // 把deletgate改为CLOSED_CONNECTION
         delegate = ClosedConnection.CLOSED_CONNECTION;
         // 归还PoolEntry
         poolEntry.recycle(lastAccess);
      }
   }
}

PoolEntry#recycle

final class PoolEntry implements IConcurrentBagEntry {
   long lastAccessed;
   private final HikariPool hikariPool;
   void recycle(final long lastAccessed) {
      if (connection != null) {
         this.lastAccessed = lastAccessed;
         hikariPool.recycle(this);
      }
   }
}

HikariPool#recycle将PoolEntry归还给ConcurrentBag。

private final ConcurrentBag<PoolEntry> connectionBag;
@Override
void recycle(final PoolEntry poolEntry) {
   connectionBag.requite(poolEntry);
}

ConcurrentBag#requite修改Entry状态。

public void requite(final T bagEntry) {
   // 修改entry状态为未使用,此时已经可以被别的线程借走了,因为entry在shareList里
   bagEntry.setState(STATE_NOT_IN_USE);
   // 先尝试通过交接队列给等候线程发放entry
   for (int i = 0; waiters.get() > 0; i++) {
      if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
         return;
      } else if ((i & 0xff) == 0xff) {
         parkNanos(MICROSECONDS.toNanos(10));
      } else {
         Thread.yield();
      }
   }
   // 如果没有线程等候,或发放失败,放入threadList
   final List<Object> threadLocalList = threadList.get();
   if (threadLocalList.size() < 50) {
      threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
   }
}

Hikari连接池如何实现借走和归还? HikariCP源码阅读(五)归还与关闭连接 HikariCP源码阅读(五)归还与关闭连接

  • PoolEntry包装Connection被放入ConcurrentBag
  • 用户借走通过PoolEntry构造的ProxyConnection
  • 使用中…
  • ProxyConnection.close修改ProxyConnection里的委托对象为CLOSED_CONNECTION,拒绝用户后续操作
  • PoolEntry修改状态,归还给ConcurrentBag,PoolEntry就可以被其他人借走了

二、关闭连接

HikariCP源码阅读(五)归还与关闭连接

HouseKeeper

The house keeping task to retire and maintain minimum idle connections.

HouseKeeper实现Runnable接口,负责执行连接过期并维持最小空闲连接。HikariCP启动的过程中,HouseKeeper是在HikariPool构造时提交到线程池定期执行的。默认30秒执行一次,可以通过配置com.zaxxer.hikari.housekeeping.periodMs系统参数修改。

private final long housekeepingPeriodMs = Long.getLong("com.zaxxer.hikari.housekeeping.periodMs", SECONDS.toMillis(30));
private final ScheduledExecutorService houseKeepingExecutorService;
public HikariPool(final HikariConfig config) {
  this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(
  new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);
}

HouseKeeper一个定时任务执行了很多事情。

1. JMX运行时修改配置,更新HikariPool的一些成员变量

@Override public void run() {
  // 获取连接超时时间
  connectionTimeout = config.getConnectionTimeout();
  // 校验连接是否存活的超时时间
  validationTimeout = config.getValidationTimeout();
  // 连接泄露检测阈值
  leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());
  // 连接空闲超时时间
  final long idleTimeout = config.getIdleTimeout();
  
  // ...
}

2. 检测时钟倒退

private volatile long previous = plusMillis(currentTime(), -housekeepingPeriodMs);
@Override
public void run() {
      // ...
      final long now = currentTime();
      // Detect retrograde time, allowing +128ms as per NTP spec.
      if (plusMillis(now, 128) < plusMillis(previous, housekeepingPeriodMs)) {
         softEvictConnections();
         return;
      }
      previous = now;
      // ...
   }
}

previous代表上次执行的时间戳,housekeepingPeriodMs默认30秒。正常情况下now = previous + housekeepingPeriodMs,如果previous + housekeepingPeriodMs - now > 128发生时钟倒退超过128ms,则执行softEvictConnections软驱逐所有连接,并直接返回。

HikariPool#softEvictConnections

@Override public void softEvictConnections() {
  connectionBag
  .values()
  .forEach(poolEntry -> softEvictConnection(poolEntry, "(connection evicted)", false));
}
private boolean softEvictConnection(final PoolEntry poolEntry, final String reason, final boolean owner) {
  poolEntry.markEvicted();
  if (owner || connectionBag.reserve(poolEntry)) {
     closeConnection(poolEntry, reason);
     return true;
  }
  return false;
}

首先poolEntry.markEvicted()将PoolEntry标记为被驱逐,如果之后的操作没成功,这个连接将在下次getConnection的时候被检测到驱逐,从而关闭。

判断调用者是否是owner,即是否是用户。如果是用户自己操作HikariPool#evictConnection方法驱逐连接,关闭连接(见3)。

执行connectionBag.reserve(poolEntry),将PoolEntry的状态通过CAS从STATE_NOT_IN_USE修改为STATE_RESERVED状态,如果执行成功,关闭连接(见3)。

3. 关闭空闲连接

@Override
public void run() {
  // ...
  if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
     final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE);
     int toRemove = notInUse.size() - config.getMinimumIdle();
     for (PoolEntry entry : notInUse) {
        if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
           closeConnection(entry, "(connection has passed idleTimeout)");
           toRemove--;
        }
     }
  }
  // ...
}

首先判断当前配置idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()是否支持空闲连接超时关闭。要设置idleTimeout并且MinimumIdle要小于MaximumPoolSize

接下来计算总共有多少连接需要关闭。待关闭数量(toRemove)= STATE_NOT_IN_USE的PoolEntry总数 – 配置的MinimumIdle数量。

循环所有未使用的PoolEntry,判断 当前时间 – entry的上次使用时间 是否大于 idleTimeout,并且执行reserve修改PoolEntry状态成功。如果修改成功,执行关闭连接。

HikariPool#closeConnection是关闭连接的公共入口。

void closeConnection(final PoolEntry poolEntry, final String closureReason)
{
  if (connectionBag.remove(poolEntry)) {
     final Connection connection = poolEntry.close();
     closeConnectionExecutor.execute(() -> {
        quietlyCloseConnection(connection, closureReason);
        if (poolState == POOL_NORMAL) {
           fillPool();
        }
     });
  }
}

connectionBag.remove(poolEntry)修改PoolEntry状态,从shareList和threadList中移除这个元素。

public boolean remove(final T bagEntry) {
  if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) 
  	&& !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
     return false;
  }
  final boolean removed = sharedList.remove(bagEntry);
  threadList.get().remove(bagEntry);
  return removed;
}

poolEntry.close()关闭MaxLifeTime超时检测任务,一些变量赋空值帮助GC。

Connection close()
 {
    ScheduledFuture<?> eol = endOfLife;
    if (eol != null && !eol.isDone() && !eol.cancel(false)) {
       LOGGER.warn("{} - maxLifeTime expiration task cancellation unexpectedly returned false for connection {}", getPoolName(), connection);
    }
    Connection con = connection;
    connection = null;
    endOfLife = null;
    return con;
 }

准备工作都做好以后,把关闭任务提交到线程池,执行关闭连接。如果连接池仍然是正常状态(POOL_NORMAL),尝试维持最小空闲连接(fillPool()见4)。

void closeConnection(final PoolEntry poolEntry, final String closureReason)
{
	// ...
	closeConnectionExecutor.execute(() -> {
        quietlyCloseConnection(connection, closureReason);
        if (poolState == POOL_NORMAL) {
           fillPool();
        }
     });
}

4. 维持最小空闲连接

业务上往往我们称Hikari连接池的MinimumIdle为最小连接数,其实这是一个简称,全称是最小空闲连接数。它是保证连接池中至少有x个连接是空闲的,拿来即可用,但是需要保证总连接数不会超过MaximumPoolSize

@Override
public void run() {
  // ...
  fillPool();
}
private synchronized void fillPool()
{
  final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections())
                               - addConnectionQueueReadOnlyView.size();
  if (connectionsToAdd <= 0) logger.debug("{} - Fill pool skipped, pool is at sufficient level.", poolName);

  for (int i = 0; i < connectionsToAdd; i++) {
     addConnectionExecutor.submit((i < connectionsToAdd - 1) ? poolEntryCreator : postFillPoolEntryCreator);
  }
}

首先计算需要添加多少连接。缺少最大连接和缺少空闲连接取小,扣除当前正在准备创建的连接(addConnectionQueueReadOnlyView.size()是线程池addConnectionExecutor的等待队列视图),就是connectionsToAdd需要创建的连接。

例如:配置MaximumPoolSize=10,MinimumIdle=5;此时总连接数=7,空闲连接数=4,等待创建连接线程数=0。根据公式计算:Math.min(10 - 7, 5 - 4) - 0 = 1,还需要创建一个连接,因为空闲连接数不够5。如果按照最小连接数去理解,这里应该不需要创建连接了,因为连接数已经有7个了。

最后把创建连接的任务提交到addConnectionExecutor执行。

HikariPool#getConnection

getConnection方法在第四章介绍过,主要是获取PoolEntry,并创建ProxyConnection给用户使用。

public Connection getConnection(final long hardTimeout) throws SQLException
{
  suspendResumeLock.acquire();
  final long startTime = currentTime();
  try {
     long timeout = hardTimeout;
     do {
        PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
        if (poolEntry == null) {
           break;
        }
        final long now = currentTime();
        if (poolEntry.isMarkedEvicted() 
        	|| (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {
           // 关闭连接
           closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
           timeout = hardTimeout - elapsedMillis(startTime);
        } else {
           return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
        }
     } while (timeout > 0L);
     throw createTimeoutException(startTime);
  } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
     throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
  } finally {
     suspendResumeLock.release();
  }
}

注意当poolEntry.isMarkedEvicted() PoolEntry被标记未驱逐,或!isConnectionAlive(poolEntry.connection)连接存活检测不通过时,会执行closeConnection关闭连接,关闭连接方法在上面讲过。

MaxLifetime到期软驱逐

当一个连接被创建之后,就会开启一个延迟任务,检测连接如果超过MaxLifetime则进行软驱逐(softEvictConnection)。代码见HikariPool#createPoolEntry方法。

private PoolEntry createPoolEntry() {
	 // 连接被创建
     final PoolEntry poolEntry = newPoolEntry();
     final long maxLifetime = config.getMaxLifetime();
     if (maxLifetime > 0) {
        final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
        final long lifetime = maxLifetime - variance;
        // 延迟maxLifetime,如果这个task没被取消将被执行
        poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
           () -> {
           	  // 执行软驱逐
              if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false)) {
              	 // 通知HikariPool增加 等待获取连接的线程数 个PoolEntry
                 addBagItem(connectionBag.getWaitingThreadCount());
              }
           },
           lifetime, MILLISECONDS));
     }

     return poolEntry;
}

连接池关闭

HikariDataSource#close修改isShutdown状态。

@Override
public void close() {
  if (isShutdown.getAndSet(true)) {
     return;
  }
  HikariPool p = pool;
  if (p != null) {
     try {
        p.shutdown();
     } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
     }
  }
}

HikariPool#shutdown关闭所有线程池,取消所有任务,关闭所有数据库连接。

public synchronized void shutdown() throws InterruptedException {
 // 状态修改
 poolState = POOL_SHUTDOWN;

 if (addConnectionExecutor == null) { // pool never started
    return;
 }

// 取消HouseKeeper任务
 if (houseKeeperTask != null) {
    houseKeeperTask.cancel(false);
    houseKeeperTask = null;
 }
 // 首次软驱逐
 softEvictConnections();
 // 停止添加连接线程池
 addConnectionExecutor.shutdown();
 addConnectionExecutor.awaitTermination(getLoginTimeout(), SECONDS);
 // 关闭HouseKeeper线程池
 destroyHouseKeepingExecutorService();
 // 关闭connectionBag
 connectionBag.close();
 // 开启一个线程池,负责中断使用中的连接
 final ExecutorService assassinExecutor = createThreadPoolExecutor(...);
 try {
    final long start = currentTime();
    do {
    	// 打断所有正在使用的连接
       abortActiveConnections(assassinExecutor);
       // 再次执行软驱逐
       softEvictConnections();
    } while (getTotalConnections() > 0 && elapsedMillis(start) < SECONDS.toMillis(10));
 } finally {
    assassinExecutor.shutdown();
    assassinExecutor.awaitTermination(10L, SECONDS);
 }
 // 关闭设置网络超时时间线程池
 shutdownNetworkTimeoutExecutor();
 // 关闭 关闭连接线程池
 closeConnectionExecutor.shutdown();
 closeConnectionExecutor.awaitTermination(10L, SECONDS);
}

abortActiveConnections中断所有正在使用的连接,并将PoolEntry从connectionBag移除。

private void abortActiveConnections(final ExecutorService assassinExecutor) {
  for (PoolEntry poolEntry : connectionBag.values(STATE_IN_USE)) {
     Connection connection = poolEntry.close();
     try {
        connection.abort(assassinExecutor);
     } catch (Throwable e) {
        quietlyCloseConnection(connection, "(connection aborted during shutdown)");
     } finally {
        connectionBag.remove(poolEntry);
     }
  }
}

为什么需要多次执行abortActiveConnectionssoftEvictConnections,直到getTotalConnections<0

因为PoolEntry的状态时刻发生变化,不多次尝试可能导致少关闭连接。abortActiveConnections只负责STATE_IN_USE的PoolEntry;softEvictConnections只负责STATE_NOT_IN_USE的PoolEntry。

总结

关于归还连接,对于HikariCP如何设计连接借走和归还有一个整体的理解,主要利用了代理和ConcurrentBag。

对于关闭连接,触发点很多,一般都是通过软驱逐或强制关闭两种方式关闭。重点关注HouseKeeper这个30秒定时任务的作用。

原创不易,欢迎评论、点赞和关注。欢迎关注公众号:程序猿阿越。

今天的文章HikariCP源码阅读(五)归还与关闭连接分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:http://bianchenghao.cn/15975.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注