-
Notifications
You must be signed in to change notification settings - Fork 0
/
publish_image_from_file_process_script.py
123 lines (102 loc) · 3.79 KB
/
publish_image_from_file_process_script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python
#
# Copyright (c) 2024 Numurus, LLC <https://www.numurus.com>.
#
# This file is part of nepi-engine
# (see https://github.com/nepi-engine).
#
# License: 3-clause BSD, see https://opensource.org/licenses/BSD-3-Clause
#
# Sample NEPI Process Script.
# 1. Open an image file and publish as ros image message at specified rate
# 2. Run until Stopped
import os
import time
import sys
import rospy
import numpy as np
import cv2
from nepi_edge_sdk_base import nepi_ros
from nepi_edge_sdk_base import nepi_img
from sensor_msgs.msg import Image
#########################################
# USER SETTINGS - Edit as Necessary
#########################################
Current_Path = (os.path.dirname(os.path.realpath(__file__)) )
### Open and publish image
#Source_Folder= Current_Path + "/sample_data"
Source_Folder= "/mnt/nepi_storage/sample_data"
Image_File = "test_image.png"
Publish_Image_Encoding = "bgr8" # "bgr8", "rgb8", or "mono8"
Publish_Image_Topic_Name = "test_image"
Publish_Image_Rate_Hz = 20
#########################################
# ROS NAMESPACE SETUP
#########################################
# ROS namespace setup
NEPI_BASE_NAMESPACE = nepi_ros.get_base_namespace()
#########################################
# Node Class
#########################################
class publish_image_from_file_process(object):
#######################
### Node Initialization
def __init__(self):
rospy.loginfo("Starting Initialization Processes")
## Initialize Class Variables
## Define Class Namespaces
## Define Class Services Calls
## Create Class Sevices
## Create Class Publishers
# Open image file
if os.path.exists(Source_Folder):
file2open = (Source_Folder + '/' + Image_File)
if os.path.isfile(file2open):
print("Opening File: " + file2open)
cv_image = cv2.imread(file2open)
print(cv_image.shape)
self.img_out_msg = nepi_img.cv2img_to_rosimg(cv_image,encoding=Publish_Image_Encoding)
# Setup custom image publish topic
IMAGE_OUTPUT_TOPIC = NEPI_BASE_NAMESPACE + Publish_Image_Topic_Name
print("Publishing image to topic: " + IMAGE_OUTPUT_TOPIC)
self.image_pub = rospy.Publisher(IMAGE_OUTPUT_TOPIC, Image, queue_size=10)
# Start a timed image publish callback
pub_interval_sec = float(1)/Publish_Image_Rate_Hz
print("Publishing image at " + str(Publish_Image_Rate_Hz) + " Hz")
rospy.Timer(rospy.Duration(pub_interval_sec), self.image_publish_callback)
else:
print("File not found in specified folder")
else:
print("Path not found")
## Start Node Processes
## Initiation Complete
rospy.loginfo("Initialization Complete")
#######################
### Node Methods
### Add your CV2 image customization code here
def image_publish_callback(self,timer):
# Publish new image to ros
if not rospy.is_shutdown():
self.img_out_msg.header.stamp = rospy.Time.now()
self.image_pub.publish(self.img_out_msg)
#######################
# Node Cleanup Function
def cleanup_actions(self):
rospy.loginfo("Shutting down: Executing script cleanup actions")
#########################################
# Main
#########################################
if __name__ == '__main__':
current_filename = sys.argv[0].split('/')[-1]
current_filename = current_filename.split('.')[0]
rospy.loginfo(("Starting " + current_filename), disable_signals=True) # Disable signals so we can force a shutdown
rospy.init_node(name=current_filename)
#Launch the node
node_name = current_filename.rpartition("_")[0]
rospy.loginfo("Launching node named: " + node_name)
node_class = eval(node_name)
node = node_class()
#Set up node shutdown
rospy.on_shutdown(node.cleanup_actions)
# Spin forever (until object is detected)
rospy.spin()