
Notes on a medium-to-large database migration: from MySQL to PostgreSQL

I won't go into how the decision to migrate from MySQL to PostgreSQL was made. This was also my first time using PostgreSQL, so I can't say whether it is better or worse. The decision had already been made; below is how the migration was carried out.

1. Data overview

Server: 4-core CPU, 8 GB RAM, 1 TB disk, 8 Mbit/s network bandwidth.

Database: MySQL 5.5 Community, 492 GB of data including indexes and logs.

Since the server had less than 300 GB of free disk space, there was no way to run MySQL and PostgreSQL side by side on the server to do the migration. So PostgreSQL ran only on my local machine, and the data was first migrated there.

2. Migrating with generic code

Since Java is what I know best, I decided to write the migration in Java. (To reduce the workload, I chose to stand on the shoulders of giants.) I found an article titled 自己动手写一个Mysql到PostgreSQL数据库迁移工具 (roughly, "Write your own MySQL-to-PostgreSQL migration tool") that looked promising, copied the code locally, and adapted and improved it slightly. For tables with an integer primary key, the migration is done incrementally. The code is as follows:

package springDemo;

import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.util.Assert;

import com.zaxxer.hikari.HikariDataSource;

public class DataTableMigration {

    private static Logger LOG = LoggerFactory.getLogger(DataTableMigration.class);

    private final JdbcTemplate targetJdbc;
    private final JdbcTemplate sourceJdbc;
    private final String tableName;
    private final String primaryKey;
    private final String[] columnNamesInSourceDB;
    private final String[] columnNamesInTargetDB;
    private final Map<String, String> columnMappings;

    public DataTableMigration(DataSource sourceDataSource, String tableName, DataSource targetDataSource) throws SQLException {
        this.tableName = tableName.toLowerCase();
        if (sourceDataSource instanceof HikariDataSource) {
            HikariDataSource hikariDataSource = (HikariDataSource) sourceDataSource;
            hikariDataSource.setMaxLifetime(86400000); // 24 hours
            hikariDataSource.setConnectionTimeout(600000);
            hikariDataSource.setReadOnly(true);
        }
        if (targetDataSource instanceof HikariDataSource) {
            HikariDataSource hikariDataSource = (HikariDataSource) targetDataSource;
            hikariDataSource.setMaxLifetime(86400000); // 24 hours
            hikariDataSource.setConnectionTimeout(600000);
        }
        this.sourceJdbc = new JdbcTemplate(sourceDataSource);
        this.targetJdbc = new JdbcTemplate(targetDataSource);
        System.out.println(sourceDataSource);
        System.out.println(targetDataSource);
        this.primaryKey = MigrationUtils.getPrimaryKeyByTableName(sourceDataSource.getConnection(), this.tableName);
        this.columnNamesInSourceDB = MigrationUtils.getColumnsByTableName(sourceDataSource.getConnection(), this.tableName);
        Assert.isTrue(this.columnNamesInSourceDB != null && this.columnNamesInSourceDB.length > 0,
                "can't find column info from source db for the table " + this.tableName);
        this.columnNamesInTargetDB = MigrationUtils.getColumnsByTableName(targetDataSource.getConnection(), this.tableName);
        Assert.isTrue(this.columnNamesInTargetDB != null && this.columnNamesInTargetDB.length > 0,
                "can't find column info from target db for the table " + this.tableName);
        this.columnMappings = new HashMap<>();
    }

    protected JdbcTemplate getSourceJdbc() {
        return this.sourceJdbc;
    }

    protected JdbcTemplate getTargetJdbc() {
        return this.targetJdbc;
    }

    protected List<Map<String, Object>> queryForList(String querySql, long offset, long stepLength) {
        return getSourceJdbc().queryForList(querySql, offset, stepLength);
    }

    private Object[] rowToParam(Map<String, Object> row) {
        return Arrays.stream(columnNamesInTargetDB)
                .map(colInSource -> columnMappings.getOrDefault(colInSource, colInSource))
                .map(row::get)
                .toArray();
    }

    protected String getInsertSQL() {
        return String.format("insert into %s (%s) values(%s) ",
                this.tableName,
                String.join(",", columnNamesInTargetDB),
                IntStream.range(0, columnNamesInTargetDB.length).mapToObj(n -> "?").collect(Collectors.joining(",")));
    }

    protected String getInsertSQLOnCconflict() {
        return String.format("insert into %s (%s) values(%s) ON CONFLICT (%s) DO NOTHING",
                this.tableName,
                String.join(",", columnNamesInTargetDB),
                IntStream.range(0, columnNamesInTargetDB.length).mapToObj(n -> "?").collect(Collectors.joining(",")),
                this.primaryKey);
    }

    protected int getStepLength() {
        return 1000000;
    }

    protected long getSourceMaxIndex() {
        long count = getSourceJdbc().queryForObject("select max(" + primaryKey + ") from " + tableName, Long.class);
        return count;
    }

    protected long getTargetMaxIndex() {
        long count = getTargetJdbc().queryForObject("select count(1) from " + tableName, Long.class);
        if (count > 0)
            count = getTargetJdbc().queryForObject("select max(" + primaryKey + ") from " + tableName, Long.class);
        else
            count = getSourceJdbc().queryForObject("select min(" + primaryKey + ") from " + tableName, Long.class) - 1;
        return count;
    }

    public void migrateIntegerIndexTable() throws Exception {
        LOG.info("start to migrate data from source db to target db");
        String sql = String.format("select %s from %s where %s > ? order by %s asc limit ?;",
                String.join(",", columnNamesInSourceDB), this.tableName, this.primaryKey, this.primaryKey);
        long maxRecords = getSourceMaxIndex();
        long stepLength = getStepLength();
        for (long offset = getTargetMaxIndex(); offset < maxRecords; offset = getTargetMaxIndex()) {
            List<Map<String, Object>> rows = queryForList(sql, offset, stepLength);
            LOG.info("get records From source");
            getTargetJdbc().batchUpdate(getInsertSQL(),
                    rows.stream().map(this::rowToParam).collect(Collectors.toList()));
            LOG.info("moved {} records", offset);
        }
    }

    public void migrateIntegerIndexTableJust1Line(long id) throws Exception {
        LOG.info("start to migrate data from source db to target db");
        String sql = String.format("select %s from %s where %s = ? limit ?;",
                String.join(",", columnNamesInSourceDB), this.tableName, this.primaryKey);
        List<Map<String, Object>> rows = queryForList(sql, id, 1);
        LOG.info("get records From source");
        getTargetJdbc().batchUpdate(getInsertSQL(),
                rows.stream().map(this::rowToParam).collect(Collectors.toList()));
        LOG.info("moved {} record", id);
    }

    // Total record count in the source table.
    protected int getSourceTotalRecords() {
        int count = getSourceJdbc().queryForObject("select count(1) from " + tableName, Integer.class);
        LOG.info("source db has {} records", count);
        return count;
    }

    // Record count already stored in the target table.
    protected int getTargetTotalRecords() {
        int count = getTargetJdbc().queryForObject("select count(1) from " + tableName, Integer.class);
        LOG.info("target db has {} records", count);
        return count;
    }

    public void migrateStringIndexTable() throws SQLException {
        LOG.info("start to migrate data from source db to target db");
        String sql = String.format("select %s from %s order by %s asc limit ?, ?;",
                String.join(",", columnNamesInSourceDB), this.tableName, this.primaryKey);
        int maxRecords = getSourceTotalRecords();
        int stepLength = getStepLength();
        for (int offset = 0; offset < maxRecords; offset = offset + stepLength) {
            List<Map<String, Object>> rows = queryForList(sql, offset, stepLength);
            LOG.info("get records From source, " + rows.size());
            getTargetJdbc().batchUpdate(getInsertSQLOnCconflict(),
                    rows.stream().map(this::rowToParam).collect(Collectors.toList()));
            LOG.info("moved {} records", offset);
        }
    }

    public void close() {
        try {
            if (sourceJdbc != null) {
                sourceJdbc.getDataSource().getConnection().close();
            }
            if (targetJdbc != null) {
                targetJdbc.getDataSource().getConnection().close();
            }
        } catch (SQLException e) {
            LOG.error("Error closing database connection", e);
        }
    }

    public static void main(String[] args) {
        LOG.atInfo();
        Config cf = new Config();
        System.setProperty("spring.jdbc.getParameterType.ignore", "true");
        try {
            DataTableMigration dtmStr = new DataTableMigration(cf.sourceDataSource(), "target", cf.targetDataSource());
            dtmStr.migrateStringIndexTable();
            dtmStr.close();
            String[] tableNames = { "dailyexchange", "movingavg", "stats" };
            for (int i = 0; i < tableNames.length; i++) {
                DataTableMigration dtmInt = new DataTableMigration(cf.sourceDataSource(), tableNames[i], cf.targetDataSource());
                dtmInt.migrateIntegerIndexTable();
                dtmInt.close();
            }
//            DataTableMigration dtmInt = new DataTableMigration(cf.sourceDataSource(), "min1", cf.targetDataSource());
//            dtmInt.migrateIntegerIndexTable();
//            dtmInt.close();
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
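
The listing above references a Config class (providing sourceDataSource() and targetDataSource()) and a MigrationUtils helper, neither of which appears in the article. Below is a minimal sketch of what they might look like, assuming HikariCP data sources and plain JDBC DatabaseMetaData lookups; the hosts, credentials, and database name are placeholders, not the author's actual configuration.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import javax.sql.DataSource;

import com.zaxxer.hikari.HikariDataSource;

// Hypothetical sketch of the Config referenced in main(); connection details are placeholders.
class Config {
    public DataSource sourceDataSource() {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl("jdbc:mysql://<source-host>/<database>"); // placeholder host/database
        ds.setUsername("<user>");
        ds.setPassword("<password>");
        return ds;
    }

    public DataSource targetDataSource() {
        HikariDataSource ds = new HikariDataSource();
        ds.setJdbcUrl("jdbc:postgresql://localhost/<database>"); // placeholder host/database
        ds.setUsername("<user>");
        ds.setPassword("<password>");
        return ds;
    }
}

// Hypothetical sketch of MigrationUtils: both lookups can be done with standard JDBC metadata.
class MigrationUtils {
    // Return the primary-key column of the table (assumes a single-column primary key).
    static String getPrimaryKeyByTableName(Connection conn, String tableName) throws SQLException {
        try (ResultSet rs = conn.getMetaData().getPrimaryKeys(conn.getCatalog(), null, tableName)) {
            return rs.next() ? rs.getString("COLUMN_NAME") : null;
        }
    }

    // Return all column names of the table.
    static String[] getColumnsByTableName(Connection conn, String tableName) throws SQLException {
        List<String> columns = new ArrayList<>();
        try (ResultSet rs = conn.getMetaData().getColumns(conn.getCatalog(), null, tableName, null)) {
            while (rs.next()) {
                columns.add(rs.getString("COLUMN_NAME"));
            }
        }
        return columns.toArray(new String[0]);
    }
}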

The first few tables, each only a few million rows, were migrated within a few hours. Then came the largest table, min1, with 3.4 billion rows, and at that point the speed became unacceptable. Since every round trip costs time, I increased the batch size as much as possible. After raising the batch size to 1,000,000 rows (the maximum is 1,048,576), the transfer speed improved slightly, to about 10 minutes per million rows. The output looked like this:

HikariDataSource (null)
HikariDataSource (null)
2023-04-12T07:31:49.370+08:00  INFO   --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2023-04-12T07:31:50.701+08:00  INFO   --- [           main] com.zaxxer.hikari.pool.HikariPool        : HikariPool-1 - Added connection com.mysql.cj.jdbc.ConnectionImpl@3cce5371
2023-04-12T07:31:50.704+08:00  INFO   --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2023-04-12T07:31:51.056+08:00  INFO   --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-2 - Starting...
2023-04-12T07:31:51.148+08:00  INFO   --- [           main] com.zaxxer.hikari.pool.HikariPool        : HikariPool-2 - Added connection org.postgresql.jdbc.PgConnection@19b93fa8
2023-04-12T07:31:51.148+08:00  INFO   --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-2 - Start completed.
2023-04-12T07:31:51.164+08:00  INFO   --- [           main] springDemo.DataTableMigration            : start to migrate data from source db to target db
2023-04-12T07:40:24.912+08:00  WARN   --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@3016fd5e (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:29.923+08:00  WARN   --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@6c45ee6e (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:34.928+08:00  WARN   --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@6b3e12b5 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:39.933+08:00  WARN   --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@5aac4250 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:44.936+08:00  WARN   --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@1338fb5 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:49.938+08:00  WARN   --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@42463763 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:54.941+08:00  WARN   --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@59f63e24 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:40:59.947+08:00  WARN   --- [           main] com.zaxxer.hikari.pool.PoolBase          : HikariPool-1 - Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@7ca33c24 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value.
2023-04-12T07:41:20.733+08:00  INFO   --- [           main] springDemo.DataTableMigration            : get records From source
2023-04-12T07:41:33.743+08:00  INFO   --- [           main] springDemo.DataTableMigration            : moved 2990509187 records

At this rate, transferring all 3.4 billion rows would take roughly 24 days ((3.4 billion / 1 million) × 10 minutes ÷ 1,440 minutes per day), which was still unacceptable. From a related article (which I can no longer find), I learned that when migrating data with generic code, a large amount of time is spent building the List<Map<String, Object>> mapping.
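
To see where that time goes, here is a conceptual sketch of the per-row work on the generic path (assumed names, not the actual Spring source): queryForList materializes every row as a column-name-to-value map, and rowToParam() then immediately unpacks that map back into an Object[] for the batch insert.

// Conceptual sketch (assumed names, not the actual Spring implementation) of what the
// generic path does for every single row before the batch insert can start.
static Object[] genericRowCopy(java.sql.ResultSet rs, String[] targetColumns) throws java.sql.SQLException {
    java.sql.ResultSetMetaData md = rs.getMetaData();
    // Step 1: queryForList builds a Map per row, boxing every column value.
    java.util.Map<String, Object> row = new java.util.HashMap<>();
    for (int i = 1; i <= md.getColumnCount(); i++) {
        row.put(md.getColumnLabel(i), rs.getObject(i));
    }
    // Step 2: rowToParam() (see the listing above) unpacks the same values again.
    return java.util.Arrays.stream(targetColumns).map(row::get).toArray();
}

The dedicated code in the next section skips both steps and copies ResultSet fields straight into the target PreparedStatement.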

3. Writing dedicated migration code

Being lazy and reusing generic code clearly doesn't work for a table this large, so there was no way around writing dedicated migration code. Thanks to ChatGPT. The code is as follows:

package pack;

import java.sql.*;
import java.time.LocalDate;
import java.time.LocalTime;
import java.io.InputStream;
import java.util.Properties;

import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

public class MysqlToPostgres {

    /**
     * To migrate a MySQL table into PostgreSQL via batch inserts, this program is built on plain JDBC.
     *
     * The migration steps are:
     *
     * 1. Connect to both the MySQL and the PostgreSQL database via JDBC.
     * 2. Read the table to be migrated from the MySQL database.
     * 3. Fetch the MySQL rows in batches.
     * 4. Batch-insert the rows into the corresponding PostgreSQL table.
     */
    // Note: the logger is created with Min1.class, which is why the log lines show "pack.Min1".
    private static final Logger logger = Logger.getLogger(Min1.class);

    private Connection mysqlConn = null;
    private Connection pgConn = null;

    public static void main(String args[]) {
        System.out.println(System.getProperty("java.class.path"));
        PropertyConfigurator.configure("log4j.properties");
        MysqlToPostgres M2P = new MysqlToPostgres();
        M2P.init();
        long flag = M2P.getTargetMaxIndex();
        long end = M2P.getSourceMaxIndex();
        logger.info("source line count:" + end);
        for (; flag < end; flag = M2P.getTargetMaxIndex()) {
            logger.info("target line count:" + flag);
            M2P.migrate(flag);
//            break;
        }
        M2P.uninit();
    }

    public void init() {
        Properties props = new Properties();
        InputStream input = null;
        try {
            String filename = "consts.properties";
            input = MysqlToPostgres.class.getClassLoader().getResourceAsStream(filename);
            if (input == null) {
                System.out.println("Sorry, unable to find " + filename);
                return;
            }
            // Load the properties file and read the connection settings.
            props.load(input);
            String sourceIP = props.getProperty("sourceIP");
            String targetIP = props.getProperty("targetIP");
            String username = props.getProperty("DBUserName");
            String password = props.getProperty("DBPassword");
            System.out.println(getMinute() + " " + username);
            // Connect to the MySQL database.
            Class.forName("com.mysql.jdbc.Driver");
            mysqlConn = DriverManager.getConnection("jdbc:mysql://" + sourceIP + "/cf_stock?useCompression=true", username, password);
            // Connect to the PostgreSQL database.
            Class.forName("org.postgresql.Driver");
            pgConn = DriverManager.getConnection("jdbc:postgresql://" + targetIP + "/cf_stock", username, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    protected long getSourceMaxIndex() {
        long count = 0;
        Statement mysqlStmt = null;
        try {
            mysqlStmt = mysqlConn.createStatement();
            // Read the current maximum primary key from the MySQL table.
            ResultSet mysqlRs = mysqlStmt.executeQuery("select max(recordID) from min1;");
            if (mysqlRs.next()) {
                count = mysqlRs.getLong("max(recordID)");
            }
            mysqlStmt.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return count;
    }

    protected long getTargetMaxIndex() {
        long count = 0;
        Statement pgStmt = null;
        try {
            pgStmt = pgConn.createStatement();
            // Read the current maximum primary key from the PostgreSQL table.
            ResultSet pgRs = pgStmt.executeQuery("select max(recordID) from min1;");
            if (pgRs.next()) {
                count = pgRs.getLong(1);
            }
            pgStmt.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return count;
    }

    public void migrate(long flag) {
        PreparedStatement pgStmt = null;
        PreparedStatement mysqlStmt = null;
        try {
            String sql = "INSERT INTO min1 "
                    + "(recordID, dayRecordID, targetID, date, minute, "
                    + "open, high, low, close, average, shareVolume, moneyVolume, openInterest) "
                    + "VALUES (?,?,?,?,?, ?,?,?,?, ?,?,?,?) ";
            pgStmt = pgConn.prepareStatement(sql);
            // Fetch the next batch of rows from the MySQL table.
            String mysqlSql = "select * from min1 where recordID > ? order by recordID asc limit 1000000;";
            mysqlStmt = mysqlConn.prepareStatement(mysqlSql);
            mysqlStmt.setLong(1, flag);
            ResultSet mysqlRs = mysqlStmt.executeQuery();
            logger.info(getMinute() + " get records from mysql.");
            int i = 0;
            while (mysqlRs.next()) {
                Min1 m1 = new Min1(mysqlRs);
                // Copy the row straight into the PostgreSQL batch insert.
                pgStmt.setLong(1, m1.recordID);
                pgStmt.setLong(2, m1.dayRecordID);
                pgStmt.setString(3, m1.targetID);
                pgStmt.setDate(4, m1.date);
                pgStmt.setShort(5, m1.minute);
                pgStmt.setFloat(6, m1.open);
                pgStmt.setFloat(7, m1.high);
                pgStmt.setFloat(8, m1.low);
                pgStmt.setFloat(9, m1.close);
                pgStmt.setFloat(10, m1.average);
                pgStmt.setLong(11, m1.shareVolume);
                pgStmt.setLong(12, m1.moneyVolume);
                pgStmt.setLong(13, m1.openInterest);
                pgStmt.addBatch();
                i++;
                if (i % 500000 == 0) {
                    System.out.println(i);
                }
            }
            // Execute the accumulated batch.
            logger.info(getMinute() + " combine all sql into a batch.");
            pgStmt.executeBatch();
            logger.info(getMinute() + " after excute batch.");
            pgStmt.clearBatch();
            mysqlRs.close();
            mysqlStmt.close();
            pgStmt.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void uninit() {
        try {
            mysqlConn.close();
            pgConn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String getMinute() {
        LocalTime now = LocalTime.now();
        return "" + now.getHour() + ":" + now.getMinute() + ":" + now.getSecond();
    }
}
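
The listing references a Min1 row holder and a consts.properties file, neither of which is shown in the article. Below is a minimal sketch of both; the field names follow the INSERT statement above and the property keys follow init(), while the exact column types and property values are assumptions.

// Hypothetical sketch of the Min1 row holder; field names follow the INSERT statement,
// types are inferred from the setLong/setString/setDate/setShort/setFloat calls above.
public class Min1 {
    long recordID;
    long dayRecordID;
    String targetID;
    java.sql.Date date;
    short minute;
    float open, high, low, close, average;
    long shareVolume, moneyVolume, openInterest;

    Min1(java.sql.ResultSet rs) throws java.sql.SQLException {
        recordID     = rs.getLong("recordID");
        dayRecordID  = rs.getLong("dayRecordID");
        targetID     = rs.getString("targetID");
        date         = rs.getDate("date");
        minute       = rs.getShort("minute");
        open         = rs.getFloat("open");
        high         = rs.getFloat("high");
        low          = rs.getFloat("low");
        close        = rs.getFloat("close");
        average      = rs.getFloat("average");
        shareVolume  = rs.getLong("shareVolume");
        moneyVolume  = rs.getLong("moneyVolume");
        openInterest = rs.getLong("openInterest");
    }
}

A possible consts.properties (keys match init(), values are placeholders):

sourceIP=<mysql-server-host>
targetIP=<postgres-host>
DBUserName=<user>
DBPassword=<password>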

Running it, the performance was acceptable: about 2 minutes per million rows, which works out to roughly 5 days for the whole table (3.4 billion rows / 1 million rows per batch × 2 minutes ≈ 6,900 minutes):

[main] INFO pack.Min1 - source line count:3474392405
[main] INFO pack.Min1 - target line count:2991509187
[main] INFO pack.Min1 - 7:44:14 get records from mysql.
500000
1000000
[main] INFO pack.Min1 - 7:44:15 combine all sql into a batch.
[main] INFO pack.Min1 - 7:44:29 after excute batch.
[main] INFO pack.Min1 - target line count:2992509187
[main] INFO pack.Min1 - 7:45:54 get records from mysql.
500000
1000000
[main] INFO pack.Min1 - 7:45:56 combine all sql into a batch.
[main] INFO pack.Min1 - 7:46:10 after excute batch.
[main] INFO pack.Min1 - target line count:2993509187

Done.