【源码解析】多数据源 dynamic-datasource快速入门及源码解析
快速入门
依赖引入
<dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.5.1</version></dependency>
配置文件
spring.datasource.druid.stat-view-servlet.enabled = true
spring.datasource.druid.stat-view-servlet.login-username = root
spring.datasource.druid.stat-view-servlet.login-password = root
spring.datasource.dynamic.druid.filters = stat,wall
spring.datasource.druid.aop-patterns = com.charles.mapper.*
spring.datasource.druid.web-stat-filter.enabled = true
spring.datasource.druid.filter.stat.enabled = true
spring.datasource.druid.filter.stat.log-slow-sql = true
spring.datasource.druid.filter.stat.slow-sql-millis = 1000
spring.datasource.dynamic.primary = db0
spring.datasource.dynamic.datasource.db1.driver-class-name = com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.db1.url = jdbc:mysql://127.0.0.1:3306/eb-crm?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true
spring.datasource.dynamic.datasource.db1.username = app
spring.datasource.dynamic.datasource.db1.password = Ys@123456
spring.datasource.dynamic.datasource.db1.type = com.alibaba.druid.pool.DruidDataSource
spring.datasource.dynamic.datasource.db0.driver-class-name = com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.db0.url = jdbc:mysql://127.0.0.1:3306/newcrm?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true
spring.datasource.dynamic.datasource.db0.username = app
spring.datasource.dynamic.datasource.db0.password = Ys@123456
spring.datasource.dynamic.datasource.db0.type = com.alibaba.druid.pool.DruidDataSource
使用注解拦截
/**
 * Mapper for {@code TbpExcelLog}. The class-level {@code @DS("db1")} annotation
 * names {@code db1} as the data source key for this mapper.
 */
@DS("db1")
public interface TbpExcelLogMapper extends BaseMapper<TbpExcelLog> {
}
源码解析
启动的时候,会加载在dynamic-datasource-spring-boot-starter
的jar包中的spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DynamicDataSourceAutoConfiguration
数据源加载
在DynamicDataSourceAutoConfiguration
会注入DynamicRoutingDataSource
/**
 * Registers the routing data source bean when no other bean of this type exists.
 * The primary key, strict flag, strategy and the p6spy/seata switches are copied
 * from the bound configuration properties.
 *
 * @return the configured {@code DynamicRoutingDataSource}
 */
