Skip to content

Commit

Permalink
✨ add support for asynchronous auto-polling (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianMindee authored Oct 9, 2023
1 parent 3d56e8d commit 0470066
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 70 deletions.
44 changes: 6 additions & 38 deletions docs/extras/code_samples/invoice_splitter_v1_async.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,43 +10,11 @@ mindee_client = Client(api_key="my-api-key")
# Load a file from disk
input_doc = mindee_client.source_from_path("/path/to/the/file.ext")


# Load a file from disk and enqueue it.
queue_result: AsyncPredictResponse = mindee_client.enqueue(InvoiceSplitterV1, input_doc)

# Get the id of the queue (job)
queue_id = queue_result.job.id


# Limit the amount of API calls to retrieve your document
MAX_RETRIES = 10

# How many seconds to wait in-between tries
INTERVAL_SECS = 6
# Recursive function that tries to retrieve the completed document.
# If the document is not "complete", try again
def get_doc_from_async_queue(queue_id, times_tried=0)->None:
    """
    Poll the queue until the document is ready, then print it.

    :param queue_id: ID of the job to poll.
    :param times_tried: Number of polling attempts already made.
    """
    # Keep polling while the retry budget has not been used up.
    while times_tried < MAX_RETRIES:
        # Pause before every fetch.
        sleep(INTERVAL_SECS)

        # Ask for the result, using the same product type as when enqueueing.
        response = mindee_client.parse_queued(InvoiceSplitterV1, queue_id)

        if response.job.status == "completed":
            # Done: print a brief summary of the parsed data.
            print(response.document)
            return

        # Not ready yet: count this attempt and go around again.
        times_tried += 1

    # Budget exhausted without a completed document.
    raise Exception(f"Maximum retries reached {times_tried}")
result: AsyncPredictResponse = mindee_client.enqueue_and_parse(
InvoiceSplitterV1,
input_doc,
)

# Start the recursion...
get_doc_from_async_queue(queue_id)
# Print a brief summary of the parsed data
print(result.document)
130 changes: 105 additions & 25 deletions mindee/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
from pathlib import Path
from time import sleep
from typing import BinaryIO, Dict, Optional, Union

from mindee.http.endpoint import CustomEndpoint, Endpoint
Expand Down Expand Up @@ -76,13 +77,8 @@ def parse(
:param product_class: The document class to use.
The response object will be instantiated based on this parameter.
:param endpoint_name: For custom endpoints, the "API name" field in the "Settings" page of the API Builder.
Do not set for standard (off the shelf) endpoints.
:param account_name: For custom endpoints, your account or organization username on the API Builder.
This is normally not required unless you have a custom endpoint which has the
same name as standard (off the shelf) endpoint.
Do not set for standard (off the shelf) endpoints.
:param input_source: The document/source file to use.
Has to be created beforehand.
:param include_words: Whether to include the full text for each page.
This performs a full OCR operation on the server and will increase response time.
Expand All @@ -95,9 +91,11 @@ def parse(
:param cropper: Whether to include cropper results for each page.
This performs a cropping operation on the server and will increase response time.
:param endpoint: For custom endpoints, an endpoint has to be given.
"""
if input_source is None:
raise TypeError("The 'enqueue' function requires an input document.")
raise TypeError("No input document provided.")

if not endpoint:
endpoint = self._initialize_ots_endpoint(product_class)
Expand Down Expand Up @@ -131,13 +129,8 @@ def enqueue(
:param product_class: The document class to use.
The response object will be instantiated based on this parameter.
:param endpoint_name: For custom endpoints, the "API name" field in the "Settings" page of the API Builder.
Do not set for standard (off the shelf) endpoints.
:param account_name: For custom endpoints, your account or organization username on the API Builder.
This is normally not required unless you have a custom endpoint which has the
same name as standard (off the shelf) endpoint.
Do not set for standard (off the shelf) endpoints.
:param input_source: The document/source file to use.
Has to be created beforehand.
:param include_words: Whether to include the full text for each page.
This performs a full OCR operation on the server and will increase response time.
Expand All @@ -150,9 +143,11 @@ def enqueue(
:param cropper: Whether to include cropper results for each page.
This performs a cropping operation on the server and will increase response time.
:param endpoint: For custom endpoints, an endpoint has to be given.
"""
if input_source is None:
raise TypeError("The 'enqueue' function requires an input document.")
raise TypeError("No input document provided.")

if not endpoint:
endpoint = self._initialize_ots_endpoint(product_class)
Expand All @@ -179,13 +174,10 @@ def parse_queued(
"""
Parses a queued document.
:param product_class: The document class to use.
The response object will be instantiated based on this parameter.
:param queue_id: queue_id received from the API
:param endpoint_name: For custom endpoints, the "API name" field in the "Settings" page of the API Builder.
Do not set for standard (off the shelf) endpoints.
:param account_name: For custom endpoints, your account or organization username on the API Builder.
This is normally not required unless you have a custom endpoint which has the
same name as standard (off the shelf) endpoint.
Do not set for standard (off the shelf) endpoints.
:param endpoint: For custom endpoints, an endpoint has to be given.
"""
if not endpoint:
endpoint = self._initialize_ots_endpoint(product_class)
Expand All @@ -194,6 +186,96 @@ def parse_queued(

return self._get_queued_document(product_class, endpoint, queue_id)

def _validate_async_params(
self, initial_delay_sec: float, delay_sec: float
) -> None:
if delay_sec < 2:
raise TypeError("Cannot set auto-parsing delay to less than 2 seconds.")
if initial_delay_sec < 4:
raise TypeError("Cannot set initial parsing delay to less than 4 seconds.")

def enqueue_and_parse(
    self,
    product_class,
    input_source: Union[LocalInputSource, UrlInputSource],
    include_words: bool = False,
    close_file: bool = True,
    page_options: Optional[PageOptions] = None,
    cropper: bool = False,
    endpoint: Optional[Endpoint] = None,
    initial_delay_sec: float = 6,
    delay_sec: float = 3,
    max_retries: int = 10,
) -> AsyncPredictResponse:
    """
    Enqueue a document to an async endpoint, then poll until it is parsed.

    :param product_class: The document class to use.
        The response object will be instantiated based on this parameter.

    :param input_source: The document/source file to use.
        Has to be created beforehand.

    :param include_words: Whether to include the full text for each page.
        This performs a full OCR operation on the server and will increase response time.

    :param close_file: Whether to ``close()`` the file after parsing it.
        Set to ``False`` if you need to access the file after this operation.

    :param page_options: If set, remove pages from the document as specified.
        This is done before sending the file to the server and is useful to avoid page limitations.

    :param cropper: Whether to include cropper results for each page.
        This performs a cropping operation on the server and will increase response time.

    :param endpoint: For custom endpoints, an endpoint has to be given.

    :param initial_delay_sec: Delay between enqueueing and the first polling attempt.
        This should not be shorter than 4 seconds.

    :param delay_sec: Delay between each subsequent polling attempt.
        This should not be shorter than 2 seconds.

    :param max_retries: Total amount of polling attempts, the first one included.
        At least one attempt is always made.

    :return: The polling response whose job status is ``"completed"``.

    :raises TypeError: If ``initial_delay_sec`` or ``delay_sec`` is below its minimum.
    :raises RuntimeError: If the job is still not completed after the last polling attempt.
    """
    # Reject invalid delays before anything is sent to the server.
    self._validate_async_params(initial_delay_sec, delay_sec)
    if not endpoint:
        endpoint = self._initialize_ots_endpoint(product_class)

    queue_result = self.enqueue(
        product_class,
        input_source,
        include_words,
        close_file,
        page_options,
        cropper,
        endpoint,
    )
    job_id = queue_result.job.id
    logger.debug("Successfully enqueued document with job id: %s", job_id)

    # The first attempt is unconditional and uses the (longer) initial delay.
    sleep(initial_delay_sec)
    retry_counter = 1
    poll_results = self.parse_queued(product_class, job_id, endpoint)

    # Further attempts use the regular delay, until done or out of retries.
    while poll_results.job.status != "completed" and retry_counter < max_retries:
        logger.debug("Polling server for parsing result with job id: %s", job_id)
        retry_counter += 1
        sleep(delay_sec)
        poll_results = self.parse_queued(product_class, job_id, endpoint)

    if poll_results.job.status != "completed":
        raise RuntimeError(
            f"Couldn't retrieve document after {retry_counter} tries."
        )

    return poll_results

def _make_request(
self,
product_class,
Expand Down Expand Up @@ -231,9 +313,7 @@ def _predict_async(
:param doc_config: Configuration of the document.
"""
if input_source is None:
raise TypeError(
"The '_predict_async' class method requires an input document."
)
raise TypeError("No input document provided")
if not endpoint:
endpoint = self._initialize_ots_endpoint(product_class)
response = endpoint.predict_async_req_post(
Expand Down
8 changes: 4 additions & 4 deletions mindee/parsing/common/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ class Job:
Will hold information on the queue a document has been submitted to.
"""

id: Optional[str] = None
id: str
"""ID of the job sent by the API in response to an enqueue request."""
issued_at: datetime
"""Timestamp of the request reception by the API."""
available_at: Optional[datetime] = None
"""Timestamp of the request after it has been completed."""
status: Optional[str] = None
status: str
"""Status of the request, as seen by the API."""
millisecs_taken: int
"""Time (ms) taken for the request to be processed by the API."""
Expand All @@ -30,8 +30,8 @@ def __init__(self, json_response: dict) -> None:
self.issued_at = datetime.fromisoformat(json_response["issued_at"])
if json_response.get("available_at"):
self.available_at = datetime.fromisoformat(json_response["available_at"])
self.id = json_response.get("id")
self.status = json_response.get("status")
self.id = json_response["id"]
self.status = json_response["status"]
if self.available_at:
self.millisecs_taken = int(
(self.available_at - self.issued_at).total_seconds() * 1000
Expand Down
3 changes: 1 addition & 2 deletions tests/api/test_async_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@

import pytest

from mindee.client import OTS_OWNER, Client
from mindee.client import Client
from mindee.input.sources import PathInput
from mindee.parsing.common.async_predict_response import AsyncPredictResponse
from mindee.parsing.common.document import Document
from mindee.product.invoice_splitter import InvoiceSplitterV1

ASYNC_DIR = "./tests/data/async"
Expand Down
1 change: 0 additions & 1 deletion tests/product/invoice_splitter/test_invoice_splitter_v1.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import json
from pathlib import Path

import pytest

Expand Down
15 changes: 15 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from mindee import Client, PageOptions, product
from mindee.http.error import HTTPException
from mindee.input.sources import LocalInputSource
from mindee.product.invoice_splitter.invoice_splitter_v1 import InvoiceSplitterV1
from mindee.product.receipt.receipt_v4 import ReceiptV4
from tests.test_inputs import FILE_TYPES_DIR
from tests.utils import clear_envvars, dummy_envvars
Expand Down Expand Up @@ -97,3 +98,17 @@ def test_cut_options(dummy_client: Client):
pass
assert input_doc.count_doc_pages() == 5
input_doc.close()


def test_async_wrong_initial_delay(dummy_client: Client):
    """An initial delay of zero must make ``enqueue_and_parse`` raise ``TypeError``."""
    blank_pdf = FILE_TYPES_DIR / "pdf" / "blank.pdf"
    source = dummy_client.source_from_path(blank_pdf)
    with pytest.raises(TypeError):
        dummy_client.enqueue_and_parse(
            InvoiceSplitterV1,
            source,
            initial_delay_sec=0,
        )


def test_async_wrong_polling_delay(dummy_client: Client):
    """A polling delay of zero must make ``enqueue_and_parse`` raise ``TypeError``."""
    blank_pdf = FILE_TYPES_DIR / "pdf" / "blank.pdf"
    source = dummy_client.source_from_path(blank_pdf)
    with pytest.raises(TypeError):
        dummy_client.enqueue_and_parse(
            InvoiceSplitterV1,
            source,
            delay_sec=0,
        )

0 comments on commit 0470066

Please sign in to comment.