Skip to content

Commit

Permalink
Added grpc/http callout tool
Browse files Browse the repository at this point in the history
  • Loading branch information
Ethan committed Nov 9, 2023
1 parent 8cb82ab commit faf58e4
Show file tree
Hide file tree
Showing 12 changed files with 705 additions and 66 deletions.
Binary file added samples/krill/cmd/callout/__debug_bin3668117303
Binary file not shown.
42 changes: 42 additions & 0 deletions samples/krill/cmd/callout/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

type Configuration struct {
LoggerConfiguration `json:"logger" yaml:"logger"`
ServerConfiguration `json:"servers" yaml:"servers"`
Outputs []Output `json:"outputs" yaml:"outputs"`
}

type LoggerConfiguration struct {
Level int `json:"level" yaml:"level"`
}

type ServerConfiguration struct {
HTTPServer `json:"http" yaml:"http"`
GRPCServer `json:"grpc" yaml:"grpc"`
}

type HTTPServer struct {
Resources []Resource `json:"resources" yaml:"resources"`
Port int `json:"port" yaml:"port"`
}

type GRPCServer struct {
Outputs []string `json:"outputs" yaml:"outputs"`
Port int `json:"port" yaml:"port"`
}

type Resource struct {
Path string `json:"path" yaml:"path"`
Method string `json:"method" yaml:"method"`
Status int `json:"status" yaml:"status"`
Response string `json:"response" yaml:"response"`
Outputs []string `json:"outputs" yaml:"outputs"`
}

type Output struct {
Name string `json:"name" yaml:"name"`
Path string `json:"path" yaml:"path"`
Type string `json:"type" yaml:"type"`
QoS int `json:"qos" yaml:"qos"`
Endpoint string `json:"endpoint" yaml:"endpoint"`
}
43 changes: 43 additions & 0 deletions samples/krill/cmd/callout/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
logger:
level: 0
servers:
http:
port: 3333
resources:
- path: /example
method: GET
status: 200
outputs: ["output1", "output2"]
response: |
{
"hello": "world"
}
- path: /example
method: POST
status: 200
outputs: ["output3", "output4"]
response: |
{
"hello": "world"
}
grpc:
port: 3334
outputs: ['output1', 'output2']
outputs:
- name: output1
type: stdout
- name: output2
type: mqtt
qos: 1
path: default/output1
endpoint: localhost:1883
- name: output3
type: mqtt
qos: 1
path: default/output2
endpoint: localhost:1883
- name: output4
type: mqtt
qos: 1
path: /myresource
endpoint: localhost:1883
50 changes: 50 additions & 0 deletions samples/krill/cmd/callout/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package main

import (
"context"
"encoding/json"

"github.com/iot-for-all/device-simulation/lib/logger"
"github.com/iot-for-all/device-simulation/lib/proto"
)

type GRPCMessageServer struct {
proto.UnimplementedSenderServer
outputs []Out
encoder proto.Encoder
Logger logger.Logger
}

func NewGRPCMessageServer(outputs []Out, encoder proto.Encoder, options ...func(*GRPCMessageServer)) *GRPCMessageServer {
server := &GRPCMessageServer{
outputs: outputs,
encoder: encoder,
}

for _, option := range options {
option(server)
}

return server
}

func (server *GRPCMessageServer) Send(ctx context.Context, m *proto.Message) (*proto.Empty, error) {

server.Logger.Level(logger.Debug).Printf("received new grpc message")

res := server.encoder.Decode(m)

content, err := json.Marshal(res)
if err != nil {
return nil, err
}

for _, output := range server.outputs {
err := output.Out(content)
if err != nil {
return nil, err
}
}

return &proto.Empty{}, nil
}
104 changes: 104 additions & 0 deletions samples/krill/cmd/callout/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package main

import (
"encoding/json"
"fmt"
"io"
"net"
"os"

"github.com/gofiber/fiber/v2"
"github.com/iot-for-all/device-simulation/lib/env"
"github.com/iot-for-all/device-simulation/lib/logger"
"github.com/iot-for-all/device-simulation/lib/proto"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"gopkg.in/yaml.v3"
)

func main() {
err := run()
if err != nil {
panic(err)
}
}

func run() error {
app := fiber.New(fiber.Config{
DisableStartupMessage: true,
})

flagParser := env.NewFlagParser()

flags, err := flagParser.ReadFlags(map[string]any{
"config": "./config.yml",
"yaml": true,
"stdin": false,
})
if err != nil {
return err
}

unmarshal := yaml.Unmarshal
if !(*flags["yaml"].(*bool)) {
unmarshal = json.Unmarshal
}

configReader := env.New[Configuration](func(cr *env.ConfigurationReader[Configuration]) {
cr.Unmarshal = unmarshal
if *flags["stdin"].(*bool) {
cr.ReadFile = func(_ string) ([]byte, error) {
return io.ReadAll(os.Stdin)
}
}
})

configuration, err := configReader.Read(*flags["config"].(*string))
if err != nil {
return err
}

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", configuration.GRPCServer.Port))
if err != nil {
return err
}

lg := logger.NewZeroLoggerWrapper(log.Logger, func(zlw *logger.ZeroLoggerWrapper) {
zlw.LogLevel = configuration.LoggerConfiguration.Level
})

outputs := NewOutputCollection(configuration.Outputs, func(oc *OutputCollection) {
oc.Logger = lg
})

err = outputs.Setup()
if err != nil {
return err
}

httpServer := New(app, configuration.HTTPServer, outputs, func(s *Server) {
s.Logger = lg
})

grpcOutputs := make([]Out, len(configuration.GRPCServer.Outputs))
for index, output := range configuration.GRPCServer.Outputs {
o, err := outputs.Get(output)
if err != nil {
return err
}
grpcOutputs[index] = o
}

messageServer := NewGRPCMessageServer(grpcOutputs, &proto.ProtoEncoder{})
grpcServer := grpc.NewServer()
proto.RegisterSenderServer(grpcServer, messageServer)

go func() {
err := grpcServer.Serve(lis)
if err != nil {
panic(err)
}
}()

return httpServer.Start()
}
Loading

0 comments on commit faf58e4

Please sign in to comment.