@Bean
@ConditionalOnMissingBean
public DataSource dataSource() {
    DynamicRoutingDataSource dataSource = new DynamicRoutingDataSource();
    dataSource.setPrimary(properties.getPrimary());
    dataSource.setStrict(properties.getStrict());
    dataSource.setStrategy(properties.getStrategy());
    dataSource.setP6spy(properties.getP6spy());
    dataSource.setSeata(properties.getSeata());
    return dataSource;
}
DynamicRoutingDataSource#afterPropertiesSet
,系统启动的时候会加载所有的数据源
@Overridepublic void afterPropertiesSet() throws Exception {// 检查开启了配置但没有相关依赖checkEnv();// 添加并分组数据源Map<String, DataSource> dataSources = new HashMap<>(16);for (DynamicDataSourceProvider provider : providers) {dataSources.putAll(provider.loadDataSources());}for (Map.Entry<String, DataSource> dsItem : dataSources.entrySet()) {addDataSource(dsItem.getKey(), dsItem.getValue());}// 检测默认数据源是否设置if (groupDataSources.containsKey(primary)) {log.info("dynamic-datasource initial loaded [{}] datasource,primary group datasource named [{}]", dataSources.size(), primary);} else if (dataSourceMap.containsKey(primary)) {log.info("dynamic-datasource initial loaded [{}] datasource,primary datasource named [{}]", dataSources.size(), primary);} else {log.warn("dynamic-datasource initial loaded [{}] datasource,Please add your primary datasource or check your configuration", dataSources.size());}}
在DynamicDataSourceAutoConfiguration
会注入DynamicDataSourceProvider
/**
 * Registers a provider built from the data source definitions held in the
 * bound configuration properties.
 *
 * @return a {@code YmlDynamicDataSourceProvider} wrapping {@code properties.getDatasource()}
 */
@Bean
public DynamicDataSourceProvider ymlDynamicDataSourceProvider() {
    return new YmlDynamicDataSourceProvider(properties.getDatasource());
}
AbstractDataSourceProvider#createDataSourceMap
,根据配置文件中的信息创建数据源。
/**
 * Creates one data source per configuration entry.
 *
 * @param dataSourcePropertiesMap data source name mapped to its configuration
 * @return data source name mapped to the created data source
 */
protected Map<String, DataSource> createDataSourceMap(Map<String, DataSourceProperty> dataSourcePropertiesMap) {
    Map<String, DataSource> created = new HashMap<>(dataSourcePropertiesMap.size() * 2);
    for (Map.Entry<String, DataSourceProperty> entry : dataSourcePropertiesMap.entrySet()) {
        String name = entry.getKey();
        DataSourceProperty property = entry.getValue();
        // The pool name falls back to the data source name when it is missing or empty.
        String configuredPoolName = property.getPoolName();
        boolean missingPoolName = configuredPoolName == null || "".equals(configuredPoolName);
        property.setPoolName(missingPoolName ? name : configuredPoolName);
        created.put(name, defaultDataSourceCreator.createDataSource(property));
    }
    return created;
}
AbstractDataSourceCreator#createDataSource
,模板方法。该方法会调用dataSourceInitEvent.beforeCreate(dataSourceProperty);
。而doCreateDataSource
交由每个子类来创建。
/**
 * Template method for creating a data source: fills in global defaults, fires
 * the before/after init events around {@code doCreateDataSource}, runs the
 * init scripts, and returns the wrapped result.
 *
 * @param dataSourceProperty configuration of the data source to create
 * @return the value returned by {@code wrapDataSource}
 */
@Override
public DataSource createDataSource(DataSourceProperty dataSourceProperty) {
    // Fall back to the global public key when this data source does not set one.
    String publicKey = dataSourceProperty.getPublicKey();
    if (StringUtils.isEmpty(publicKey)) {
        publicKey = properties.getPublicKey();
        dataSourceProperty.setPublicKey(publicKey);
    }
    // Same fallback for the lazy flag.
    Boolean lazy = dataSourceProperty.getLazy();
    if (lazy == null) {
        lazy = properties.getLazy();
        dataSourceProperty.setLazy(lazy);
    }
    dataSourceInitEvent.beforeCreate(dataSourceProperty);
    DataSource dataSource = doCreateDataSource(dataSourceProperty);
    dataSourceInitEvent.afterCreate(dataSource);
    this.runScrip(dataSource, dataSourceProperty);
    return wrapDataSource(dataSource, dataSourceProperty);
}
DruidDataSourceCreator#doCreateDataSource
,创建druid数据源。根据全局配置gConfig
和定制化配置结合来生成DruidDataSource
。spring.datasource.dynamic.datasource.xxx.type=com.alibaba.druid.pool.DruidDataSource
可以进行定制化配置,也可以通过spring.datasource.dynamic.druid
全局配置druid的属性
private DruidConfig gConfig;@Overridepublic DataSource doCreateDataSource(DataSourceProperty dataSourceProperty) {DruidDataSource dataSource = new DruidDataSource();dataSource.setUsername(dataSourceProperty.getUsername());dataSource.setPassword(dataSourceProperty.getPassword());dataSource.setUrl(dataSourceProperty.getUrl());dataSource.setName(dataSourceProperty.getPoolName());String driverClassName = dataSourceProperty.getDriverClassName();if (!StringUtils.isEmpty(driverClassName)) {dataSource.setDriverClassName(driverClassName);}DruidConfig config = dataSourceProperty.getDruid();Properties properties = config.toProperties(gConfig);List<Filter> proxyFilters = this.initFilters(dataSourceProperty, properties.getProperty("druid.filters"));dataSource.setProxyFilters(proxyFilters);dataSource.configFromPropety(properties);//连接参数单独设置dataSource.setConnectProperties(config.getConnectionProperties());//设置druid内置properties不支持的的参数this.setParam(dataSource, config);if (Boolean.FALSE.equals(dataSourceProperty.getLazy())) {try {dataSource.init();} catch (SQLException e) {throw new ErrorCreateDataSourceException("druid create error", e);}}return dataSource;}
切面修改数据源的key
在DynamicDataSourceAutoConfiguration
会注入DynamicDataSourceAnnotationAdvisor
/**
 * Registers the advisor that intercepts the {@code DS} annotation. The bean is
 * created when the {@code aop.enabled} property under the dynamic-datasource
 * prefix is {@code true} or absent.
 *
 * @param dsProcessor processor handed to the interceptor
 * @return the advisor, ordered by the configured AOP order
 */
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@Bean
@ConditionalOnProperty(prefix = DynamicDataSourceProperties.PREFIX + ".aop", name = "enabled", havingValue = "true", matchIfMissing = true)
public Advisor dynamicDatasourceAnnotationAdvisor(DsProcessor dsProcessor) {
    DynamicDatasourceAopProperties aopProperties = properties.getAop();
    DynamicDataSourceAnnotationInterceptor interceptor =
            new DynamicDataSourceAnnotationInterceptor(aopProperties.getAllowedPublicOnly(), dsProcessor);
    DynamicDataSourceAnnotationAdvisor advisor = new DynamicDataSourceAnnotationAdvisor(interceptor, DS.class);
    advisor.setOrder(aopProperties.getOrder());
    return advisor;
}
DynamicDataSourceAnnotationAdvisor#buildPointcut
,构造切面,对方法和类都拦截。
/**
 * Builds the pointcut as the union of a class-level annotation pointcut and a
 * method-level annotation pointcut.
 *
 * @return the combined pointcut
 */
private Pointcut buildPointcut() {
    Pointcut classLevel = new AnnotationMatchingPointcut(annotation, true);
    Pointcut methodLevel = new AnnotationMethodPoint(annotation);
    ComposablePointcut combined = new ComposablePointcut(classLevel);
    return combined.union(methodLevel);
}
如果使用@Aspect
来定义切面、对注解进行拦截的话,要注意@within
和@annotation
的区别。
1. @within:类级别,匹配标注了该注解的类中的方法
2. @annotation:方法级别,匹配标注了该注解的方法
DynamicDataSourceAnnotationInterceptor#invoke
,获取方法上或者类上的注解中的key
/**
 * Pushes the resolved data source key onto the context holder for the duration
 * of the intercepted call.
 *
 * @param invocation the intercepted method invocation
 * @return whatever the intercepted method returns
 * @throws Throwable anything thrown by the intercepted method
 */
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
    String dsKey = determineDatasourceKey(invocation);
    DynamicDataSourceContextHolder.push(dsKey);
    try {
        return invocation.proceed();
    } finally {
        // Always undo the push, even when the call throws.
        DynamicDataSourceContextHolder.poll();
    }
}
DynamicDataSourceAnnotationInterceptor#determineDatasourceKey
,如果key以#
开头,则返回dsProcessor.determineDatasource(invocation, key)
,否则直接返回key。
/**
 * Resolves the data source key for an invocation. Keys starting with
 * {@code DYNAMIC_PREFIX} are handed to the processor; any other key is
 * returned unchanged.
 *
 * @param invocation the intercepted method invocation
 * @return the data source key to use
 */
private String determineDatasourceKey(MethodInvocation invocation) {
    String annotatedKey = dataSourceClassResolver.findKey(invocation.getMethod(), invocation.getThis());
    if (annotatedKey.startsWith(DYNAMIC_PREFIX)) {
        return dsProcessor.determineDatasource(invocation, annotatedKey);
    }
    return annotatedKey;
}
DsProcessor
在DynamicDataSourceAutoConfiguration
会注入DsProcessor
,使用过滤器链的模式进行处理。
/**
 * Builds the default processor chain: header, then session, then SpEL expression.
 *
 * @param beanFactory bean factory backing the SpEL processor's bean resolver
 * @return the head of the chain (the header processor)
 */
@Bean
@ConditionalOnMissingBean
public DsProcessor dsProcessor(BeanFactory beanFactory) {
    DsHeaderProcessor headerProcessor = new DsHeaderProcessor();
    DsSessionProcessor sessionProcessor = new DsSessionProcessor();
    DsSpelExpressionProcessor spelExpressionProcessor = new DsSpelExpressionProcessor();
    spelExpressionProcessor.setBeanResolver(new BeanFactoryResolver(beanFactory));
    // Link the chain: header -> session -> SpEL.
    headerProcessor.setNextProcessor(sessionProcessor);
    sessionProcessor.setNextProcessor(spelExpressionProcessor);
    return headerProcessor;
}
DsHeaderProcessor#doDetermineDatasource
,如果方法上的注解类似于@DS("#header=tenant_id")
,那么切换路由的key是由header中tenant_id
的值决定的。
/**
 * Reads the data source key from a header of the current HTTP request.
 *
 * @param invocation the intercepted method invocation (not used here)
 * @param key the annotation value; its first 8 characters are skipped to get the header name
 * @return the header value, as returned by {@code request.getHeader}
 */
@Override
public String doDetermineDatasource(MethodInvocation invocation, String key) {
    // NOTE(review): getRequestAttributes() can return null when no request is bound
    // to the current thread, which would throw a NullPointerException here - confirm
    // this processor is only reached inside a web request.
    HttpServletRequest request =
            ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
    // 8 is the length of "#header=".
    return request.getHeader(key.substring(8));
}
spelExpressionProcessor
是支持表达式的。
在注解中配置@DS("#header=tenantName")
,则从header里面去寻找key;写了@DS("#session=tenantName")
,就从session里面获取以tenantName
为键的值。而@DS("#tenantName")
和@DS("#user.tenantName")
根据参数来获取。
获取数据源
获取连接,AbstractRoutingDataSource#getConnection()
。首先获取数据源,从数据源中获取Connection
/**
 * Returns a connection from the currently selected data source. When a local
 * transaction id is bound, the connection is a {@code ConnectionProxy} that is
 * looked up, and created if absent, per data source key.
 *
 * @return a plain connection outside a transaction, otherwise a connection proxy
 * @throws SQLException if the underlying data source cannot supply a connection
 */
@Override
public Connection getConnection() throws SQLException {
    String xid = TransactionContext.getXID();
    if (StringUtils.isEmpty(xid)) {
        return determineDataSource().getConnection();
    } else {
        String ds = DynamicDataSourceContextHolder.peek();
        // An empty key is stored under the name "default".
        ds = StringUtils.isEmpty(ds) ? "default" : ds;
        ConnectionProxy connection = ConnectionFactory.getConnection(ds);
        return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection;
    }
}
DynamicRoutingDataSource#determineDataSource
,获取线程变量的数据,根据线程变量获取数据源。
/**
 * Picks the data source for the key currently on top of the context holder.
 *
 * @return the data source returned by {@code getDataSource} for that key
 */
@Override
public DataSource determineDataSource() {
    String dsKey = DynamicDataSourceContextHolder.peek();
    return getDataSource(dsKey);
}
DynamicRoutingDataSource#getDataSource
,如果key为空,则获取默认(主)数据源;否则优先从groupDataSources
里面获取数据源,然后再从dataSourceMap
里面获取。
/**
 * Looks up a data source by key. An empty key yields the primary data source;
 * otherwise groups are checked before single data sources. An unknown key
 * throws in strict mode and falls back to the primary data source otherwise.
 *
 * @param ds data source or group name; may be empty
 * @return the matching data source
 * @throws CannotFindDataSourceException if the key is unknown and strict mode is on
 */
public DataSource getDataSource(String ds) {
    if (StringUtils.isEmpty(ds)) {
        return determinePrimaryDataSource();
    }
    boolean isGroup = !groupDataSources.isEmpty() && groupDataSources.containsKey(ds);
    if (isGroup) {
        log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
        return groupDataSources.get(ds).determineDataSource();
    }
    if (dataSourceMap.containsKey(ds)) {
        log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
        return dataSourceMap.get(ds);
    }
    if (strict) {
        throw new CannotFindDataSourceException("dynamic-datasource could not find a datasource named" + ds);
    }
    return determinePrimaryDataSource();
}
DynamicDataSourceStrategy
,从群组里面获取数据源需要配置获取策略,从集合中决定是哪一个,有随机和轮询。
public class RandomDynamicDataSourceStrategy implements DynamicDataSourceStrategy {@Overridepublic String determineKey(List<String> dsNames) {return dsNames.get(ThreadLocalRandom.current().nextInt(dsNames.size()));}
}
比对一下,spring-jdbc中的动态数据源切换。
AbstractRoutingDataSource#afterPropertiesSet
,实现了InitializingBean#afterPropertiesSet
的方法。
/**
 * Resolves the configured target data sources, and the default target if one
 * is set, into their final lookup-key and {@code DataSource} form.
 *
 * @throws IllegalArgumentException if {@code targetDataSources} was never set
 */
public void afterPropertiesSet() {
    if (this.targetDataSources == null) {
        throw new IllegalArgumentException("Property 'targetDataSources' is required");
    }
    this.resolvedDataSources = new HashMap<>(this.targetDataSources.size());
    for (Map.Entry<Object, Object> target : this.targetDataSources.entrySet()) {
        Object resolvedKey = this.resolveSpecifiedLookupKey(target.getKey());
        DataSource resolvedSource = this.resolveSpecifiedDataSource(target.getValue());
        this.resolvedDataSources.put(resolvedKey, resolvedSource);
    }
    if (this.defaultTargetDataSource != null) {
        this.resolvedDefaultDataSource = this.resolveSpecifiedDataSource(this.defaultTargetDataSource);
    }
}
AbstractRoutingDataSource#getConnection()
。获取lookupKey
的方式由子类决定。
/**
 * Obtains a connection from the data source selected for the current lookup key.
 *
 * @throws SQLException if the target data source cannot supply a connection
 */
public Connection getConnection() throws SQLException {
    return this.determineTargetDataSource().getConnection();
}

/**
 * Resolves the data source for the current lookup key. The default data source
 * is used when nothing matches and either lenient fallback is on or the key is
 * {@code null}.
 *
 * @return the resolved data source, never {@code null}
 * @throws IllegalStateException if no data source can be determined
 */
protected DataSource determineTargetDataSource() {
    Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
    Object lookupKey = this.determineCurrentLookupKey();
    DataSource dataSource = (DataSource) this.resolvedDataSources.get(lookupKey);
    if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
        dataSource = this.resolvedDefaultDataSource;
    }
    if (dataSource == null) {
        throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
    } else {
        return dataSource;
    }
}

