Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 52 additions & 88 deletions src/data360/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,16 @@

import dotenv
import httpx
from pydantic import ValidationError
from pydantic import ValidationError as PydanticValidationError

from .config import get_data360_settings
from .errors import (
Data360MCPError,
NotFoundError,
ParseError,
classify_error,
)
from .errors import ValidationError as Data360ValidationError
from .models import (
DiscoveryResult,
EnrichedIndicator,
Expand Down Expand Up @@ -286,7 +293,7 @@ async def _search_raw(
url = data360_config.search_url or f"{data360_config.api_url}/searchv2"
payload = _build_search_payload(request)

error_msg: str | None = None
mcp_error: Data360MCPError | None = None
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(url, json=payload)
Expand All @@ -295,28 +302,22 @@ async def _search_raw(
try:
response_data = response.json()
except ValueError as e:
_logger.error(f"Failed to parse JSON response: {e}")
error_msg = f"Failed to parse API response: {str(e)}"
mcp_error = ParseError(context="search", original_error=e)
else:
try:
return _process_search_response(response_data, request)
except Exception as e:
_logger.error(f"Failed to validate response data: {e}")
error_msg = f"Failed to validate API response: {str(e)}"
mcp_error = ParseError(
context="search",
detail=f"Failed to parse API response: {str(e)}",
original_error=e,
)

except Exception as e:
if isinstance(e, httpx.HTTPStatusError):
error_msg = f"HTTP error {e.response.status_code}: {e.response.text}"
elif isinstance(e, httpx.TimeoutException):
error_msg = f"Request timeout: {str(e)}"
elif isinstance(e, httpx.RequestError):
error_msg = f"Request error: {str(e)}"
else:
error_msg = f"Unexpected error: {str(e)}"
_logger.error(error_msg)
mcp_error = classify_error(e, context="search")

if error_msg:
return SearchResponse(items=None, error=error_msg)
if mcp_error:
return SearchResponse(items=None, error=mcp_error.detail)
# This should never be reached, but pyright needs it for type checking
return SearchResponse(
items=None, error="Unexpected error: no response and no error message"
Expand Down Expand Up @@ -566,8 +567,13 @@ async def get_metadata(
# Validate inputs
try:
MetadataRequest(database_id=database_id, indicator_id=indicator_id)
except ValidationError as e:
return MetadataResponse(error=f"Invalid arguments: {e}")
except PydanticValidationError as e:
mcp_err = Data360ValidationError(
context="metadata",
detail=f"Invalid arguments: {e}",
original_error=e,
)
return MetadataResponse(error=mcp_err.detail)

# Determine URLs
metadata_url = data360_config.metadata_url or f"{data360_config.api_url}/metadata"
Expand Down Expand Up @@ -610,28 +616,18 @@ async def get_metadata(
if k in select_fields
}
else:
error_msg = f"No metadata found for indicator ID '{indicator_id}'"
_logger.warning(error_msg)
errors.append(error_msg)
mcp_err = NotFoundError(
context="metadata",
detail=f"No metadata found for indicator ID '{indicator_id}'",
)
errors.append(mcp_err.detail)
except ValueError as e:
error_msg = f"Failed to parse metadata JSON response: {str(e)}"
_logger.error(error_msg)
errors.append(error_msg)
mcp_err = ParseError(context="metadata", original_error=e)
errors.append(mcp_err.detail)

except Exception as e:
error_msg: str
if isinstance(e, httpx.HTTPStatusError):
error_msg = f"HTTP error fetching metadata: {e.response.status_code} - {e.response.text}"
elif isinstance(e, httpx.TimeoutException):
error_msg = f"Timeout fetching metadata: {str(e)}"
elif isinstance(e, httpx.RequestError):
error_msg = (
f"Request error fetching metadata for {indicator_id!r}: {str(e)}"
)
else:
error_msg = f"Unexpected error fetching metadata: {str(e)}"
_logger.exception(error_msg)
errors.append(error_msg)
mcp_err = classify_error(e, context="metadata")
errors.append(mcp_err.detail)

# 2. Fetch Disaggregation
if fetch_disaggregation:
Expand All @@ -650,30 +646,12 @@ async def get_metadata(
raw_disaggregations
)
except ValueError as e:
error_msg = (
f"Failed to parse disaggregation JSON response: {str(e)}"
)
_logger.error(error_msg)
errors.append(error_msg)

except httpx.HTTPStatusError as e:
error_msg = f"HTTP error fetching disaggregations: {e.response.status_code} - {e.response.text}"
_logger.error(error_msg)
errors.append(error_msg)
except httpx.TimeoutException as e:
error_msg = f"Timeout fetching disaggregations: {str(e)}"
_logger.error(error_msg)
errors.append(error_msg)
except httpx.RequestError as e:
error_msg = (
f"Request error fetching disaggregations for {indicator_id!r}: {str(e)}"
)
_logger.error(error_msg)
errors.append(error_msg)
mcp_err = ParseError(context="disaggregation", original_error=e)
errors.append(mcp_err.detail)

except Exception as e:
error_msg = f"Unexpected error fetching disaggregations: {str(e)}"
_logger.exception("Unexpected error in disaggregation fetch")
errors.append(error_msg)
mcp_err = classify_error(e, context="disaggregation")
errors.append(mcp_err.detail)

# 3. Combine and Return
error_message = "; ".join(errors) if errors else None
Expand Down Expand Up @@ -725,15 +703,9 @@ async def get_disaggregation(
valid_dimensions = _get_valid_disaggregations(raw_data)
return {"dimensions": valid_dimensions}

except httpx.HTTPStatusError as e:
return {"error": f"HTTP error: {e.response.status_code} - {e.response.text}"}
except httpx.TimeoutException as e:
return {"error": f"Timeout: {str(e)}"}
except httpx.RequestError as e:
return {"error": f"Request error: {str(e)}"}
except Exception as e:
_logger.exception("Unexpected error in get_disaggregation")
return {"error": f"Unexpected error: {str(e)}"}
mcp_err = classify_error(e, context="disaggregation")
return {"error": mcp_err.detail}


async def get_data(
Expand Down Expand Up @@ -793,10 +765,13 @@ async def get_data(
indicator_id=indicator_id,
disaggregation_filters=disaggregation_filters,
)
except ValidationError as e:
error_msg = f"Invalid arguments: {e}"
_logger.error(error_msg)
return IndicatorDataResponse(error=error_msg)
except PydanticValidationError as e:
mcp_err = Data360ValidationError(
context="data",
detail=f"Invalid arguments: {e}",
original_error=e,
)
return IndicatorDataResponse(error=mcp_err.detail)

# Prepare API parameters
params: dict[str, Any] = {
Expand Down Expand Up @@ -862,9 +837,8 @@ async def get_data(
try:
data_json = data_res.json()
except ValueError as e:
error_msg = f"Failed to parse data JSON response: {str(e)}"
_logger.error(error_msg)
return IndicatorDataResponse(data=None, error=error_msg)
mcp_err = ParseError(context="data", original_error=e)
return IndicatorDataResponse(data=None, error=mcp_err.detail)

raw_data = data_json.get("value", [])
total_count = data_json.get("@odata.count") # May be None
Expand Down Expand Up @@ -904,19 +878,9 @@ async def get_data(
failed_validation=validation_errors if validation_errors else None,
)

except httpx.HTTPStatusError as e:
error_msg = (
f"HTTP error fetching data: {e.response.status_code} - {e.response.text}"
)
except httpx.TimeoutException as e:
error_msg = f"Timeout fetching data: {str(e)}"
except httpx.RequestError as e:
error_msg = f"Request error fetching data for {indicator_id!r}: {str(e)}"
except Exception as e:
error_msg = f"Unexpected error fetching data: {str(e)}"

_logger.error(error_msg)
return IndicatorDataResponse(data=None, error=error_msg)
mcp_err = classify_error(e, context="data")
return IndicatorDataResponse(data=None, error=mcp_err.detail)


async def get_indicators(database_id: str) -> list[str]:
Expand Down
Loading