
ORACLE LOGMINER DEBEZIUM


Appendix: Debezium

https://kgithub.com/debezium/debezium

Debezium Connector for Oracle :: Debezium Documentation

oracle logminer

DBMS_LOGMNR

V$LOGMNR_SESSION

LogMiner capture workflow

1. Build the data dictionary (optional)

execute dbms_logmnr_d.build(options=>dbms_logmnr_d.store_in_redo_logs);

2. Add the log file

EXECUTE DBMS_LOGMNR.ADD_LOGFILE(LOGFILENAME => '/arch/redo4.log', OPTIONS => DBMS_LOGMNR.NEW);

3. Start LogMiner

EXECUTE SYS.DBMS_LOGMNR.START_LOGMNR(OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG + DBMS_LOGMNR.NO_ROWID_IN_STMT);

4. Make some changes in the database (DML, DDL, and so on)

5. Query the results

SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION from V$LOGMNR_CONTENTS;
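
The steps above can be driven from JDBC as well as from SQL*Plus. The following is only a minimal sketch under stated assumptions: the class and method names (LogMinerSessionSketch, sessionStatements, mine) are hypothetical, the log file path is treated as trusted input, and the caller is assumed to supply an open Oracle connection with the privileges LogMiner needs. Because EXECUTE is a SQL*Plus command rather than SQL, each call is wrapped in an anonymous PL/SQL block. The final DBMS_LOGMNR.END_LOGMNR call is not part of the steps above; it is added here to close the session.

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

final class LogMinerSessionSketch {

    // Step 5: the same query as in the notes above.
    static final String CONTENTS_QUERY =
            "SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION "
                    + "FROM V$LOGMNR_CONTENTS";

    // Steps 1-3 as anonymous PL/SQL blocks. Step 1 is optional, hence the flag.
    static List<String> sessionStatements(String logFile, boolean buildDictionary) {
        List<String> statements = new ArrayList<>();
        if (buildDictionary) {
            statements.add("BEGIN DBMS_LOGMNR_D.BUILD(OPTIONS => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;");
        }
        // Assumption: logFile is a trusted path; single quotes are doubled defensively.
        String safePath = logFile.replace("'", "''");
        statements.add("BEGIN DBMS_LOGMNR.ADD_LOGFILE(LOGFILENAME => '" + safePath
                + "', OPTIONS => DBMS_LOGMNR.NEW); END;");
        statements.add("BEGIN DBMS_LOGMNR.START_LOGMNR(OPTIONS => "
                + "DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG + DBMS_LOGMNR.NO_ROWID_IN_STMT); END;");
        return statements;
    }

    // Runs steps 1-3 and 5 on a connection supplied by the caller, printing each mined row.
    static void mine(Connection conn, String logFile, boolean buildDictionary) throws SQLException {
        for (String plsql : sessionStatements(logFile, buildDictionary)) {
            try (CallableStatement call = conn.prepareCall(plsql)) {
                call.execute();
            }
        }
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(CONTENTS_QUERY)) {
            while (rs.next()) {
                System.out.println(rs.getString("SCN") + " " + rs.getString("OPERATION")
                        + " " + rs.getString("SQL_REDO"));
            }
        }
        finally {
            // Not in the notes above: end the LogMiner session when done.
            try (CallableStatement end = conn.prepareCall("BEGIN DBMS_LOGMNR.END_LOGMNR; END;")) {
                end.execute();
            }
        }
    }

    public static void main(String[] args) {
        // Without a database at hand, just print the statements that would be executed.
        sessionStatements("/arch/redo4.log", false).forEach(System.out::println);
        System.out.println(CONTENTS_QUERY);
    }
}
```

V$LOGMNR_CONTENTS is only populated inside the session that started LogMiner, so all statements have to run on the same connection.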

LogMiner-related code in the Debezium source

debezium-connector-oracle

1. Build the data dictionary

io.debezium.connector.oracle.logminer.SqlUtils.BUILD_DICTIONARY

static final String BUILD_DICTIONARY = "BEGIN DBMS_LOGMNR_D.BUILD (options => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;";

2. Add the log file

io.debezium.connector.oracle.logminer.SqlUtils.addLogFileStatement(String, String)

static String addLogFileStatement(String option, String fileName) {
    return "BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => '" + fileName + "', OPTIONS => " + option + ");END;";
}
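
To see what this helper produces, here is a small standalone sketch. The method body is copied from the snippet above so the demo compiles on its own; the class name AddLogFileDemo and the second file name /arch/redo5.log are made up for illustration. Which option values Debezium actually passes is not shown in these notes; DBMS_LOGMNR.NEW and DBMS_LOGMNR.ADDFILE are used here only as plausible examples.

```java
final class AddLogFileDemo {

    // Copied from SqlUtils as quoted above, so that this demo is self-contained.
    static String addLogFileStatement(String option, String fileName) {
        return "BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => '" + fileName + "', OPTIONS => " + option + ");END;";
    }

    public static void main(String[] args) {
        // First file of a session: start a new list of log files.
        System.out.println(addLogFileStatement("DBMS_LOGMNR.NEW", "/arch/redo4.log"));
        // Further files (hypothetical path): append to the existing list.
        System.out.println(addLogFileStatement("DBMS_LOGMNR.ADDFILE", "/arch/redo5.log"));
    }
}
```

The first call prints BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => '/arch/redo4.log', OPTIONS => DBMS_LOGMNR.NEW);END; which is the PL/SQL-block form of the EXECUTE statement from the manual workflow.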

3. Start LogMiner

io.debezium.connector.oracle.logminer.SqlUtils.startLogMinerStatement(Scn, Scn, LogMiningStrategy, boolean)

// LogMiner methods

/**
 * This returns statement to build LogMiner view for online redo log files
 * @param startScn mine from
 * @param endScn mine till
 * @param strategy Log Mining strategy
 * @return statement todo: handle corruption. STATUS (Double): value of 0 indicates it is executable
 */
static String startLogMinerStatement(Scn startScn, Scn endScn, OracleConnectorConfig.LogMiningStrategy strategy, boolean isContinuousMining) {
    String miningStrategy;
    if (strategy.equals(OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO)) {
        miningStrategy = "DBMS_LOGMNR.DICT_FROM_REDO_LOGS + DBMS_LOGMNR.DDL_DICT_TRACKING ";
    }
    else {
        miningStrategy = "DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG ";
    }
    if (isContinuousMining) {
        miningStrategy += " + DBMS_LOGMNR.CONTINUOUS_MINE ";
    }
    return "BEGIN sys.dbms_logmnr.start_logmnr(" +
            "startScn => '" + startScn + "', " +
            "endScn => '" + endScn + "', " +
            "OPTIONS => " + miningStrategy +
            " + DBMS_LOGMNR.NO_ROWID_IN_STMT);" +
            "END;";
}
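
The effect of the strategy and the continuous-mining flag on the OPTIONS argument is easier to see with concrete values. The sketch below repeats the logic of the method above with simplified stand-ins: SCNs are plain strings instead of Debezium's Scn type, and the nested enum, including the name ONLINE_CATALOG for the non-redo branch, is an assumption made so the demo compiles without the connector on the classpath.

```java
final class StartLogMinerDemo {

    // Stand-in for OracleConnectorConfig.LogMiningStrategy (assumed, not the real class).
    enum LogMiningStrategy { CATALOG_IN_REDO, ONLINE_CATALOG }

    // Same string-building logic as the quoted method, with String SCNs.
    static String startLogMinerStatement(String startScn, String endScn,
                                         LogMiningStrategy strategy, boolean isContinuousMining) {
        String miningStrategy;
        if (strategy == LogMiningStrategy.CATALOG_IN_REDO) {
            // Dictionary is read from the redo logs and DDL changes are tracked.
            miningStrategy = "DBMS_LOGMNR.DICT_FROM_REDO_LOGS + DBMS_LOGMNR.DDL_DICT_TRACKING ";
        }
        else {
            // Dictionary is read from the live database catalog.
            miningStrategy = "DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG ";
        }
        if (isContinuousMining) {
            miningStrategy += " + DBMS_LOGMNR.CONTINUOUS_MINE ";
        }
        return "BEGIN sys.dbms_logmnr.start_logmnr(" +
                "startScn => '" + startScn + "', " +
                "endScn => '" + endScn + "', " +
                "OPTIONS => " + miningStrategy +
                " + DBMS_LOGMNR.NO_ROWID_IN_STMT);" +
                "END;";
    }

    public static void main(String[] args) {
        // The SCN values are arbitrary examples.
        System.out.println(startLogMinerStatement("1000", "2000", LogMiningStrategy.ONLINE_CATALOG, false));
        System.out.println(startLogMinerStatement("1000", "2000", LogMiningStrategy.CATALOG_IN_REDO, true));
    }
}
```

With the online catalog the statement matches step 3 of the manual workflow, plus an explicit SCN range. As far as I know, Oracle desupported the CONTINUOUS_MINE option in 19c, so the flag only makes sense against older database versions.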

5. Query the mined results

io.debezium.connector.oracle.logminer.LogMinerQueryBuilder.build(OracleConnectorConfig, OracleDatabaseSchema, String)

When Debezium queries V$LOGMNR_CONTENTS, it adds filter conditions, for example on table, SCN, and OPERATION_CODE.

/**
 * Builds the LogMiner contents view query.
 *
 * The returned query will contain 2 bind parameters that the caller is responsible for binding before
 * executing the query.  The first bind parameter is the lower-bounds of the SCN mining window that is
 * not-inclusive while the second is the upper-bounds of the SCN mining window that is inclusive.
 *
 * The built query relies on the following columns from V$LOGMNR_CONTENTS:
 * <pre>
 *     SCN - the system change number at which the change was made
 *     SQL_REDO - the reconstructed SQL statement that initiated the change
 *     OPERATION - the database operation type name
 *     OPERATION_CODE - the database operation numeric code
 *     TIMESTAMP - the time when the change event occurred
 *     XID - the transaction identifier the change participated in
 *     CSF - the continuation flag, identifies rows that should be processed together as single row, 0=no, 1=yes
 *     TABLE_NAME - the name of the table for which the change is for
 *     SEG_OWNER - the name of the schema for which the change is for
 *     USERNAME - the name of the database user that caused the change
 *     ROW_ID - the unique identifier of the row that the change is for, may not always be set with valid value
 *     ROLLBACK - the rollback flag, value of 0 or 1.  1 implies the row was rolled back
 *     RS_ID - the rollback segment identifier where the change record was recorded from
 * </pre>
 *
 * @param connectorConfig connector configuration, should not be {@code null}
 * @param schema database schema, should not be {@code null}
 * @param userName jdbc connection username
 * @return the SQL string to be used to fetch changes from Oracle LogMiner
 */
public static String build(OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema, String userName) {
    final StringBuilder query = new StringBuilder(1024);
    query.append("SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, ");
    query.append("USERNAME, ROW_ID, ROLLBACK, RS_ID ");
    query.append("FROM ").append(LOGMNR_CONTENTS_VIEW).append(" ");

    // These bind parameters will be bound when the query is executed by the caller.
    query.append("WHERE SCN > ? AND SCN <= ? ");

    // Restrict to configured PDB if one is supplied
    final String pdbName = connectorConfig.getPdbName();
    if (!Strings.isNullOrEmpty(pdbName)) {
        query.append("AND ").append("SRC_CON_NAME = '").append(pdbName.toUpperCase()).append("' ");
    }

    query.append("AND (");

    // Always include START, COMMIT, MISSING_SCN, and ROLLBACK operations
    query.append("(OPERATION_CODE IN (6,7,34,36)");

    if (!schema.storeOnlyCapturedTables()) {
        // In this mode, the connector will always be fed DDL operations for all tables even if they
        // are not part of the inclusion/exclusion lists.
        query.append(" OR ").append(buildDdlPredicate(userName)).append(" ");
        // Insert, Update, Delete, SelectLob, LobWrite, LobTrim, and LobErase
        if (connectorConfig.isLobEnabled()) {
            query.append(") OR (OPERATION_CODE IN (1,2,3,9,10,11,29) ");
        }
        else {
            query.append(") OR (OPERATION_CODE IN (1,2,3) ");
        }
    }
    else {
        // Insert, Update, Delete, SelectLob, LobWrite, LobTrim, and LobErase
        if (connectorConfig.isLobEnabled()) {
            query.append(") OR ((OPERATION_CODE IN (1,2,3,9,10,11,29) ");
        }
        else {
            query.append(") OR ((OPERATION_CODE IN (1,2,3) ");
        }
        // In this mode, the connector will filter DDL operations based on the table inclusion/exclusion lists
        query.append("OR ").append(buildDdlPredicate(userName)).append(") ");
    }

    // Always ignore the flush table
    query.append("AND TABLE_NAME != '").append(SqlUtils.LOGMNR_FLUSH_TABLE).append("' ");

    // There are some common schemas that we automatically ignore when building the runtime Filter
    // predicates and we put that same list of schemas here and apply those in the generated SQL.
    if (!OracleConnectorConfig.EXCLUDED_SCHEMAS.isEmpty()) {
        query.append("AND SEG_OWNER NOT IN (");
        for (Iterator<String> i = OracleConnectorConfig.EXCLUDED_SCHEMAS.iterator(); i.hasNext();) {
            String excludedSchema = i.next();
            query.append("'").append(excludedSchema.toUpperCase()).append("'");
            if (i.hasNext()) {
                query.append(",");
            }
        }
        query.append(") ");
    }

    String schemaPredicate = buildSchemaPredicate(connectorConfig);
    if (!Strings.isNullOrEmpty(schemaPredicate)) {
        query.append("AND ").append(schemaPredicate).append(" ");
    }

    String tablePredicate = buildTablePredicate(connectorConfig);
    if (!Strings.isNullOrEmpty(tablePredicate)) {
        query.append("AND ").append(tablePredicate).append(" ");
    }

    query.append("))");

    return query.toString();
}
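
The two bind parameters in WHERE SCN > ? AND SCN <= ? describe a window that excludes its lower bound and includes its upper bound. One consequence is that consecutive windows can reuse the previous upper bound as the next lower bound without reading any SCN twice or skipping one. The sketch below only illustrates that property; it is not Debezium's windowing code, and the class ScnWindowSketch, the batchSize parameter, and the SCN values are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class ScnWindowSketch {

    // One mining window, matching the predicate SCN > lowerExclusive AND SCN <= upperInclusive.
    static final class Window {
        final long lowerExclusive;
        final long upperInclusive;

        Window(long lowerExclusive, long upperInclusive) {
            this.lowerExclusive = lowerExclusive;
            this.upperInclusive = upperInclusive;
        }

        boolean contains(long scn) {
            return scn > lowerExclusive && scn <= upperInclusive;
        }

        @Override
        public String toString() {
            return "(" + lowerExclusive + ", " + upperInclusive + "]";
        }
    }

    // Splits the range (lastProcessedScn, currentScn] into consecutive windows of at most batchSize SCNs.
    static List<Window> windows(long lastProcessedScn, long currentScn, long batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Window> result = new ArrayList<>();
        long lower = lastProcessedScn;
        while (lower < currentScn) {
            long upper = Math.min(lower + batchSize, currentScn);
            result.add(new Window(lower, upper));
            // The inclusive upper bound becomes the next exclusive lower bound.
            lower = upper;
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [(100, 200], (200, 250]]
        System.out.println(windows(100, 250, 100));
    }
}
```

In this example SCN 200 falls into the first window only, and SCN 100, which was already processed, falls into none.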