Skip to content

Commit

Permalink
feat: add readers sdk
Browse files Browse the repository at this point in the history
Signed-off-by: Felix Gateru <felix.gateru@gmail.com>
  • Loading branch information
felixgateru committed Jan 10, 2025
1 parent 8df4803 commit 3c24d99
Show file tree
Hide file tree
Showing 35 changed files with 4,518 additions and 247 deletions.
6 changes: 3 additions & 3 deletions apidocs/openapi/notifiers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ info:
license:
name: Apache 2.0
url: https://github.com/absmach/magistrala/blob/main/LICENSE
version: 0.14.0
version: 0.15.1

servers:
- url: http://localhost:9014
Expand Down Expand Up @@ -44,7 +44,7 @@ paths:
"400":
description: Failed due to malformed JSON.
"401":
description: Missing or invalid access token provided.
description: Missing or invalid access token provided.
"403":
description: Failed to perform authorization over the entity.
"409":
Expand Down Expand Up @@ -278,7 +278,7 @@ components:
content:
application/health+json:
schema:
$ref: "./schemas/HealthInfo.yml"
$ref: "./schemas/health_info.yml"

securitySchemes:
bearerAuth:
Expand Down
4 changes: 2 additions & 2 deletions apidocs/openapi/readers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ info:
license:
name: Apache 2.0
url: https://github.com/absmach/magistrala/blob/main/LICENSE
version: 0.14.0
version: 0.15.1

servers:
- url: http://localhost:9003
Expand Down Expand Up @@ -292,7 +292,7 @@ components:
content:
application/health+json:
schema:
$ref: "./schemas/HealthInfo.yml"
$ref: "./schemas/health_info.yml"

securitySchemes:
bearerAuth:
Expand Down
2 changes: 1 addition & 1 deletion cmd/postgres-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"

