Last active
February 26, 2021 15:52
-
-
Save JLDLaughlin/653e4d3a98c7a2fef4b7ed7c26b20163 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java | |
index 15c2af574..a7b76aebd 100644 | |
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java | |
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java | |
@@ -52,7 +52,7 @@ private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Lsn lsn, | |
this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn; | |
this.lastCommitLsn = lastCommitLsn; | |
- sourceInfo.update(lsn, time, txId, null, sourceInfo.xmin()); | |
+ sourceInfo.update(lsn, time, txId, null, sourceInfo.xmin(), lastCommitLsn); | |
sourceInfoSchema = sourceInfo.schema(); | |
this.lastSnapshotRecord = lastSnapshotRecord; | |
@@ -141,7 +141,7 @@ public void updateWalPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant c | |
public void updateCommitPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, TableId tableId, Long xmin) { | |
this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn; | |
this.lastCommitLsn = lastCompletelyProcessedLsn; | |
- sourceInfo.update(lsn, commitTime, txId, tableId, xmin); | |
+ sourceInfo.update(lsn, commitTime, txId, tableId, xmin, lastCompletelyProcessedLsn); | |
} | |
boolean hasLastKnownPosition() { | |
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSourceInfoStructMaker.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSourceInfoStructMaker.java | |
index 6f0d075f7..294e42039 100644 | |
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSourceInfoStructMaker.java | |
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSourceInfoStructMaker.java | |
@@ -10,6 +10,7 @@ | |
import io.debezium.config.CommonConnectorConfig; | |
import io.debezium.connector.AbstractSourceInfoStructMaker; | |
+import io.debezium.connector.postgresql.connection.Lsn; | |
public class PostgresSourceInfoStructMaker extends AbstractSourceInfoStructMaker<SourceInfo> { | |
@@ -23,6 +24,7 @@ public PostgresSourceInfoStructMaker(String connector, String version, CommonCon | |
.field(SourceInfo.TABLE_NAME_KEY, Schema.STRING_SCHEMA) | |
.field(SourceInfo.TXID_KEY, Schema.OPTIONAL_INT64_SCHEMA) | |
.field(SourceInfo.LSN_KEY, Schema.OPTIONAL_INT64_SCHEMA) | |
+ .field(SourceInfo.SEQUENCE_KEY, Schema.OPTIONAL_STRING_SCHEMA) | |
.field(SourceInfo.XMIN_KEY, Schema.OPTIONAL_INT64_SCHEMA) | |
.build(); | |
} | |
@@ -47,6 +49,17 @@ public Struct struct(SourceInfo sourceInfo) { | |
if (sourceInfo.lsn() != null) { | |
result.put(SourceInfo.LSN_KEY, sourceInfo.lsn().asLong()); | |
} | |
+ if (sourceInfo.sequence() != null) { | |
+ // Render as "[a,b]" with no trailing comma; a missing LSN is written as null so element positions stay fixed. | |
+ StringBuilder sequence = new StringBuilder("["); | |
+ String separator = ""; | |
+ for (Lsn lsn : sourceInfo.sequence()) { | |
+ sequence.append(separator).append(lsn == null ? "null" : String.valueOf(lsn.asLong())); | |
+ separator = ","; | |
+ } | |
+ sequence.append("]"); | |
+ result.put(SourceInfo.SEQUENCE_KEY, sequence.toString()); | |
+ } | |
if (sourceInfo.xmin() != null) { | |
result.put(SourceInfo.XMIN_KEY, sourceInfo.xmin()); | |
} | |
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/SourceInfo.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/SourceInfo.java | |
index 30b19a50e..7e0d7da5f 100644 | |
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/SourceInfo.java | |
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/SourceInfo.java | |
@@ -75,11 +75,13 @@ | |
public static final String TXID_KEY = "txId"; | |
public static final String XMIN_KEY = "xmin"; | |
public static final String LSN_KEY = "lsn"; | |
+ public static final String SEQUENCE_KEY = "sequence"; | |
public static final String LAST_SNAPSHOT_RECORD_KEY = "last_snapshot_record"; | |
private final String dbName; | |
private Lsn lsn; | |
+ private Lsn[] sequence; | |
private Long txId; | |
private Long xmin; | |
private Instant timestamp; | |
@@ -103,6 +105,12 @@ protected SourceInfo(PostgresConnectorConfig connectorConfig) { | |
* @param xmin the xmin of the slot, may be null | |
* @return this instance | |
*/ | |
+ protected SourceInfo update(Lsn lsn, Instant commitTime, Long txId, TableId tableId, Long xmin, Lsn lastCommitLsn) { | |
+ update(lsn, commitTime, txId, tableId, xmin); | |
+ this.sequence = new Lsn[]{ lastCommitLsn, lsn }; | |
+ return this; | |
+ } | |
+ | |
protected SourceInfo update(Lsn lsn, Instant commitTime, Long txId, TableId tableId, Long xmin) { | |
this.lsn = lsn; | |
if (commitTime != null) { | |
@@ -138,6 +146,10 @@ public Long xmin() { | |
return this.xmin; | |
} | |
+ public Lsn[] sequence() { | |
+ return this.sequence; | |
+ } | |
+ | |
@Override | |
protected String database() { | |
return dbName; | |
@@ -179,6 +191,9 @@ public String toString() { | |
if (xmin != null) { | |
sb.append(", xmin=").append(xmin); | |
} | |
+ if (sequence != null) { | |
+ sb.append(", sequence=").append(java.util.Arrays.toString(sequence)); // appending the array itself would print its identity hash, not its elements | |
+ } | |
if (timestamp != null) { | |
sb.append(", timestamp=").append(timestamp); | |
} | |
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java | |
index 9b799e3ac..8b702b552 100644 | |
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java | |
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java | |
@@ -2466,6 +2466,71 @@ public void shouldProduceMessagesOnlyForConfiguredTables() throws Exception { | |
stopConnector(); | |
} | |
+ private List<Long> getSequence(SourceRecord record) { | |
+ assertTrue(record.value() instanceof Struct); | |
+ Struct source = ((Struct) record.value()).getStruct("source"); | |
+ String stringSequence = source.getString("sequence"); | |
+ List<Long> sequence = new ArrayList<>(); | |
+ assertEquals('[', stringSequence.charAt(0)); | |
+ assertEquals(']', stringSequence.charAt(stringSequence.length() - 1)); | |
+ for (String s : stringSequence.substring(1, stringSequence.length() - 1).split(",")) { | |
+ sequence.add(Long.parseLong(s.trim())); | |
+ } | |
+ return sequence; | |
+ } | |
+ | |
+ @Test | |
+ public void shouldHaveLastCommitLsn() throws InterruptedException { | |
+ TestHelper.execute(SETUP_TABLES_STMT); | |
+ start(PostgresConnector.class, TestHelper.defaultConfig() | |
+ .with(CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, Version.V2) | |
+ .with(PostgresConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) | |
+ .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) | |
+ .build()); | |
+ assertConnectorIsRunning(); | |
+ | |
+ waitForAvailableRecords(100, TimeUnit.MILLISECONDS); | |
+ assertNoRecordsToConsume(); | |
+ | |
+ final int n_inserts = 3; | |
+ for (int i = 0; i < n_inserts; ++i) { | |
+ TestHelper.execute(INSERT_STMT); | |
+ } | |
+ | |
+ List<SourceRecord> records = new ArrayList<>(); | |
+ Awaitility.await("Skip empty transactions and find the data").atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords() * 3)).until(() -> { | |
+ int n_transactions = 0; | |
+ while (n_transactions < n_inserts) { | |
+ final List<SourceRecord> candidate = consumeRecordsByTopic(2).allRecordsInOrder(); | |
+ if (candidate.get(1).topic().contains("transaction")) { | |
+ // empty transaction, should be skipped | |
+ continue; | |
+ } | |
+ records.addAll(candidate); | |
+ records.addAll(consumeRecordsByTopic(2).allRecordsInOrder()); | |
+ ++n_transactions; | |
+ } | |
+ return true; | |
+ }); | |
+ | |
+ assertEquals(4 * n_inserts, records.size()); | |
+ List<Long> second_transaction_sequence = getSequence(records.get(5)); | |
+ assertEquals(second_transaction_sequence.size(), 2); | |
+ long second_last_commit_lsn = second_transaction_sequence.get(0); | |
+ assertEquals(second_last_commit_lsn, getSequence(records.get(6)).get(0).longValue()); | |
+ | |
+ List<Long> third_transaction_sequence = getSequence(records.get(9)); | |
+ assertEquals(third_transaction_sequence.size(), 2); | |
+ long third_last_commit_lsn = third_transaction_sequence.get(0); | |
+ assertEquals(third_last_commit_lsn, getSequence(records.get(10)).get(0).longValue()); | |
+ | |
+ assertTrue(second_last_commit_lsn < third_last_commit_lsn); | |
+ | |
+ // now stop the connector | |
+ stopConnector(); | |
+ assertNoRecordsToConsume(); | |
+ } | |
+ | |
private CompletableFuture<Void> batchInsertRecords(long recordsCount, int batchSize) { | |
String insertStmt = "INSERT INTO text_table(j, jb, x, u) " + | |
"VALUES ('{\"bar\": \"baz\"}'::json, '{\"bar\": \"baz\"}'::jsonb, " + | |
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SourceInfoTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SourceInfoTest.java | |
index b791bdf09..d956777b7 100644 | |
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SourceInfoTest.java | |
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SourceInfoTest.java | |
@@ -72,6 +72,7 @@ public void schemaIsCorrect() { | |
.field("table", Schema.STRING_SCHEMA) | |
.field("txId", Schema.OPTIONAL_INT64_SCHEMA) | |
.field("lsn", Schema.OPTIONAL_INT64_SCHEMA) | |
+ .field("sequence", Schema.OPTIONAL_STRING_SCHEMA) | |
.field("xmin", Schema.OPTIONAL_INT64_SCHEMA) | |
.build(); | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I think we should actually write `null` in the output if one of the inputs is null, rather than just skipping it. Other than that, this looks fine; let's submit it to the DBZ folks and see what they think.