From 693ed8ce3bb7073b026328ca5fae02ec0e58bed8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D0=BE=D1=84=D0=B8=D1=8F=20=D0=A8=D0=B2=D0=BE=D1=80?= =?UTF-8?q?=D0=BE=D0=B1?= Date: Mon, 3 Feb 2025 18:18:20 +0300 Subject: [PATCH 01/11] feat(thread_pool): Imlement class Threadpool with features --- project/thread_pool.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 project/thread_pool.py diff --git a/project/thread_pool.py b/project/thread_pool.py new file mode 100644 index 00000000..28787b73 --- /dev/null +++ b/project/thread_pool.py @@ -0,0 +1,35 @@ +import threading +import queue + +class ThreadPool: + def __init__(self, num_threads: int): + self.num_threads = num_threads + self.tasks = queue.Queue() + self.threads = [] + self.shutdown_flag = threading.Event() + + for _ in range(num_threads): + thread = threading.Thread(target=self._worker) + thread.daemon = True + thread.start() + self.threads.append(thread) + + def _worker(self): + while not self.shutdown_flag.is_set(): + try: + task, callback = self.tasks.get(timeout=1) # Таймаут для проверки shutdown_flag + result = task() + if callback: + callback(result) + self.tasks.task_done() + except queue.Empty: + continue + + def enqueue(self, task, callback=None): + if not self.shutdown_flag.is_set(): + self.tasks.put((task, callback)) + + def dispose(self): + self.shutdown_flag.set() + for thread in self.threads: + thread.join() \ No newline at end of file From 876ecd760446b111143f41c704aa2c796f10af53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D0=BE=D1=84=D0=B8=D1=8F=20=D0=A8=D0=B2=D0=BE=D1=80?= =?UTF-8?q?=D0=BE=D0=B1?= Date: Mon, 3 Feb 2025 18:25:54 +0300 Subject: [PATCH 02/11] feat(tests): Implement unit tests for threading_pool.py --- tests/test_basic.py | 18 ------------------ tests/test_threading_pool.py | 0 2 files changed, 18 deletions(-) delete mode 100644 tests/test_basic.py create mode 100644 tests/test_threading_pool.py diff --git a/tests/test_basic.py b/tests/test_basic.py deleted file mode 100644 index 4811167b..00000000 --- a/tests/test_basic.py +++ /dev/null @@ -1,18 +0,0 @@ -import pytest -import project # on import will print something from __init__ file - - -def setup_module(module): - print("basic setup module") - - -def teardown_module(module): - print("basic teardown module") - - -def test_1(): - assert 1 + 1 == 2 - - -def test_2(): - assert "1" + "1" == "11" diff --git a/tests/test_threading_pool.py b/tests/test_threading_pool.py new file mode 100644 index 00000000..e69de29b From 42100e4cb7a66730955705ae7e2906d2d27c9dc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D0=BE=D1=84=D0=B8=D1=8F=20=D0=A8=D0=B2=D0=BE=D1=80?= =?UTF-8?q?=D0=BE=D0=B1?= Date: Thu, 6 Feb 2025 19:58:14 +0300 Subject: [PATCH 03/11] feat(tests): tests added for module threading_pool.py --- tests/test_threading_pool.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/test_threading_pool.py b/tests/test_threading_pool.py index e69de29b..8e3aea6b 100644 --- a/tests/test_threading_pool.py +++ b/tests/test_threading_pool.py @@ -0,0 +1,34 @@ +import time +from project.thread_pool import ThreadPool + + +def test_thread_pool_execution(): + pool = ThreadPool(3) + + def sample_task(): + time.sleep(0.1) + return 42 + + results = [] + + def callback(result): + results.append(result) + + pool.enqueue(sample_task, callback) + time.sleep(0.2) + pool.dispose() + + assert results == [42] + + +def test_thread_pool_thread_count(): + num_threads = 5 + pool = ThreadPool(num_threads) + assert len(pool.threads) == num_threads + pool.dispose() + + +def test_thread_pool_dispose(): + pool = ThreadPool(2) + pool.dispose() + assert pool.shutdown_flag.is_set() From 314ff5980f7451cccc4bcab728e009b82870ee21 Mon Sep 17 00:00:00 2001 From: sofiashvorob Date: Thu, 6 Feb 2025 20:01:27 +0300 Subject: [PATCH 04/11] feat(test) tests added --- tests/test_threading_pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_threading_pool.py b/tests/test_threading_pool.py index 8e3aea6b..cd70fb54 100644 --- a/tests/test_threading_pool.py +++ b/tests/test_threading_pool.py @@ -3,7 +3,7 @@ def test_thread_pool_execution(): - pool = ThreadPool(3) + pool = ThreadPool(3) def sample_task(): time.sleep(0.1) From 3a4c739b7db52c58d81200e2a44e5c3f951df1f6 Mon Sep 17 00:00:00 2001 From: sofiashvorob Date: Thu, 6 Feb 2025 20:10:52 +0300 Subject: [PATCH 05/11] feat(test) tests added --- tests/test_threading_pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_threading_pool.py b/tests/test_threading_pool.py index cd70fb54..8e3aea6b 100644 --- a/tests/test_threading_pool.py +++ b/tests/test_threading_pool.py @@ -3,7 +3,7 @@ def test_thread_pool_execution(): - pool = ThreadPool(3) + pool = ThreadPool(3) def sample_task(): time.sleep(0.1) From 6f88f8fd8b67288e8b0ed434dfbd006cfc389eb6 Mon Sep 17 00:00:00 2001 From: Sem4kok Date: Sat, 5 Apr 2025 17:47:57 +0300 Subject: [PATCH 06/11] feat(ci): ci added --- .github/workflows/ci.yaml | 34 ++++++++++++++++++++++++++++++++++ mypy.ini | 2 ++ 2 files changed, 36 insertions(+) create mode 100644 .github/workflows/ci.yaml create mode 100644 mypy.ini diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 00000000..f735b2df --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,34 @@ +name: Run Tests + +on: + [ push, pull_request ] + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - name: Check out code + uses: actions/checkout@v2 + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: '3.8' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Install mypy + run: | + python -m pip install mypy + + - name: Run mypy + run: | + mypy . + + - name: Run tests + run: | + python ./scripts/run_tests.py diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 00000000..624cd195 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,2 @@ +[mypy] +exclude = venv/ From 82d4d3d240ef20ff52e404dda53300ae64e01530 Mon Sep 17 00:00:00 2001 From: Sem4kok Date: Sat, 5 Apr 2025 17:49:47 +0300 Subject: [PATCH 07/11] feat(ci): annotation added for mypy --- project/thread_pool.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/project/thread_pool.py b/project/thread_pool.py index 28787b73..b7e6e572 100644 --- a/project/thread_pool.py +++ b/project/thread_pool.py @@ -1,12 +1,14 @@ import threading import queue +from typing import Callable, Optional, Any, List + class ThreadPool: def __init__(self, num_threads: int): - self.num_threads = num_threads - self.tasks = queue.Queue() - self.threads = [] - self.shutdown_flag = threading.Event() + self.num_threads: int = num_threads + self.tasks: queue.Queue[tuple[Callable[[], Any], Optional[Callable[[Any], None]]]] = queue.Queue() + self.threads: List[threading.Thread] = [] + self.shutdown_flag: threading.Event = threading.Event() for _ in range(num_threads): thread = threading.Thread(target=self._worker) @@ -14,7 +16,7 @@ def __init__(self, num_threads: int): thread.start() self.threads.append(thread) - def _worker(self): + def _worker(self) -> None: while not self.shutdown_flag.is_set(): try: task, callback = self.tasks.get(timeout=1) # Таймаут для проверки shutdown_flag @@ -25,11 +27,11 @@ def _worker(self): except queue.Empty: continue - def enqueue(self, task, callback=None): + def enqueue(self, task: Callable[[], Any], callback: Optional[Callable[[Any], None]] = None) -> None: if not self.shutdown_flag.is_set(): self.tasks.put((task, callback)) - def dispose(self): + def dispose(self) -> None: self.shutdown_flag.set() for thread in self.threads: - thread.join() \ No newline at end of file + thread.join() From 7e1e19d07fd830a0ee3eaf84cd231929f36ad51a Mon Sep 17 00:00:00 2001 From: Sem4kok Date: Sat, 5 Apr 2025 17:53:36 +0300 Subject: [PATCH 08/11] fix(pool): Tuple fix --- project/thread_pool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/project/thread_pool.py b/project/thread_pool.py index b7e6e572..0b417a4f 100644 --- a/project/thread_pool.py +++ b/project/thread_pool.py @@ -1,12 +1,12 @@ import threading import queue -from typing import Callable, Optional, Any, List +from typing import Callable, Optional, Any, Tuple, List class ThreadPool: def __init__(self, num_threads: int): self.num_threads: int = num_threads - self.tasks: queue.Queue[tuple[Callable[[], Any], Optional[Callable[[Any], None]]]] = queue.Queue() + self.tasks: queue.Queue[Tuple[Callable[[], Any], Optional[Callable[[Any], None]]]] = queue.Queue() self.threads: List[threading.Thread] = [] self.shutdown_flag: threading.Event = threading.Event() From 53d716d5960b2cf68fde48da43a1f5c260dab66f Mon Sep 17 00:00:00 2001 From: Sem4kok Date: Sat, 5 Apr 2025 17:56:20 +0300 Subject: [PATCH 09/11] chore(pool): code beautify --- project/thread_pool.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/project/thread_pool.py b/project/thread_pool.py index 0b417a4f..c02aac03 100644 --- a/project/thread_pool.py +++ b/project/thread_pool.py @@ -6,7 +6,9 @@ class ThreadPool: def __init__(self, num_threads: int): self.num_threads: int = num_threads - self.tasks: queue.Queue[Tuple[Callable[[], Any], Optional[Callable[[Any], None]]]] = queue.Queue() + self.tasks: queue.Queue[ + Tuple[Callable[[], Any], Optional[Callable[[Any], None]]] + ] = queue.Queue() self.threads: List[threading.Thread] = [] self.shutdown_flag: threading.Event = threading.Event() @@ -19,7 +21,9 @@ def __init__(self, num_threads: int): def _worker(self) -> None: while not self.shutdown_flag.is_set(): try: - task, callback = self.tasks.get(timeout=1) # Таймаут для проверки shutdown_flag + task, callback = self.tasks.get( + timeout=1 + ) # Таймаут для проверки shutdown_flag result = task() if callback: callback(result) @@ -27,7 +31,9 @@ def _worker(self) -> None: except queue.Empty: continue - def enqueue(self, task: Callable[[], Any], callback: Optional[Callable[[Any], None]] = None) -> None: + def enqueue( + self, task: Callable[[], Any], callback: Optional[Callable[[Any], None]] = None + ) -> None: if not self.shutdown_flag.is_set(): self.tasks.put((task, callback)) From b77bae9516371e86f4bca9abe0c70b777343d9f2 Mon Sep 17 00:00:00 2001 From: Sem4kok Date: Sat, 5 Apr 2025 17:57:19 +0300 Subject: [PATCH 10/11] chore(pool): code beautify --- project/thread_pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/thread_pool.py b/project/thread_pool.py index c02aac03..c273aff9 100644 --- a/project/thread_pool.py +++ b/project/thread_pool.py @@ -32,7 +32,7 @@ def _worker(self) -> None: continue def enqueue( - self, task: Callable[[], Any], callback: Optional[Callable[[Any], None]] = None + self, task: Callable[[], Any], callback: Optional[Callable[[Any], None]] = None ) -> None: if not self.shutdown_flag.is_set(): self.tasks.put((task, callback)) From 8d5e2fb0307df2180b3bdf59470871819db308aa Mon Sep 17 00:00:00 2001 From: Sem4kok Date: Sat, 5 Apr 2025 18:00:21 +0300 Subject: [PATCH 11/11] doc(pool): doc added --- project/thread_pool.py | 63 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/project/thread_pool.py b/project/thread_pool.py index c273aff9..497df505 100644 --- a/project/thread_pool.py +++ b/project/thread_pool.py @@ -4,7 +4,35 @@ class ThreadPool: + """A thread pool for executing tasks asynchronously in multiple threads. + + This class manages a pool of worker threads that process tasks from a queue. + Tasks are executed in parallel, and an optional callback can be provided to + handle the result of each task. The thread pool can be safely shut down using + the `dispose` method. + + Attributes: + num_threads (int): The number of worker threads in the pool. + tasks (queue.Queue[Tuple[Callable[[], Any], Optional[Callable[[Any], None]]]]): + A queue of tasks to be executed, where each task is a tuple of a callable + and an optional callback. + threads (List[threading.Thread]): The list of worker threads. + shutdown_flag (threading.Event): A flag to signal the threads to stop. + """ + def __init__(self, num_threads: int): + """Initialize the thread pool with a specified number of worker threads. + + Args: + num_threads (int): The number of threads to create in the pool. Must be + a positive integer. + + Raises: + ValueError: If `num_threads` is less than or equal to 0. + """ + if num_threads <= 0: + raise ValueError("Number of threads must be a positive integer") + self.num_threads: int = num_threads self.tasks: queue.Queue[ Tuple[Callable[[], Any], Optional[Callable[[Any], None]]] @@ -19,6 +47,15 @@ def __init__(self, num_threads: int): self.threads.append(thread) def _worker(self) -> None: + """Worker function that processes tasks from the queue. + + This method runs in each worker thread and continuously retrieves tasks + from the queue until the shutdown flag is set. Each task is executed, and + if a callback is provided, it is called with the task's result. + + The method uses a timeout to periodically check the shutdown flag, allowing + the thread to exit gracefully when the pool is disposed. + """ while not self.shutdown_flag.is_set(): try: task, callback = self.tasks.get( @@ -34,10 +71,36 @@ def _worker(self) -> None: def enqueue( self, task: Callable[[], Any], callback: Optional[Callable[[Any], None]] = None ) -> None: + """Add a task to the queue for execution. + + The task will be executed by one of the worker threads as soon as possible. + If a callback is provided, it will be called with the result of the task. + + Args: + task (Callable[[], Any]): A callable that takes no arguments and returns + a result of any type. + callback (Optional[Callable[[Any], None]]): An optional callable that + takes the result of the task as an argument and returns nothing. + Defaults to None. + + Notes: + If the thread pool has been disposed (i.e., `dispose` has been called), + the task will not be enqueued. + """ if not self.shutdown_flag.is_set(): self.tasks.put((task, callback)) def dispose(self) -> None: + """Shut down the thread pool and wait for all threads to finish. + + This method sets the shutdown flag, which causes all worker threads to stop + processing new tasks, and then waits for all threads to complete their current + tasks and terminate. + + Notes: + After calling this method, no new tasks can be enqueued. Any tasks still + in the queue will not be processed. + """ self.shutdown_flag.set() for thread in self.threads: thread.join()