diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 00000000..f735b2df --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,34 @@ +name: Run Tests + +on: + [ push, pull_request ] + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - name: Check out code + uses: actions/checkout@v2 + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: '3.8' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Install mypy + run: | + python -m pip install mypy + + - name: Run mypy + run: | + mypy . + + - name: Run tests + run: | + python ./scripts/run_tests.py diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 00000000..624cd195 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,2 @@ +[mypy] +exclude = venv/ diff --git a/project/thread_pool.py b/project/thread_pool.py new file mode 100644 index 00000000..497df505 --- /dev/null +++ b/project/thread_pool.py @@ -0,0 +1,106 @@ +import threading +import queue +from typing import Callable, Optional, Any, Tuple, List + + +class ThreadPool: + """A thread pool for executing tasks asynchronously in multiple threads. + + This class manages a pool of worker threads that process tasks from a queue. + Tasks are executed in parallel, and an optional callback can be provided to + handle the result of each task. The thread pool can be safely shut down using + the `dispose` method. + + Attributes: + num_threads (int): The number of worker threads in the pool. + tasks (queue.Queue[Tuple[Callable[[], Any], Optional[Callable[[Any], None]]]]): + A queue of tasks to be executed, where each task is a tuple of a callable + and an optional callback. + threads (List[threading.Thread]): The list of worker threads. + shutdown_flag (threading.Event): A flag to signal the threads to stop. + """ + + def __init__(self, num_threads: int): + """Initialize the thread pool with a specified number of worker threads. + + Args: + num_threads (int): The number of threads to create in the pool. Must be + a positive integer. + + Raises: + ValueError: If `num_threads` is less than or equal to 0. + """ + if num_threads <= 0: + raise ValueError("Number of threads must be a positive integer") + + self.num_threads: int = num_threads + self.tasks: queue.Queue[ + Tuple[Callable[[], Any], Optional[Callable[[Any], None]]] + ] = queue.Queue() + self.threads: List[threading.Thread] = [] + self.shutdown_flag: threading.Event = threading.Event() + + for _ in range(num_threads): + thread = threading.Thread(target=self._worker) + thread.daemon = True + thread.start() + self.threads.append(thread) + + def _worker(self) -> None: + """Worker function that processes tasks from the queue. + + This method runs in each worker thread and continuously retrieves tasks + from the queue until the shutdown flag is set. Each task is executed, and + if a callback is provided, it is called with the task's result. + + The method uses a timeout to periodically check the shutdown flag, allowing + the thread to exit gracefully when the pool is disposed. + """ + while not self.shutdown_flag.is_set(): + try: + task, callback = self.tasks.get( + timeout=1 + ) # Таймаут для проверки shutdown_flag + result = task() + if callback: + callback(result) + self.tasks.task_done() + except queue.Empty: + continue + + def enqueue( + self, task: Callable[[], Any], callback: Optional[Callable[[Any], None]] = None + ) -> None: + """Add a task to the queue for execution. + + The task will be executed by one of the worker threads as soon as possible. + If a callback is provided, it will be called with the result of the task. + + Args: + task (Callable[[], Any]): A callable that takes no arguments and returns + a result of any type. + callback (Optional[Callable[[Any], None]]): An optional callable that + takes the result of the task as an argument and returns nothing. + Defaults to None. + + Notes: + If the thread pool has been disposed (i.e., `dispose` has been called), + the task will not be enqueued. + """ + if not self.shutdown_flag.is_set(): + self.tasks.put((task, callback)) + + def dispose(self) -> None: + """Shut down the thread pool and wait for all threads to finish. + + This method sets the shutdown flag, which causes all worker threads to stop + processing new tasks, and then waits for all threads to complete their current + tasks and terminate. + + Notes: + After calling this method, no new tasks can be enqueued. Any tasks still + in the queue will not be processed. + """ + self.shutdown_flag.set() + for thread in self.threads: + thread.join() diff --git a/tests/test_basic.py b/tests/test_basic.py deleted file mode 100644 index 4811167b..00000000 --- a/tests/test_basic.py +++ /dev/null @@ -1,18 +0,0 @@ -import pytest -import project # on import will print something from __init__ file - - -def setup_module(module): - print("basic setup module") - - -def teardown_module(module): - print("basic teardown module") - - -def test_1(): - assert 1 + 1 == 2 - - -def test_2(): - assert "1" + "1" == "11" diff --git a/tests/test_threading_pool.py b/tests/test_threading_pool.py new file mode 100644 index 00000000..8e3aea6b --- /dev/null +++ b/tests/test_threading_pool.py @@ -0,0 +1,34 @@ +import time +from project.thread_pool import ThreadPool + + +def test_thread_pool_execution(): + pool = ThreadPool(3) + + def sample_task(): + time.sleep(0.1) + return 42 + + results = [] + + def callback(result): + results.append(result) + + pool.enqueue(sample_task, callback) + time.sleep(0.2) + pool.dispose() + + assert results == [42] + + +def test_thread_pool_thread_count(): + num_threads = 5 + pool = ThreadPool(num_threads) + assert len(pool.threads) == num_threads + pool.dispose() + + +def test_thread_pool_dispose(): + pool = ThreadPool(2) + pool.dispose() + assert pool.shutdown_flag.is_set()