Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreading is significantly broken #106

Open
benoit74 opened this issue Mar 7, 2024 · 5 comments
Open

Multithreading is significantly broken #106

benoit74 opened this issue Mar 7, 2024 · 5 comments
Assignees
Labels
bug Something isn't working
Milestone

Comments

@benoit74
Copy link
Collaborator

benoit74 commented Mar 7, 2024

We are supposed to wait for all video + nodes processing, and return on first exception.

            futures = cf.wait(
                self.videos_futures + self.nodes_futures,
                return_when=cf.FIRST_EXCEPTION,
            )

Log of last khan_academy_fr as reported in #100 shows that multiple nodes had to fail before the scraper stopped.

I tried to debug / fix the issue locally but it is just pure non-sense for now, there is probably a bug somewhere but I do not achieve to understand where.

@benoit74
Copy link
Collaborator Author

benoit74 commented Mar 8, 2024

Block of code mentioned above in fact has two issues:

  • it does not fail on FIRST_EXCEPTION like it is written it should
  • it does not really wait for all videos because when this code is called, we do not yet have all videos_futures created and since we are creating a new list, only videos_futures which exist at this point in time are considered
    • luckily, all nodes_futures are already here, it takes a bit of time to create all of them so some have already been started, and it looks like the executor uses a stack model, processing last added futures first, so waiting for the firsts videos_futures is still mostly waiting for all of them because the firsts ones will be the last to be processed

I tried to replace it with something else:

            cf.wait(
                self.nodes_futures,
                return_when=cf.FIRST_EXCEPTION,
            )
            futures = cf.wait(
                self.videos_futures + self.nodes_futures,
                return_when=cf.FIRST_EXCEPTION,
            )

It seems better, but I'm pretty sure we still have issues with the FIRST_EXCEPTION clause, especially since we probably loose all exceptions on video threads which occurs before the nodes futures have completed.

Multithreading is always a very complex thing to implement. I've isolated #100 but I'm pretty sure we have other locks missing in our code, because even if a list/set/dictionary is thread-safe, it does not means that we do not have some portions of our code where we need locking, e.g. when checking that an item is not already in a list and then adding it to the list if it was missing.

It might be a bad idea, but at this point I suggest that we get rid of the multithreading in this (all?) scraper for now. We do not have the resources to set it up properly or maintain it in the long run. At least it is not ok now, and I always failed in such endeavor even if at first I always believed it was going to be easy. It makes logs very difficult to read because we never know which thread/process is "speaking". It does not help to have linear and predictable performances.

Zimit and iFixit do not have any multithreading. They are known to be quite "slow". But at least we know their behavior is more predictable, and code is easier to maintain.

WDYT?

@rgaudin
Copy link
Member

rgaudin commented Mar 8, 2024

Concurrency is complicated and makes maintenance and debugging difficult. That's a fact.

We deemed it necessary, knowingly, for performance reasons. We use some in almost all scrapers. Discarding it because it makes understanding/fixing an issue seems unreasonable.

This scraper (and others) were running for a long time. If I look at zimfarm runs, I see libretext kept working and khan-fr wasn't run since 11m ago, only to be run and failed from 3m ago (video-encoding-related). Between those dates, a lot of changes happened to the scraper: bootrap, scraperlib, video encoding, video concurrency…

If that helps, get rid of concurrency locally to ensure the rest works as expected then bring it back properly.

But to remove such a feature in a scraper (or all), we'd need a lot more arguments.
And we all know what “for now” means 😅

@benoit74
Copy link
Collaborator Author

benoit74 commented Mar 8, 2024

This scraper (and others) were running for a long time. If I look at zimfarm runs, I see libretext kept working and khan-fr wasn't run since 11m ago, only to be run and failed from 3m ago (video-encoding-related). Between those dates, a lot of changes happened to the scraper: bootrap, scraperlib, video encoding, video concurrency…

Doesn't prove it was working properly. libretext has no video encoding, so sure it works properly. The log of khan-fr has already a multithreading issue, whole log is gone but what's left in scraper stdout on https://farm.openzim.org/pipeline/4b5053ff-0ed4-4433-8217-119a5d8ae7d7/debug shows that there have been multiple exceptions in video processing which did not stopped the scraper.

