> 文章列表 > 【Spring手写动态分表】异构数据库双写

【Spring手写动态分表】异构数据库双写

【Spring手写动态分表】异构数据库双写

背景

有日志数据提供到一个kafka数据队列,要求将数据按月份动态分表(如table_2022_10),同步写入到前置库(MySQL)及业务库(Postgres)中,本次不涉及查询/修改/删除

方案

大致选型了三种方案:

选型 优点 缺点
ShardingSphere 开发引用简单,建表分表规则容易设计 异构数据库同时使用会抛出异常,无法使用
ShardingSphere-Proxy/Mycat/pg-pool 直接部署数据库分库分表中间件,免去开发 简单需求引入过多服务,没有必要
手写动态分表 逻辑清晰简单 适用于简单的分表场景

实现步骤

结合简单的场景,最终选择了手写动态分表

POM

核心是引入动态数据源,保证可切换

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.2</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.wi.demo</groupId><artifactId>kafka-consumer</artifactId><version>1.0</version><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/org.postgresql/postgresql --><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><version>42.2.2</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.24</version></dependency><!-- 数据库连接池 --><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.13</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.13</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.2</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.5.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.9</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>

数据库配置

  • 数据库A:postgres
  • 数据库B:mysql
spring:datasource:dynamic:# 指定默认数据源primary: a# true:找不到数据源报错,false:找不到数据源则使用数据源strict: falsedatasource:a:driver-class-name: org.postgresql.Driverurl: jdbc:postgresql://localhost:5432/test?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&allowMultiQueries=trueusername: postgrespassword: 123456b:driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=falseusername: rootpassword: 123456druid:minIdle: 1initialSize: 1maxActive: 5maxWait: 30000

实体类Log

此处表名为虚拟表名

@Data
@TableName("log")
public class Log implements Serializable {private static final long serialVersionUID = 1L;@TableId(type = IdType.AUTO)private Long id;@TableField("val")private Double val;@TableField("time")private String time; // 格式:yyyyMMddHHmmss}

Mapper类

  • 由于有两种数据库,所以建表语句格式不同,需要分成两种方法
  • 动态分表时,插入语句的实际表名会变化,所以需要动态传入,即insert方法也要重新构造
  • 由于不涉及查询/修改/删除,不需要实现额外sql
@Mapper
public interface LogMapper extends BaseMapper<Log> {// pg格式的建表DDL,传入实际动态表名@Insert("CREATE TABLE IF NOT EXISTS \\"${realTable}\\"  (" +"  \\"id\\" serial8, " +"  \\"val\\" float4, " +"  \\"time\\" varchar(20), "  +"  PRIMARY KEY (\\"id\\") " +");" +"COMMENT ON COLUMN  \\"${realTable}\\".\\"id\\" IS '主键';" +"COMMENT ON COLUMN  \\"${realTable}\\".\\"val\\" IS '数值'; " +"COMMENT ON COLUMN  \\"${realTable}\\".\\"time\\" IS '定位时间yyyyMMddHHmmss'; " +"COMMENT ON TABLE  \\"${realTable}\\" IS '动态日志表(消息队列)';")public void createTableA(@Param("realTable") String realTable);// mysql格式的建表DDL,传入实际动态表名@Insert("CREATE TABLE IF NOT EXISTS `${realTable}`  (    " +"  `id` bigint(0) NOT NULL AUTO_INCREMENT COMMENT '主键',    " +"  `val` float(24, 6) NULL COMMENT '数值',    " +"  `time` varchar(20) COMMENT '定位时间yyyyMMddHHmmss',    " +"  PRIMARY KEY (`id`)    " +")    " +"COMMENT = '动态日志表(消息队列)';")public void createTableB(@Param("realTable") String realTable);// 重写插入语句@Insert("insert into ${realTable} (val, time) values (#{record.val}, #{record.time}) ")public void insertLog(@Param("realTable") String realTable, @Param("record") Log log);
}

辅助工具类

// 数据库枚举类,方便扩展
public enum ShardingDatabase {A("a"),B("b");private ShardingDatabase(String dbName) {this.dbName = dbName;}private String dbName;public String getDbName() {return dbName;}public void setDbName(String dbName) {this.dbName = dbName;}
}// 数据表枚举类,方便扩展
public enum ShardingTable {LOG("log");private ShardingTable(String logicName) {this.logicName = logicName;}private String logicName;public String getLogicName() {return logicName;}public void setLogicName(String logicName) {this.logicName = logicName;}
}// 分表工具类
public final class ShardingUtil {// 实际表名缓存,减少实时判断public static Map<ShardingDatabase, Set<String>> activeTables = new HashMap<>();static {activeTables.put(ShardingDatabase.A, new HashSet<>());activeTables.put(ShardingDatabase.B, new HashSet<>());}public static boolean hasExist(ShardingDatabase db, String realTableName) {return activeTables.get(db).contains(realTableName);}public static void addActiveTable(ShardingDatabase db, String realTableName) {Set<String> set = activeTables.get(db);set.add(realTableName);activeTables.put(db, set);}/*** 根据时间字段获取表名*/public static String getTableName(ShardingTable table, String time) {// 逻辑表名, 如:trans_recordfinal String logicTableName = table.getLogicName();// 组装获得实际表名, 如:trans_record_2022_5final String year = time.substring(0, 4);final String index = time.substring(4, 6);return logicTableName + "_" + year + "_" + index ;}}

Service类

@Service
public class LogService {@Autowiredprivate LogMapper logMapper;// 插入A库的log动态表@DS("a")public void insertA(Log log) {String name = ShardingUtil.getTableName(ShardingTable.LOG, log.getTime());if (!ShardingUtil.hasExist(ShardingDatabase.A, name)) { // 缓存判断logMapper.createTableA(name);ShardingUtil.addActiveTable(ShardingDatabase.A, name);}logMapper.insertLog(name, log);}@DS("b")public void insertB(Log log) {String name = ShardingUtil.getTableName(ShardingTable.LOG, log.getTime());if (!ShardingUtil.hasExist(ShardingDatabase.B, name)) {logMapper.createTableB(name);ShardingUtil.addActiveTable(ShardingDatabase.B, name);}logMapper.insertLog(name, log);}
}

测试

随机写入2条数据,可以在数据库中看到新增的表和数据

 /*** 测试增加随机时间的数据,会自动创建表*/@Testvoid randomAdd() {final Random random = new SecureRandom();for (int i= 0; i < 2; i++) {Log log = new Log();log.setVal(3.6);log.setTime(// 随机年(random.nextInt(100) + 2000) +// 随机月StringUtils.leftPad((random.nextInt(12) + 1) + "", 2, '0') +// 随机日StringUtils.leftPad((random.nextInt(30) + 1) + "", 2, '0') + "115959");logService.insertA(log);logService.insertB(log);}}