Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions neuracore-dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,4 @@ getpid
WRONLY
rels
huggingface
itemsize
48 changes: 44 additions & 4 deletions neuracore/core/robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from neuracore.core.streaming.data_stream import DataStream
from neuracore.core.streaming.recording_state_manager import get_recording_state_manager
from neuracore.core.utils.robot_mapping import RobotMapping
from neuracore.data_daemon.communications_management.producer import Producer
from neuracore.data_daemon.communications_management.producer import (
RecordingContext as DaemonRecordingContext,
)
Expand Down Expand Up @@ -107,6 +108,7 @@ def __init__(
self._auth: Auth = get_auth()
self._temp_dir = None
self._data_streams: dict[str, DataStream] = dict()
self._daemon_recording_context: DaemonRecordingContext | None = None

self.org_id = org_id or get_current_org()

Expand Down Expand Up @@ -319,8 +321,11 @@ def stop_recording(self, recording_id: str) -> None:
if not self.id:
raise RobotError("Robot not initialized. Call init() first.")

DaemonRecordingContext(recording_id=recording_id).stop_recording()
self._stop_all_streams()
producer_stop_sequence_numbers = self._stop_all_streams()
self._get_daemon_recording_context().stop_recording(
recording_id=recording_id,
producer_stop_sequence_numbers=producer_stop_sequence_numbers,
)

try:
response = requests.post(
Expand All @@ -347,13 +352,20 @@ def stop_recording(self, recording_id: str) -> None:
except requests.exceptions.RequestException as e:
raise RobotError(f"Failed to stop recording: {str(e)}")

def _stop_all_streams(self) -> None:
def _stop_all_streams(self) -> dict[str, int]:
    """Stop recording on all data streams for this robot instance.

    Returns:
        Mapping of producer id to the last sequence number that producer
        sent before its stream stopped. Streams that do not expose a
        ``Producer`` contribute no entry; a stream that fails to stop is
        logged and skipped rather than aborting the remaining streams.
    """
    stop_sequence_numbers: dict[str, int] = {}
    for identifier, data_stream in self._data_streams.items():
        try:
            data_stream.stop_recording()
            # Streams optionally carry a daemon producer; only those
            # report a stop sequence number.
            candidate = getattr(data_stream, "_producer", None)
            if isinstance(candidate, Producer):
                last_seq = candidate.get_last_sent_sequence_number()
                stop_sequence_numbers[candidate.producer_id] = last_seq
        except Exception:
            logger.exception("Failed to stop data stream %s", identifier)
    return stop_sequence_numbers

def is_recording(self) -> bool:
"""Check if the robot is currently recording data.
Expand Down Expand Up @@ -640,7 +652,7 @@ def cancel_recording(self, recording_id: str) -> None:
if not self.id:
raise RobotError("Robot not initialized. Call init() first.")

DaemonRecordingContext(recording_id=recording_id).stop_recording()
self._get_daemon_recording_context().stop_recording(recording_id=recording_id)

try:
response = requests.post(
Expand All @@ -663,6 +675,34 @@ def cancel_recording(self, recording_id: str) -> None:
except requests.exceptions.RequestException as e:
raise RobotError(f"Failed to cancel recording: {str(e)}")

def _get_daemon_recording_context(self) -> DaemonRecordingContext:
"""Return a reusable daemon recording context, creating it lazily."""
if self._daemon_recording_context is None:
self._daemon_recording_context = DaemonRecordingContext()
return self._daemon_recording_context

def _cleanup_daemon_recording_context(self) -> None:
    """Release daemon recording context resources."""
    context = self._daemon_recording_context
    if context is None:
        return
    try:
        context.close()
    except Exception:
        logger.exception("Failed to cleanup daemon recording context")
    finally:
        # Drop the reference even if close() failed so the next
        # _get_daemon_recording_context() call builds a fresh context.
        self._daemon_recording_context = None

def close(self) -> None:
    """Release local resources owned by this Robot instance."""
    self._cleanup_daemon_recording_context()
    temp_dir = self._temp_dir
    if temp_dir is not None:
        temp_dir.cleanup()
        self._temp_dir = None

def __del__(self) -> None:
    """Best-effort cleanup for daemon recording resources.

    A finalizer must never raise: ``__del__`` may run during interpreter
    shutdown or after a partially failed ``__init__`` (when attributes
    such as ``_daemon_recording_context`` or ``_temp_dir`` may not exist),
    and exceptions escaping here are only printed as
    "Exception ignored in __del__" noise. Swallow everything.
    """
    try:
        self.close()
    except Exception:
        pass


# Global robot registry
_robots: dict[RobotInstanceIdentifier, Robot] = {}
Expand Down
5 changes: 5 additions & 0 deletions neuracore/core/streaming/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ def _handle_ensure_producer(self, context: DataRecordingContext) -> None:
):
self._producer.set_recording_id(context.recording_id)

# Reopen producer channel state for each new recording in case
# the daemon expired the channel while this producer object was idle.
self._producer.start_producer()
self._producer.open_ring_buffer()
self._producer.start_new_trace()

def stop_recording(self) -> list[threading.Thread]:
Expand Down Expand Up @@ -232,6 +236,7 @@ def log(self, metadata: CameraData, frame: np.ndarray) -> None:
metadata_dict = metadata.model_dump(mode="json", exclude={"frame"})
metadata_dict["width"] = self.width
metadata_dict["height"] = self.height
metadata_dict["frame_nbytes"] = int(frame.size * frame.itemsize)
metadata_json = json.dumps(metadata_dict).encode("utf-8")

# Pack: [metadata_len (4 bytes)] [metadata_json] [frame_bytes]
Expand Down
70 changes: 1 addition & 69 deletions neuracore/data_daemon/bootstrap.py
Original file line number Diff line number Diff line change
@@ -1,72 +1,4 @@
"""Daemon bootstrap and lifecycle management.

This module provides a clean, modular initialization sequence for the
data daemon. It coordinates the startup of all subsystems in the correct
order across the three execution contexts.

INITIALIZATION SEQUENCE
=======================

DaemonBootstrap.start()
├─[1] Configuration
│ └── ProfileManager → ConfigManager → DaemonConfig
├─[2] Authentication
│ └── Auth.login(api_key) - Initialize Auth singleton
├─[3] Event Loops (EventLoopManager)
│ ├── General Loop Thread started
│ ├── Encoder Loop Thread started
│ └── init_emitter(loop=general_loop)
├─[4] Async Services (on General Loop)
│ ├── aiohttp.ClientSession
│ ├── SqliteStateStore + init_async_store()
│ ├── StateManager (registers event listeners)
│ ├── UploadManager (listens for READY_FOR_UPLOAD)
│ ├── ConnectionManager + start() (monitors API)
│ └── ProgressReporter (listens for PROGRESS_REPORT)
├─[5] Recording & Encoding (RecordingDiskManager)
│ ├── _TraceFilesystem (path management)
│ ├── _TraceController (trace lifecycle)
│ ├── _EncoderManager (encoder factory)
│ ├── StorageBudget (disk space tracking)
│ ├── _RawBatchWriter → schedule_on_general_loop()
│ └── _BatchEncoderWorker → schedule_on_encoder_loop()
├─[6] ZMQ Communications
│ └── CommunicationsManager
└─[7] Return DaemonContext
└── Daemon created with context, calls run()


MODULE REGISTRY
===============

Main Thread:
EventLoopManager - DaemonBootstrap - Manages async loops
CommunicationsManager- DaemonBootstrap - ZMQ sockets
Daemon - runner_entry - Message loop

General Loop:
Emitter - EventLoopManager - Event coordination
AuthManager - bootstrap_async - API auth (singleton)
SqliteStateStore - bootstrap_async - Trace state persistence
StateManager - bootstrap_async - State coordination
UploadManager - bootstrap_async - Cloud uploads
ConnectionManager - bootstrap_async - API monitoring
ProgressReporter - bootstrap_async - Progress reporting
_RawBatchWriter - RecordingDiskManager - Raw file I/O

Encoder Loop:
_BatchEncoderWorker - RecordingDiskManager - Video/JSON encoding
VideoTrace - _EncoderManager - H.264 encoding
JsonTrace - _EncoderManager - JSON encoding

"""
"""Daemon bootstrap and lifecycle management."""

from __future__ import annotations

Expand Down
Loading