Skip to content

Commit 76a4f5a

Browse files
authored
feat(http): add IdempotencyKey in header if present (#70)
1 parent 023d615 commit 76a4f5a

File tree

4 files changed

+21
-10
lines changed

4 files changed

+21
-10
lines changed

pkg/attempt.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type Attempt struct {
3535
NextRetryAfter time.Time `json:"nextRetryAfter,omitempty" bun:"next_retry_after,nullzero"`
3636
}
3737

38-
func MakeAttempt(ctx context.Context, httpClient *http.Client, retryPolicy BackoffPolicy, id, webhookID string, attemptNb int, cfg Config, payload []byte, isTest bool) (Attempt, error) {
38+
func MakeAttempt(ctx context.Context, httpClient *http.Client, retryPolicy BackoffPolicy, id, webhookID string, attemptNb int, cfg Config, idempotencyKey string, payload []byte, isTest bool) (Attempt, error) {
3939
req, err := http.NewRequestWithContext(ctx, http.MethodPost, cfg.Endpoint, bytes.NewBuffer(payload))
4040
if err != nil {
4141
return Attempt{}, errors.Wrap(err, "http.NewRequestWithContext")
@@ -54,6 +54,9 @@ func MakeAttempt(ctx context.Context, httpClient *http.Client, retryPolicy Backo
5454
req.Header.Set("formance-webhook-timestamp", fmt.Sprintf("%d", timestamp))
5555
req.Header.Set("formance-webhook-signature", signature)
5656
req.Header.Set("formance-webhook-test", fmt.Sprintf("%v", isTest))
57+
if idempotencyKey != "" {
58+
req.Header.Set("formance-webhook-idempotency-key", idempotencyKey)
59+
}
5760

5861
resp, err := httpClient.Do(req)
5962
if err != nil {

pkg/server/test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func (h *serverHandler) testOneConfigHandle(w http.ResponseWriter, r *http.Reque
2727
logging.FromContext(r.Context()).Debugf("GET %s/%s%s", PathConfigs, id, PathTest)
2828
retryPolicy := backoff.NewNoRetry()
2929
attempt, err := webhooks.MakeAttempt(r.Context(), h.httpClient, retryPolicy, uuid.NewString(),
30-
uuid.NewString(), 0, cfgs[0], []byte(`{"data":"test"}`), true)
30+
uuid.NewString(), 0, cfgs[0], "ik", []byte(`{"data":"test"}`), true)
3131
if err != nil {
3232
logging.FromContext(r.Context()).Errorf("GET %s/%s%s: %s", PathConfigs, id, PathTest, err)
3333
apierrors.ResponseError(w, r, err)

pkg/worker/module.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ func configureMessageRouter(r *message.Router, subscriber message.Subscriber, to
8484
func processMessages(store storage.Store, httpClient *http.Client, retryPolicy webhooks.BackoffPolicy, pool *pond.WorkerPool) func(msg *message.Message) error {
8585
return func(msg *message.Message) error {
8686
pool.Submit(func() {
87-
8887
var ev *publish.EventMessage
8988
span, ev, err := publish.UnmarshalMessage(msg)
9089
if err != nil {
@@ -131,16 +130,16 @@ func processMessages(store storage.Store, httpClient *http.Client, retryPolicy w
131130
return
132131
}
133132

133+
data, err := json.Marshal(ev)
134+
if err != nil {
135+
logging.FromContext(ctx).Error(err)
136+
return
137+
}
134138
for _, cfg := range cfgs {
135139
logging.FromContext(ctx).Debugf("found one config: %+v", cfg)
136-
data, err := json.Marshal(ev)
137-
if err != nil {
138-
logging.FromContext(ctx).Error(err)
139-
return
140-
}
141140

142141
attempt, err := webhooks.MakeAttempt(ctx, httpClient, retryPolicy, uuid.NewString(),
143-
uuid.NewString(), 0, cfg, data, false)
142+
uuid.NewString(), 0, cfg, ev.IdempotencyKey, data, false)
144143
if err != nil {
145144
logging.FromContext(ctx).Error(err)
146145
return

pkg/worker/worker.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package worker
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"net/http"
78
"time"
89

910
"github.com/formancehq/go-libs/v2/logging"
11+
"github.com/formancehq/go-libs/v2/publish"
1012
webhooks "github.com/formancehq/webhooks/pkg"
1113
"github.com/formancehq/webhooks/pkg/storage"
1214
"github.com/google/uuid"
@@ -103,9 +105,16 @@ func (w *Retrier) attemptRetries(ctx context.Context, errChan chan error) {
103105
continue
104106
}
105107

108+
var ev publish.EventMessage
109+
err = json.Unmarshal([]byte(atts[0].Payload), &ev)
110+
if err != nil {
111+
errChan <- errors.Wrap(err, "json.Unmarshal")
112+
continue
113+
}
114+
106115
newAttemptNb := atts[0].RetryAttempt + 1
107116
attempt, err := webhooks.MakeAttempt(ctx, w.httpClient, w.retryPolicy, uuid.NewString(),
108-
webhookID, newAttemptNb, atts[0].Config, []byte(atts[0].Payload), false)
117+
webhookID, newAttemptNb, atts[0].Config, ev.IdempotencyKey, []byte(atts[0].Payload), false)
109118
if err != nil {
110119
errChan <- errors.Wrap(err, "webhooks.MakeAttempt")
111120
continue

0 commit comments

Comments
 (0)