Skip to content

Commit 8ba08aa

Browse files
orimdominicsubomijirevwemekilis
authored
feat: dashboard UI framework (#2253)
* feat: copy migration files to container (#2238) * fix: put event types creation behind the license (#2239) * Add manual instrumentation (#2240) * fix: add manual instrumentation * fix: pass context to manual instrumentation * fix: add manual instrumentation for sentry tracer backend * fix: add generic function for manual instrumentation * fix: add context to db hooks; add span to http client * fix: use convoy tracer backend to build trace * fix: fixed dispatcher tests * fix: add http event trace breakdown * Re-enable Meta-Events (#2244) * fix: fix queries that weren't using indexes because of its column type * fix: re-enable meta-events; event delivery meta-events should run in the foreground. * fix: reindex index :) * Improve Tracing: Add Event ID To Workflow Stages (#2245) * improve tracing; add event id to workflow stages * add tracer mocks to tests * resolve PR comments * Update Changelog (#2247) * update changelog * Bump version to v25.2.1 * fixed rate limit not deleting for projects and endpoints (#2246) * fixed rate limit not deleting for projects and endpoints * fixed lint issues * fixed test * refactor: identifier names, prettier * feat: include necessary ui components * refactor: services * refactor: services * feat: add supporting functions * feat: set up to navigation and side bar --------- Co-authored-by: Subomi Oluwalana <subomioluwalana71@gmail.com> Co-authored-by: Raymond Tukpe <jirevwe@users.noreply.github.com> Co-authored-by: Raymond Tukpe <rtukpe@gmail.com> Co-authored-by: Smart Mekiliuwa <st.nonso@gmail.com>
1 parent 0ea02a1 commit 8ba08aa

File tree

97 files changed

+4285
-852
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

97 files changed

+4285
-852
lines changed

CHANGELOG.md

+241-202
Large diffs are not rendered by default.

VERSION

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v25.1.1
1+
v25.2.1

api/server_suite_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ package api
55

66
import (
77
"bytes"
8+
"context"
89
"encoding/json"
910
"fmt"
1011
"github.com/frain-dev/convoy/internal/pkg/fflag"
1112
"github.com/frain-dev/convoy/internal/pkg/keys"
13+
1214
"io"
1315
"math/rand"
1416
"net/http"
@@ -97,7 +99,7 @@ func getDB() database.Database {
9799
_ = os.Setenv("TZ", "") // Use UTC by default :)
98100

99101
dbHooks := hooks.Init()
100-
dbHooks.RegisterHook(datastore.EndpointCreated, func(data interface{}, changelog interface{}) {})
102+
dbHooks.RegisterHook(datastore.EndpointCreated, func(ctx context.Context, data interface{}, changelog interface{}) {})
101103

102104
pDB = db
103105
})

cmd/hooks/hooks.go

+1
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ func PreRun(app *cli.App, db *postgres.Postgres) func(cmd *cobra.Command, args [
232232
if err != nil {
233233
return err
234234
}
235+
// todo(raymond): I don't think the check below needs to exist since we perform the check in the tracer.Init() above.
235236
if cfg.Tracer.Type == config.DatadogTracerProvider && !app.Licenser.DatadogTracing() {
236237
lo.Error("your instance does not have access to datadog tracing, upgrade to access this feature")
237238
_ = app.TracerBackend.Shutdown(context.Background())

cmd/worker/worker.go

+21-8
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,8 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte
276276
a.Licenser,
277277
featureFlag,
278278
net.LoggerOption(lo),
279+
net.TracerOption(a.TracerBackend),
280+
net.DetailedTraceOption(true),
279281
net.ProxyOption(cfg.Server.HTTP.HttpProxy),
280282
net.AllowListOption(cfg.Dispatcher.AllowList),
281283
net.BlockListOption(cfg.Dispatcher.BlockList),
@@ -364,8 +366,8 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte
364366
attemptRepo,
365367
circuitBreakerManager,
366368
featureFlag,
367-
a.TracerBackend,
368-
), newTelemetry)
369+
a.TracerBackend),
370+
newTelemetry)
369371

370372
consumer.RegisterHandlers(convoy.CreateEventProcessor, task.ProcessEventCreation(
371373
defaultCh,
@@ -375,7 +377,10 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte
375377
eventDeliveryRepo,
376378
a.Queue,
377379
subRepo,
378-
deviceRepo, a.Licenser), newTelemetry)
380+
deviceRepo,
381+
a.Licenser,
382+
a.TracerBackend),
383+
newTelemetry)
379384