/**
 * Supplies the current lookup key; subclasses decide where it comes from.
 *
 * @return the lookup key, possibly {@code null}
 */
@Nullable
protected abstract Object determineCurrentLookupKey();
多数据源事务
在DynamicDataSourceAutoConfiguration
会注入dynamicTransactionAdvisor
,主要是拦截处理有DSTransactional
注解的方法。
/**
 * Registers the advisor that intercepts {@code DSTransactional}. The bean is
 * created when the {@code seata} property under the dynamic-datasource prefix
 * is {@code false} or absent.
 *
 * @return advisor wrapping a {@code DynamicLocalTransactionInterceptor}
 */
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@Bean
@ConditionalOnProperty(prefix = DynamicDataSourceProperties.PREFIX, name = "seata", havingValue = "false", matchIfMissing = true)
public Advisor dynamicTransactionAdvisor() {
    DynamicLocalTransactionInterceptor interceptor = new DynamicLocalTransactionInterceptor();
    return new DynamicDataSourceAnnotationAdvisor(interceptor, DSTransactional.class);
}
DynamicLocalTransactionInterceptor
,开启事务。
public class DynamicLocalTransactionInterceptor implements MethodInterceptor {@Overridepublic Object invoke(MethodInvocation methodInvocation) throws Throwable {if (!StringUtils.isEmpty(TransactionContext.getXID())) {return methodInvocation.proceed();}boolean state = true;Object o;LocalTxUtil.startTransaction();try {o = methodInvocation.proceed();} catch (Exception e) {state = false;throw e;} finally {if (state) {LocalTxUtil.commit();} else {LocalTxUtil.rollback();}}return o;}
}
LocalTxUtil#startTransaction
/**
 * Starts a local transaction by binding a fresh random UUID as the transaction
 * id, unless one is already bound.
 */
