Skip to content

Commit

Permalink
Handle timestamp incremental replication (#89)
Browse files Browse the repository at this point in the history
* Handle timestamp (nothing to do with datetime -- SQL Server rowversion) incremental replication

* Release 2.6.4 prep
  • Loading branch information
mjsqu authored Oct 24, 2024
1 parent fde353a commit f9a59ba
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 2.6.3
current_version = 2.6.4
parse = (?P<major>\d+)
\.(?P<minor>\d+)
\.(?P<patch>\d+)
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# tap-mssql 2.6.4 2024-10-24
* Update to handle `timestamp` (not a datetime value, a [deprecated](https://learn.microsoft.com/en-us/sql/t-sql/data-types/rowversion-transact-sql?view=sql-server-ver16#remarks) synonym of internal `rowversion`) as string
* Add tests for incremental syncing using a `timestamp` column as `replication-key`

# tap-mssql 2.6.3 2024-10-17
* Updating CDC documentation with a packaged method to maintain CDC tables.

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "tap-mssql"
version = "2.6.3"
version = "2.6.4"
description = "A pipelinewise compatible tap for connecting Microsoft SQL Server"
authors = ["Rob Winters <wintersrd@gmail.com>"]
license = "GNU Affero"
Expand Down
6 changes: 5 additions & 1 deletion tap_mssql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@

DECIMAL_TYPES = set(["decimal", "number", "money", "smallmoney", "numeric"])

DATETIME_TYPES = set(["datetime2", "datetime", "datetimeoffset", "timestamp", "smalldatetime"])
DATETIME_TYPES = set(["datetime2", "datetime", "datetimeoffset", "smalldatetime"])

DATE_TYPES = set(["date"])

Expand Down Expand Up @@ -101,6 +101,10 @@ def schema_for_column(c, config):

if data_type == "bit":
result.type = ["null", "boolean"]

elif data_type in ["timestamp", "rowversion"]:
result.type = ["null", "string"]
result.format = "rowversion"

elif data_type in BYTES_FOR_INTEGER_TYPE:
result.type = ["null", "integer"]
Expand Down
14 changes: 11 additions & 3 deletions tap_mssql/sync_strategies/incremental.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,19 @@ def sync_table(mssql_conn, config, catalog_entry, state, columns):
replication_key_value = datetime.fromtimestamp(
pendulum.parse(replication_key_value).timestamp()
)
# Handle timestamp incremental (timestamp)
if catalog_entry.schema.properties[replication_key_metadata].format == 'rowversion':
select_sql += """ WHERE CAST("{}" AS BIGINT) >=
convert(bigint, convert (varbinary(8), '0x{}', 1))
ORDER BY "{}" ASC""".format(
replication_key_metadata, replication_key_value, replication_key_metadata
)

else:
select_sql += ' WHERE "{}" >= %(replication_key_value)s ORDER BY "{}" ASC'.format(
replication_key_metadata, replication_key_metadata
)

select_sql += ' WHERE "{}" >= %(replication_key_value)s ORDER BY "{}" ASC'.format(
replication_key_metadata, replication_key_metadata
)

params["replication_key_value"] = replication_key_value
elif replication_key_metadata is not None:
Expand Down
94 changes: 86 additions & 8 deletions tests/test_tap_mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,15 +561,15 @@ def test_with_no_state(self):

(message_types, versions) = message_types_and_versions(SINGER_MESSAGES)

self.assertEqual(
[
"ActivateVersionMessage",
"RecordMessage",
],
sorted(list(set(message_types))),
)

self.assertTrue(isinstance(versions[0], int))
self.assertEqual(versions[0], versions[1])
record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)]
incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-incremental']
integer_incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-integer_incremental']

self.assertEqual(len(incremental_record_messages),3)
self.assertEqual(len(integer_incremental_record_messages),3)

def test_with_state(self):
state = {
Expand Down Expand Up @@ -602,7 +602,14 @@ def test_with_state(self):
)
self.assertTrue(isinstance(versions[0], int))
self.assertEqual(versions[0], versions[1])
self.assertEqual(versions[1], 12345)

# Based on state values provided check the number of record messages emitted
record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)]
incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-incremental']
integer_incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-integer_incremental']

self.assertEqual(len(incremental_record_messages),2)
self.assertEqual(len(integer_incremental_record_messages),1)


class TestViews(unittest.TestCase):
Expand Down Expand Up @@ -650,6 +657,76 @@ def test_do_not_discover_key_properties_for_view(self):

self.assertEqual(primary_keys, {"a_table": ["id"], "a_view": []})

class TestTimestampIncrementalReplication(unittest.TestCase):
def setUp(self):
self.conn = test_utils.get_test_connection()

with connect_with_backoff(self.conn) as open_conn:
with open_conn.cursor() as cursor:
try:
cursor.execute("drop table incremental")
except:
pass
cursor.execute("CREATE TABLE incremental (val int, updated timestamp)")
cursor.execute("INSERT INTO incremental (val) VALUES (1)") #00000000000007d1
cursor.execute("INSERT INTO incremental (val) VALUES (2)") #00000000000007d2
cursor.execute("INSERT INTO incremental (val) VALUES (3)") #00000000000007d3

self.catalog = test_utils.discover_catalog(self.conn, {})

for stream in self.catalog.streams:
stream.metadata = [
{
"breadcrumb": (),
"metadata": {
"selected": True,
"table-key-properties": [],
"database-name": "dbo",
},
},
{"breadcrumb": ("properties", "val"), "metadata": {"selected": True}},
]

stream.stream = stream.table
test_utils.set_replication_method_and_key(stream, "INCREMENTAL", "updated")

def test_with_no_state(self):
state = {}

global SINGER_MESSAGES
SINGER_MESSAGES.clear()

tap_mssql.do_sync(self.conn, test_utils.get_db_config(), self.catalog, state)

(message_types, versions) = message_types_and_versions(SINGER_MESSAGES)

record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)]

self.assertEqual(len(record_messages),3)


def test_with_state(self):
state = {
"bookmarks": {
"dbo-incremental": {
"version": 1,
"replication_key_value": '00000000000007d2',
"replication_key": "updated",
},
}
}

global SINGER_MESSAGES
SINGER_MESSAGES.clear()
tap_mssql.do_sync(self.conn, test_utils.get_db_config(), self.catalog, state)

(message_types, versions) = message_types_and_versions(SINGER_MESSAGES)

# Given the state value supplied, there should only be two RECORD messages
record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)]

self.assertEqual(len(record_messages),2)

class TestPrimaryKeyUniqueKey(unittest.TestCase):
def setUp(self):
self.conn = test_utils.get_test_connection()
Expand Down Expand Up @@ -708,6 +785,7 @@ def test_only_primary_key(self):
self.assertEqual(primary_keys["pk_only_table"], ["pk"])
self.assertEqual(primary_keys["pk_uc_table"], ["pk"])


if __name__ == "__main__":
# test1 = TestBinlogReplication()
# test1.setUp()
Expand Down

0 comments on commit f9a59ba

Please sign in to comment.