Skip to content

Commit

Permalink
Merge pull request #339 from backend-developers-ltd/COM-253-efficient…
Browse files Browse the repository at this point in the history
…-receipts-transfer

efficient receipts transfer
  • Loading branch information
kkowalski-reef authored Jan 13, 2025
2 parents c8199ff + 6ed7daa commit 9b894f2
Show file tree
Hide file tree
Showing 47 changed files with 1,695 additions and 796 deletions.
3 changes: 0 additions & 3 deletions compute_horde/compute_horde/receipts/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from compute_horde.receipts.schemas import Receipt, ReceiptType
from compute_horde.receipts.transfer import ReceiptFetchError, get_miner_receipts

default_app_config = "compute_horde.receipts.apps.ComputeHordeReceiptsConfig"

# Reexported for compatibility. These were moved to submodules.
__all__ = [
"Receipt",
"ReceiptType",
"get_miner_receipts",
"ReceiptFetchError",
]
92 changes: 83 additions & 9 deletions compute_horde/compute_horde/receipts/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from datetime import timedelta
from typing import ClassVar, Self, TypeAlias, assert_never

from django.db import models

from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS, ExecutorClass
from compute_horde.receipts import ReceiptType
from compute_horde.receipts.schemas import (
JobAcceptedReceiptPayload,
JobFinishedReceiptPayload,
Expand All @@ -23,6 +25,9 @@ class AbstractReceipt(models.Model):
miner_signature = models.CharField(max_length=256, null=True, blank=True)
timestamp = models.DateTimeField()

# https://github.com/typeddjango/django-stubs/issues/1684#issuecomment-1706446344
objects: ClassVar[models.Manager[Self]]

class Meta:
abstract = True
constraints = [
Expand All @@ -42,9 +47,6 @@ class JobStartedReceipt(AbstractReceipt):
is_organic = models.BooleanField()
ttl = models.IntegerField()

# https://github.com/typeddjango/django-stubs/issues/1684#issuecomment-1706446344
objects: models.Manager["JobStartedReceipt"]

def to_receipt(self) -> Receipt:
if self.miner_signature is None:
raise ReceiptNotSigned("Miner signature is required")
Expand All @@ -64,14 +66,33 @@ def to_receipt(self) -> Receipt:
miner_signature=self.miner_signature,
)

@classmethod
def from_receipt(cls, receipt: Receipt) -> Self:
    """
    Build an unsaved model instance from a receipt carrying a job-started payload.

    The instance is constructed through ``cls`` so that subclasses calling this
    alternate constructor get an instance of their own type.

    :param receipt: receipt whose payload must be a ``JobStartedReceiptPayload``.
    :return: a new, not yet persisted, model instance populated from the receipt.
    :raises ValueError: if the receipt carries any other payload type.
    """
    payload = receipt.payload
    if not isinstance(payload, JobStartedReceiptPayload):
        raise ValueError(
            f"Incompatible receipt payload type. "
            f"Got: {type(payload).__name__} "
            f"Expected: {JobStartedReceiptPayload.__name__}"
        )

    return cls(
        job_uuid=payload.job_uuid,
        miner_hotkey=payload.miner_hotkey,
        validator_hotkey=payload.validator_hotkey,
        miner_signature=receipt.miner_signature,
        validator_signature=receipt.validator_signature,
        timestamp=payload.timestamp,
        executor_class=payload.executor_class,
        max_timeout=payload.max_timeout,
        is_organic=payload.is_organic,
        ttl=payload.ttl,
    )


class JobAcceptedReceipt(AbstractReceipt):
time_accepted = models.DateTimeField()
ttl = models.IntegerField()

# https://github.com/typeddjango/django-stubs/issues/1684#issuecomment-1706446344
objects: models.Manager["JobAcceptedReceipt"]

def to_receipt(self) -> Receipt:
if self.miner_signature is None:
raise ReceiptNotSigned("Miner signature is required")
Expand All @@ -89,15 +110,32 @@ def to_receipt(self) -> Receipt:
miner_signature=self.miner_signature,
)

@classmethod
def from_receipt(cls, receipt: Receipt) -> Self:
    """
    Build an unsaved model instance from a receipt carrying a job-accepted payload.

    The instance is constructed through ``cls`` so that subclasses calling this
    alternate constructor get an instance of their own type.

    :param receipt: receipt whose payload must be a ``JobAcceptedReceiptPayload``.
    :return: a new, not yet persisted, model instance populated from the receipt.
    :raises ValueError: if the receipt carries any other payload type.
    """
    payload = receipt.payload
    if not isinstance(payload, JobAcceptedReceiptPayload):
        raise ValueError(
            f"Incompatible receipt payload type. "
            f"Got: {type(payload).__name__} "
            f"Expected: {JobAcceptedReceiptPayload.__name__}"
        )

    return cls(
        job_uuid=payload.job_uuid,
        miner_hotkey=payload.miner_hotkey,
        validator_hotkey=payload.validator_hotkey,
        miner_signature=receipt.miner_signature,
        validator_signature=receipt.validator_signature,
        timestamp=payload.timestamp,
        time_accepted=payload.time_accepted,
        ttl=payload.ttl,
    )


