Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
[pytest]
filterwarnings =
ignore:.*Swig.*
ignore:.*no current event loop.*
ignore:.*no current event loop.*


norecursedirs = .git worktrees parxy.worktrees
2 changes: 1 addition & 1 deletion src/parxy_cli/commands/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def parse(
'-l',
help='Extraction level',
),
] = Level.PAGE,
] = Level.BLOCK,
mode: Annotated[
OutputMode,
typer.Option(
Expand Down
13 changes: 13 additions & 0 deletions src/parxy_core/drivers/abstract_driver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import hashlib
import io
import time
from abc import ABC, abstractmethod
from logging import Logger
from typing import Dict, Any, Self, Tuple, Optional
Expand Down Expand Up @@ -108,8 +109,20 @@ def parse(
self._validate_level(level)

try:
# Start timing
start_time = time.perf_counter()

document = self._handle(file=file, level=level, **kwargs)

# Calculate elapsed time in milliseconds
end_time = time.perf_counter()
elapsed_ms = (end_time - start_time) * 1000

# Store elapsed time in parsing metadata
if document.parsing_metadata is None:
document.parsing_metadata = {}
document.parsing_metadata['driver_elapsed_time'] = elapsed_ms

# Increment the documents processed counter
tracer.count(
'documents.processed',
Expand Down
39 changes: 38 additions & 1 deletion src/parxy_core/drivers/landingai.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,44 @@ def _handle(
service=self.__class__,
) from aex

return landingaiade_to_parxy(parse_response)
doc = landingaiade_to_parxy(parse_response)

# Initialize parsing_metadata if needed
if doc.parsing_metadata is None:
doc.parsing_metadata = {}

# Extract cost information from metadata
# According to https://docs.landing.ai/ade/ade-json-response.md
# metadata contains: credit_usage, duration_ms, filename, job_id, page_count, version
if parse_response.metadata:
metadata = parse_response.metadata

# Extract cost estimation from credit_usage
if hasattr(metadata, 'credit_usage') and metadata.credit_usage is not None:
doc.parsing_metadata['cost_estimation'] = metadata.credit_usage
doc.parsing_metadata['cost_estimation_unit'] = 'credits'

# Extract processing details
ade_details = {}
if hasattr(metadata, 'duration_ms') and metadata.duration_ms is not None:
ade_details['duration_ms'] = metadata.duration_ms
if hasattr(metadata, 'job_id') and metadata.job_id is not None:
ade_details['job_id'] = metadata.job_id
if hasattr(metadata, 'page_count') and metadata.page_count is not None:
ade_details['page_count'] = metadata.page_count
if hasattr(metadata, 'version') and metadata.version is not None:
ade_details['version'] = metadata.version
if hasattr(metadata, 'filename') and metadata.filename is not None:
ade_details['filename'] = metadata.filename

# Add failed_pages if present (for partial content responses)
if hasattr(metadata, 'failed_pages') and metadata.failed_pages is not None:
ade_details['failed_pages'] = metadata.failed_pages

if ade_details:
doc.parsing_metadata['ade_details'] = ade_details

return doc


@trace_with_output('converting')
Expand Down
173 changes: 171 additions & 2 deletions src/parxy_core/drivers/llamaparse.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import io
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional

import requests

from parxy_core.models.config import LlamaParseConfig
from parxy_core.tracing.utils import trace_with_output
Expand All @@ -23,6 +25,17 @@
FileNotFoundException,
)

# Credit cost per parsed page (or per document for the document-level modes),
# keyed by LlamaParse parsing-mode identifier. Used to estimate job cost;
# callers fall back to 3 credits for modes not listed here.
_credits_per_parsing_mode = {
    # Minimum credits per parsing mode as deduced from https://developers.llamaindex.ai/python/cloud/general/pricing/
    'accurate': 3,  # equivalent to Parse page with LLM as observed in their dashboard
    'parse_page_without_llm': 1,
    'parse_page_with_llm': 3,
    'parse_page_with_lvm': 6,
    'parse_page_with_agent': 10,
    'parse_document_with_llm': 30,
    'parse_document_with_agent': 30,
}


class LlamaParseDriver(Driver):
"""Llama Cloud Services document processing via LlamaParse API.
Expand Down Expand Up @@ -65,6 +78,96 @@ def _initialize_driver(self):
**self._config.model_dump() if self._config else {},
)

def _fetch_usage_metrics(self, job_id: str) -> Optional[dict]:
    """Query the LlamaParse beta usage-metrics endpoint for one job.

    Parameters
    ----------
    job_id : str
        Identifier of the parse job whose usage should be retrieved.

    Returns
    -------
    Optional[dict]
        Keys: 'total_cost', 'cost_unit', 'parsing_mode_counts',
        'mode_details'. None when no organization_id is configured,
        the API returns no usage items, or the request fails.
    """
    # Guard clause: the endpoint is organization-scoped, so without an
    # organization_id there is nothing to query.
    config = self._config
    if not config or not config.organization_id:
        return None

    try:
        url = f"{config.base_url.rstrip('/')}/api/v1/beta/usage-metrics"

        response = requests.get(
            url,
            params={
                'organization_id': config.organization_id,
                'event_aggregation_key': job_id,
            },
            headers={
                'Authorization': f'Bearer {config.api_key.get_secret_value()}',
                'Content-Type': 'application/json',
            },
            timeout=10,
        )
        response.raise_for_status()

        items = response.json().get('items', [])
        if not items:
            return None

        # Tally parsed pages per mode and keep per-item detail records.
        pages_by_mode = {}
        details = []
        for entry in items:
            if entry.get('event_type') != 'pages_parsed':
                continue
            props = entry.get('properties', {})
            mode_name = props.get('mode', 'unknown')
            page_count = entry.get('value', 0)
            pages_by_mode[mode_name] = (
                pages_by_mode.get(mode_name, 0) + page_count
            )
            details.append(
                {
                    'mode': mode_name,
                    'model': props.get('model', 'unknown'),
                    'pages': page_count,
                    'day': entry.get('day'),
                }
            )

        # Cost = per-page credit rate (3 for unrecognized modes) x pages.
        total_cost = sum(
            _credits_per_parsing_mode.get(mode_name, 3) * pages
            for mode_name, pages in pages_by_mode.items()
        )

        return {
            'total_cost': total_cost,
            'cost_unit': 'credits',
            'parsing_mode_counts': pages_by_mode,
            'mode_details': details,
        }

    except Exception as e:
        # Best-effort: usage metrics must never break the parse itself,
        # so any failure is logged and swallowed.
        self._logger.warning(
            f'Failed to fetch usage metrics from beta API: {str(e)}'
        )
        return None

def _handle(
self,
file: str | io.BytesIO | bytes,
Expand Down Expand Up @@ -136,7 +239,73 @@ def _handle(
res.error, self.__class__, res.model_dump(exclude={'file_name'})
)

return llamaparse_to_parxy(doc=res, level=level)
converted_document = llamaparse_to_parxy(doc=res, level=level)

if converted_document.parsing_metadata is None:
converted_document.parsing_metadata = {}

converted_document.parsing_metadata['job_id'] = res.job_id
converted_document.parsing_metadata['job_metadata'] = (
res.job_metadata.model_dump_json()
)
converted_document.parsing_metadata['job_error'] = res.error
converted_document.parsing_metadata['job_error_code'] = res.error_code
converted_document.parsing_metadata['job_status'] = res.status

# Try to fetch actual usage metrics from beta API if organization_id is configured
usage_metrics = self._fetch_usage_metrics(res.job_id)

if usage_metrics:
# Use actual metrics from the API
converted_document.parsing_metadata['cost_estimation'] = usage_metrics[
'total_cost'
]
converted_document.parsing_metadata['cost_estimation_unit'] = usage_metrics[
'cost_unit'
]
converted_document.parsing_metadata['parsing_mode_counts'] = usage_metrics[
'parsing_mode_counts'
]
converted_document.parsing_metadata['cost_data_source'] = 'beta_api'
converted_document.parsing_metadata['usage_details'] = usage_metrics[
'mode_details'
]
else:
# Fall back to estimation from page source_data
parsing_modes = {}
parsing_mode_counts = {}

for page in converted_document.pages:
if page.source_data and 'parsingMode' in page.source_data:
mode = page.source_data['parsingMode']
parsing_modes[page.number] = mode

# Count pages per parsing mode
if mode in parsing_mode_counts:
parsing_mode_counts[mode] += 1
else:
parsing_mode_counts[mode] = 1

if parsing_modes:
converted_document.parsing_metadata['page_parsing_modes'] = (
parsing_modes
)
converted_document.parsing_metadata['parsing_mode_counts'] = (
parsing_mode_counts
)

# Calculate cost estimation based on parsing modes
total_cost = 0
for mode, count in parsing_mode_counts.items():
# Use the credit cost from the dictionary, or default to 3 if not recognized
credits_per_page = _credits_per_parsing_mode.get(mode, 3)
total_cost += credits_per_page * count

converted_document.parsing_metadata['cost_estimation'] = total_cost
converted_document.parsing_metadata['cost_estimation_unit'] = 'credits'
converted_document.parsing_metadata['cost_data_source'] = 'estimation'

return converted_document


@trace_with_output('converting')
Expand Down
Loading