Skip to content

Commit d377423

Browse files
author
Nicolas ESTRADA
committed
feat: added basic support for scalar array types
1 parent dd5a63b commit d377423

File tree

2 files changed

+107
-1
lines changed

2 files changed

+107
-1
lines changed

sources/pg_legacy_replication/schema_types.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ def _to_dlt_column_type(type_id: int, modifier: Optional[str]) -> TColumnType:
116116
pg_type = _PG_TYPES.get(type_id)
117117
if pg_type in _MISSING_TYPES:
118118
return {"data_type": _MISSING_TYPES[pg_type]}
119+
if modifier and modifier.endswith("[]"):
120+
return {"data_type": "json"}
119121
if pg_type is None:
120122
logger.warning(
121123
"No type found for type_id '%s' and modifier '%s'", type_id, modifier
@@ -181,4 +183,30 @@ def _to_dlt_val(
181183
return data_type_handlers[data_type](raw_value)
182184

183185
raw_type = _DATUM_RAW_TYPES[datum]
186+
if _is_scalar_pg_array(data_type, raw_type, raw_value):
187+
raw_type, raw_value = "text", _pg_array_to_json_str(raw_value)
188+
184189
return coerce_value(data_type, raw_type, raw_value)
190+
191+
192+
def _is_scalar_pg_array(
193+
data_type: TDataType, raw_type: TDataType, raw_value: Any
194+
) -> bool:
195+
return (
196+
data_type == "json"
197+
and raw_type == "binary"
198+
and raw_value.startswith(b"{")
199+
and raw_value.endswith(b"}")
200+
)
201+
202+
203+
def _pg_array_to_json_str(raw_value: bytes) -> str:
204+
"""
205+
Decode the byte string to a regular string and strip the curly braces
206+
"""
207+
content = raw_value[1:-1].decode()
208+
csv = ",".join(
209+
f'"{element}"' if element.isalpha() else element
210+
for element in content.split(",")
211+
)
212+
return f"[{csv}]"

tests/pg_legacy_replication/cases.py

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1+
from base64 import b64encode
12
from typing import List
23

34
import pendulum
45
from dlt.common import Decimal
5-
from dlt.common.data_types.typing import DATA_TYPES
66
from dlt.common.schema import TColumnSchema, TTableSchema, TTableSchemaColumns
77
from dlt.common.typing import TDataItem
88

@@ -259,6 +259,49 @@
259259
},
260260
],
261261
},
262+
{
263+
"transactionId": 754,
264+
"commitTime": "1736873892023448",
265+
"table": "src_pl_dataset_202501140458116348.data_types",
266+
"op": "INSERT",
267+
"newTuple": [
268+
{"columnName": "bit_col", "columnType": 1560, "datumString": "1"},
269+
{
270+
"columnName": "box_col",
271+
"columnType": 603,
272+
"datumBytes": b64encode(b"(1,1),(0,0)").decode(),
273+
},
274+
{
275+
"columnName": "uuid_col",
276+
"columnType": 2950,
277+
"datumString": "6e1f5de1-1093-4bfe-98e4-62ac56b2db54",
278+
},
279+
{
280+
"columnName": "text_a",
281+
"columnType": 1009,
282+
"datumBytes": b64encode(b"{a,b}").decode(),
283+
},
284+
],
285+
"newTypeinfo": [
286+
{
287+
"modifier": "bit(1)",
288+
"valueOptional": True,
289+
},
290+
{
291+
"modifier": "box",
292+
"valueOptional": True,
293+
},
294+
{
295+
"modifier": "uuid",
296+
"valueOptional": True,
297+
},
298+
{
299+
"modifier": "text[]",
300+
"valueOptional": True,
301+
},
302+
],
303+
"oldTuple": [],
304+
},
262305
]
263306

264307
DATA_ITEMS: List[TDataItem] = [
@@ -309,6 +352,15 @@
309352
"_pg_commit_ts": pendulum.parse("2024-10-19T00:56:23.354856+00:00"),
310353
"_pg_tx_id": 932,
311354
},
355+
{
356+
"bit_col": "1",
357+
"box_col": "KDEsMSksKDAsMCk=",
358+
"uuid_col": "6e1f5de1-1093-4bfe-98e4-62ac56b2db54",
359+
"text_a": ["a", "b"],
360+
"_pg_lsn": 1,
361+
"_pg_commit_ts": pendulum.parse("2025-01-14T16:58:12.023448+00:00"),
362+
"_pg_tx_id": 754,
363+
},
312364
]
313365

314366
TABLE_SCHEMAS: List[TTableSchema] = [
@@ -417,4 +469,30 @@
417469
},
418470
},
419471
},
472+
{
473+
"name": "data_types",
474+
"columns": {
475+
"bit_col": {"data_type": "text", "name": "bit_col", "nullable": True},
476+
"box_col": {"data_type": "text", "name": "box_col", "nullable": True},
477+
"uuid_col": {"data_type": "text", "name": "uuid_col", "nullable": True},
478+
"text_a": {"data_type": "json", "name": "text_a", "nullable": True},
479+
"_pg_lsn": {"data_type": "bigint", "name": "_pg_lsn", "nullable": True},
480+
"_pg_deleted_ts": {
481+
"data_type": "timestamp",
482+
"name": "_pg_deleted_ts",
483+
"nullable": True,
484+
},
485+
"_pg_commit_ts": {
486+
"data_type": "timestamp",
487+
"name": "_pg_commit_ts",
488+
"nullable": True,
489+
},
490+
"_pg_tx_id": {
491+
"data_type": "bigint",
492+
"name": "_pg_tx_id",
493+
"nullable": True,
494+
"precision": 32,
495+
},
496+
},
497+
},
420498
]

0 commit comments

Comments
 (0)