Skip to content

Commit e8147ba

Browse files
committed
fix: fix a bug where some gzipped bytes couldn't be inserted into the db column.
1 parent 419952f commit e8147ba

7 files changed

+47
-38
lines changed

database/postgres/delivery_attempts.go

+2
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ func (d *deliveryAttemptRepo) FindDeliveryAttempts(ctx context.Context, eventDel
9797
return nil, err
9898
}
9999

100+
(&attempt).ResponseDataString = string(attempt.ResponseData)
101+
100102
attempts = append(attempts, attempt)
101103
}
102104

database/postgres/delivery_attempts_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestCreateDeliveryAttempt(t *testing.T) {
4343
RequestHeader: map[string]string{"Content-Type": "application/json"},
4444
ResponseHeader: map[string]string{"Content-Type": "application/json"},
4545
HttpResponseCode: "200",
46-
ResponseData: "{\"status\":\"ok\"}",
46+
ResponseData: []byte("{\"status\":\"ok\"}"),
4747
Status: true,
4848
}
4949

@@ -87,7 +87,7 @@ func TestFindDeliveryAttempts(t *testing.T) {
8787
RequestHeader: map[string]string{"Content-Type": "application/json"},
8888
ResponseHeader: map[string]string{"Content-Type": "application/json"},
8989
HttpResponseCode: "200",
90-
ResponseData: "{\"status\":\"ok\"}",
90+
ResponseData: []byte("{\"status\":\"ok\"}"),
9191
Status: true,
9292
},
9393
{
@@ -102,7 +102,7 @@ func TestFindDeliveryAttempts(t *testing.T) {
102102
RequestHeader: map[string]string{"Content-Type": "application/json"},
103103
ResponseHeader: map[string]string{"Content-Type": "application/json"},
104104
HttpResponseCode: "400",
105-
ResponseData: "{\"status\":\"Not Found\"}",
105+
ResponseData: []byte("{\"status\":\"Not Found\"}"),
106106
Error: "",
107107
Status: false,
108108
},

datastore/models.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -900,13 +900,15 @@ type DeliveryAttempt struct {
900900
ProjectId string `json:"project_id" db:"project_id"`
901901
EventDeliveryId string `json:"msg_id" db:"event_delivery_id"`
902902

903-
IPAddress string `json:"ip_address,omitempty" db:"ip_address"`
904-
RequestHeader HttpHeader `json:"request_http_header,omitempty" db:"request_http_header"`
905-
ResponseHeader HttpHeader `json:"response_http_header,omitempty" db:"response_http_header"`
906-
HttpResponseCode string `json:"http_status,omitempty" db:"http_status"`
907-
ResponseData string `json:"response_data,omitempty" db:"response_data"`
908-
Error string `json:"error,omitempty" db:"error"`
909-
Status bool `json:"status,omitempty" db:"status"`
903+
IPAddress string `json:"ip_address,omitempty" db:"ip_address"`
904+
RequestHeader HttpHeader `json:"request_http_header,omitempty" db:"request_http_header"`
905+
ResponseHeader HttpHeader `json:"response_http_header,omitempty" db:"response_http_header"`
906+
HttpResponseCode string `json:"http_status,omitempty" db:"http_status"`
907+
ResponseData []byte `json:"-,omitempty" db:"response_data"`
908+
ResponseDataString string `json:"response_data,omitempty" db:"-"`
909+
910+
Error string `json:"error,omitempty" db:"error"`
911+
Status bool `json:"status,omitempty" db:"status"`
910912

911913
CreatedAt time.Time `json:"created_at,omitempty" db:"created_at" swaggertype:"string"`
912914
UpdatedAt time.Time `json:"updated_at,omitempty" db:"updated_at" swaggertype:"string"`

worker/task/process_event_delivery.go

+14-12
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func ProcessEventDelivery(endpointRepo datastore.EndpointRepository, eventDelive
9797
eventDelivery.Description = datastore.ErrEndpointNotFound.Error()
9898
err = eventDeliveryRepo.UpdateStatusOfEventDelivery(ctx, project.UID, *eventDelivery, datastore.DiscardedEventStatus)
9999
if err != nil {
100-
log.WithError(err).Error("failed to update event delivery status to discarded")
100+
log.FromContext(ctx).WithError(err).Error("failed to update event delivery status to discarded")
101101
}
102102

103103
return nil
@@ -136,7 +136,7 @@ func ProcessEventDelivery(endpointRepo datastore.EndpointRepository, eventDelive
136136
done := true
137137

138138
if eventDelivery.Status == datastore.SuccessEventStatus {
139-
log.Debugf("endpoint %s already merged with message %s\n", endpoint.Url, eventDelivery.UID)
139+
log.FromContext(ctx).Debugf("endpoint %s already merged with message %s\n", endpoint.Url, eventDelivery.UID)
140140
return nil
141141
}
142142

@@ -146,7 +146,7 @@ func ProcessEventDelivery(endpointRepo datastore.EndpointRepository, eventDelive
146146
return &DeliveryError{Err: err}
147147
}
148148

149-
log.Debugf("endpoint %s is inactive, failing to send.", endpoint.Url)
149+
log.FromContext(ctx).Debugf("endpoint %s is inactive, failing to send.", endpoint.Url)
150150
return nil
151151
}
152152

@@ -160,7 +160,7 @@ func ProcessEventDelivery(endpointRepo datastore.EndpointRepository, eventDelive
160160
if !util.IsStringEmpty(eventDelivery.URLQueryParams) {
161161
targetURL, err = url.ConcatQueryParams(endpoint.Url, eventDelivery.URLQueryParams)
162162
if err != nil {
163-
log.WithError(err).Error("failed to concat url query params")
163+
log.FromContext(ctx).WithError(err).Error("failed to concat url query params")
164164
return &DeliveryError{Err: err}
165165
}
166166
}
@@ -230,14 +230,14 @@ func ProcessEventDelivery(endpointRepo datastore.EndpointRepository, eventDelive
230230

231231
// Request failed but statusCode is 200 <= x <= 299
232232
if err != nil {
233-
log.Errorf("%s failed. Reason: %s", eventDelivery.UID, err)
233+
log.FromContext(ctx).Errorf("%s failed. Reason: %s", eventDelivery.UID, err)
234234
}
235235

236236
if done && endpoint.Status == datastore.PendingEndpointStatus && project.Config.DisableEndpoint && !licenser.CircuitBreaking() {
237237
endpointStatus := datastore.ActiveEndpointStatus
238238
err := endpointRepo.UpdateEndpointStatus(ctx, project.UID, endpoint.UID, endpointStatus)
239239
if err != nil {
240-
log.WithError(err).Error("Failed to reactivate endpoint after successful retry")
240+
log.FromContext(ctx).WithError(err).Error("Failed to reactivate endpoint after successful retry")
241241
}
242242

243243
if licenser.AdvancedEndpointMgmt() {
@@ -264,11 +264,11 @@ func ProcessEventDelivery(endpointRepo datastore.EndpointRepository, eventDelive
264264
if eventDelivery.Metadata.NumTrials >= eventDelivery.Metadata.RetryLimit {
265265
if done {
266266
if eventDelivery.Status != datastore.SuccessEventStatus {
267-
log.Errorln("an anomaly has occurred. retry limit exceeded, fan out is done but event status is not successful")
267+
log.FromContext(ctx).Error("an anomaly has occurred. retry limit exceeded, fan out is done but event status is not successful")
268268
eventDelivery.Status = datastore.FailureEventStatus
269269
}
270270
} else {
271-
log.Errorf("%s retry limit exceeded ", eventDelivery.UID)
271+
log.FromContext(ctx).Errorf("%s retry limit exceeded ", eventDelivery.UID)
272272
eventDelivery.Description = "Retry limit exceeded"
273273
eventDelivery.Status = datastore.FailureEventStatus
274274
}
@@ -278,28 +278,30 @@ func ProcessEventDelivery(endpointRepo datastore.EndpointRepository, eventDelive
278278

279279
err := endpointRepo.UpdateEndpointStatus(ctx, project.UID, endpoint.UID, endpointStatus)
280280
if err != nil {
281-
log.WithError(err).Error("failed to deactivate endpoint after failed retry")
281+
log.FromContext(ctx).WithError(err).Error("failed to deactivate endpoint after failed retry")
282282
}
283283

284284
if licenser.AdvancedEndpointMgmt() {
285285
// send endpoint deactivation notification
286286
err = notifications.SendEndpointNotification(ctx, endpoint, project, endpointStatus, q, true, resp.Error, string(resp.Body), resp.StatusCode)
287287
if err != nil {
288-
log.WithError(err).Error("failed to send notification")
288+
log.FromContext(ctx).WithError(err).Error("failed to send notification")
289289
}
290290
}
291291
}
292292
}
293293

294294
err = attemptsRepo.CreateDeliveryAttempt(ctx, &attempt)
295295
if err != nil {
296-
log.WithError(err).Errorf("failed to create delivery attempt for event delivery with id: %s", eventDelivery.UID)
296+
log.FromContext(ctx).
297+
WithError(err).
298+
Errorf("failed to create delivery attempt for event delivery with id: %s and delivery attempt: %s", eventDelivery.UID, attempt.ResponseData)
297299
return &DeliveryError{Err: fmt.Errorf("%s, err: %s", ErrDeliveryAttemptFailed, err.Error())}
298300
}
299301

300302
err = eventDeliveryRepo.UpdateEventDeliveryMetadata(ctx, project.UID, eventDelivery)
301303
if err != nil {
302-
log.WithError(err).Error("failed to update message ", eventDelivery.UID)
304+
log.FromContext(ctx).WithError(err).Error("failed to update message ", eventDelivery.UID)
303305
return &DeliveryError{Err: fmt.Errorf("%s, err: %s", ErrDeliveryAttemptFailed, err.Error())}
304306
}
305307

worker/task/process_retry_event_delivery.go

+15-13
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func ProcessRetryEventDelivery(endpointRepo datastore.EndpointRepository, eventD
7676
eventDelivery.Description = datastore.ErrEndpointNotFound.Error()
7777
innerErr := eventDeliveryRepo.UpdateStatusOfEventDelivery(ctx, project.UID, *eventDelivery, datastore.DiscardedEventStatus)
7878
if innerErr != nil {
79-
log.WithError(innerErr).Error("failed to update event delivery status to discarded")
79+
log.FromContext(ctx).WithError(innerErr).Error("failed to update event delivery status to discarded")
8080
}
8181

8282
return nil
@@ -118,7 +118,7 @@ func ProcessRetryEventDelivery(endpointRepo datastore.EndpointRepository, eventD
118118

119119
breakerErr = endpointRepo.UpdateEndpointStatus(ctx, project.UID, endpoint.UID, endpointStatus)
120120
if breakerErr != nil {
121-
log.WithError(breakerErr).Error("failed to deactivate endpoint after failed retry")
121+
log.FromContext(ctx).WithError(breakerErr).Error("failed to deactivate endpoint after failed retry")
122122
}
123123
}
124124
}
@@ -133,7 +133,7 @@ func ProcessRetryEventDelivery(endpointRepo datastore.EndpointRepository, eventD
133133
done := true
134134

135135
if eventDelivery.Status == datastore.SuccessEventStatus {
136-
log.Debugf("endpoint %s already merged with message %s\n", endpoint.Url, eventDelivery.UID)
136+
log.FromContext(ctx).Debugf("endpoint %s already merged with message %s\n", endpoint.Url, eventDelivery.UID)
137137
return nil
138138
}
139139

@@ -143,7 +143,7 @@ func ProcessRetryEventDelivery(endpointRepo datastore.EndpointRepository, eventD
143143
return &EndpointError{Err: err, delay: defaultEventDelay}
144144
}
145145

146-
log.Debugf("endpoint %s is inactive, failing to send.", endpoint.Url)
146+
log.FromContext(ctx).Debugf("endpoint %s is inactive, failing to send.", endpoint.Url)
147147
return nil
148148
}
149149

@@ -157,7 +157,7 @@ func ProcessRetryEventDelivery(endpointRepo datastore.EndpointRepository, eventD
157157
if !util.IsStringEmpty(eventDelivery.URLQueryParams) {
158158
targetURL, err = url.ConcatQueryParams(endpoint.Url, eventDelivery.URLQueryParams)
159159
if err != nil {
160-
log.WithError(err).Error("failed to concat url query params")
160+
log.FromContext(ctx).WithError(err).Error("failed to concat url query params")
161161
return &EndpointError{Err: err, delay: defaultEventDelay}
162162
}
163163
}
@@ -222,7 +222,7 @@ func ProcessRetryEventDelivery(endpointRepo datastore.EndpointRepository, eventD
222222

223223
// Request failed but statusCode is 200 <= x <= 299
224224
if err != nil {
225-
log.Errorf("%s failed. Reason: %s", eventDelivery.UID, err)
225+
log.FromContext(ctx).Errorf("%s failed. Reason: %s", eventDelivery.UID, err)
226226
}
227227

228228
if done && endpoint.Status == datastore.PendingEndpointStatus && project.Config.DisableEndpoint && !licenser.CircuitBreaking() {
@@ -256,11 +256,11 @@ func ProcessRetryEventDelivery(endpointRepo datastore.EndpointRepository, eventD
256256
if eventDelivery.Metadata.NumTrials >= eventDelivery.Metadata.RetryLimit {
257257
if done {
258258
if eventDelivery.Status != datastore.SuccessEventStatus {
259-
log.Errorln("an anomaly has occurred. retry limit exceeded, fan out is done but event status is not successful")
259+
log.FromContext(ctx).Error("an anomaly has occurred. retry limit exceeded, fan out is done but event status is not successful")
260260
eventDelivery.Status = datastore.FailureEventStatus
261261
}
262262
} else {
263-
log.Errorf("%s retry limit exceeded ", eventDelivery.UID)
263+
log.FromContext(ctx).Errorf("%s retry limit exceeded ", eventDelivery.UID)
264264
eventDelivery.Description = "Retry limit exceeded"
265265
eventDelivery.Status = datastore.FailureEventStatus
266266
}
@@ -270,28 +270,30 @@ func ProcessRetryEventDelivery(endpointRepo datastore.EndpointRepository, eventD
270270

271271
err := endpointRepo.UpdateEndpointStatus(ctx, project.UID, endpoint.UID, endpointStatus)
272272
if err != nil {
273-
log.WithError(err).Error("failed to deactivate endpoint after failed retry")
273+
log.FromContext(ctx).WithError(err).Error("failed to deactivate endpoint after failed retry")
274274
}
275275

276276
if licenser.AdvancedEndpointMgmt() {
277277
// send endpoint deactivation notification
278278
err = notifications.SendEndpointNotification(ctx, endpoint, project, endpointStatus, q, true, resp.Error, string(resp.Body), resp.StatusCode)
279279
if err != nil {
280-
log.WithError(err).Error("failed to send notification")
280+
log.FromContext(ctx).WithError(err).Error("failed to send notification")
281281
}
282282
}
283283
}
284284
}
285285

286286
err = attemptsRepo.CreateDeliveryAttempt(ctx, &attempt)
287287
if err != nil {
288-
log.WithError(err).Errorf("failed to create delivery attempt for event delivery with id: %s", eventDelivery.UID)
288+
log.FromContext(ctx).
289+
WithError(err).
290+
Errorf("failed to create delivery attempt for event delivery with id: %s and delivery attempt: %s", eventDelivery.UID, attempt.ResponseData)
289291
return &DeliveryError{Err: fmt.Errorf("%s, err: %s", ErrDeliveryAttemptFailed, err.Error())}
290292
}
291293

292294
err = eventDeliveryRepo.UpdateEventDeliveryMetadata(ctx, project.UID, eventDelivery)
293295
if err != nil {
294-
log.WithError(err).Error("failed to update message ", eventDelivery.UID)
296+
log.FromContext(ctx).WithError(err).Error("failed to update message ", eventDelivery.UID)
295297
return &EndpointError{Err: fmt.Errorf("%s, err: %s", ErrDeliveryAttemptFailed, err.Error()), delay: defaultEventDelay}
296298
}
297299

@@ -345,7 +347,7 @@ func parseAttemptFromResponse(m *datastore.EventDelivery, e *datastore.Endpoint,
345347
ResponseHeader: *responseHeader,
346348
RequestHeader: *requestHeader,
347349
HttpResponseCode: resp.Status,
348-
ResponseData: string(resp.Body),
350+
ResponseData: resp.Body,
349351
Error: resp.Error,
350352
Status: attemptStatus,
351353

worker/task/retention_policies_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ func seedDeliveryAttempt(db database.Database, delivery *datastore.EventDelivery
414414
RequestHeader: map[string]string{"Content-Type": "application/json"},
415415
ResponseHeader: map[string]string{"Content-Type": "application/json"},
416416
HttpResponseCode: "200",
417-
ResponseData: "200 OK",
417+
ResponseData: []byte("200 OK"),
418418
Status: true,
419419
CreatedAt: filter.CreatedAt,
420420
UpdatedAt: filter.CreatedAt,

worker/task/retry_event_deliveries.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package task
22

33
import (
44
"context"
5-
"encoding/json"
5+
"github.com/frain-dev/convoy/pkg/msgpack"
66
"sync"
77
"time"
88

@@ -137,7 +137,8 @@ func processEventDeliveryBatch(ctx context.Context, status datastore.EventDelive
137137
EventDeliveryID: delivery.UID,
138138
ProjectID: delivery.ProjectID,
139139
}
140-
data, err := json.Marshal(payload)
140+
141+
data, err := msgpack.EncodeMsgPack(payload)
141142
if err != nil {
142143
log.WithError(err).Error("failed to marshal process event delivery payload")
143144
}

0 commit comments

Comments
 (0)