class JobFinishedReceipt(AbstractReceipt):
time_started = models.DateTimeField()
time_took_us = models.BigIntegerField()
score_str = models.CharField(max_length=256)

# https://github.com/typeddjango/django-stubs/issues/1684#issuecomment-1706446344
objects: models.Manager["JobFinishedReceipt"]

def time_took(self):
    """Return the stored ``time_took_us`` microsecond count as a ``timedelta``."""
    duration = timedelta(microseconds=self.time_took_us)
    return duration

Expand All @@ -121,3 +159,39 @@ def to_receipt(self) -> Receipt:
validator_signature=self.validator_signature,
miner_signature=self.miner_signature,
)

@classmethod
def from_receipt(cls, receipt: Receipt) -> Self:
    """
    Build an unsaved model instance from a receipt carrying a job-finished payload.

    The instance is constructed through ``cls`` so that subclasses calling this
    alternate constructor get an instance of their own type.

    :param receipt: receipt whose payload must be a ``JobFinishedReceiptPayload``.
    :return: a new, not yet persisted, model instance populated from the receipt.
    :raises ValueError: if the receipt carries any other payload type.
    """
    payload = receipt.payload
    if not isinstance(payload, JobFinishedReceiptPayload):
        # Uses type(...) like the sibling from_receipt constructors for the "Got:" part.
        raise ValueError(
            f"Incompatible receipt payload type. "
            f"Got: {type(payload).__name__} "
            f"Expected: {JobFinishedReceiptPayload.__name__}"
        )

    return cls(
        job_uuid=payload.job_uuid,
        miner_hotkey=payload.miner_hotkey,
        validator_hotkey=payload.validator_hotkey,
        miner_signature=receipt.miner_signature,
        validator_signature=receipt.validator_signature,
        timestamp=payload.timestamp,
        time_started=payload.time_started,
        time_took_us=payload.time_took_us,
        score_str=payload.score_str,
    )


ReceiptModel: TypeAlias = JobAcceptedReceipt | JobStartedReceipt | JobFinishedReceipt


def receipt_to_django_model(receipt: Receipt) -> ReceiptModel:
match receipt.payload.receipt_type:
case ReceiptType.JobAcceptedReceipt:
return JobAcceptedReceipt.from_receipt(receipt)
case ReceiptType.JobStartedReceipt:
return JobStartedReceipt.from_receipt(receipt)
case ReceiptType.JobFinishedReceipt:
return JobFinishedReceipt.from_receipt(receipt)
case _:
assert_never(receipt.payload.receipt_type)
29 changes: 25 additions & 4 deletions compute_horde/compute_horde/receipts/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,36 @@ def score(self):
]


class BadReceiptSignature(Exception):
    """Raised for a receipt whose signature did not verify; keeps the receipt on ``.receipt``."""

    def __init__(self, receipt: "Receipt"):
        # Passing the receipt up leaves ``args`` as ``(receipt,)``, the same value
        # the exception machinery assigns from the constructor arguments.
        super().__init__(receipt)
        self.receipt = receipt


class BadMinerReceiptSignature(BadReceiptSignature):
    """Signals that the miner signature of a receipt failed verification."""


class BadValidatorReceiptSignature(BadReceiptSignature):
    """Signals that the validator signature of a receipt failed verification."""


class Receipt(BaseModel):
    """
    A receipt payload together with the validator's and the miner's signatures over it.

    Both signatures are checked against ``payload.blob_for_signing()`` using a
    keypair built from the corresponding hotkey stored in the payload.
    """

    payload: ReceiptPayload
    validator_signature: str
    miner_signature: str

    def verify_miner_signature(self, throw: bool = False) -> bool:
        """
        Check the miner signature against the payload's signing blob.

        :param throw: when true, an invalid signature raises instead of returning ``False``.
        :return: whether the signature verified.
        :raises BadMinerReceiptSignature: if ``throw`` is true and the signature is invalid.
        """
        miner_keypair = bittensor.Keypair(ss58_address=self.payload.miner_hotkey)
        is_valid = miner_keypair.verify(self.payload.blob_for_signing(), self.miner_signature)
        if throw and not is_valid:
            raise BadMinerReceiptSignature(self)
        return is_valid

    def verify_validator_signature(self, throw: bool = False) -> bool:
        """
        Check the validator signature against the payload's signing blob.

        :param throw: when true, an invalid signature raises instead of returning ``False``.
        :return: whether the signature verified.
        :raises BadValidatorReceiptSignature: if ``throw`` is true and the signature is invalid.
        """
        validator_keypair = bittensor.Keypair(ss58_address=self.payload.validator_hotkey)
        is_valid = validator_keypair.verify(
            self.payload.blob_for_signing(), self.validator_signature
        )
        if throw and not is_valid:
            raise BadValidatorReceiptSignature(self)
        return is_valid