public static void startTransaction() {
    if (StringUtils.isEmpty(TransactionContext.getXID())) {
        String xid = UUID.randomUUID().toString();
        TransactionContext.bind(xid);
        log.debug("dynamic-datasource start local tx [{}]", xid);
        return;
    }
    log.debug("dynamic-datasource exist local tx [{}]", TransactionContext.getXID());
}
AbstractRoutingDataSource#getConnection()
,判断是否存在事务id,如果存在,则获取连接,并封装成ConnectionProxy
。
/**
 * Returns a connection from the currently selected data source. When a local
 * transaction id is bound, one {@code ConnectionProxy} per data source key is
 * reused across calls.
 *
 * @return a plain connection outside a transaction, otherwise a connection proxy
 * @throws SQLException if the underlying data source cannot supply a connection
 */
@Override
public Connection getConnection() throws SQLException {
    String xid = TransactionContext.getXID();
    if (StringUtils.isEmpty(xid)) {
        return determineDataSource().getConnection();
    } else {
        String ds = DynamicDataSourceContextHolder.peek();
        // An empty key is stored under the name "default".
        ds = StringUtils.isEmpty(ds) ? "default" : ds;
        ConnectionProxy connection = ConnectionFactory.getConnection(ds);
        return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection;
    }
}

