Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ while True:

## 📚 Дополнительная документация

Смотрите `docs/architecture_unified.md` для деталей архитектуры, `docs/performance_analysis.md` для анализа производительности, `docs/benchmarking.md` для запуска и интерпретации бенчмарков, `docs/error_check_report.md` (RU) / `docs/error_check_report.en.md` (EN) для статуса проверок окружения, `docs/proxy_dependency_resolution.md` для решения проблемы установки зависимостей через proxy и `RELEASE.md` для списка изменений.

## 🚀 Расширенные возможности

Expand Down
310 changes: 151 additions & 159 deletions benchmarks/performance_comparison.py
Original file line number Diff line number Diff line change
@@ -1,171 +1,163 @@
"""
benchmarks/performance_comparison.py

Сравнение производительности ZeroLink v2.0 с традиционными методами.
Практический бенчмарк для оценки пропускной способности IPC.
Работает даже в минимальном окружении без torch/numpy.
"""

from __future__ import annotations

import argparse
import multiprocessing as mp
import os
import statistics
import time
from multiprocessing import shared_memory

import numpy as np  # retained for legacy code paths elsewhere in the file

# torch is optional: the benchmark suite degrades gracefully without it.
try:
    import torch  # type: ignore
except Exception:
    torch = None


def _bytes_to_gbps(size_bytes: int, seconds: float) -> float:
if seconds <= 0:
return 0.0
return (size_bytes / seconds) / (1024 ** 3)


def _pickle_send_worker(conn, shape, q):
    # Child process: build the tensor, then time the pickle-based send.
    tensor = torch.randn(*shape, dtype=torch.float32)
    start = time.time()
    conn.send(tensor)  # tensor is pickled and copied through the pipe
    conn.close()
    q.put(("send", time.time() - start))


def _pickle_recv_worker(conn, q):
    # Child process: time the pickle-based receive.
    start = time.time()
    conn.recv()
    q.put(("recv", time.time() - start))


def traditional_pytorch_tensor_transfer(size_mb=100):
    """Benchmark the traditional IPC path: pickling a tensor through a Pipe.

    Parameters:
        size_mb: payload size in MB (float32 elements, 4 bytes each).

    Returns:
        (overall_time, receive_time) in seconds.

    Fixes vs. the previous version: ``Process.join()`` returns ``None``, so
    ``receive_time, tensor = receiver.join()`` always raised ``TypeError``;
    workers are now module-level functions (picklable under the ``spawn``
    start method) and timings come back through a Queue.
    """
    size_elements = (size_mb * 1024 * 1024) // 4  # float32 is 4 bytes
    tensor_shape = (size_elements,)

    recv_conn, send_conn = mp.Pipe(duplex=False)
    q = mp.Queue()

    sender = mp.Process(target=_pickle_send_worker, args=(send_conn, tensor_shape, q))
    receiver = mp.Process(target=_pickle_recv_worker, args=(recv_conn, q))

    start_overall = time.time()
    sender.start()
    receiver.start()

    # Drain both timing reports BEFORE joining: a child may block on the
    # queue's feeder thread until its item has been consumed.
    reports = dict(q.get(timeout=120) for _ in range(2))

    sender.join()
    receiver.join()
    overall_time = time.time() - start_overall

    return overall_time, reports["recv"]

def traditional_cuda_copy(size_mb=100):
    """Benchmark a traditional CPU -> GPU -> CPU tensor round-trip.

    Parameters:
        size_mb: payload size in MB (float32 elements, 4 bytes each).

    Returns:
        (copy_time, total_time) in seconds.

    Raises:
        RuntimeError: when torch or CUDA is unavailable, instead of failing
            deep inside ``Tensor.cuda()`` with an opaque error.
    """
    if torch is None or not torch.cuda.is_available():
        raise RuntimeError("CUDA is not available")

    size_elements = (size_mb * 1024 * 1024) // 4  # float32 is 4 bytes
    cpu_tensor = torch.randn(size_elements, dtype=torch.float32)

    torch.cuda.synchronize()  # exclude previously queued GPU work
    start_time = time.time()
    gpu_tensor = cpu_tensor.cuda()
    # .cuda() may return before the copy completes; synchronize so the
    # measured copy_time reflects the actual transfer.
    torch.cuda.synchronize()
    copy_time = time.time() - start_time

    back_to_cpu = gpu_tensor.cpu()  # D2H copy is synchronous for CPU tensors
    _ = float(back_to_cpu[0])  # touch the data to anchor the round-trip
    total_time = time.time() - start_time

    return copy_time, total_time