380385
consumer.RegisterHandlers(convoy.RetryEventProcessor, task.ProcessRetryEventDelivery(
381386
endpointRepo,
@@ -388,8 +393,8 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte
388393
attemptRepo,
389394
circuitBreakerManager,
390395
featureFlag,
391-
a.TracerBackend,
392-
), newTelemetry)
396+
a.TracerBackend),
397+
newTelemetry)
393398

394399
consumer.RegisterHandlers(convoy.CreateBroadcastEventProcessor, task.ProcessBroadcastEventCreation(
395400
broadcastCh,
@@ -400,7 +405,9 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte
400405
a.Queue,
401406
subRepo,
402407
deviceRepo,
403-
a.Licenser), newTelemetry)
408+
a.Licenser,
409+
a.TracerBackend),
410+
newTelemetry)
404411

405412
consumer.RegisterHandlers(convoy.CreateDynamicEventProcessor, task.ProcessDynamicEventCreation(
406413
dynamicCh,
@@ -410,7 +417,10 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte
410417
eventDeliveryRepo,
411418
a.Queue,
412419
subRepo,
413-
deviceRepo, a.Licenser), newTelemetry)
420+
deviceRepo,
421+
a.Licenser,
422+
a.TracerBackend),
423+
newTelemetry)
414424

415425
if a.Licenser.RetentionPolicy() {
416426
consumer.RegisterHandlers(convoy.RetentionPolicies, task.RetentionPolicies(rd, ret), nil)
@@ -425,7 +435,10 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte
425435
eventDeliveryRepo,
426436
a.Queue,
427437
subRepo,
428-
deviceRepo, a.Licenser), newTelemetry)
438+
deviceRepo,
439+
a.Licenser,
440+
a.TracerBackend),
441+
newTelemetry)
429442

430443
consumer.RegisterHandlers(convoy.MonitorTwitterSources, task.MonitorTwitterSources(a.DB, a.Queue, rd), nil)
431444

database/hooks/hooks.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package hooks
22

33
import (
4+
"context"
45
"errors"
56
"sync/atomic"
67

78
"github.com/frain-dev/convoy/datastore"
89
)
910

10-
type hookMap map[datastore.HookEventType]func(data interface{}, changelog interface{})
11+
type hookMap map[datastore.HookEventType]func(ctx context.Context, data interface{}, changelog interface{})
1112

