Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PF Ethernet Sensor bridge peoplenet and bodypose app support #8

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
287 changes: 287 additions & 0 deletions examples/linux_body_pose_estimation_imx477.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# See README.md for detailed information.

import argparse
import ctypes
import logging
import os

import holoscan
from body_pose_estimation import FormatInferenceInputOp, PostprocessorOp
from cuda import cuda

import hololink as hololink_module


class HoloscanApplication(holoscan.core.Application):
def __init__(
self,
headless,
fullscreen,
cuda_context,
cuda_device_ordinal,
hololink_channel,
camera,
frame_limit,
engine,
):
logging.info("__init__")
super().__init__()
self._headless = headless
self._fullscreen = fullscreen
self._cuda_context = cuda_context
self._cuda_device_ordinal = cuda_device_ordinal
self._hololink_channel = hololink_channel
self._camera = camera
self._frame_limit = frame_limit
self._engine = engine

def compose(self):
logging.info("compose")
if self._frame_limit:
self._count = holoscan.conditions.CountCondition(
self,
name="count",
count=self._frame_limit,
)
condition = self._count
else:
self._ok = holoscan.conditions.BooleanCondition(
self, name="ok", enable_tick=True
)
condition = self._ok

csi_to_bayer_pool = holoscan.resources.BlockMemoryPool(
self,
name="pool",
# storage_type of 1 is device memory
storage_type=1,
block_size=self._camera._width
* ctypes.sizeof(ctypes.c_uint16)
* self._camera._height,
num_blocks=2,
)
csi_to_bayer_operator = hololink_module.operators.CsiToBayerOp(
self,
name="csi_to_bayer",
allocator=csi_to_bayer_pool,
cuda_device_ordinal=self._cuda_device_ordinal,
)
self._camera.configure_converter(csi_to_bayer_operator)

frame_size = csi_to_bayer_operator.get_csi_length()
frame_context = self._cuda_context
receiver_operator = hololink_module.operators.LinuxReceiverOperator(
self,
condition,
name="receiver",
frame_size=frame_size,
frame_context=frame_context,
hololink_channel=self._hololink_channel,
device=self._camera,
)

bayer_format = self._camera.bayer_format()
pixel_format = self._camera.pixel_format()
image_processor_operator = hololink_module.operators.ImageProcessorOp(
self,
name="image_processor",
# Optical black value for imx274 is 50
optical_black=50,
bayer_format=bayer_format.value,
pixel_format=pixel_format.value,
)

rgb_components_per_pixel = 3
bayer_pool = holoscan.resources.BlockMemoryPool(
self,
name="pool",
# storage_type of 1 is device memory
storage_type=1,
block_size=self._camera._width
* rgb_components_per_pixel
* ctypes.sizeof(ctypes.c_uint16)
* self._camera._height,
num_blocks=2,
)
demosaic = holoscan.operators.BayerDemosaicOp(
self,
name="demosaic",
pool=bayer_pool,
generate_alpha=False,
bayer_grid_pos=bayer_format.value,
interpolation_mode=0,
)

gamma_correction = hololink_module.operators.GammaCorrectionOp(
self,
name="gamma_correction",
cuda_device_ordinal=self._cuda_device_ordinal,
)

image_shift = hololink_module.operators.ImageShiftToUint8Operator(
self, name="image_shift", shift=8
)

visualizer = holoscan.operators.HolovizOp(
self,
name="holoviz",
fullscreen=self._fullscreen,
headless=self._headless,
**self.kwargs("holoviz"),
)

#
pool = holoscan.resources.UnboundedAllocator(self)
preprocessor_args = self.kwargs("preprocessor")
preprocessor = holoscan.operators.FormatConverterOp(
self,
name="preprocessor",
pool=pool,
**preprocessor_args,
)
format_input = FormatInferenceInputOp(
self,
name="transpose",
pool=pool,
)
inference = holoscan.operators.InferenceOp(
self,
name="inference",
allocator=pool,
model_path_map={
"yolo_pose": self._engine,
},
**self.kwargs("inference"),
)
postprocessor_args = self.kwargs("postprocessor")
postprocessor_args["image_width"] = preprocessor_args["resize_width"]
postprocessor_args["image_height"] = preprocessor_args["resize_height"]
postprocessor = PostprocessorOp(
self,
name="postprocessor",
allocator=pool,
**postprocessor_args,
)

