From 58ab5b38aa90004815603821606c0b00c51344cc Mon Sep 17 00:00:00 2001 From: nanocubit <99839681+nanocubit@users.noreply.github.com> Date: Sat, 7 Feb 2026 22:54:57 +0300 Subject: [PATCH] Make CPU SHM pool importable without numpy for smoke checks --- .github/workflows/ci.yml | 89 +++++++++++++----- README.md | 22 ++++- docs/performance_estimation.py | 20 ++--- pyproject.toml | 35 ++++++-- requirements-cgpu.txt | 3 + requirements-dev.txt | 6 ++ requirements-gpu.txt | 4 + requirements-ray.txt | 3 + requirements.txt | 6 +- setup.py | 75 ++++++++++------ tests/conftest.py | 45 +++++++++- zerolink/core/cpu/shm_pool.py | 52 ++++++++--- zerolink/core/gpu/ext/ipc_ext.cpp | 4 +- zerolink/core/protocol/messages.py | 23 ++++- zerolink/monitoring/telemetry.py | 140 ++++++++++++++--------------- zerolink/runtime/unified.py | 130 +++++++++++++++++++-------- zerolink/server/main_server.py | 60 +++++++++---- zerolink/workers/gpu_worker.py | 67 +++++++------- 18 files changed, 527 insertions(+), 257 deletions(-) create mode 100644 requirements-cgpu.txt create mode 100644 requirements-gpu.txt create mode 100644 requirements-ray.txt diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a846e77..27bfca0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,38 +7,79 @@ on: branches: [ master ] jobs: - test: + test-core: + name: Core profile (no CUDA extension) runs-on: ubuntu-latest strategy: matrix: - python-version: [3.8, 3.9, "3.10"] + python-version: ["3.10", "3.11", "3.12"] steps: - - uses: actions/checkout@v3 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 - with: - python-version: ${{ matrix.python-version }} - - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install -r requirements.txt - pip install pytest - - - name: Run tests - run: | - pytest tests/ + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + 
with: + python-version: ${{ matrix.python-version }} + + - name: Install core + dev dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements-dev.txt + pip install -e . --no-build-isolation + + - name: Run core protocol tests (CPU-only) + run: | + pytest --noconftest tests/test_protocol.py -q + + test-gpu-profile: + name: GPU dependency profile (import smoke) + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.11 + uses: actions/setup-python@v5 + with: + python-version: "3.11" + + - name: Install GPU profile dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements-gpu.txt + + - name: Validate GPU profile imports + run: | + python -c "import torch; print('torch:', torch.__version__)" + python -c "import cuda; print('cuda-python import: ok')" + + test-ray-profile: + name: Ray profile (import smoke) + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.11 + uses: actions/setup-python@v5 + with: + python-version: "3.11" + + - name: Install Ray profile dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements-ray.txt + + - name: Validate Ray profile imports + run: | + python -c "import ray; print('ray:', ray.__version__)" build: runs-on: ubuntu-latest - needs: test + needs: [test-core, test-gpu-profile, test-ray-profile] if: github.ref == 'refs/heads/master' steps: - - uses: actions/checkout@v3 - - name: Build and publish Docker image - run: | - # Сборка Docker-образа - docker build -f deployment/docker/Dockerfile.unified -t zerolink:${{ github.sha }} . - docker tag zerolink:${{ github.sha }} zerolink:latest \ No newline at end of file + - uses: actions/checkout@v4 + - name: Build and publish Docker image + run: | + docker build -f deployment/docker/Dockerfile.unified -t zerolink:${{ github.sha }} . 
+ docker tag zerolink:${{ github.sha }} zerolink:latest diff --git a/README.md b/README.md index 8082d25..527517e 100644 --- a/README.md +++ b/README.md @@ -44,18 +44,32 @@ ZeroLink v2.0 обеспечивает значительный прирост ```bash git clone -cd pynexus_rex_v2 +cd zerolink -# Установка зависимостей +# Профили зависимостей +# core (минимальный профиль) pip install -r requirements.txt -# Сборка C++ Extension +# gpu (PyTorch + CUDA Python) +pip install -r requirements-gpu.txt + +# ray (distributed профиль) +pip install -r requirements-ray.txt + +# dev (инструменты разработки) +pip install -r requirements-dev.txt + +# Сборка C++ Extension (опционально, для GPU пути) python setup.py build_ext --inplace ``` ### 2. Запуск тестов ```bash +# CPU-only профиль (без conftest GPU импортов) +pytest --noconftest tests/test_protocol.py -q + +# Полный прогон (требует GPU профиль и torch/cuda) pytest -q ``` @@ -79,7 +93,7 @@ runtime.start() # Запускает сервер в фоне ```python from zerolink.workers import GPUWorker -worker = GPUWorker(sock_path="/tmp/pynexus.sock", device_id=0) +worker = GPUWorker(sock_path="/tmp/zerolink.sock", device_id=0) worker.connect() # Event loop diff --git a/docs/performance_estimation.py b/docs/performance_estimation.py index c204e51..970d8bf 100644 --- a/docs/performance_estimation.py +++ b/docs/performance_estimation.py @@ -25,27 +25,27 @@ def realistic_performance_estimation(): metrics = { "IPC Bandwidth": { "traditional": "~100-500 MB/s (через pickle)", - "pynexus": ">850 MB/s (zero-copy)", + "zerolink": ">850 MB/s (zero-copy)", "improvement": "2-10x быстрее" }, "Allocation Speed": { "traditional": "O(log n) - зависит от состояния пула", - "pynexus": "O(1) - через Buddy Allocator", + "zerolink": "O(1) - через Buddy Allocator", "improvement": "2-5x быстрее" }, "Memory Efficiency": { "traditional": "Высокое потребление (дублирование тензоров)", - "pynexus": "Минимизированное (zero-copy sharing)", + "zerolink": "Минимизированное (zero-copy sharing)", 
"improvement": "2-5x меньше памяти" }, "Fragmentation": { "traditional": "Высокая (особенно при длительных сессиях)", - "pynexus": "Минимальная (Buddy Allocator + defragmentation)", + "zerolink": "Минимальная (Buddy Allocator + defragmentation)", "improvement": "5-10x меньше фрагментации" }, "Latency": { "traditional": "Высокая (сериализация + копирование)", - "pynexus": "Низкая (прямой доступ к памяти)", + "zerolink": "Низкая (прямой доступ к памяти)", "improvement": "10-30% снижение" } } @@ -53,7 +53,7 @@ def realistic_performance_estimation(): for metric, values in metrics.items(): print(f"{metric}:") print(f" Традиционный подход: {values['traditional']}") - print(f" ZeroLink: {values['pynexus']}") + print(f" ZeroLink: {values['zerolink']}") print(f" Улучшение: {values['improvement']}\n") # Сценарии использования и потенциальный прирост @@ -118,17 +118,17 @@ def realistic_performance_estimation(): "Standard PyTorch": { "pros": "Простота, зрелость", "cons": "Высокое потребление памяти, фрагментация", - "vs_pynexus": "ZeroLink обеспечивает 2-5x более эффективное использование памяти" + "vs_zerolink": "ZeroLink обеспечивает 2-5x более эффективное использование памяти" }, "Custom CUDA Kernels": { "pros": "Максимальная производительность для специфичных задач", "cons": "Высокая сложность разработки и поддержки", - "vs_pynexus": "ZeroLink предоставляет универсальное решение с меньшей сложностью" + "vs_zerolink": "ZeroLink предоставляет универсальное решение с меньшей сложностью" }, "NCCL": { "pros": "Высокая производительность для collective операций", "cons": "Ограниченная применимость (не для произвольного IPC)", - "vs_pynexus": "ZeroLink дополняет NCCL, обеспечивая эффективный point-to-point IPC" + "vs_zerolink": "ZeroLink дополняет NCCL, обеспечивая эффективный point-to-point IPC" } } @@ -136,7 +136,7 @@ def realistic_performance_estimation(): print(f"{solution}:") print(f" Плюсы: {details['pros']}") print(f" Минусы: {details['cons']}") - print(f" PyNexus vs: 
{details['vs_pynexus']}\n") + print(f" ZeroLink vs: {details['vs_zerolink']}\n") print("Заключение:") print("ZeroLink v2.0 обеспечивает значительный прирост производительности") diff --git a/pyproject.toml b/pyproject.toml index fa72189..bb59c88 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,5 @@ [build-system] -requires = ["setuptools>=61.0.0", "wheel", "torch"] +requires = ["setuptools>=61.0.0", "wheel"] build-backend = "setuptools.build_meta" [project] @@ -27,24 +27,43 @@ classifiers = [ ] dependencies = [ - "torch>=1.12.0", - "cuda-python>=11.7", "blake3>=0.4.0", + "numpy>=1.24.0", ] [project.optional-dependencies] +gpu = [ + "torch>=1.12.0", + "cuda-python>=11.7", +] +ray = [ + "ray>=2.0.0", +] +cgpu = [ + "cgpu>=0.1.0", +] dev = [ "pytest>=7.0", "pytest-cov", "black", "mypy", ] +full = [ + "torch>=1.12.0", + "cuda-python>=11.7", + "ray>=2.0.0", + "cgpu>=0.1.0", + "pytest>=7.0", + "pytest-cov", + "black", + "mypy", +] [project.urls] -Homepage = "https://github.com/your-org/pynexus-rex" -Documentation = "https://github.com/your-org/pynexus-rex/docs" -Repository = "https://github.com/your-org/pynexus-rex.git" -"Bug Tracker" = "https://github.com/your-org/pynexus-rex/issues" +Homepage = "https://github.com/your-org/zerolink" +Documentation = "https://github.com/your-org/zerolink/docs" +Repository = "https://github.com/your-org/zerolink.git" +"Bug Tracker" = "https://github.com/your-org/zerolink/issues" [tool.setuptools] -packages = ["pynexus_rex"] \ No newline at end of file +packages = ["zerolink"] diff --git a/requirements-cgpu.txt b/requirements-cgpu.txt new file mode 100644 index 0000000..654fa88 --- /dev/null +++ b/requirements-cgpu.txt @@ -0,0 +1,3 @@ +# Optional low-level CUDA driver integration +-r requirements-gpu.txt +cgpu>=0.1.0 diff --git a/requirements-dev.txt b/requirements-dev.txt index e69de29..4e7a90f 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -0,0 +1,6 @@ +# Development/test tools +-r requirements.txt 
+pytest>=7.0 +pytest-cov +black +mypy diff --git a/requirements-gpu.txt b/requirements-gpu.txt new file mode 100644 index 0000000..27232de --- /dev/null +++ b/requirements-gpu.txt @@ -0,0 +1,4 @@ +# GPU profile +-r requirements.txt +torch>=1.12.0 +cuda-python>=11.7 diff --git a/requirements-ray.txt b/requirements-ray.txt new file mode 100644 index 0000000..6e97bb9 --- /dev/null +++ b/requirements-ray.txt @@ -0,0 +1,3 @@ +# Distributed Ray profile +-r requirements-gpu.txt +ray>=2.0.0 diff --git a/requirements.txt b/requirements.txt index d3e3431..c7f1bef 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,3 @@ -torch>=1.12.0 -cuda-python>=11.7 +# Core runtime dependencies (CPU/control-plane friendly) blake3>=0.4.0 -ray>=2.0.0 -cgpu>=0.1.0 \ No newline at end of file +numpy>=1.24.0 diff --git a/setup.py b/setup.py index ac1cdcb..d3540ae 100644 --- a/setup.py +++ b/setup.py @@ -5,45 +5,73 @@ import sys from setuptools import setup, find_packages -from torch.utils.cpp_extension import BuildExtension, CUDAExtension + def parse_requirements(filename: str) -> list[str]: """ - Считывает зависимости из файла requirements.txt. - Игнорирует комментарии и пустые строки. + Считывает зависимости из requirements-файла. + Игнорирует комментарии, пустые строки и include-директивы (-r ...). 
""" requirements = [] try: with open(filename, 'r', encoding='utf-8') as f: for line in f: line = line.strip() - if line and not line.startswith('#'): - requirements.append(line) + if not line or line.startswith('#') or line.startswith('-r '): + continue + requirements.append(line) except FileNotFoundError: return [] + return requirements + -# Парсинг зависимостей +# Парсинг базовых зависимостей install_requires = parse_requirements('requirements.txt') +extras_require = { + "gpu": [ + "torch>=1.12.0", + "cuda-python>=11.7", + ], + "ray": [ + "ray>=2.0.0", + ], + "cgpu": [ + "cgpu>=0.1.0", + ], + "dev": [ + "pytest>=7.0", + "pytest-cov", + "black", + "mypy", + ], +} +extras_require["full"] = sorted({dep for group in extras_require.values() for dep in group}) + # Список расширений ext_modules = [] +cmdclass = {} # Проверяем флаг --no-cuda # Если он есть, мы НЕ компилируем C++ модуль (полезно для чисто-Python окружений или CI) -if "--no-cuda" not in sys.argv: - ext_modules.append( - CUDAExtension( - name='zerolink.core.gpu.ext.ipc_ext', - sources=['zerolink/core/gpu/ext/ipc_ext.cpp'], - extra_link_args=['-lcuda'], - # Опционально: можно добавить define_macros для детальной настройки - # define_macros=[('TORCH_EXTENSION_NAME', 'zerolink.core.gpu.ext.ipc_ext')], - ) - ) +if "--no-cuda" in sys.argv: + sys.argv.remove("--no-cuda") else: - # Удаляем флаг, чтобы setuptools не ругался на лишний аргумент - if "--no-cuda" in sys.argv: - sys.argv.remove("--no-cuda") + try: + from torch.utils.cpp_extension import BuildExtension, CUDAExtension + + ext_modules.append( + CUDAExtension( + name='zerolink.core.gpu.ext.ipc_ext', + sources=['zerolink/core/gpu/ext/ipc_ext.cpp'], + extra_link_args=['-lcuda'], + ) + ) + cmdclass = {'build_ext': BuildExtension} + except ImportError: + # Разрешаем установку core-профиля без torch/cuda toolchain. 
+ ext_modules = [] + cmdclass = {} setup( name="zerolink", @@ -53,15 +81,12 @@ def parse_requirements(filename: str) -> list[str]: long_description=open('README.md', 'r', encoding='utf-8').read(), long_description_content_type="text/markdown", url="https://github.com/your-org/zerolink", - packages=find_packages(exclude=['tests*', 'docs*', 'scripts*', 'deployment*']), ext_modules=ext_modules, - - cmdclass={'build_ext': BuildExtension}, - + cmdclass=cmdclass, install_requires=install_requires, + extras_require=extras_require, python_requires='>=3.8', - classifiers=[ "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", @@ -75,4 +100,4 @@ def parse_requirements(filename: str) -> list[str]: "Programming Language :: C++", "Topic :: Scientific/Engineering :: Artificial Intelligence", ], -) \ No newline at end of file +) diff --git a/tests/conftest.py b/tests/conftest.py index b6c16cb..592ea08 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,28 +2,65 @@ tests/conftest.py Фикстуры для тестов ZeroLink v2.0. + +Важно: conftest должен быть CPU-safe и не тянуть GPU/torch-зависимые +импорты на этапе collection, чтобы smoke-тесты могли запускаться +в минимальном окружении. 
""" -import pytest import socket import struct +from typing import Type + +import pytest -# Импорты для создания фейковых объектов -from zerolink.server.main_server import MainIPCLeaseManager2P from zerolink.core.protocol import MSG_HDR_FMT, MSG_MAGIC, MSG_VER + +@pytest.fixture +def protocol_header_constants(): + """Базовые константы заголовка протокола для CPU-only тестов.""" + return { + "fmt": MSG_HDR_FMT, + "magic": MSG_MAGIC, + "ver": MSG_VER, + "size": struct.calcsize(MSG_HDR_FMT), + } + + @pytest.fixture def fake_pool(): """Фейковый пул, который просто запоминает вызовы free.""" + class SimpleFakePool: def __init__(self): self.freed = [] + def free(self, allocation): self.freed.append(allocation) + return SimpleFakePool() + @pytest.fixture def unix_socketpair(): """Создает пару Unix Domain сокетов.""" s1, s2 = socket.socketpair(socket.AF_UNIX, socket.SOCK_SEQPACKET) - return s1, s2 \ No newline at end of file + try: + yield s1, s2 + finally: + s1.close() + s2.close() + + +@pytest.fixture +def main_ipc_lease_manager_cls() -> Type: + """ + Ленивая загрузка GPU/torch-зависимого менеджера. + + Использовать только в тестах, которым действительно нужен серверный слой. + """ + pytest.importorskip("torch") + from zerolink.server.main_server import MainIPCLeaseManager2P + + return MainIPCLeaseManager2P diff --git a/zerolink/core/cpu/shm_pool.py b/zerolink/core/cpu/shm_pool.py index ddd46bb..4eb50e1 100644 --- a/zerolink/core/cpu/shm_pool.py +++ b/zerolink/core/cpu/shm_pool.py @@ -10,9 +10,14 @@ """ import multiprocessing.shared_memory as shm -import numpy as np import threading -from typing import Dict, List, Optional, Tuple +import logging +from typing import Dict, List, Optional, Tuple, Any + +try: + import numpy as np +except ImportError: + np = None from dataclasses import dataclass @dataclass @@ -28,19 +33,25 @@ class SharedMemoryPool: Менеджер разделяемой памяти для CPU. Управляет большим блоком SHM и выдает из него куски. 
""" - def __init__(self, name: str = "pynexus_cpu_pool", total_size_mb: int = 1024): + def __init__(self, name: str = "zerolink_cpu_pool", total_size_mb: int = 1024): self.name = name self.total_size = total_size_mb * 1024 * 1024 self.lock = threading.Lock() - + self.logger = logging.getLogger("zerolink.cpu.shm_pool") + if not self.logger.handlers: + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")) + self.logger.addHandler(handler) + self.logger.setLevel(logging.INFO) + # Создаем или открываем SHM try: self.shm = shm.SharedMemory(name=self.name, create=True, size=self.total_size) - print(f"[CPU Pool] Created new SHM: {self.name}") + self.logger.info("created shared memory pool %s", self.name) except FileExistsError: self.shm = shm.SharedMemory(name=self.name) - print(f"[CPU Pool] Attached to existing SHM: {self.name}") + self.logger.info("attached to existing shared memory pool %s", self.name) # Простой Free List (offset -> size) # В production можно использовать Buddy Allocator аналогично GPU @@ -93,14 +104,33 @@ def free(self, block: CPUBlock) -> bool: new_size += self.free_blocks[next_offset] del self.free_blocks[next_offset] - # Попытка слить с предыдущим - # (Для упрощения ищем только точное совпадение конца, нужно итератор) - # В упрощенном варианте просто добавляем блок в free_blocks + # Полное коалесцирование: сливаем с любыми соседними блоками до стабилизации. 
+ merged = True + while merged: + merged = False + for off, size in sorted(self.free_blocks.items()): + if off + size == new_offset: + # merge left neighbor + new_offset = off + new_size += size + del self.free_blocks[off] + merged = True + break + if new_offset + new_size == off: + # merge right neighbor + new_size += size + del self.free_blocks[off] + merged = True + break + self.free_blocks[new_offset] = new_size return True - def get_numpy_array(self, block: CPUBlock, dtype: np.dtype, shape: Tuple[int, ...]): + def get_numpy_array(self, block: CPUBlock, dtype: Any, shape: Tuple[int, ...]): """Создает NumPy массив над разделяемой памятью (Zero-Copy).""" + if np is None: + raise RuntimeError("NumPy is required for get_numpy_array(); install numpy.") + # Используем memoryview для доступа к SHM # self.shm.buf возвращает memoryview всего региона mem_slice = self.shm.buf[block.offset:block.offset + block.size] @@ -111,6 +141,6 @@ def cleanup(self): try: self.shm.close() self.shm.unlink() - print(f"[CPU Pool] Unlinked SHM: {self.name}") + self.logger.info("unlinked shared memory pool %s", self.name) except Exception: pass \ No newline at end of file diff --git a/zerolink/core/gpu/ext/ipc_ext.cpp b/zerolink/core/gpu/ext/ipc_ext.cpp index 3c1e307..71b54b4 100644 --- a/zerolink/core/gpu/ext/ipc_ext.cpp +++ b/zerolink/core/gpu/ext/ipc_ext.cpp @@ -196,10 +196,10 @@ static py::capsule import_vmm_segments( // Это позволяет объекту жить в Python, пока Capsule жива auto* sp = new std::shared_ptr(std::move(region)); - return py::capsule(sp, "pynexus.ImportedRegion", + return py::capsule(sp, "zerolink.ImportedRegion", [](PyObject* cap) { // Когда Capsule удаляется в Python (refcount=0) - auto* p = (std::shared_ptr*)PyCapsule_GetPointer(cap, "pynexus.ImportedRegion"); + auto* p = (std::shared_ptr*)PyCapsule_GetPointer(cap, "zerolink.ImportedRegion"); delete p; // Уменьшает shared_ptr. 
Если счетчик == 0 -> вызовется ~ImportedRegion } ); diff --git a/zerolink/core/protocol/messages.py b/zerolink/core/protocol/messages.py index 5fce932..5d761f6 100644 --- a/zerolink/core/protocol/messages.py +++ b/zerolink/core/protocol/messages.py @@ -113,12 +113,27 @@ def unpack_ctrl(data: bytes) -> Tuple[int, int, int, bytes]: # Alloc Payload (2-Phase Wrapper) # ============================================================================ -def pack_alloc_payload(lease_id: int, mapping_payload: bytes) -> bytes: +def pack_alloc_payload( + lease_id: int, + mapping_payload: bytes, + mapping_hash: Optional[bytes] = None, +) -> bytes: """ Оборачивает PNXIPC10 mapping payload для отправки через ALLOC. - Формат: + Форматы: + - без хэша: + - с хэшем: <32B hash> + + Примечание: + наличие/отсутствие хэша в payload должно быть согласовано с флагом + CTRL_FLAG_HAS_HASH в control frame. """ - return struct.pack(" Tuple[int, bytes, Optional[bytes]]: """ @@ -364,4 +379,4 @@ def unpack_error_payload(payload: bytes) -> Tuple[int, str, int, str]: msg = payload[off:off+msg_len].decode("utf-8", "replace") - return lease_id, alloc_id, code, msg \ No newline at end of file + return lease_id, alloc_id, code, msg diff --git a/zerolink/monitoring/telemetry.py b/zerolink/monitoring/telemetry.py index 7e2758c..209bad2 100644 --- a/zerolink/monitoring/telemetry.py +++ b/zerolink/monitoring/telemetry.py @@ -11,7 +11,7 @@ import time import threading from dataclasses import dataclass -from typing import Dict, Optional, Callable +from typing import Dict, Callable, Optional from enum import Enum @@ -24,12 +24,13 @@ class MetricType(Enum): @dataclass class Metric: """Базовая метрика.""" + name: str type: MetricType value: float = 0.0 - labels: Dict[str, str] = None + labels: Optional[Dict[str, str]] = None description: str = "" - + def __post_init__(self): if self.labels is None: self.labels = {} @@ -37,81 +38,71 @@ def __post_init__(self): class TelemetryCollector: """Сборщик телеметрии.""" - + 
def __init__(self): self.metrics: Dict[str, Metric] = {} self.lock = threading.Lock() - - # Регистрируем базовые метрики - self.register_counter("pynexus_pool_allocation_total", "Total number of pool allocations") - self.register_gauge("pynexus_pool_used_bytes", "Current bytes used in pool") - self.register_counter("pynexus_ipc_transmit_bytes_total", "Total bytes transmitted via IPC") - self.register_gauge("pynexus_ipc_active_leases", "Number of active IPC leases") - self.register_histogram("pynexus_pool_allocation_latency_seconds", "Pool allocation latency in seconds") - self.register_gauge("pynexus_fragmentation_percentage", "Pool fragmentation percentage") - + + # Базовые метрики пула/IPC + self.register_counter("zerolink_pool_allocation_total", "Total number of pool allocations") + self.register_gauge("zerolink_pool_used_bytes", "Current bytes used in pool") + self.register_counter("zerolink_ipc_transmit_bytes_total", "Total bytes transmitted via IPC") + self.register_gauge("zerolink_ipc_active_leases", "Number of active IPC leases") + self.register_histogram("zerolink_pool_allocation_latency_seconds", "Pool allocation latency in seconds") + self.register_gauge("zerolink_fragmentation_percentage", "Pool fragmentation percentage") + + # Новые метрики reliability/operations + self.register_histogram("zerolink_runtime_operation_latency_seconds", "Latency of runtime/worker/server operations") + self.register_counter("zerolink_alloc_failures_total", "Total number of allocation/import failures") + self.register_counter("zerolink_lease_events_total", "Total number of lease state transitions") + def register_metric(self, name: str, metric_type: MetricType, description: str = ""): - """Регистрирует новую метрику.""" with self.lock: if name not in self.metrics: - self.metrics[name] = Metric( - name=name, - type=metric_type, - description=description - ) - + self.metrics[name] = Metric(name=name, type=metric_type, description=description) + def register_counter(self, 
name: str, description: str = ""): - """Регистрирует counter метрику.""" self.register_metric(name, MetricType.COUNTER, description) - + def register_gauge(self, name: str, description: str = ""): - """Регистрирует gauge метрику.""" self.register_metric(name, MetricType.GAUGE, description) - + def register_histogram(self, name: str, description: str = ""): - """Регистрирует histogram метрику.""" self.register_metric(name, MetricType.HISTOGRAM, description) - - def increment_counter(self, name: str, amount: float = 1.0, labels: Dict[str, str] = None): - """Увеличивает counter.""" + + def increment_counter(self, name: str, amount: float = 1.0, labels: Optional[Dict[str, str]] = None): with self.lock: if name in self.metrics: self.metrics[name].value += amount if labels: self.metrics[name].labels.update(labels) - - def set_gauge(self, name: str, value: float, labels: Dict[str, str] = None): - """Устанавливает значение gauge.""" + + def set_gauge(self, name: str, value: float, labels: Optional[Dict[str, str]] = None): with self.lock: if name in self.metrics: self.metrics[name].value = value if labels: self.metrics[name].labels.update(labels) - - def observe_histogram(self, name: str, value: float, labels: Dict[str, str] = None): - """Добавляет значение в histogram (для простоты как gauge).""" + + def observe_histogram(self, name: str, value: float, labels: Optional[Dict[str, str]] = None): + # Упрощенная модель: храним последнее значение. 
with self.lock: if name in self.metrics: - self.metrics[name].value = value # В реальном коде тут будет более сложная логика + self.metrics[name].value = value if labels: self.metrics[name].labels.update(labels) - + def get_metrics(self) -> Dict[str, Metric]: - """Возвращает все метрики.""" with self.lock: return self.metrics.copy() - + def get_prometheus_format(self) -> str: - """Возвращает метрики в формате Prometheus.""" output = [] - for name, metric in self.get_metrics().items(): - # Добавляем комментарий с описанием if metric.description: output.append(f"# HELP {name} {metric.description}") output.append(f"# TYPE {name} {metric.type.value}") - # Формируем лейблы labels_str = "" if metric.labels: labels_parts = [f'{k}="{v}"' for k, v in metric.labels.items()] @@ -122,61 +113,66 @@ def get_prometheus_format(self) -> str: return "\n".join(output) +# Глобальный экземпляр телеметрии +telemetry = TelemetryCollector() + + def monitor_ipc_transmission(bytes_count: int): """Мониторинг передачи данных через IPC.""" - telemetry.increment_counter("pynexus_ipc_transmit_bytes_total", bytes_count) + telemetry.increment_counter("zerolink_ipc_transmit_bytes_total", bytes_count) def update_fragmentation(fragmentation_percent: float): """Обновление метрики фрагментации.""" - telemetry.set_gauge("pynexus_fragmentation_percentage", fragmentation_percent) + telemetry.set_gauge("zerolink_fragmentation_percentage", fragmentation_percent) def update_active_leases(count: int): """Обновление количества активных аренд.""" - telemetry.set_gauge("pynexus_ipc_active_leases", count) + telemetry.set_gauge("zerolink_ipc_active_leases", count) def update_pool_usage(used_bytes: int): """Обновление использования пула.""" - telemetry.set_gauge("pynexus_pool_used_bytes", used_bytes) + telemetry.set_gauge("zerolink_pool_used_bytes", used_bytes) -# Глобальный экземпляр телеметрии -telemetry = TelemetryCollector() +def observe_runtime_latency(operation: str, seconds: float): + """Наблюдение задержки 
runtime/worker/server операции.""" + telemetry.observe_histogram( + "zerolink_runtime_operation_latency_seconds", + seconds, + labels={"operation": operation}, + ) + + +def record_alloc_failure(component: str, reason: str = "unknown"): + """Учет ошибок аллокации/импорта.""" + telemetry.increment_counter( + "zerolink_alloc_failures_total", + labels={"component": component, "reason": reason}, + ) + + +def record_lease_event(component: str, event: str): + """Учет событий lease churn (create/active/release/revoke...).""" + telemetry.increment_counter( + "zerolink_lease_events_total", + labels={"component": component, "event": event}, + ) def monitor_pool_allocation(func: Callable) -> Callable: """Декоратор для мониторинга аллокаций пула.""" + def wrapper(*args, **kwargs): start_time = time.time() try: result = func(*args, **kwargs) - # Увеличиваем счетчик аллокаций - telemetry.increment_counter("pynexus_pool_allocation_total") + telemetry.increment_counter("zerolink_pool_allocation_total") return result finally: - # Измеряем задержку latency = time.time() - start_time - telemetry.observe_histogram("pynexus_pool_allocation_latency_seconds", latency) - return wrapper - - -def monitor_ipc_transmission(bytes_count: int): - """Мониторинг передачи данных через IPC.""" - telemetry.increment_counter("pynexus_ipc_transmit_bytes_total", bytes_count) - + telemetry.observe_histogram("zerolink_pool_allocation_latency_seconds", latency) -def update_fragmentation(fragmentation_percent: float): - """Обновление метрики фрагментации.""" - telemetry.set_gauge("pynexus_fragmentation_percentage", fragmentation_percent) - - -def update_active_leases(count: int): - """Обновление количества активных аренд.""" - telemetry.set_gauge("pynexus_ipc_active_leases", count) - - -def update_pool_usage(used_bytes: int): - """Обновление использования пула.""" - telemetry.set_gauge("pynexus_pool_used_bytes", used_bytes) \ No newline at end of file + return wrapper diff --git a/zerolink/runtime/unified.py 
b/zerolink/runtime/unified.py index d66ba40..aa73036 100644 --- a/zerolink/runtime/unified.py +++ b/zerolink/runtime/unified.py @@ -12,19 +12,28 @@ import os import signal import sys +import json +import logging import torch import multiprocessing as mp import threading import time from typing import Callable, Any, List -from ..monitoring.telemetry import telemetry, monitor_pool_allocation, update_pool_usage, update_active_leases +from ..monitoring.telemetry import ( + telemetry, + monitor_pool_allocation, + update_pool_usage, + update_active_leases, + observe_runtime_latency, + record_alloc_failure, +) # Импорты внутренних компонентов from ..core.gpu.vmm_pool import DeviceMemoryPoolV2 from ..core.gpu.multi_gpu_pool import MultiDevicePool, PoolType from ..core.gpu.pinned_region import PinnedMemoryManager -from ..server.main_server import MainIPCLeaseManager2P +from ..server.main_server import MainIPCLeaseManager2P, MainServer # ============================================================================ # Критическое исправление: CUDA Contexts и Fork @@ -37,6 +46,18 @@ except RuntimeError: pass # Уже установлено или spawn недоступен (Windows) + +def _build_logger() -> logging.Logger: + logger = logging.getLogger("zerolink.runtime") + if not logger.handlers: + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")) + logger.addHandler(handler) + logger.setLevel(logging.INFO) + return logger + + + class ZeroLinkRuntime: """ ZeroLink Runtime для одновременной работы CPU и GPU. 
@@ -44,19 +65,20 @@ class ZeroLinkRuntime: """ def __init__( self, - socket_path: str = "/tmp/pynexus.sock", + socket_path: str = "/tmp/zerolink.sock", gpu_device_id: int = 0, gpu_pool_size_gb: int = 4, - cpu_pool_name: str = "pynexus_cpu_pool", + cpu_pool_name: str = "zerolink_cpu_pool", cpu_pool_size_mb: int = 512, num_cpu_workers: int = 4, enable_integrity_hash: bool = True ): self.socket_path = socket_path self._running = False + self.logger = _build_logger() # 1. Инициализация GPU Subsystem (Main Process) - print(f"[Runtime] Initializing GPU Subsystem on device {gpu_device_id}...") + self.logger.info(json.dumps({"event": "runtime_gpu_init_start", "device_id": gpu_device_id})) try: # Инициализация GPU пула self.gpu_pool = DeviceMemoryPoolV2( @@ -64,24 +86,29 @@ def __init__( total_size_gb=gpu_pool_size_gb, pool_id=f"gpu_pool_{gpu_device_id}_{id(self)}" ) - print(f"[Runtime] GPU Pool initialized on device {gpu_device_id}") + self.logger.info(json.dumps({"event": "runtime_gpu_init_success", "device_id": gpu_device_id})) except Exception as e: - print(f"[Runtime] CRITICAL: Failed to init GPU Pool: {e}") + record_alloc_failure("runtime", "gpu_pool_init") + self.logger.exception(json.dumps({"event": "runtime_gpu_init_failed", "device_id": gpu_device_id, "error": str(e)})) # В реальном коде здесь была бы полная инициализация self.gpu_pool = None # Менеджер IPC аренд (управляет соединениями с воркерами) - self.ipc_manager = None # Будет инициализирован в `start()` + self.ipc_manager = MainIPCLeaseManager2P( + pool=self.gpu_pool, + enable_integrity_hash=enable_integrity_hash, + ) if self.gpu_pool is not None else None + self.main_server = MainServer(self.socket_path, self.ipc_manager) if self.ipc_manager else None # 2. 
Инициализация CPU Subsystem - print(f"[Runtime] Initializing CPU Subsystem ({num_cpu_workers} workers)...") + self.logger.info(json.dumps({"event": "runtime_cpu_init_start", "workers": num_cpu_workers})) self.cpu_pool_name = cpu_pool_name self.cpu_pool_size_mb = cpu_pool_size_mb self.num_cpu_workers = num_cpu_workers self.cpu_executor = mp.Pool(processes=num_cpu_workers) self.cpu_pool_manager = None # Заглушка для примера - print(f"✅ UnifiedRuntime Ready") + self.logger.info(json.dumps({"event": "runtime_ready", "socket_path": self.socket_path})) def start(self, block: bool = False): """ @@ -97,14 +124,15 @@ def start(self, block: bool = False): self._running = True self._server_thread = threading.Thread(target=self._run_event_loop, daemon=True) self._server_thread.start() - print("[Runtime] Server started (Thread)") + self.logger.info(json.dumps({"event": "runtime_server_started"})) if block: self._server_thread.join() def stop(self): """Остановка всех подсистем.""" - print("[Runtime] Shutting down...") + stop_started = time.time() + self.logger.info(json.dumps({"event": "runtime_shutdown_start"})) self._running = False # Остановка CPU воркеров @@ -112,23 +140,30 @@ def stop(self): self.cpu_executor.join() # Остановка GPU пула и сервера - if self.ipc_manager: - # В реальном коде тут был бы вызов shutdown - pass + if self.main_server: + self.main_server.stop() # Остановка потока сервера (если он был запущен) if hasattr(self, '_server_thread') and self._server_thread and self._server_thread.is_alive(): self._server_thread.join(timeout=5.0) - print("[Runtime] Shutdown complete.") + observe_runtime_latency("runtime_shutdown", time.time() - stop_started) + self.logger.info(json.dumps({"event": "runtime_shutdown_complete"})) def _run_event_loop(self): """Внутренний метод для потока сервера.""" - while self._running: - # Здесь была бы логика сокета и вызов менеджера аренд - time.sleep(1.0) - # ... socket.accept() ... - # ... self.ipc_manager.handle_worker() ... 
+ if not self.main_server: + self.logger.warning(json.dumps({"event": "runtime_server_skipped", "reason": "gpu_pool_unavailable"})) + while self._running: + time.sleep(0.1) + return + + try: + self.main_server.start() + except Exception as e: + record_alloc_failure("runtime", "main_server_start") + self.logger.exception(json.dumps({"event": "runtime_server_error", "error": str(e)})) + self._running = False @monitor_pool_allocation def execute_gpu(self, func, *args, **kwargs): @@ -155,9 +190,8 @@ def execute_gpu(self, func, *args, **kwargs): # Обновляем метрики использования пула if self.gpu_pool: - # В реальном коде тут будет получение статистики из пула - # telemetry.update_pool_usage(used_bytes) - pass + used_bytes = sum(a.size for a in self.gpu_pool.allocations.values()) + update_pool_usage(used_bytes) return result @@ -179,30 +213,48 @@ def map_cpu(self, func: Callable, iterable: List[Any]) -> List[Any]: def get_pool_stats(self) -> dict: """Возвращает статистику GPU и CPU пулов.""" - # В реальном коде здесь были бы реальные метрики + gpu_stats = { + "pool_id": None, + "used_gb": 0.0, + "allocations": 0, + "total_gb": 0.0, + } + + if self.gpu_pool is not None: + used_bytes = sum(a.size for a in self.gpu_pool.allocations.values()) + gpu_stats = { + "pool_id": self.gpu_pool.pool_id, + "used_gb": used_bytes / (1024**3), + "allocations": len(self.gpu_pool.allocations), + "total_gb": self.gpu_pool.total_size / (1024**3), + } + update_pool_usage(int(used_bytes)) + stats = { - "gpu": {"pool_id": "mock_gpu_pool", "used_gb": 2.0}, + "gpu": gpu_stats, "cpu": { "pool_name": self.cpu_pool_name, "total_mb": self.cpu_pool_size_mb, - "workers": self.cpu_executor._processes if hasattr(self.cpu_executor, '_processes') else self.num_cpu_workers - } + "workers": self.cpu_executor._processes if hasattr(self.cpu_executor, '_processes') else self.num_cpu_workers, + }, } - - # Обновляем метрики телеметрии - if 'gpu' in stats and 'used_gb' in stats['gpu']: - used_bytes = 
stats['gpu']['used_gb'] * 1024**3 # Переводим в байты - update_pool_usage(int(used_bytes)) - return stats def get_ipc_lease_status(self) -> dict: """Возвращает статус IPC аренд.""" - # В реальном коде делегируется менеджеру аренд + if not self.ipc_manager: + return {"pending": 0, "active": 0, "workers_connected": 0} + + with self.ipc_manager.lock: + pending = sum(1 for l in self.ipc_manager.leases.values() if l.status == "PENDING") + active = sum(1 for l in self.ipc_manager.leases.values() if l.status == "ACTIVE") + workers_connected = len(self.ipc_manager.workers) + + update_active_leases(active) return { - "pending": 0, - "active": 0, - "workers_connected": 0 + "pending": pending, + "active": active, + "workers_connected": workers_connected, } # ------------------------------------------------------------------------- @@ -211,7 +263,7 @@ def get_ipc_lease_status(self) -> dict: def _signal_handler(self, sig, frame): """Обрабатывает SIGINT/SIGTERM для корректного завершения.""" - print(f"\n[Runtime] Caught signal {sig}, cleaning up...") + self.logger.warning(json.dumps({"event": "runtime_signal", "signal": int(sig)})) self.stop() sys.exit(0) diff --git a/zerolink/server/main_server.py b/zerolink/server/main_server.py index ac9be3f..39baf86 100644 --- a/zerolink/server/main_server.py +++ b/zerolink/server/main_server.py @@ -17,6 +17,7 @@ import threading import secrets import logging +import json from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple, Any @@ -36,6 +37,7 @@ # Импорты пула from ..core.gpu.vmm_pool import DeviceMemoryPoolV2, DeviceAllocation +from ..monitoring.telemetry import observe_runtime_latency, record_alloc_failure, record_lease_event, update_active_leases # ============================================================================ # Структуры состояния @@ -80,7 +82,11 @@ def __init__(self, pool: DeviceMemoryPoolV2, enable_integrity_hash: bool = True) # lease_id (int) -> LeaseState self.leases: Dict[int, 
LeaseState] = {} - self.logger = logging.getLogger("LeaseManager") + self.logger = logging.getLogger("zerolink.lease_manager") + if not self.logger.handlers: + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")) + self.logger.addHandler(handler) self.logger.setLevel(logging.INFO) def register_worker(self, sock: socket.socket, device_id: int): @@ -88,7 +94,7 @@ def register_worker(self, sock: socket.socket, device_id: int): fd = sock.fileno() with self.lock: self.workers[fd] = WorkerState(sock=sock, device_id=device_id) - self.logger.info(f"Worker registered: fd={fd}, device={device_id}") + self.logger.info(json.dumps({"event": "worker_registered", "fd": fd, "device_id": device_id})) def unregister_worker(self, fd: int): """Удаляет воркера и очищает его аренды.""" @@ -99,7 +105,7 @@ def unregister_worker(self, fd: int): # В реальной системе нужно возвращать память в пул to_remove = [lid for lid, l in self.leases.items() if l.worker_id == fd] for lid in to_remove: - self.logger.warning(f"Revoking lease {lid} due to worker disconnect") + self.logger.warning(json.dumps({"event": "lease_revoked_disconnect", "lease_id": lid, "worker_fd": fd})) self._revoke_lease(lid) def grant_lease(self, worker_fd: int, allocation: DeviceAllocation) -> int: @@ -107,6 +113,7 @@ def grant_lease(self, worker_fd: int, allocation: DeviceAllocation) -> int: Инициирует выдачу аренды воркеру (Отправляет ALLOC). Возвращает lease_id. 
""" + start_ts = time.time() with self.lock: worker = self.workers.get(worker_fd) if not worker: @@ -162,7 +169,8 @@ def grant_lease(self, worker_fd: int, allocation: DeviceAllocation) -> int: # ALLOC = Type 2 send_frame_with_fds(worker.sock, MSG_ALLOC, 0, wrapper, fds, flags=ctrl_flags) except Exception as e: - self.logger.error(f"Failed to send ALLOC to worker {worker_fd}: {e}") + record_alloc_failure("server", "send_alloc") + self.logger.error(json.dumps({"event": "send_alloc_failed", "worker_fd": worker_fd, "error": str(e)})) raise # 5. Сохранение состояния @@ -173,9 +181,12 @@ def grant_lease(self, worker_fd: int, allocation: DeviceAllocation) -> int: alloc_id=alloc_id, expected_hash=expected_hash ) + update_active_leases(len(self.leases)) + record_lease_event("server", "created") + observe_runtime_latency("server_grant_lease", time.time() - start_ts) return lease_id - def handle_message(self, fd: int, mtype: int, req_id: int, payload: bytes): + def handle_message(self, fd: int, mtype: int, req_id: int, payload: bytes, mflags: int = 0): """Обрабатывает входящее сообщение от воркера.""" with self.lock: # Обновляем heartbeat @@ -183,19 +194,18 @@ def handle_message(self, fd: int, mtype: int, req_id: int, payload: bytes): self.workers[fd].last_seen = time.time() if mtype == MSG_ACK: - self._handle_ack(fd, payload) + self._handle_ack(fd, payload, mflags) elif mtype == MSG_RELEASE: self._handle_release(fd, payload) elif mtype == MSG_PONG: pass # Просто обновили last_seen elif mtype == MSG_ERROR: - self.logger.error(f"Received ERROR from worker {fd}") + self.logger.error(json.dumps({"event": "worker_error_message", "worker_fd": fd})) - def _handle_ack(self, fd: int, payload: bytes): - lease_id, alloc_id, got_hash = unpack_ack_payload(payload, 0) - # flags check skipped for simplicity + def _handle_ack(self, fd: int, payload: bytes, mflags: int): + lease_id, alloc_id, got_hash = unpack_ack_payload(payload, mflags) if lease_id not in self.leases: - 
self.logger.warning(f"ACK for unknown lease {lease_id}") + self.logger.warning(json.dumps({"event": "ack_unknown_lease", "lease_id": lease_id, "worker_fd": fd})) return lease = self.leases[lease_id] @@ -205,16 +215,18 @@ def _handle_ack(self, fd: int, payload: bytes): # Проверка хеша (если воркер прислал) if self.enable_integrity_hash and got_hash: if got_hash != lease.expected_hash: - self.logger.error(f"Hash mismatch for lease {lease_id}!") + record_alloc_failure("server", "hash_mismatch") + self.logger.error(json.dumps({"event": "lease_hash_mismatch", "lease_id": lease_id, "worker_fd": fd})) self._revoke_lease(lease_id) return lease.status = "ACTIVE" - self.logger.info(f"Lease {lease_id} is now ACTIVE on worker {fd}") + record_lease_event("server", "active") + self.logger.info(json.dumps({"event": "lease_active", "lease_id": lease_id, "worker_fd": fd})) def _handle_release(self, fd: int, payload: bytes): lease_id, _ = unpack_release_payload(payload) - self.logger.info(f"Worker {fd} released lease {lease_id}") + self.logger.info(json.dumps({"event": "lease_released_by_worker", "worker_fd": fd, "lease_id": lease_id})) self._revoke_lease(lease_id) def _revoke_lease(self, lease_id: int): @@ -225,6 +237,8 @@ def _revoke_lease(self, lease_id: int): # Но только если Main сам больше не использует эту память. # Здесь мы просто удаляем запись об аренде. 
del self.leases[lease_id] + update_active_leases(len(self.leases)) + record_lease_event("server", "revoked") def _get_backing_fds(self, allocation: DeviceAllocation) -> Tuple[List[Dict], List[int]]: """ @@ -326,6 +340,12 @@ def __init__(self, socket_path: str, lease_manager: MainIPCLeaseManager2P): self.selector = selectors.DefaultSelector() self.server_sock: Optional[socket.socket] = None self._running = False + self.logger = logging.getLogger("zerolink.server") + if not self.logger.handlers: + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")) + self.logger.addHandler(handler) + self.logger.setLevel(logging.INFO) def start(self): """Инициализация сокета и запуск лупа (блокирующий вызов).""" @@ -342,7 +362,7 @@ def start(self): self.selector.register(self.server_sock, selectors.EVENT_READ, data=None) self._running = True - print(f"[Server] Listening on {self.socket_path}") + self.logger.info(json.dumps({"event": "server_listening", "socket_path": self.socket_path})) try: while self._running: @@ -354,7 +374,8 @@ def start(self): self._handle_client_data(key, mask) # Здесь можно добавить self.manager.check_heartbeats() except Exception as e: - print(f"[Server] Event loop error: {e}") + record_alloc_failure("server", "event_loop") + self.logger.exception(json.dumps({"event": "server_event_loop_error", "error": str(e)})) finally: self.stop() @@ -405,17 +426,18 @@ def _handle_client_data(self, key, mask): key.data["state"] = "CONNECTED" # Можно ответить ACK else: - print(f"[Server] Expected HELLO, got {mtype}") + self.logger.warning(json.dumps({"event": "server_expected_hello", "got_type": mtype})) self._close_client(key) else: # Делегируем менеджеру - self.manager.handle_message(sock.fileno(), mtype, req_id, payload) + self.manager.handle_message(sock.fileno(), mtype, req_id, payload, flags) # Закрываем входящие FD, если они не нужны (менеджер их не забирает при приеме) # В текущем протоколе 
воркер не шлет FD серверу, только наоборот. for fd in fds: os.close(fd) except Exception as e: - print(f"[Server] Client error: {e}") + record_alloc_failure("server", "client_handler") + self.logger.error(json.dumps({"event": "server_client_error", "error": str(e)})) self._close_client(key) def _close_client(self, key): diff --git a/zerolink/workers/gpu_worker.py b/zerolink/workers/gpu_worker.py index aa5b162..8a7ec3f 100644 --- a/zerolink/workers/gpu_worker.py +++ b/zerolink/workers/gpu_worker.py @@ -14,24 +14,30 @@ import struct import weakref import threading +import json +import logging +import time import torch from dataclasses import dataclass, field from typing import Optional, Tuple, List, Dict, Any # Импорты из нашего пакета from ..core.protocol import ( - MSG_HELLO, MSG_ALLOC, MSG_ACK, MSG_RELEASE, + MSG_HELLO, MSG_ALLOC, MSG_ACK, MSG_RELEASE, MSG_ERROR, MSG_PING, MSG_PONG, + CTRL_FLAG_HAS_HASH, pack_ctrl, - send_frame, send_frame_with_fds, + recv_frame_with_fds, unpack_alloc_payload, pack_ack_payload, pack_release_payload, pack_error_payload, unpack_ipc_payload, - hash32 + hash32, ) +from ..monitoring.telemetry import observe_runtime_latency, record_alloc_failure, record_lease_event, update_active_leases + # ============================================================================ # Локальные структуры данных # ============================================================================ @@ -68,6 +74,12 @@ def __init__( self.lock = threading.RLock() self._send_lock = threading.Lock() self._running = True + self.logger = logging.getLogger("zerolink.worker") + if not self.logger.handlers: + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")) + self.logger.addHandler(handler) + self.logger.setLevel(logging.INFO) # -------------------------------------------------------------------------- # Lifecycle: Connect / Shutdown @@ -86,9 +98,9 @@ def connect(self): # HELLO handshake payload = 
struct.pack(" bool: return False try: - # В реальном проекте здесь импорт из core.protocol.framing - # mtype, mflags, req_id, payload, fds = recv_frame_with_fds(self.sock) - - # Для демонстрации создадим фейковую функцию recv_frame_with_fds здесь, - # чтобы этот файл был автономным. - # Либо (что лучше) импортируем из ..core.protocol.framing - - # ВНИМАНИЕ: Для работы скрипта я добавлю локальную реализацию recv, - # чтобы избежать ошибок импорта, так как framing.py находится в другой папке - # и импорты зависят от конкретного пути проекта. - # В реальном проекте просто: from ..core.protocol.framing import recv_frame_with_fds - - # Заглушка для автономности файла: - mtype, mflags, req_id, payload, fds = self._recv_frame_with_fds_stub(self.sock) + mtype, mflags, req_id, payload, fds = recv_frame_with_fds(self.sock) - except (ConnectionResetError, BrokenPipeError, OSError): - print("[Worker] Connection lost") + except (ConnectionResetError, BrokenPipeError, OSError) as e: + self.logger.warning(json.dumps({"event": "worker_connection_lost", "error": str(e)})) return False # Обработка сообщений @@ -155,8 +154,7 @@ def loop_once(self) -> bool: except OSError: pass # Отвечаем PONG - with self._send_lock: - send_frame(self.sock, MSG_PONG, req_id, b"") + self._send_frame(MSG_PONG, req_id, b"") return True # Обработка ALLOC (Импорт памяти) @@ -172,16 +170,12 @@ def loop_once(self) -> bool: # Прочие сообщения игнорируем return True - # Локальный заглушка recv_frame_with_fds для автономности файла (чтобы не падал при запуске) - def _recv_frame_with_fds_stub(self, sock: socket.socket): - # ... (код из framing.py, интегрированный сюда для автономности) ... 
- pass - def _handle_alloc(self, ctrl_flags: int, req_id: int, payload: bytes, fds: List[int]): """Обработка запроса на выделение памяти.""" lease_id = 0 alloc_id = "" + start_ts = time.time() try: lease_id, mapping_payload, expected_hash = unpack_alloc_payload(payload, ctrl_flags) @@ -234,6 +228,8 @@ region=region, alloc_id=alloc_id ) + update_active_leases(len(self.active_leases)) + record_lease_event("worker", "created") # 6. Отправляем ACK # Если Main прислал хеш, Worker должен ответить с ним? @@ -245,13 +241,16 @@ else: self._send_frame(MSG_ACK, req_id, pack_ack_payload(lease_id, alloc_id)) + observe_runtime_latency("worker_handle_alloc", time.time() - start_ts) + except Exception as e: # Best-effort ERROR try: self._send_frame(MSG_ERROR, req_id, pack_error_payload(lease_id, alloc_id, 1, str(e))) except Exception: pass - print(f"[Worker] ALLOC failed lease={lease_id} alloc={alloc_id}: {e}") + record_alloc_failure("worker", "alloc_import") + self.logger.error(json.dumps({"event": "worker_alloc_failed", "lease_id": lease_id, "alloc_id": alloc_id, "error": str(e)})) finally: # КРИТИЧНО: Закрываем FD @@ -304,7 +303,7 @@ def release(self, lease_id: int, sync: bool = True): with self.lock: entry = self.active_leases.get(lease_id) if not entry: - print(f"[Worker] Warning: Release unknown lease {lease_id}") + self.logger.warning(json.dumps({"event": "worker_release_unknown_lease", "lease_id": lease_id})) return # Блокируем новые операции @@ -330,16 +329,20 @@ rel_pl = pack_release_payload(lease_id, alloc_id) self._send_frame(MSG_RELEASE, 0, rel_pl) except OSError as e: - print(f"[Worker] Failed to send RELEASE {lease_id}: {e}") + self.logger.error(json.dumps({"event": "worker_release_send_failed", "lease_id": lease_id, "error": str(e)})) return # Удаление
из словаря with self.lock: self.active_leases.pop(lease_id, None) + update_active_leases(len(self.active_leases)) + record_lease_event("worker", "released") def _cleanup_lease(self, lease_id: int): with self.lock: self.active_leases.pop(lease_id, None) + update_active_leases(len(self.active_leases)) + record_lease_event("worker", "cleaned") # -------------------------------------------------------------------------- # Внутренние методы (Helper)