diff --git a/docs/extras/code_samples/invoice_splitter_v1_async.txt b/docs/extras/code_samples/invoice_splitter_v1_async.txt index 58584487..64930d6b 100644 --- a/docs/extras/code_samples/invoice_splitter_v1_async.txt +++ b/docs/extras/code_samples/invoice_splitter_v1_async.txt @@ -10,43 +10,11 @@ mindee_client = Client(api_key="my-api-key") # Load a file from disk input_doc = mindee_client.source_from_path("/path/to/the/file.ext") - # Load a file from disk and enqueue it. -queue_result: AsyncPredictResponse = mindee_client.enqueue(InvoiceSplitterV1, input_doc) - -# Get the id of the queue (job) -queue_id = queue_result.job.id - - -# Limit the amount of API calls to retrieve your document -MAX_RETRIES = 10 - -# How many seconds to wait in-between tries -INTERVAL_SECS = 6 -# Recursive function that tries to retrieve the completed document. -# If the document is not "complete", try again -def get_doc_from_async_queue(queue_id, times_tried=0)->None: - - # Have we exceeded our retry count? - if times_tried >= MAX_RETRIES: - raise Exception(f"Maximum retries reached {times_tried}") - - # Wait for a few seconds before fetching - sleep(INTERVAL_SECS) - - # Fetch and parse the result, using the same type - parsed_result = mindee_client.parse_queued(InvoiceSplitterV1, queue_id) - - # Check whether the result is ready - if parsed_result.job.status == "completed": - - # Print a brief summary of the parsed data - print(parsed_result.document) - return - - # Otherwise, try again... - else: - get_doc_from_async_queue(queue_id, times_tried+1) +result: AsyncPredictResponse = mindee_client.enqueue_and_parse( + InvoiceSplitterV1, + input_doc, +) -# Start the recursion... 
-get_doc_from_async_queue(queue_id) +# Print a brief summary of the parsed data +print(result.document) diff --git a/mindee/client.py b/mindee/client.py index b836aeda..11d8c6a7 100644 --- a/mindee/client.py +++ b/mindee/client.py @@ -1,5 +1,6 @@ import json from pathlib import Path +from time import sleep from typing import BinaryIO, Dict, Optional, Union from mindee.http.endpoint import CustomEndpoint, Endpoint @@ -76,13 +77,8 @@ def parse( :param product_class: The document class to use. The response object will be instantiated based on this parameter. - :param endpoint_name: For custom endpoints, the "API name" field in the "Settings" page of the API Builder. - Do not set for standard (off the shelf) endpoints. - - :param account_name: For custom endpoints, your account or organization username on the API Builder. - This is normally not required unless you have a custom endpoint which has the - same name as standard (off the shelf) endpoint. - Do not set for standard (off the shelf) endpoints. + :param input_source: The document/source file to use. + Has to be be created beforehand. :param include_words: Whether to include the full text for each page. This performs a full OCR operation on the server and will increase response time. @@ -95,9 +91,11 @@ def parse( :param cropper: Whether to include cropper results for each page. This performs a cropping operation on the server and will increase response time. + + :param endpoint: For custom endpoints, an endpoint has to be given. """ if input_source is None: - raise TypeError("The 'enqueue' function requires an input document.") + raise TypeError("No input document provided.") if not endpoint: endpoint = self._initialize_ots_endpoint(product_class) @@ -131,13 +129,8 @@ def enqueue( :param product_class: The document class to use. The response object will be instantiated based on this parameter. - :param endpoint_name: For custom endpoints, the "API name" field in the "Settings" page of the API Builder. 
- Do not set for standard (off the shelf) endpoints. - - :param account_name: For custom endpoints, your account or organization username on the API Builder. - This is normally not required unless you have a custom endpoint which has the - same name as standard (off the shelf) endpoint. - Do not set for standard (off the shelf) endpoints. + :param input_source: The document/source file to use. + Has to be be created beforehand. :param include_words: Whether to include the full text for each page. This performs a full OCR operation on the server and will increase response time. @@ -150,9 +143,11 @@ def enqueue( :param cropper: Whether to include cropper results for each page. This performs a cropping operation on the server and will increase response time. + + :param endpoint: For custom endpoints, an endpoint has to be given. """ if input_source is None: - raise TypeError("The 'enqueue' function requires an input document.") + raise TypeError("No input document provided.") if not endpoint: endpoint = self._initialize_ots_endpoint(product_class) @@ -179,13 +174,10 @@ def parse_queued( """ Parses a queued document. + :param product_class: The document class to use. + The response object will be instantiated based on this parameter. :param queue_id: queue_id received from the API - :param endpoint_name: For custom endpoints, the "API name" field in the "Settings" page of the API Builder. - Do not set for standard (off the shelf) endpoints. - :param account_name: For custom endpoints, your account or organization username on the API Builder. - This is normally not required unless you have a custom endpoint which has the - same name as standard (off the shelf) endpoint. - Do not set for standard (off the shelf) endpoints. + :param endpoint: For custom endpoints, an endpoint has to be given. 
""" if not endpoint: endpoint = self._initialize_ots_endpoint(product_class) @@ -194,6 +186,96 @@ def parse_queued( return self._get_queued_document(product_class, endpoint, queue_id) + def _validate_async_params( + self, initial_delay_sec: float, delay_sec: float + ) -> None: + if delay_sec < 2: + raise TypeError("Cannot set auto-parsing delay to less than 2 seconds.") + if initial_delay_sec < 4: + raise TypeError("Cannot set initial parsing delay to less than 4 seconds.") + + def enqueue_and_parse( + self, + product_class, + input_source: Union[LocalInputSource, UrlInputSource], + include_words: bool = False, + close_file: bool = True, + page_options: Optional[PageOptions] = None, + cropper: bool = False, + endpoint: Optional[Endpoint] = None, + initial_delay_sec: float = 6, + delay_sec: float = 3, + max_retries: int = 10, + ) -> AsyncPredictResponse: + """ + Enqueueing to an async endpoint. + + :param product_class: The document class to use. + The response object will be instantiated based on this parameter. + + :param input_source: The document/source file to use. + Has to be be created beforehand. + + :param include_words: Whether to include the full text for each page. + This performs a full OCR operation on the server and will increase response time. + + :param close_file: Whether to ``close()`` the file after parsing it. + Set to ``False`` if you need to access the file after this operation. + + :param page_options: If set, remove pages from the document as specified. + This is done before sending the file to the server and is useful to avoid page limitations. + + :param cropper: Whether to include cropper results for each page. + This performs a cropping operation on the server and will increase response time. + + :param endpoint: For custom endpoints, an endpoint has to be given. + + :param initial_delay_sec: Delay between each polling attempts + This should not be shorter than 4 seconds. 
+ + :param delay_sec: Delay between each polling attempts + This should not be shorter than 2 seconds. + + :param max_retries: Total amount of polling attempts. + + """ + self._validate_async_params(initial_delay_sec, delay_sec) + if not endpoint: + endpoint = self._initialize_ots_endpoint(product_class) + queue_result = self.enqueue( + product_class, + input_source, + include_words, + close_file, + page_options, + cropper, + endpoint, + ) + logger.debug( + "Successfully enqueued document with job id: %s", queue_result.job.id + ) + sleep(initial_delay_sec) + retry_counter = 1 + poll_results = self.parse_queued(product_class, queue_result.job.id, endpoint) + while retry_counter < max_retries: + if poll_results.job.status == "completed": + break + logger.debug( + "Polling server for parsing result with job id: %s", queue_result.job.id + ) + retry_counter += 1 + sleep(delay_sec) + poll_results = self.parse_queued( + product_class, queue_result.job.id, endpoint + ) + + if poll_results.job.status != "completed": + raise RuntimeError( + f"Couldn't retrieve document after {retry_counter} tries." + ) + + return poll_results + def _make_request( self, product_class, @@ -231,9 +313,7 @@ def _predict_async( :param doc_config: Configuration of the document. """ if input_source is None: - raise TypeError( - "The '_predict_async' class method requires an input document." - ) + raise TypeError("No input document provided") if not endpoint: endpoint = self._initialize_ots_endpoint(product_class) response = endpoint.predict_async_req_post( diff --git a/mindee/parsing/common/job.py b/mindee/parsing/common/job.py index 49cc5618..745afcca 100644 --- a/mindee/parsing/common/job.py +++ b/mindee/parsing/common/job.py @@ -10,13 +10,13 @@ class Job: Will hold information on the queue a document has been submitted to. 
""" - id: Optional[str] = None + id: str """ID of the job sent by the API in response to an enqueue request.""" issued_at: datetime """Timestamp of the request reception by the API.""" available_at: Optional[datetime] = None """Timestamp of the request after it has been completed.""" - status: Optional[str] = None + status: str """Status of the request, as seen by the API.""" millisecs_taken: int """Time (ms) taken for the request to be processed by the API.""" @@ -30,8 +30,8 @@ def __init__(self, json_response: dict) -> None: self.issued_at = datetime.fromisoformat(json_response["issued_at"]) if json_response.get("available_at"): self.available_at = datetime.fromisoformat(json_response["available_at"]) - self.id = json_response.get("id") - self.status = json_response.get("status") + self.id = json_response["id"] + self.status = json_response["status"] if self.available_at: self.millisecs_taken = int( (self.available_at - self.issued_at).total_seconds() * 1000 diff --git a/tests/api/test_async_response.py b/tests/api/test_async_response.py index 3ed8fae5..a0aff0d1 100644 --- a/tests/api/test_async_response.py +++ b/tests/api/test_async_response.py @@ -2,10 +2,9 @@ import pytest -from mindee.client import OTS_OWNER, Client +from mindee.client import Client from mindee.input.sources import PathInput from mindee.parsing.common.async_predict_response import AsyncPredictResponse -from mindee.parsing.common.document import Document from mindee.product.invoice_splitter import InvoiceSplitterV1 ASYNC_DIR = "./tests/data/async" diff --git a/tests/product/invoice_splitter/test_invoice_splitter_v1.py b/tests/product/invoice_splitter/test_invoice_splitter_v1.py index 4355c5dd..e52abe7e 100644 --- a/tests/product/invoice_splitter/test_invoice_splitter_v1.py +++ b/tests/product/invoice_splitter/test_invoice_splitter_v1.py @@ -1,5 +1,4 @@ import json -from pathlib import Path import pytest diff --git a/tests/test_client.py b/tests/test_client.py index f68d21c6..d724373b 100644 
def test_async_wrong_initial_delay(dummy_client: Client):
    """An initial polling delay below the allowed minimum must raise ``TypeError``."""
    input_doc = dummy_client.source_from_path(FILE_TYPES_DIR / "pdf" / "blank.pdf")
    try:
        with pytest.raises(TypeError):
            dummy_client.enqueue_and_parse(
                InvoiceSplitterV1, input_doc, initial_delay_sec=0
            )
    finally:
        # The call is rejected during parameter validation, before the source is
        # used, so the source has to be released explicitly here.
        input_doc.close()


def test_async_wrong_polling_delay(dummy_client: Client):
    """A polling delay below the allowed minimum must raise ``TypeError``."""
    input_doc = dummy_client.source_from_path(FILE_TYPES_DIR / "pdf" / "blank.pdf")
    try:
        with pytest.raises(TypeError):
            dummy_client.enqueue_and_parse(InvoiceSplitterV1, input_doc, delay_sec=0)
    finally:
        # Same as above: validation fails first, so close the source ourselves.
        input_doc.close()