def zero_copy_ipc_simulation(size_mb=100):
"""
Симуляция Zero-Copy IPC через разделяемую память (без CUDA VMM, но показывает принцип).
"""
def _pipe_recv_worker(conn, q):
    # Child process: time conn.recv() and report (elapsed, nbytes) via the queue.
    started = time.perf_counter()
    data = conn.recv()
    q.put((time.perf_counter() - started, len(data)))


def benchmark_pipe_pickle(size_mb: int, repeats: int) -> dict:
    """Send bytes through multiprocessing.Pipe (copy + pickle baseline path).

    Parameters:
        size_mb: payload size in MB.
        repeats: number of send/receive iterations to average over.

    Returns:
        Result dict: name, size_mb, repeats, avg_seconds, throughput_gbps.

    Raises:
        ValueError: if repeats < 1 (statistics.mean would fail anyway).
        RuntimeError: if the receiver child fails or the payload is truncated.

    The receiver is a module-level function rather than a closure so the
    benchmark also works under the ``spawn`` start method (Windows/macOS
    default), where Process targets must be picklable.
    """
    if repeats < 1:
        raise ValueError("repeats must be >= 1")

    payload_size = size_mb * 1024 * 1024
    payload = os.urandom(payload_size)
    timings = []

    for _ in range(repeats):
        recv_conn, send_conn = mp.Pipe(duplex=False)
        q = mp.Queue()
        proc = mp.Process(target=_pipe_recv_worker, args=(recv_conn, q))
        proc.start()

        start = time.perf_counter()
        send_conn.send(payload)
        send_elapsed = time.perf_counter() - start

        # Drain the queue BEFORE joining: the child can block on the queue's
        # feeder thread until its item has been consumed.
        recv_elapsed, recv_len = q.get(timeout=120)

        proc.join(timeout=120)
        if proc.exitcode != 0:
            raise RuntimeError("Receiver process failed in pipe benchmark")
        # Explicit check instead of `assert` so it survives `python -O`.
        if recv_len != payload_size:
            raise RuntimeError("Pipe benchmark received a truncated payload")

        # The slower of the two sides bounds the end-to-end transfer time.
        timings.append(max(send_elapsed, recv_elapsed))

    avg_s = statistics.mean(timings)
    return {
        "name": "Pipe + pickle(bytes)",
        "size_mb": size_mb,
        "repeats": repeats,
        "avg_seconds": avg_s,
        "throughput_gbps": _bytes_to_gbps(payload_size, avg_s),
    }


