Skip to content

Commit

Permalink
Upgrade NATS, adding faults array
Browse files Browse the repository at this point in the history
  • Loading branch information
samlown committed Sep 3, 2024
1 parent 5e2ee1a commit 16d5eef
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 78 deletions.
40 changes: 26 additions & 14 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/gabriel-vasile/mimetype"
"github.com/invopop/configure/pkg/natsconf"
nats "github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/encoders/protobuf"
"github.com/rs/zerolog/log"
"google.golang.org/protobuf/proto"
)
Expand All @@ -33,7 +32,7 @@ const (
// the gateway service, accessed via NATS.
type Client struct {
name string // service name
nc *nats.EncodedConn
nc *nats.Conn
wg sync.WaitGroup
th TaskHandler
incoming chan *nats.Msg
Expand Down Expand Up @@ -62,9 +61,9 @@ func New(conf Configuration) *Client {
return gw
}

// NATS provides the NATS Encoded Connection so that it can be used
// NATS provides the NATS Connection so that it can be used
// for other tasks if needed.
func (gw *Client) NATS() *nats.EncodedConn {
func (gw *Client) NATS() *nats.Conn {
return gw.nc
}

Expand All @@ -77,22 +76,39 @@ func (gw *Client) Subscribe(th TaskHandler) {
// Poke sends a message to the gateway indicating that we've received an
// external prompt, like a webhook, and the original task should be re-sent.
func (gw *Client) Poke(ctx context.Context, req *TaskPoke) error {
in, err := proto.Marshal(req)
if err != nil {
return err
}
out, err := gw.nc.RequestWithContext(ctx, SubjectTasksPoke, in)
if err != nil {
return err
}
res := new(TaskPokeResponse)
if err := gw.nc.RequestWithContext(ctx, SubjectTasksPoke, req, res); err != nil {
if err := proto.Unmarshal(out.Data, res); err != nil {
return err
}
if res.Err != nil {
return res.Err
}

// PokeTaskResponse is empty if successful
return nil
}

// CreateFile allows us to build a file place holder and upload the data afterwards
// by posting to the URL provided.
func (gw *Client) CreateFile(ctx context.Context, req *CreateFile) (*File, error) {
in, err := proto.Marshal(req)
if err != nil {
return nil, err
}
out, err := gw.nc.RequestWithContext(ctx, SubjectFilesCreate, in)
if err != nil {
return nil, err
}
res := new(FileResponse)
if err := gw.nc.RequestWithContext(ctx, SubjectFilesCreate, req, res); err != nil {
if err := proto.Unmarshal(out.Data, res); err != nil {
return nil, err
}
if res.Err != nil {
Expand Down Expand Up @@ -225,7 +241,7 @@ func (gw *Client) subscribeIncomingTasks() error {
subj := fmt.Sprintf(SubjectTaskFmt, gw.name)
queue := fmt.Sprintf(QueueNameTaskFmt, gw.name)
var err error
gw.sub, err = gw.nc.Conn.QueueSubscribeSyncWithChan(subj, queue, gw.incoming)
gw.sub, err = gw.nc.QueueSubscribeSyncWithChan(subj, queue, gw.incoming)
if err != nil {
return fmt.Errorf("error subscribing to queue: %w", err)
}
Expand Down Expand Up @@ -260,12 +276,12 @@ func (gw *Client) processTask(ctx context.Context, m *nats.Msg) {
if err != nil {
log.Error().Str("task_id", t.Id).Err(err).Msg("unable to marshal task response, dropping")
}
if err := gw.nc.Conn.Publish(m.Reply, data); err != nil {
if err := gw.nc.Publish(m.Reply, data); err != nil {
log.Error().Str("task_id", t.Id).Err(err).Msg("unable to publish response")
}
}

func prepareNATSClient(conf *natsconf.Config, name string) *nats.EncodedConn {
func prepareNATSClient(conf *natsconf.Config, name string) *nats.Conn {
// prepare base options
opts, err := conf.Options()
if err != nil {
Expand All @@ -287,10 +303,6 @@ func prepareNATSClient(conf *natsconf.Config, name string) *nats.EncodedConn {
if err != nil {
log.Fatal().Err(err).Str("url", conf.URL).Msg("failed to connect to nats")
}
enc, err := nats.NewEncodedConn(nc, protobuf.PROTOBUF_ENCODER)
if err != nil {
log.Fatal().Err(err).Str("url", conf.URL).Msg("failed to prepare nats encoded connection")
}

return enc
return nc
}
Loading

0 comments on commit 16d5eef

Please sign in to comment.