diff --git a/README.md b/README.md index 8b625fe..c600817 100644 --- a/README.md +++ b/README.md @@ -44,3 +44,11 @@ The purpose of this example is to demonstrate how Zenoh-Flow can handle a complex dataflow graph, like a robotic application. Go to the [README](./montblanc/README.md) for instructions on how to run it. + + +#### Transcoding + +The purpose of this example is to demonstrate how Zenoh-Flow can handle be used within a Zenoh +router to transcode live data. + +Go to the [README](./transcoding/README.md) for instructions on how to run it. diff --git a/transcoding/README.md b/transcoding/README.md new file mode 100644 index 0000000..5e36336 --- /dev/null +++ b/transcoding/README.md @@ -0,0 +1,119 @@ + +# Zenoh-Flow for data transcoding. + +This document will guide you in building, installing and configuring Zenoh-Flow together with Zenoh for data transcoding. + +Note: this guide has been tested on Ubuntu 22.04 LTS +## Prerequisites + +In order to be able to build and run Zenoh-Flow the following dependencies are needed: + +- build-essentials +- python3-dev +- python3-pip +- python3-venv +- clang +- libclang-dev +- rust +- pkg-config + +Please make sure those dependencies are installed before proceeding. + +## Build Zenoh and Zenoh-Flow + +Clone the repositories and build: +``` +cd ~ + +git clone https://github.com/eclipse-zenoh/zenoh -b 0.7.2-rc +cd zenoh +cargo build --release --all-targets --features shared-memory + +cd .. +git clone https://github.com/eclipse-zenoh/zenoh-flow -b v0.5.0-alpha.1 +cd zenoh-flow +cargo build --release --all-targets + +cd .. +git clone https://github.com/eclipse-zenoh/zenoh-flow-python -b v0.5.0-alpha.1 +cd zenoh-flow-python +cargo build --release --all-targets + +cd zenoh-flow-python + +python3 -m venv venv +source venv/bin/activate +pip3 install -r requirements-dev.txt +maturin build --release +deactivate +``` + +## Install + +Install Zenoh and Zenoh-Flow + +``` +cd ~ + +sudo mkdir -p /etc/zenoh/ +sudo mkdir -p /var/zenoh-flow/python +sudo mkdir -p /var/zenoh-flow/flows +sudo mkdir -p /etc/zenoh-flow/extensions.d + +sudo cp zenoh/target/release/zenohd /usr/bin/ +sudo cp zenoh/target/release/libzenoh_plugin_*.so /usr/lib/ + +sudo cp zenoh-flow/target/release/libzenoh_plugin_zenoh_flow.so /usr/lib/ +sudo cp zenoh-flow/target/release/zfctl /usr/bin/ +sudo cp zenoh-flow-python/target/release/libzenoh_flow_python_*_wrapper.so /var/zenoh-flow/python +sudo cp zenoh-flow-python/01-python.zfext /etc/zenoh-flow/extensions.d/ +sudo cp zenoh-flow/zfctl/.config/zfctl-zenoh.json /etc/zenoh-flow/ + +pip3 install ./zenoh-flow-python/target/wheels/eclipse_zenoh_flow-0.5.0a1-cp37-abi3-manylinux_2_34_x86_64.whl +``` + +## Start Runtime + +Copy the `zenoh-config.json` from this folder to `/etc/zenoh/zenoh.json`. + +Now you can start the Zenoh router with the Zenoh-Flow plugin. +Open a terminal and run: `RUST_LOG=debug zenohd -c /etc/zenoh/zenoh.json` + +Then on another terminal run: `zfctl list runtimes` + +You should get an output similar to this: +``` ++----------------------------------+--------------------+--------+ +| UUID | Name | Status | ++----------------------------------+--------------------+--------+ +| bb4a456d6c0948bfae21a6e8c9051d6b | protoc-client-test | Ready | ++----------------------------------+--------------------+--------+ +``` + +This means that the zenoh-flow runtime is was loaded and it is ready. + +## The transcoding application. + +Copy the content of this folder in: `/var/zenoh-flow/flows` and run `pip3 install -r /var/zenoh-flow/flows/requirements.txt`. + + +On a terminal start the publisher side: `cd /var/zenoh-flow/flows && python3 pub-proto.py` +On a new terminal start the subscriber side: `cd /var/zenoh-flow/flows && python3 pub-cdr.py` + +The subscriber will not receive any data as the transcoding is not yet deployed. + +On a 3rd terminal instruct zenoh-flow to launch the transcoding flow: `zfctl launch /var/zenoh-flow/flows/dataflow.yml` it will return the instance id. + +Now you should see the data being transcoded and received by your subscriber. + +Once you are done you can list the current running flow instances: `zfctl list instances` and delete the running one with `zfctl destroy `. + +Once the instance is delete you will see that the subscriber is not going to receive any data. + + + + + + + + diff --git a/transcoding/dataflow.yml b/transcoding/dataflow.yml new file mode 100644 index 0000000..cd93633 --- /dev/null +++ b/transcoding/dataflow.yml @@ -0,0 +1,35 @@ +flow: Transcoder +vars: + BASE_DIR: "/var/zenoh-flow/flows" + +operators: + - id : Conversion + descriptor: "file://{{ BASE_DIR }}/transcoder.yml" +sources: + - id : ZenohSrc + configuration: + key-expressions: + proto: data/proto + descriptor: "builtin://zenoh" + +sinks: + - id : ZenohSink + configuration: + key-expressions: + cdr: data/cdr + descriptor: "builtin://zenoh" + +links: +- from: + node : ZenohSrc + output : proto + to: + node : Conversion + input : in + +- from: + node : Conversion + output : out + to: + node : ZenohSink + input : cdr \ No newline at end of file diff --git a/transcoding/message.proto b/transcoding/message.proto new file mode 100644 index 0000000..6fb4d48 --- /dev/null +++ b/transcoding/message.proto @@ -0,0 +1,6 @@ +syntax = "proto3"; + +message MyMsg { + uint64 u_value = 1; + string s_value = 2; +} \ No newline at end of file diff --git a/transcoding/message_pb2.py b/transcoding/message_pb2.py new file mode 100644 index 0000000..49c4e55 --- /dev/null +++ b/transcoding/message_pb2.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: message.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rmessage.proto\")\n\x05MyMsg\x12\x0f\n\x07u_value\x18\x01 \x01(\x04\x12\x0f\n\x07s_value\x18\x02 \x01(\tb\x06proto3') + + + +_MYMSG = DESCRIPTOR.message_types_by_name['MyMsg'] +MyMsg = _reflection.GeneratedProtocolMessageType('MyMsg', (_message.Message,), { + 'DESCRIPTOR' : _MYMSG, + '__module__' : 'message_pb2' + # @@protoc_insertion_point(class_scope:MyMsg) + }) +_sym_db.RegisterMessage(MyMsg) + +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _MYMSG._serialized_start=17 + _MYMSG._serialized_end=58 +# @@protoc_insertion_point(module_scope) diff --git a/transcoding/pub-proto.py b/transcoding/pub-proto.py new file mode 100644 index 0000000..cc4dad4 --- /dev/null +++ b/transcoding/pub-proto.py @@ -0,0 +1,89 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +import sys +import time +import argparse +import itertools +import json +import zenoh +from zenoh import config +from message_pb2 import MyMsg + +# --- Command line argument parsing --- --- --- --- --- --- +parser = argparse.ArgumentParser( + prog='z_pub', + description='zenoh pub example') +parser.add_argument('--mode', '-m', dest='mode', + choices=['peer', 'client'], + type=str, + help='The zenoh session mode.') +parser.add_argument('--connect', '-e', dest='connect', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to connect to.') +parser.add_argument('--listen', '-l', dest='listen', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to listen on.') +parser.add_argument('--key', '-k', dest='key', + default='data/proto', + type=str, + help='The key expression to publish onto.') +parser.add_argument('--value', '-v', dest='value', + default='Pub from Python!', + type=str, + help='The value to publish.') +parser.add_argument("--iter", dest="iter", type=int, + help="How many puts to perform") +parser.add_argument('--config', '-c', dest='config', + metavar='FILE', + type=str, + help='A configuration file.') + +args = parser.parse_args() +conf = zenoh.Config.from_file(args.config) if args.config is not None else zenoh.Config() +if args.mode is not None: + conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode)) +if args.connect is not None: + conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect)) +if args.listen is not None: + conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen)) +key = args.key +value = args.value + +# initiate logging +zenoh.init_logger() + +print("Opening session...") +session = zenoh.open(conf) + +print(f"Declaring Publisher on '{key}'...") +pub = session.declare_publisher(key) + +for idx in itertools.count() if args.iter is None else range(args.iter): + time.sleep(1) + + msg = MyMsg( + u_value = idx, + s_value = value + ) + + print(f"Putting Data ('{key}': '{msg}')...") + pub.put(msg.SerializeToString()) + +pub.undeclare() +session.close() \ No newline at end of file diff --git a/transcoding/requirements.txt b/transcoding/requirements.txt new file mode 100644 index 0000000..737bf20 --- /dev/null +++ b/transcoding/requirements.txt @@ -0,0 +1,3 @@ +grpcio-tools==1.54.2 +pycdr2==1.0.0 +eclipse-zenoh==0.7.2rc0 \ No newline at end of file diff --git a/transcoding/sub-cdr.py b/transcoding/sub-cdr.py new file mode 100644 index 0000000..1170901 --- /dev/null +++ b/transcoding/sub-cdr.py @@ -0,0 +1,106 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + +import sys +import time +from datetime import datetime +import argparse +import json +import zenoh +from zenoh import Reliability, Sample +from dataclasses import dataclass +from pycdr2 import IdlStruct +from pycdr2.types import uint64 + + +# CRD +@dataclass +class MyStruct(IdlStruct, typename="MyStruct"): + u_value: uint64 + s_value: str + + + +# --- Command line argument parsing --- --- --- --- --- --- +parser = argparse.ArgumentParser( + prog='z_sub', + description='zenoh sub example') +parser.add_argument('--mode', '-m', dest='mode', + choices=['peer', 'client'], + type=str, + help='The zenoh session mode.') +parser.add_argument('--connect', '-e', dest='connect', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to connect to.') +parser.add_argument('--listen', '-l', dest='listen', + metavar='ENDPOINT', + action='append', + type=str, + help='Endpoints to listen on.') +parser.add_argument('--key', '-k', dest='key', + default='data/cdr', + type=str, + help='The key expression to subscribe to.') +parser.add_argument('--config', '-c', dest='config', + metavar='FILE', + type=str, + help='A configuration file.') + +args = parser.parse_args() +conf = zenoh.Config.from_file( + args.config) if args.config is not None else zenoh.Config() +if args.mode is not None: + conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode)) +if args.connect is not None: + conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect)) +if args.listen is not None: + conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen)) +key = args.key + +# Zenoh code --- --- --- --- --- --- --- --- --- --- --- + + + +# initiate logging +zenoh.init_logger() + +print("Opening session...") +session = zenoh.open(conf) + +print("Declaring Subscriber on '{}'...".format(key)) + + +def listener(sample: Sample): + data = MyStruct.deserialize(sample.payload) + print(f">> [Subscriber] Received {sample.kind} ('{sample.key_expr}': '{data}')") + + +# WARNING, you MUST store the return value in order for the subscription to work!! +# This is because if you don't, the reference counter will reach 0 and the subscription +# will be immediately undeclared. +sub = session.declare_subscriber(key, listener, reliability=Reliability.RELIABLE()) + +print("Enter 'q' to quit...") +c = '\0' +while c != 'q': + c = sys.stdin.read(1) + if c == '': + time.sleep(1) + +# Cleanup: note that even if you forget it, cleanup will happen automatically when +# the reference counter reaches 0 +sub.undeclare() +session.close() \ No newline at end of file diff --git a/transcoding/transcoder.py b/transcoding/transcoder.py new file mode 100644 index 0000000..f48c3ae --- /dev/null +++ b/transcoding/transcoder.py @@ -0,0 +1,115 @@ +# +# Copyright (c) 2022 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# + + +from zenoh_flow.interfaces import Operator +from zenoh_flow import Input, Output +from zenoh_flow.types import Context +import logging +from typing import Dict, Any +from dataclasses import dataclass +from pycdr2 import IdlStruct +from pycdr2.types import uint64 + + +# CRD +@dataclass +class MyStruct(IdlStruct, typename="MyStruct"): + u_value: uint64 + s_value: str + + + +# Protobuf + + +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: message.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rmessage.proto\")\n\x05MyMsg\x12\x0f\n\x07u_value\x18\x01 \x01(\x04\x12\x0f\n\x07s_value\x18\x02 \x01(\tb\x06proto3') + + + +_MYMSG = DESCRIPTOR.message_types_by_name['MyMsg'] +MyMsg = _reflection.GeneratedProtocolMessageType('MyMsg', (_message.Message,), { + 'DESCRIPTOR' : _MYMSG, + '__module__' : 'message_pb2' + # @@protoc_insertion_point(class_scope:MyMsg) + }) +_sym_db.RegisterMessage(MyMsg) + +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _MYMSG._serialized_start=17 + _MYMSG._serialized_end=58 +# @@protoc_insertion_point(module_scope) + + +### + +def protobuf_deserialize(b): + msg = MyMsg() + msg.ParseFromString(b) + return msg + + +class Transcoder(Operator): + def __init__( + self, + context: Context, + configuration: Dict[str, Any], + inputs: Dict[str, Input], + outputs: Dict[str, Output], + ): + logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.DEBUG) + self.output = outputs.take("out", MyStruct ,lambda x: x.serialize()) + self.in_stream = inputs.take("in",MyMsg , protobuf_deserialize) + + if self.in_stream is None: + raise ValueError("No input 'in' found") + if self.output is None: + raise ValueError("No output 'ou' found") + + def finalize(self) -> None: + return None + + async def iteration(self) -> None: + data_msg = await self.in_stream.recv() + payload = data_msg.get_data() + print(f"Protobuf= (type: {type(payload)}) : {payload} ") + # Transcode + cdrMsg = MyStruct(payload.u_value, payload.s_value) + + print(f"CDR= (type: {type(cdrMsg)}) : {cdrMsg} ") + await self.output.send(cdrMsg) + + return None + + +def register(): + return Transcoder \ No newline at end of file diff --git a/transcoding/transcoder.yml b/transcoding/transcoder.yml new file mode 100644 index 0000000..cc391b5 --- /dev/null +++ b/transcoding/transcoder.yml @@ -0,0 +1,8 @@ +id: compute-proximity +vars: + BASE_DIR: "/var/zenoh-flow/flows" +uri: "file://{{BASE_DIR }}/transcoder.py" + + +inputs: [in] +outputs: [out] \ No newline at end of file diff --git a/transcoding/zenoh-config.json b/transcoding/zenoh-config.json new file mode 100644 index 0000000..b5266aa --- /dev/null +++ b/transcoding/zenoh-config.json @@ -0,0 +1,29 @@ +{ + "listen":{ + "endpoints":["tcp/0.0.0.0:7447"] + }, + "plugins_search_dirs":["/usr/lib/"], + "plugins":{ + "storage_manager":{ + "required":true, + "storages":{ + "zfrpc":{ + "key_expr":"zf/runtime/**", + "volume": "memory" + }, + "zf":{ + "key_expr":"zenoh-flow/**", + "volume": "memory" + } + } + }, + "zenoh_flow":{ + "required":true, + "path":"/etc/zenoh-flow", + "pid_file": "/var/zenoh-flow/runtime.pid", + "extensions": "/etc/zenoh-flow/extensions.d", + "worker_pool_size":4, + "use_shm": false + } + } +} \ No newline at end of file