Skip to content

Commit

Permalink
Events: ongoing clean up of eventing (#1448)
Browse files Browse the repository at this point in the history
  • Loading branch information
jimlambrt authored Aug 5, 2021
1 parent 00cb18e commit 477b541
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 113 deletions.
96 changes: 48 additions & 48 deletions internal/db/read_writer.go

Large diffs are not rendered by default.

17 changes: 9 additions & 8 deletions internal/errors/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func E(ctx context.Context, opt ...Option) error {
eventErr.Op = "unknown operation"
}
}
event.WriteError(ctx, event.Op(err.Op), eventErr)
event.WriteError(ctx, event.Op(eventErr.Op), eventErr)
}

return err
Expand Down Expand Up @@ -182,6 +182,7 @@ func WrapDeprecated(e error, op Op, opt ...Option) error {
// and attempt to add a helpful error msg as well. If that's not possible, it
// will return nil
func Convert(e error) *Err {
ctx := context.TODO()
if e == nil {
return nil
}
Expand All @@ -196,24 +197,24 @@ func Convert(e error) *Err {
if pqError.Code.Class() == "23" { // class of integrity constraint violations
switch pqError.Code {
case "23505": // unique_violation
return E(context.TODO(), WithoutEvent(), WithMsg(pqError.Message), WithWrap(EDeprecated(WithCode(NotUnique), WithMsg("unique constraint violation")))).(*Err)
return E(ctx, WithoutEvent(), WithMsg(pqError.Message), WithWrap(E(ctx, WithoutEvent(), WithCode(NotUnique), WithMsg("unique constraint violation")))).(*Err)
case "23502": // not_null_violation
msg := fmt.Sprintf("%s must not be empty", pqError.Column)
return E(context.TODO(), WithoutEvent(), WithMsg(msg), WithWrap(EDeprecated(WithCode(NotNull), WithMsg("not null constraint violated")))).(*Err)
return E(ctx, WithoutEvent(), WithMsg(msg), WithWrap(E(ctx, WithoutEvent(), WithCode(NotNull), WithMsg("not null constraint violated")))).(*Err)
case "23514": // check_violation
msg := fmt.Sprintf("%s constraint failed", pqError.Constraint)
return E(context.TODO(), WithoutEvent(), WithMsg(msg), WithWrap(EDeprecated(WithCode(CheckConstraint), WithMsg("check constraint violated")))).(*Err)
return E(ctx, WithoutEvent(), WithMsg(msg), WithWrap(E(ctx, WithoutEvent(), WithCode(CheckConstraint), WithMsg("check constraint violated")))).(*Err)
default:
return E(context.TODO(), WithoutEvent(), WithCode(NotSpecificIntegrity), WithMsg(pqError.Message)).(*Err)
return E(ctx, WithoutEvent(), WithCode(NotSpecificIntegrity), WithMsg(pqError.Message)).(*Err)
}
}
switch pqError.Code {
case "42P01":
return E(context.TODO(), WithoutEvent(), WithCode(MissingTable), WithMsg(pqError.Message)).(*Err)
return E(ctx, WithoutEvent(), WithCode(MissingTable), WithMsg(pqError.Message)).(*Err)
case "42703":
return E(context.TODO(), WithoutEvent(), WithCode(ColumnNotFound), WithMsg(pqError.Message)).(*Err)
return E(ctx, WithoutEvent(), WithCode(ColumnNotFound), WithMsg(pqError.Message)).(*Err)
case "P0001":
return E(context.TODO(), WithoutEvent(), WithCode(Exception), WithMsg(pqError.Message)).(*Err)
return E(ctx, WithoutEvent(), WithCode(Exception), WithMsg(pqError.Message)).(*Err)
}
}
// unfortunately, we can't help.
Expand Down
10 changes: 5 additions & 5 deletions internal/scheduler/job/repository_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,32 +38,32 @@ func (r *Repository) CreateJob(ctx context.Context, name, description string, op
int(opts.withNextRunIn.Round(time.Second).Seconds()),
})
if err != nil {
return errors.Wrap(ctx, err, op)
return errors.Wrap(ctx, err, op, errors.WithoutEvent())
}
defer rows.Close()

var rowCnt int
for rows.Next() {
if rowCnt > 0 {
return errors.New(ctx, errors.MultipleRecords, op, "more than 1 job would have been created")
return errors.New(ctx, errors.MultipleRecords, op, "more than 1 job would have been created", errors.WithoutEvent())
}
rowCnt++
err = r.ScanRows(rows, j)
if err != nil {
_ = rows.Close()
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to scan rows for job"))
return errors.Wrap(ctx, err, op, errors.WithMsg("unable to scan rows for job"), errors.WithoutEvent())
}
}
if rowCnt == 0 {
return errors.New(ctx, errors.NotSpecificIntegrity, op, "failed to create new job")
return errors.New(ctx, errors.NotSpecificIntegrity, op, "failed to create new job", errors.WithoutEvent())
}

return nil
},
)
if err != nil {
if errors.IsUniqueError(err) {
return nil, errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("name %s already exists", name)))
return nil, errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("name %s already exists", name)), errors.WithoutEvent())
}
return nil, errors.Wrap(ctx, err, op)
}
Expand Down
27 changes: 0 additions & 27 deletions internal/servers/controller/handlers/workers/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques

func (ws *workerServiceServer) LookupSession(ctx context.Context, req *pbs.LookupSessionRequest) (*pbs.LookupSessionResponse, error) {
const op = "workers.(workerServiceServer).LookupSession"
event.WriteSysEvent(ctx, op, "got validate session request from worker", "session_id", req.GetSessionId())

sessRepo, err := ws.sessionRepoFn()
if err != nil {
Expand Down Expand Up @@ -316,7 +315,6 @@ func (ws *workerServiceServer) LookupSession(ctx context.Context, req *pbs.Looku

func (ws *workerServiceServer) CancelSession(ctx context.Context, req *pbs.CancelSessionRequest) (*pbs.CancelSessionResponse, error) {
const op = "workers.(workerServiceServer).CancelSession"
event.WriteSysEvent(ctx, op, "got cancel session request from worker", "session_id", req.GetSessionId())

sessRepo, err := ws.sessionRepoFn()
if err != nil {
Expand All @@ -340,7 +338,6 @@ func (ws *workerServiceServer) CancelSession(ctx context.Context, req *pbs.Cance

func (ws *workerServiceServer) ActivateSession(ctx context.Context, req *pbs.ActivateSessionRequest) (*pbs.ActivateSessionResponse, error) {
const op = "workers.(workerServiceServer).ActivateSession"
event.WriteSysEvent(ctx, op, "got activate session request from worker", "session_id", req.GetSessionId())

sessRepo, err := ws.sessionRepoFn()
if err != nil {
Expand All @@ -364,22 +361,13 @@ func (ws *workerServiceServer) ActivateSession(ctx context.Context, req *pbs.Act
return nil, status.Error(codes.Internal, "Invalid session state in activate response.")
}

event.WriteSysEvent(ctx, op, "session activated",
"session_id", sessionInfo.PublicId,
"target_id", sessionInfo.TargetId,
"user_id", sessionInfo.UserId,
"host_set_id", sessionInfo.HostSetId,
"host_id", sessionInfo.HostId,
)

return &pbs.ActivateSessionResponse{
Status: sessionStates[0].Status.ProtoVal(),
}, nil
}

func (ws *workerServiceServer) AuthorizeConnection(ctx context.Context, req *pbs.AuthorizeConnectionRequest) (*pbs.AuthorizeConnectionResponse, error) {
const op = "workers.(workerServiceServer"
event.WriteSysEvent(ctx, op, "got authorize connection request from worker", "session_id", req.GetSessionId())

sessRepo, err := ws.sessionRepoFn()
if err != nil {
Expand All @@ -406,18 +394,11 @@ func (ws *workerServiceServer) AuthorizeConnection(ctx context.Context, req *pbs
ret.ConnectionsLeft -= int32(authzSummary.CurrentConnectionCount)
}

event.WriteSysEvent(ctx, op, "authorized connection",
"session_id", req.GetSessionId(),
"connection_id", ret.ConnectionId,
"connections_left", ret.ConnectionsLeft,
)

return ret, nil
}

func (ws *workerServiceServer) ConnectConnection(ctx context.Context, req *pbs.ConnectConnectionRequest) (*pbs.ConnectConnectionResponse, error) {
const op = "workers.(workerServiceServer).ConnectConnection"
event.WriteSysEvent(ctx, op, "got connection established information from worker", "connection_id", req.GetConnectionId())

sessRepo, err := ws.sessionRepoFn()
if err != nil {
Expand Down Expand Up @@ -456,8 +437,6 @@ func (ws *workerServiceServer) ConnectConnection(ctx context.Context, req *pbs.C
)
}

event.WriteSysEvent(ctx, op, "connection established", "logger_pairs", loggerPairs)

return ret, nil
}

Expand All @@ -480,8 +459,6 @@ func (ws *workerServiceServer) CloseConnection(ctx context.Context, req *pbs.Clo
ClosedReason: session.ClosedReason(v.GetReason()),
})
}
event.WriteSysEvent(ctx, op, "got connection close information from worker", "connection_ids", closeIds)

sessRepo, err := ws.sessionRepoFn()
if err != nil {
return nil, status.Errorf(codes.Internal, "error getting session repo: %v", err)
Expand Down Expand Up @@ -509,10 +486,6 @@ func (ws *workerServiceServer) CloseConnection(ctx context.Context, req *pbs.Clo
})
}

for _, v := range req.GetCloseRequestData() {
event.WriteSysEvent(ctx, op, "connection closed", "connection_id", v.ConnectionId)
}

ret := &pbs.CloseConnectionResponse{
CloseResponseData: closeData,
}
Expand Down
1 change: 0 additions & 1 deletion internal/servers/controller/tickers.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ func (c *Controller) startTerminateCompletedSessionsTicking(cancelCtx context.Co
if err != nil {
event.WriteError(cancelCtx, op, err, event.WithInfoMsg("error performing termination of completed sessions"))
} else if terminationCount > 0 {
event.WriteSysEvent(cancelCtx, op, "terminating completed sessions successful", "sessions_terminated", terminationCount)
}
}
timer.Reset(getRandomInterval())
Expand Down
12 changes: 0 additions & 12 deletions internal/servers/worker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ func (w *Worker) handleProxy() http.HandlerFunc {
Port: numPort,
}

event.WriteSysEvent(ctx, op, "received TLS connection")

siRaw, valid := w.sessionInfoMap.Load(sessionId)
if !valid {
event.WriteError(ctx, op, errors.New("session not found in info map"), event.WithInfo("session_id", sessionId))
Expand All @@ -79,8 +77,6 @@ func (w *Worker) handleProxy() http.HandlerFunc {
sessStatus := si.status
si.RUnlock()

event.WriteSysEvent(ctx, op, "found session in session info map")

opts := &websocket.AcceptOptions{
Subprotocols: []string{globals.TcpProxyV1},
}
Expand All @@ -93,8 +89,6 @@ func (w *Worker) handleProxy() http.HandlerFunc {
// Later calls will cause this to noop if they return a different status
defer conn.Close(websocket.StatusNormalClosure, "done")

event.WriteSysEvent(ctx, op, "websocket upgrade done")

connCtx, connCancel := context.WithDeadline(r.Context(), expiration.AsTime())
defer connCancel()

Expand All @@ -110,8 +104,6 @@ func (w *Worker) handleProxy() http.HandlerFunc {
return
}

event.WriteSysEvent(ctx, op, "proxy handshake finished")

if tofuToken != "" {
if tofuToken != handshake.GetTofuToken() {
event.WriteError(ctx, op, errors.New("WARNING: mismatched tofu token"), event.WithInfo("session_id", sessionId))
Expand All @@ -125,7 +117,6 @@ func (w *Worker) handleProxy() http.HandlerFunc {
return
}
if handshake.Command == proxy.HANDSHAKECOMMAND_HANDSHAKECOMMAND_UNSPECIFIED {
event.WriteSysEvent(ctx, op, "activating session")
sessStatus, err = w.activateSession(r.Context(), sessionId, handshake.GetTofuToken(), version)
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("unable to validate session"))
Expand All @@ -136,7 +127,6 @@ func (w *Worker) handleProxy() http.HandlerFunc {
}

if handshake.Command == proxy.HANDSHAKECOMMAND_HANDSHAKECOMMAND_SESSION_CANCEL {
event.WriteSysEvent(ctx, op, "canceling session at client request")
_, err := w.cancelSession(r.Context(), sessionId)
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("unable to cancel session"))
Expand Down Expand Up @@ -168,8 +158,6 @@ func (w *Worker) handleProxy() http.HandlerFunc {
connectionLimit := si.lookupSessionResponse.GetConnectionLimit()
si.Unlock()

event.WriteSysEvent(ctx, op, "authorized connection", "connection_id", ci.id)

handshakeResult := &proxy.HandshakeResult{
Expiration: expiration,
ConnectionLimit: connectionLimit,
Expand Down
8 changes: 1 addition & 7 deletions internal/servers/worker/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ func (w *Worker) getSessionTls(hello *tls.ClientHelloInfo) (*tls.Config, error)
var sessionId string
switch {
case strings.HasPrefix(hello.ServerName, "s_"):
event.WriteSysEvent(ctx, op, "got valid session in SNI", "session_id", hello.ServerName)
sessionId = hello.ServerName
default:
event.WriteSysEvent(ctx, op, "invalid session in SNI", "session_id", hello.ServerName)
Expand All @@ -68,7 +67,6 @@ func (w *Worker) getSessionTls(hello *tls.ClientHelloInfo) (*tls.Config, error)
timeoutContext, cancel := context.WithTimeout(w.baseContext, validateSessionTimeout)
defer cancel()

event.WriteSysEvent(ctx, op, "looking up session", "session_id", sessionId)
resp, err := conn.LookupSession(timeoutContext, &pbs.LookupSessionRequest{
ServerId: w.conf.RawConfig.Worker.Name,
SessionId: sessionId,
Expand Down Expand Up @@ -127,7 +125,6 @@ func (w *Worker) getSessionTls(hello *tls.ClientHelloInfo) (*tls.Config, error)
actualSi.Unlock()
}

event.WriteSysEvent(ctx, op, "returning TLS configuration", "session_id", sessionId)
return tlsConf, nil
}

Expand Down Expand Up @@ -270,8 +267,6 @@ func (w *Worker) closeConnections(ctx context.Context, closeInfo map[string]stri
return
}

event.WriteSysEvent(ctx, op, "marking connections as closed", "session_and_connection_ids", fmt.Sprintf("%#v", closeInfo))

// How we handle close info depends on whether or not we succeeded with
// marking them closed on the controller.
var sessionCloseInfo map[string][]*pbs.CloseConnectionResponseData
Expand Down Expand Up @@ -304,14 +299,13 @@ func (w *Worker) closeConnections(ctx context.Context, closeInfo map[string]stri
}

// Mark connections as closed
closedIds, errs := w.setCloseTimeForResponse(sessionCloseInfo)
_, errs := w.setCloseTimeForResponse(sessionCloseInfo)
if len(errs) > 0 {
for _, err := range errs {
event.WriteError(ctx, op, err, event.WithInfoMsg("error marking connection closed in state"))
}
}

event.WriteSysEvent(ctx, op, "connections successfully marked closed", "connection_ids", closedIds)
}

// makeCloseConnectionRequest creates a CloseConnectionRequest for
Expand Down
1 change: 0 additions & 1 deletion internal/servers/worker/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ func (w *Worker) sendWorkerStatus(cancelCtx context.Context) {
w.lastStatusSuccess.Store(&LastStatusInformation{StatusResponse: result, StatusTime: time.Now()})

for _, request := range result.GetJobsRequests() {
event.WriteSysEvent(statusCtx, op, "got job request from controller", "request", request)
switch request.GetRequestType() {
case pbs.CHANGETYPE_CHANGETYPE_UPDATE_STATE:
switch request.GetJob().GetType() {
Expand Down
6 changes: 2 additions & 4 deletions internal/servers/worker/tcp_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,15 @@ func (w *Worker) handleTcpProxyV1(connCtx context.Context, clientAddr *net.TCPAd
connWg.Add(2)
go func() {
defer connWg.Done()
_, err := io.Copy(netConn, tcpRemoteConn)
_, _ = io.Copy(netConn, tcpRemoteConn)
netConn.Close()
tcpRemoteConn.Close()
event.WriteSysEvent(ctx, op, "copy from client to endpoint done", "error", err)
}()
go func() {
defer connWg.Done()
_, err := io.Copy(tcpRemoteConn, netConn)
_, _ = io.Copy(tcpRemoteConn, netConn)
tcpRemoteConn.Close()
netConn.Close()
event.WriteSysEvent(ctx, op, "copy from endpoint to client done", "error", err)
}()
connWg.Wait()
}

0 comments on commit 477b541

Please sign in to comment.