/**
 * Wraps a raw connection in a {@code ConnectionProxy} and hands it to
 * {@code ConnectionFactory.putConnection} under the given key.
 *
 * @param ds data source key
 * @param connection raw connection to wrap
 * @return the new proxy
 */
private Connection getConnectionProxy(String ds, Connection connection) {
    ConnectionProxy connectionProxy = new ConnectionProxy(connection, ds);
    ConnectionFactory.putConnection(ds, connectionProxy);
    return connectionProxy;
}
ConnectionFactory#putConnection
,存储到线程变量中的concurrentHashMap
。
/**
 * Stores the connection proxy in the holder's map under the given key, unless
 * the key is already present. Auto-commit is switched off before storing.
 *
 * @param ds data source key
 * @param connection proxy to register
 */
public static void putConnection(String ds, ConnectionProxy connection) {
    Map<String, ConnectionProxy> held = CONNECTION_HOLDER.get();
    if (held.containsKey(ds)) {
        return;
    }
    try {
        connection.setAutoCommit(false);
    } catch (SQLException e) {
        // The failure is only printed; the connection is still stored below.
        e.printStackTrace();
    }
    held.put(ds, connection);
}
提交事务,LocalTxUtil#commit
/**
 * Commits the local transaction: notifies the held connections with
 * {@code true}, logs the transaction id, then removes it from the context.
 */