#
self.add_flow(receiver_operator, csi_to_bayer_operator, {("output", "input")})
self.add_flow(
csi_to_bayer_operator, image_processor_operator, {("output", "input")}
)
self.add_flow(image_processor_operator, demosaic, {("output", "receiver")})
self.add_flow(demosaic, gamma_correction, {("transmitter", "input")})
self.add_flow(gamma_correction, image_shift)
self.add_flow(image_shift, visualizer, {("output", "receivers")})
self.add_flow(image_shift, preprocessor, {("output", "")})
self.add_flow(preprocessor, format_input)
self.add_flow(format_input, inference, {("", "receivers")})
self.add_flow(inference, postprocessor, {("transmitter", "in")})
self.add_flow(postprocessor, visualizer, {("out", "receivers")})


def main():
parser = argparse.ArgumentParser()
parser.add_argument("--headless", action="store_true", help="Run in headless mode")
parser.add_argument(
"--fullscreen", action="store_true", help="Run in fullscreen mode"
)
parser.add_argument(
"--frame-limit",
type=int,
default=None,
help="Exit after receiving this many frames",
)
default_configuration = os.path.join(
os.path.dirname(__file__), "body_pose_estimation.yaml"
)
parser.add_argument(
"--configuration", default=default_configuration, help="Configuration file"
)
default_engine = os.path.join(os.path.dirname(__file__), "yolov8n-pose.onnx")
parser.add_argument(
"--engine",
default=default_engine,
help="TRT engine model",
)
parser.add_argument(
"--log-level",
type=int,
default=20,
help="Logging level to display",
)
parser.add_argument(
"--cam",
type=int,
default=0,
choices=(0, 1),
help="which camera to stream: 0 to stream camera connected to j14 or 1 to stream camera connected to j17 (default is 0)",
)
args = parser.parse_args()
hololink_module.logging_level(args.log_level)
logging.info("Initializing.")
# Get a handle to the GPU
(cu_result,) = cuda.cuInit(0)
assert cu_result == cuda.CUresult.CUDA_SUCCESS
cu_device_ordinal = 0
cu_result, cu_device = cuda.cuDeviceGet(cu_device_ordinal)
assert cu_result == cuda.CUresult.CUDA_SUCCESS
cu_result, cu_context = cuda.cuDevicePrimaryCtxRetain(cu_device)
assert cu_result == cuda.CUresult.CUDA_SUCCESS

# Get a handle to the Hololink device
if args.cam == 0:
channel_metadata = hololink_module.Enumerator.find_channel(
channel_ip="192.168.0.2"
)
elif args.cam == 1:
channel_metadata = hololink_module.Enumerator.find_channel(
channel_ip="192.168.0.3"
)
else:
raise Exception(f"Unexpected camera={args.cam}")

hololink_channel = hololink_module.DataChannel(channel_metadata)
# Get a handle to the camera
camera = hololink_module.sensors.imx477.Imx477(hololink_channel, args.cam)

# Set up the application
application = HoloscanApplication(
args.headless,
args.fullscreen,
cu_context,
cu_device_ordinal,
hololink_channel,
camera,
args.frame_limit,
args.engine,
)
application.config(args.configuration)
# Run it.
hololink = hololink_channel.hololink()
hololink.start()
hololink.reset()
camera.configure()
application.run()
hololink.stop()

(cu_result,) = cuda.cuDevicePrimaryCtxRelease(cu_device)
assert cu_result == cuda.CUresult.CUDA_SUCCESS


if __name__ == "__main__":
main()
Loading