Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of fix(session): Fix query for orphaned connections into release/0.17.x #5144

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 10 additions & 31 deletions internal/session/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,38 +329,17 @@ where
and closed_reason is null
returning public_id;
`
orphanedConnectionsCte = `
-- Find connections that are not closed so we can reference those IDs
with
unclosed_connections as (
select public_id
from session_connection
where
-- It's not closed
upper(connected_time_range) > now() or
connected_time_range is null
-- It's not in limbo between when it moved into this state and when
-- it started being reported by the worker, which is roughly every
-- 2-3 seconds
and update_time < wt_sub_seconds_from_now(@worker_state_delay_seconds)
),
connections_to_close as (
select public_id
from session_connection
where
-- Related to the worker that just reported to us
worker_id = @worker_id
-- Only unclosed ones
and public_id in (select public_id from unclosed_connections)
-- These are connection IDs that just got reported to us by the given
-- worker, so they should not be considered closed.
%s
)
closeOrphanedConnections = `
update session_connection
set
closed_reason = 'system error'
where
public_id in (select public_id from connections_to_close)
set closed_reason = 'system error'
where worker_id = @worker_id
and update_time < wt_sub_seconds_from_now(@worker_state_delay_seconds)
and (
connected_time_range is null
or
upper(connected_time_range) > now()
)
%s
returning public_id;
`
deleteTerminated = `
Expand Down
3 changes: 2 additions & 1 deletion internal/session/repository_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,12 +399,13 @@ func (r *ConnectionRepository) closeOrphanedConnections(ctx context.Context, wor
notInClause = fmt.Sprintf(notInClause, strings.Join(params, ","))
}

query := fmt.Sprintf(closeOrphanedConnections, notInClause)
_, err := r.writer.DoTx(
ctx,
db.StdRetryCnt,
db.ExpBackoff{},
func(_ db.Reader, w db.Writer) error {
rows, err := w.Query(ctx, fmt.Sprintf(orphanedConnectionsCte, notInClause), args)
rows, err := w.Query(ctx, query, args)
if err != nil {
return errors.Wrap(ctx, err, op)
}
Expand Down
Loading