public static void commit() {
    ConnectionFactory.notify(true);
    String xid = TransactionContext.getXID();
    log.debug("dynamic-datasource commit local tx [{}]", xid);
    TransactionContext.remove();
}
ConnectionFactory#notify
,对于线程变量中的连接全部处理(回滚或者提交),并清空线程变量。
/**
 * Passes the given state to every connection proxy held for the current
 * thread, then clears the holder even if one of them throws.
 *
 * @param state forwarded unchanged to each proxy's {@code notify}
 */
public static void notify(Boolean state) {
    try {
        Map<String, ConnectionProxy> held = CONNECTION_HOLDER.get();
        for (Map.Entry<String, ConnectionProxy> entry : held.entrySet()) {
            entry.getValue().notify(state);
        }
    } finally {
        CONNECTION_HOLDER.remove();
    }
}
配置文件加密
在DynamicDataSourceAutoConfiguration
会注入EncDataSourceInitEvent
/**
 * Registers the default data source init event when no other bean of this
 * type exists.
 *
 * @return a new {@code EncDataSourceInitEvent}
 */
@Bean
@ConditionalOnMissingBean
public DataSourceInitEvent dataSourceInitEvent() {
    return new EncDataSourceInitEvent();
}
EncDataSourceInitEvent#beforeCreate
,对加密的字符串进行解密
/**
 * Decrypts the url, username and password in place before the data source is
 * created. Nothing happens when no public key is configured.
 *
 * @param dataSourceProperty configuration whose credentials may be encrypted
 */
