Skip to content

Commit

Permalink
Feature: PDU SubmitWindow with; MaxWindowSize option, ExpectedRespons…
Browse files Browse the repository at this point in the history
…e handler, ExpiredPdus handler and NoRespPdu OnClose handler (#134)

What: 
- Add a submit window via a
[concurrent-map](https://github.com/orcaman/concurrent-map) that tracks
Requests (SubmitSM, EnquireLinks, ReplaceSM, etc..)
- Add functionality to return a expected response with the original sent
PDU
- Add functionality to track PDUs with no response and a timer setting
for when they expire
- Add a max window size setting, to limit the number of outbound request
- Add function call to get current bind window size on Tx and Trx
- Add function call to get a PDU stuck in the submit store when the bind
closes.
- Add an example on how to use new settings
- Add an example on how to implemented a Custom PDU to add any fields to
be tracked

Why: As requested in #126, #105 and #73, the user sometimes needs to
track all requests sent to SMSC. Either to relate a response to a
request, or to track a request that have received no response or even to
limit the number of outgoing request without any response from the SMSC.

How: 
- The main feature, the submit window, works by using a
[concurrent-map](https://github.com/orcaman/concurrent-map) as a
key/value store. The key is the PDU sequence number and the value is a
new Request struct created for this feature. Concurrent-map is thread
safe and has all the functionality needed for this use case. The map
gets reset on every rebind and all PDUs stored in the map are can be
retruned to the user via a func call when the session is closed.
- When the user Submits a PDU, it is stored in the new Request struct
with the request is created
- When the library receives a PDU from the SMSC, it will verify if the
PDU is a response type (SubmitSMResp, ReplaceSMResp, etc) and queries
the key/value store with the sequence number. If the store contains a
PDU request, the response is returned to the user with the PDU and the
original request via OnExpectedPduResponse setting. The Request is
removed from the store after the lookup.
- Receivable has been modified to add a new loopWithVerifyExpiredPdu,
that verifies all PDU in the store and compare the time they were
stored. If the time is great than the value entered by the setting
PduExpireTimeOut, the PDU is removed from the store and return to the
user via OnExpiredPduRequest.

The submit window will only contain PDU that return true on CanResponse,
except Unbind and BindRequest:
- CancelSM
- DataSM
- DeliverSM
- EnquireLink
- QuerySM
- ReplaceSM
- SubmitMulti
- SubmitSM

This PR does not break current user experience, all old test pass and if
user does not add the new settings, all will work as it previously did.
  • Loading branch information
laduchesneau authored Jun 15, 2024
1 parent 1c3b3e5 commit 3cf20dc
Show file tree
Hide file tree
Showing 20 changed files with 1,792 additions and 65 deletions.
5 changes: 5 additions & 0 deletions connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func newBindRequest(s Auth, bindingType pdu.BindingType, addressRange pdu.Addres
// Connector is connection factory interface.
type Connector interface {
Connect() (conn *Connection, err error)
GetBindType() pdu.BindingType
}

type connector struct {
Expand All @@ -56,6 +57,10 @@ type connector struct {
addressRange pdu.AddressRange
}

func (c *connector) GetBindType() pdu.BindingType {
return c.bindingType
}

func (c *connector) Connect() (conn *Connection, err error) {
conn, err = connect(c.dialer, c.auth.SMSC, newBindRequest(c.auth, c.bindingType, c.addressRange))
return
Expand Down
19 changes: 19 additions & 0 deletions connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,22 @@ func TestBindingSMSC_Error(t *testing.T) {
checker(t, TRXConnector(NonTLSDialer, auth))
})
}

func TestBindingType(t *testing.T) {
auth := Auth{SMSC: smscAddr, SystemID: "invalid"}

t.Run("TX", func(t *testing.T) {
c := TXConnector(NonTLSDialer, auth)
require.Equal(t, c.GetBindType(), pdu.Transmitter)
})

t.Run("RX", func(t *testing.T) {
c := RXConnector(NonTLSDialer, auth)
require.Equal(t, c.GetBindType(), pdu.Receiver)
})

t.Run("TRX", func(t *testing.T) {
c := TRXConnector(NonTLSDialer, auth)
require.Equal(t, c.GetBindType(), pdu.Transceiver)
})
}
165 changes: 165 additions & 0 deletions example/transeiver_with_custom_store/CustomStore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package main

import (
"bytes"
"context"
"encoding/gob"
"errors"
"fmt"
"github.com/allegro/bigcache/v3"
"github.com/linxGnu/gosmpp/pdu"
"strconv"
"time"

"github.com/linxGnu/gosmpp"
)

// This is a just an example how to implement a custom store.
//
// Your implementation must be concurrency safe
//
// In this example we use bigcache https://github.com/allegro/bigcache
// Warning:
// - This is just an example and should be tested before using in production
// - We are serializing with gob, some field cannot be serialized for simplicity
// - We recommend you implement your own serialization/deserialization if you choose to use bigcache

type CustomStore struct {
store *bigcache.BigCache
}

func NewCustomStore() CustomStore {
cache, _ := bigcache.New(context.Background(), bigcache.DefaultConfig(30*time.Second))
return CustomStore{
store: cache,
}
}

func (s CustomStore) Set(ctx context.Context, request gosmpp.Request) error {
select {
case <-ctx.Done():
fmt.Println("Task cancelled")
return ctx.Err()
default:
b, _ := serialize(request)
err := s.store.Set(strconv.Itoa(int(request.PDU.GetSequenceNumber())), b)
if err != nil {
return err
}
return nil
}
}

func (s CustomStore) Get(ctx context.Context, sequenceNumber int32) (gosmpp.Request, bool) {
select {
case <-ctx.Done():
fmt.Println("Task cancelled")
return gosmpp.Request{}, false
default:
bRequest, err := s.store.Get(strconv.Itoa(int(sequenceNumber)))
if err != nil {
return gosmpp.Request{}, false
}
request, err := deserialize(bRequest)
if err != nil {
return gosmpp.Request{}, false
}
return request, true
}
}

func (s CustomStore) List(ctx context.Context) []gosmpp.Request {
var requests []gosmpp.Request
select {
case <-ctx.Done():
return requests
default:
it := s.store.Iterator()
for it.SetNext() {
value, err := it.Value()
if err != nil {
return requests
}
request, _ := deserialize(value.Value())
requests = append(requests, request)
}
return requests
}
}

func (s CustomStore) Delete(ctx context.Context, sequenceNumber int32) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
err := s.store.Delete(strconv.Itoa(int(sequenceNumber)))
if err != nil {
return err
}
return nil
}
}

func (s CustomStore) Clear(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
err := s.store.Reset()
if err != nil {
return err
}
return nil
}
}

func (s CustomStore) Length(ctx context.Context) (int, error) {
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
return s.store.Len(), nil
}
}

func serialize(request gosmpp.Request) ([]byte, error) {
buf := pdu.NewBuffer(make([]byte, 0, 64))
request.PDU.Marshal(buf)
b := bytes.Buffer{}
e := gob.NewEncoder(&b)
err := e.Encode(requestGob{
Pdu: buf.Bytes(),
TimeSent: time.Time{},
})
if err != nil {
return b.Bytes()[:], errors.New("serialization failed")
}
return b.Bytes(), nil
}

func deserialize(bRequest []byte) (request gosmpp.Request, err error) {
r := requestGob{}
b := bytes.Buffer{}
_, err = b.Write(bRequest)
if err != nil {
return request, errors.New("deserialization failed")
}
d := gob.NewDecoder(&b)
err = d.Decode(&r)
if err != nil {
return request, errors.New("deserialization failed")
}
p, err := pdu.Parse(bytes.NewReader(r.Pdu))
if err != nil {
return gosmpp.Request{}, err
}
return gosmpp.Request{
PDU: p,
TimeSent: r.TimeSent,
}, nil
}

type requestGob struct {
Pdu []byte
TimeSent time.Time
}
Loading

0 comments on commit 3cf20dc

Please sign in to comment.