Commit c77b9a2

Merge branch 'main' into tolik0/concurrent-perpartitioncursor

tolik0 committed Dec 6, 2024 (2 parents: 2038075 + 5c4be66)
Showing 69 changed files with 1,102 additions and 2,139 deletions.
.github/workflows/autofix-command.yml (1 addition, 1 deletion)

@@ -75,7 +75,7 @@ jobs:
       - name: Set up Poetry
         uses: Gr1N/setup-poetry@v9
         with:
-          poetry-version: "1.7.1"
+          poetry-version: "1.8.4"
       - name: Set up Python
         uses: actions/setup-python@v5
         with:
.github/workflows/connector-tests.yml (11 additions, 2 deletions)

@@ -94,6 +94,8 @@ jobs:
           # cdk_extra: n/a

     name: "Check: '${{matrix.connector}}' (skip=${{needs.cdk_changes.outputs['src'] == 'false' || needs.cdk_changes.outputs[matrix.cdk_extra] == 'false'}})"
+    permissions:
+      checks: write
     steps:
       - name: Abort if extra not changed (${{matrix.cdk_extra}})
         id: no_changes
@@ -131,15 +133,18 @@
         run: |
           cd airbyte
           make tools.airbyte-ci-dev.install
-          airbyte-ci-dev connectors \
+          airbyte-ci-dev \
+            --ci-report-bucket-name=airbyte-ci-reports-multi \
+            connectors \
             --name ${{matrix.connector}} \
             --use-local-cdk \
             test \
             --fail-fast \
             --skip-step qa_checks \
             --skip-step connector_live_tests
-      - name: Evaluate Test Output
+      - name: Evaluate Test
+        id: evaluate_output
         if: always() && steps.no_changes.outputs.status != 'cancelled'
         run: |
           # save job output json file as ci step output
@@ -148,7 +153,9 @@
           success=$(echo ${job_output} | jq -r '.success')
           failed_step=$(echo ${job_output} | jq -r '.failed_steps | select(length > 0) | .[0] // "None"')
           run_duration=$(echo ${job_output} | jq -r '.run_duration')
+          html_report_url=$(echo ${job_output} | jq -r '.html_report_url')
           echo "## Job Output for ${{matrix.connector}}" >> $GITHUB_STEP_SUMMARY
+          echo "- [HTML Report](${html_report_url})" >> $GITHUB_STEP_SUMMARY
           echo "- Success: ${success}" >> $GITHUB_STEP_SUMMARY
           echo "- Test Duration: $(printf "%.0f" ${run_duration})s" >> $GITHUB_STEP_SUMMARY
           if [ "${success}" != "true" ]; then
@@ -159,6 +166,8 @@
             echo "::error::Test failed for connector '${{ matrix.connector }}' on step '${failed_step}'. Check the logs for more details."
             exit 1
           fi
+          echo "success=${success}" >> $GITHUB_OUTPUT
+          echo "html_report_url=${html_report_url}" >> $GITHUB_OUTPUT
       # Upload the job output to the artifacts
       - name: Upload Job Output
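For context: the two $GITHUB_OUTPUT lines added at the end of the "Evaluate Test" step publish success and html_report_url as step outputs under the evaluate_output step id, which is standard GitHub Actions behavior. A hypothetical downstream step (not part of this commit) could consume them like so:

      # Hypothetical consumer of the outputs exported by the evaluate_output step above.
      - name: Print report link
        if: always()
        run: |
          echo "Success: ${{ steps.evaluate_output.outputs.success }}"
          echo "Report: ${{ steps.evaluate_output.outputs.html_report_url }}"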
.github/workflows/pdoc_preview.yml (1 addition, 1 deletion)

@@ -16,7 +16,7 @@ jobs:
       - name: Set up Poetry
         uses: Gr1N/setup-poetry@v9
         with:
-          poetry-version: "1.7.1"
+          poetry-version: "1.8.4"
       - name: Set up Python
         uses: actions/setup-python@v5
         with:
.github/workflows/pdoc_publish.yml (1 addition, 1 deletion)

@@ -33,7 +33,7 @@ jobs:
       - name: Set up Poetry
         uses: Gr1N/setup-poetry@v9
         with:
-          poetry-version: "1.7.1"
+          poetry-version: "1.8.4"
       - name: Set up Python
         uses: actions/setup-python@v5
         with:
.github/workflows/poetry-lock-command.yml (1 addition, 1 deletion)

@@ -72,7 +72,7 @@ jobs:
       - name: Set up Poetry
         uses: Gr1N/setup-poetry@v9
         with:
-          poetry-version: "1.7.1"
+          poetry-version: "1.8.4"
       - name: Set up Python
         uses: actions/setup-python@v5
         with:
.github/workflows/pypi_publish.yml (1 addition, 1 deletion)

@@ -287,7 +287,7 @@ jobs:
           pip-compile --upgrade
       - name: Create Pull Request
         id: create-pull-request
-        uses: peter-evans/create-pull-request@v6
+        uses: peter-evans/create-pull-request@v7
         with:
           token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
           commit-message: "chore: update CDK version following release"
.github/workflows/pytest_fast.yml (9 additions, 1 deletion)

@@ -14,6 +14,14 @@ jobs:
       - name: Checkout code
         uses: actions/checkout@v4

+      - name: Set up Poetry
+        uses: Gr1N/setup-poetry@v9
+        with:
+          poetry-version: "1.8.4"
+
+      - name: Check Poetry lock file is current
+        run: poetry check
+
       - uses: hynek/build-and-inspect-python-package@v2
         env:
           # Pass in dummy version '0.0.0dev0' version to appease dynamic versioning
@@ -36,7 +44,7 @@
       - name: Set up Poetry
         uses: Gr1N/setup-poetry@v9
         with:
-          poetry-version: "1.7.1"
+          poetry-version: "1.8.4"
       - name: Set up Python
         uses: actions/setup-python@v5
         with:
.github/workflows/pytest_matrix.yml (3 additions, 3 deletions)

@@ -23,8 +23,8 @@ jobs:
     name: Pytest (All, Python ${{ matrix.python-version }}, ${{ matrix.os }})
     # Don't run on forks. Run on pushes to main, and on PRs that are not from forks.
     if: >
-      (github.event_name == 'push' && github.ref == 'refs/heads/main') ||
-      (github.event.pull_request.head.repo.fork == false)
+      github.event_name == 'pull_request' ||
+      (github.event_name == 'push' && github.ref == 'refs/heads/main')
     strategy:
       matrix:
         python-version: [
@@ -61,7 +61,7 @@
         uses: Gr1N/setup-poetry@v9
         if: steps.changes.outputs.src == 'true'
         with:
-          poetry-version: "1.7.1"
+          poetry-version: "1.8.4"
       - name: Set up Python
         uses: actions/setup-python@v5
         if: steps.changes.outputs.src == 'true'
.github/workflows/python_lint.yml (3 additions, 3 deletions)

@@ -17,7 +17,7 @@ jobs:
       - name: Set up Poetry
         uses: Gr1N/setup-poetry@v9
         with:
-          poetry-version: "1.7.1"
+          poetry-version: "1.8.4"
       - name: Set up Python
         uses: actions/setup-python@v5
         with:
@@ -40,7 +40,7 @@
       - name: Set up Poetry
         uses: Gr1N/setup-poetry@v9
         with:
-          poetry-version: "1.7.1"
+          poetry-version: "1.8.4"
       - name: Set up Python
         uses: actions/setup-python@v5
         with:
@@ -63,7 +63,7 @@
       - name: Set up Poetry
         uses: Gr1N/setup-poetry@v9
         with:
-          poetry-version: "1.7.1"
+          poetry-version: "1.8.4"
       - name: Set up Python
         uses: actions/setup-python@v5
         with:
.github/workflows/test-command.yml (1 addition, 1 deletion)

@@ -85,7 +85,7 @@ jobs:
       - name: Set up Poetry
         uses: Gr1N/setup-poetry@v9
         with:
-          poetry-version: "1.7.1"
+          poetry-version: "1.8.4"
       - name: Set up Python
         uses: actions/setup-python@v5
         with:
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (25 additions, 31 deletions)

@@ -89,23 +89,10 @@ def __init__(
             component_factory=component_factory,
         )

+        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
+        # no longer needs to store the original incoming state. But maybe there's an edge case?
         self._state = state

-        self._concurrent_streams: Optional[List[AbstractStream]]
-        self._synchronous_streams: Optional[List[Stream]]
-
-        # If the connector command was SPEC, there is no incoming config, and we cannot instantiate streams because
-        # they might depend on it. Ideally we want to have a static method on this class to get the spec without
-        # any other arguments, but the existing entrypoint.py isn't designed to support this. Just noting this
-        # for our future improvements to the CDK.
-        if config:
-            self._concurrent_streams, self._synchronous_streams = self._group_streams(
-                config=config or {}
-            )
-        else:
-            self._concurrent_streams = None
-            self._synchronous_streams = None
-
         concurrency_level_from_manifest = self._source_config.get("concurrency_level")
         if concurrency_level_from_manifest:
             concurrency_level_component = self._constructor.create_component(
@@ -139,17 +126,20 @@ def read(
         logger: logging.Logger,
         config: Mapping[str, Any],
         catalog: ConfiguredAirbyteCatalog,
-        state: Optional[Union[List[AirbyteStateMessage]]] = None,
+        state: Optional[List[AirbyteStateMessage]] = None,
     ) -> Iterator[AirbyteMessage]:
-        # ConcurrentReadProcessor pops streams that are finished being read so before syncing, the names of the concurrent
-        # streams must be saved so that they can be removed from the catalog before starting synchronous streams
-        if self._concurrent_streams:
+        concurrent_streams, _ = self._group_streams(config=config)
+
+        # ConcurrentReadProcessor pops streams that are finished being read so before syncing, the names of
+        # the concurrent streams must be saved so that they can be removed from the catalog before starting
+        # synchronous streams
+        if len(concurrent_streams) > 0:
             concurrent_stream_names = set(
-                [concurrent_stream.name for concurrent_stream in self._concurrent_streams]
+                [concurrent_stream.name for concurrent_stream in concurrent_streams]
             )

             selected_concurrent_streams = self._select_streams(
-                streams=self._concurrent_streams, configured_catalog=catalog
+                streams=concurrent_streams, configured_catalog=catalog
             )
             # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
             # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
@@ -168,8 +158,7 @@ def read(
         yield from super().read(logger, config, filtered_catalog, state)

     def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
-        concurrent_streams = self._concurrent_streams or []
-        synchronous_streams = self._synchronous_streams or []
+        concurrent_streams, synchronous_streams = self._group_streams(config=config)
         return AirbyteCatalog(
             streams=[
                 stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams
@@ -195,17 +184,21 @@ def _group_streams(

         state_manager = ConnectorStateManager(state=self._state)  # type: ignore  # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

-        name_to_stream_mapping = {
-            stream["name"]: stream for stream in self.resolved_manifest["streams"]
-        }
+        # Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
+        # and this is validated during the initialization of the source.
+        streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
+            self._source_config, config
+        )
+
+        name_to_stream_mapping = {stream["name"]: stream for stream in streams}

         for declarative_stream in self.streams(config=config):
             # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
             # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
             # so we need to treat them as synchronous
             if (
                 isinstance(declarative_stream, DeclarativeStream)
-                and name_to_stream_mapping[declarative_stream.name].get("retriever")["type"]
+                and name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
                 == "SimpleRetriever"
             ):
                 incremental_sync_component_definition = name_to_stream_mapping[
@@ -214,7 +207,7 @@ def _group_streams(

                 partition_router_component_definition = (
                     name_to_stream_mapping[declarative_stream.name]
-                    .get("retriever")
+                    .get("retriever", {})
                     .get("partition_router")
                 )
                 is_without_partition_router_or_cursor = not bool(
@@ -236,7 +229,7 @@ def _group_streams(
                    cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
                        state_manager=state_manager,
                        model_type=DatetimeBasedCursorModel,
-                       component_definition=incremental_sync_component_definition,
+                       component_definition=incremental_sync_component_definition,  # type: ignore  # Not None because of the if condition above
                        stream_name=declarative_stream.name,
                        stream_namespace=declarative_stream.namespace,
                        config=config or {},
@@ -372,10 +365,11 @@ def _group_streams(
     def _is_datetime_incremental_without_partition_routing(
         self,
         declarative_stream: DeclarativeStream,
-        incremental_sync_component_definition: Mapping[str, Any],
+        incremental_sync_component_definition: Mapping[str, Any] | None,
     ) -> bool:
         return (
-            bool(incremental_sync_component_definition)
+            incremental_sync_component_definition is not None
+            and bool(incremental_sync_component_definition)
             and incremental_sync_component_definition.get("type", "")
             == DatetimeBasedCursorModel.__name__
             and self._stream_supports_concurrent_partition_processing(
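The net effect of the Python changes above is that streams are no longer grouped eagerly in __init__ (where config may be absent, e.g. when the connector command is SPEC) but lazily in read() and discover(), each of which passes the runtime config to _group_streams(). A minimal sketch of this call-time grouping pattern, using hypothetical simplified names (the real source takes many more arguments and returns stream objects, not strings):

    from typing import Any, List, Mapping, Tuple

    class LazyGroupingSource:
        def __init__(self, state: Any) -> None:
            # Only the incoming state is stored; no streams are built yet, so the
            # source can still be constructed when no config is available (SPEC).
            self._state = state

        def _group_streams(self, config: Mapping[str, Any]) -> Tuple[List[str], List[str]]:
            # Stand-in for the real logic, which partitions streams into
            # concurrent and synchronous groups based on the runtime config.
            names: List[str] = config.get("streams", [])
            concurrent = [n for n in names if n.endswith("_incremental")]
            synchronous = [n for n in names if not n.endswith("_incremental")]
            return concurrent, synchronous

        def read(self, config: Mapping[str, Any]) -> None:
            # Grouping happens at call time, with the config passed to read().
            concurrent, synchronous = self._group_streams(config=config)
            print(f"concurrent={concurrent} synchronous={synchronous}")

    LazyGroupingSource(state=None).read({"streams": ["users_incremental", "orders"]})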