> 文章列表 > 核心源码分析:数据库短时断开Druid连接池如何恢复连接

核心源码分析:数据库短时断开Druid连接池如何恢复连接

核心源码分析:数据库短时断开Druid连接池如何恢复连接

文章目录

    • 前言
    • Druid简介
    • Druid连接池如何恢复连接
      • 获取连接超时和检查连接机制
      • 创建连接重试机制
      • 生产环境如何配置连接池保证高可用
    • 写在最后

前言

在我们常用的应用软件中都会用到数据库,数据库是提供数据保证系统正常运行的基础。在数据库短时间连接断开或者超时断开连接的时候,如果不能有效的重连会使得我们应用程序无法正常提供服务。所以,在实际的开发过程中我们要着重考虑数据库连接的处理。比如:过多、过少数据库连接的处理,异常断开如何重连等。当然,在互联网开放的今天我们有很多的数据库连接池开源组件,比如我们今天的主角——Druid,今天我们暂且就Druid连接池如何恢复连接进行讨论。

Druid简介

Druid连接池是阿里巴巴开源的数据库连接池项目。其能够帮助我们管理和监控数据库连接,比如创建连接、获取连接、回收连接、获取/创建连接异常重连机制、线程监控明、防止SQL注入机制等等功能。其中的获取连接和获取/创建连接异常重试机制非常重要,这两种机制可以保证在数据库恢复正常服务后,我们应用系统可以正常使用数据库操作语言进行业务逻辑处理。

Druid连接池如何恢复连接

获取连接超时和检查连接机制

大家都知道系统启动时我们Druid连接池会自动初始化创建数据源DruidDataSource,其中的参数从配置文件获取。我们直接略过初始化连接池流程,直接进入com.alibaba.druid.pool 下查看获取连接的源码:

@Override
public DruidPooledConnection getConnection() throws SQLException {return getConnection(maxWait);
}
//获取连接
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {//初始化验证,没有初始化就初始化连接池init();//如果配置了过滤器,经过过滤器后获取连接,比如日志等等if (filters.size() > 0) {FilterChainImpl filterChain = new FilterChainImpl(this);return filterChain.dataSource_connect(this, maxWaitMillis);} else {//获取连接return getConnectionDirect(maxWaitMillis);}
}

如上源码所示,获取数据库连接会先验证是否连接池已经初始化,没有初始化会先进行初始化。然后会进入获取连接的方法,这里我们重点讨论获取连接的逻辑。

继续深入获取连接的源码:

//获取连接的方法,无论是通过过滤器或者没有过滤器都会调用这个方法
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {//获取重试次数默认0int notFullTimeoutRetryCnt = 0;for (;;) {// handle notFullTimeoutRetryDruidPooledConnection poolableConnection;try {//获取连接,传入超时时间maxWaitpoolableConnection = getConnectionInternal(maxWaitMillis);} catch (GetConnectionTimeoutException ex) {//获取连接超时重试机制if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {notFullTimeoutRetryCnt++;if (LOG.isWarnEnabled()) {LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);}continue;}throw ex;}if (testOnBorrow) {//testOnBorrow == true 检查连接boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);if (!validate) {if (LOG.isDebugEnabled()) {LOG.debug("skip not validate connection.");}//丢弃线程逻辑,如果连接数小于等于min-idle会启动创建新连接discardConnection(poolableConnection.holder);continue;}} else {if (poolableConnection.conn.isClosed()) {discardConnection(poolableConnection.holder); // 传入null,避免重复关闭continue;}if (testWhileIdle) {final DruidConnectionHolder holder = poolableConnection.holder;long currentTimeMillis             = System.currentTimeMillis();long lastActiveTimeMillis          = holder.lastActiveTimeMillis;long lastExecTimeMillis            = holder.lastExecTimeMillis;long lastKeepTimeMillis            = holder.lastKeepTimeMillis;if (checkExecuteTime&& lastExecTimeMillis != lastActiveTimeMillis) {lastActiveTimeMillis = lastExecTimeMillis;}if (lastKeepTimeMillis > lastActiveTimeMillis) {lastActiveTimeMillis = lastKeepTimeMillis;}long idleMillis                    = currentTimeMillis - lastActiveTimeMillis;long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;if (timeBetweenEvictionRunsMillis <= 0) {timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;}if (idleMillis >= timeBetweenEvictionRunsMillis|| idleMillis < 0 // unexcepted branch) {boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);if (!validate) {if (LOG.isDebugEnabled()) {LOG.debug("skip not validate connection.");}discardConnection(poolableConnection.holder);continue;}}}}if (removeAbandoned) {// removeAbandoned == true 超时回收连接StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();poolableConnection.connectStackTrace = stackTrace;poolableConnection.setConnectedTimeNano();poolableConnection.traceEnable = true;activeConnectionLock.lock();try {activeConnections.put(poolableConnection, PRESENT);} finally {activeConnectionLock.unlock();}}if (!this.defaultAutoCommit) {//自动提交等等poolableConnection.setAutoCommit(false);}return poolableConnection;}
}

