Skip to content

Commit

Permalink
[eclipse-kanto#3] Add shadow state handler. Send null values for miss…
Browse files Browse the repository at this point in the history
…ing objects w… (eclipse-kanto#6)

[eclipse-kanto#3] Json document too large

Add shadow state handler. Send null values for missing objects when modifying.

Signed-off-by: Ivan Marinov ivan.marinov@bosch.io
  • Loading branch information
IvanBoychevMarinov authored Nov 24, 2023
1 parent 11aded6 commit 175a638
Show file tree
Hide file tree
Showing 6 changed files with 760 additions and 27 deletions.
9 changes: 7 additions & 2 deletions cmd/aws-connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/eclipse-kanto/aws-connector/flags"
"github.com/eclipse-kanto/aws-connector/routing/message/handlers"
"github.com/eclipse-kanto/aws-connector/routing/message/handlers/passthrough"
"github.com/eclipse-kanto/aws-connector/routing/message/handlers/state"

"github.com/eclipse-kanto/suite-connector/config"
suiteFlags "github.com/eclipse-kanto/suite-connector/flags"
Expand Down Expand Up @@ -80,10 +81,14 @@ func main() {
logger.Infof("Starting aws connector %s", version)
suiteFlags.ConfigCheck(logger, *fConfigFile)

shadowStateHandler := state.CreateDefaultShadowStateHandler()
cloudHandlers := []handlers.MessageHandler{
shadowStateHandler,
}

deviceHandlers := []handlers.MessageHandler{
passthrough.CreateDefaultDeviceHandler(),
passthrough.CreateDefaultDeviceHandler(shadowStateHandler.(passthrough.ShadowStateHolder)),
}
cloudHandlers := []handlers.MessageHandler{}

if err := app.MainLoop(settings, logger, deviceHandlers, cloudHandlers); err != nil {
logger.Error("Init failure", err, nil)
Expand Down
130 changes: 114 additions & 16 deletions routing/message/handlers/passthrough/device_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,24 @@ const (
topicRootShadow = "$aws/things/%s/shadow/%s"
// Used to update feature of root thing or attributes of child thing.
topicNamedShadow = "$aws/things/%s/shadow/name/%s/%s"
// Used to update feature of child thing.
topicComplexNamedShadow = "$aws/things/%s/shadow/name/%s:%s/%s"

topicUpdate = "update"
topicDelete = "delete"
)

type deviceHandler struct {
tenantID string
deviceID string
payloadFilters []*regexp.Regexp
topicFilter *regexp.Regexp
logger watermill.LoggerAdapter
defaultHandler message.HandlerFunc
tenantID string
deviceID string
payloadFilters []*regexp.Regexp
topicFilter *regexp.Regexp
logger watermill.LoggerAdapter
defaultHandler message.HandlerFunc
shadowStateHolder ShadowStateHolder
}

// CreateDefaultDeviceHandler instantiates a new passthrough handler that forwards messages received from local message broker on event and telemetry topics as device-to-cloud messages.
func CreateDefaultDeviceHandler() handlers.MessageHandler {
return &deviceHandler{}
func CreateDefaultDeviceHandler(shadowStateHolder ShadowStateHolder) handlers.MessageHandler {
return &deviceHandler{shadowStateHolder: shadowStateHolder}
}

// Init gets the device ID that is needed for the message forwarding towards AWS IoT Hub.
Expand Down Expand Up @@ -109,7 +108,7 @@ func (h *deviceHandler) HandleMessage(msg *message.Message) ([]*message.Message,
}

// toShadowTopic convert Ditto topic to its corresponding device shadow topic and if its an update message.
func (h *deviceHandler) toShadowTopic(topic *protocol.Topic, featureName string, value interface{}) (res string, update bool) {
func (h *deviceHandler) toShadowTopic(topic *protocol.Topic, featureName string, value interface{}) (res string, update bool, shadowID string) {
target := topicUpdate
if topic.Action == protocol.ActionDelete && h.isEntireShadow(value) {
target = topicDelete
Expand All @@ -122,17 +121,19 @@ func (h *deviceHandler) toShadowTopic(topic *protocol.Topic, featureName string,
if len(h.deviceID) == len(topicID) {
if featureName == "" {
// Update root thing attributes.
return fmt.Sprintf(topicRootShadow, h.deviceID, target), update
return fmt.Sprintf(topicRootShadow, h.deviceID, target), update, h.deviceID
}
// Update root thing feature.
return fmt.Sprintf(topicNamedShadow, h.deviceID, featureName, target), update
return fmt.Sprintf(topicNamedShadow, h.deviceID, featureName, target), update, featureName
}
if featureName == "" {
// Update child thing attributes.
return fmt.Sprintf(topicNamedShadow, h.deviceID, topicID[len(h.deviceID)+1:], target), update
shadowID = topicID[len(h.deviceID)+1:]
return fmt.Sprintf(topicNamedShadow, h.deviceID, shadowID, target), update, shadowID
}
// Update child thing feature.
return fmt.Sprintf(topicComplexNamedShadow, h.deviceID, topicID[len(h.deviceID)+1:], featureName, target), update
shadowID = fmt.Sprintf("%s:%s", topicID[len(h.deviceID)+1:], featureName)
return fmt.Sprintf(topicNamedShadow, h.deviceID, shadowID, target), update, shadowID
}

// isDittoRequest returns true if provided message is Ditto request to the connected device.
Expand Down Expand Up @@ -187,10 +188,11 @@ func integrate(path []string, value interface{}) interface{} {

// toShadowMessage convert Ditto data to device shadow message.
func (h *deviceHandler) toShadowMessage(env *protocol.Envelope, featureName string, value interface{}) *message.Message {
topic, update := h.toShadowTopic(env.Topic, featureName, value)
topic, update, shadowID := h.toShadowTopic(env.Topic, featureName, value)

var payload message.Payload
if update {
value = h.mergeWithCurrentShadowState(shadowID, value, env)
// Adds shadow prefix infront: ["state", "reported"]
value = integrate([]string{valueStateTag, valueReportedTag}, value)
payload, _ = json.Marshal(value)
Expand All @@ -203,6 +205,102 @@ func (h *deviceHandler) toShadowMessage(env *protocol.Envelope, featureName stri
return message
}

func (h deviceHandler) mergeWithCurrentShadowState(featureName string, newState interface{}, envelope *protocol.Envelope) interface{} {
if envelope.Topic.Action != protocol.ActionModify {
return newState
}

if h.shadowStateHolder == nil {
return newState
}

currentState := h.shadowStateHolder.GetCurrentShadowState(featureName)
if currentState == nil {
return newState
}

if subpath, ok := isSinglePropertyOrAttributeUpdate(envelope.Path); ok {
return mergeSubpaths(currentState, newState, strings.Split(subpath, "/"))
}

return merge(currentState, newState)
}

func isSinglePropertyOrAttributeUpdate(path string) (string, bool) {
if propertiesIndex := strings.Index(path, valuePropertiesTag); propertiesIndex >= 0 {
return path[propertiesIndex+len(valuePropertiesTag):], true
}

if attributesIndex := strings.Index(path, valueAttributesTag); attributesIndex >= 0 {
return path[attributesIndex+len(valueAttributesTag):], true
}

return "", false
}

func mergeSubpaths(currentState interface{}, newState interface{}, propertyPath []string) interface{} {
currentStateMap, isCurrentStateMap := currentState.(map[string]interface{})
newStateMap, isNewStateMap := newState.(map[string]interface{})

if !isCurrentStateMap || !isNewStateMap {
return newState
}

if len(propertyPath) == 0 {
return merge(currentState, newState)
}

propertyName := propertyPath[0]

if len(propertyName) == 0 {
return mergeSubpaths(currentState, newState, propertyPath[1:])
}

newStateMap[propertyName] = mergeSubpaths(currentStateMap[propertyName], newStateMap[propertyName], propertyPath[1:])

return newStateMap

}

func merge(currentState interface{}, newState interface{}) interface{} {
newState = mergeAsMaps(currentState, newState)
newState = mergeAsArrays(currentState, newState)

return newState
}

func mergeAsMaps(currentState interface{}, newState interface{}) interface{} {
currentStateMap, isCurrentStateMap := currentState.(map[string]interface{})
newStateMap, isNewStateMap := newState.(map[string]interface{})

if !isCurrentStateMap || !isNewStateMap {
return newState
}
for key, currentValue := range currentStateMap {
if newValue, exists := newStateMap[key]; !exists {
newStateMap[key] = nil
} else {
newStateMap[key] = merge(currentValue, newValue)
}
}

return newStateMap
}

func mergeAsArrays(currentState interface{}, newState interface{}) interface{} {
currentStateArray, isCurrentStateArray := currentState.([]interface{})
newStateArray, isNewStateArray := newState.([]interface{})

if !isCurrentStateArray || !isNewStateArray {
return newState
}

for i := 0; i < len(newStateArray) && i < len(currentStateArray); i++ {
newStateArray[i] = merge(currentStateArray[i], newStateArray[i])
}
return newStateArray
}

// getFeatureProperties return all feature properties, including the definition.
func (h *deviceHandler) getFeatureProperties(obj interface{}) (interface{}, bool) {
res := map[string]interface{}{}
Expand Down
Loading

0 comments on commit 175a638

Please sign in to comment.