全链路压测时动态路由数据源MySQL、MongoDB、Redis
目录
一、全链路压测
1. 参数配置application-localDynamic.yml
2. 加载配置参数DynamicDataSourceProperties.java
3. 动态数据源DynamicDataSource.java
4. 动态数据源供应DynamicDataSourceProvider.java
5. 动态数据源bean
6. 动态数据源上下文DynamicDataSourceContextHolder.java
7. 动态数据源过滤器DynamicDataSourceFilter.java
8. 测试动态路由
三、动态路由MongoDB
1. 参数配置application-localDynamicMongo.yml
2. 加载配置参数DynamicMongoSourceProperties.java
3. 动态数据源DynamicMongoSource.java
4. 动态数据源供应DynamicMongoSourceProvider.java
5. 动态数据源bean
6. 动态数据源上下文DynamicMongoSourceContextHolder.java
7. 动态数据源过滤器DynamicMongoSourceFilter.java
8. 测试动态路由
四、动态路由Redis
1. 参数配置application-localDynamicRedis.yml
2. 加载配置参数DynamicRedisSourceProperties.java
3. 动态数据源DynamicRedisSource.java
4. 动态数据源供应DynamicRedisSourceProvider.java
5. 动态数据源bean
6. 动态数据源上下文DynamicRedisSourceContextHolder.java
7. 动态数据源过滤器DynamicRedisSourceFilter.java
8. 测试动态路由
五、参考资料
一、全链路压测
验证系统所能够承受的最大负载是否接近于预期,是否经得住大流量的冲击,绝非是一件易事。有过分布式系统开发经验的同学都应该非常清楚,简单对某个接口、子系统进行压测,并不能够准确探测出系统整体的流量水平,并且对环境有着极为严苛的要求。全链路压测就是确保大促来临时核心链路的整体稳定。
如何在大促前夕对线上环境实施全链路压测,做到有指导的进行容量规划和性能优化。大促前最基本也是最棘手的两项关键任务:
- 评估机器扩容数量;
- 验证系统整体容量是否能够有效支撑所预估的流量峰值。
首先梳理系统中的核心链路。其次算出单机最大流量。然后根据GMV(Gross Merchandise Volume _ 商品交易总额)或根据历史估计现有最大流量算出扩容的机器数量。最后压测整体系统才能暴露问题,如:慢SQL、连接资源耗尽(DB连接池连接)、加锁导致大量线程等待(排他锁、分布式锁、DB的行锁)等问题。
线上压测具有较大的风险,绝不能出现一丝失误。虽然困难重重,但是也要测试系统的真实流量水位。首先高峰期绝对是不能压测的,安全做法低峰期时,Nginx层控制用户流量方向。以下是线上全链路压测的关键环节:
- 如何标记流量是用户流量还是压测流量:如请求头、URL请求参数等来区分;
- 如何将压测数据引流到隔离环境中:如同表的某字段区分、动态路由到隔离环境(本章介绍);
- 如何构造压测数据:如自动生成数据、线上数据敏感过滤后的数据等;
- 如何升级和改造业务系统和中间件:如定时任务、消息中间件、外部接口等;
- 如何发起超大规模的流量:如集群Jmeter、分布式压测nGrinder等。
压测最大的难点是压测数据如何构造和隔离。本章节主要实现动态路由MySQL、MongoDB、Redis,用户流量和压测流量同时访问一台服务器后,动态路由实现用户数据与压测数据的完全隔离,用户数据避免被污染,避免导致出现数据安全事故。
二、动态路由Mysql
实现Mysql的动态路由,首先了解org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource类。该类是在Spring2.0.1中引入的(不是Spring Boot2.0.1), 充当了DataSource的路由中介,它能够在运行时, 根据某种key值来动态切换到真正的DataSource上。
AbstractRoutingDataSource大致逻辑是:提前准备好各种数据源,存入到一个Map中,Map的key是数据源的名字,Map的value就是具体的数据源,然后再把这个Map配置到AbstractRoutingDataSource中,最后每次执行数据库查询的时候,拿一个key出来,该类会找到具体的数据源去执行这次的数据库操作。
MongoDB、Redis没有该类,则本人根据各自的工厂实现了动态路由类,来完成各自的切换,见下小节。
1. 参数配置application-localDynamic.yml
spring:dynamic-data-source:druid:
# dsKey: testds:# 主库数据源,默认master不能变master:url: jdbc:mysql://${remote.ip}:3307/prod?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8username: rootpassword: 123456# 压测库数据源test:url: jdbc:mysql://${remote.ip}:3307/test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8username: rootpassword: 123456driverClassName: com.mysql.cj.jdbc.Drivertype: mysqlinitialSize: 10keepAlive: trueminIdle: 10maxActive: 50maxWait: 60000timeBetweenEvictionRunsMillis: 60000minEvictableIdleTimeMillis: 300000maxEvictableIdleTimeMillis: 306000validationQuery: SELECT 1 FROM DUALtestWhileIdle: truetestOnBorrow: falsetestOnReturn: falsefilters: stat,wall,log4jpoolPreparedStatements: falsemaxPoolPreparedStatementPerConnectionSize: -1useGlobalDataSourceStat: trueconnectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=1000
2. 加载配置参数DynamicDataSourceProperties.java
package com.common.instance.demo.config.dynamicDataSource;import com.alibaba.druid.pool.DruidDataSource;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;import java.util.Map;/* @author tcm* @version 1.0.0* @description 动态加载数据源配置* @date 2023/3/14 15:13/
@Data
@Component
@ConfigurationProperties(prefix = "spring.dynamic-data-source.druid")
public class DynamicDataSourceProperties {private int initialSize;private int minIdle;private int maxActive;private int maxWait;private int timeBetweenEvictionRunsMillis;private int minEvictableIdleTimeMillis;private int maxEvictableIdleTimeMillis;private String validationQuery;private boolean testWhileIdle;private boolean testOnBorrow;private boolean testOnReturn;private Map<String, Map<String, String>> ds;private String dsKey;public DruidDataSource dataSource(DruidDataSource datasource) {/ 配置初始化大小、最小、最大 */datasource.setInitialSize(initialSize);datasource.setMaxActive(maxActive);datasource.setMinIdle(minIdle);/ 配置获取连接等待超时的时间 */datasource.setMaxWait(maxWait);/ 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 */datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);/ 配置一个连接在池中最小、最大生存的时间,单位是毫秒 */datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);datasource.setMaxEvictableIdleTimeMillis(maxEvictableIdleTimeMillis);/* 用来检测连接是否有效的sql,要求是一个查询语句,常用select 'x'。如果validationQuery为null,testOnBorrow、testOnReturn、testWhileIdle都不会起作用。*/datasource.setValidationQuery(validationQuery);/ 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。 */datasource.setTestWhileIdle(testWhileIdle);/ 申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 */datasource.setTestOnBorrow(testOnBorrow);/ 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 */datasource.setTestOnReturn(testOnReturn);return datasource;}}
3. 动态数据源DynamicDataSource.java
继承org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource类,是实现动态路由切换的核心逻辑类。
package com.common.instance.demo.config.dynamicDataSource;import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;import java.util.HashMap;
import java.util.Map;/* @author tcm* @version 1.0.0* @description 动态数据源* @date 2023/3/14 15:36/
public class DynamicDataSource extends AbstractRoutingDataSource {private DynamicDataSourceProvider dynamicDataSourceProvider;private DynamicDataSourceProperties dynamicDataSourceProperties;public DynamicDataSource(DynamicDataSourceProvider dynamicDataSourceProvider, DynamicDataSourceProperties dynamicDataSourceProperties) {this.dynamicDataSourceProvider = dynamicDataSourceProvider;this.dynamicDataSourceProperties = dynamicDataSourceProperties;// 获取所有目标数据源Map<Object, Object> targetDataSources = new HashMap<>(dynamicDataSourceProvider.loadDataSources());super.setTargetDataSources(targetDataSources);// 设置默认数据源super.setDefaultTargetDataSource(dynamicDataSourceProvider.loadDataSources().get(DynamicDataSourceProvider.DEFAULT_DATASOURCE));super.afterPropertiesSet();}@Overrideprotected Object determineCurrentLookupKey() {
// return dynamicDataSourceProperties.getDsKey() == null ? DynamicDataSourceProvider.DEFAULT_DATASOURCE:dynamicDataSourceProperties.getDsKey();return DynamicDataSourceContextHolder.getDataSourceType();}}
4. 动态数据源供应DynamicDataSourceProvider.java
package com.common.instance.demo.config.dynamicDataSource;import javax.sql.DataSource;
import java.util.Map;/* @author tcm* @version 1.0.0* @description 动态数据源提供者接口* @date 2023/3/14 15:21/
public interface DynamicDataSourceProvider {// 默认数据源String DEFAULT_DATASOURCE = "master";/* 加载所有的数据源* @return*/Map<String, DataSource> loadDataSources();}
package com.common.instance.demo.config.dynamicDataSource;import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidDataSourceFactory;
import lombok.Data;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;/* @author tcm* @version 1.0.0* @description 动态数据源配置* @date 2023/3/17 9:15/
@Data
@Configuration
@EnableConfigurationProperties(DynamicDataSourceProperties.class)
public class YamlDynamicDataSourceProvider implements DynamicDataSourceProvider {@Resourceprivate DynamicDataSourceProperties dynamicDataSourceProperties;@Overridepublic Map<String, DataSource> loadDataSources() {Map<String, DataSource> ds = new HashMap<>(dynamicDataSourceProperties.getDs().size());try {Map<String, Map<String, String>> map = dynamicDataSourceProperties.getDs();Set<String> keySet = map.keySet();for (String s : keySet) {DruidDataSource dataSource = (DruidDataSource) DruidDataSourceFactory.createDataSource(map.get(s));ds.put(s, dynamicDataSourceProperties.dataSource(dataSource));}} catch (Exception e) {e.printStackTrace();}return ds;}}
5. 动态数据源bean
package com.common.instance.demo.config.dynamicDataSource;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;
import javax.sql.DataSource;/* @author tcm* @version 1.0.0* @description 数据源配置* @date 2023/3/15 17:18/
@Configuration
public class DataSourceBean {@Resourceprivate DynamicDataSourceProperties dynamicDataSourceProperties;@Resourceprivate DynamicDataSourceProvider dynamicDataSourceProvider;@Beanpublic DataSource getDataSource() {return new DynamicDataSource(dynamicDataSourceProvider, dynamicDataSourceProperties);}}
6. 动态数据源上下文DynamicDataSourceContextHolder.java
作用是:InheritableThreadLocal继承类来缓存路由类型(压测或用户流量),异步线程可以继承该缓存。
package com.common.instance.demo.config.dynamicDataSource;import com.log.util.LogUtil;/* @author tcm* @version 1.0.0* @description 动态数据源上下文* @date 2023/3/15 11:16/
public class DynamicDataSourceContextHolder {/* 使用ThreadLocal维护变量,ThreadLocal为每个使用该变量的线程提供独立的变量副本,* 所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。*/private static final ThreadLocal<String> CONTEXT_HOLDER = new InheritableThreadLocal<>();/* 设置数据源的变量*/public static void setDataSourceType(String dsType) {LogUtil.info(String.format("切换到%s数据源", dsType));CONTEXT_HOLDER.set(dsType);}/* 获得数据源的变量*/public static String getDataSourceType() {return CONTEXT_HOLDER.get();}/* 清空数据源变量*/public static void clearDataSourceType() {CONTEXT_HOLDER.remove();}}
7. 动态数据源过滤器DynamicDataSourceFilter.java
该类的作用:使用请求头来标记是用户还是压测,若是压测则切换到压测环境Mysql库。
package com.common.instance.demo.config.dynamicDataSource;import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;/* @author tcm* @version 1.0.0* @description 动态数据源过滤器* @date 2023/3/15 15:07/
@Component
@WebFilter(filterName = "dynamicDataSourceFilter", urlPatterns = "/*")
@Order(-10)
public class DynamicDataSourceFilter extends OncePerRequestFilter {@Overrideprotected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {// 获取测试标记String testFlag = request.getHeader("Test-Flag");// 如有测试标记,则设置测试数据源if (testFlag != null ) {DynamicDataSourceContextHolder.setDataSourceType("test");}// 添加到过滤链中filterChain.doFilter(request, response);}}
8. 测试动态路由
// 路由生产mysql
curl --request POST \\--url http://127.0.0.1:9013/instance-demo/dynamicDataSource/insertData \\--header 'content-type: application/json' \\--data '{"tabId":"fde7c1d4cad049c89612afb6c2c2979","transactionId":"7F0000013B2818B4AAC22CE1BDA20004"
}'// 路由测试mysql
curl --request POST \\--url http://127.0.0.1:9013/instance-demo/dynamicDataSource/insertData \\--header 'Test-Flag: true' \\--header 'content-type: application/json' \\--data '{"tabId":"fde7c1d4cad049c89612afb6c2c2979","transactionId":"7F0000013B2818B4AAC22CE1BDA20004-t"
}'
三、动态路由MongoDB
动态路由原理参考Mysql动态路由。自定义抽象路由工厂AbstractRoutingMongoSource,根据key路由到不同的MongoDB数据源。把路由工厂实现类DynamicMongoSource注入到MongoTemplate中,实现路由切换。
1. 参数配置application-localDynamicMongo.yml
spring:dynamic-mongo-source:
# dsKey: testds:# 主库数据源,默认master不能变master:hosts: ${remote.ip}ports: 27018database: demousername: adminpassword: 123456authentication-database: adminconnections-per-host: 100min-connections-per-host: 1maxConnectionIdleTime: 150000maxConnectionLifeTime: 150000connectTimeout: 6000socketTimeout: 10000# 压测库数据源test:hosts: ${remote.ip}ports: 27018database: testusername: adminpassword: 123456authentication-database: adminconnections-per-host: 100min-connections-per-host: 1maxConnectionIdleTime: 150000maxConnectionLifeTime: 150000connectTimeout: 6000socketTimeout: 10000
2. 加载配置参数DynamicMongoSourceProperties.java
package com.common.instance.demo.config.dynamicMongoSource;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;import java.util.Map;/* @author tcm* @version 1.0.0* @description Mongo动态加载数据源配置* @date 2023/3/17 9:21/
@Data
@Component
@ConfigurationProperties(prefix = "spring.dynamic-mongo-source")
public class DynamicMongoSourceProperties {private Map<String, MongoSettingsProperties> ds;private String dsKey;}
package com.common.instance.demo.config.dynamicMongoSource;import lombok.Data;
import org.hibernate.validator.constraints.NotBlank;
import org.hibernate.validator.constraints.NotEmpty;
import org.springframework.validation.annotation.Validated;import java.util.List;/* @author tcm* @version 1.0.0* @description Mongo配置* @date 2023/3/17 9:14/
@Validated
@Data
public class MongoSettingsProperties {@NotBlankprivate String database;@NotEmptyprivate List<String> hosts;@NotEmptyprivate List<Integer> ports;private String replicaSet;private String username;private String password;private String authenticationDatabase;private Integer minConnectionsPerHost = 10;private Integer connectionsPerHost = 20;private Integer maxConnectionIdleTime;private Integer maxConnectionLifeTime;private Integer connectTimeout;private Integer socketTimeout;}
3. 动态数据源DynamicMongoSource.java
自定义抽象路由工厂AbstractRoutingMongoSource,根据key路由到不同的MongoDB数据源。把路由工厂实现类DynamicMongoSource注入到MongoTemplate中,实现路由切换。
package com.common.instance.demo.config.dynamicMongoSource;import com.common.instance.demo.config.dynamicDataSource.DynamicDataSourceProvider;
import org.springframework.data.mongodb.MongoDatabaseFactory;import java.util.HashMap;
import java.util.Map;/* @author tcm* @version 1.0.0* @description Mongo动态数据源* @date 2023/3/17 11:22/
public class DynamicMongoSource extends AbstractRoutingMongoSource {private DynamicMongoSourceProvider dynamicMongoSourceProvider;private DynamicMongoSourceProperties dynamicMongoSourceProperties;public DynamicMongoSource(DynamicMongoSourceProvider dynamicMongoSourceProvider, DynamicMongoSourceProperties dynamicMongoSourceProperties) {this.dynamicMongoSourceProvider = dynamicMongoSourceProvider;this.dynamicMongoSourceProperties = dynamicMongoSourceProperties;// 获取所有目标数据源Map<String, MongoDatabaseFactory> targetMongoSources = new HashMap<>(dynamicMongoSourceProvider.loadMongoSources());super.setTargetMongoSources(targetMongoSources);// 设置默认数据源super.setDefaultTargetMongoSource(dynamicMongoSourceProvider.loadMongoSources().get(DynamicDataSourceProvider.DEFAULT_DATASOURCE));}@Overrideprotected Object determineCurrentLookupKey() {
// return dynamicMongoSourceProperties.getDsKey() == null ? DynamicMongoSourceProvider.DEFAULT_DATASOURCE:dynamicMongoSourceProperties.getDsKey();return DynamicMongoSourceContextHolder.getMongoSourceType();}}
package com.common.instance.demo.config.dynamicMongoSource;import com.mongodb.ClientSessionOptions;
import com.mongodb.client.ClientSession;
import com.mongodb.client.MongoDatabase;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.data.mongodb.MongoDatabaseFactory;import java.util.Map;/* @author tcm* @version 1.0.0* @description MongoDB路由抽象类* @date 2023/3/16 17:39/
public abstract class AbstractRoutingMongoSource implements MongoDatabaseFactory {private Map<String, MongoDatabaseFactory> targetMongoSources;private MongoDatabaseFactory defaultTargetMongoSource;public void setTargetMongoSources(Map<String, MongoDatabaseFactory> targetMongoSources) {this.targetMongoSources = targetMongoSources;}public void setDefaultTargetMongoSource(MongoDatabaseFactory defaultTargetMongoSource) {this.defaultTargetMongoSource = defaultTargetMongoSource;}protected MongoDatabaseFactory determineTargetMongoSource() {if (this.targetMongoSources == null) {throw new IllegalArgumentException("Property 'targetMongoSources' is required");}Object lookupKey = determineCurrentLookupKey();MongoDatabaseFactory mongoSource = this.targetMongoSources.get(lookupKey);if (mongoSource == null && lookupKey == null) {mongoSource = this.defaultTargetMongoSource;}if (mongoSource == null) {throw new IllegalStateException("Cannot determine target MongoTemplate for lookup key [" + lookupKey + "]");}return mongoSource;}protected abstract Object determineCurrentLookupKey();@Overridepublic MongoDatabase getMongoDatabase() throws DataAccessException {return determineTargetMongoSource().getMongoDatabase();}@Overridepublic MongoDatabase getMongoDatabase(String s) throws DataAccessException {return determineTargetMongoSource().getMongoDatabase(s);}@Overridepublic PersistenceExceptionTranslator getExceptionTranslator() {return null;}@Overridepublic ClientSession getSession(ClientSessionOptions clientSessionOptions) {return determineTargetMongoSource().getSession(clientSessionOptions);}@Overridepublic MongoDatabaseFactory withSession(ClientSession clientSession) {return determineTargetMongoSource().withSession(clientSession);}
}
4. 动态数据源供应DynamicMongoSourceProvider.java
package com.common.instance.demo.config.dynamicMongoSource;import org.springframework.data.mongodb.MongoDatabaseFactory;import java.util.Map;/* @author tcm* @version 1.0.0* @description Mongo动态数据源提供者接口* @date 2023/3/17 9:14/
public interface DynamicMongoSourceProvider {// 默认数据源String DEFAULT_DATASOURCE = "master";/* 加载所有的数据源* @return*/Map<String, MongoDatabaseFactory> loadMongoSources();}
package com.common.instance.demo.config.dynamicMongoSource;import com.mongodb.*;
import com.mongodb.client.MongoClient;
import com.mongodb.client.internal.MongoClientImpl;
import lombok.Data;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;/* @author tcm* @version 1.0.0* @description Mongo动态数据源配置* @date 2023/3/17 9:15/
@Data
@Configuration
@EnableConfigurationProperties(DynamicMongoSourceProperties.class)
public class YamlDynamicMongoSourceProvider implements DynamicMongoSourceProvider {@Resourceprivate DynamicMongoSourceProperties dynamicMongoSourceProperties;@Overridepublic Map<String, MongoDatabaseFactory> loadMongoSources() {Map<String, MongoDatabaseFactory> ds = new HashMap<>(dynamicMongoSourceProperties.getDs().size());try {Map<String, MongoSettingsProperties> map = dynamicMongoSourceProperties.getDs();Set<String> keySet = map.keySet();for (String s : keySet) {MongoSettingsProperties mongoSettingsProperties = map.get(s);ds.put(s, getMongoFactory(mongoSettingsProperties));}} catch (Exception e) {e.printStackTrace();}return ds;}private MongoDatabaseFactory getMongoFactory(MongoSettingsProperties mongoSettingsProperties) {// MongoDB地址列表List<ServerAddress> serverAddresses = new ArrayList<>();for (String host : mongoSettingsProperties.getHosts()) {int index = mongoSettingsProperties.getHosts().indexOf(host);Integer port = mongoSettingsProperties.getPorts().get(index);ServerAddress serverAddress = new ServerAddress(host, port);serverAddresses.add(serverAddress);}// 连接认证MongoCredential credential = MongoCredential.createCredential(mongoSettingsProperties.getUsername(),mongoSettingsProperties.getAuthenticationDatabase(),mongoSettingsProperties.getPassword().toCharArray());MongoDriverInformation info = MongoDriverInformation.builder().build();MongoClientSettings build = MongoClientSettings.builder().applyToClusterSettings(builder -> builder.hosts(serverAddresses)).applyToConnectionPoolSettings(builder -> builder.maxConnectionIdleTime(mongoSettingsProperties.getMaxConnectionIdleTime(), TimeUnit.MILLISECONDS).maxConnectionLifeTime(mongoSettingsProperties.getMaxConnectionLifeTime(), TimeUnit.MILLISECONDS)).credential(credential).build();// 创建客户端和FactoryMongoClient mongoClient = new MongoClientImpl(build, info);return new SimpleMongoClientDatabaseFactory(mongoClient, mongoSettingsProperties.getDatabase());}}
5. 动态数据源bean
package com.common.instance.demo.config.dynamicMongoSource;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver;
import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;import javax.annotation.Resource;/* @author tcm* @version 1.0.0* @description Mongo数据源配置* @date 2023/3/21 10:08/
@Configuration
public class MongoSourceBean {@Resourceprivate DynamicMongoSourceProperties dynamicMongoSourceProperties;@Resourceprivate DynamicMongoSourceProvider dynamicMongoSourceProvider;@Beanpublic MongoTemplate getMongoSource() {// 动态数据源配置DynamicMongoSource dynamicMongoSource = new DynamicMongoSource(dynamicMongoSourceProvider, dynamicMongoSourceProperties);// 插入数据时,去除class字段DefaultDbRefResolver dbRefResolver = new DefaultDbRefResolver(dynamicMongoSource);MappingMongoConverter converter = new MappingMongoConverter(dbRefResolver, new MongoMappingContext());converter.setTypeMapper(new DefaultMongoTypeMapper(null));converter.setMapKeyDotReplacement("。");return new MongoTemplate(dynamicMongoSource, converter);}}
6. 动态数据源上下文DynamicMongoSourceContextHolder.java
package com.common.instance.demo.config.dynamicMongoSource;import com.log.util.LogUtil;/* @author tcm* @version 1.0.0* @description Mongo动态数据源上下文* @date 2023/3/21 14:16/
public class DynamicMongoSourceContextHolder {/* 使用ThreadLocal维护变量,ThreadLocal为每个使用该变量的线程提供独立的变量副本,* 所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。*/private static final ThreadLocal<String> CONTEXT_MONGO_HOLDER = new InheritableThreadLocal<>();/* 设置数据源的变量*/public static void setMongoSourceType(String dsType) {LogUtil.info(String.format("mongo切换到%s数据源", dsType));CONTEXT_MONGO_HOLDER.set(dsType);}/* 获得数据源的变量*/public static String getMongoSourceType() {return CONTEXT_MONGO_HOLDER.get();}/* 清空数据源变量*/public static void clearMongoSourceType() {CONTEXT_MONGO_HOLDER.remove();}}
7. 动态数据源过滤器DynamicMongoSourceFilter.java
package com.common.instance.demo.config.dynamicMongoSource;import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;/* @author tcm* @version 1.0.0* @description Mongo动态数据源过滤器* @date 2023/3/21 14:17/
@Component
@WebFilter(filterName = "dynamicMongoSourceFilter", urlPatterns = "/*")
@Order(-10)
public class DynamicMongoSourceFilter extends OncePerRequestFilter {@Overrideprotected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {// 获取测试标记String testFlag = request.getHeader("Test-Mongo-Flag");// 如有测试标记,则设置测试数据源if (testFlag != null ) {DynamicMongoSourceContextHolder.setMongoSourceType("test");}// 添加到过滤链中filterChain.doFilter(request, response);}}
8. 测试动态路由
// 路由生产mongo
curl --request POST \\--url http://127.0.0.1:9013/instance-demo/test/mongodb/listAll// 路由测试mongo
curl --request POST \\--url http://127.0.0.1:9013/instance-demo/test/mongodb/listAll \\--header 'Test-Mongo-Flag: true'
四、动态路由Redis
动态路由原理参考Mysql动态路由。自定义抽象路由工厂AbstractRoutingRedisSource,根据key路由到不同的redis数据源。把路由工厂实现类DynamicRedisSource注入到RedisTemplate中,实现路由切换。
1. 参数配置application-localDynamicRedis.yml
spring:dynamic-redis-source:
# dsKey: testds:# 主库数据源,默认master不能变master:address: redis://127.0.0.1:6379database: 1password: abcdef# 压测库数据源test:address: redis://127.0.0.1:6379database: 0password: abcdefconfig:singleServerConfig:# 如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值,那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。idleConnectionTimeout: 10000pingTimeout: 1000# 同任何节点建立连接时的等待超时。时间单位是毫秒。connectTimeout: 10000# 等待节点回复命令的时间。该时间从命令发送成功时开始计时。timeout: 3000# 如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。retryAttempts: 3# 在一条命令发送失败以后,等待重试发送的时间间隔。时间单位是毫秒。retryInterval: 1500# 当与某个节点的连接断开时,等待与其重新建立连接的时间间隔。时间单位是毫秒。reconnectionTimeout: 3000# 在某个节点执行相同或不同命令时,连续 失败 failedAttempts(执行失败最大次数) 时,该节点将被从可用节点列表里清除,直到 reconnectionTimeout(重新连接时间间隔) 超时以后再次尝试。failedAttempts: 3# 在Redis节点里显示的客户端名称clientName: null# 发布和订阅连接的最小空闲连接数 默认1subscriptionConnectionMinimumIdleSize: 1# 发布和订阅连接池大小 默认50subscriptionConnectionPoolSize: 100# 单个连接最大订阅数量 默认5subscriptionsPerConnection: 5# 最小空闲连接数,默认值:10,最小保持连接数(长连接)connectionMinimumIdleSize: 12# 连接池最大容量。默认值:64;连接池的连接数量自动弹性伸缩connectionPoolSize: 64# 这个线程池数量被所有RTopic对象监听器,RRemoteService调用者和RExecutorService任务共同共享。# 默认值: 当前处理核数量 * 2# threads: 4## 这个线程池数量是在一个Redisson实例内,被其创建的所有分布式数据类型和服务,以及底层客户端所一同共享的线程池里保存的线程数量。## 默认值: 当前处理核数量 * 2# nettyThreads: 0# 编码 默认值: org.redisson.codec.JsonJacksonCodec Redisson的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在Redis里的读取和存储。
# codec: !<org.redisson.codec.JsonJacksonCodec> { }# 传输模式 默认值:TransportMode.NIOtransportMode: NIO
2. 加载配置参数DynamicRedisSourceProperties.java
package com.common.instance.demo.config.dynamicRedisSource;import lombok.Data;
import org.redisson.config.Config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;import java.util.Map;/* @author TCM* @version 1.0* @description 动态加载数据源配置* @date 2023/3/25 12:40/
@Data
@Component
@ConfigurationProperties(prefix = "spring.dynamic-redis-source")
public class DynamicRedisSourceProperties {private Config config;private Map<String, Map<String, String>> ds;private String dsKey;}
3. 动态数据源DynamicRedisSource.java
自定义抽象路由工厂AbstractRoutingRedisSource,根据key路由到不同的redis数据源。把路由工厂实现类DynamicRedisSource注入到RedisTemplate中,实现路由切换。
package com.common.instance.demo.config.dynamicRedisSource;import org.redisson.spring.data.connection.RedissonConnectionFactory;import java.util.HashMap;
import java.util.Map;/* @author TCM* @version 1.0* @description Redis动态数据源* @date 2023/3/25 13:32/
public class DynamicRedisSource extends AbstractRoutingRedisSource {private DynamicRedisSourceProvider dynamicMongoSourceProvider;private DynamicRedisSourceProperties dynamicRedisSourceProperties;public DynamicRedisSource(DynamicRedisSourceProvider dynamicMongoSourceProvider, DynamicRedisSourceProperties dynamicRedisSourceProperties) {try {this.dynamicMongoSourceProvider = dynamicMongoSourceProvider;this.dynamicRedisSourceProperties = dynamicRedisSourceProperties;// 获取所有目标数据源Map<String, RedissonConnectionFactory> targetRedisSources = new HashMap<>(dynamicMongoSourceProvider.loadRedisSources());super.setTargetRedisSources(targetRedisSources);// 设置默认数据源super.setDefaultTargetRedisSource(dynamicMongoSourceProvider.loadRedisSources().get(DynamicRedisSourceProvider.DEFAULT_DATASOURCE));super.afterPropertiesSet();} catch (Exception e) {throw new RuntimeException(e);}}@Overrideprotected Object determineCurrentLookupKey() {
// return dynamicRedisSourceProperties.getDsKey() == null ? DynamicRedisSourceProvider.DEFAULT_DATASOURCE:dynamicRedisSourceProperties.getDsKey();return DynamicRedisSourceContextHolder.getRedisSourceType();}}
package com.common.instance.demo.config.dynamicRedisSource;import org.redisson.spring.data.connection.RedissonConnectionFactory;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisSentinelConnection;import java.util.Map;/* @author TCM* @version 1.0* @description Redis路由抽象类* @date 2023/3/25 13:18/
public abstract class AbstractRoutingRedisSource extends RedissonConnectionFactory {private Map<String, RedissonConnectionFactory> targetRedisSources;private RedissonConnectionFactory defaultTargetRedisSource;public void setTargetRedisSources(Map<String, RedissonConnectionFactory> targetRedisSources) {this.targetRedisSources = targetRedisSources;}public void setDefaultTargetRedisSource(RedissonConnectionFactory defaultTargetRedisSource) {this.defaultTargetRedisSource = defaultTargetRedisSource;}protected RedisConnectionFactory determineTargetRedisSource() {if (this.defaultTargetRedisSource == null) {throw new IllegalArgumentException("Property 'defaultTargetRedisSource' is required");}Object lookupKey = determineCurrentLookupKey();RedissonConnectionFactory redisSource = this.targetRedisSources.get(lookupKey);if (redisSource == null && lookupKey == null) {redisSource = this.defaultTargetRedisSource;}if (redisSource == null) {throw new IllegalStateException("Cannot determine target RedissonTemplate for lookup key [" + lookupKey + "]");}return redisSource;}protected abstract Object determineCurrentLookupKey();@Overridepublic RedisConnection getConnection() {return determineTargetRedisSource().getConnection();}@Overridepublic RedisClusterConnection getClusterConnection() {return determineTargetRedisSource().getClusterConnection();}@Overridepublic RedisSentinelConnection getSentinelConnection() {return determineTargetRedisSource().getSentinelConnection();}}
4. 动态数据源供应DynamicRedisSourceProvider.java
package com.common.instance.demo.config.dynamicRedisSource;import org.redisson.spring.data.connection.RedissonConnectionFactory;import java.util.Map;/* @author tcm* @version 1.0.0* @description Redis动态数据源提供者接口* @date 2023/3/25 12:51/
public interface DynamicRedisSourceProvider {// 默认数据源String DEFAULT_DATASOURCE = "master";/* 加载所有的数据源* @return*/Map<String, RedissonConnectionFactory> loadRedisSources();}
package com.common.instance.demo.config.dynamicRedisSource;import lombok.Data;
import org.apache.logging.log4j.util.Strings;
import org.redisson.Redisson;
import org.redisson.config.Config;
import org.redisson.spring.data.connection.RedissonConnectionFactory;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;/* @author TCM* @version 1.0* @description Redis动态数据源配置* @date 2023/3/25 12:54/
@Data
@Configuration
@EnableConfigurationProperties(DynamicRedisSourceProperties.class)
public class YamlDynamicRedisSourceProvider implements DynamicRedisSourceProvider {@Resourceprivate DynamicRedisSourceProperties dynamicRedisSourceProperties;@Overridepublic Map<String, RedissonConnectionFactory> loadRedisSources() {Map<String, RedissonConnectionFactory> ds = new HashMap<>(dynamicRedisSourceProperties.getDs().size());try {Map<String, Map<String, String>> map = dynamicRedisSourceProperties.getDs();Set<String> keySet = map.keySet();for (String s : keySet) {ds.put(s, getRedisFactory(s));}} catch (Exception e) {e.printStackTrace();}return ds;}private RedissonConnectionFactory getRedisFactory(String key) {Config config = dynamicRedisSourceProperties.getConfig();config.useSingleServer().setAddress(dynamicRedisSourceProperties.getDs().get(key).get("address"));config.useSingleServer().setPassword(Strings.EMPTY.equals(dynamicRedisSourceProperties.getDs().get(key).get("password")) ? null:dynamicRedisSourceProperties.getDs().get(key).get("password"));config.useSingleServer().setDatabase(Integer.parseInt(dynamicRedisSourceProperties.getDs().get(key).get("database")));return new RedissonConnectionFactory(Redisson.create(config));}}
5. 动态数据源bean
package com.common.instance.demo.config.dynamicRedisSource;import com.aliyun.openservices.shade.com.alibaba.fastjson.parser.ParserConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;import javax.annotation.Resource;/* @author TCM* @version 1.0* @description Redis数据源配置* @date 2023/3/25 13:40/
@Configuration
public class RedisSourceBean {@Resourceprivate DynamicRedisSourceProvider dynamicRedisSourceProvider;@Resourceprivate DynamicRedisSourceProperties dynamicRedisSourceProperties;@Bean("dynamicRedisTemplate")public RedisTemplate<String, Object> redisTemplate() {DynamicRedisSource dynamicRedisSource = new DynamicRedisSource(dynamicRedisSourceProvider, dynamicRedisSourceProperties);RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();// 全局开启AutoType,不建议使用ParserConfig.getGlobalInstance().setAutoTypeSupport(false);// 建议使用这种方式,小范围指定白名单ParserConfig.getGlobalInstance().addAccept("com.common");// value 值的序列化采用 fastJsonRedisSerializerFastJson2JsonRedisSerializer fastJsonRedisSerializer = new FastJson2JsonRedisSerializer(Object.class);// key 的序列化采用 StringRedisSerializerRedisSerializer stringSerializer = new StringRedisSerializer();template.setKeySerializer(stringSerializer);template.setHashKeySerializer(stringSerializer);template.setValueSerializer(fastJsonRedisSerializer);template.setHashValueSerializer(fastJsonRedisSerializer);template.setConnectionFactory(dynamicRedisSource);template.afterPropertiesSet();return template;}}
package com.common.instance.demo.config.dynamicRedisSource;import com.alibaba.fastjson.JSON;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;import java.nio.charset.Charset;/* @description 自定义redis对象序列化* @author tcm* @version 1.0.0* @date 2021/5/26 17:59/
public class FastJson2JsonRedisSerializer<T> implements RedisSerializer<T> {private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");private Class<T> clazz;public FastJson2JsonRedisSerializer(Class<T> clazz) {super();this.clazz = clazz;}@Overridepublic byte[] serialize(T t) throws SerializationException {if (t == null) {return new byte[0];}return JSON.toJSONString(t).getBytes(DEFAULT_CHARSET);}@Overridepublic T deserialize(byte[] bytes) throws SerializationException {if (bytes == null || bytes.length <= 0) {return null;}String str = new String(bytes, DEFAULT_CHARSET);return (T) JSON.parseObject(str, clazz);}}
6. 动态数据源上下文DynamicRedisSourceContextHolder.java
package com.common.instance.demo.config.dynamicRedisSource;import com.log.util.LogUtil;/* @author TCM* @version 1.0* @description Redis动态数据源上下文* @date 2023/3/25 13:37/
public class DynamicRedisSourceContextHolder {/* 使用ThreadLocal维护变量,ThreadLocal为每个使用该变量的线程提供独立的变量副本,* 所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。*/private static final ThreadLocal<String> CONTEXT_REDIS_HOLDER = new InheritableThreadLocal<>();/* 设置数据源的变量*/public static void setRedisSourceType(String dsType) {LogUtil.info(String.format("redis切换到%s数据源", dsType));CONTEXT_REDIS_HOLDER.set(dsType);}/* 获得数据源的变量*/public static String getRedisSourceType() {return CONTEXT_REDIS_HOLDER.get();}/* 清空数据源变量*/public static void clearRedisSourceType() {CONTEXT_REDIS_HOLDER.remove();}}
7. 动态数据源过滤器DynamicRedisSourceFilter.java
package com.common.instance.demo.config.dynamicRedisSource;import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;/* @author TCM* @version 1.0* @description Redis动态数据源过滤器* @date 2023/3/26 18:00/
@Component
@WebFilter(filterName = "dynamicRedisSourceFilter", urlPatterns = "/*")
@Order(-10)
public class DynamicRedisSourceFilter extends OncePerRequestFilter {@Overrideprotected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {// 获取测试标记String testFlag = request.getHeader("Test-Redis-Flag");// 如有测试标记,则设置测试数据源if (testFlag != null ) {DynamicRedisSourceContextHolder.setRedisSourceType("test");}// 添加到过滤链中filterChain.doFilter(request, response);}}
8. 测试动态路由
// 路由生产redis
curl --request GET \\--url 'http://localhost:9013/instance-demo/lua/luaScript?keys=sku10&num=78'// 路由测试redis
curl --request GET \\--url 'http://localhost:9013/instance-demo/lua/luaScript?keys=sku10&num=50' \\--header 'Test-Redis-Flag: true'
五、参考资料
https://www.cnblogs.com/shih945/p/16650481.html
https://www.cnblogs.com/heyouxin/p/15119341.html