21 changes: 21 additions & 0 deletions compute_horde/compute_horde/receipts/store/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import abc
import datetime
from collections.abc import Sequence

from compute_horde.receipts.schemas import Receipt


class BaseReceiptStore(metaclass=abc.ABCMeta):
    """Abstract interface of a receipt store: receipts can be appended and old ones evicted."""

    @abc.abstractmethod
    def store(self, receipts: Sequence[Receipt]) -> None:
        """
        Add the given receipts to whatever the store already holds.
        """

    @abc.abstractmethod
    def evict(self, cutoff: datetime.datetime) -> None:
        """
        Drop receipts that are (approximately) older than the cutoff.
        """
165 changes: 165 additions & 0 deletions compute_horde/compute_horde/receipts/store/local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import gzip
import logging
import os
import re
import shutil
import tempfile
from collections import defaultdict
from collections.abc import Sequence
from datetime import datetime
from glob import glob
from pathlib import Path

from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.utils import timezone

from compute_horde.receipts.schemas import (
Receipt,
)
from compute_horde.receipts.store.base import BaseReceiptStore

"""
Considerations:
- smaller pages make for more (smaller) batches when downloading them for the first time (and we need ~5 hours of pages)
- pages get copied over on write, larger pages may impact efficiency of this
- we keep up with the 2 latest pages based on time and the clock may be out of sync between miners and validators
"""
# Length of one page's time window in seconds; `current_page_at` computes a page ID
# as `timestamp // PAGE_TIME`.
PAGE_TIME = 60 * 5 # 5 minutes
# Number of most recent pages (including the current one) that `archive_old_pages`
# skips because they can still be written to.
N_ACTIVE_PAGES = 2

logger = logging.getLogger(__name__)


