Skip to content

Commit

Permalink
Merge pull request #17 from invopop/workflows-errors
Browse files Browse the repository at this point in the history
Support filtering workflows by schema and advanced error parsing
  • Loading branch information
samlown authored Sep 4, 2024
2 parents 2569c48 + 16d5eef commit a9454f6
Show file tree
Hide file tree
Showing 13 changed files with 593 additions and 160 deletions.
40 changes: 26 additions & 14 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/gabriel-vasile/mimetype"
"github.com/invopop/configure/pkg/natsconf"
nats "github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/encoders/protobuf"
"github.com/rs/zerolog/log"
"google.golang.org/protobuf/proto"
)
Expand All @@ -33,7 +32,7 @@ const (
// the gateway service, accessed via NATS.
type Client struct {
name string // service name
nc *nats.EncodedConn
nc *nats.Conn
wg sync.WaitGroup
th TaskHandler
incoming chan *nats.Msg
Expand Down Expand Up @@ -62,9 +61,9 @@ func New(conf Configuration) *Client {
return gw
}

// NATS provides the NATS Encoded Connection so that it can be used
// NATS provides the NATS Connection so that it can be used
// for other tasks if needed.
func (gw *Client) NATS() *nats.EncodedConn {
func (gw *Client) NATS() *nats.Conn {
return gw.nc
}

Expand All @@ -77,22 +76,39 @@ func (gw *Client) Subscribe(th TaskHandler) {
// Poke sends a message to the gateway indicating that we've received an
// external prompt, like a webhook, and the original task should be re-sent.
func (gw *Client) Poke(ctx context.Context, req *TaskPoke) error {
in, err := proto.Marshal(req)
if err != nil {
return err
}
out, err := gw.nc.RequestWithContext(ctx, SubjectTasksPoke, in)
if err != nil {
return err
}
res := new(TaskPokeResponse)
if err := gw.nc.RequestWithContext(ctx, SubjectTasksPoke, req, res); err != nil {
if err := proto.Unmarshal(out.Data, res); err != nil {
return err
}
if res.Err != nil {
return res.Err
}

// PokeTaskResponse is empty if successful
return nil
}

// CreateFile allows us to build a file place holder and upload the data afterwards
// by posting to the URL provided.
func (gw *Client) CreateFile(ctx context.Context, req *CreateFile) (*File, error) {
in, err := proto.Marshal(req)
if err != nil {
return nil, err
}
out, err := gw.nc.RequestWithContext(ctx, SubjectFilesCreate, in)
if err != nil {
return nil, err
}
res := new(FileResponse)
if err := gw.nc.RequestWithContext(ctx, SubjectFilesCreate, req, res); err != nil {
if err := proto.Unmarshal(out.Data, res); err != nil {
return nil, err
}
if res.Err != nil {
Expand Down Expand Up @@ -225,7 +241,7 @@ func (gw *Client) subscribeIncomingTasks() error {
subj := fmt.Sprintf(SubjectTaskFmt, gw.name)
queue := fmt.Sprintf(QueueNameTaskFmt, gw.name)
var err error
gw.sub, err = gw.nc.Conn.QueueSubscribeSyncWithChan(subj, queue, gw.incoming)
gw.sub, err = gw.nc.QueueSubscribeSyncWithChan(subj, queue, gw.incoming)
if err != nil {
return fmt.Errorf("error subscribing to queue: %w", err)
}
Expand Down Expand Up @@ -260,12 +276,12 @@ func (gw *Client) processTask(ctx context.Context, m *nats.Msg) {
if err != nil {
log.Error().Str("task_id", t.Id).Err(err).Msg("unable to marshal task response, dropping")
}
if err := gw.nc.Conn.Publish(m.Reply, data); err != nil {
if err := gw.nc.Publish(m.Reply, data); err != nil {
log.Error().Str("task_id", t.Id).Err(err).Msg("unable to publish response")
}
}

func prepareNATSClient(conf *natsconf.Config, name string) *nats.EncodedConn {
func prepareNATSClient(conf *natsconf.Config, name string) *nats.Conn {
// prepare base options
opts, err := conf.Options()
if err != nil {
Expand All @@ -287,10 +303,6 @@ func prepareNATSClient(conf *natsconf.Config, name string) *nats.EncodedConn {
if err != nil {
log.Fatal().Err(err).Str("url", conf.URL).Msg("failed to connect to nats")
}
enc, err := nats.NewEncodedConn(nc, protobuf.PROTOBUF_ENCODER)
if err != nil {
log.Fatal().Err(err).Str("url", conf.URL).Msg("failed to prepare nats encoded connection")
}

return enc
return nc
}
221 changes: 158 additions & 63 deletions gateway/tasks.pb.go

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions gateway/tasks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,23 @@ message Task {
bytes config = 5;
string envelope_public_url = 7;

// Array of faults copied from the job which may be useful for reporting.
repeated Fault faults = 14;

// Rerefence to any existing attachments
repeated File files = 6;

// Tracking timestamp, issued by the gateway service. Includes nano seconds.
double ts = 8;
}

// A Fault provides basic information about a previous step that KO'd.
message Fault {
string provider = 1;
string code = 2;
string message = 3;
}

// TaskResult says what we expect from a provider after attempting to complete
// a task.
message TaskResult {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/invopop/gobl v0.113.0
github.com/labstack/echo/v4 v4.12.0
github.com/magefile/mage v1.15.0
github.com/nats-io/nats.go v1.36.0
github.com/nats-io/nats.go v1.37.0
github.com/rs/zerolog v1.33.0
github.com/stretchr/testify v1.8.4
github.com/ziflex/lecho/v3 v3.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU=
github.com/nats-io/nats.go v1.36.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
123 changes: 123 additions & 0 deletions invopop/dict.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package invopop

import (
"encoding/json"
"strings"
)

// Dict helps manage a nested map of strings to either messages or
// or other dictionaries. This is useful for accessing error messages
// provided by endpoints that include a "fields" property.
//
// This is based on the Dict model included by the
// [ctxi18n](https://github.com/invopop/ctxi18n) project.
type Dict struct {
msg string
entries map[string]*Dict
}

// NewDict instantiates a new dict object.
func NewDict() *Dict {
return &Dict{
entries: make(map[string]*Dict),
}
}

// Add adds a new key value pair to the dictionary.
func (d *Dict) Add(key string, value any) {
switch v := value.(type) {
case string:
d.entries[key] = &Dict{msg: v}
case map[string]any:
nd := NewDict()
for k, row := range v {
nd.Add(k, row)
}
d.entries[key] = nd
case *Dict:
d.entries[key] = v
default:
// ignore
}
}

// Message returns the dictionary message or an empty string
// if the dictionary is nil.
func (d *Dict) Message() string {
if d == nil {
return ""
}
return d.msg
}

// Get recursively retrieves the dictionary at the provided key location.
func (d *Dict) Get(key string) *Dict {
if d == nil {
return nil
}
if key == "" {
return nil
}
n := strings.SplitN(key, ".", 2)
entry, ok := d.entries[n[0]]
if !ok {
return nil
}
if len(n) == 1 {
return entry
}
return entry.Get(n[1])
}

// Merge combines the entries of the second dictionary into this one. If a
// key is duplicated in the second diction, the original value takes priority.
func (d *Dict) Merge(d2 *Dict) {
if d2 == nil {
return
}
if d.entries == nil {
d.entries = make(map[string]*Dict)
}
for k, v := range d2.entries {
if d.entries[k] == nil {
d.entries[k] = v
continue
}
d.entries[k].Merge(v)
}
}

// Flatten returns a simple flat map of the dictionary entries. This might make
// it easier to list out all the error messages for user interfaces.
func (d *Dict) Flatten() map[string]string {
if d == nil {
return nil
}
if d.msg != "" {
return map[string]string{"": d.msg}
}
m := make(map[string]string)
for k, v := range d.entries {
for kk, vv := range v.Flatten() {
x := k
if kk != "" {
x += "." + kk
}
m[x] = vv
}
}
return m
}

// UnmarshalJSON attempts to load the dictionary data from a JSON byte slice.
func (d *Dict) UnmarshalJSON(data []byte) error {
if len(data) == 0 {
return nil
}
if data[0] == '"' {
d.msg = string(data[1 : len(data)-1])
return nil
}
d.entries = make(map[string]*Dict)
return json.Unmarshal(data, &d.entries)
}
106 changes: 106 additions & 0 deletions invopop/dict_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package invopop

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDictUnmarshalJSON(t *testing.T) {
ex := `{
"foo": "bar",
"data": {
"supplier": {
"emails": {
"0": {
"addr": "must be a valid email address"
}
}
}
}
}`
dict := new(Dict)
err := json.Unmarshal([]byte(ex), dict)
require.NoError(t, err)
assert.Equal(t, "bar", dict.Get("foo").Message())
assert.Equal(t, "must be a valid email address", dict.Get("data.supplier.emails.0.addr").Message())
assert.Empty(t, dict.Get("data.missing").Message())
assert.Empty(t, dict.Get("random").Message())
}

func TestDictFlatten(t *testing.T) {
ex := `{
"foo": "bar",
"data": {
"supplier": {
"emails": {
"0": {
"addr": "must be a valid email address"
}
}
}
}
}`
dict := new(Dict)
err := json.Unmarshal([]byte(ex), dict)
require.NoError(t, err)
out := dict.Flatten()
assert.Equal(t, "bar", out["foo"])
assert.Equal(t, "must be a valid email address", out["data.supplier.emails.0.addr"])
}

func TestDictAdd(t *testing.T) {
d := NewDict()
assert.Nil(t, d.Get(""))
d.Add("foo", "bar")
assert.Equal(t, "bar", d.Get("foo").Message())

d.Add("plural", map[string]any{
"zero": "no mice",
"one": "%s mouse",
"other": "%s mice",
})
assert.Equal(t, "no mice", d.Get("plural.zero").Message())
assert.Equal(t, "%s mice", d.Get("plural.other").Message())

d.Add("bad", 10) // ignore
assert.Nil(t, d.Get("bad"))

d.Add("self", d)
assert.Equal(t, "bar", d.Get("self.foo").Message())
}

func TestDictMerge(t *testing.T) {
ex := `{
"foo": "bar",
"baz": {
"qux": "quux",
"plural": {
"zero": "no mice",
"one": "%s mouse",
"other": "%s mice"
}
}
}`
d1 := new(Dict)
require.NoError(t, json.Unmarshal([]byte(ex), d1))

ex2 := `{
"foo": "baz",
"extra": "value"
}`
d2 := new(Dict)
require.NoError(t, json.Unmarshal([]byte(ex2), d2))

d1.Merge(nil) // does nothing

d3 := new(Dict)
d3.Merge(d2)
assert.Equal(t, "value", d3.Get("extra").Message())

d1.Merge(d2)
assert.Equal(t, "bar", d1.Get("foo").Message(), "should not overwrite")
assert.Equal(t, "value", d1.Get("extra").Message())
}
Loading

0 comments on commit a9454f6

Please sign in to comment.