【Spring手写动态分表】异构数据库双写
这篇文章介绍了如何在异构数据库中实现日志数据的动态分表写入。作者考虑了三种方案:ShardingSphere、数据库中间件和手写动态分表,最终选择了手写动态分表以实现逻辑清晰的解决方案。
首先,作者通过引入动态数据源确保数据库连接的灵活性,配置了MySQL和PostgreSQL两种数据库的数据源。然后,创建了实体类Log,定义了表的结构。接着,在Mapper类中分别实现了PostgreSQL和MySQL的建表语句,根据时间字段动态生成表名,并重写了插入语句以支持动态表名。
为了方便管理和扩展,作者还创建了辅助工具类,包括数据库枚举类、数据表枚举类以及分表工具类。这些工具类帮助生成、判断和缓存实际表名,简化了动态分表的实现。最后,通过Service类实现了对两个数据库的动态插入操作,确保了数据的一致性和正确性。
通过这个实现,作者成功地将日志数据按月份动态分表写入到两个异构数据库中,解决了跨数据库分表的问题,同时保持了代码的简洁和逻辑的清晰。
背景
有日志数据提供到一个kafka数据队列,要求将数据按月份动态分表(如table_2022_10),同步写入到前置库(MySQL)及业务库(Postgres)中,本次不涉及查询/修改/删除
方案
大致选型了三种方案:
选型 | 优点 | 缺点 |
---|---|---|
ShardingSphere | 开发引用简单,建表分表规则容易设计 | 异构数据库同时使用会抛出异常,无法使用 |
ShardingSphere-Proxy/Mycat/pg-pool | 直接部署数据库分库分表中间件,免去开发 | 简单需求引入过多服务,没有必要 |
手写动态分表 | 逻辑清晰简单 | 适用于简单的分表场景 |
实现步骤
结合简单的场景,最终选择了手写动态分表
POM
核心是引入动态数据源,保证可切换
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.2</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.wi.demo</groupId><artifactId>kafka-consumer</artifactId><version>1.0</version><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/org.postgresql/postgresql --><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><version>42.2.2</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.24</version></dependency><!-- 数据库连接池 --><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.13</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.13</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.2</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.5.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.9</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
数据库配置
- 数据库A:postgres
- 数据库B:mysql
spring:datasource:dynamic:# 指定默认数据源primary: a# true:找不到数据源报错,false:找不到数据源则使用数据源strict: falsedatasource:a:driver-class-name: org.postgresql.Driverurl: jdbc:postgresql://localhost:5432/test?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&allowMultiQueries=trueusername: postgrespassword: 123456b:driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=falseusername: rootpassword: 123456druid:minIdle: 1initialSize: 1maxActive: 5maxWait: 30000
实体类Log
此处表名为虚拟表名
@Data
@TableName("log")
public class Log implements Serializable {private static final long serialVersionUID = 1L;@TableId(type = IdType.AUTO)private Long id;@TableField("val")private Double val;@TableField("time")private String time; // 格式:yyyyMMddHHmmss}
Mapper类
- 由于有两种数据库,所以建表语句格式不同,需要分成两种方法
- 动态分表时,插入语句的实际表名会变化,所以需要动态传入,即insert方法也要重新构造
- 由于不涉及查询/修改/删除,不需要实现额外sql
@Mapper
public interface LogMapper extends BaseMapper<Log> {// pg格式的建表DDL,传入实际动态表名@Insert("CREATE TABLE IF NOT EXISTS \\"${realTable}\\" (" +" \\"id\\" serial8, " +" \\"val\\" float4, " +" \\"time\\" varchar(20), " +" PRIMARY KEY (\\"id\\") " +");" +"COMMENT ON COLUMN \\"${realTable}\\".\\"id\\" IS '主键';" +"COMMENT ON COLUMN \\"${realTable}\\".\\"val\\" IS '数值'; " +"COMMENT ON COLUMN \\"${realTable}\\".\\"time\\" IS '定位时间yyyyMMddHHmmss'; " +"COMMENT ON TABLE \\"${realTable}\\" IS '动态日志表(消息队列)';")public void createTableA(@Param("realTable") String realTable);// mysql格式的建表DDL,传入实际动态表名@Insert("CREATE TABLE IF NOT EXISTS `${realTable}` ( " +" `id` bigint(0) NOT NULL AUTO_INCREMENT COMMENT '主键', " +" `val` float(24, 6) NULL COMMENT '数值', " +" `time` varchar(20) COMMENT '定位时间yyyyMMddHHmmss', " +" PRIMARY KEY (`id`) " +") " +"COMMENT = '动态日志表(消息队列)';")public void createTableB(@Param("realTable") String realTable);// 重写插入语句@Insert("insert into ${realTable} (val, time) values (#{record.val}, #{record.time}) ")public void insertLog(@Param("realTable") String realTable, @Param("record") Log log);
}
辅助工具类
// 数据库枚举类,方便扩展
public enum ShardingDatabase {A("a"),B("b");private ShardingDatabase(String dbName) {this.dbName = dbName;}private String dbName;public String getDbName() {return dbName;}public void setDbName(String dbName) {this.dbName = dbName;}
}// 数据表枚举类,方便扩展
public enum ShardingTable {LOG("log");private ShardingTable(String logicName) {this.logicName = logicName;}private String logicName;public String getLogicName() {return logicName;}public void setLogicName(String logicName) {this.logicName = logicName;}
}// 分表工具类
public final class ShardingUtil {// 实际表名缓存,减少实时判断public static Map<ShardingDatabase, Set<String>> activeTables = new HashMap<>();static {activeTables.put(ShardingDatabase.A, new HashSet<>());activeTables.put(ShardingDatabase.B, new HashSet<>());}public static boolean hasExist(ShardingDatabase db, String realTableName) {return activeTables.get(db).contains(realTableName);}public static void addActiveTable(ShardingDatabase db, String realTableName) {Set<String> set = activeTables.get(db);set.add(realTableName);activeTables.put(db, set);}/* 根据时间字段获取表名*/public static String getTableName(ShardingTable table, String time) {// 逻辑表名, 如:trans_recordfinal String logicTableName = table.getLogicName();// 组装获得实际表名, 如:trans_record_2022_5final String year = time.substring(0, 4);final String index = time.substring(4, 6);return logicTableName + "_" + year + "_" + index ;}}
Service类
@Service
public class LogService {@Autowiredprivate LogMapper logMapper;// 插入A库的log动态表@DS("a")public void insertA(Log log) {String name = ShardingUtil.getTableName(ShardingTable.LOG, log.getTime());if (!ShardingUtil.hasExist(ShardingDatabase.A, name)) { // 缓存判断logMapper.createTableA(name);ShardingUtil.addActiveTable(ShardingDatabase.A, name);}logMapper.insertLog(name, log);}@DS("b")public void insertB(Log log) {String name = ShardingUtil.getTableName(ShardingTable.LOG, log.getTime());