class LocalFilesystemPagedReceiptStore(BaseReceiptStore):
    """
    Receipt store keeping receipts in time-bucketed JSONL "page" files on the local filesystem.

    A page ID is the unix timestamp integer-divided by ``PAGE_TIME``. Page ``N`` lives at
    ``<LOCAL_RECEIPTS_ROOT>/N.jsonl``; pages that are no longer active may additionally
    have a gzip archive at ``<LOCAL_RECEIPTS_ROOT>/N.jsonl.gz``.
    """

    def __init__(self):
        """
        Resolve the pages directory from ``settings.LOCAL_RECEIPTS_ROOT`` and create it if needed.

        :raises ImproperlyConfigured: if the setting is missing or empty.
        """
        super().__init__()
        if not getattr(settings, "LOCAL_RECEIPTS_ROOT", ""):
            raise ImproperlyConfigured("Required settings.py setting missing: LOCAL_RECEIPTS_ROOT")
        self.pages_directory = Path(settings.LOCAL_RECEIPTS_ROOT)  # type: ignore
        self.pages_directory.mkdir(parents=True, exist_ok=True)

    @classmethod
    def current_page(cls) -> int:
        """
        Get current page ID
        """
        return cls.current_page_at(timezone.now())

    @staticmethod
    def current_page_at(dt: datetime) -> int:
        """
        Calculate what the current page was at given time
        """
        return int(dt.timestamp() // PAGE_TIME)

    def store(self, receipts: Sequence[Receipt]) -> None:
        """
        Append receipts to the store.

        Each receipt is filed under the page that is current at the moment it is
        processed, then every touched page is rewritten once.
        """
        pages: defaultdict[int, list[Receipt]] = defaultdict(list)
        for receipt in receipts:
            pages[self.current_page()].append(receipt)
        for page, receipts_in_page in pages.items():
            self._append_to_page(receipts_in_page, page)

    def page_filepath(self, page: int) -> Path:
        """
        Find the filepath under which given pagefile should be found.
        Does not check whether it actually exists or not.
        """
        return self.pages_directory / f"{page}.jsonl"

    def archive_filepath(self, page: int) -> Path:
        """
        Find the filepath under which given pagefile's archive should be found.
        Does not check whether it actually exists or not.
        """
        return self.pages_directory / f"{page}.jsonl.gz"

    def delete_page(self, page: int) -> None:
        """
        Delete the page file and its archive from the file system.
        Either file being absent is not an error.
        """
        self.page_filepath(page).unlink(missing_ok=True)
        self.archive_filepath(page).unlink(missing_ok=True)

    def evict(self, cutoff: datetime) -> None:
        """
        Remove pages older than the page that was current at ``cutoff``.
        """
        self.delete_pages_older_than(cutoff)

    def delete_pages_older_than(self, older_than: int | datetime) -> None:
        """
        Delete every available page whose ID is lower than the given one.

        :param older_than: a page ID, or a datetime which is converted to the page
            that was current at that time.

        Failures on individual pages are logged and do not stop the remaining deletions.
        """
        if isinstance(older_than, datetime):
            older_than = self.current_page_at(older_than)

        old_pages = [p for p in self.get_available_pages() if p < older_than]

        for old_page in old_pages:
            try:
                self.delete_page(old_page)
            except Exception:
                # Best-effort cleanup: one failing page must not block the others.
                logger.exception("Error while deleting page %s", old_page)

    def archive_old_pages(self) -> None:
        """
        Create archives for all old pages if they don't exist yet.
        Skips active pages as these can be still written to.
        """
        upper_cutoff = self.current_page() - N_ACTIVE_PAGES
        for page in self.get_available_pages():
            if page > upper_cutoff:
                continue
            if self.archive_filepath(page).exists():
                continue
            self.do_archive_page(page)

    def do_archive_page(self, page: int) -> None:
        """
        Packs given page and creates an additional archive file used by nginx to serve the page.

        The archive is built in a scratch directory inside the pages directory and then
        renamed into place, so a partially written archive never appears under the final
        name (``archive_old_pages`` treats an existing archive as complete).
        """
        archive_filepath = self.archive_filepath(page)
        with tempfile.TemporaryDirectory(dir=self.pages_directory) as tmpdir:
            # Same base name as the final archive so the gzip header is unchanged.
            tmp_archive_filepath = Path(tmpdir) / archive_filepath.name
            with open(self.page_filepath(page), "rb") as page_file:
                with gzip.open(tmp_archive_filepath, "wb") as archive_file:
                    shutil.copyfileobj(page_file, archive_file)
            os.replace(tmp_archive_filepath, archive_filepath)

    def get_available_pages(self) -> list[int]:
        """
        Return IDs of all existing pages.

        Only files named exactly ``<digits>.jsonl`` count as pages.
        """
        pattern = re.compile(r"(\d+)\.jsonl")
        pages: list[int] = []
        for pagefile in self._get_available_page_filepaths():
            # Match the file name only, so digits elsewhere in the path or a
            # prefixed name such as "foo123.jsonl" are not mistaken for a page.
            match = pattern.fullmatch(pagefile.name)
            if match:
                pages.append(int(match[1]))
        return pages

    def _append_to_page(self, receipts: Sequence[Receipt], page: int) -> None:
        """
        Write new receipts to the specified page.
        For read safety this will copy the page file first, append to the copy and do a swap with the original.

        The copy is staged inside the pages directory so that the final swap is a
        same-filesystem rename rather than a cross-filesystem copy.
        """
        page_filepath = self.page_filepath(page)
        page_filepath.touch(exist_ok=True)
        with tempfile.TemporaryDirectory(dir=self.pages_directory) as tmpdir:
            tmp_page_filepath = Path(tmpdir) / "tmp_page.jsonl"
            shutil.copyfile(page_filepath, tmp_page_filepath)
            with open(tmp_page_filepath, "a", encoding="utf-8") as pagefile:
                pagefile.writelines(f"{r.model_dump_json()}\n" for r in receipts)
            os.replace(tmp_page_filepath, page_filepath)

    def _get_available_page_filepaths(self) -> list[Path]:
        """
        Return filepaths of all ``*.jsonl`` files directly inside the pages directory.
        """
        pagefiles = glob(str(self.pages_directory / "*.jsonl"))
        return [Path(pagefile) for pagefile in pagefiles]
13 changes: 13 additions & 0 deletions compute_horde/compute_horde/receipts/store/noop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import datetime
from collections.abc import Sequence

from compute_horde.receipts import Receipt
from compute_horde.receipts.store.base import BaseReceiptStore


class NoopReceiptStore(BaseReceiptStore):
    """Receipt store whose operations do nothing."""

    def store(self, receipts: Sequence[Receipt]) -> None:
        """Ignore the given receipts."""
        return None

    def evict(self, cutoff: datetime.datetime) -> None:
        """Do nothing; this store holds no receipts."""
        return None
Loading

0 comments on commit 9b894f2

Please sign in to comment.