def benchmark_shared_memory(size_mb: int, repeats: int) -> dict:
    """Measure zero-copy style reads from a shared_memory segment.

    The segment is created and filled once; each repeat simulates a consumer
    touching the data through the buffer view, without copying the payload.

    Parameters:
        size_mb: payload size in MB.
        repeats: number of read iterations to average over.

    Returns:
        Result dict: name, size_mb, repeats, avg_seconds, throughput_gbps.

    Raises:
        ValueError: if repeats < 1.
        RuntimeError: if the integrity spot-check fails.
    """
    if repeats < 1:
        raise ValueError("repeats must be >= 1")

    size_bytes = size_mb * 1024 * 1024
    timings = []

    shm = shared_memory.SharedMemory(create=True, size=size_bytes)
    try:
        # Fill with a known byte (0x2A == 42) so reads can be spot-checked.
        shm.buf[:] = b"\x2A" * size_bytes

        for _ in range(repeats):
            start = time.perf_counter()
            # Simulated consumer: touch first/middle/last bytes via the view.
            view = shm.buf
            # 42 ^ 42 ^ 42 == 42, so any corruption flips the result.
            checksum = view[0] ^ view[size_bytes // 2] ^ view[-1]
            if checksum != 42:
                raise RuntimeError("Shared memory integrity check failed")
            timings.append(time.perf_counter() - start)
    finally:
        # Always release and remove the segment, even on failure.
        shm.close()
        shm.unlink()

    avg_s = statistics.mean(timings)
    return {
        "name": "SharedMemory zero-copy read",
        "size_mb": size_mb,
        "repeats": repeats,
        "avg_seconds": avg_s,
        "throughput_gbps": _bytes_to_gbps(size_bytes, avg_s),
    }


def benchmark_torch_cuda_copy(size_mb: int, repeats: int) -> dict | None:
    """CPU -> GPU -> CPU round-trip benchmark, if torch + CUDA are available.

    Parameters:
        size_mb: payload size in MB (float32 elements, 4 bytes each).
        repeats: number of round-trip iterations to average over.

    Returns:
        Result dict (name, size_mb, repeats, avg_seconds, throughput_gbps),
        or None when torch and/or CUDA is unavailable.
    """
    if torch is None or not torch.cuda.is_available():
        return None

    size_elements = (size_mb * 1024 * 1024) // 4  # float32 is 4 bytes
    timings = []
    for _ in range(repeats):
        cpu_tensor = torch.randn(size_elements, dtype=torch.float32)
        # Availability is already guaranteed by the guard above; synchronize
        # BEFORE starting the clock so queued GPU work is not measured.
        torch.cuda.synchronize()
        start = time.perf_counter()
        gpu_tensor = cpu_tensor.cuda(non_blocking=False)
        roundtrip = gpu_tensor.cpu()
        _ = float(roundtrip[0])  # touch the data to force host-side completion
        torch.cuda.synchronize()
        timings.append(time.perf_counter() - start)

    avg_s = statistics.mean(timings)
    return {
        "name": "PyTorch CUDA roundtrip",
        "size_mb": size_mb,
        "repeats": repeats,
        "avg_seconds": avg_s,
        "throughput_gbps": _bytes_to_gbps(size_mb * 1024 * 1024, avg_s),
    }


def run_benchmarks(size_mb: int, repeats: int) -> list[dict]:
    """Execute every applicable benchmark and return the result dicts in order."""
    collected: list[dict] = []
    for bench in (benchmark_pipe_pickle, benchmark_shared_memory, benchmark_torch_cuda_copy):
        outcome = bench(size_mb=size_mb, repeats=repeats)
        if outcome is not None:  # the CUDA benchmark returns None when unavailable
            collected.append(outcome)
    return collected


def print_results(results: list[dict]) -> None:
    """Render benchmark result dicts as a human-readable report on stdout."""
    print("=== ZeroLink v2.0 IPC Performance Benchmarks ===")
    cuda_present = False
    for entry in results:
        if entry["name"] == "PyTorch CUDA roundtrip":
            cuda_present = True
        print(f"\n{entry['name']}")
        print(f" Size: {entry['size_mb']} MB, repeats: {entry['repeats']}")
        print(f" Avg time: {entry['avg_seconds']:.6f} s")
        print(f" Throughput: {entry['throughput_gbps']:.3f} GiB/s")

    # Make the skipped optional benchmark explicit so the report is complete.
    if not cuda_present:
        print("\n[info] PyTorch CUDA benchmark skipped: torch and/or CUDA is unavailable.")


def main() -> None:
    """CLI entry point: parse arguments, run the benchmarks, print a report."""
    parser = argparse.ArgumentParser(description="Run ZeroLink microbenchmarks")
    parser.add_argument("--size-mb", type=int, default=32, help="Payload size in MB")
    parser.add_argument("--repeats", type=int, default=5, help="Number of repeats")
    options = parser.parse_args()

    report = run_benchmarks(size_mb=options.size_mb, repeats=options.repeats)
    print_results(report)


if __name__ == "__main__":
    # The stray zero-argument run_benchmarks() call left over from the old
    # API is removed: run_benchmarks now requires size_mb and repeats, so
    # calling it bare raised TypeError. main() handles argument parsing.
    main()
57 changes: 57 additions & 0 deletions docs/benchmarking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Benchmarking Guide / Руководство по бенчмаркам

## RU

### Что проверяет бенчмарк

Скрипт `benchmarks/performance_comparison.py` измеряет IPC-пути:

- `Pipe + pickle(bytes)` — baseline для copy/serialization подхода.
- `SharedMemory zero-copy read` — путь с доступом к данным через shared memory без дополнительного копирования.
- `PyTorch CUDA roundtrip` — опционально, только если доступны `torch` и CUDA.

### Как запускать

```bash
python benchmarks/performance_comparison.py --size-mb 32 --repeats 5
```

Параметры:

- `--size-mb` — размер полезной нагрузки в MB.
- `--repeats` — число повторов для усреднения.

### Интерпретация

- `Avg time` — среднее время одного прогона.
- `Throughput` — пропускная способность в GiB/s.
- Если `PyTorch CUDA roundtrip` пропущен, значит окружение не содержит `torch` и/или CUDA.

---

## EN

### What this benchmark measures

`benchmarks/performance_comparison.py` evaluates multiple IPC data paths:

- `Pipe + pickle(bytes)` — copy/serialization baseline.
- `SharedMemory zero-copy read` — shared-memory path with zero-copy style reads.
- `PyTorch CUDA roundtrip` — optional path, only if `torch` and CUDA are available.

### How to run

```bash
python benchmarks/performance_comparison.py --size-mb 32 --repeats 5
```

Arguments:

- `--size-mb` — payload size in MB.
- `--repeats` — number of repetitions for averaging.

### Result interpretation

- `Avg time` — average runtime per iteration.
- `Throughput` — measured throughput in GiB/s.
- If `PyTorch CUDA roundtrip` is skipped, the environment does not provide `torch` and/or CUDA.
Loading
Loading