Skip to content

Commit 72202ee

Browse files
authored
feat(concurrent): [ISSUE #10550] do not raise when record does not have cursor value in… (#96)
1 parent c90eea1 commit 72202ee

File tree

2 files changed

+43
-10
lines changed

2 files changed

+43
-10
lines changed

airbyte_cdk/sources/streams/concurrent/cursor.py

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,18 @@
55
import functools
66
import logging
77
from abc import ABC, abstractmethod
8-
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Protocol, Tuple
8+
from typing import (
9+
Any,
10+
Callable,
11+
Iterable,
12+
List,
13+
Mapping,
14+
MutableMapping,
15+
Optional,
16+
Protocol,
17+
Tuple,
18+
Union,
19+
)
920

1021
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
1122
from airbyte_cdk.sources.message import MessageRepository
@@ -175,7 +186,9 @@ def __init__(
175186
self.start, self._concurrent_state = self._get_concurrent_state(stream_state)
176187
self._lookback_window = lookback_window
177188
self._slice_range = slice_range
178-
self._most_recent_cursor_value_per_partition: MutableMapping[StreamSlice, Any] = {}
189+
self._most_recent_cursor_value_per_partition: MutableMapping[
190+
Union[StreamSlice, Mapping[str, Any], None], Any
191+
] = {}
179192
self._has_closed_at_least_one_slice = False
180193
self._cursor_granularity = cursor_granularity
181194
# Flag to track if the logger has been triggered (per stream)
@@ -216,10 +229,13 @@ def observe(self, record: Record) -> None:
216229
most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
217230
record.associated_slice
218231
)
219-
cursor_value = self._extract_cursor_value(record)
232+
try:
233+
cursor_value = self._extract_cursor_value(record)
220234

221-
if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
222-
self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
235+
if most_recent_cursor_value is None or most_recent_cursor_value < cursor_value:
236+
self._most_recent_cursor_value_per_partition[record.associated_slice] = cursor_value
237+
except ValueError:
238+
self._log_for_record_without_cursor_value()
223239

224240
def _extract_cursor_value(self, record: Record) -> Any:
    """Return the parsed cursor value of ``record``.

    The raw value is read from the record through the configured cursor field
    and then handed to the connector state converter for parsing. Callers in
    this file wrap this call in ``try``/``except ValueError``.
    """
    raw_cursor_value = self._cursor_field.extract_value(record)
    return self._connector_state_converter.parse_value(raw_cursor_value)
@@ -459,10 +475,13 @@ def should_be_synced(self, record: Record) -> bool:
459475
try:
460476
record_cursor_value: CursorValueType = self._extract_cursor_value(record) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
461477
except ValueError:
462-
if not self._should_be_synced_logger_triggered:
463-
LOGGER.warning(
464-
f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record. The incremental sync will assume it needs to be synced"
465-
)
466-
self._should_be_synced_logger_triggered = True
478+
self._log_for_record_without_cursor_value()
467479
return True
468480
return self.start <= record_cursor_value <= self._end_provider()
481+
482+
def _log_for_record_without_cursor_value(self) -> None:
    """Warn that a record is missing the cursor field, at most once.

    The ``_should_be_synced_logger_triggered`` flag is set after the first
    warning, so later calls on the same instance return without logging.
    """
    if self._should_be_synced_logger_triggered:
        # The warning has already been emitted; stay quiet.
        return

    LOGGER.warning(
        f"Could not find cursor field `{self.cursor_field.cursor_field_key}` in record for stream {self._stream_name}. The incremental sync will assume it needs to be synced"
    )
    self._should_be_synced_logger_triggered = True

unit_tests/sources/streams/concurrent/test_cursor.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,20 @@ def _cursor_without_slice_boundary_fields(self) -> ConcurrentCursor:
101101
_NO_LOOKBACK_WINDOW,
102102
)
103103

104+
def test_given_no_cursor_value_when_observe_then_do_not_raise(self) -> None:
    """Observing a record whose data lacks the cursor field must not raise."""
    cursor = self._cursor_with_slice_boundary_fields()
    associated_slice = _partition(_NO_SLICE).to_slice()
    record_without_cursor_field = Record(
        data={"record_with_A_CURSOR_FIELD_KEY": "any value"},
        associated_slice=associated_slice,
        stream_name=_A_STREAM_NAME,
    )

    cursor.observe(record_without_cursor_field)

    # did not raise
117+
104118
def test_given_boundary_fields_when_close_partition_then_emit_state(self) -> None:
105119
cursor = self._cursor_with_slice_boundary_fields()
106120
cursor.close_partition(

0 commit comments

Comments
 (0)