This document will walk through an end-to-end example of creating Discovery Handler to discover HTTP-based devices that publish random sensor data. It will also walk through how to create a custom broker to leverage the discovered devices. Reference the Discovery Handler development and broker Pod development documents if you prefer generic documentation over an example.
Before continuing, you may wish to reference the Akri architecture and Akri agent documentation. They will provide a good understanding of Akri, how it works, and what components it is composed of.
Any Docker-compatible container registry will work for hosting the containers being used in this example (Docker Hub, Github Container Registry, Azure Container Registry, etc). Here, we are using the GitHub Container Registry. You can follow the getting started guide here to enable it for yourself.
{% hint style="info" %}
if your container registry is private, you will need to create a kubernetes secret kubectl create secret docker-registry crPullSecret --docker-server=<cr> --docker-username=<cr-user> --docker-password=<cr-token>
and access it with an imagePullSecret
. Here, we will assume the secret is named crPullSecret
.
{% endhint %}
Akri has implemented discovery via several protocols with sample brokers and applications to demonstrate usage. However, there may be protocols you would like to use to discover resources that have not been implemented as Discovery Handlers yet. To enable the discovery of resources via a new protocol, you will implement a Discovery Handler (DH), which does discovery on behalf of the Agent. A Discovery Handler is anything that implements the Discovery
service and Registration
client defined in the Akri's discovery gRPC proto file. These DHs run as their own Pods and are expected to register with the Agent, which hosts the Registration
service defined in the gRPC interface.
Install cargo-generate
and use the tool to pull down Akri's template, specifying the name of the project with the --name
parameter.
cargo generate --git https://github.com/project-akri/akri-discovery-handler-template.git --name akri-http-discovery-handler
Inside the newly created akri-http-discovery-handler
project, navigate to main.rs
. It contains all the logic to register our DiscoveryHandler
with the Akri Agent. We only need to specify the DiscoveryHandler
name and whether the device discovered by our DiscoveryHandler
can be shared. Set name
equal to "http"
and shared
to true
, as our HTTP Discovery Handler will discover devices that can be shared between nodes. The protocol name also resolves to the name of the socket the Discovery Handler will run on.
Akri's Configuration CRD takes in a DiscoveryHandlerInfo
, which is defined structurally as follows:
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct DiscoveryHandlerInfo {
pub name: String,
#[serde(default)]
pub discovery_details: String,
}
When creating a Discovery Handler, you must decide what name or label to give it and add any details you would like your Discovery Handler to receive in the discovery_details
string. The Agent passes this string to Discovery Handlers as part of a DiscoverRequest
. A discovery handler must then parse this string -- Akri's built in Discovery Handlers store an expected structure in it as serialized YAML -- to determine what to discover, filter out of discovery, and so on. In our case, no parsing is required, as it will simply put our discovery endpoint. Our implementation will ping the discovery service at that URL to see if there are any devices.
Ultimately, the Discovery Handler section of our HTTP Configuration will look like the following.
apiVersion: akri.sh/v0
kind: Configuration
metadata:
name: http
spec:
discoveryHandler:
name: http
discoveryDetails: http://discovery:9999/discovery
Now that we know what will be passed to our Discovery Handler, let's implement the discovery functionality.
A DiscoveryHandlerImpl
Struct has been created (in discovery_handler.rs
) that minimally implements the DiscoveryHandler
service. Let's fill in the discover
function, which returns the list of discovered devices. It should have all the functionality desired for discovering devices via your protocol and filtering for only the desired set. For the HTTP protocol, discover
will perform an HTTP GET on the Discovery Handler's discovery service URL received in the DiscoverRequest
.
First, let's add the additional crates we are using to our Cargo.toml
under dependencies.
tokio-stream = { version = "0.1", features = ["net"] }
anyhow = "1.0.38"
reqwest = "0.10.8"
env_logger = "0.9.0"
log = "0.4"
Now, import our dependencies and define some constants. Add the following after the other imports at the top of discovery_handler.rs
.
use anyhow::Error;
use reqwest::get;
use std::collections::HashMap;
const BROKER_NAME: &str = "AKRI_HTTP";
const DEVICE_ENDPOINT: &str = "AKRI_HTTP_DEVICE_ENDPOINT";
Fill in your discover
function so as to match the following. Note, discover
creates a streamed connection with the Agent, where the Agent gets the receiving end of the channel and the Discovery Handler sends device updates via the sending end of the channel. If the Agent drops its end, the Discovery Handler will stop discovery and attempt to re-register with the Agent. The Agent may drop its end due to an error or a deleted Configuration.
#[async_trait]
impl DiscoveryHandler for DiscoveryHandlerImpl {
type DiscoverStream = DiscoverStream;
async fn discover(
&self,
request: tonic::Request<DiscoverRequest>,
) -> Result<Response<Self::DiscoverStream>, Status> {
// Get the discovery url from the `DiscoverRequest`
let url = request.get_ref().discovery_details.clone();
// Create a channel for sending and receiving device updates
let (stream_sender, stream_receiver) = mpsc::channel(4);
let register_sender = self.register_sender.clone();
tokio::spawn(async move {
loop {
let resp = get(&url).await.unwrap();
// Response is a newline separated list of devices (host:port) or empty
let device_list = &resp.text().await.unwrap();
let devices = device_list
.lines()
.map(|endpoint| {
let mut properties = HashMap::new();
properties.insert(BROKER_NAME.to_string(), "http".to_string());
properties.insert(DEVICE_ENDPOINT.to_string(), endpoint.to_string());
Device {
id: endpoint.to_string(),
properties,
mounts: Vec::default(),
device_specs: Vec::default(),
}
})
.collect::<Vec<Device>>();
// Send the Agent the list of devices.
if let Err(_) = stream_sender.send(Ok(DiscoverResponse { devices })).await {
// Agent dropped its end of the stream. Stop discovering and signal to try to re-register.
register_sender.send(()).await.unwrap();
break;
}
}
});
// Send the agent one end of the channel to receive device updates
Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
stream_receiver,
)))
}
}
Now you are ready to build your HTTP discovery handler and push it to your container registry. To do so, we simply need to run this step from the base folder of the Akri repo:
HOST="ghcr.io"
USER=[[GITHUB-USER]]
DH="http-discovery-handler"
TAGS="v1"
DH_IMAGE="${HOST}/${USER}/${DH}"
DH_IMAGE_TAGGED="${DH_IMAGE}:${TAGS}"
docker build \
--tag=${DH_IMAGE_TAGGED} \
--file=./Dockerfile.discovery-handler \
. && \
docker push ${DH_IMAGE_TAGGED}
Save the name of your image. We will pass it into our Akri installation command when we are ready to deploy our discovery handler.
At this point, we've extended Akri to discover devices with our HTTP Discovery Handler, and we've created an HTTP broker that can be deployed. To really test our new discovery and brokers, we need to create something to discover.
For this exercise, we can create an HTTP service that listens to various paths. Each path can simulate a different device by publishing some value. With this, we can create a single Kubernetes pod that can simulate multiple devices. To make our scenario more realistic, we can add a discovery endpoint as well. Further, we can create a series of Kubernetes services that create facades for the various paths, giving the illusion of multiple devices and a separate discovery service.
To that end, let's:
- Create a web service that mocks HTTP devices and a discovery service
- Deploy, start, and expose our mock HTTP devices and discovery service
To simulate a set of discoverable HTTP devices and a discovery service, create a simple HTTP server (samples/apps/http-apps/cmd/device/main.go
). The application will accept a list of path
arguments, which will define endpoints that the service will respond to. These endpoints represent devices in our HTTP Discovery Handler. The application will also accept a set of device
arguments, which will define the set of discovered devices.
package main
import (
"flag"
"fmt"
"log"
"math/rand"
"net"
"net/http"
"time"
"strings"
"html"
)
const (
addr = ":8080"
)
// RepeatableFlag is an alias to use repeated flags with flag
type RepeatableFlag []string
// String is a method required by flag.Value interface
func (e *RepeatableFlag) String() string {
result := strings.Join(*e, "\n")
return result
}
// Set is a method required by flag.Value interface
func (e *RepeatableFlag) Set(value string) error {
*e = append(*e, value)
return nil
}
var _ flag.Value = (*RepeatableFlag)(nil)
var paths RepeatableFlag
var devices RepeatableFlag
func main() {
flag.Var(&paths, "path", "Repeat this flag to add paths for the device")
flag.Var(&devices, "device", "Repeat this flag to add devices to the discovery service")
flag.Parse()
// At a minimum, respond on `/`
if len(paths) == 0 {
paths = []string{"/"}
}
log.Printf("[main] Paths: %d", len(paths))
seed := rand.NewSource(time.Now().UnixNano())
entr := rand.New(seed)
handler := http.NewServeMux()
// Create handler for the discovery endpoint
handler.HandleFunc("/discovery", func(w http.ResponseWriter, r *http.Request) {
log.Printf("[discovery] Handler entered")
fmt.Fprintf(w, "%s\n", html.EscapeString(devices.String()))
})
// Create handler for each endpoint
for _, path := range paths {
log.Printf("[main] Creating handler: %s", path)
handler.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
log.Printf("[device] Handler entered: %s", path)
fmt.Fprint(w, entr.Float64())
})
}
s := &http.Server{
Addr: addr,
Handler: handler,
}
listen, err := net.Listen("tcp", addr)
if err != nil {
log.Fatal(err)
}
log.Printf("[main] Starting Device: [%s]", addr)
log.Fatal(s.Serve(listen))
}
To ensure that our GoLang project builds, we need to create samples/apps/http-apps/go.mod
:
module github.com/project-akri/akri/http-extensibility
go 1.15
To build and deploy the mock devices and discovery, a simple Dockerfile can be created that builds and exposes our mock server samples/apps/http-apps/Dockerfiles/device
:
FROM golang:1.15 as build
WORKDIR /http-extensibility
COPY go.mod .
RUN go mod download
COPY . .
RUN GOOS=linux \
go build -a -installsuffix cgo \
-o /bin/device \
github.com/project-akri/akri/http-extensibility/cmd/device
FROM gcr.io/distroless/base-debian10
COPY --from=build /bin/device /
USER 999
EXPOSE 8080
ENTRYPOINT ["/device"]
CMD ["--path=/","--path=/sensor","--device=device:8000","--device=device:8001"]
And to deploy, use docker build
and docker push
:
cd ./samples/apps/http-apps
HOST="ghcr.io"
USER=[[GITHUB-USER]]
PREFIX="http-apps"
TAGS="v1"
IMAGE="${HOST}/${USER}/${PREFIX}-device:${TAGS}"
docker build \
--tag=${IMAGE} \
--file=./Dockerfiles/device \
.
docker push ${IMAGE}
The mock devices can be deployed with a Kubernetes deployment samples/apps/http-apps/kubernetes/device.yaml
(update image based on the ${IMAGE}):
apiVersion: apps/v1
kind: Deployment
metadata:
name: device
spec:
replicas: 1
selector:
matchLabels:
id: akri-http-device
template:
metadata:
labels:
id: akri-http-device
name: device
spec:
imagePullSecrets:
- name: crPullSecret
containers:
- name: device
image: IMAGE
imagePullPolicy: Always
args:
- --path=/
- --device=http://device-1:8080
- --device=http://device-2:8080
- --device=http://device-3:8080
- --device=http://device-4:8080
- --device=http://device-5:8080
- --device=http://device-6:8080
- --device=http://device-7:8080
- --device=http://device-8:8080
- --device=http://device-9:8080
ports:
- name: http
containerPort: 8080
Then apply device.yaml
to create a deployment (called device
) and a pod (called device-...
):
kubectl apply --filename=./samples/apps/http-apps/kubernetes/device.yaml
{% hint style="info" %} We're using one deployment|pod to represent 9 devices AND a discovery service ... we will create 9 (distinct) Services against it (1 for each mock device) and 1 Service to present the discovery service. {% endhint %}
Then create 9 mock device Services:
for NUM in {1..9}
do
# Services are uniquely named
# The service uses the Pods port: 8080
kubectl expose deployment/device \
--name=device-${NUM} \
--port=8080 \
--target-port=8080 \
--labels=id=akri-http-device
done
Optional: check one the services:
kubectl run curl -it --rm --image=curlimages/curl -- shThen, pick a value for
X
between 1 and 9:X=6 curl device-${X}:8080
Any or all of these should return a (random) 'sensor' value.
Then create a Service (called discovery
) using the deployment:
kubectl expose deployment/device \
--name=discovery \
--port=8080 \
--target-port=8080 \
--labels=id=akri-http-device
Optional: check the service to confirm that it reports a list of devices correctly using:
kubectl run curl -it --rm --image=curlimages/curl -- shThen, curl the service's endpoint:
curl discovery:8080/discoveryThis should return a list of 9 devices, of the form
http://device-X:8080
Now that we have created a HTTP Discovery Handler and created some mock devices, let's deploy Akri and see how it discovers the devices and creates Akri Instances for each Device.
{% hint style="info" %} Optional: If you've previous installed Akri and wish to reset, you may:
sudo helm delete akri
{% endhint %}
Akri has provided helm templates for custom Discovery Handlers and their Configurations. These templates are provided as a starting point. They may need to be modified to meet the needs of a Discovery Handler. When installing Akri, specify that you want to deploy a custom Discovery Handler as a DaemonSet by setting custom.discovery.enabled=true
. Specify the container for that DaemonSet as the HTTP discovery handler that you built above by setting custom.discovery.image.repository=$DH_IMAGE
and custom.discovery.image.repository=$TAGS
. To automatically deploy a custom Configuration, set custom.configuration.enabled=true
. We will customize this Configuration to contain the discovery endpoint needed by our HTTP Discovery Handler by setting it in the discovery_details
string of the Configuration, like so: custom.configuration.discoveryDetails=http://discovery:9999/discovery
. We also need to set the name the Discovery Handler will register under (custom.configuration.discoveryHandlerName
) and a name for the Discovery Handler and Configuration (custom.discovery.name
and custom.configuration.name
). All these settings come together as the following Akri installation command:
helm repo add akri-helm-charts https://project-akri.github.io/akri/
helm install akri akri-helm-charts/akri \
--set imagePullSecrets[0].name="crPullSecret" \
--set custom.discovery.enabled=true \
--set custom.discovery.image.repository=$DH_IMAGE \
--set custom.discovery.image.tag=$TAGS \
--set custom.discovery.name=akri-http-discovery \
--set custom.configuration.enabled=true \
--set custom.configuration.name=akri-http \
--set custom.configuration.discoveryHandlerName=http \
--set custom.configuration.discoveryDetails=http://discovery:9999/discovery
Watch as the Agent, Controller, and Discovery Handler Pods are spun up and as Instances are created for each of the discovery devices.
watch kubectl get pods,akrii
If you simply wanted Akri to expose discovered devices to the cluster as Kubernetes resources, you could stop here. If you have a workload that could utilize one of these resources, you could manually deploy pods that request them as resources. Alternatively, you could have Akri automatically deploy workloads to discovered devices. We call these workloads brokers. To quickly see this, lets deploy empty nginx pods to discovered resources, by updating our Configuration to include a broker PodSpec.
helm upgrade akri akri-helm-charts/akri \
--set imagePullSecrets[0].name="crPullSecret" \
--set custom.discovery.enabled=true \
--set custom.discovery.image.repository=$DH_IMAGE \
--set custom.discovery.image.tag=$TAGS \
--set custom.discovery.name=akri-http-discovery \
--set custom.configuration.enabled=true \
--set custom.configuration.name=akri-http \
--set custom.configuration.discoveryHandlerName=http \
--set custom.configuration.discoveryDetails=http://discovery:9999/discovery \
--set custom.brokerPod.image.repository=nginx
watch kubectl get pods,akrii
Our empty nginx brokers do not do anything with the devices they've requested, so lets create our own broker.
We have successfully created our Discovery Handler. If you want Akri to also automatically deploy Pods (called brokers) to each discovered device, this section will show you how to create a custom broker that will make the HTTP-based Device data available to the cluster. The broker can be written in any language as it will be deployed as an individual pod.
3 different broker implementations have been created for the HTTP Discovery Handler in the http-extensibility branch, 2 in Rust and 1 in Go:
-
The standalone broker is a self-contained scenario that demonstrates the ability to interact with HTTP-based devices
by
curl
ing a device's endpoints. This type of solution would be applicable in batch-like scenarios where the brokerperforms a predictable set of processing steps for a device.
-
The second scenario uses gRPC. gRPC is an increasingly common alternative to REST-like APIs and supports
high-throughput and streaming methods. gRPC is not a requirement for broker implementations in Akri but is used here
as one of many mechanisms that may be used. The gRPC-based broker has a companion client. This is a more realistic
scenario in which the broker proxies client requests using gRPC to HTTP-based devices. The advantage of this approach
is that device functionality is encapsulated by an API that is exposed by the broker. In this case the API has a
single method but in practice, there could be many methods implemented.
-
The third implementation is a gRPC-based broker and companion client implemented in Golang. This is functionally
equivalent to the Rust implementation and shares a protobuf definition. For this reason, you may combine the Rust
broker and client with the Golang broker and client arbitrarily. The Golang broker is described in the
http-apps
directory.
For this, we will describe the first option, a standalone broker. For a more detailed look at the other gRPC options, please look at extensibility-http-grpc.md in the http-extensibility branch.
First, let's create a new Rust project for our sample broker. We can use cargo to create our project by navigating to samples/brokers
and running:
cargo new http
Once the http project has been created, it can be added to the greater Akri project by adding "samples/brokers/http"
to the members in ./Cargo.toml
.
To access the HTTP-based Device data, we first need to retrieve the discovery information. Any information stored in the Device
properties map will be transferred into the broker container's environment variables. Retrieving them is simply a matter of querying environment variables like this:
let device_url = env::var("AKRI_HTTP_DEVICE_ENDPOINT")?;
For our HTTP broker, the data can be retrieved with a simple GET:
async fn read_sensor(device_url: &str) {
match get(device_url).await {
Ok(resp) => {
let body = resp.text().await;
}
Err(err) => println!("Error: {:?}", err),
};
}
We can tie all the pieces together in samples/brokers/http/src/main.rs
. We retrieve the HTTP-based Device url from the environment variables, make a simple GET request to retrieve the device data, and output the response to the log:
use reqwest::get;
use std::env;
use tokio::{time, time::Duration};
const DEVICE_ENDPOINT: &str = "AKRI_HTTP_DEVICE_ENDPOINT";
async fn read_sensor(device_url: &str) {
match get(device_url).await {
Ok(resp) => {
let body = resp.text().await;
println!("[main:read_sensor] Response body: {:?}", body);
}
Err(err) => println!("Error: {:?}", err),
};
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let device_url = env::var(DEVICE_ENDPOINT)?;
let mut tasks = Vec::new();
tasks.push(tokio::spawn(async move {
loop {
time::delay_for(Duration::from_secs(10)).await;
read_sensor(&device_url[..]).await;
}
}));
futures::future::join_all(tasks).await;
Ok(())
}
and ensure that we have the required dependencies in samples/brokers/http/Cargo.toml
:
[[bin]]
name = "standalone"
path = "src/main.rs"
[dependencies]
futures = "0.3"
reqwest = "0.10.8"
tokio = { version = "0.2", features = ["rt-threaded", "time", "stream", "fs", "macros", "uds"] }
To build the HTTP broker, we need to create a Dockerfile, samples/brokers/http/Dockerfiles/standalone
:
FROM amd64/rust:1.47 as build
RUN rustup component add rustfmt --toolchain 1.47.0-x86_64-unknown-linux-gnu
RUN USER=root cargo new --bin http
WORKDIR /http
COPY ./samples/brokers/http/Cargo.toml ./Cargo.toml
RUN cargo build \
--bin=standalone \
--release
RUN rm ./src/*.rs
RUN rm ./target/release/deps/standalone*
COPY ./samples/brokers/http .
RUN cargo build \
--bin=standalone \
--release
FROM amd64/debian:bullseye-slim
RUN apt-get update && \
apt-get install -y --no-install-recommends \
ca-certificates \
libssl-dev \
openssl && \
apt-get clean
COPY --from=build /http/target/release/standalone /standalone
LABEL org.opencontainers.image.source https://github.com/project-akri/akri
ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt
ENV SSL_CERT_DIR=/etc/ssl/certs
ENV RUST_LOG standalone
ENTRYPOINT ["/standalone"]
Akri's .dockerignore
is configured so that docker will ignore most files in our repository, some exceptions will need to be added to build the HTTP broker:
!samples/brokers/http
Now you are ready to build the HTTP broker! To do so, we simply need to run this step from the base folder of the Akri repo:
HOST="ghcr.io"
USER=[[GITHUB-USER]]
BROKER="http-broker"
TAGS="v1"
BROKER_IMAGE="${HOST}/${USER}/${BROKER}"
BROKER_IMAGE_TAGGED="${BROKER_IMAGE}:${TAGS}"
docker build \
--tag=${BROKER_IMAGE_TAGGED} \
--file=./samples/brokers/http/Dockerfiles/standalone \
. && \
docker push ${BROKER_IMAGE_TAGGED}
Now that the HTTP broker has been created, we can substitute it's image in for the simple nginx broker we previously used in our installation command.
helm upgrade akri akri-helm-charts/akri \
--set imagePullSecrets[0].name="crPullSecret" \
--set custom.discovery.enabled=true \
--set custom.discovery.image.repository=$DH_IMAGE \
--set custom.discovery.image.tag=$TAGS \
--set custom.discovery.name=akri-http-discovery \
--set custom.configuration.enabled=true \
--set custom.configuration.name=akri-http \
--set custom.configuration.discoveryHandlerName=http \
--set custom.configuration.discoveryDetails=http://discovery:9999/discovery \
--set custom.configuration.brokerPod.image.repository=$BROKER_IMAGE \
--set custom.configuration.brokerPod.image.tag=$TAGS
watch kubectl get pods,akrii
Note: substitute
helm upgrade
forhelm install
if you do not have an existing Akri installation
We can watch as the broker pods get deployed:
watch kubectl get pods -o wide