From 4efcb5fc0b574df363adf836f8c52d20b884f731 Mon Sep 17 00:00:00 2001
From: cat101
Date: Fri, 24 Nov 2023 20:30:48 -0300
Subject: [PATCH 1/7] Added request pipelining

---
 landingai/pipeline/frameset.py | 30 +++++++++++++++++++++++-------
 landingai/predict.py           | 20 +++++++++++++++-----
 landingai/timer.py             | 26 ++++++++++++++++++--------
 3 files changed, 56 insertions(+), 20 deletions(-)

diff --git a/landingai/pipeline/frameset.py b/landingai/pipeline/frameset.py
index f063260d..965e0c39 100644
--- a/landingai/pipeline/frameset.py
+++ b/landingai/pipeline/frameset.py
@@ -4,6 +4,7 @@
 import logging
 from datetime import datetime
 from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union, cast
+from concurrent.futures import ThreadPoolExecutor, wait
 import warnings

 import cv2
@@ -141,13 +142,16 @@ def from_array(cls, array: np.ndarray, is_bgr: bool = True) -> "Frame":
         im = Image.fromarray(array)
         return cls(image=im)

-    def run_predict(self, predictor: Predictor) -> "Frame":
+    def run_predict(self, predictor: Predictor, reuse_session: bool = False) -> "Frame":
         """Run a cloud inference model
         Parameters
         ----------
         predictor: the model to be invoked.
+        reuse_session
+            Whether to reuse the session for sending multiple inference requests. By default, the session is reused to improve the performance. If you want to send multiple requests in parallel, set this to False.
+
         """
-        self.predictions = PredictionList(predictor.predict(self.image))  # type: ignore
+        self.predictions = PredictionList(predictor.predict(self.image, reuse_session=reuse_session))  # type: ignore
         return self

     def overlay_predictions(self, options: Optional[Dict[str, Any]] = None) -> "Frame":
@@ -413,15 +417,27 @@ def is_empty(self) -> bool:
         """
         return not self.frames  # True if the list is empty

-    def run_predict(self, predictor: Predictor) -> "FrameSet":
+    def run_predict(self, predictor: Predictor, pipelining: bool = False) -> "FrameSet":
         """Run a cloud inference model
         Parameters
         ----------
         predictor: the model to be invoked.
-        """
-
-        for frame in self.frames:
-            frame.run_predict(predictor)
+        pipelining: whether to request predictions in parallel or sequentially. Parallel requests will help reduce the impact of fixed costs (e.g. network latency, transfer time, etc) but will consume more resources on the client and server side.
+        """
+
+        if pipelining:
+            # Remember that run_predict will retry indefinitely on 429 (with a 60 second delay). This logic is still ok for a multi-threaded context.
+            with ThreadPoolExecutor(
+                max_workers=5
+            ) as executor:  # TODO: make this configurable
+                futures = [
+                    executor.submit(frame.run_predict, predictor, reuse_session=False)
+                    for frame in self.frames
+                ]
+                wait(futures)
+        else:
+            for frame in self.frames:
+                frame.run_predict(predictor)
         return self

     def overlay_predictions(
diff --git a/landingai/predict.py b/landingai/predict.py
index bb24645b..c6d275ec 100644
--- a/landingai/predict.py
+++ b/landingai/predict.py
@@ -116,7 +116,7 @@ def _build_default_headers(
         }

     @retry(
-        # All customers have a quota of images per minute. If the server return a 429, then we will wait 60 seconds and retry.
+        # All customers have a quota of images per minute. If the server return a 429, then we will wait 60 seconds and retry. Note that we will retry forever on 429s which is ok since the rate limiter will eventually allow the request to go through.
         retry=retry_if_exception_type(RateLimitExceededError),
         wait=wait_fixed(60),
         before_sleep=before_sleep_log(_LOGGER, logging.WARNING),
@@ -301,6 +301,7 @@ def predict(
         self,
         image: Union[np.ndarray, PIL.Image.Image],
         metadata: Optional[InferenceMetadata] = None,
+        reuse_session: bool = True,
         **kwargs: Any,
     ) -> List[Prediction]:
         """Run Edge inference on the input image and return the prediction result.
@@ -315,7 +316,8 @@ def predict(
            Note: The metadata is not reported back to LandingLens by default unless the edge inference server (i.e. ModelRunner)
            enables the feature of reporting historical inference results.
            See `landingai.common.InferenceMetadata` for more details.
-
+        reuse_session
+            Whether to reuse the session for sending multiple inference requests. By default, the session is reused to improve the performance. If you want to send multiple requests in parallel, set this to False.
         Returns
         -------
         List[Prediction]
@@ -324,9 +326,17 @@ def predict(
         buffer_bytes = serialize_image(image)
         files = {"file": buffer_bytes}
         data = {"metadata": metadata.json()} if metadata else None
-        return _do_inference(
-            self._session, self._url, files, {}, _EdgeExtractor, data=data
-        )
+        if reuse_session:
+            session = self._session
+        else:
+            session = _create_session(
+                self._url,
+                0,
+                {
+                    "contentType": "multipart/form-data"
+                },  # No retries for the inference service
+            )
+        return _do_inference(session, self._url, files, {}, _EdgeExtractor, data=data)


 class _Extractor:
diff --git a/landingai/timer.py b/landingai/timer.py
index c38d6e7a..2d8299d1 100644
--- a/landingai/timer.py
+++ b/landingai/timer.py
@@ -5,6 +5,7 @@
 import pprint
 import statistics
 import time
+import threading
 from collections import defaultdict, deque
 from contextlib import ContextDecorator
 from dataclasses import dataclass, field
@@ -148,23 +149,31 @@ def stuff():
     )
     log_fn: Callable[[str], None] = logging.getLogger(__name__).info
     _elapsed_time: float = field(default=math.nan, init=False, repr=False)
-    _start_time: Optional[float] = field(default=None, init=False, repr=False)
+    # Keep track of the start time for each thread and each timer (i.e. each function)
+    _thread_local_data: threading.local = field(
+        default=threading.local(), init=False, repr=False
+    )

     def start(self) -> None:
         """Start a new timer."""
-        if self._start_time is not None:
+        if not hasattr(self._thread_local_data, "_start_time"):
+            self._thread_local_data._start_time = {}
+        if self.name in self._thread_local_data._start_time:
             raise ValueError("Timer is running. Use .stop() to stop it")
-
-        self._start_time = time.perf_counter()
+        self._thread_local_data._start_time[self.name] = time.perf_counter()

     def stop(self) -> float:
         """Stop the timer, and report the elapsed time."""
-        if self._start_time is None:
+        if (
+            not hasattr(self._thread_local_data, "_start_time")
+            or self.name not in self._thread_local_data._start_time
+        ):
             raise ValueError("Timer is not running. Use .start() to start it")

         # Calculate elapsed time
-        self._elapsed_time = time.perf_counter() - self._start_time
-
+        self._elapsed_time = (
+            time.perf_counter() - self._thread_local_data._start_time[self.name]
+        )
         # Report elapsed time
         attributes = {
             "name": self.name,
@@ -179,7 +188,8 @@ def stop(self) -> float:
         # Save stats
         Timer.stats.add(self.name, self._elapsed_time)

-        self._start_time = None
+        del self._thread_local_data._start_time[self.name]
+
         return self._elapsed_time

     def __enter__(self) -> "Timer":

From b91c4bb7714c32f998e1fb1c42ff3a7e1728f4e9 Mon Sep 17 00:00:00 2001
From: cat101
Date: Fri, 24 Nov 2023 20:36:12 -0300
Subject: [PATCH 2/7] Linting

---
 landingai/timer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/landingai/timer.py b/landingai/timer.py
index 2d8299d1..c7b9a512 100644
--- a/landingai/timer.py
+++ b/landingai/timer.py
@@ -11,7 +11,7 @@
 from dataclasses import dataclass, field
 from enum import Enum
 from functools import partial
-from typing import Any, Callable, ClassVar, Dict, MutableSequence, Optional, Sequence
+from typing import Any, Callable, ClassVar, Dict, MutableSequence, Sequence


 class TextColor(Enum):

From cf6011314ddf92233d7d4c8a550cc82c8c8282d1 Mon Sep 17 00:00:00 2001
From: cat101
Date: Mon, 4 Dec 2023 14:12:22 -0300
Subject: [PATCH 3/7] Disable OCR performance test

---
 tests/integration/landingai/test_predict_e2e.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/integration/landingai/test_predict_e2e.py b/tests/integration/landingai/test_predict_e2e.py
index 825194c7..26eaa200 100644
--- a/tests/integration/landingai/test_predict_e2e.py
+++ b/tests/integration/landingai/test_predict_e2e.py
@@ -146,6 +146,7 @@ def test_class_predict():
     img_with_masks.save("tests/output/test_class.jpg")


+@pytest.mark.skip
 def test_ocr_predict():
     Path("tests/output").mkdir(parents=True, exist_ok=True)
     predictor = OcrPredictor(api_key=_API_KEY)

From 4d1092974179bc1165c595cd251ebe9d33660075 Mon Sep 17 00:00:00 2001
From: cat101
Date: Mon, 4 Dec 2023 16:13:18 -0300
Subject: [PATCH 4/7] Added tests

---
 tests/unit/landingai/test_predict.py | 34 ++++++++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/tests/unit/landingai/test_predict.py b/tests/unit/landingai/test_predict.py
index 83e02f8c..63ad27d3 100644
--- a/tests/unit/landingai/test_predict.py
+++ b/tests/unit/landingai/test_predict.py
@@ -24,6 +24,7 @@
 )
 from landingai.predict import EdgePredictor, Predictor
 from landingai.visualize import overlay_predictions
+from landingai.pipeline.frameset import FrameSet, Frame


 def test_predict_with_none():
@@ -277,6 +278,39 @@ def test_edge_class_predict(connect_mock):
     img_with_masks.save("tests/output/test_edge_class.jpg")


+@patch("socket.socket")
+@responses.activate
+def test_edge_batch_predict(connect_mock):
+    # Fake a successful connection
+    sock_instance = connect_mock.return_value
+    sock_instance.connect_ex.return_value = 0
+    Path("tests/output").mkdir(parents=True, exist_ok=True)
+    responses._add_from_file(
+        file_path="tests/data/responses/test_edge_class_predict.yaml"
+    )
+    # Project: https://app.landing.ai/app/376/pr/26119078438913/deployment?device=tiger-team-integration-tests
+    # run LandingEdge.CLI with cmdline parameters: run-online -k "your_api_key" -s "your_secret_key" -r 26119078438913 \
+    # -m "59eff733-1dcd-4ace-b104-9041c745f1da" -n test_edge_cli --port 8123
+    predictor = EdgePredictor("localhost", 8123)
+    test_image = "tests/data/images/wildfire1.jpeg"
+    frs = FrameSet.from_image(test_image)
+    for i in range(9):
+        frs.append(Frame.from_image(test_image))
+    assert frs is not None
+    frs.run_predict(predictor=predictor, pipelining=True)
+
+    for frame in frs:
+        assert len(frame.predictions) == 1, "Result should not be empty or None"
+        assert frame.predictions[0].label_name == "HasFire"
+        assert frame.predictions[0].label_index == 0
+        np.testing.assert_almost_equal(
+            frame.predictions[0].score,
+            0.99565023,
+            decimal=3,
+            err_msg="class score mismatch",
+        )
+
+
 @responses.activate
 def test_connection_check():
     with pytest.raises(ConnectionError):

From 674c89979481538891ea78a1be8b9dde57a5c5a6 Mon Sep 17 00:00:00 2001
From: cat101
Date: Thu, 7 Dec 2023 19:22:43 -0300
Subject: [PATCH 5/7] Addressing some comments

---
 landingai/pipeline/frameset.py       | 8 ++++----
 tests/unit/landingai/test_predict.py | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/landingai/pipeline/frameset.py b/landingai/pipeline/frameset.py
index 965e0c39..cac0671a 100644
--- a/landingai/pipeline/frameset.py
+++ b/landingai/pipeline/frameset.py
@@ -148,7 +148,7 @@ def run_predict(self, predictor: Predictor, reuse_session: bool = False) -> "Fra
         ----------
         predictor: the model to be invoked.
         reuse_session
-            Whether to reuse the session for sending multiple inference requests. By default, the session is reused to improve the performance. If you want to send multiple requests in parallel, set this to False.
+            Whether to reuse the HTTPS session for sending multiple inference requests. By default, the session is reused to improve the performance on high latency networks (e.g. fewer SSL negotiations). If you are sending requests from multiple threads, set this to False.

         """
         self.predictions = PredictionList(predictor.predict(self.image, reuse_session=reuse_session))  # type: ignore
@@ -417,15 +417,15 @@ def is_empty(self) -> bool:
         """
         return not self.frames  # True if the list is empty

-    def run_predict(self, predictor: Predictor, pipelining: bool = False) -> "FrameSet":
+    def run_predict(self, predictor: Predictor, no_workers: int = 1) -> "FrameSet":
         """Run a cloud inference model
         Parameters
         ----------
         predictor: the model to be invoked.
-        pipelining: whether to request predictions in parallel or sequentially. Parallel requests will help reduce the impact of fixed costs (e.g. network latency, transfer time, etc) but will consume more resources on the client and server side.
+        no_workers: By default a single worker will request predictions sequentially. Parallel requests can help reduce the impact of fixed costs (e.g. network latency, transfer time, etc) but will consume more resources on the client and server side. The number of workers should typically be under 5. A large number of workers when using cloud inference will be rate limited and produce no improvement.
         """

-        if pipelining:
+        if no_workers > 1:
             # Remember that run_predict will retry indefinitely on 429 (with a 60 second delay). This logic is still ok for a multi-threaded context.
             with ThreadPoolExecutor(
                 max_workers=5
             ) as executor:  # TODO: make this configurable
                 futures = [
                     executor.submit(frame.run_predict, predictor, reuse_session=False)
                     for frame in self.frames
                 ]
                 wait(futures)
         else:
             for frame in self.frames:
                 frame.run_predict(predictor)
         return self

     def overlay_predictions(
diff --git a/tests/unit/landingai/test_predict.py b/tests/unit/landingai/test_predict.py
index 63ad27d3..5652e021 100644
--- a/tests/unit/landingai/test_predict.py
+++ b/tests/unit/landingai/test_predict.py
@@ -297,7 +297,7 @@ def test_edge_batch_predict(connect_mock):
     for i in range(9):
         frs.append(Frame.from_image(test_image))
     assert frs is not None
-    frs.run_predict(predictor=predictor, pipelining=True)
+    frs.run_predict(predictor=predictor, no_workers=5)

     for frame in frs:
         assert len(frame.predictions) == 1, "Result should not be empty or None"

From e2d289c1e4e2da2d358fc82d1dc2969c7798b153 Mon Sep 17 00:00:00 2001
From: cat101
Date: Mon, 11 Dec 2023 11:51:04 -0300
Subject: [PATCH 6/7] More Fixes

---
 landingai/pipeline/frameset.py       | 8 ++++----
 landingai/predict.py                 | 2 +-
 tests/unit/landingai/test_predict.py | 2 +-
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/landingai/pipeline/frameset.py b/landingai/pipeline/frameset.py
index cac0671a..b6a37bc4 100644
--- a/landingai/pipeline/frameset.py
+++ b/landingai/pipeline/frameset.py
@@ -142,7 +142,7 @@ def from_array(cls, array: np.ndarray, is_bgr: bool = True) -> "Frame":
         im = Image.fromarray(array)
         return cls(image=im)

-    def run_predict(self, predictor: Predictor, reuse_session: bool = False) -> "Frame":
+    def run_predict(self, predictor: Predictor, reuse_session: bool = True) -> "Frame":
         """Run a cloud inference model
         Parameters
         ----------
         predictor: the model to be invoked.
@@ -417,7 +417,7 @@ def is_empty(self) -> bool:
         """
         return not self.frames  # True if the list is empty

-    def run_predict(self, predictor: Predictor, no_workers: int = 1) -> "FrameSet":
+    def run_predict(self, predictor: Predictor, num_workers: int = 1) -> "FrameSet":
         """Run a cloud inference model
         Parameters
         ----------
         predictor: the model to be invoked.
         no_workers: By default a single worker will request predictions sequentially. Parallel requests can help reduce the impact of fixed costs (e.g. network latency, transfer time, etc) but will consume more resources on the client and server side. The number of workers should typically be under 5. A large number of workers when using cloud inference will be rate limited and produce no improvement.
         """

-        if no_workers > 1:
+        if num_workers > 1:
             # Remember that run_predict will retry indefinitely on 429 (with a 60 second delay). This logic is still ok for a multi-threaded context.
             with ThreadPoolExecutor(
-                max_workers=5
+                max_workers=num_workers
             ) as executor:  # TODO: make this configurable
                 futures = [
                     executor.submit(frame.run_predict, predictor, reuse_session=False)
                     for frame in self.frames
                 ]
                 wait(futures)
         else:
diff --git a/landingai/predict.py b/landingai/predict.py
index c6d275ec..abefec6f 100644
--- a/landingai/predict.py
+++ b/landingai/predict.py
@@ -317,7 +317,7 @@ def predict(
            See `landingai.common.InferenceMetadata` for more details.
         reuse_session
-            Whether to reuse the session for sending multiple inference requests. By default, the session is reused to improve the performance. If you want to send multiple requests in parallel, set this to False.
+            Whether to reuse the HTTPS session for sending multiple inference requests. By default, the session is reused to improve the performance on high latency networks (e.g. fewer SSL negotiations). If you are sending requests from multiple threads, set this to False.
         Returns
         -------
         List[Prediction]
diff --git a/tests/unit/landingai/test_predict.py b/tests/unit/landingai/test_predict.py
index 5652e021..fb95f692 100644
--- a/tests/unit/landingai/test_predict.py
+++ b/tests/unit/landingai/test_predict.py
@@ -297,7 +297,7 @@ def test_edge_batch_predict(connect_mock):
     for i in range(9):
         frs.append(Frame.from_image(test_image))
     assert frs is not None
-    frs.run_predict(predictor=predictor, no_workers=5)
+    frs.run_predict(predictor=predictor, num_workers=5)

     for frame in frs:
         assert len(frame.predictions) == 1, "Result should not be empty or None"

From 0d6ebd6ab14bfa78f288047e2bdd8390f64773f9 Mon Sep 17 00:00:00 2001
From: cat101
Date: Mon, 11 Dec 2023 12:08:37 -0300
Subject: [PATCH 7/7] comment fix

---
 landingai/pipeline/frameset.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/landingai/pipeline/frameset.py b/landingai/pipeline/frameset.py
index b6a37bc4..2608de10 100644
--- a/landingai/pipeline/frameset.py
+++ b/landingai/pipeline/frameset.py
@@ -422,7 +422,7 @@ def run_predict(self, predictor: Predictor, num_workers: int = 1) -> "FrameSet":
         Parameters
         ----------
         predictor: the model to be invoked.
-        no_workers: By default a single worker will request predictions sequentially. Parallel requests can help reduce the impact of fixed costs (e.g. network latency, transfer time, etc) but will consume more resources on the client and server side. The number of workers should typically be under 5. A large number of workers when using cloud inference will be rate limited and produce no improvement.
+        num_workers: By default a single worker will request predictions sequentially. Parallel requests can help reduce the impact of fixed costs (e.g. network latency, transfer time, etc) but will consume more resources on the client and server side. The number of workers should typically be under 5. A large number of workers when using cloud inference will be rate limited and produce no improvement.
         """

         if num_workers > 1:
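
The series above adds two knobs: reuse_session on the per-frame predict calls and num_workers on FrameSet.run_predict. The snippet below is a minimal usage sketch based only on the calls exercised in the new test_edge_batch_predict test; the host, port, and image path come from that test and are placeholders for a real deployment, not values mandated by the patches.

    from landingai.pipeline.frameset import FrameSet
    from landingai.predict import EdgePredictor

    # Placeholder host/port for a locally running edge inference server.
    predictor = EdgePredictor("localhost", 8123)

    # Build a FrameSet and fan the requests out over up to 5 worker threads.
    # Internally each worker passes reuse_session=False so the threads do not
    # share a single HTTPS session.
    frames = FrameSet.from_image("tests/data/images/wildfire1.jpeg")
    frames.run_predict(predictor=predictor, num_workers=5)

    for frame in frames:
        print(frame.predictions)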