Added parallel inference support to FrameSet #161

Merged 7 commits on Dec 11, 2023

30 changes: 23 additions & 7 deletions landingai/pipeline/frameset.py
@@ -4,6 +4,7 @@
import logging
from datetime import datetime
from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union, cast
from concurrent.futures import ThreadPoolExecutor, wait
import warnings

import cv2
@@ -141,13 +142,16 @@ def from_array(cls, array: np.ndarray, is_bgr: bool = True) -> "Frame":
im = Image.fromarray(array)
return cls(image=im)

def run_predict(self, predictor: Predictor) -> "Frame":
def run_predict(self, predictor: Predictor, reuse_session: bool = True) -> "Frame":
"""Run a cloud inference model
Parameters
----------
predictor: the model to be invoked.
reuse_session
Whether to reuse the HTTPS session for sending multiple inference requests. By default, the session is reused to improve performance on high-latency networks (e.g. fewer SSL negotiations). If you are sending requests from multiple threads, set this to False.

"""
self.predictions = PredictionList(predictor.predict(self.image)) # type: ignore
self.predictions = PredictionList(predictor.predict(self.image, reuse_session=reuse_session)) # type: ignore
return self

def overlay_predictions(self, options: Optional[Dict[str, Any]] = None) -> "Frame":
@@ -413,15 +417,27 @@ def is_empty(self) -> bool:
"""
return not self.frames # True if the list is empty

def run_predict(self, predictor: Predictor) -> "FrameSet":
def run_predict(self, predictor: Predictor, num_workers: int = 1) -> "FrameSet":
"""Run a cloud inference model
Parameters
----------
predictor: the model to be invoked.
"""

for frame in self.frames:
frame.run_predict(predictor)
num_workers: By default, a single worker requests predictions sequentially. Parallel requests can help reduce the impact of fixed costs (e.g. network latency, transfer time), but they consume more resources on both the client and the server. The number of workers should typically be under 5; a large number of workers against cloud inference will be rate limited and yield no improvement.
"""

if num_workers > 1:
# Remember that run_predict will retry indefinitely on 429 (with a 60-second delay). This logic is still OK in a multi-threaded context.
with ThreadPoolExecutor(
max_workers=num_workers
) as executor: # TODO: make this configurable
futures = [
executor.submit(frame.run_predict, predictor, reuse_session=False)
for frame in self.frames
]
wait(futures)
else:
for frame in self.frames:
frame.run_predict(predictor)
return self

def overlay_predictions(
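For reference, a minimal usage sketch of the new `num_workers` option (not part of the diff; it assumes a local edge inference server on port 8123, as in the unit test added below):

```python
from landingai.pipeline.frameset import Frame, FrameSet
from landingai.predict import EdgePredictor

# Assumed local edge inference server, mirroring the new unit test below.
predictor = EdgePredictor("localhost", 8123)

frames = FrameSet.from_image("tests/data/images/wildfire1.jpeg")
for _ in range(9):
    frames.append(Frame.from_image("tests/data/images/wildfire1.jpeg"))

# With num_workers > 1, frames are submitted concurrently to a ThreadPoolExecutor,
# and each request uses its own HTTPS session (reuse_session=False).
frames.run_predict(predictor=predictor, num_workers=5)
```
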
20 changes: 15 additions & 5 deletions landingai/predict.py
@@ -116,7 +116,7 @@ def _build_default_headers(
}

@retry(
# All customers have a quota of images per minute. If the server return a 429, then we will wait 60 seconds and retry.
# All customers have a quota of images per minute. If the server returns a 429, we will wait 60 seconds and retry. Note that we will retry forever on 429s, which is OK because the rate limiter will eventually allow the request to go through.
retry=retry_if_exception_type(RateLimitExceededError),
wait=wait_fixed(60),
before_sleep=before_sleep_log(_LOGGER, logging.WARNING),
@@ -301,6 +301,7 @@ def predict(
self,
image: Union[np.ndarray, PIL.Image.Image],
metadata: Optional[InferenceMetadata] = None,
reuse_session: bool = True,
**kwargs: Any,
) -> List[Prediction]:
"""Run Edge inference on the input image and return the prediction result.
@@ -315,7 +316,8 @@
Note: The metadata is not reported back to LandingLens by default unless the edge inference server (i.e. ModelRunner) enables the feature of reporting historical inference results.

See `landingai.common.InferenceMetadata` for more details.

reuse_session
Whether to reuse the HTTPS session for sending multiple inference requests. By default, the session is reused to improve performance on high-latency networks (e.g. fewer SSL negotiations). If you are sending requests from multiple threads, set this to False.
Returns
-------
List[Prediction]
@@ -324,9 +326,17 @@
buffer_bytes = serialize_image(image)
files = {"file": buffer_bytes}
data = {"metadata": metadata.json()} if metadata else None
return _do_inference(
self._session, self._url, files, {}, _EdgeExtractor, data=data
)
if reuse_session:
session = self._session
else:
session = _create_session(
self._url,
0,
{
"contentType": "multipart/form-data"
}, # No retries for the inference service
)
return _do_inference(session, self._url, files, {}, _EdgeExtractor, data=data)


class _Extractor:
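To illustrate what the new `reuse_session` flag is for, here is a rough sketch (not from the PR) of calling `predict` from several threads; the server address and image list are placeholders:

```python
from concurrent.futures import ThreadPoolExecutor

from PIL import Image

from landingai.predict import EdgePredictor

predictor = EdgePredictor("localhost", 8123)  # assumed local edge inference server
images = [Image.open("tests/data/images/wildfire1.jpeg") for _ in range(4)]

def predict_one(image):
    # reuse_session=False builds a fresh session for this request, so the
    # predictor's shared session is never used concurrently across threads.
    return predictor.predict(image, reuse_session=False)

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(predict_one, images))
```
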
28 changes: 19 additions & 9 deletions landingai/timer.py
@@ -5,12 +5,13 @@
import pprint
import statistics
import time
import threading
from collections import defaultdict, deque
from contextlib import ContextDecorator
from dataclasses import dataclass, field
from enum import Enum
from functools import partial
from typing import Any, Callable, ClassVar, Dict, MutableSequence, Optional, Sequence
from typing import Any, Callable, ClassVar, Dict, MutableSequence, Sequence


class TextColor(Enum):
@@ -148,23 +149,31 @@ def stuff():
)
log_fn: Callable[[str], None] = logging.getLogger(__name__).info
_elapsed_time: float = field(default=math.nan, init=False, repr=False)
_start_time: Optional[float] = field(default=None, init=False, repr=False)
# Keep track of the start time for each thread and each timer (i.e. each function)
_thread_local_data: threading.local = field(
default=threading.local(), init=False, repr=False
)

def start(self) -> None:
"""Start a new timer."""
if self._start_time is not None:
if not hasattr(self._thread_local_data, "_start_time"):
self._thread_local_data._start_time = {}
if self.name in self._thread_local_data._start_time:
raise ValueError("Timer is running. Use .stop() to stop it")

self._start_time = time.perf_counter()
self._thread_local_data._start_time[self.name] = time.perf_counter()

def stop(self) -> float:
"""Stop the timer, and report the elapsed time."""
if self._start_time is None:
if (
not hasattr(self._thread_local_data, "_start_time")
or self.name not in self._thread_local_data._start_time
):
raise ValueError("Timer is not running. Use .start() to start it")

# Calculate elapsed time
self._elapsed_time = time.perf_counter() - self._start_time

self._elapsed_time = (
time.perf_counter() - self._thread_local_data._start_time[self.name]
)
# Report elapsed time
attributes = {
"name": self.name,
@@ -179,7 +188,8 @@ def stop(self) -> float:
# Save stats
Timer.stats.add(self.name, self._elapsed_time)

self._start_time = None
del self._thread_local_data._start_time[self.name]

return self._elapsed_time

def __enter__(self) -> "Timer":
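The timer change replaces the single `_start_time` field with a per-thread dictionary keyed by timer name, so timers started in different threads do not interfere. A standalone sketch of that pattern (illustrative only, not the library's API):

```python
import threading
import time

_local = threading.local()  # each thread sees its own attributes

def start_timer(name: str) -> None:
    # Lazily create this thread's dict of start times, keyed by timer name.
    if not hasattr(_local, "start_time"):
        _local.start_time = {}
    if name in _local.start_time:
        raise ValueError("Timer is running. Use .stop() to stop it")
    _local.start_time[name] = time.perf_counter()

def stop_timer(name: str) -> float:
    if not hasattr(_local, "start_time") or name not in _local.start_time:
        raise ValueError("Timer is not running. Use .start() to start it")
    # pop() both reads the start time and clears it, mirroring the del in the diff.
    return time.perf_counter() - _local.start_time.pop(name)
```
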
1 change: 1 addition & 0 deletions tests/integration/landingai/test_predict_e2e.py
@@ -146,6 +146,7 @@ def test_class_predict():
img_with_masks.save("tests/output/test_class.jpg")


@pytest.mark.skip
def test_ocr_predict():
Path("tests/output").mkdir(parents=True, exist_ok=True)
predictor = OcrPredictor(api_key=_API_KEY)
34 changes: 34 additions & 0 deletions tests/unit/landingai/test_predict.py
@@ -24,6 +24,7 @@
)
from landingai.predict import EdgePredictor, Predictor
from landingai.visualize import overlay_predictions
from landingai.pipeline.frameset import FrameSet, Frame


def test_predict_with_none():
@@ -277,6 +278,39 @@ def test_edge_class_predict(connect_mock):
img_with_masks.save("tests/output/test_edge_class.jpg")


@patch("socket.socket")
@responses.activate
def test_edge_batch_predict(connect_mock):
# Fake a successful connection
sock_instance = connect_mock.return_value
sock_instance.connect_ex.return_value = 0
Path("tests/output").mkdir(parents=True, exist_ok=True)
responses._add_from_file(
file_path="tests/data/responses/test_edge_class_predict.yaml"
)
# Project: https://app.landing.ai/app/376/pr/26119078438913/deployment?device=tiger-team-integration-tests
# run LandingEdge.CLI with cmdline parameters: run-online -k "your_api_key" -s "your_secret_key" -r 26119078438913 \
# -m "59eff733-1dcd-4ace-b104-9041c745f1da" -n test_edge_cli --port 8123
predictor = EdgePredictor("localhost", 8123)
test_image = "tests/data/images/wildfire1.jpeg"
frs = FrameSet.from_image(test_image)
for i in range(9):
frs.append(Frame.from_image(test_image))
assert frs is not None
frs.run_predict(predictor=predictor, num_workers=5)

for frame in frs:
assert len(frame.predictions) == 1, "Result should not be empty or None"
assert frame.predictions[0].label_name == "HasFire"
assert frame.predictions[0].label_index == 0
np.testing.assert_almost_equal(
frame.predictions[0].score,
0.99565023,
decimal=3,
err_msg="class score mismatch",
)


@responses.activate
def test_connection_check():
with pytest.raises(ConnectionError):