
[Bug] How to handle exceptions raised in parallelized function #63

Open
Wenzel opened this issue Dec 11, 2020 · 8 comments
Labels
enhancement New feature or request

Comments

@Wenzel

Wenzel commented Dec 11, 2020

Describe the bug
I would like to know how I can handle exceptions raised inside the function that I'm trying to parallelize.

Minimal code to reproduce

#!/usr/bin/env python3

import pypeln as pl

def compute(x):
    if x == 3:
        raise ValueError("Value 3 is not supported")
    else:
        return x*x

data = [1, 2, 3, 4, 5]
stage = pl.process.map(compute, data, workers=4)

for x in stage:
    print(f"Result: {x}")

Results

Result: 1
Result: 4
Traceback (most recent call last):
  File "test.py", line 14, in <module>
    for x in stage:
  File "/home/wenzel/local/test_python/pypeln/venv/lib/python3.8/site-packages/pypeln/process/stage.py", line 83, in to_iterable
    for elem in main_queue:
  File "/home/wenzel/local/test_python/pypeln/venv/lib/python3.8/site-packages/pypeln/process/queue.py", line 48, in __iter__
    raise exception
ValueError: 

('Value 3 is not supported',)

Traceback (most recent call last):
  File "/home/wenzel/local/test_python/pypeln/venv/lib/python3.8/site-packages/pypeln/process/worker.py", line 99, in __call__
    self.process_fn(
  File "/home/wenzel/local/test_python/pypeln/venv/lib/python3.8/site-packages/pypeln/process/worker.py", line 186, in __call__
    self.apply(worker, elem, **kwargs)
  File "/home/wenzel/local/test_python/pypeln/venv/lib/python3.8/site-packages/pypeln/process/api/map.py", line 27, in apply
    y = self.f(elem.value, **kwargs)
  File "test.py", line 7, in compute
    raise ValueError("Value 3 is not supported")
ValueError: Value 3 is not supported

Expected behavior
I have no specific expected behavior; instead, I was looking for a way to use the API and get some error recovery.
In this situation the whole pipeline is broken, and I'm not sure how to recover.

I'm trying to see if I can switch to your library, coming from concurrent.futures.

This is the operation I would like to do (demo with concurrent.futures):

import logging
from collections import Counter
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import AbstractContextManager
from typing import Callable, Dict

# FutureData: small helper holding (user_data, user_callback), defined elsewhere in my code

class Downloader(AbstractContextManager):

    def __init__(self):
        # let Python decide how many workers to use
        # usually the best decision for IO tasks
        self._logger = logging.getLogger(f"{self.__class__.__module__}.{self.__class__.__name__}")
        self._dl_pool = ThreadPoolExecutor()
        self._future_to_obj: Dict[Future, FutureData] = {}
        self.stats = Counter()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._dl_pool.shutdown()

    def submit(self, url: str, callback: Callable):
        user_data = (url, )
        future_data = FutureData(user_data, callback)
        future = self._dl_pool.submit(self._download_url, *user_data)
        self._future_to_obj[future] = future_data
        future.add_done_callback(self._on_download_done)
        self.stats["submitted"] += 1

    def _download_url(self, url: str) -> str:
        # this function might raise multiple network errors
        # .....
        return r.read()

    def _on_download_done(self, future: Future):
        try:
            future_data: FutureData = self._future_to_obj[future]
        except KeyError:
            self._logger.debug("Failed to find obj in callback for %s", future)
            self.stats["future_fail"] += 1
            return
        else:
            # call the user callback
            url, *rest = future_data.user_data
            try:
                data = future.result()
            except Exception:   # Here we have error recovery
                self._logger.debug("Error while fetching resource: %s", url)
                self.stats["fetch_error"] += 1
            else:
                future_data.user_callback(*future_data.user_data, data)
            finally:
                self.stats["total"] += 1

⬆️ TL;DR: I'm using add_done_callback in order to chain my futures into the next function and create a pipeline.
But as I'm dealing with Future objects, their exception is only raised when you try to access their result() (which is not the case with pypeln).
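
For illustration, here is a minimal standalone sketch of that lazy-raising behavior with plain concurrent.futures (nothing pypeln-specific):

from concurrent.futures import ThreadPoolExecutor

def boom():
    raise ValueError("raised in the worker")

with ThreadPoolExecutor() as pool:
    future = pool.submit(boom)
    # nothing is raised here: the exception is stored inside the Future
    try:
        future.result()  # the stored exception is re-raised only on access
    except ValueError as e:
        print(f"recovered: {e}")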

Library Info
0.4.6


Thanks for your library, it looks amazing!

@Wenzel Wenzel added the bug Something isn't working label Dec 11, 2020
@cgarciae
Owner

cgarciae commented Jan 2, 2021

Hey, sorry this response is very late.

Pypeln forwards the error object from the worker processes to the main thread / process where the stage is being consumed, in this case to the for loop. You can handle it like you would handle errors from regular iterators:

#!/usr/bin/env python3

import pypeln as pl
def compute(x):
    if x == 3:
        raise ValueError("Value 3 is not supported")
    else:
        return x * x

data = [1, 2, 3, 4, 5]
stage = pl.process.map(compute, data, workers=4)

try:
    for x in stage:
        print(f"Result: {x}")
except ValueError as e:
    print(f"Got {e}")

@Wenzel
Copy link
Author

Wenzel commented Jan 2, 2021

Hi @cgarciae ,

Hey, sorry this response is very late.

no worries, and happy new year 👍

You can handle it like you would handle errors from regular iterators

I understand the solution you proposed; however, it doesn't fit my use case, because handling the error would break the iteration and the pipeline.

How would you implement something like this:

try:
    for x in stage:
        print(f"Result: {x}")
except ValueError as e:
    print(f"Got {e}")
    continue  # sort of

Unless there is a hidden mechanic that I'm not seeing, I don't understand how your pipeline iteration can recover from an error and continue iterating over the rest of the values?

Thank you for your time.

@cgarciae
Owner

cgarciae commented Jan 2, 2021

I see. Looking a bit more at your problem-specific code, it seems you want to know which elements from the stage failed. You could create a decorator that turns exceptions into return values:

#!/usr/bin/env python3

import pypeln as pl
import functools

def return_exceptions(f):
    @functools.wraps(f)
    def wrapped(x):
        if isinstance(x, BaseException):
            return x
        try:
            return f(x)
        except BaseException as e:
            return e
    return wrapped

@return_exceptions
def compute(x):
    if x == 3:
        raise ValueError("Value 3 is not supported")
    else:
        return x * x

With this you can either handle them immediately in the main thread:

data = [1, 2, 3, 4, 5]
stage = pl.process.map(compute, data, workers=4)

for x in stage:
    if isinstance(x, BaseException):
        ...  # log error
    else:
        ...  # do something

Or even construct longer pipelines:

@return_exceptions
def compute_more(x):
    ...

data = [1, 2, 3, 4, 5]
stage = pl.process.map(compute, data, workers=4)
stage = pl.thread.map(compute_more, stage, workers=2)

for x in stage:
    if isinstance(x, BaseException):
        ...  # log error
    else:
        ...  # do something

Maybe error handling of this type could be incorporated into the library, either by providing these decorators or by directly having a flag throughout the API. It would be nice to see alternative solutions before committing to something.

@cgarciae cgarciae added enhancement New feature or request and removed bug Something isn't working labels Jan 2, 2021
@Wenzel
Author

Wenzel commented Jan 2, 2021

Hey @cgarciae

Thank you very much for your proposal!
It works as expected:

#!/usr/bin/env python3

import pypeln as pl
import functools

def return_exceptions(f):
    @functools.wraps(f)
    def wrapped(x):
        if isinstance(x, BaseException):
            return x
        try:
            return f(x)
        except BaseException as e:
            return e
    return wrapped

@return_exceptions
def compute(x):
    if x == 3:
        raise ValueError("Value 3 is not supported")
    else:
        return x*x

@return_exceptions
def compute_more(x):
    if x == 25:
        raise ValueError("Value 25 is not supported")
    else:
        return x+1

data = [1, 2, 3, 4, 5]
stage = pl.process.map(compute, data, workers=4)
stage = pl.process.map(compute_more, stage, workers=4)

for x in stage:
    if isinstance(x, BaseException):
        print(f"Exception ! {x}")
        continue
    print(f"Result: {x}")
Result: 2
Result: 5
Exception ! Value 3 is not supported
Result: 17
Exception ! Value 25 is not supported

Maybe error handling of this type could be incorporated into the library, either by providing these decorators or by directly having a flag throughout the API. It would be nice to see alternative solutions before committing to something.

I agree that it would be a nice addition to pypeln.
A few things to think about:

  • What are the performance implications of isinstance(x, BaseException)?
  • Could we investigate returning Future objects, like what concurrent.futures is already doing? The exception would only be raised when actually accessing the result: future.result().

@cgarciae
Owner

cgarciae commented Jan 2, 2021

As you suggest, an alternative is to create an "applicative" interface, e.g. Result, and manage the success and failed states of the computation via an apply method, only raising exceptions if the user wants access to the actual value outside the "monad":

import pypeln as pl
import functools
from abc import ABC, abstractmethod


class Result(ABC):
    @abstractmethod
    def result(self):
        ...

    @abstractmethod
    def apply(self, f):
        ...


class Ok(Result):
    def __init__(self, value):
        self.value = value

    def result(self):
        return self.value

    def apply(self, f) -> Result:
        try:
            y = f(self.value)
        except BaseException as e:
            return Failed(e)

        return Ok(y)


class Failed(Result):
    def __init__(self, exception: BaseException):
        self.exception = exception

    def result(self):
        raise self.exception

    def apply(self, f) -> Result:
        return self


def return_exceptions(f):
    @functools.wraps(f)
    def wrapped(x):
        if not isinstance(x, Result):
            x = Ok(x)

        return x.apply(f)

    return wrapped


@return_exceptions
def compute(x):
    if x == 3:
        raise ValueError("Value 3 is not supported")
    else:
        return x * x


@return_exceptions
def compute_more(x):
    if x == 25:
        raise ValueError("Value 25 is not supported")
    else:
        return x + 1


data = [1, 2, 3, 4, 5]
stage = pl.process.map(compute, data, workers=4)
stage = pl.process.map(compute_more, stage, workers=4)

for x in stage:
    try:
        y = x.result()
        print(f"Result: {y}")
    except BaseException as e:
        print(f"Exception ! {e}")

The nice thing is that it makes it clear to the user that they have to handle success and failure states. On the other hand, I think this kind of functionality should be opt-in for users, since it's not that common in Python.
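
For reference, the same stage can also be consumed without try/except by branching on the Result subtype defined above (a small variation on the loop, not a different API):

# Alternative to calling .result() inside try/except: inspect the subtype directly.
for x in stage:
    if isinstance(x, Ok):
        print(f"Result: {x.value}")
    else:  # Failed
        print(f"Exception ! {x.exception}")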

@cgarciae
Owner

cgarciae commented Jan 2, 2021

What are the performance implications of isinstance(x, BaseException)?

Being a builtin, I guess isinstance is implemented in C and should not be a problem. There are already a bunch of isinstance calls in the codebase.
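
As a rough, machine-dependent sanity check (just a sketch), the overhead can be measured with timeit:

import timeit

# One million isinstance checks against a plain int. The total is typically a
# small fraction of a second, i.e. tens of nanoseconds per check, which is
# negligible compared to moving elements through inter-process queues.
elapsed = timeit.timeit("isinstance(x, BaseException)", setup="x = 42", number=1_000_000)
print(f"1,000,000 isinstance checks: {elapsed:.3f}s")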

@npuichigo

@cgarciae How do I use return_exceptions together with flat_map? The flat_map doc says you are able to filter out unwanted elements when there are exceptions, missing data, etc., but I have no idea how it works.

@cgarciae
Owner

cgarciae commented May 17, 2023

Hey @npuichigo, you can use flat_map to filter exceptions using this pattern:

import pypeln as pl

def map_or_filter(x):
    try:
        y = ...  # do stuff that might raise
        yield y
    except Exception:
        pass  # not yielding acts like a filter

stage = pl.process.flat_map(map_or_filter, data)
...
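
A concrete, self-contained version of that pattern, reusing the failing-on-3 example from earlier in this thread (compute_or_skip is just an illustrative name):

import pypeln as pl

def compute_or_skip(x):
    # yield the transformed value, or yield nothing to drop the element
    try:
        if x == 3:
            raise ValueError("Value 3 is not supported")
        yield x * x
    except ValueError:
        pass  # the failing element is silently filtered out

data = [1, 2, 3, 4, 5]
stage = pl.process.flat_map(compute_or_skip, data, workers=4)

print(sorted(stage))  # [1, 4, 16, 25] -- the element that raised is gone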