@Override
public void beforeCreate(DataSourceProperty dataSourceProperty) {
    String publicKey = dataSourceProperty.getPublicKey();
    if (StringUtils.hasText(publicKey)) {
        dataSourceProperty.setUrl(decrypt(publicKey, dataSourceProperty.getUrl()));
        dataSourceProperty.setUsername(decrypt(publicKey, dataSourceProperty.getUsername()));
        dataSourceProperty.setPassword(decrypt(publicKey, dataSourceProperty.getPassword()));
    }
}
EncDataSourceInitEvent#decrypt
,匹配正则表达式(即形如 ENC(密文) 的值)则进行解密。 Pattern ENC_PATTERN = Pattern.compile("^ENC\\((.*)\\)$");
/**
 * Decrypts a value that matches {@code ENC_PATTERN}. Blank or non-matching
 * values are returned unchanged, as is the input when decryption fails.
 *
 * @param publicKey key passed to {@code CryptoUtils.decrypt}
 * @param cipherText value that may be encrypted
 * @return the decrypted text, or {@code cipherText} itself
 */
private String decrypt(String publicKey, String cipherText) {
    if (!StringUtils.hasText(cipherText)) {
        return cipherText;
    }
    Matcher encMatcher = ENC_PATTERN.matcher(cipherText);
    if (!encMatcher.find()) {
        return cipherText;
    }
    try {
        return CryptoUtils.decrypt(publicKey, encMatcher.group(1));
    } catch (Exception e) {
        log.error("DynamicDataSourceProperties.decrypt error ", e);
    }
    return cipherText;
}
脚本执行器
AbstractDataSourceCreator#runScrip
,用对应的数据源执行脚本
/**
 * Runs the configured schema script and then the data script against the data
 * source. Nothing is done when neither script is configured.
 *
 * @param dataSource data source to run the scripts on
 * @param dataSourceProperty configuration holding the init settings
 */
private void runScrip(DataSource dataSource, DataSourceProperty dataSourceProperty) {
    DatasourceInitProperties init = dataSourceProperty.getInit();
    String schemaScript = init.getSchema();
    String dataScript = init.getData();
    boolean hasSchema = StringUtils.hasText(schemaScript);
    boolean hasData = StringUtils.hasText(dataScript);
    if (!hasSchema && !hasData) {
        return;
    }
    ScriptRunner runner = new ScriptRunner(init.isContinueOnError(), init.getSeparator());
    if (hasSchema) {
        runner.runScript(dataSource, schemaScript);
    }
    if (hasData) {
        runner.runScript(dataSource, dataScript);
    }
}
主从插件
MasterSlaveAutoRoutingPlugin
。当进行查询的时候,切换数据源SLAVE
;当进行更新的时候,切换数据源MASTER
。
@Intercepts({@Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}),@Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class, CacheKey.class, BoundSql.class}),@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class})})
@Slf4j
public class MasterSlaveAutoRoutingPlugin implements Interceptor {@Autowiredprotected DataSource dynamicDataSource;@Overridepublic Object intercept(Invocation invocation) throws Throwable {Object[] args = invocation.getArgs();MappedStatement ms = (MappedStatement) args[0];String pushedDataSource = null;try {String dataSource = SqlCommandType.SELECT == ms.getSqlCommandType() ? DdConstants.SLAVE : DdConstants.MASTER;pushedDataSource = DynamicDataSourceContextHolder.push(dataSource);return invocation.proceed();} finally {if (pushedDataSource != null) {DynamicDataSourceContextHolder.poll();}}}@Overridepublic Object plugin(Object target) {return Plugin.wrap(target, this);}@Overridepublic void setProperties(Properties properties) {}
}