1213
type Hook struct {
1314
fns hookMap
@@ -30,13 +31,13 @@ func Init() *Hook {
3031
return &Hook{fns: hookMap{}}
3132
}
3233

33-
func (h *Hook) Fire(eventType datastore.HookEventType, data interface{}, changelog interface{}) {
34+
func (h *Hook) Fire(ctx context.Context, eventType datastore.HookEventType, data interface{}, changelog interface{}) {
3435
if fn, ok := h.fns[eventType]; ok {
35-
fn(data, changelog)
36+
fn(ctx, data, changelog)
3637
}
3738
}
3839

39-
func (h *Hook) RegisterHook(eventType datastore.HookEventType, fn func(data interface{}, changelog interface{})) {
40+
func (h *Hook) RegisterHook(eventType datastore.HookEventType, fn func(ctx context.Context, data interface{}, changelog interface{})) {
4041
h.fns[eventType] = fn
4142
hookSingleton.Store(h)
4243
}
+9-8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package listener
22

33
import (
4+
"context"
45
"github.com/frain-dev/convoy/datastore"
56
"github.com/frain-dev/convoy/pkg/log"
67
"github.com/frain-dev/convoy/queue"
@@ -16,26 +17,26 @@ func NewEndpointListener(queue queue.Queuer, projectRepo datastore.ProjectReposi
1617
return &EndpointListener{mEvent: mEvent}
1718
}
1819

19-
func (e *EndpointListener) AfterCreate(data interface{}, _ interface{}) {
20-
e.metaEvent(string(datastore.EndpointCreated), data)
20+
func (e *EndpointListener) AfterCreate(ctx context.Context, data interface{}, _ interface{}) {
21+
e.metaEvent(ctx, datastore.EndpointCreated, data)
2122
}
2223

23-
func (e *EndpointListener) AfterUpdate(data interface{}, _ interface{}) {
24-
e.metaEvent(string(datastore.EndpointUpdated), data)
24+
func (e *EndpointListener) AfterUpdate(ctx context.Context, data interface{}, _ interface{}) {
25+
e.metaEvent(ctx, datastore.EndpointUpdated, data)
2526
}
2627

27-
func (e *EndpointListener) AfterDelete(data interface{}, _ interface{}) {
28-
e.metaEvent(string(datastore.EndpointDeleted), data)
28+
func (e *EndpointListener) AfterDelete(ctx context.Context, data interface{}, _ interface{}) {
29+
e.metaEvent(ctx, datastore.EndpointDeleted, data)
2930
}
3031

31-
func (e *EndpointListener) metaEvent(eventType string, data interface{}) {
32+
func (e *EndpointListener) metaEvent(ctx context.Context, eventType datastore.HookEventType, data interface{}) {
3233
endpoint, ok := data.(*datastore.Endpoint)
3334
if !ok {
3435
log.Errorf("invalid type for event - %s", eventType)
3536
return
3637
}
3738

38-
if err := e.mEvent.Run(eventType, endpoint.ProjectID, endpoint); err != nil {
39+
if err := e.mEvent.Run(ctx, string(eventType), endpoint.ProjectID, endpoint); err != nil {
3940
log.WithError(err).Error("endpoint meta event failed")
4041
}
4142
}

database/listener/event_delivery_listener.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,15 @@ func NewEventDeliveryListener(queue queue.Queuer, projectRepo datastore.ProjectR
4747
return &EventDeliveryListener{mEvent: mEvent, attemptsRepo: attemptsRepo}
4848
}
4949

50-
func (e *EventDeliveryListener) AfterUpdate(data interface{}, _ interface{}) {
50+
func (e *EventDeliveryListener) AfterUpdate(ctx context.Context, data interface{}, _ interface{}) {
5151
eventDelivery, ok := data.(*datastore.EventDelivery)
5252
if !ok {
5353
log.Error("invalid type for event - eventdelivery.updated")
5454
return
5555
}
5656

5757
mEventDelivery := getMetaEventDelivery(eventDelivery)
58-
attempts, err := e.attemptsRepo.FindDeliveryAttempts(context.Background(), mEventDelivery.UID)
58+
attempts, err := e.attemptsRepo.FindDeliveryAttempts(ctx, mEventDelivery.UID)
5959
if err != nil {
6060
log.WithError(err).Error("event delivery meta event failed")
6161
}
@@ -65,14 +65,14 @@ func (e *EventDeliveryListener) AfterUpdate(data interface{}, _ interface{}) {
6565
}
6666

6767
if eventDelivery.Status == datastore.SuccessEventStatus {
68-
err := e.mEvent.Run(string(datastore.EventDeliverySuccess), eventDelivery.ProjectID, mEventDelivery)
68+
err = e.mEvent.Run(ctx, string(datastore.EventDeliverySuccess), eventDelivery.ProjectID, mEventDelivery)
6969
if err != nil {
7070
log.WithError(err).Error("event delivery meta event failed")
7171
}
7272
}
7373

7474
if eventDelivery.Status == datastore.FailureEventStatus {
75-
err := e.mEvent.Run(string(datastore.EventDeliveryFailed), eventDelivery.ProjectID, mEventDelivery)
75+
err = e.mEvent.Run(ctx, string(datastore.EventDeliveryFailed), eventDelivery.ProjectID, mEventDelivery)
7676
if err != nil {
7777
log.WithError(err).Error("event delivery meta event failed")
7878
}

database/listener/project_listener.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package listener
22

33
import (
4+
"context"
45
"encoding/json"
56
"time"
67

@@ -19,11 +20,11 @@ func NewProjectListener(queue queue.Queuer) *ProjectListener {
1920
return &ProjectListener{queue: queue}
2021
}
2122

22-
func (e *ProjectListener) AfterUpdate(data interface{}, changelog interface{}) {
23-
e.run(string(datastore.ProjectUpdated), data, changelog)
23+
func (e *ProjectListener) AfterUpdate(ctx context.Context, data interface{}, changelog interface{}) {
24+
e.run(ctx, datastore.ProjectUpdated, data, changelog)
2425
}
2526

26-
func (e *ProjectListener) run(eventType string, data interface{}, changelog interface{}) {
27+
func (e *ProjectListener) run(_ context.Context, eventType datastore.HookEventType, data interface{}, changelog interface{}) {
2728
project, ok := data.(*datastore.Project)
2829
if !ok {
2930
log.Errorf("invalid type for project - %s", eventType)

database/postgres/endpoint.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ func (e *endpointRepo) CreateEndpoint(ctx context.Context, endpoint *datastore.E
305305
return ErrEndpointNotCreated
306306
}
307307

308-
go e.hook.Fire(datastore.EndpointCreated, endpoint, nil)
308+
go e.hook.Fire(context.Background(), datastore.EndpointCreated, endpoint, nil)
309309

310310
return nil
311311
}
@@ -418,7 +418,7 @@ func (e *endpointRepo) UpdateEndpoint(ctx context.Context, endpoint *datastore.E
418418
return ErrEndpointNotUpdated
419419
}
420420

421-
go e.hook.Fire(datastore.EndpointUpdated, endpoint, nil)
421+
go e.hook.Fire(context.Background(), datastore.EndpointUpdated, endpoint, nil)
422422
return nil
423423
}
424424

@@ -467,7 +467,7 @@ func (e *endpointRepo) DeleteEndpoint(ctx context.Context, endpoint *datastore.E
467467
return err
468468
}
469469

470-
go e.hook.Fire(datastore.EndpointDeleted, endpoint, nil)
470+
go e.hook.Fire(context.Background(), datastore.EndpointDeleted, endpoint, nil)
471471
return nil
472472
}
473473

database/postgres/event_delivery.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,8 @@ func (e *eventDeliveryRepo) UpdateEventDeliveryMetadata(ctx context.Context, pro
555555
return ErrEventDeliveryAttemptsNotUpdated
556556
}
557557

558-
go e.hook.Fire(datastore.EventDeliveryUpdated, delivery, nil)
558+
e.hook.Fire(ctx, datastore.EventDeliveryUpdated, delivery, nil)
559+
559560
return nil
560561
}
561562

database/postgres/postgres_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func getDB(t *testing.T) (database.Database, func()) {
7171
var err error
7272

7373
dbHooks := hooks.Init()
74-
dbHooks.RegisterHook(datastore.EndpointCreated, func(data interface{}, changelog interface{}) {})
74+
dbHooks.RegisterHook(datastore.EndpointCreated, func(ctx context.Context, data interface{}, changelog interface{}) {})
7575

7676
_db, err = NewDB(getConfig())
7777
require.NoError(t, err)

database/postgres/project.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,8 @@ func (p *projectRepo) UpdateProject(ctx context.Context, project *datastore.Proj
423423
return err
424424
}
425425

426-
go p.hook.Fire(datastore.ProjectUpdated, project, changelog)
426+
go p.hook.Fire(context.Background(), datastore.ProjectUpdated, project, changelog)
427+
427428
return nil
428429
}
429430

datastore/models.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9-
cb "github.com/frain-dev/convoy/pkg/circuit_breaker"
109
"math"
1110
"net/http"
1211
"strings"
1312
"time"
1413

14+
cb "github.com/frain-dev/convoy/pkg/circuit_breaker"
15+
1516
"github.com/frain-dev/convoy/pkg/flatten"
1617

1718
"github.com/oklog/ulid/v2"
@@ -963,6 +964,7 @@ type EventDelivery struct {
963964
Headers httpheader.HTTPHeader `json:"headers" db:"headers"`
964965
URLQueryParams string `json:"url_query_params" db:"url_query_params"`
965966
IdempotencyKey string `json:"idempotency_key" db:"idempotency_key"`
967+
966968
// Deprecated: Latency is deprecated.
967969
Latency string `json:"latency" db:"latency"`
968970
LatencySeconds float64 `json:"latency_seconds" db:"latency_seconds"`

ee/VERSION

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v25.1.1 Enterprise Edition
1+
v25.2.1 Enterprise Edition

generate.go

+1
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ package convoy
1111
//go:generate mockgen --source internal/pkg/dedup/dedup.go --destination mocks/dedup.go -package mocks
1212
//go:generate mockgen --source internal/pkg/memorystore/table.go --destination mocks/table.go -package mocks
1313
//go:generate mockgen --source internal/pkg/license/license.go --destination mocks/license.go -package mocks
14+
//go:generate mockgen --source internal/pkg/tracer/tracer.go --destination mocks/tracer.go -package mocks

internal/pkg/keys/hcpvault_test.go

-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ func TestNewHCPVaultKeyManagerEnv(t *testing.T) {
3737
key, err := h.GetCurrentKeyFromCache()
3838
assert.Nil(t, err)
3939
assert.NotEmpty(t, key)
40-
t.Logf("Retrieved key: %s", key)
4140

4241
// Happy path for setting a key
4342
newKey := "from-test-" + time.Now().String()

internal/pkg/socket/client_test.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@ import (
1616

1717
func provideClient(r *Repo, c WebSocketConnection) *Client {
1818
return &Client{
19-
conn: c,
20-
deviceID: "1234",
21-
deviceRepo: r.DeviceRepo,
22-
// EventTypes: []string{"*"},
19+
conn: c,
20+
deviceID: "1234",
21+
deviceRepo: r.DeviceRepo,
2322
eventDeliveryRepo: r.EventDeliveryRepo,
2423
Device: &datastore.Device{
2524
UID: "1234",
@@ -349,7 +348,6 @@ func TestParseTime(t *testing.T) {
349348
client := provideClient(r, c)
350349

351350
since, err := client.parseTime(tt.args.message)
352-
t.Log(since)
353351

354352
if tt.args.wantErr {
355353
require.Error(t, tt.args.err, err)

0 commit comments

Comments
 (0)