And we all know what “for now” means 😅

You made a point 🤣

If that helps, get rid of concurrency locally to ensure the rest works as expected then bring it back properly.

I don't feel like it is necessary, but thank you! It is just that we need to all be aware that I will spend time to fix some things now, and other bugs will probably arise in the future. But I'm committed to do by best to avoid/fix as many as possible! (PR for the bugs I'm aware of is almost ready indeed)

@benoit74 benoit74 added this to the 1.2.2 milestone Mar 8, 2024
@rgaudin
Copy link
Member

rgaudin commented Mar 8, 2024

Understandable ; fortunately we only have a few kolibri recipes for now so it will be easy to spot regressions

@benoit74
Copy link
Collaborator Author

I'm finally sorry to say that there is no PR ready yet, I do not achieve to make multiprocessing work as expected:

  • stop on first exception in a task (node or video)
  • wait for all nodes and all videos completion

I've reproduced the problem in an isolated test case

import concurrent.futures as cf
from time import sleep
import functools

from datetime import datetime

# Without callbacks
#
# Expected behavior
# t0: all nodes added to the working queue, node 0 and node 1 starts
# t3: node 0 submits video 0
#     node 0 and node 1 complete
#     video 0 starts
#     node 2 and node 3 starts
# t6: node 2 submits video 2
#     node 2 and node 3 completes
#     node 4 and node 5 starts
# t6.5: video 0 fails
#       video 2 starts
# t7: checks detects that exception occured and cancels everything
# finished
#
# Real behavior with ThreadPoolExecutor (not perfect - ru
# nning tasks are continuing till
# they end, but more or less ok)
# t0: all nodes added to the working queue, node 0 and node 1 starts
# t3: node 0 submits video 0
#     node 0 and node 1 complete
#     video 0 starts
#     node 2 and node 3 starts
# t6: node 2 submits video 2
#     node 2 and node 3 completes
#     node 4 and node 5 starts
# t6.5: video 0 fails
#       video 2 starts
# t7: checks detects that exception occured and cancels everything
# t8: node 4 and node 5 completes <== UNEXPECTED
# t10: video 2 completes <== UNEXPECTED
# finished

# ---------------
# With callbacks
# ---------------
#
# With callbacks, the callback is executed at the end of the future, in "main" thread,
# so we assume it will delay the start of the next future
#
# Expected behavior with callbacks
# t0: all nodes added to the working queue
#     node 0 and node 1 starts
# t3: node 0 submits video 0
#     node 0 and node 1 complete
#     video 0 starts
#     callbacks of node 0 and node 1 starts
# t4: callbacks of node 0 and node 1 completes
#     node 2 and node 3 starts
# t6.5: video 0 fails
#       callback of video 0 starts
# t7: node 2 and node 3 completes
#     callbacks of node 2 and node 3 starts
#     checks detects that exception occured and cancels everything
# t7.5: callback of video 0 completes
# t8: callbacks of node 2 and node 3 completes
# finished
#
#
# Real behavior with ThreadPoolExecutor (a real mess, way too many things are executed)
# t0: all nodes added to the working queue
#     node 0 and node 1 starts
# t3: node 0 submits video 0
#     node 0 and node 1 complete
#     video 0 starts
#     callbacks of node 0 and node 1 starts
# t4: callbacks of node 0 and node 1 completes
#     node 2 and node 3 starts
# t6.5: video 0 fails
#       callback of video 0 starts
# t7: node 2 and node 3 completes
#     callbacks of node 2 and node 3 starts
#     checks detects that exception occured and cancels everything
#     callback of node 4 starts <= WHY?, future did not even started
# t7.5: callback of video 0 completes
#     video 2 starts <= WHY? we are supposed to cancel futures, not start new ones
# t8: callbacks of node 2 and node 3 completes
#     node 5 and node 6 starts <= WHY?
# t11: video 2 complete <= WHY?
#     callback of video 2 starts <= WHY?
#     node 5 and node 6 complete
#     callback of node 5 and node 6 starts <= WHY?
# t12: callback of video 2 complete
#     video 6 starts <= WHY?
#     callback of node 5 and node 6 completes
# t15: video 6 completes
#     callback of video 6 starts <= WHY?
# t16: call of video 6 complete
# finished

# Alter this to activate/deactivate callbacks
WITH_CALLBACKS = True

start = datetime.now()


def log(message: str):
    print(f"{round((datetime.now() - start).total_seconds(), 1)} - {message}")


def wait_for(seconds: float):
    mystart = datetime.now()
    while (datetime.now() - mystart).total_seconds() < seconds:
        pass


def node_callback(future: cf.Future, a: int):
    log(f"Node {a} callback started")
    wait_for(1)
    log(f"Node {a} callback finished")


def video_callback(future: cf.Future, a: int):
    log(f"Video {a} callback started")
    wait_for(1)
    log(f"Video {a} callback finished")


def process_video(a: int):
    log(f"Video {a} started")
    wait_for(3.5)
    if a < 1:
        log(f"Video {a} failed")
        raise Exception(f"Video problem with {a}")
    log(f"Video {a} completed")


def process_node(a: int):
    log(f"Node {a} started")
    wait_for(3)
    if a % 2 == 0:
        future = videos_executor.submit(process_video, a)
        videos_futures.add(future)
        if WITH_CALLBACKS:
            future.add_done_callback(functools.partial(video_callback, a=a))
    log(f"Node {a} completed")


# ProcessPoolExecutor has a very weird behavior, I struggled to make it work as expected
# ThreadPoolExecutor seems to be more stable, even if I don't really know why
nodes_executor = cf.ThreadPoolExecutor(max_workers=2)
videos_executor = cf.ThreadPoolExecutor(max_workers=1)

nodes_futures: set[cf.Future] = set()
videos_futures: set[cf.Future] = set()

for a in range(7):
    future = nodes_executor.submit(process_node, a)
    nodes_futures.add(future)
    if WITH_CALLBACKS:
        future.add_done_callback(functools.partial(node_callback, a=a))

while True:
    # we cannot use the cf.wait since videos_futures is not yet set (it needs nodes to
    # be started) and we would hence not capture exceptions occuring in nodes_futures
    # cf.wait(nodes_futures | videos_futures, return_when=cf.FIRST_EXCEPTION)
    wait_for(1)
    # log("Checking status")
    if (
        sum(1 if future._exception else 0 for future in nodes_futures | videos_futures)
        > 0
    ):  # we have to use ._exception, because .exception() if waiting for future to
        # complete
        log("Exception encountered")
        break

    if sum(0 if future.done() else 1 for future in nodes_futures | videos_futures) == 0:
        log("All tasks completed")
        break


log(
    f"{sum(1 if future.done() else 0 for future in nodes_futures | videos_futures)} tasks done"
)
log(
    f"{sum(0 if future.done() else 1 for future in nodes_futures | videos_futures)} tasks not done"
)

# this works more or else, because it cancels only futures which were not already
# started + it waits for only running futures to complete before returning, meaning that
# videos futures are cancelled only once the nodes futures have all completed
log("Shutting down nodes")
nodes_executor.shutdown(cancel_futures=True)
log("Shutting down videos")
videos_executor.shutdown(cancel_futures=True)
log("All exectutors shut down")

log(
    f"{sum(1 if future.done() else 0 for future in nodes_futures | videos_futures)} tasks done"
)
log(
    f"{sum(0 if future.done() else 1 for future in nodes_futures | videos_futures)} tasks not done"
)

For now I consider that concurrent.futures module is not behaving like we want it to. In addition to the requirements not been met, I had to poll continuously to find task failures, and use the private _exception property.

At this stage, I recommend to invest some development days to migrate to multiprocessing module. More code will be needed but at least we will be able to tune our stuff like we want. Or maybe there is another Python package doing this properly. If we do it on our own, we should definitely share this in python-scraperlib so that it is reused across Python scrapers.

I think that for the coming weeks / months we can live with this "bug", it seems to mainly mean that the scraper will not stop on first task failure ... but it probably has always been so.

@kelson42 kelson42 pinned this issue Jun 11, 2024
@benoit74 benoit74 modified the milestones: 1.2.2, 2.0.0 Sep 24, 2024
@benoit74 benoit74 self-assigned this Sep 24, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants