Skip to content

Commit

Permalink
public and consume improvements
Browse files Browse the repository at this point in the history
* prepare for command and control
* allow setting QoS and ttd for publish
  • Loading branch information
ctron committed Jul 4, 2019
1 parent 2b28840 commit b180563
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 9 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.idea/
hot-*
hot
70 changes: 70 additions & 0 deletions cmd/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*******************************************************************************
* Copyright (c) 2019 Red Hat Inc
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/

package main

import (
"bufio"
"os"
"time"
)

type CommandReader interface {
ReadCommand(timeout time.Duration) *string
}

// noop command reader

type NoopCommandReader struct {
}

func (_ *NoopCommandReader) ReadCommand(timeout time.Duration) *string {
return nil
}

var _ CommandReader = &NoopCommandReader{}

// stdin command reader

type StdinCommandReader struct {
}

func (_ *StdinCommandReader) ReadCommand(timeout time.Duration) *string {

s := make(chan string)
e := make(chan error)

go func() {
reader := bufio.NewReader(os.Stdin)
line, err := reader.ReadString('\n')
if err != nil {
e <- err
} else {
s <- line
}
close(s)
close(e)
}()

select {
case line := <-s:
return &line
case _ = <-e:
return nil
case <-time.After(timeout):
return nil
}

}

var _ CommandReader = &StdinCommandReader{}
29 changes: 29 additions & 0 deletions cmd/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,34 @@ func consume(messageType string, uri string, tenant string) error {
}

utils.PrintMessage(msg)
if processCommands {
processCommand(msg)
}
}
}

func processCommand(msg *amqp.Message) {
ttd, ok := msg.ApplicationProperties["ttd"].(int32)

if !ok {
return
}

if ttd < 0 {
return
}

reader := &StdinCommandReader{}

fmt.Printf("Enter command response (%v s): ", ttd)

cmd := reader.ReadCommand(time.Duration(ttd) * time.Second)

if cmd == nil {
fmt.Print("Timeout!")
fmt.Println()
return
}

// FIXME: implement
}
21 changes: 18 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (

var insecure bool
var contentType string = "text/plain"
var processCommands bool = false
var ttd uint32 = 0
var qos uint8 = 0

func createTlsConfig() *tls.Config {
return &tls.Config{
Expand Down Expand Up @@ -60,16 +63,28 @@ func main() {
}

cmdPublish.AddCommand(cmdPublishHttp)
cmdPublish.Flags().StringVarP(&contentType, "content-type", "t", "text/plain", "content type")

// publish flags

cmdPublish.PersistentFlags().StringVar(&contentType, "content-type", "text/plain", "content type")

// publish http flags

cmdPublishHttp.Flags().Uint32VarP(&ttd, "ttd", "t", 0, "Wait for command")
cmdPublishHttp.Flags().Uint8VarP(&qos, "qos", "q", 0, "Quality of service")

// consume flags

cmdConsume.Flags().BoolVarP(&processCommands, "command", "c", false, "Enable commands")

// root command

var rootCmd = &cobra.Command{Use: "hot"}
rootCmd.AddCommand(cmdConsume, cmdPublish)

rootCmd.Flags().BoolVar(&insecure, "insecure", false, "Skip TLS validation")
rootCmd.PersistentFlags().BoolVar(&insecure, "insecure", false, "Skip TLS validation")

if err := rootCmd.Execute(); err != nil {
panic(err)
println(err.Error())
}
}
26 changes: 25 additions & 1 deletion cmd/publish_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"fmt"
"net/http"
neturl "net/url"
"strconv"

"github.com/ctron/hot/pkg/utils"
)

func publishHttp(messageType string, uri string, tenant string, deviceId string, authId string, password string, contentType string, payload string) error {
Expand All @@ -44,6 +47,12 @@ func publishHttp(messageType string, uri string, tenant string, deviceId string,

request.SetBasicAuth(authId+"@"+tenant, password)

if qos > 0 {
request.Header.Set("QoS-Level", strconv.Itoa(int(qos)))
}
if ttd > 0 {
request.Header.Set("hono-ttd", strconv.FormatUint(uint64(ttd), 10))
}
request.Header.Set("Content-Type", contentType)

response, err := client.Do(request)
Expand All @@ -59,7 +68,22 @@ func publishHttp(messageType string, uri string, tenant string, deviceId string,
return err
}

fmt.Println(body.String())
utils.PrintStart()

utils.PrintTitle("Headers")
for k, v := range response.Header {
if len(v) == 1 {
utils.PrintEntry(k, v[0])
} else {
utils.PrintEntry(k, v)
}
}

if body.Len() > 0 {
utils.PrintTitle("Payload")
fmt.Println(body.String())
}
utils.PrintEnd()

if err := response.Body.Close(); err != nil {
fmt.Printf("Failed to close response: %v", err)
Expand Down
20 changes: 15 additions & 5 deletions pkg/utils/print.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func PrintTitle(title string) {
}

func PrintEntry(k interface{}, v interface{}) {
fmt.Printf("%s => %s", k, v)
fmt.Printf("%s => %[2]v (%[2]T)", k, v)
fmt.Println()
}

Expand Down Expand Up @@ -59,19 +59,29 @@ func PrintMessageProperties(p *amqp.MessageProperties) {

}

func PrintStart() {
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
}

func PrintEnd() {
fmt.Println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
}

func PrintMessage(msg *amqp.Message) {

PrintStart()

PrintAnnotations("Annotations", msg.Annotations)
PrintAnnotations("Delivery annotations", msg.DeliveryAnnotations)

PrintMessageProperties(msg.Properties)
PrintProperties("Application Properties", msg.ApplicationProperties)

fmt.Println("---------------------------------------------------------")
fmt.Println("Payload")
fmt.Println("---------------------------------------------------------")
PrintTitle("Payload")

fmt.Printf("%s", msg.GetData())
fmt.Println()
fmt.Println("=========================================================")

PrintEnd()

}

0 comments on commit b180563

Please sign in to comment.