diff --git a/.gitignore b/.gitignore index 36b28b2..8962ca2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .idea/ hot-* +hot diff --git a/cmd/command.go b/cmd/command.go new file mode 100644 index 0000000..ce6eeef --- /dev/null +++ b/cmd/command.go @@ -0,0 +1,70 @@ +/******************************************************************************* + * Copyright (c) 2019 Red Hat Inc + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package main + +import ( + "bufio" + "os" + "time" +) + +type CommandReader interface { + ReadCommand(timeout time.Duration) *string +} + +// noop command reader + +type NoopCommandReader struct { +} + +func (_ *NoopCommandReader) ReadCommand(timeout time.Duration) *string { + return nil +} + +var _ CommandReader = &NoopCommandReader{} + +// stdin command reader + +type StdinCommandReader struct { +} + +func (_ *StdinCommandReader) ReadCommand(timeout time.Duration) *string { + + s := make(chan string) + e := make(chan error) + + go func() { + reader := bufio.NewReader(os.Stdin) + line, err := reader.ReadString('\n') + if err != nil { + e <- err + } else { + s <- line + } + close(s) + close(e) + }() + + select { + case line := <-s: + return &line + case _ = <-e: + return nil + case <-time.After(timeout): + return nil + } + +} + +var _ CommandReader = &StdinCommandReader{} diff --git a/cmd/consume.go b/cmd/consume.go index 7b98233..ac6c163 100644 --- a/cmd/consume.go +++ b/cmd/consume.go @@ -88,5 +88,34 @@ func consume(messageType string, uri string, tenant string) error { } utils.PrintMessage(msg) + if processCommands { + processCommand(msg) + } + } +} + +func processCommand(msg *amqp.Message) { + ttd, ok := msg.ApplicationProperties["ttd"].(int32) + + if !ok { + return + } + + if ttd < 0 { + return } + + reader := &StdinCommandReader{} + + fmt.Printf("Enter command response (%v s): ", ttd) + + cmd := reader.ReadCommand(time.Duration(ttd) * time.Second) + + if cmd == nil { + fmt.Print("Timeout!") + fmt.Println() + return + } + + // FIXME: implement } diff --git a/cmd/main.go b/cmd/main.go index 7ae7992..355f6e2 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -22,6 +22,9 @@ import ( var insecure bool var contentType string = "text/plain" +var processCommands bool = false +var ttd uint32 = 0 +var qos uint8 = 0 func createTlsConfig() *tls.Config { return &tls.Config{ @@ -60,16 +63,28 @@ func main() { } cmdPublish.AddCommand(cmdPublishHttp) - cmdPublish.Flags().StringVarP(&contentType, "content-type", "t", "text/plain", "content type") + + // publish flags + + cmdPublish.PersistentFlags().StringVar(&contentType, "content-type", "text/plain", "content type") + + // publish http flags + + cmdPublishHttp.Flags().Uint32VarP(&ttd, "ttd", "t", 0, "Wait for command") + cmdPublishHttp.Flags().Uint8VarP(&qos, "qos", "q", 0, "Quality of service") + + // consume flags + + cmdConsume.Flags().BoolVarP(&processCommands, "command", "c", false, "Enable commands") // root command var rootCmd = &cobra.Command{Use: "hot"} rootCmd.AddCommand(cmdConsume, cmdPublish) - rootCmd.Flags().BoolVar(&insecure, "insecure", false, "Skip TLS validation") + rootCmd.PersistentFlags().BoolVar(&insecure, "insecure", false, "Skip TLS validation") if err := rootCmd.Execute(); err != nil { - panic(err) + println(err.Error()) } } diff --git a/cmd/publish_http.go b/cmd/publish_http.go index af9cddb..4d41cd1 100644 --- a/cmd/publish_http.go +++ b/cmd/publish_http.go @@ -18,6 +18,9 @@ import ( "fmt" "net/http" neturl "net/url" + "strconv" + + "github.com/ctron/hot/pkg/utils" ) func publishHttp(messageType string, uri string, tenant string, deviceId string, authId string, password string, contentType string, payload string) error { @@ -44,6 +47,12 @@ func publishHttp(messageType string, uri string, tenant string, deviceId string, request.SetBasicAuth(authId+"@"+tenant, password) + if qos > 0 { + request.Header.Set("QoS-Level", strconv.Itoa(int(qos))) + } + if ttd > 0 { + request.Header.Set("hono-ttd", strconv.FormatUint(uint64(ttd), 10)) + } request.Header.Set("Content-Type", contentType) response, err := client.Do(request) @@ -59,7 +68,22 @@ func publishHttp(messageType string, uri string, tenant string, deviceId string, return err } - fmt.Println(body.String()) + utils.PrintStart() + + utils.PrintTitle("Headers") + for k, v := range response.Header { + if len(v) == 1 { + utils.PrintEntry(k, v[0]) + } else { + utils.PrintEntry(k, v) + } + } + + if body.Len() > 0 { + utils.PrintTitle("Payload") + fmt.Println(body.String()) + } + utils.PrintEnd() if err := response.Body.Close(); err != nil { fmt.Printf("Failed to close response: %v", err) diff --git a/pkg/utils/print.go b/pkg/utils/print.go index 88ad0df..a532ffa 100644 --- a/pkg/utils/print.go +++ b/pkg/utils/print.go @@ -26,7 +26,7 @@ func PrintTitle(title string) { } func PrintEntry(k interface{}, v interface{}) { - fmt.Printf("%s => %s", k, v) + fmt.Printf("%s => %[2]v (%[2]T)", k, v) fmt.Println() } @@ -59,19 +59,29 @@ func PrintMessageProperties(p *amqp.MessageProperties) { } +func PrintStart() { + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") +} + +func PrintEnd() { + fmt.Println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") +} + func PrintMessage(msg *amqp.Message) { + PrintStart() + PrintAnnotations("Annotations", msg.Annotations) PrintAnnotations("Delivery annotations", msg.DeliveryAnnotations) PrintMessageProperties(msg.Properties) PrintProperties("Application Properties", msg.ApplicationProperties) - fmt.Println("---------------------------------------------------------") - fmt.Println("Payload") - fmt.Println("---------------------------------------------------------") + PrintTitle("Payload") + fmt.Printf("%s", msg.GetData()) fmt.Println() - fmt.Println("=========================================================") + + PrintEnd() }