diff --git a/inference/core/interfaces/stream_manager/manager_app/inference_pipeline_manager.py b/inference/core/interfaces/stream_manager/manager_app/inference_pipeline_manager.py
index cc4542440b..a06319d5c9 100644
--- a/inference/core/interfaces/stream_manager/manager_app/inference_pipeline_manager.py
+++ b/inference/core/interfaces/stream_manager/manager_app/inference_pipeline_manager.py
@@ -58,7 +58,10 @@
     serialise_sv_detections,
 )
 from inference.core.workflows.errors import WorkflowSyntaxError
-from inference.core.workflows.execution_engine.entities.base import WorkflowImageData
+from inference.core.workflows.execution_engine.entities.base import (
+    VideoMetadata,
+    WorkflowImageData,
+)


 def ignore_signal(signal_number: int, frame: FrameType) -> None:
@@ -262,7 +265,9 @@ def start_loop(loop: asyncio.AbstractEventLoop):
         webrtc_turn_config = parsed_payload.webrtc_turn_config
         webcam_fps = parsed_payload.webcam_fps
         to_inference_queue = SyncAsyncQueue(loop=loop, maxsize=10)
-        from_inference_queue = SyncAsyncQueue(loop=loop, maxsize=10)
+        from_inference_queue: "SyncAsyncQueue[Tuple[VideoMetadata, np.ndarray]]" = (
+            SyncAsyncQueue(loop=loop, maxsize=10)
+        )

         stream_output = None
         if parsed_payload.stream_output:
@@ -336,6 +341,16 @@ def webrtc_sink(
         ):
             if peer_connection.data_output in prediction:
                 workflow_output = prediction[peer_connection.data_output]
+                json_data = {
+                    peer_connection.data_output: None,
+                    "_video_metadata": {
+                        "frame_id": video_frame.frame_id,
+                        "frame_timestamp": video_frame.frame_timestamp.isoformat(),
+                        "measured_fps": video_frame.measured_fps,
+                        "fps": video_frame.fps,
+                    },
+                }
+                serialized_data = None

                 if isinstance(workflow_output, WorkflowImageData):
                     errors.append(
@@ -346,35 +361,43 @@
                         parsed_detections = serialise_sv_detections(
                             workflow_output
                         )
-                        serialized_data = json.dumps(parsed_detections)
+                        json_data[peer_connection.data_output] = (
+                            parsed_detections
+                        )
+                        serialized_data = json.dumps(json_data)
                     except Exception as error:
                         errors.append(
                             f"Failed to serialise output: {peer_connection.data_output}"
                         )
                 elif isinstance(workflow_output, dict):
                     try:
-                        serialized_data = json.dumps(workflow_output)
+                        json_data[peer_connection.data_output] = workflow_output
+                        serialized_data = json.dumps(json_data)
                     except Exception as error:
                         errors.append(
                             f"Failed to serialise output: {peer_connection.data_output}"
                         )
                 else:
-                    serialized_data = str(workflow_output)
-                if serialized_data is not None:
-                    peer_connection.data_channel.send(serialized_data)
+                    json_data[peer_connection.data_output] = str(
+                        workflow_output
+                    )
+                    serialized_data = json.dumps(json_data)
+
+                if serialized_data is None:
+                    serialized_data = json.dumps(json_data)
+                peer_connection.data_channel.send(serialized_data)
             else:
                 errors.append(
                     f"Selected data output '{peer_connection.data_output}' not found in workflow outputs"
                 )

         if peer_connection.stream_output is not None:
-            frame: Optional[np.ndarray] = get_frame_from_workflow_output(
+            video_metadata, frame = get_frame_from_workflow_output(
                 workflow_output=prediction,
                 frame_output_key=peer_connection.stream_output,
             )
             if frame is None:
                 for k in prediction.keys():
-                    frame = get_frame_from_workflow_output(
+                    video_metadata, frame = get_frame_from_workflow_output(
                         workflow_output=prediction,
                         frame_output_key=k,
                     )
@@ -382,8 +405,6 @@
                         errors.append(
                             f"'{peer_connection.stream_output}' not found in workflow outputs, using '{k}' instead"
                         )
-                        frame = frame.copy()
-                        break
             if frame is None:
                 errors.append("Visualisation blocks were not executed")
                 errors.append(
@@ -393,8 +414,14 @@
                     "Please try to adjust the scene so models detect objects"
                 )
                 errors.append("or stop preview, update workflow and try again.")
-                frame = video_frame.image.copy()
-
+                frame = video_frame.image
+                video_metadata = VideoMetadata(
+                    frame_number=video_frame.frame_id,
+                    frame_timestamp=video_frame.frame_timestamp,
+                    fps=video_frame.fps,
+                    measured_fps=video_frame.measured_fps,
+                    video_identifier=video_frame.source_id,
+                )
             for row, error in enumerate(errors):
                 frame = cv.putText(
                     frame,
@@ -405,7 +432,7 @@
                     (0, 255, 0),
                     2,
                 )
-            from_inference_queue.sync_put(frame)
+            from_inference_queue.sync_put((video_metadata, frame))

         buffer_sink = InMemoryBufferSink.init(
             queue_size=parsed_payload.sink_configuration.results_buffer_size,
diff --git a/inference/core/interfaces/stream_manager/manager_app/webrtc.py b/inference/core/interfaces/stream_manager/manager_app/webrtc.py
index 4222e89aae..ff9bc61da3 100644
--- a/inference/core/interfaces/stream_manager/manager_app/webrtc.py
+++ b/inference/core/interfaces/stream_manager/manager_app/webrtc.py
@@ -40,7 +40,10 @@
 )
 from inference.core.utils.async_utils import Queue as SyncAsyncQueue
 from inference.core.utils.function import experimental
-from inference.core.workflows.execution_engine.entities.base import WorkflowImageData
+from inference.core.workflows.execution_engine.entities.base import (
+    VideoMetadata,
+    WorkflowImageData,
+)

 logging.getLogger("aiortc").setLevel(logging.WARNING)

@@ -61,9 +64,10 @@ def overlay_text_on_np_frame(frame: np.ndarray, text: List[str]):

 def get_frame_from_workflow_output(
     workflow_output: Dict[str, Union[WorkflowImageData, Any]], frame_output_key: str
-) -> Optional[np.ndarray]:
+) -> Optional[Tuple[VideoMetadata, np.ndarray]]:
     latency: Optional[datetime.timedelta] = None
     np_image: Optional[np.ndarray] = None
+    video_metadata: Optional[VideoMetadata] = None

     step_output = workflow_output.get(frame_output_key)
     if isinstance(step_output, WorkflowImageData):
@@ -76,6 +80,7 @@ def get_frame_from_workflow_output(
                 datetime.datetime.now() - step_output.video_metadata.frame_timestamp
             )
         np_image = step_output.numpy_image
+        video_metadata = step_output.video_metadata
     elif isinstance(step_output, dict):
         for frame_output in step_output.values():
             if isinstance(frame_output, WorkflowImageData):
@@ -89,19 +94,20 @@ def get_frame_from_workflow_output(
                         - frame_output.video_metadata.frame_timestamp
                     )
                 np_image = frame_output.numpy_image
+                video_metadata = frame_output.video_metadata

     # logger.warning since inference pipeline is noisy on INFO level
     if DEBUG_WEBRTC_PROCESSING_LATENCY and latency is not None:
         logger.warning("Processing latency: %ss", latency.total_seconds())

-    return np_image
+    return video_metadata, np_image


 class VideoTransformTrack(VideoStreamTrack):
     def __init__(
         self,
         to_inference_queue: "SyncAsyncQueue[VideoFrame]",
-        from_inference_queue: "SyncAsyncQueue[np.ndarray]",
+        from_inference_queue: "SyncAsyncQueue[Tuple[VideoMetadata, np.ndarray]]",
         asyncio_loop: asyncio.AbstractEventLoop,
         fps_probe_frames: int,
         webcam_fps: Optional[float] = None,
@@ -122,7 +128,9 @@ def __init__(
         self._processed = 0

         self.to_inference_queue: "SyncAsyncQueue[VideoFrame]" = to_inference_queue
-        self.from_inference_queue: "SyncAsyncQueue[np.ndarray]" = from_inference_queue
+        self.from_inference_queue: (
+            "SyncAsyncQueue[Tuple[VideoMetadata, np.ndarray]]"
+        ) = from_inference_queue

         self._fps_probe_frames = fps_probe_frames
         self._probe_count: int = 0
@@ -261,7 +269,7 @@ async def recv(self):
         self._processed += 1

         np_frame: Optional[np.ndarray] = None
-        np_frame = await self.from_inference_queue.async_get()
+        video_metadata, np_frame = await self.from_inference_queue.async_get()

         if np_frame is None:
             if self._last_processed_frame:
@@ -280,6 +288,8 @@ async def recv(self):
         pts, time_base = await self.next_timestamp()
         new_frame.pts = pts
         new_frame.time_base = time_base
+        if video_metadata is not None:
+            new_frame.opaque = video_metadata.frame_number

         return new_frame

@@ -378,7 +388,7 @@ def on_status_update(self, status_update: StatusUpdate) -> None:
 async def init_rtc_peer_connection(
     webrtc_offer: WebRTCOffer,
     to_inference_queue: "SyncAsyncQueue[VideoFrame]",
-    from_inference_queue: "SyncAsyncQueue[np.ndarray]",
+    from_inference_queue: "SyncAsyncQueue[Tuple[VideoMetadata, np.ndarray]]",
     asyncio_loop: asyncio.AbstractEventLoop,
     fps_probe_frames: int,
     webrtc_turn_config: Optional[WebRTCTURNConfig] = None,
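
Note on the data-channel change in `webrtc_sink`: every message is now a JSON object carrying the selected output under its own key plus a `_video_metadata` block, instead of a bare serialized output. A minimal sketch of what a consumer receives and how it might split the two, assuming a data output named `predictions` (the handler and sample values are illustrative, not part of this patch):

```python
import json

# Message shape as assembled by webrtc_sink in this patch; the
# "predictions" key and all sample values are hypothetical.
message = json.dumps(
    {
        "predictions": [{"class": "person", "confidence": 0.92}],
        "_video_metadata": {
            "frame_id": 42,
            "frame_timestamp": "2025-01-01T12:00:00.000000",
            "measured_fps": 29.7,
            "fps": 30.0,
        },
    }
)

# Illustrative receiver: peel the metadata off the output payload.
payload = json.loads(message)
metadata = payload.pop("_video_metadata")
print(f"frame {metadata['frame_id']} @ {metadata['measured_fps']:.1f} FPS: {payload}")
```

Because the error branches leave the output key set to `None`, a consumer can rely on `_video_metadata` being present in every message even when serialization of the output itself failed.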
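The `recv()` change tags each outgoing `av.VideoFrame` with the workflow's frame number via the frame's `opaque` attribute, so server-side code that later handles the frame can tie it back to the `VideoMetadata` published on the data channel. A rough sketch of that correlation, assuming a PyAV version where `Frame.opaque` accepts arbitrary Python objects; the lookup table and handler are hypothetical, only the tagging mirrors the patch:

```python
from typing import Any, Dict

import av
import numpy as np

# Tagging, as done in VideoTransformTrack.recv(): stash the source
# frame number on the outgoing frame before it is returned.
np_frame = np.zeros((480, 640, 3), dtype=np.uint8)
new_frame = av.VideoFrame.from_ndarray(np_frame, format="bgr24")
new_frame.opaque = 42  # video_metadata.frame_number in the patch

# Hypothetical server-side correlation: metadata collected per frame
# number (e.g. from the same VideoMetadata that webrtc_sink serializes).
metadata_by_frame: Dict[int, Any] = {42: {"measured_fps": 29.7}}

def on_outgoing_frame(frame: av.VideoFrame) -> None:
    # opaque stays local to this process; it is not sent to the peer.
    if frame.opaque in metadata_by_frame:
        print(f"frame {frame.opaque}: {metadata_by_frame[frame.opaque]}")

on_outgoing_frame(new_frame)
```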