如上源码可知:

1、获取连接方法如果失败会进行重试,只要notFullTimeoutRetryCnt 参数(默认0) >=0 就会进行重试。如果重试超过notFullTimeoutRetryCnt 并且连接池连接数量大于等于maxActive则不再重试。所以,我们的应用程序获取连接的超时时间是maxWait * (notFullTimeoutRetry + 1)毫秒。

2、在获取连接方法中,如果线程池中没有连接会激活创建连接逻辑如果maxWait 是默认 -1,虽然超时时间是无限等待,但是创建数据库连接TCP Linux系统默认127s。

3、连接池获取到连接 且配置 testOnBorrow == true (默认false)每次都会对连接进行检查是否有效( 注意:testWhileIdle参数(默认true)也会按照 timeBetweenEvictionRunsMillis 参数(默认60s)配置的频率对连接进行有效检查,与testOnBorrow 参数不同的是这个需要一定的频率才会对连接进行有效检查)。当连接检查无效会被标记为丢弃,并重新从连接池获取下一个连接。

4、丢弃连接的逻辑中会验证当前存在的连接是否小于等于 min-idle 最小连接数,如果有效连接小于等于最小连接数会激活创建连接逻辑 emptySignal():

public void discardConnection(DruidConnectionHolder holder) {//省略代码部分代码try {if (holder.discard) {return;}activeCount--;discardCount++;holder.discard = true;if (activeCount <= minIdle) {//有效连接 小于等于 最小连接数,激活创建数据库连接emptySignal();}} finally {lock.unlock();}
}

创建连接重试机制

上面我们看到了如果连接池没有连接或者有效线程小于等于最小连接都会触发创建连接逻辑,那么我们现在看下创建连接的源码。我们直接进入 com.alibaba.druid.pool 包 DruidDataSource 的内部类 CreateConnectionThread 创建数据库连接线程查看源码:

