Skip to content

Commit

Permalink
Improve deviceshifu mqtt and socket customized deviceshifu (#938)
Browse files Browse the repository at this point in the history
* update customized deviceshifu

* remove debug log and update return message when wrong method

* update deviceshifu socket customized deviceshifu

* fix set content-type header

* remove test etc file
  • Loading branch information
rhoninl authored Jul 31, 2024
1 parent b2de162 commit 3471a86
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 27 deletions.
41 changes: 25 additions & 16 deletions pkg/deviceshifu/deviceshifumqtt/deviceshifumqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import (
"strings"
"time"

"github.com/edgenesis/shifu/pkg/deviceshifu/utils"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/edgenesis/shifu/pkg/deviceshifu/deviceshifubase"
"github.com/edgenesis/shifu/pkg/deviceshifu/utils"
"github.com/edgenesis/shifu/pkg/k8s/api/v1alpha1"
"github.com/edgenesis/shifu/pkg/logger"
)
Expand Down Expand Up @@ -106,14 +105,7 @@ func New(deviceShifuMetadata *deviceshifubase.DeviceShifuMetaData) (*DeviceShifu
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
logger.Infof("Received message: %v from topic: %v", msg.Payload(), msg.Topic())
rawMqttMessageStr := string(msg.Payload())
instructionFuncName, shouldUsePythonCustomProcessing := deviceshifubase.CustomInstructionsPython[msg.Topic()]
logger.Infof("Topic %v is custom: %v", msg.Topic(), shouldUsePythonCustomProcessing)
if shouldUsePythonCustomProcessing {
logger.Infof("Topic %v has a python customized handler configured.\n", msg.Topic())
mqttMessageInstructionMap[msg.Topic()] = utils.ProcessInstruction(deviceshifubase.PythonHandlersModuleName, instructionFuncName, rawMqttMessageStr, deviceshifubase.PythonScriptDir)
} else {
mqttMessageInstructionMap[msg.Topic()] = rawMqttMessageStr
}
mqttMessageInstructionMap[msg.Topic()] = rawMqttMessageStr
mqttMessageReceiveTimestampMap[msg.Topic()] = time.Now()
logger.Infof("MESSAGE_STR updated")
}
Expand Down Expand Up @@ -163,16 +155,33 @@ func (handler DeviceCommandHandlerMQTT) commandHandleFunc() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// handlerEdgeDeviceSpec := handler.HandlerMetaData.edgeDeviceSpec
reqType := r.Method

topic := handler.HandlerMetaData.properties.MQTTTopic
if reqType == http.MethodGet {
returnMessage := ReturnBody{
MQTTMessage: mqttMessageInstructionMap[handler.HandlerMetaData.properties.MQTTTopic],
MQTTTimestamp: mqttMessageReceiveTimestampMap[handler.HandlerMetaData.properties.MQTTTopic].String(),
MQTTMessage: mqttMessageInstructionMap[topic],
MQTTTimestamp: mqttMessageReceiveTimestampMap[topic].String(),
}

responseMessage, err := json.Marshal(returnMessage)
if err != nil {
http.Error(w, "Cannot Encode message to json", http.StatusInternalServerError)
logger.Errorf("Cannot Encode message to json")
return
}

w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(returnMessage)
w.WriteHeader(http.StatusOK)

instructionFuncName, shouldUsePythonCustomProcessing := deviceshifubase.CustomInstructionsPython[handler.HandlerMetaData.instruction]
if shouldUsePythonCustomProcessing {
logger.Infof("Topic %v has a python customized handler configured.", topic)
responseMessage = []byte(utils.ProcessInstruction(deviceshifubase.PythonHandlersModuleName, instructionFuncName, string(responseMessage), deviceshifubase.PythonScriptDir))
if !json.Valid(responseMessage) {
w.Header().Set("Content-Type", "text/plain")
}
}

_, err = w.Write(responseMessage)
if err != nil {
http.Error(w, "Cannot Encode message to json", http.StatusInternalServerError)
logger.Errorf("Cannot Encode message to json")
Expand Down Expand Up @@ -212,7 +221,7 @@ func (handler DeviceCommandHandlerMQTT) commandHandleFunc() http.HandlerFunc {
logger.Infof("Info: Success To publish a message %v to MQTTServer!", requestBody)
return
} else {
http.Error(w, "must be GET or POST method", http.StatusBadRequest)
http.Error(w, "must be GET or PUT method", http.StatusBadRequest)
logger.Errorf("Request type %v is not supported yet!", reqType)
return
}
Expand Down
28 changes: 19 additions & 9 deletions pkg/deviceshifu/deviceshifusocket/deviceshifusocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,21 +133,31 @@ func deviceCommandHandlerSocket(HandlerMetaData *HandlerMetaData) http.HandlerFu
return
}
handlerInstruction := HandlerMetaData.instruction
instructionFuncName, shouldUsePythonCustomProcessing := deviceshifubase.CustomInstructionsPython[handlerInstruction]
logger.Infof("Instruction %v is custom: %v", handlerInstruction, shouldUsePythonCustomProcessing)
if shouldUsePythonCustomProcessing {
logger.Infof("Instruction %v has a python customized handler configured.\n", handlerInstruction)
outputMessage = utils.ProcessInstruction(deviceshifubase.PythonHandlersModuleName, instructionFuncName, outputMessage, deviceshifubase.PythonScriptDir)
}

returnMessage := ReturnBody{
returnBody := ReturnBody{
Message: outputMessage,
Status: http.StatusOK,
}
returnMessage, err := json.Marshal(returnBody)
if err != nil {
logger.Errorf("Failed marshal return message, error: %v", err)
http.Error(w, "Failed marshal return message, error: "+err.Error(), http.StatusBadRequest)
return
}

w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(returnMessage)
w.WriteHeader(http.StatusOK)

instructionFuncName, shouldUsePythonCustomProcessing := deviceshifubase.CustomInstructionsPython[handlerInstruction]
if shouldUsePythonCustomProcessing {
logger.Infof("Instruction %v has a python customized handler configured.\n", handlerInstruction)
returnMessage = []byte(utils.ProcessInstruction(deviceshifubase.PythonHandlersModuleName, instructionFuncName, outputMessage, deviceshifubase.PythonScriptDir))
if !json.Valid(returnMessage) {
w.Header().Set("Content-Type", "text/plain")
}
}

_, err = w.Write(returnMessage)
if err != nil {
logger.Errorf("Failed encode message to json, error: %v" + err.Error())
http.Error(w, "Failed encode message to json, error: "+err.Error(), http.StatusBadRequest)
Expand Down
2 changes: 1 addition & 1 deletion pkg/deviceshifu/utils/run_python.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const (
)

func ProcessInstruction(moduleName string, funcName string, rawData string, scriptDir string) string {
cmdString := fmt.Sprintf("import %s; print(%s.%s(%s))", moduleName, moduleName, funcName, rawData)
cmdString := fmt.Sprintf("import %s; print(%s.%s(%s), end='')", moduleName, moduleName, funcName, rawData)
cmd := exec.Command(PYTHON, CMDARG, cmdString)
cmd.Dir = scriptDir
processed, err := cmd.CombinedOutput()
Expand Down
2 changes: 1 addition & 1 deletion tools/release/gpt/gpt.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func toPointer[T any](v T) *T {
}

func removeChar(s string) string {
// remove all "```"
// remove all "```"
str := "```"
return strings.ReplaceAll(s, str, "")
}

0 comments on commit 3471a86

Please sign in to comment.