From bde9d35580c72c839a4200e50acd9f445ea942cc Mon Sep 17 00:00:00 2001 From: GediminasKr-BURGA Date: Mon, 10 Nov 2025 13:43:13 +0200 Subject: [PATCH] Update airbyte connections api paggination, to fetch until the end --- app/app.py | 128 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 97 insertions(+), 31 deletions(-) diff --git a/app/app.py b/app/app.py index e1a33e4..4fcbc02 100644 --- a/app/app.py +++ b/app/app.py @@ -300,6 +300,7 @@ def get_airbyte_connections(workspace_id): """ Get all connections using the new Airbyte API format. New API: GET /v1/connections + Handles pagination to fetch all connections, not just the first page. """ if workspace_id is None: return { @@ -314,23 +315,9 @@ def get_airbyte_connections(workspace_id): url_base = app.config['AIRBYTE_API_LINK'] api_base = app.config['AIRBYTE_API_BASE'] api_version = "v1" - url = f"{url_base}/{api_base}/{api_version}/connections" + base_url = f"{url_base}/{api_base}/{api_version}/connections" headers = {"accept": "application/json"} - try: - response = requests.get(url, headers=headers) - response.raise_for_status() - except requests.exceptions.RequestException as e: - print(f"Error: {e}") - return { - "None - Basic dbt Labs Project Initialization": { - "connection": "None", - "destination_version": "None", - "destination_name": "None", - "destination_type": "None" - } - } - result = { "None - Basic dbt Labs Project Initialization": { "connection": "None", @@ -340,22 +327,101 @@ def get_airbyte_connections(workspace_id): } } - if response.status_code == 200: - # New API returns data in a "data" array - connections_data = response.json().get("data", []) - for connection in connections_data: - connection_id = connection.get("connectionId") - destination_id = connection.get("destinationId") - init_version, destination_name, dwh_type = get_airbyte_destination_type(destination_id, workspace_id) - name = connection.get("name") - - result[name] = { - "connection": connection_id, - "destination_id": destination_id, # Store the destination ID for later use - "destination_version": init_version, - "destination_name": destination_name, - "destination_type": dwh_type - } + # Pagination parameters + limit = 100 # Request up to 100 items per page + offset = 0 + has_more = True + + while has_more: + # Build URL with pagination parameters + params = { + "limit": limit, + "offset": offset + } + url = f"{base_url}?{urllib.parse.urlencode(params)}" + + try: + response = requests.get(url, headers=headers) + response.raise_for_status() + except requests.exceptions.RequestException as e: + print(f"Error fetching Airbyte connections: {e}") + # If this is the first page and it fails, return the default "None" option + if offset == 0: + return { + "None - Basic dbt Labs Project Initialization": { + "connection": "None", + "destination_version": "None", + "destination_name": "None", + "destination_type": "None" + } + } + # If a subsequent page fails, break and return what we have + break + + if response.status_code == 200: + response_json = response.json() + # New API returns data in a "data" array + connections_data = response_json.get("data", []) + + # Process connections from this page + for connection in connections_data: + connection_id = connection.get("connectionId") + destination_id = connection.get("destinationId") + init_version, destination_name, dwh_type = get_airbyte_destination_type(destination_id, workspace_id) + name = connection.get("name") + + result[name] = { + "connection": connection_id, + "destination_id": destination_id, # Store the destination ID for later use + "destination_version": init_version, + "destination_name": destination_name, + "destination_type": dwh_type + } + + # Check if there are more pages + # Airbyte API v1 returns pagination info in a "next" field with a URL + # Priority: Check "next" field first (most reliable indicator) + + next_url = response_json.get("next") + pagination = response_json.get("pagination", {}) + has_more_pagination = pagination.get("hasMore", False) + + # If there's a "next" URL, there are more pages - extract offset from it + if next_url: + # Extract offset from next URL (e.g., "http://...?limit=20&offset=20") + if isinstance(next_url, str) and "offset" in next_url: + try: + parsed = urllib.parse.urlparse(next_url) + query_params = urllib.parse.parse_qs(parsed.query) + offset = int(query_params.get("offset", [offset + limit])[0]) + # Note: API may enforce its own limit (e.g., 20), but we keep our requested limit + # The offset extraction ensures we continue from the correct position + except (ValueError, KeyError, IndexError) as e: + print(f"Warning: Could not parse offset from next URL: {e}") + offset += limit + else: + # If next is a simple token/number, increment offset + offset += limit + # If pagination object indicates more pages + elif has_more_pagination: + offset += limit + # If we got fewer items than requested, we've reached the last page + elif len(connections_data) < limit: + has_more = False + # If we got exactly the limit, there might be more, so continue + elif len(connections_data) == limit: + offset += limit + # Safety check: prevent infinite loops by setting a maximum offset + # Assuming reasonable maximum of 10,000 connections (100 pages * 100 items) + if offset > 10000: + print("Warning: Reached maximum pagination limit (10,000 connections). Some connections may be missing.") + has_more = False + else: + # No more pages + has_more = False + else: + # Non-200 status, stop pagination + has_more = False return result