128 changes: 97 additions & 31 deletions app/app.py
@@ -300,6 +300,7 @@ def get_airbyte_connections(workspace_id):
"""
Get all connections using the new Airbyte API format.
New API: GET /v1/connections
Handles pagination to fetch all connections, not just the first page.
"""
if workspace_id is None:
return {
@@ -314,23 +315,9 @@ def get_airbyte_connections(workspace_id):
url_base = app.config['AIRBYTE_API_LINK']
api_base = app.config['AIRBYTE_API_BASE']
api_version = "v1"
url = f"{url_base}/{api_base}/{api_version}/connections"
base_url = f"{url_base}/{api_base}/{api_version}/connections"
headers = {"accept": "application/json"}

try:
response = requests.get(url, headers=headers)
response.raise_for_status()
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
return {
"None - Basic dbt Labs Project Initialization": {
"connection": "None",
"destination_version": "None",
"destination_name": "None",
"destination_type": "None"
}
}

result = {
"None - Basic dbt Labs Project Initialization": {
"connection": "None",
@@ -340,22 +327,101 @@ def get_airbyte_connections(workspace_id):
}
}

if response.status_code == 200:
# New API returns data in a "data" array
connections_data = response.json().get("data", [])
for connection in connections_data:
connection_id = connection.get("connectionId")
destination_id = connection.get("destinationId")
init_version, destination_name, dwh_type = get_airbyte_destination_type(destination_id, workspace_id)
name = connection.get("name")

result[name] = {
"connection": connection_id,
"destination_id": destination_id, # Store the destination ID for later use
"destination_version": init_version,
"destination_name": destination_name,
"destination_type": dwh_type
}
# Pagination parameters
limit = 100 # Request up to 100 items per page
offset = 0
has_more = True

while has_more:
# Build URL with pagination parameters
params = {
"limit": limit,
"offset": offset
}
url = f"{base_url}?{urllib.parse.urlencode(params)}"

try:
response = requests.get(url, headers=headers)
response.raise_for_status()
except requests.exceptions.RequestException as e:
print(f"Error fetching Airbyte connections: {e}")
# If this is the first page and it fails, return the default "None" option
if offset == 0:
return {
"None - Basic dbt Labs Project Initialization": {
"connection": "None",
"destination_version": "None",
"destination_name": "None",
"destination_type": "None"
}
}
# If a subsequent page fails, break and return what we have
break

if response.status_code == 200:
response_json = response.json()
# New API returns data in a "data" array
connections_data = response_json.get("data", [])

# Process connections from this page
for connection in connections_data:
connection_id = connection.get("connectionId")
destination_id = connection.get("destinationId")
init_version, destination_name, dwh_type = get_airbyte_destination_type(destination_id, workspace_id)
name = connection.get("name")

result[name] = {
"connection": connection_id,
"destination_id": destination_id, # Store the destination ID for later use
"destination_version": init_version,
"destination_name": destination_name,
"destination_type": dwh_type
}

# Check if there are more pages
# Airbyte API v1 returns pagination info in a "next" field with a URL
# Priority: Check "next" field first (most reliable indicator)

next_url = response_json.get("next")
pagination = response_json.get("pagination", {})
has_more_pagination = pagination.get("hasMore", False)

# If there's a "next" URL, there are more pages - extract offset from it
if next_url:
# Extract offset from next URL (e.g., "http://...?limit=20&offset=20")
if isinstance(next_url, str) and "offset" in next_url:
try:
parsed = urllib.parse.urlparse(next_url)
query_params = urllib.parse.parse_qs(parsed.query)
offset = int(query_params.get("offset", [offset + limit])[0])
# Note: API may enforce its own limit (e.g., 20), but we keep our requested limit
# The offset extraction ensures we continue from the correct position
except (ValueError, KeyError, IndexError) as e:
print(f"Warning: Could not parse offset from next URL: {e}")
offset += limit
else:
# If next is a simple token/number, increment offset
offset += limit
# If pagination object indicates more pages
elif has_more_pagination:
offset += limit
# If we got fewer items than requested, we've reached the last page
elif len(connections_data) < limit:
has_more = False
# If we got exactly the limit, there might be more, so continue
elif len(connections_data) == limit:
offset += limit
# Safety check: prevent infinite loops by setting a maximum offset
# Assuming reasonable maximum of 10,000 connections (100 pages * 100 items)
if offset > 10000:
print("Warning: Reached maximum pagination limit (10,000 connections). Some connections may be missing.")
has_more = False
else:
# No more pages
has_more = False
else:
# Non-200 status, stop pagination
has_more = False

return result
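
The step most worth isolating from the new loop is how the next offset is derived from the API's "next" link. Below is a minimal standalone sketch of that parsing, assuming a "next" value shaped like the URL in the comments above (for example ...?limit=100&offset=100); the function name and example URL are illustrative and not part of the PR.

import urllib.parse

def next_offset_from_url(next_url, current_offset, limit):
    """Extract the offset of the following page from a paginated 'next' URL.

    Falls back to current_offset + limit when the URL cannot be parsed,
    mirroring the fallback behaviour of the loop above.
    """
    if isinstance(next_url, str) and "offset" in next_url:
        try:
            parsed = urllib.parse.urlparse(next_url)
            query_params = urllib.parse.parse_qs(parsed.query)
            return int(query_params.get("offset", [current_offset + limit])[0])
        except (ValueError, KeyError, IndexError):
            pass
    return current_offset + limit

# Example with a hypothetical "next" URL: continue from offset 0 with page size 100
print(next_offset_from_url("https://example.airbyte.dev/v1/connections?limit=100&offset=100", 0, 100))  # -> 100

Because it works on plain values rather than an HTTP response, this extraction step can be unit-tested without calling the Airbyte API.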
