From 1e93dc5f25540a137a63138c60f7581e4b574463 Mon Sep 17 00:00:00 2001 From: Aman Maharjan Date: Thu, 10 Apr 2025 22:50:26 +0545 Subject: [PATCH 1/6] Refactor retry mechanism to actual loRa writes than serial writes --- pkg/app.go | 4 ++-- pkg/enable.go | 2 +- pkg/router.go | 2 +- pkg/write_queue.go | 38 ++++++++++++++------------------------ 4 files changed, 18 insertions(+), 28 deletions(-) diff --git a/pkg/app.go b/pkg/app.go index 030fb0d..7ea4f4a 100644 --- a/pkg/app.go +++ b/pkg/app.go @@ -123,7 +123,7 @@ func (m *Module) deletePoint(_ *model.Point) (success bool, err error) { func (m *Module) writePoint(pointUUID string, body *dto.PointWriter) (*model.Point, error) { body.IgnorePresentValueUpdate = true - body.PollState = datatype.PointStateApiUpdatePending + body.PollState = datatype.PointStateApiWritePending pwResponse, err := m.grpcMarshaller.PointWrite(pointUUID, body) if err != nil { return nil, err @@ -136,7 +136,7 @@ func (m *Module) internalPointUpdate(point *model.Point) (*model.Point, error) { OriginalValue: point.WriteValue, Message: "", Fault: false, - PollState: datatype.PointStatePollOk, + PollState: datatype.PointStateWriteOk, } pwResponse, err := m.grpcMarshaller.PointWrite(point.UUID, pointWriter) if err != nil { diff --git a/pkg/enable.go b/pkg/enable.go index 6c6242a..161194a 100644 --- a/pkg/enable.go +++ b/pkg/enable.go @@ -34,7 +34,7 @@ func (m *Module) Enable() error { } else { for _, device := range net.Devices { for _, point := range device.Points { - if point.PointState == datatype.PointStateApiUpdatePending { + if point.PointState == datatype.PointStateApiWritePending { m.pointWriteQueue.LoadWriteQueue(point) } } diff --git a/pkg/router.go b/pkg/router.go index 60421af..9d237cb 100644 --- a/pkg/router.go +++ b/pkg/router.go @@ -156,7 +156,7 @@ func PointWrite(m *nmodule.Module, r *router.Request) ([]byte, error) { return nil, err } - pendingPointWrite := &PendingPointWrite{Point: point, PointWriteStatus: PointWritePending} + pendingPointWrite := &PendingPointWrite{Point: point} (*m).(*Module).pointWriteQueue.EnqueueWriteQueue(pendingPointWrite) return json.Marshal(point) diff --git a/pkg/write_queue.go b/pkg/write_queue.go index 917a03e..1e12266 100644 --- a/pkg/write_queue.go +++ b/pkg/write_queue.go @@ -8,19 +8,11 @@ import ( "time" ) -type PointWriteState string - -const ( - PointWritePending PointWriteState = "point-write-pending" - PointWriteSuccess PointWriteState = "point-write-success" -) - type PendingPointWrite struct { - MessageId uint8 - Message []byte - Point *model.Point - RetryCount int - PointWriteStatus PointWriteState + MessageId uint8 + Message []byte + Point *model.Point + RetryCount int } type PointWriteQueue struct { @@ -42,7 +34,7 @@ func NewPointWriteQueue(maxRetry int, timeout time.Duration) *PointWriteQueue { func (pwq *PointWriteQueue) LoadWriteQueue(point *model.Point) { pwq.mutex.Lock() defer pwq.mutex.Unlock() - pendingPointWrite := &PendingPointWrite{Point: point, PointWriteStatus: PointWritePending} + pendingPointWrite := &PendingPointWrite{Point: point} pwq.writeQueue = append(pwq.writeQueue, pendingPointWrite) } @@ -143,20 +135,18 @@ func (pwq *PointWriteQueue) ProcessPointWriteQueue( } if pendingPointWrite.RetryCount < pwq.maxRetry { - if pendingPointWrite.PointWriteStatus == PointWritePending { - err := writeToLoRaRaw(pendingPointWrite.Message) - if err != nil { - log.Infof("error writing to LoRa: %v\n", err) - pendingPointWrite.RetryCount++ - continue - } - pendingPointWrite.PointWriteStatus = PointWriteSuccess + err := writeToLoRaRaw(pendingPointWrite.Message) + if err != nil { + log.Errorf("error writing to LoRa serial port: %v\n", err) + time.Sleep(time.Second * 2) + continue } + pendingPointWrite.RetryCount++ + + // Wait for the set timeout before initiating another write + time.Sleep(pwq.timeout) } else { pwq.DequeueWriteQueue() } - - // Wait for the set timeout before initiating another write - time.Sleep(pwq.timeout) } } From f84b7c18dcaf8b669e6b7efa48a240567275d101 Mon Sep 17 00:00:00 2001 From: Tan Le Date: Mon, 14 Apr 2025 20:15:58 +0700 Subject: [PATCH 2/6] Implement reply the device message Signed-off-by: Tan Le --- endec/base.go | 6 ++-- endec/devices_and_points.go | 4 +++ endec/droplet.go | 8 ++++- endec/encoder.go | 33 ++++++++--------- endec/microedge.go | 4 ++- endec/rubix.go | 15 +++++++- endec/serialmap.go | 1 + endec/ziphydrotap.go | 4 ++- pkg/app.go | 71 ++++++++++++++++++++++++++++++++++++- pkg/enable.go | 11 +++++- pkg/module.go | 6 +++- pkg/serial.go | 18 +++++----- pkg/write_queue.go | 7 ++-- 13 files changed, 153 insertions(+), 35 deletions(-) diff --git a/endec/base.go b/endec/base.go index 02faec3..d25a2bc 100644 --- a/endec/base.go +++ b/endec/base.go @@ -1,8 +1,9 @@ package endec import ( - "github.com/NubeIO/nubeio-rubix-lib-models-go/model" "strconv" + + "github.com/NubeIO/nubeio-rubix-lib-models-go/model" ) const ( @@ -32,8 +33,9 @@ func DecodePayload( mtFn UpdateDeviceMetaTagsFunc, dequeueFn DequeuePointWriteFunc, internalPointUpdateFn InternalPointUpdate, + sendAckMessageFn SendAckToDeviceFunc, ) error { - err := devDesc.Decode(data, devDesc, device, fn, mtFn, dequeueFn, internalPointUpdateFn) + err := devDesc.Decode(data, devDesc, device, fn, mtFn, dequeueFn, internalPointUpdateFn, sendAckMessageFn) return err } diff --git a/endec/devices_and_points.go b/endec/devices_and_points.go index e4be60d..66f38c9 100644 --- a/endec/devices_and_points.go +++ b/endec/devices_and_points.go @@ -2,6 +2,7 @@ package endec import ( "errors" + "github.com/NubeIO/module-core-loraraw/schema" "github.com/NubeIO/nubeio-rubix-lib-models-go/model" @@ -12,6 +13,7 @@ type UpdateDevicePointFunc func(name string, value float64, device *model.Device type UpdateDeviceMetaTagsFunc func(uuid string, metaTags []*model.DeviceMetaTag) error type DequeuePointWriteFunc func(messageId uint8) *model.Point type InternalPointUpdate func(point *model.Point) (*model.Point, error) +type SendAckToDeviceFunc func(device *model.Device, messageId uint8) error type LoRaDeviceDescription struct { DeviceName string @@ -26,6 +28,7 @@ type LoRaDeviceDescription struct { updateDevMetaTagsFnc UpdateDeviceMetaTagsFunc, dequeuePointWriteFunc DequeuePointWriteFunc, internalPointWriteFunc InternalPointUpdate, + sendAck SendAckToDeviceFunc, ) error GetPointNames func() []string IsLoRaRAW bool @@ -52,6 +55,7 @@ func NilLoRaDeviceDescriptionDecode( _ UpdateDeviceMetaTagsFunc, _ DequeuePointWriteFunc, _ InternalPointUpdate, + _ SendAckToDeviceFunc, ) error { return errors.New("nil decode function called") } diff --git a/endec/droplet.go b/endec/droplet.go index f23e214..54d7f06 100644 --- a/endec/droplet.go +++ b/endec/droplet.go @@ -1,9 +1,10 @@ package endec import ( + "strconv" + "github.com/NubeIO/module-core-loraraw/utils" "github.com/NubeIO/nubeio-rubix-lib-models-go/model" - "strconv" ) const ( @@ -49,6 +50,7 @@ func DecodeDropletTH( _ UpdateDeviceMetaTagsFunc, _ DequeuePointWriteFunc, _ InternalPointUpdate, + _ SendAckToDeviceFunc, ) error { temperature, err := dropletTemp(data) if err != nil { @@ -83,6 +85,7 @@ func DecodeDropletTHL( updateDeviceMetaTagFn UpdateDeviceMetaTagsFunc, dequeuePointWriteFunc DequeuePointWriteFunc, internalPointUpdate InternalPointUpdate, + sendAck SendAckToDeviceFunc, ) error { err := DecodeDropletTH( data, @@ -92,6 +95,7 @@ func DecodeDropletTHL( updateDeviceMetaTagFn, dequeuePointWriteFunc, internalPointUpdate, + sendAck, ) if err != nil { return err @@ -112,6 +116,7 @@ func DecodeDropletTHLM( updateDeviceMetaTagsFn UpdateDeviceMetaTagsFunc, dequeuePointWriteFunc DequeuePointWriteFunc, internalPointUpdate InternalPointUpdate, + sendAck SendAckToDeviceFunc, ) error { err := DecodeDropletTHL( data, @@ -121,6 +126,7 @@ func DecodeDropletTHLM( updateDeviceMetaTagsFn, dequeuePointWriteFunc, internalPointUpdate, + sendAck, ) if err != nil { return err diff --git a/endec/encoder.go b/endec/encoder.go index 9e2e25e..371ac20 100644 --- a/endec/encoder.go +++ b/endec/encoder.go @@ -2,13 +2,14 @@ package endec import ( "errors" + "math" + "strconv" + "unsafe" + "github.com/NubeIO/lib-utils-go/nstring" "github.com/NubeIO/module-core-loraraw/aesutils" "github.com/NubeIO/module-core-loraraw/utils" "github.com/NubeIO/nubeio-rubix-lib-models-go/model" - "math" - "strconv" - "unsafe" log "github.com/sirupsen/logrus" ) @@ -111,7 +112,7 @@ func dataTypeToBits[T any](data T, metaData *MetaData, data64 *uint64, bitCount return true } -func encodeData[T any](serialData *SerialData, data T, header MetaDataKey, position uint8) bool { +func EncodeData[T any](serialData *SerialData, data T, header MetaDataKey, position uint8) bool { metaData := getMetaData(header) headerVector := make([]byte, 0) dataVector := make([]byte, 0) @@ -130,17 +131,17 @@ func encodeData[T any](serialData *SerialData, data T, header MetaDataKey, posit switch v := any(data).(type) { case float64: if !fixedPointToBits(v, &metaData, &dataBits, &bitCount) { - log.Errorf("encodeData: fixedPointToBits failed for float64") + log.Errorf("EncodeData: fixedPointToBits failed for float64") return false } case float32: if !fixedPointToBits(v, &metaData, &dataBits, &bitCount) { - log.Errorf("encodeData: fixedPointToBits failed for float32") + log.Errorf("EncodeData: fixedPointToBits failed for float32") return false } default: log.Errorf("%v", v) - log.Errorf("encodeData: Unsupported type for FIXEDPOINT: %T", data) + log.Errorf("EncodeData: Unsupported type for FIXEDPOINT: %T", data) return false } // Add header to buffer @@ -186,23 +187,23 @@ func EncodeAndEncrypt(point *model.Point, serialData *SerialData, key []byte) ([ } if MetaDataKey(pointDataType) == MDK_UINT_8 { - encodeData(serialData, uint8(*writeValue), MetaDataKey(pointDataType), addressID) + EncodeData(serialData, uint8(*writeValue), MetaDataKey(pointDataType), addressID) } else if MetaDataKey(pointDataType) == MDK_UINT_16 { - encodeData(serialData, uint16(*writeValue), MetaDataKey(pointDataType), addressID) + EncodeData(serialData, uint16(*writeValue), MetaDataKey(pointDataType), addressID) } else if MetaDataKey(pointDataType) == MDK_UINT_32 { - encodeData(serialData, uint32(*writeValue), MetaDataKey(pointDataType), addressID) + EncodeData(serialData, uint32(*writeValue), MetaDataKey(pointDataType), addressID) } else if MetaDataKey(pointDataType) == MDK_UINT_64 { - encodeData(serialData, uint64(*writeValue), MetaDataKey(pointDataType), addressID) + EncodeData(serialData, uint64(*writeValue), MetaDataKey(pointDataType), addressID) } else if MetaDataKey(pointDataType) == MDK_INT_8 { - encodeData(serialData, int8(*writeValue), MetaDataKey(pointDataType), addressID) + EncodeData(serialData, int8(*writeValue), MetaDataKey(pointDataType), addressID) } else if MetaDataKey(pointDataType) == MDK_INT_16 { - encodeData(serialData, int16(*writeValue), MetaDataKey(pointDataType), addressID) + EncodeData(serialData, int16(*writeValue), MetaDataKey(pointDataType), addressID) } else if MetaDataKey(pointDataType) == MDK_INT_32 { - encodeData(serialData, int32(*writeValue), MetaDataKey(pointDataType), addressID) + EncodeData(serialData, int32(*writeValue), MetaDataKey(pointDataType), addressID) } else if MetaDataKey(pointDataType) == MDK_INT_64 { - encodeData(serialData, int64(*writeValue), MetaDataKey(pointDataType), addressID) + EncodeData(serialData, int64(*writeValue), MetaDataKey(pointDataType), addressID) } else { - encodeData(serialData, *writeValue, MetaDataKey(pointDataType), addressID) + EncodeData(serialData, *writeValue, MetaDataKey(pointDataType), addressID) } encryptedData, err := aesutils.Encrypt( diff --git a/endec/microedge.go b/endec/microedge.go index 5ecf90c..040059d 100644 --- a/endec/microedge.go +++ b/endec/microedge.go @@ -1,11 +1,12 @@ package endec import ( + "strconv" + "github.com/NubeIO/module-core-loraraw/schema" "github.com/NubeIO/nubeio-rubix-lib-helpers-go/pkg/nube/thermistor" "github.com/NubeIO/nubeio-rubix-lib-models-go/datatype" "github.com/NubeIO/nubeio-rubix-lib-models-go/model" - "strconv" ) const ( @@ -41,6 +42,7 @@ func DecodeME( _ UpdateDeviceMetaTagsFunc, _ DequeuePointWriteFunc, _ InternalPointUpdate, + _ SendAckToDeviceFunc, ) error { p, err := pulse(data) if err != nil { diff --git a/endec/rubix.go b/endec/rubix.go index 1f96ad7..5e14115 100644 --- a/endec/rubix.go +++ b/endec/rubix.go @@ -236,6 +236,7 @@ func DecodeRubix( _ UpdateDeviceMetaTagsFunc, dequeuePointWriteFunc DequeuePointWriteFunc, internalPointUpdate InternalPointUpdate, + sendAckMessage SendAckToDeviceFunc, ) error { var ( temperature float32 @@ -281,7 +282,9 @@ func DecodeRubix( hasPos := HasPositionalData(serialData) var position uint8 = 0 - if HasRequestData(serialData) || HasResponseData(serialData) { + if HasRequestData(serialData) { + UpdateBitPositionsForHeaderByte(serialData) + } else if HasResponseData(serialData) { messageId := GetMessageId(serialData) point := dequeuePointWriteFunc(messageId) if point != nil { @@ -393,6 +396,16 @@ func DecodeRubix( log.Errorf("Unknown header: %d", header) } } + + if HasRequestData(serialData) { + respMsgId := GetMessageId(serialData) + if sendAckMessage != nil { + err := sendAckMessage(device, respMsgId); + if err != nil { + log.Errorf("Failed to send ACK: %v", err) + } + } + } return nil } diff --git a/endec/serialmap.go b/endec/serialmap.go index 2615ca4..722822d 100644 --- a/endec/serialmap.go +++ b/endec/serialmap.go @@ -87,6 +87,7 @@ var serialMap = map[int]MetaData{ MDK_CHAR: {DATAPOINT, 0, 0, 0, 1}, MDK_FLOAT: {DATAPOINT, 0, 0, 0, 4}, MDK_DOUBLE: {DATAPOINT, 0, 0, 0, 8}, + MDK_ERROR: {DATAPOINT, 0, 0, 0, 1}, } func getMetaData(header MetaDataKey) MetaData { diff --git a/endec/ziphydrotap.go b/endec/ziphydrotap.go index cc89f25..d2815d4 100644 --- a/endec/ziphydrotap.go +++ b/endec/ziphydrotap.go @@ -4,9 +4,10 @@ import ( "encoding/binary" "encoding/hex" "fmt" - "github.com/NubeIO/lib-utils-go/nstring" "strconv" + "github.com/NubeIO/lib-utils-go/nstring" + "github.com/NubeIO/module-core-loraraw/utils" "github.com/NubeIO/nubeio-rubix-lib-models-go/model" ) @@ -114,6 +115,7 @@ func DecodeZHT( updateDeviceMetaTagsFn UpdateDeviceMetaTagsFunc, _ DequeuePointWriteFunc, _ InternalPointUpdate, + _ SendAckToDeviceFunc, ) error { bytes, err := getPayloadBytes(data) if err != nil { diff --git a/pkg/app.go b/pkg/app.go index 7ea4f4a..c151f3f 100644 --- a/pkg/app.go +++ b/pkg/app.go @@ -4,16 +4,19 @@ import ( "encoding/hex" "errors" "fmt" - "github.com/NubeIO/nubeio-rubix-lib-models-go/dto" "reflect" "strings" "sync" + "time" + + "github.com/NubeIO/nubeio-rubix-lib-models-go/dto" "github.com/NubeIO/module-core-loraraw/aesutils" "github.com/NubeIO/lib-module-go/nmodule" "github.com/NubeIO/lib-utils-go/boolean" "github.com/NubeIO/lib-utils-go/integer" + "github.com/NubeIO/lib-utils-go/nstring" "github.com/NubeIO/module-core-loraraw/endec" "github.com/NubeIO/module-core-loraraw/utils" "github.com/NubeIO/nubeio-rubix-lib-models-go/datatype" @@ -146,6 +149,28 @@ func (m *Module) internalPointUpdate(point *model.Point) (*model.Point, error) { return &pwResponse.Point, nil } +func (m *Module) sendAckToDevice(device *model.Device, messageId uint8) error { + serialData := endec.NewSerialData() + endec.SetPositionalData(serialData, true) + endec.SetResponseData(serialData, true) + endec.SetMessageId(serialData, messageId) + endec.UpdateBitPositionsForHeaderByte(serialData) + encryptionKey, err := m.getEncryptionKey(device.UUID) + if err != nil { + return fmt.Errorf("failed to get encryption key for ACK: %v", err) + } + + endec.EncodeData(serialData, uint8(1), endec.MDK_ERROR, 1) + encryptedData, err := aesutils.Encrypt( + nstring.DerefString(device.AddressUUID), + serialData.Buffer, + encryptionKey, + 0, + ) + log.Infof("Sending ACK to device %s for message ID: %d", device.UUID, messageId) + return m.WriteToLoRaRaw(encryptedData) +} + func (m *Module) handleSerialPayload(data string) { if m.networkUUID == "" { return @@ -198,6 +223,7 @@ func (m *Module) handleSerialPayload(data string) { m.updateDeviceMetaTags, m.pointWriteQueue.DequeueUsingMessageId, m.internalPointUpdate, + m.sendAckToDevice, ) if err != nil { log.Errorf("decode error: %v\r\n", err) @@ -217,6 +243,8 @@ func decodeData( updateDeviceMetaTagFn endec.UpdateDeviceMetaTagsFunc, dequeuePointWriteFn endec.DequeuePointWriteFunc, internalPointUpdateFn endec.InternalPointUpdate, + sendAckMessageFn endec.SendAckToDeviceFunc, + ) error { devDesc := endec.GetDeviceDescription(device) if devDesc == &endec.NilLoRaDeviceDescription { @@ -252,6 +280,7 @@ func decodeData( updateDeviceMetaTagFn, dequeuePointWriteFn, internalPointUpdateFn, + sendAckMessageFn, ) return err } @@ -418,3 +447,43 @@ func (m *Module) getEncryptionKey(deviceUUID string) ([]byte, error) { return key, nil } + +func (m *Module) initWriteQueue() { + m.writeQueueInit.Do(func() { + m.writeQueue = make(chan []byte, 100) + m.writeQueueDone = make(chan struct{}) + + go m.processWriteQueue() + }) +} + +func (m *Module) processWriteQueue() { + defer func() { + if r := recover(); r != nil { + log.Errorf("Recovered panic in processWriteQueue: %v", r) + // Khởi động lại goroutine + go m.processWriteQueue() + } + }() + + for { + select { + case data := <-m.writeQueue: + if Port == nil { + log.Error("Serial port not connected") + continue + } + + _, err := Port.Write(data) + if err != nil { + log.Errorf("Error writing to serial port: %v", err) + } + + // Đợi một khoảng thời gian sau khi gửi để module LoRa xử lý + time.Sleep(50 * time.Millisecond) + + case <-m.writeQueueDone: + return + } + } +} diff --git a/pkg/enable.go b/pkg/enable.go index 161194a..cfd9cdc 100644 --- a/pkg/enable.go +++ b/pkg/enable.go @@ -1,12 +1,13 @@ package pkg import ( + "time" + "github.com/NubeIO/lib-module-go/nmodule" "github.com/NubeIO/nubeio-rubix-lib-models-go/datatype" "github.com/NubeIO/nubeio-rubix-lib-models-go/dto" "github.com/NubeIO/nubeio-rubix-lib-models-go/nargs" log "github.com/sirupsen/logrus" - "time" ) func (m *Module) Enable() error { @@ -20,6 +21,7 @@ func (m *Module) Enable() error { _ = m.updatePluginMessage(dto.MessageLevel.Fail, err.Error()) } + m.initWriteQueue() m.pointWriteQueue = NewPointWriteQueue(m.config.WriteQueueMaxRetries, m.config.WriteQueueTimeout) if len(networks) == 0 { @@ -57,6 +59,13 @@ func (m *Module) Disable() error { defer m.mutex.Unlock() log.Info("plugin is disabling...") m.interruptChan <- struct{}{} + if m.writeQueue != nil { + m.writeQueueDone <- struct{}{} + close(m.writeQueueDone) + close(m.writeQueue) + m.writeQueue = nil + } + time.Sleep(m.config.ReIterationTime + 1*time.Second) // we need to do this because, before disable it could possibly be restarted log.Info("plugin is disabled") return nil diff --git a/pkg/module.go b/pkg/module.go index 17c37b5..c2902a4 100644 --- a/pkg/module.go +++ b/pkg/module.go @@ -1,8 +1,9 @@ package pkg import ( - "github.com/NubeIO/lib-module-go/nmodule" "sync" + + "github.com/NubeIO/lib-module-go/nmodule" ) type Module struct { @@ -14,6 +15,9 @@ type Module struct { interruptChan chan struct{} mutex *sync.RWMutex pointWriteQueue *PointWriteQueue + writeQueue chan []byte + writeQueueDone chan struct{} + writeQueueInit sync.Once } func (m *Module) Init(dbHelper nmodule.DBHelper, moduleName string) error { diff --git a/pkg/serial.go b/pkg/serial.go index 71ad4b0..cef619c 100644 --- a/pkg/serial.go +++ b/pkg/serial.go @@ -4,6 +4,8 @@ import ( "bufio" "errors" "fmt" + "time" + "github.com/NubeIO/nubeio-rubix-lib-models-go/model" log "github.com/sirupsen/logrus" "go.bug.st/serial" @@ -64,14 +66,14 @@ func (m *Module) SerialClose() error { } func (m *Module) WriteToLoRaRaw(data []byte) error { - if Port == nil { - return errors.New("serial connection error: port not set") - } - _, err := Port.Write(data) - if err != nil { - return err - } - return nil + m.initWriteQueue() // Đảm bảo hàng đợi được khởi tạo + + select { + case m.writeQueue <- data: + return nil + case <-time.After(1 * time.Second): + return errors.New("write queue full, timeout after 1 second") + } } func (s *SerialSetting) Loop(plChan chan<- string, errChan chan<- error) { diff --git a/pkg/write_queue.go b/pkg/write_queue.go index 1e12266..cffcde4 100644 --- a/pkg/write_queue.go +++ b/pkg/write_queue.go @@ -1,16 +1,18 @@ package pkg import ( + "sync" + "time" + "github.com/NubeIO/module-core-loraraw/endec" "github.com/NubeIO/nubeio-rubix-lib-models-go/model" log "github.com/sirupsen/logrus" - "sync" - "time" ) type PendingPointWrite struct { MessageId uint8 Message []byte + MessageType bool Point *model.Point RetryCount int } @@ -135,6 +137,7 @@ func (pwq *PointWriteQueue) ProcessPointWriteQueue( } if pendingPointWrite.RetryCount < pwq.maxRetry { + log.Errorf("HAHAHAHAHAHAHAHAHAHAHAHAHA \n") err := writeToLoRaRaw(pendingPointWrite.Message) if err != nil { log.Errorf("error writing to LoRa serial port: %v\n", err) From 3ddf5a0b3b56a77ca416f088ce460e635d6ceb16 Mon Sep 17 00:00:00 2001 From: Aman Maharjan Date: Tue, 15 Apr 2025 23:15:43 +0545 Subject: [PATCH 3/6] Apply formatting for consistency --- endec/base.go | 2 +- endec/rubix.go | 2 +- endec/ziphydrotap.go | 1 - pkg/app.go | 103 +++++++++++++++++++++---------------------- pkg/enable.go | 12 ++--- pkg/module.go | 6 +-- pkg/serial.go | 16 +++---- 7 files changed, 69 insertions(+), 73 deletions(-) diff --git a/endec/base.go b/endec/base.go index d25a2bc..25e1c7a 100644 --- a/endec/base.go +++ b/endec/base.go @@ -33,7 +33,7 @@ func DecodePayload( mtFn UpdateDeviceMetaTagsFunc, dequeueFn DequeuePointWriteFunc, internalPointUpdateFn InternalPointUpdate, - sendAckMessageFn SendAckToDeviceFunc, + sendAckMessageFn SendAckToDeviceFunc, ) error { err := devDesc.Decode(data, devDesc, device, fn, mtFn, dequeueFn, internalPointUpdateFn, sendAckMessageFn) return err diff --git a/endec/rubix.go b/endec/rubix.go index 5e14115..dad86ba 100644 --- a/endec/rubix.go +++ b/endec/rubix.go @@ -400,7 +400,7 @@ func DecodeRubix( if HasRequestData(serialData) { respMsgId := GetMessageId(serialData) if sendAckMessage != nil { - err := sendAckMessage(device, respMsgId); + err := sendAckMessage(device, respMsgId) if err != nil { log.Errorf("Failed to send ACK: %v", err) } diff --git a/endec/ziphydrotap.go b/endec/ziphydrotap.go index d2815d4..17e41cc 100644 --- a/endec/ziphydrotap.go +++ b/endec/ziphydrotap.go @@ -7,7 +7,6 @@ import ( "strconv" "github.com/NubeIO/lib-utils-go/nstring" - "github.com/NubeIO/module-core-loraraw/utils" "github.com/NubeIO/nubeio-rubix-lib-models-go/model" ) diff --git a/pkg/app.go b/pkg/app.go index c151f3f..80599ac 100644 --- a/pkg/app.go +++ b/pkg/app.go @@ -9,17 +9,15 @@ import ( "sync" "time" - "github.com/NubeIO/nubeio-rubix-lib-models-go/dto" - - "github.com/NubeIO/module-core-loraraw/aesutils" - "github.com/NubeIO/lib-module-go/nmodule" "github.com/NubeIO/lib-utils-go/boolean" "github.com/NubeIO/lib-utils-go/integer" "github.com/NubeIO/lib-utils-go/nstring" + "github.com/NubeIO/module-core-loraraw/aesutils" "github.com/NubeIO/module-core-loraraw/endec" "github.com/NubeIO/module-core-loraraw/utils" "github.com/NubeIO/nubeio-rubix-lib-models-go/datatype" + "github.com/NubeIO/nubeio-rubix-lib-models-go/dto" "github.com/NubeIO/nubeio-rubix-lib-models-go/model" "github.com/NubeIO/nubeio-rubix-lib-models-go/nargs" log "github.com/sirupsen/logrus" @@ -150,25 +148,25 @@ func (m *Module) internalPointUpdate(point *model.Point) (*model.Point, error) { } func (m *Module) sendAckToDevice(device *model.Device, messageId uint8) error { - serialData := endec.NewSerialData() - endec.SetPositionalData(serialData, true) - endec.SetResponseData(serialData, true) - endec.SetMessageId(serialData, messageId) - endec.UpdateBitPositionsForHeaderByte(serialData) - encryptionKey, err := m.getEncryptionKey(device.UUID) - if err != nil { - return fmt.Errorf("failed to get encryption key for ACK: %v", err) - } - + serialData := endec.NewSerialData() + endec.SetPositionalData(serialData, true) + endec.SetResponseData(serialData, true) + endec.SetMessageId(serialData, messageId) + endec.UpdateBitPositionsForHeaderByte(serialData) + encryptionKey, err := m.getEncryptionKey(device.UUID) + if err != nil { + return fmt.Errorf("failed to get encryption key for ACK: %v", err) + } + endec.EncodeData(serialData, uint8(1), endec.MDK_ERROR, 1) - encryptedData, err := aesutils.Encrypt( + encryptedData, err := aesutils.Encrypt( nstring.DerefString(device.AddressUUID), serialData.Buffer, encryptionKey, 0, ) - log.Infof("Sending ACK to device %s for message ID: %d", device.UUID, messageId) - return m.WriteToLoRaRaw(encryptedData) + log.Infof("Sending ACK to device %s for message ID: %d", device.UUID, messageId) + return m.WriteToLoRaRaw(encryptedData) } func (m *Module) handleSerialPayload(data string) { @@ -243,8 +241,7 @@ func decodeData( updateDeviceMetaTagFn endec.UpdateDeviceMetaTagsFunc, dequeuePointWriteFn endec.DequeuePointWriteFunc, internalPointUpdateFn endec.InternalPointUpdate, - sendAckMessageFn endec.SendAckToDeviceFunc, - + sendAckMessageFn endec.SendAckToDeviceFunc, ) error { devDesc := endec.GetDeviceDescription(device) if devDesc == &endec.NilLoRaDeviceDescription { @@ -449,41 +446,41 @@ func (m *Module) getEncryptionKey(deviceUUID string) ([]byte, error) { } func (m *Module) initWriteQueue() { - m.writeQueueInit.Do(func() { - m.writeQueue = make(chan []byte, 100) - m.writeQueueDone = make(chan struct{}) - - go m.processWriteQueue() - }) + m.writeQueueInit.Do(func() { + m.writeQueue = make(chan []byte, 100) + m.writeQueueDone = make(chan struct{}) + + go m.processWriteQueue() + }) } func (m *Module) processWriteQueue() { - defer func() { - if r := recover(); r != nil { - log.Errorf("Recovered panic in processWriteQueue: %v", r) - // Khởi động lại goroutine - go m.processWriteQueue() - } - }() - - for { - select { - case data := <-m.writeQueue: - if Port == nil { - log.Error("Serial port not connected") - continue - } - - _, err := Port.Write(data) - if err != nil { - log.Errorf("Error writing to serial port: %v", err) - } - - // Đợi một khoảng thời gian sau khi gửi để module LoRa xử lý - time.Sleep(50 * time.Millisecond) - - case <-m.writeQueueDone: - return - } - } + defer func() { + if r := recover(); r != nil { + log.Errorf("Recovered panic in processWriteQueue: %v", r) + // Khởi động lại goroutine + go m.processWriteQueue() + } + }() + + for { + select { + case data := <-m.writeQueue: + if Port == nil { + log.Error("Serial port not connected") + continue + } + + _, err := Port.Write(data) + if err != nil { + log.Errorf("Error writing to serial port: %v", err) + } + + // Đợi một khoảng thời gian sau khi gửi để module LoRa xử lý + time.Sleep(50 * time.Millisecond) + + case <-m.writeQueueDone: + return + } + } } diff --git a/pkg/enable.go b/pkg/enable.go index cfd9cdc..841a284 100644 --- a/pkg/enable.go +++ b/pkg/enable.go @@ -60,12 +60,12 @@ func (m *Module) Disable() error { log.Info("plugin is disabling...") m.interruptChan <- struct{}{} if m.writeQueue != nil { - m.writeQueueDone <- struct{}{} - close(m.writeQueueDone) - close(m.writeQueue) - m.writeQueue = nil - } - + m.writeQueueDone <- struct{}{} + close(m.writeQueueDone) + close(m.writeQueue) + m.writeQueue = nil + } + time.Sleep(m.config.ReIterationTime + 1*time.Second) // we need to do this because, before disable it could possibly be restarted log.Info("plugin is disabled") return nil diff --git a/pkg/module.go b/pkg/module.go index c2902a4..6873974 100644 --- a/pkg/module.go +++ b/pkg/module.go @@ -15,9 +15,9 @@ type Module struct { interruptChan chan struct{} mutex *sync.RWMutex pointWriteQueue *PointWriteQueue - writeQueue chan []byte - writeQueueDone chan struct{} - writeQueueInit sync.Once + writeQueue chan []byte + writeQueueDone chan struct{} + writeQueueInit sync.Once } func (m *Module) Init(dbHelper nmodule.DBHelper, moduleName string) error { diff --git a/pkg/serial.go b/pkg/serial.go index cef619c..88b02a6 100644 --- a/pkg/serial.go +++ b/pkg/serial.go @@ -66,14 +66,14 @@ func (m *Module) SerialClose() error { } func (m *Module) WriteToLoRaRaw(data []byte) error { - m.initWriteQueue() // Đảm bảo hàng đợi được khởi tạo - - select { - case m.writeQueue <- data: - return nil - case <-time.After(1 * time.Second): - return errors.New("write queue full, timeout after 1 second") - } + m.initWriteQueue() // Đảm bảo hàng đợi được khởi tạo + + select { + case m.writeQueue <- data: + return nil + case <-time.After(1 * time.Second): + return errors.New("write queue full, timeout after 1 second") + } } func (s *SerialSetting) Loop(plChan chan<- string, errChan chan<- error) { From 6201f9ed45233dd3060a7d9d3d430f0e3d4b9fae Mon Sep 17 00:00:00 2001 From: Aman Maharjan Date: Mon, 5 May 2025 20:21:43 +0545 Subject: [PATCH 4/6] Send ack to loRa serial port which later is sent to the device --- aesutils/aesutils.go | 23 ++++++++++++- pkg/app.go | 79 ++++++++++++++++++++++++++++++++++++++++++-- pkg/serial.go | 2 +- pkg/write_queue.go | 9 +++-- 4 files changed, 103 insertions(+), 10 deletions(-) diff --git a/aesutils/aesutils.go b/aesutils/aesutils.go index f365103..146b8e2 100644 --- a/aesutils/aesutils.go +++ b/aesutils/aesutils.go @@ -110,7 +110,7 @@ func prepareCMAC(data, key []byte) ([]byte, error) { return nil, err } - // Create CMAC object + // Create a CMAC object cmacObj, err := cmac.New(block) if err != nil { return nil, err @@ -124,3 +124,24 @@ func prepareCMAC(data, key []byte) ([]byte, error) { mac := cmacObj.Sum(nil) return mac[:4], nil } + +func CmacUnencrypted(data []byte, key []byte) ([]byte, error) { + // Create AES cipher + block, err := aes.NewCipher(key) + if err != nil { + return nil, err + } + + // Create a CMAC object + mac, err := cmac.New(block) + if err != nil { + return nil, err + } + + // Update the CMAC object with the data + mac.Write(data) + + // Compute the MAC and return the first 4 bytes + fullMAC := mac.Sum(nil) + return fullMAC[:4], nil +} diff --git a/pkg/app.go b/pkg/app.go index 80599ac..bc9afcb 100644 --- a/pkg/app.go +++ b/pkg/app.go @@ -1,6 +1,7 @@ package pkg import ( + "bytes" "encoding/hex" "errors" "fmt" @@ -23,6 +24,16 @@ import ( log "github.com/sirupsen/logrus" ) +const ( + LORA_RAW_VERSION = 1 + LORA_RAW_OPTS_CONF = 2 + LORA_RAW_OPTS_ACK = 3 + LORA_RAW_VERSION_POSITION = 0 + LORA_RAW_OPTS_POSITION = 1 + LORA_RAW_NONCE_POSITION = 2 + LORA_RAW_HEADER_LEN = 3 +) + func (m *Module) addNetwork(body *model.Network) (network *model.Network, err error) { nets, err := m.grpcMarshaller.GetNetworksByPluginName(body.PluginName) if err != nil { @@ -198,6 +209,7 @@ func (m *Module) handleSerialPayload(data string) { log.Infof("message from non-added sensor. ID: %s, RSSI: %d", id, rssi) return } + // Decode RSSI and SNR before decryption; otherwise, they will be lost. rssi := endec.DecodeRSSI(data) snr := endec.DecodeSNR(data) @@ -205,13 +217,18 @@ func (m *Module) handleSerialPayload(data string) { if !legacyDevice && !m.config.DecryptionDisabled { hexKey := m.config.DefaultKey if device.Manufacture != "" { - hexKey = device.Manufacture // Manufacture property from device model holds hex key + hexKey = device.Manufacture // Manufacture property from the device model holds hex key } data, err = decryptNormal(data, hexKey) if err != nil { log.Errorf("error decrypting data: %s", err) return } + + err := m.sendAcknowledgement(data, hexKey) + if err != nil { + log.Errorf("error sending acknowledgement: %s", err) + } } err = decodeData( @@ -234,6 +251,62 @@ func (m *Module) handleSerialPayload(data string) { m.updateDeviceFault(device.Model, device.UUID) } +func (m *Module) sendAcknowledgement(data, hexKey string) error { + byteKey, err := hex.DecodeString(hexKey) + if err != nil { + return err + } + byteData, err := hex.DecodeString(data) + if err != nil { + return err + } + ack, err := prepareAcknowledgement(byteData, byteKey) + if err != nil { + return err + } + return m.WriteToLoRaRaw(ack) +} + +func prepareAcknowledgement(data, key []byte) ([]byte, error) { + if len(data) < LORA_RAW_HEADER_LEN { + return nil, errors.New("invalid data length") + } + if data[LORA_RAW_VERSION_POSITION] != LORA_RAW_VERSION { + return nil, errors.New("invalid version") + } + opts := getOpts(data) + if opts == LORA_RAW_OPTS_CONF { + ack := createAck(data[:LORA_RAW_HEADER_LEN], key, getNonce(data)) + return ack, nil + } + return nil, errors.New("invalid opts") +} + +func getOpts(data []byte) int { + return int(data[LORA_RAW_OPTS_POSITION]) +} + +func getNonce(data []byte) int { + return int(data[LORA_RAW_NONCE_POSITION]) +} + +func createAck(address, key []byte, nonce int) []byte { + optB := []byte{byte(LORA_RAW_OPTS_ACK)} + nonceB := []byte{byte(nonce)} + var buffer bytes.Buffer + buffer.Write(address) + buffer.Write(optB) + buffer.Write(nonceB) + fullData := buffer.Bytes() + unCmac, err := aesutils.CmacUnencrypted(fullData, key) + if err != nil { + log.Errorf("error creating ack: %s", err) + return nil + } + fullData = append(fullData, unCmac...) + return fullData +} + func decodeData( data string, device *model.Device, @@ -458,7 +531,7 @@ func (m *Module) processWriteQueue() { defer func() { if r := recover(); r != nil { log.Errorf("Recovered panic in processWriteQueue: %v", r) - // Khởi động lại goroutine + // Restart goroutine go m.processWriteQueue() } }() @@ -476,7 +549,7 @@ func (m *Module) processWriteQueue() { log.Errorf("Error writing to serial port: %v", err) } - // Đợi một khoảng thời gian sau khi gửi để module LoRa xử lý + // Wait a while after sending for the LoRa module to process time.Sleep(50 * time.Millisecond) case <-m.writeQueueDone: diff --git a/pkg/serial.go b/pkg/serial.go index 88b02a6..5385ced 100644 --- a/pkg/serial.go +++ b/pkg/serial.go @@ -66,7 +66,7 @@ func (m *Module) SerialClose() error { } func (m *Module) WriteToLoRaRaw(data []byte) error { - m.initWriteQueue() // Đảm bảo hàng đợi được khởi tạo + m.initWriteQueue() // Make sure the queue is initialized select { case m.writeQueue <- data: diff --git a/pkg/write_queue.go b/pkg/write_queue.go index cffcde4..82367a9 100644 --- a/pkg/write_queue.go +++ b/pkg/write_queue.go @@ -10,11 +10,11 @@ import ( ) type PendingPointWrite struct { - MessageId uint8 - Message []byte + MessageId uint8 + Message []byte MessageType bool - Point *model.Point - RetryCount int + Point *model.Point + RetryCount int } type PointWriteQueue struct { @@ -137,7 +137,6 @@ func (pwq *PointWriteQueue) ProcessPointWriteQueue( } if pendingPointWrite.RetryCount < pwq.maxRetry { - log.Errorf("HAHAHAHAHAHAHAHAHAHAHAHAHA \n") err := writeToLoRaRaw(pendingPointWrite.Message) if err != nil { log.Errorf("error writing to LoRa serial port: %v\n", err) From 4dc4e5ce09798dec9d20341a2b5e092a82eb1939 Mon Sep 17 00:00:00 2001 From: Aman Maharjan Date: Tue, 6 May 2025 20:59:19 +0545 Subject: [PATCH 5/6] Build binary file inside docker --- .github/workflows/build-release.yml | 41 +++++++++-------------------- Dockerfile | 41 +++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 28 deletions(-) create mode 100644 Dockerfile diff --git a/.github/workflows/build-release.yml b/.github/workflows/build-release.yml index 2c9a1cc..e42534a 100644 --- a/.github/workflows/build-release.yml +++ b/.github/workflows/build-release.yml @@ -18,12 +18,12 @@ on: env: APP_NAME: module-core-loraraw GHCR_IMAGE: ghcr.io/nubeio/module-core-loraraw - PLATFORMS: linux/amd64,linux/arm/v7 + PLATFORMS: linux/amd64 jobs: context: - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 outputs: shouldBuild: ${{ steps.context.outputs.decision_build }} @@ -36,7 +36,7 @@ jobs: fqn: ${{ env.APP_NAME }}-${{ steps.context.outputs.version }}-${{ steps.context.outputs.shortCommitId }} steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 with: token: ${{ secrets.NUBEIO_CI_GITHUBPROJECT_TOKEN }} @@ -58,7 +58,7 @@ jobs: defaultBranch: master build: - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 needs: context if: needs.context.outputs.shouldBuild == 'true' env: @@ -72,37 +72,22 @@ jobs: -v /home/runner:/var/lib/registry --name registry steps: - - uses: actions/checkout@v2 - - uses: actions/setup-go@v2 - with: - go-version: '^1.16.6' + - uses: actions/checkout@v4 - name: Set current date as env variable id: date run: echo "::set-output name=date::$(date +'%Y-%m-%dT%H:%M:%S')" - - name: Edit main.go for environment - env: - VERSION: ${{ needs.context.outputs.version }} - COMMIT: ${{ needs.context.outputs.shortCommitId }} - BUILD_DATE: ${{ steps.date.outputs.date }} - run: | - sed -i -e 's,,'"${VERSION}"',' main.go - sed -i -e 's//'"${COMMIT}"'/' main.go - sed -i -e 's//'"${BUILD_DATE}"'/' main.go - - - name: Build amd64 + - name: Build inside Docker run: | - git config --global url."https://$GITHUB_TOKEN:x-oauth-basic@github.com/NubeIO".insteadOf "https://github.com/NubeIO" - go mod tidy - go build -o module-core-loraraw-amd64 + docker build \ + -f Dockerfile \ + -t build-image . - - name: Build armv7 - if: ${{ needs.context.outputs.isRelease == 'true' }} - run: | - sudo apt-get update -y - sudo apt-get install -y gcc-arm-linux-gnueabihf g++-arm-linux-gnueabihf - env GOOS=linux GOARCH=arm GOARM=7 CGO_ENABLED=1 CC=arm-linux-gnueabihf-gcc CXX=arm-linux-gnueabihf-g++ go build -o module-core-loraraw-armv7 + container_id=$(docker create build-image) + docker cp $container_id:/app/module-core-loraraw-amd64 ./module-core-loraraw-amd64 + docker cp $container_id:/app/module-core-loraraw-armv7 ./module-core-loraraw-armv7 + docker rm $container_id - name: Zip artifacts if: ${{ needs.context.outputs.isRelease == 'true' }} diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..25af3b7 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,41 @@ +# Use Ubuntu 20.04 as the base image +FROM ubuntu:20.04 AS builder + +# Install Go and necessary dependencies for cross-compilation +RUN apt-get update +RUN apt-get install -y \ + gcc \ + gcc-x86-64-linux-gnu \ + gcc-arm-linux-gnueabihf \ + g++-arm-linux-gnueabihf \ + wget + +# Set the working directory +WORKDIR /app + +# Copy the Go project files +COPY . . + +# Define build arguments +ARG VERSION +ARG COMMIT +ARG BUILD_DATE + +RUN wget https://go.dev/dl/go1.21.13.linux-amd64.tar.gz +RUN tar -C /usr/local -xzf go1.21.13.linux-amd64.tar.gz && \ + rm go1.21.13.linux-amd64.tar.gz + +# Set Go environment variables +ENV PATH=$PATH:/usr/local/go/bin +ENV GOPATH=/go + +# Run Go mod tidy and build for the default platform (amd64) +RUN go mod tidy +RUN env GOOS=linux GOARCH=amd64 CGO_ENABLED=1 \ + CC=x86_64-linux-gnu-gcc CXX=x86_64-linux-gnu-g++ \ + go build -o module-core-loraraw-amd64 + +# Build for ARMv7 architecture +RUN env GOOS=linux GOARCH=arm GOARM=7 CGO_ENABLED=1 \ + CC=arm-linux-gnueabihf-gcc CXX=arm-linux-gnueabihf-g++ \ + go build -o module-core-loraraw-armv7 \ No newline at end of file From 03eceb67b473927300196bac0b3c8defc194bf59 Mon Sep 17 00:00:00 2001 From: Aman Maharjan Date: Wed, 7 May 2025 11:05:31 +0545 Subject: [PATCH 6/6] Fix constant value --- pkg/app.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/app.go b/pkg/app.go index bc9afcb..c1c83c4 100644 --- a/pkg/app.go +++ b/pkg/app.go @@ -25,13 +25,13 @@ import ( ) const ( - LORA_RAW_VERSION = 1 - LORA_RAW_OPTS_CONF = 2 - LORA_RAW_OPTS_ACK = 3 - LORA_RAW_VERSION_POSITION = 0 - LORA_RAW_OPTS_POSITION = 1 - LORA_RAW_NONCE_POSITION = 2 - LORA_RAW_HEADER_LEN = 3 + LORA_RAW_VERSION = 0xC0 + LORA_RAW_OPTS_CONF = 1 + LORA_RAW_OPTS_ACK = 2 + LORA_RAW_VERSION_POSITION = 1 + LORA_RAW_OPTS_POSITION = 4 + LORA_RAW_NONCE_POSITION = 5 + LORA_RAW_HEADER_LEN = 4 ) func (m *Module) addNetwork(body *model.Network) (network *model.Network, err error) {