Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/go_modules/tools/github.com/openc…
Browse files Browse the repository at this point in the history
…ontainers/runc-1.2.0-rc.3
  • Loading branch information
katallaxie authored Nov 8, 2024
2 parents 528c997 + 92401fd commit 1f92e4b
Show file tree
Hide file tree
Showing 23 changed files with 93 additions and 93 deletions.
2 changes: 1 addition & 1 deletion pkg/adapter/fs/filewatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (cw *fileWatcher) Start(ctx context.Context) {
// watcher error channel finished
return
}
cw.logger.Errorw("Error watching files", zap.Error(err))
cw.logger.Error("Error watching files", zap.Error(err))

case <-ctx.Done():
cw.logger.Debug("Exiting file watcher process")
Expand Down
4 changes: 2 additions & 2 deletions pkg/adapter/fs/filewatcher_cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (ccw *cachedFileWatcher) callback(path string) WatchCallback {
ccw.m.Lock()
defer ccw.m.Unlock()
if err := ccw.updateContentFromFile(path); err != nil {
ccw.logger.Errorw("Could not read watched file", zap.Error(err))
ccw.logger.Error("Could not read watched file", zap.Error(err))
}
}
}
Expand All @@ -82,7 +82,7 @@ func (ccw *cachedFileWatcher) Add(path string) error {
defer ccw.m.Unlock()
if _, ok := ccw.watchedFiles[path]; !ok {
if err := ccw.updateContentFromFile(path); err != nil {
ccw.logger.Errorw("Could not get content from file", zap.Error(err))
ccw.logger.Error("Could not get content from file", zap.Error(err))
// initialize to be able to distinguish paths not being watched
// and those being watched but not available.
ccw.watchedFiles[path] = nil
Expand Down
10 changes: 5 additions & 5 deletions pkg/flow/adapter/synchronizer/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,18 @@ func (a *adapter) serveRequest(ctx context.Context, correlationID string, event

select {
case err := <-sendErr:
a.logger.Errorw("Unable to forward the request", zap.Error(err))
a.logger.Error("Unable to forward the request", zap.Error(err))
return nil, cloudevents.NewHTTPResult(http.StatusBadRequest, "unable to forward the request: %v", err)
case result := <-respChan:
if result == nil {
a.logger.Errorw("No response", zap.Error(fmt.Errorf("response channel with ID %q is closed", correlationID)))
a.logger.Error("No response", zap.Error(fmt.Errorf("response channel with ID %q is closed", correlationID)))
return nil, cloudevents.NewHTTPResult(http.StatusInternalServerError, "failed to communicate the response")
}
a.logger.Debugf("Received response for %q", correlationID)
res := a.withBridgeIdentifier(result)
return &res, cloudevents.ResultACK
case <-time.After(a.responseTimeout):
a.logger.Errorw("Request time out", zap.Error(fmt.Errorf("request %q did not receive backend response in time", correlationID)))
a.logger.Error("Request time out", zap.Error(fmt.Errorf("request %q did not receive backend response in time", correlationID)))
return nil, cloudevents.NewHTTPResult(http.StatusGatewayTimeout, "backend did not respond in time")
}
}
Expand All @@ -132,7 +132,7 @@ func (a *adapter) serveResponse(_ context.Context, correlationID string, event c

responseChan, exists := a.sessions.get(correlationID)
if !exists {
a.logger.Errorw("Session not found", zap.Error(fmt.Errorf("client session with ID %q does not exist", correlationID)))
a.logger.Error("Session not found", zap.Error(fmt.Errorf("client session with ID %q does not exist", correlationID)))
return nil, cloudevents.NewHTTPResult(http.StatusBadGateway, "client session does not exist")
}

Expand All @@ -142,7 +142,7 @@ func (a *adapter) serveResponse(_ context.Context, correlationID string, event c
a.logger.Debugf("Response %q completed", correlationID)
return nil, cloudevents.ResultACK
default:
a.logger.Errorw("Unable to forward the response", zap.Error(fmt.Errorf("client connection with ID %q is closed", correlationID)))
a.logger.Error("Unable to forward the response", zap.Error(fmt.Errorf("client connection with ID %q is closed", correlationID)))
return nil, cloudevents.NewHTTPResult(http.StatusBadGateway, "client connection is closed")
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/flow/adapter/transformation/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (t *adapter) applyTransformations(event cloudevents.Event) (*cloudevents.Ev
// so we must use "contains" instead of strict equality
if !strings.Contains(event.DataContentType(), cloudevents.ApplicationJSON) {
err := fmt.Errorf("CE Content-Type %q is not supported", event.DataContentType())
t.logger.Errorw("Bad Content-Type", zap.Error(err))
t.logger.Error("Bad Content-Type", zap.Error(err))
return nil, err
}

Expand All @@ -185,7 +185,7 @@ func (t *adapter) applyTransformations(event cloudevents.Event) (*cloudevents.Ev

localContextBytes, err := json.Marshal(localContext)
if err != nil {
t.logger.Errorw("Cannot encode CE context", zap.Error(err))
t.logger.Error("Cannot encode CE context", zap.Error(err))
return nil, fmt.Errorf("cannot encode CE context: %w", err)
}

Expand Down Expand Up @@ -216,13 +216,13 @@ func (t *adapter) applyTransformations(event cloudevents.Event) (*cloudevents.Ev

newContext := ceContext{}
if err := json.Unmarshal(eventContext, &newContext); err != nil {
t.logger.Errorw("Cannot decode CE new context", zap.Error(err))
t.logger.Error("Cannot decode CE new context", zap.Error(err))
return nil, fmt.Errorf("cannot decode CE new context: %w", err)
}
event.Context = newContext
for k, v := range newContext.Extensions {
if err := event.Context.SetExtension(k, v); err != nil {
t.logger.Errorw("Cannot set CE extension", zap.Error(err))
t.logger.Error("Cannot set CE extension", zap.Error(err))
return nil, fmt.Errorf("cannot set CE extension: %w", err)
}
}
Expand All @@ -232,13 +232,13 @@ func (t *adapter) applyTransformations(event cloudevents.Event) (*cloudevents.Ev
errs = append(errs, err)
}
if err = event.SetData(cloudevents.ApplicationJSON, eventPayload); err != nil {
t.logger.Errorw("Cannot set CE data", zap.Error(err))
t.logger.Error("Cannot set CE data", zap.Error(err))
return nil, fmt.Errorf("cannot set CE data: %w", err)
}
// Failed transformation operations should not stop event flow
// therefore, just log the errors
if len(errs) != 0 {
t.logger.Errorw("Event transformation errors", zap.Errors("errors", errs))
t.logger.Error("Event transformation errors", zap.Errors("errors", errs))
}

return &event, nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/routing/adapter/filter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {

event, err := binding.ToEvent(ctx, message)
if err != nil {
h.logger.Errorw("Failed to extract event from request", zap.Error(err))
h.logger.Error("Failed to extract event from request", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
return
}
Expand All @@ -111,7 +111,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {

f, err := h.filterLister.Get(filter)
if err != nil {
h.logger.Errorw("Unable to get the Filter", zap.Error(err))
h.logger.Error("Unable to get the Filter", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
return
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers
// send the event to trigger's subscriber
response, err := h.sendEvent(ctx, headers, target, event)
if err != nil {
h.logger.Errorw("Failed to send event", zap.Error(err))
h.logger.Error("Failed to send event", zap.Error(err))
writer.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -158,7 +158,7 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers
// If there is an event in the response write it to the response
_, err = h.writeResponse(ctx, writer, response, target)
if err != nil {
h.logger.Errorw("Failed to write response", zap.Error(err))
h.logger.Error("Failed to write response", zap.Error(err))
}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/routing/adapter/splitter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,16 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {

event, err := binding.ToEvent(ctx, message)
if err != nil {
h.logger.Errorw("Failed to extract event from request", zap.Error(err))
h.logger.Error("Failed to extract event from request", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
return
}

h.logger.Debugw("Received message", zap.Any("splitter", splitter))
h.logger.Debug("Received message", zap.Any("splitter", splitter))

s, err := h.splitterLister.Get(splitter)
if err != nil {
h.logger.Errorw("Unable to get the Splitter", zap.Error(err))
h.logger.Error("Unable to get the Splitter", zap.Error(err))
writer.WriteHeader(http.StatusBadRequest)
return
}
Expand All @@ -119,7 +119,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
// we may want to keep responses and send them back to the source
res, err := h.sendEvent(ctx, request.Header, s.Status.SinkURI.String(), e)
if err != nil {
h.logger.Errorw("Failed to send the event", zap.Error(err))
h.logger.Error("Failed to send the event", zap.Error(err))
}
defer res.Body.Close()
}
Expand All @@ -138,7 +138,7 @@ func (h *Handler) split(path string, e *event.Event) []*event.Event {
for _, v := range val.Array() {
newCE := cloudevents.NewEvent()
if err := newCE.SetData(cloudevents.ApplicationJSON, []byte(v.Raw)); err != nil {
h.logger.Errorw("Failed to set event data", zap.Error(err))
h.logger.Error("Failed to set event data", zap.Error(err))
continue
}
newCE.DataBase64 = false
Expand Down
8 changes: 4 additions & 4 deletions pkg/sources/adapter/cloudeventssource/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ func NewAdapter(ctx context.Context, envAcc pkgadapter.EnvConfigAccessor, ceClie

cfw, err := fs.NewCachedFileWatcher(logger)
if err != nil {
logger.Panicw("Could not create a file watcher", zap.Error(err))
logger.Panic("Could not create a file watcher", zap.Error(err))
}

for _, as := range env.BasicAuths {
if err := cfw.Add(as.MountedValueFile); err != nil {
logger.Panicw(
logger.Panic(
fmt.Sprintf("Authentication secret at %q could not be watched", as.MountedValueFile),
zap.Error(err))
}
Expand Down Expand Up @@ -64,14 +64,14 @@ func NewAdapter(ctx context.Context, envAcc pkgadapter.EnvConfigAccessor, ceClie
if env.RequestsPerSecond != 0 {
rl, err := ratelimiter.New(env.RequestsPerSecond)
if err != nil {
logger.Panicw("Could not create rate limiter", zap.Error(err))
logger.Panic("Could not create rate limiter", zap.Error(err))
}
options = append(options, cehttp.WithRateLimiter(rl))
}

ceServer, err := cloudevents.NewClientHTTP(options...)
if err != nil {
logger.Panicw("Error creating CloudEvents client", zap.Error(err))
logger.Panic("Error creating CloudEvents client", zap.Error(err))
}

ceh.ceServer = ceServer
Expand Down
6 changes: 3 additions & 3 deletions pkg/sources/adapter/cloudeventssource/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ func (h *cloudEventsHandler) Start(ctx context.Context) error {
func (h *cloudEventsHandler) handle(ctx context.Context, e event.Event) protocol.Result {
err := e.Validate()
if err != nil {
h.logger.Errorw("Incoming CloudEvent is not valid", zap.Error(err))
h.logger.Error("Incoming CloudEvent is not valid", zap.Error(err))
return protocol.ResultNACK
}

result := h.ceClient.Send(ctx, e)
if !cloudevents.IsACK(result) {
h.logger.Errorw("Could not send CloudEvent", zap.Error(result))
h.logger.Error("Could not send CloudEvent", zap.Error(result))
}

return result
Expand All @@ -63,7 +63,7 @@ func (h *cloudEventsHandler) handleAuthentication(next http.Handler) http.Handle
for _, kv := range h.basicAuths {
p, err := h.cfw.GetContent(kv.MountedValueFile)
if err != nil {
h.logger.Errorw(
h.logger.Error(
fmt.Sprintf("Could not retrieve password for user %q", kv.Key),
zap.Error(err))
continue
Expand Down
4 changes: 2 additions & 2 deletions pkg/sources/adapter/common/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func Start(ctx context.Context) {

handleServerError := func(err error) {
if errors.Is(err, http.ErrServerClosed) {
logging.FromContext(ctx).Errorw("Error during runtime of health server", zap.Error(err))
logging.FromContext(ctx).Error("Error during runtime of health server", zap.Error(err))
}
}

Expand All @@ -80,7 +80,7 @@ func Start(ctx context.Context) {

// nolint:contextcheck
if err := server.Shutdown(ctx); err != nil {
logging.FromContext(ctx).Errorw("Error during shutdown of health server", zap.Error(err))
logging.FromContext(ctx).Error("Error during shutdown of health server", zap.Error(err))
}

handleServerError(<-errCh)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/adapter/httppollersource/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewAdapter(ctx context.Context, envAcc pkgadapter.EnvConfigAccessor, ceClie

httpRequest, err := http.NewRequestWithContext(ctx, env.Method, env.Endpoint, nil)
if err != nil {
logger.Panicw("Cannot build request", zap.Error(err))
logger.Panic("Cannot build request", zap.Error(err))
}

for k, v := range env.Headers {
Expand Down
10 changes: 5 additions & 5 deletions pkg/sources/adapter/httppollersource/httppoller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,19 @@ func (h *httpPoller) dispatch(ctx context.Context) {

res, err := h.httpClient.Do(h.httpRequest)
if err != nil {
h.logger.Errorw("Failed sending request", zap.Error(err))
h.logger.Error("Failed sending request", zap.Error(err))
return
}

defer res.Body.Close()
resb, err := io.ReadAll(res.Body)
if err != nil {
h.logger.Errorw("Failed reading response body", zap.Error(err))
h.logger.Error("Failed reading response body", zap.Error(err))
return
}

if res.StatusCode >= 300 {
h.logger.Errorw("Received non supported HTTP code from remote endpoint",
h.logger.Error("Received non supported HTTP code from remote endpoint",
zap.Int("code", res.StatusCode),
zap.String("response", string(resb)),
)
Expand All @@ -85,11 +85,11 @@ func (h *httpPoller) dispatch(ctx context.Context) {
event.SetSource(h.eventSource)

if err := event.SetData(cloudevents.ApplicationJSON, resb); err != nil {
h.logger.Errorw("Failed to set event data", zap.Error(err))
h.logger.Error("Failed to set event data", zap.Error(err))
return
}

if result := h.ceClient.Send(ctx, event); !cloudevents.IsACK(result) {
h.logger.Errorw("Could not send Cloud Event", zap.Error(result))
h.logger.Error("Could not send Cloud Event", zap.Error(result))
}
}
6 changes: 3 additions & 3 deletions pkg/sources/adapter/kafkasource/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewAdapter(ctx context.Context, envAcc pkgadapter.EnvConfigAccessor, ceClie

if env.ClientCert != "" || env.ClientKey != "" {
if err := addTLSCerts(tlsCfg, env.ClientCert, env.ClientKey); err != nil {
logger.Panicw("Could not parse the TLS Certificates", zap.Error(err))
logger.Panic("Could not parse the TLS Certificates", zap.Error(err))
}
}

Expand Down Expand Up @@ -120,14 +120,14 @@ func NewAdapter(ctx context.Context, envAcc pkgadapter.EnvConfigAccessor, ceClie

err = config.Validate()
if err != nil {
logger.Panicw("Config not valid", zap.Error(err))
logger.Panic("Config not valid", zap.Error(err))
}

kc, err := sarama.NewConsumerGroup(
env.BootstrapServers,
env.GroupID, config)
if err != nil {
logger.Panicw("Error creating Kafka Consumer Group", zap.Error(err))
logger.Panic("Error creating Kafka Consumer Group", zap.Error(err))
}

return &kafkasourceAdapter{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/adapter/kafkasource/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (c consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
return nil
}
if err := c.adapter.emitEvent(session.Context(), *msg); err != nil {
c.adapter.logger.Errorw("Failed to emit event: %v", zap.Error(err))
c.adapter.logger.Error("Failed to emit event: %v", zap.Error(err))
// do not mark message
continue
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sources/adapter/salesforcesource/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,18 @@ func (e *eventDispatcher) DispatchEvent(ctx context.Context, msg *sfclient.Conne
event.SetID(uuid.New().String())
event.SetSubject(subjectNameFromConnectResponse(msg))
if err := event.SetData(cloudevents.ApplicationJSON, msg.Data); err != nil {
e.logger.Errorw("Failed to set event data", zap.Error(err))
e.logger.Error("Failed to set event data", zap.Error(err))
return
}

if result := e.ceClient.Send(ctx, event); !cloudevents.IsACK(result) {
e.logger.Errorw("Could not send CloudEvent", zap.Error(result))
e.logger.Error("Could not send CloudEvent", zap.Error(result))
return
}
}

func (e *eventDispatcher) DispatchError(err error) {
e.logger.Errorw("Error receiving events", zap.Error(err))
e.logger.Error("Error receiving events", zap.Error(err))
}

func subjectNameFromConnectResponse(msg *sfclient.ConnectResponse) string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/adapter/webhooksource/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (h *webhookHandler) handleAll(ctx context.Context) http.HandlerFunc {
}

func (h *webhookHandler) handleError(err error, code int, w http.ResponseWriter) {
h.logger.Errorw("An error occurred", zap.Error(err))
h.logger.Error("An error occurred", zap.Error(err))
http.Error(w, err.Error(), code)
}

Expand Down
Loading

0 comments on commit 1f92e4b

Please sign in to comment.