[Spring Hand-Written Dynamic Table Sharding] Dual Writes to Heterogeneous Databases
Background
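Log data arrives through a Kafka queue. Each record must go into a monthly table that is created on the fly (e.g. table_2022_10), and it must be written synchronously to both the front-end database (MySQL) and the business database (PostgreSQL). Queries, updates and deletes are out of scope.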
Options
Three options were considered:
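Option | Pros | Cons |
---|---|---|
ShardingSphere | Simple to integrate into the application; sharding rules are easy to design | Throws exceptions when heterogeneous databases are used together, so it could not be used |
ShardingSphere-Proxy/Mycat/pg-pool | A sharding middleware is deployed in front of the databases, so no development is needed | Brings in too many extra services for a simple requirement |
Hand-written dynamic sharding | Clear, simple logic | Only suitable for simple sharding scenarios |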
Implementation
Because the scenario is simple, hand-written dynamic sharding was chosen.
POM
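The key dependency is the dynamic datasource starter (dynamic-datasource-spring-boot-starter), which lets each method switch between the two databases.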
```xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.2</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.wi.demo</groupId>
<artifactId>kafka-consumer</artifactId>
<version>1.0</version>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.2.2</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.24</version>
    </dependency>
    <!-- Connection pool -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.13</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.1.13</version>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.4.2</version>
    </dependency>
    <!-- Dynamic datasource: switches between database a and b via @DS -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
        <version>3.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.9</version>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
```
Database configuration
- Database A: PostgreSQL
- Database B: MySQL
```yaml
spring:
  datasource:
    dynamic:
      # default datasource
      primary: a
      # true: throw if the requested datasource does not exist; false: fall back to the primary datasource
      strict: false
      datasource:
        a:
          driver-class-name: org.postgresql.Driver
          url: jdbc:postgresql://localhost:5432/test?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&allowMultiQueries=true
          username: postgres
          password: 123456
        b:
          driver-class-name: com.mysql.jdbc.Driver
          url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false
          username: root
          password: 123456
      # pool settings applied to every datasource above
      druid:
        minIdle: 1
        initialSize: 1
        maxActive: 5
        maxWait: 30000
```
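With `primary: a` and `strict: false`, any lookup that names an unknown datasource silently lands on database A. To make that routing behaviour concrete, here is a plain-Java sketch of the idea: a per-thread stack of datasource keys (so a nested switch restores the outer key when it returns) plus the primary/strict fallback. It is an illustration under those assumptions, not dynamic-datasource's actual code; `RoutingSketch`, `with` and `current` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Hypothetical illustration of dynamic datasource routing; not dynamic-datasource's implementation.
final class RoutingSketch<T> {
    private final Map<String, T> dataSources; // e.g. "a" -> PostgreSQL pool, "b" -> MySQL pool
    private final String primary;             // mirrors `primary: a`
    private final boolean strict;             // mirrors `strict: false`
    // One stack of keys per thread, so a nested switch restores the outer key when it ends.
    private final ThreadLocal<Deque<String>> keys = ThreadLocal.withInitial(ArrayDeque::new);

    RoutingSketch(Map<String, T> dataSources, String primary, boolean strict) {
        this.dataSources = new HashMap<>(dataSources);
        this.primary = primary;
        this.strict = strict;
    }

    /** Runs the action with the given key active, roughly what entering a @DS("...") method does. */
    <R> R with(String key, Supplier<R> action) {
        keys.get().push(key);
        try {
            return action.get();
        } finally {
            keys.get().pop(); // always restore the previous key, even on exceptions
        }
    }

    /** Resolves the datasource for the current thread. */
    T current() {
        String key = keys.get().peek();
        if (key == null) {
            return dataSources.get(primary); // no switch active: use the primary
        }
        T ds = dataSources.get(key);
        if (ds != null) {
            return ds;
        }
        if (strict) {
            throw new NoSuchElementException("datasource not found: " + key);
        }
        return dataSources.get(primary); // strict=false: fall back to the primary
    }
}
```

Setting `strict` to `true` turns the final fallback into an exception, which surfaces a mistyped `@DS` value instead of quietly writing to the wrong database.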
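Entity class Log
`@TableName` holds the logical (virtual) table name; the physical monthly table name is passed to the mapper at insert time.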
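```java
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.io.Serializable;

@Data
@TableName("log") // logical table name
public class Log implements Serializable {
    private static final long serialVersionUID = 1L;

    @TableId(type = IdType.AUTO)
    private Long id;

    @TableField("val")
    private Double val;

    @TableField("time")
    private String time; // format: yyyyMMddHHmmss
}
```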
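`time` is a plain string, and its first six characters later decide which monthly table a row lands in. A minimal sketch, assuming you want to reject malformed values before they reach the database; `LogTimeValidator` and `isValid` are hypothetical names. It relies on `java.time` with `ResolverStyle.STRICT`, which rejects impossible dates such as Feb 30 and requires the `uuuu` pattern letter instead of `yyyy`.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.format.ResolverStyle;

// Hypothetical helper: checks that a Log.time value is a real yyyyMMddHHmmss timestamp.
final class LogTimeValidator {
    // STRICT rejects dates like 2023-02-29; with STRICT, 'uuuu' (proleptic year) must replace 'yyyy'.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuuMMddHHmmss").withResolverStyle(ResolverStyle.STRICT);

    private LogTimeValidator() {}

    static boolean isValid(String time) {
        // Cheap shape check first: exactly 14 ASCII digits.
        if (time == null || !time.matches("\\d{14}")) {
            return false;
        }
        try {
            LocalDateTime.parse(time, FORMAT); // throws on impossible dates or times
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }
}
```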
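Mapper
- The two databases use different DDL syntax, so table creation is split into two methods.
- With dynamic sharding the physical table name in the INSERT statement changes every month, so it must be passed in at runtime; the insert statement therefore has to be written by hand as well.
- Queries, updates and deletes are not involved, so no other SQL is needed.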
```java
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

@Mapper
public interface LogMapper extends BaseMapper<Log> {

    // PostgreSQL DDL; the physical (monthly) table name is passed in
    @Insert("CREATE TABLE IF NOT EXISTS \"${realTable}\" (" +
            " \"id\" serial8, " +
            " \"val\" float4, " +
            " \"time\" varchar(20), " +
            " PRIMARY KEY (\"id\") " +
            ");" +
            "COMMENT ON COLUMN \"${realTable}\".\"id\" IS 'primary key';" +
            "COMMENT ON COLUMN \"${realTable}\".\"val\" IS 'value'; " +
            "COMMENT ON COLUMN \"${realTable}\".\"time\" IS 'positioning time yyyyMMddHHmmss'; " +
            "COMMENT ON TABLE \"${realTable}\" IS 'dynamic log table (message queue)';")
    void createTableA(@Param("realTable") String realTable);

    // MySQL DDL; the physical (monthly) table name is passed in
    @Insert("CREATE TABLE IF NOT EXISTS `${realTable}` ( " +
            " `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key', " +
            " `val` float(24, 6) NULL COMMENT 'value', " +
            " `time` varchar(20) COMMENT 'positioning time yyyyMMddHHmmss', " +
            " PRIMARY KEY (`id`) " +
            ") " +
            "COMMENT = 'dynamic log table (message queue)';")
    void createTableB(@Param("realTable") String realTable);

    // Hand-written insert: BaseMapper.insert would target the logical table `log`
    @Insert("insert into ${realTable} (val, time) values (#{record.val}, #{record.time})")
    void insertLog(@Param("realTable") String realTable, @Param("record") Log log);
}
```
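In MyBatis, `${realTable}` is string substitution: the value is pasted into the SQL text as-is, whereas `#{...}` becomes a bound `?` parameter. Because the table name is derived from message data, a whitelist check before calling the mapper is a cheap safeguard. A minimal sketch, assuming the `<logical>_<yyyy>_<MM>` naming produced by `ShardingUtil.getTableName`; `TableNameGuard` and `requireSafe` are hypothetical names.

```java
import java.util.regex.Pattern;

// Hypothetical guard: only lets names like log_2022_10 through before they are spliced into SQL via ${...}.
final class TableNameGuard {
    // lowercase logical name, then _yyyy_MM with a month from 01 to 12
    private static final Pattern PHYSICAL_TABLE =
            Pattern.compile("[a-z][a-z_]*_\\d{4}_(0[1-9]|1[0-2])");

    private TableNameGuard() {}

    static String requireSafe(String realTable) {
        if (realTable == null || !PHYSICAL_TABLE.matcher(realTable).matches()) {
            throw new IllegalArgumentException("unexpected table name: " + realTable);
        }
        return realTable;
    }
}
```

A call site would then read `logMapper.insertLog(TableNameGuard.requireSafe(name), log);`.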
Helper classes
```java
// Database enum, easy to extend (ShardingDatabase.java)
public enum ShardingDatabase {
    A("a"),
    B("b");

    private final String dbName; // datasource key, same as the @DS value

    ShardingDatabase(String dbName) {
        this.dbName = dbName;
    }

    public String getDbName() {
        return dbName;
    }
}

// Table enum, easy to extend (ShardingTable.java)
public enum ShardingTable {
    LOG("log");

    private final String logicName; // logical table name

    ShardingTable(String logicName) {
        this.logicName = logicName;
    }

    public String getLogicName() {
        return logicName;
    }
}

// Sharding utility (ShardingUtil.java)
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public final class ShardingUtil {
    // Cache of physical tables already created, so the database is not checked on every insert.
    // Concurrent collections, since a Kafka listener may call this from several threads.
    private static final Map<ShardingDatabase, Set<String>> ACTIVE_TABLES = new ConcurrentHashMap<>();

    static {
        for (ShardingDatabase db : ShardingDatabase.values()) {
            ACTIVE_TABLES.put(db, ConcurrentHashMap.newKeySet());
        }
    }

    private ShardingUtil() {
    }

    public static boolean hasExist(ShardingDatabase db, String realTableName) {
        return ACTIVE_TABLES.get(db).contains(realTableName);
    }

    public static void addActiveTable(ShardingDatabase db, String realTableName) {
        ACTIVE_TABLES.get(db).add(realTableName);
    }

    /**
     * Builds the physical table name from the time field (yyyyMMddHHmmss).
     */
    public static String getTableName(ShardingTable table, String time) {
        // logical table name, e.g. log
        final String logicTableName = table.getLogicName();
        // physical table name, e.g. log_2022_05 (the month keeps its leading zero)
        final String year = time.substring(0, 4);
        final String month = time.substring(4, 6);
        return logicTableName + "_" + year + "_" + month;
    }
}
```
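`hasExist` followed by `addActiveTable` is a check-then-act sequence: two consumer threads can both miss the cache and both run the DDL. `CREATE TABLE IF NOT EXISTS` makes that mostly harmless, but if you want each table created at most once per process, one option is `ConcurrentHashMap.computeIfAbsent`, which applies its mapping function atomically, at most once per key, and records nothing if the function throws. This is a hypothetical alternative to the two cache methods, not part of the original code; `TableRegistry`, `ensureCreated` and `isCreated` are invented names. The trade-off is that other threads touching the same map bin wait while the DDL runs, which is acceptable for a once-a-month event.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry: runs the CREATE TABLE callback at most once per (database, table) pair.
final class TableRegistry {
    private final ConcurrentMap<String, Boolean> created = new ConcurrentHashMap<>();

    /**
     * @param db        datasource key, e.g. "a" or "b"
     * @param realTable physical table name, e.g. "log_2022_10"
     * @param ddl       callback issuing the DDL, e.g. () -> logMapper.createTableA(name)
     */
    void ensureCreated(String db, String realTable, Runnable ddl) {
        // computeIfAbsent is atomic per key: concurrent callers wait instead of re-running the DDL.
        // If ddl throws, no mapping is stored, so the next call retries.
        created.computeIfAbsent(db + ":" + realTable, key -> {
            ddl.run();
            return Boolean.TRUE;
        });
    }

    boolean isCreated(String db, String realTable) {
        return created.containsKey(db + ":" + realTable);
    }
}
```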
Service class
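```java
import com.baomidou.dynamic.datasource.annotation.DS;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// @DS is applied through a Spring AOP proxy, so these methods must be called
// from another bean (as the test does), not from inside LogService itself.
@Service
public class LogService {

    @Autowired
    private LogMapper logMapper;

    // Insert into the monthly log table in database A (PostgreSQL)
    @DS("a")
    public void insertA(Log log) {
        String name = ShardingUtil.getTableName(ShardingTable.LOG, log.getTime());
        if (!ShardingUtil.hasExist(ShardingDatabase.A, name)) { // check the cache before issuing DDL
            logMapper.createTableA(name);
            ShardingUtil.addActiveTable(ShardingDatabase.A, name);
        }
        logMapper.insertLog(name, log);
    }

    // Insert into the monthly log table in database B (MySQL)
    @DS("b")
    public void insertB(Log log) {
        String name = ShardingUtil.getTableName(ShardingTable.LOG, log.getTime());
        if (!ShardingUtil.hasExist(ShardingDatabase.B, name)) {
            logMapper.createTableB(name);
            ShardingUtil.addActiveTable(ShardingDatabase.B, name);
        }
        logMapper.insertLog(name, log);
    }
}
```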
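When a caller invokes `insertA` and then `insertB` in sequence, as the test does, an exception from database A means database B is never attempted for that record. If both writes should at least be tried, one option is a small helper that runs every target and reports all failures together. This is a sketch with hypothetical names (`DualWriter`, `target`, `write`); it does not make the two writes atomic, since nothing here spans a transaction across PostgreSQL and MySQL.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper: writes one record to every target and reports all failures together.
final class DualWriter<T> {
    private final Map<String, Consumer<T>> targets = new LinkedHashMap<>(); // preserves write order

    DualWriter<T> target(String name, Consumer<T> writer) {
        targets.put(name, writer);
        return this;
    }

    /** Attempts every target even if an earlier one fails; throws once at the end if any failed. */
    void write(T record) {
        List<String> failedNames = new ArrayList<>();
        List<RuntimeException> causes = new ArrayList<>();
        for (Map.Entry<String, Consumer<T>> e : targets.entrySet()) {
            try {
                e.getValue().accept(record);
            } catch (RuntimeException ex) {
                failedNames.add(e.getKey());
                causes.add(ex);
            }
        }
        if (!failedNames.isEmpty()) {
            IllegalStateException failure = new IllegalStateException("dual write failed for " + failedNames);
            causes.forEach(failure::addSuppressed); // keep every original cause
            throw failure;
        }
    }
}
```

Wired to the service above, it could look like `new DualWriter<Log>().target("a", logService::insertA).target("b", logService::insertB).write(log);`, with the exception propagating to the caller so it can decide how to retry.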
Test
Insert two records with random timestamps; the new monthly tables and their rows then appear in both databases.
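```java
/**
 * Inserts records with random timestamps; the monthly tables are created automatically.
 */
@Test
void randomAdd() {
    final Random random = new SecureRandom();
    for (int i = 0; i < 2; i++) {
        Log log = new Log();
        log.setVal(3.6);
        log.setTime(
                // random year 2000-2099
                (random.nextInt(100) + 2000) +
                // random month 01-12
                StringUtils.leftPad((random.nextInt(12) + 1) + "", 2, '0') +
                // random day 01-30 (may yield e.g. Feb 30; only yyyyMM is used for the table name)
                StringUtils.leftPad((random.nextInt(30) + 1) + "", 2, '0') + "115959");
        logService.insertA(log);
        logService.insertB(log);
    }
}
```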
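The random test concatenates year, month and day independently, so it can produce dates such as Feb 30, and its output is not repeatable. If you want random but always-valid timestamps, a sketch like the following picks a random second in a fixed range using `java.time`; `RandomLogTimes` and `next` are hypothetical names, and passing a seeded `new Random(seed)` makes runs reproducible.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;

// Hypothetical test-data helper: random but always-valid yyyyMMddHHmmss timestamps.
final class RandomLogTimes {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
    private static final LocalDateTime START = LocalDateTime.of(2000, 1, 1, 0, 0, 0); // inclusive
    private static final LocalDateTime END = LocalDateTime.of(2100, 1, 1, 0, 0, 0);   // exclusive
    private static final long SPAN_SECONDS = Duration.between(START, END).getSeconds();

    private RandomLogTimes() {}

    /** Picks a random second in [2000-01-01, 2100-01-01), so impossible dates cannot occur. */
    static String next(Random random) {
        // floorMod keeps the offset in [0, SPAN_SECONDS) even when nextLong() is negative
        long offset = Math.floorMod(random.nextLong(), SPAN_SECONDS);
        return START.plusSeconds(offset).format(FORMAT);
    }
}
```

In the test, the string concatenation would then become `log.setTime(RandomLogTimes.next(random));`.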