//创建数据库连接线程,继承Thread 
public class CreateConnectionThread extends Thread {public CreateConnectionThread(String name){super(name);//申明守护线程标识this.setDaemon(true);}public void run() {initedLatch.countDown();long lastDiscardCount = 0;int errorCount = 0;for (;;) {// addLasttry {lock.lockInterruptibly();} catch (InterruptedException e2) {break;}long discardCount = DruidDataSource.this.discardCount;boolean discardChanged = discardCount - lastDiscardCount > 0;lastDiscardCount = discardCount;try {boolean emptyWait = true;if (createError != null&& poolingCount == 0&& !discardChanged) {emptyWait = false;}if (emptyWait&& asyncInit && createCount < initialSize) {emptyWait = false;}if (emptyWait) {// 必须存在线程等待,才创建连接if (poolingCount >= notEmptyWaitThreadCount //&& (!(keepAlive && activeCount + poolingCount < minIdle))&& !isFailContinuous()) {empty.await();}// 防止创建超过maxActive数量的连接if (activeCount + poolingCount >= maxActive) {empty.await();continue;}}} catch (InterruptedException e) {lastCreateError = e;lastErrorTimeMillis = System.currentTimeMillis();if (!closing) {LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e);}break;} finally {lock.unlock();}PhysicalConnectionInfo connection = null;try {//创建数据库物理连接connection = createPhysicalConnection();} catch (SQLException e) {LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()+ ", state " + e.getSQLState(), e);errorCount++;//创建失败重试机制//失败次数大于配置重试次数 且 重试频率(默认500ms) > 0if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {// fail over retry attempts//设置不放弃创建连接标识,也就是说会一直重试setFailContinuous(true);if (failFast) {lock.lock();try {notEmpty.signalAll();} finally {lock.unlock();}}if (breakAfterAcquireFailure) {//breakAfterAcquireFailure == true 参数(默认false)中断重试break;}try {//线程睡眠保证创建频率Thread.sleep(timeBetweenConnectErrorMillis);} catch (InterruptedException interruptEx) {break;}}} catch (RuntimeException e) {LOG.error("create connection RuntimeException", e);setFailContinuous(true);continue;} catch (Error e) {LOG.error("create connection Error", e);setFailContinuous(true);break;}if (connection == null) {//没有获取到连接,继续尝试获取连接continue;}boolean result = put(connection);if (!result) {JdbcUtils.close(connection.getPhysicalConnection());LOG.info("put physical connection to pool failed.");}errorCount = 0; // reset errorCount}}
}

如CreateConnectionThread 源码所示,我已经对重点部分进行了注释。创建线程的规则是:

1、notEmptyWaitThreadCount > poolingCount 用户等待连接的线程数量大于连接池线程数量
且 (!(keepAlive && activeCount + poolingCount < minIdle)) == false 线程池数量加有效数量小于等于最小连接
且 !isFailContinuous() == false 重试创建标识
都满足的情况下才会重新创建数据库物理连接。

2、创建数据库物理连接默认无限重试。
当 errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0 错误次数大于配置的重试次数(默认1)
且 重试频率 timeBetweenConnectErrorMillis > 0 (默认500ms)
会设置不放弃创建连接标识,对下次循环绕过检查直接进行创建物理连接。

3、 当 errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0 &&
breakAfterAcquireFailure == true
错误次数大于配置的重试次数(默认1) 并且 重试频率 大于0 ms(默认500ms)并且 创建连接失败中断重试的情况下不会一直重试,如果达到重试次数则不再重试创建物理连接。

生产环境如何配置连接池保证高可用

由于生产环境可靠性都要求99.999%以上,如果数据库连接轻微抖动、获取数据库代理异常都会对应用系统造成影响。比如数据库中断影响应用程序,注册中心如果检测到异常就会踢掉服务。那么,我们如何保证数据库连接池的稳定呢?

上面讲述了获取连接和创建连接的核心Druid源码,我们可以知道配置一些参数即可满足日常生产要求:

initial-size: 10 # 初始化连接数
min-idle: 10 # 最小连接数
maxActive: 100 # 最大连接数
maxWait: 60000 # 获取连接时最大等待时间,单位毫秒,默认-1
notFullTimeoutRetryCount: 0 # 获取连接超时重试次数,默认0会重试一次;若次参数小于0,则不会进行重试
timeBetweenEvictionRunsMillis: 60000 # 回收空闲线程和检查连接的频率,默认60ms
minEvictableIdleTimeMillis: 300000 # 连接保持空闲而不被驱逐的最小时间,默认30min
maxEvictableIdleTimeMillis: 4200000 # 连接保持空闲而不被驱逐的最大时间,默认7h
validationQuery: SELECT 1 # 验证数据库服务可用性的sql,mysql默认无效(Ping方式),因数据库方言差异, 例如 oracle 应该写成 SELECT 1 FROM DUAL
testWhileIdle: true # 获取连接时检测空闲时间,根据空闲时间timeBetweenEvictionRunsMillis检测是否有效.建议配置为true,不影响性能,并且保证安全性
testOnBorrow: false # 获取连接时直接检测连接是否有效
testOnReturn: false # 回收连接时检测连接是否有效
poolPreparedStatements: true # 开启PSCache
maxPoolPreparedStatementPerConnectionSize: 20 #设置PSCache值
connectionErrorRetryAttempts: 3 # 创建连接出错重试次数,默认1
breakAfterAcquireFailure: false # 创建连接失败后中断,默认false,配置true则不进行创建连接重试
timeBetweenConnectErrorMillis: 60000 # 创建连接出错重试时间间隔,默认500ms

如果是系统非常敏感则可以考虑损失部分性能:

testOnBorrow == true 且 testWhileIdle == false 每次获取连接进行检查是否有效,不按照频率检查;
breakAfterAcquireFailure == false 无限重试创建物理连接,如果数据库恢复则会即时创建有效连接;
maxWait == 30000 获取连接等待时间缩短,减少获取连接的超时间;

写在最后

Druid数据库连接池主要是在获取连接和创建连接阶段的一些机制来保障高可用。获取连接阶段有超时重试机制和连接有效检查机制,创建连接阶段则是重试机制。我们在实际开发中可以增加一些配置参数来保证Druid数据库连接池的正常运行,比如maxWait 最大等待时间、notFullTimeoutRetryCount 获取连接重试次数、testOnBorrow 每次有效检查连接、testWhileIdle 按频率检查连接 、breakAfterAcquireFailure 是否中断创建连接、connectionErrorRetryAttempts 创建连接重试次数等等。通过这些参数的灵活配置运用,使得Druid数据库连接池达到高稳定性。

路漫漫其修远兮,吾将上下而求索
有兴趣的小伙伴也可以加我:
订阅号 ‘架构集结号’
知识星球 ‘Coding社区’