chclient "github.com/absmach/callhome/pkg/client"
httpapi "github.com/absmach/magistrala/readers/api"
"github.com/absmach/magistrala/readers/postgres"
"github.com/absmach/supermq"
smqlog "github.com/absmach/supermq/logger"
Expand All @@ -23,7 +24,6 @@ import (
httpserver "github.com/absmach/supermq/pkg/server/http"
"github.com/absmach/supermq/pkg/uuid"
"github.com/absmach/supermq/readers"
httpapi "github.com/absmach/supermq/readers/api"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"golang.org/x/sync/errgroup"
Expand Down
2 changes: 1 addition & 1 deletion cmd/timescale-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"

chclient "github.com/absmach/callhome/pkg/client"
httpapi "github.com/absmach/magistrala/readers/api"
"github.com/absmach/magistrala/readers/timescale"
"github.com/absmach/supermq"
smqlog "github.com/absmach/supermq/logger"
Expand All @@ -23,7 +24,6 @@ import (
httpserver "github.com/absmach/supermq/pkg/server/http"
"github.com/absmach/supermq/pkg/uuid"
"github.com/absmach/supermq/readers"
httpapi "github.com/absmach/supermq/readers/api"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"golang.org/x/sync/errgroup"
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/api/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package api
import (
"context"

notifiers "github.com/absmach/magistrala/consumers/notifiers"
apiutil "github.com/absmach/supermq/api/http/util"
notifiers "github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/supermq/pkg/errors"
"github.com/go-kit/kit/endpoint"
)
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/api/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (
"strings"
"testing"

"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers/api"
"github.com/absmach/magistrala/consumers/notifiers/mocks"
"github.com/absmach/magistrala/internal/testsutil"
apiutil "github.com/absmach/supermq/api/http/util"
"github.com/absmach/supermq/consumers/notifiers"
smqlog "github.com/absmach/supermq/logger"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/uuid"
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"log/slog"
"time"

"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers"
)

var _ notifiers.Service = (*loggingMiddleware)(nil)
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"context"
"time"

"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers"
"github.com/go-kit/kit/metrics"
)

Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"net/http"
"strings"

"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/supermq"
api "github.com/absmach/supermq/api/http"
apiutil "github.com/absmach/supermq/api/http/util"
"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/supermq/pkg/errors"
"github.com/go-chi/chi/v5"
kithttp "github.com/go-kit/kit/transport/http"
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/mocks/repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion consumers/notifiers/mocks/service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions consumers/notifiers/notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0

package notifiers

import (
"errors"

"github.com/absmach/supermq/pkg/messaging"
)

// ErrNotify wraps sending notification errors.
var ErrNotify = errors.New("error sending notification")

// Notifier represents an API for sending notification.
//
//go:generate mockery --name Notifier --output=./mocks --filename notifier.go --quiet --note "Copyright (c) Abstract Machines"
type Notifier interface {
// Notify method is used to send notification for the
// received message to the provided list of receivers.
Notify(from string, to []string, msg *messaging.Message) error
}
2 changes: 1 addition & 1 deletion consumers/notifiers/postgres/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"fmt"
"strings"

"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/supermq/pkg/errors"
repoerr "github.com/absmach/supermq/pkg/errors/repository"
"github.com/jackc/pgerrcode"
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/postgres/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"fmt"
"testing"

"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers/postgres"
"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/supermq/pkg/errors"
repoerr "github.com/absmach/supermq/pkg/errors/repository"
"github.com/stretchr/testify/assert"
Expand Down
9 changes: 4 additions & 5 deletions consumers/notifiers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/absmach/supermq"
"github.com/absmach/supermq/consumers"
notif "github.com/absmach/supermq/consumers/notifiers"
smqauthn "github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
Expand Down Expand Up @@ -47,13 +46,13 @@ type notifierService struct {
authn smqauthn.Authentication
subs SubscriptionsRepository
idp supermq.IDProvider
notifier notif.Notifier
notifier Notifier
errCh chan error
from string
}

// New instantiates the subscriptions service implementation.
func New(authn smqauthn.Authentication, subs SubscriptionsRepository, idp supermq.IDProvider, notifier notif.Notifier, from string) Service {
func New(authn smqauthn.Authentication, subs SubscriptionsRepository, idp supermq.IDProvider, notifier Notifier, from string) Service {
return &notifierService{
authn: authn,
subs: subs,
Expand Down Expand Up @@ -132,7 +131,7 @@ func (ns *notifierService) ConsumeBlocking(ctx context.Context, message interfac
if len(to) > 0 {
err := ns.notifier.Notify(ns.from, to, msg)
if err != nil {
return errors.Wrap(notif.ErrNotify, err)
return errors.Wrap(ErrNotify, err)

Check warning on line 134 in consumers/notifiers/service.go

View check run for this annotation

Codecov / codecov/patch

consumers/notifiers/service.go#L134

Added line #L134 was not covered by tests
}
}

Expand Down Expand Up @@ -166,7 +165,7 @@ func (ns *notifierService) ConsumeAsync(ctx context.Context, message interface{}
}
if len(to) > 0 {
if err := ns.notifier.Notify(ns.from, to, msg); err != nil {
ns.errCh <- errors.Wrap(notif.ErrNotify, err)
ns.errCh <- errors.Wrap(ErrNotify, err)

Check warning on line 168 in consumers/notifiers/service.go

View check run for this annotation

Codecov / codecov/patch

consumers/notifiers/service.go#L168

Added line #L168 was not covered by tests
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"fmt"
"testing"

"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers/mocks"
"github.com/absmach/magistrala/internal/testsutil"
"github.com/absmach/supermq/consumers/notifiers"
smqauthn "github.com/absmach/supermq/pkg/authn"
authnmocks "github.com/absmach/supermq/pkg/authn/mocks"
"github.com/absmach/supermq/pkg/errors"
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/smpp/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package smpp
import (
"time"

"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/transformers"
"github.com/absmach/supermq/pkg/transformers/json"
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/smtp/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package smtp
import (
"fmt"

"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/magistrala/internal/email"
"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/supermq/pkg/messaging"
)

Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/tracing/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ package tracing
import (
"context"

"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ require (
github.com/0x6flab/namegenerator v1.4.0
github.com/absmach/callhome v0.14.0
github.com/absmach/certs v0.0.0-20241209153600-91270de67b5a
github.com/absmach/supermq v0.16.1-0.20250110085603-df5d752c4b50
github.com/absmach/supermq v0.16.1-0.20250110102639-a9169276e54c
github.com/authzed/authzed-go v1.2.1
github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b
github.com/caarlos0/env/v11 v11.3.1
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/fiorix/go-smpp v0.0.0-20210403173735-2894b96e70ba
github.com/go-chi/chi v4.1.2+incompatible
github.com/go-chi/chi/v5 v5.2.0
github.com/go-kit/kit v0.13.0
github.com/gofrs/uuid/v5 v5.3.0
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ github.com/absmach/mgate v0.4.5 h1:l6RmrEsR9jxkdb9WHUSecmT0HA41TkZZQVffFfUAIfI=
github.com/absmach/mgate v0.4.5/go.mod h1:IvRIHZexZPEIAPmmaJF0L5DY2ERjj+GxRGitOW4s6qo=
github.com/absmach/senml v1.0.6 h1:WPeIl6vQ00k7ghWSZYT/QP0KUxq2+4zQoaC7240pLFk=
github.com/absmach/senml v1.0.6/go.mod h1:QnJNPy1DJPy0+qUW21PTcH/xoh0LgfYZxTfwriMIvmQ=
github.com/absmach/supermq v0.16.1-0.20250110085603-df5d752c4b50 h1:ndn1Z9wxUIH5chingm2hy3ZhIMt0+lDjD/CFaBEULbY=
github.com/absmach/supermq v0.16.1-0.20250110085603-df5d752c4b50/go.mod h1:VihyvWijocoz2yhXGAL+qHtid24O+qL/N2lxP2vRf/c=
github.com/absmach/supermq v0.16.1-0.20250110102639-a9169276e54c h1:s2OxO+rV1PMm/H2jqWVG8IF+HCVfawt8nGN/gY+SIa0=
github.com/absmach/supermq v0.16.1-0.20250110102639-a9169276e54c/go.mod h1:As0UgktURYeC5/SvC269WfdG9satLst8CQcxc2dC02E=
github.com/authzed/authzed-go v1.2.1 h1:o54aIs0ocDfVJl/rfIt/75vrb6z+tgPuXjMlSsSEwH0=
github.com/authzed/authzed-go v1.2.1/go.mod h1:/+NblSrzA6Lm6vUO3fqZyLh8MDCLUQq2AyJMlHb32DE=
github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b h1:wbh8IK+aMLTCey9sZasO7b6BWLAJnHHvb79fvWCXwxw=
Expand Down Expand Up @@ -99,8 +99,6 @@ github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
github.com/go-chi/chi v4.1.2+incompatible h1:fGFk2Gmi/YKXk0OmGfBh0WgmN3XB8lVnEyNz34tQRec=
github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ=
github.com/go-chi/chi/v5 v5.2.0 h1:Aj1EtB0qR2Rdo2dG4O94RIU35w2lvQSj6BRA4+qwFL0=
github.com/go-chi/chi/v5 v5.2.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-gorp/gorp/v3 v3.1.0 h1:ItKF/Vbuj31dmV4jxA1qblpSwkl9g1typ24xoe70IGs=
Expand Down
4 changes: 1 addition & 3 deletions pkg/sdk/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ import (
"github.com/absmach/supermq/pkg/errors"
)

const (
subscriptionEndpoint = "subscriptions"
)
const subscriptionEndpoint = "subscriptions"

type Subscription struct {
ID string `json:"id,omitempty"`
Expand Down
8 changes: 3 additions & 5 deletions pkg/sdk/consumers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,9 @@ var (
OwnerID: ownerID,
ID: subID,
}
validToken = "validToken"
invalidToken = "invalidToken"
wrongID = "wrongID"
instanceID = "instanceID"
contentType = supermqSDK.CTJSON
wrongID = "wrongID"
instanceID = "instanceID"
contentType = supermqSDK.CTJSON
)

func setupSubscriptions() (*httptest.Server, *notmocks.Service) {
Expand Down
13 changes: 6 additions & 7 deletions pkg/sdk/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,34 @@ import (
"strings"

"github.com/absmach/supermq/pkg/errors"
supermqSDK "github.com/absmach/supermq/pkg/sdk"
)

const channelParts = 2

func (sdk mgSDK) ReadMessages(pm MessagePageMetadata, chanName, domainID, token string) (supermqSDK.MessagesPage, errors.SDKError) {
func (sdk mgSDK) ReadMessages(pm MessagePageMetadata, chanName, domainID, token string) (MessagesPage, errors.SDKError) {
chanNameParts := strings.SplitN(chanName, ".", channelParts)
chanID := chanNameParts[0]
subtopicPart := ""
if len(chanNameParts) == channelParts {
subtopicPart = fmt.Sprintf("?subtopic=%s", chanNameParts[1])
}

msgURL, err := sdk.withMessageQueryParams(sdk.readerURL, fmt.Sprintf("channels/%s/messages%s", chanID, subtopicPart), pm)
msgURL, err := sdk.withMessageQueryParams(sdk.readersURL, fmt.Sprintf("channels/%s/messages%s", chanID, subtopicPart), pm)
if err != nil {
return supermqSDK.MessagesPage{}, errors.NewSDKError(err)
return MessagesPage{}, errors.NewSDKError(err)
}

header := make(map[string]string)
header["Content-Type"] = string(sdk.msgContentType)

_, body, sdkerr := sdk.processRequest(http.MethodGet, msgURL, token, nil, header, http.StatusOK)
if sdkerr != nil {
return supermqSDK.MessagesPage{}, sdkerr
return MessagesPage{}, sdkerr
}

var mp supermqSDK.MessagesPage
var mp MessagesPage
if err := json.Unmarshal(body, &mp); err != nil {
return supermqSDK.MessagesPage{}, errors.NewSDKError(err)
return MessagesPage{}, errors.NewSDKError(err)
}

return mp, nil
Expand Down
Loading

0 comments on commit 3c24d99

Please sign in to comment.