Skip to content

Commit

Permalink
feat(session): Delete terminated sessions in batches
Browse files Browse the repository at this point in the history
This changes the periodic job that deletes terminated sessions to delete
the those sessions in batches. Prior to this, a single SQL DELETE
command was used to delete all qualifying terminated sessions. In
systems under heavy load, the statement could timeout leading to a
situation where no qualifying terminated sessions were ever being
deleted.
  • Loading branch information
mgaffney committed Oct 25, 2024
1 parent 9b984b0 commit 9529491
Show file tree
Hide file tree
Showing 8 changed files with 373 additions and 168 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- Copyright (c) HashiCorp, Inc.
-- SPDX-License-Identifier: BUSL-1.1

begin;

create table session_delete_terminated_job (
batch_size int not null
constraint batch_size_must_be_greater_than_0
check(batch_size > 0),
create_time wt_timestamp,
update_time wt_timestamp
);
comment on table session_delete_terminated_job is
'session_delete_terminated_job is a single row table that contains settings for the delete terminated sessions job.';

-- this index ensures that there will only ever be one row in the
-- table. see:
-- https://www.postgresql.org/docs/current/indexes-expressional.html
create unique index session_delete_terminated_job_one_row
on session_delete_terminated_job((batch_size is not null));

create trigger immutable_columns before update on session_delete_terminated_job
for each row execute procedure immutable_columns('create_time');

create trigger default_create_time_column before insert on session_delete_terminated_job
for each row execute procedure default_create_time();

create trigger update_time_column before update on session_delete_terminated_job
for each row execute procedure update_time_column();

insert into session_delete_terminated_job(batch_size) values(5000);

commit;

47 changes: 37 additions & 10 deletions internal/session/job_delete_terminated_sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package session

import (
"context"
"sync"
"time"

"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/scheduler"
"github.com/hashicorp/boundary/internal/scheduler/batch"
)

type deleteTerminatedJob struct {
Expand All @@ -18,8 +20,8 @@ type deleteTerminatedJob struct {
// state for it to be deleted.
threshold time.Duration

// the number of sessions deleted in the most recent run
deletedInRun int
mu sync.Mutex
batch *batch.Batch
}

func newDeleteTerminatedJob(ctx context.Context, repo *Repository, threshold time.Duration) (*deleteTerminatedJob, error) {
Expand All @@ -38,24 +40,49 @@ func newDeleteTerminatedJob(ctx context.Context, repo *Repository, threshold tim
// Status reports the job’s current status. The status is periodically persisted by
// the scheduler when a job is running, and will be used to verify a job is making progress.
func (d *deleteTerminatedJob) Status() scheduler.JobStatus {
return scheduler.JobStatus{
Completed: d.deletedInRun,
Total: d.deletedInRun,
d.mu.Lock()
defer d.mu.Unlock()
if d.batch != nil {
return d.batch.Status()
}
return scheduler.JobStatus{}
}

// Run performs the required work depending on the implementation.
// The context is used to notify the job that it should exit early.
func (d *deleteTerminatedJob) Run(ctx context.Context, _ time.Duration) error {
func (d *deleteTerminatedJob) Run(ctx context.Context, statusThreshold time.Duration) error {
const op = "session.(deleteTerminatedJob).Run"
d.deletedInRun = 0
var err error

d.deletedInRun, err = d.repo.deleteSessionsTerminatedBefore(ctx, d.threshold)
params, err := d.repo.getDeleteJobParams(ctx, d.threshold)
switch {
case err != nil:
return errors.Wrap(ctx, err, op)
case params.TotalToDelete == 0:
return nil
}

exec := func() batch.Exec {
return func(ctx context.Context, batchSize int) (int, error) {
return d.repo.deleteTerminatedSessionsBatch(ctx, params.WindowStartTime, batchSize)
}
}

config := &batch.Config{
Size: params.BatchSize,
TotalToComplete: params.TotalToDelete,
StatusThreshold: statusThreshold,
Exec: exec(),
Store: d.repo.setDeleteJobBatchSize,
}

batch, err := batch.New(ctx, config)
if err != nil {
return errors.Wrap(ctx, err, op)
}
return nil
d.mu.Lock()
d.batch = batch
d.mu.Unlock()
return batch.Run(ctx)
}

// NextRunIn returns the duration until the next job run should be scheduled. This
Expand Down
53 changes: 9 additions & 44 deletions internal/session/job_delete_terminated_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,48 +34,13 @@ func TestDeleteTermiantedSessionsJob(t *testing.T) {
threshold time.Duration
expected int
}{
{
0,
0,
time.Nanosecond,
0,
},
{
1,
1,
time.Nanosecond,
1,
},
{
1,
1,
time.Hour,
0,
},
{
10,
10,
time.Nanosecond,
10,
},
{
10,
4,
time.Nanosecond,
4,
},
{
10,
0,
time.Nanosecond,
0,
},
{
10,
10,
time.Hour,
0,
},
{0, 0, time.Nanosecond, 0},
{1, 1, time.Nanosecond, 1},
{1, 1, time.Hour, 0},
{10, 10, time.Nanosecond, 10},
{10, 4, time.Nanosecond, 4},
{10, 0, time.Nanosecond, 0},
{10, 10, time.Hour, 0},
}

for _, tc := range cases {
Expand All @@ -101,9 +66,9 @@ func TestDeleteTermiantedSessionsJob(t *testing.T) {

job, err := newDeleteTerminatedJob(ctx, repo, tc.threshold)
require.NoError(t, err)
err = job.Run(ctx, 0)
err = job.Run(ctx, 1*time.Second)
require.NoError(t, err)
assert.Equal(t, tc.expected, job.deletedInRun)
assert.Equal(t, tc.expected, job.Status().Completed)
})
}
}
47 changes: 36 additions & 11 deletions internal/session/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,17 +341,6 @@ update session_connection
)
%s
returning public_id;
`
deleteTerminated = `
delete from session
using session_state
where
session.public_id = session_state.session_id
and
session_state.state = 'terminated'
and
lower(session_state.active_time_range) < wt_sub_seconds_from_now(@threshold_seconds)
;
`
sessionCredentialRewrapQuery = `
select distinct
Expand Down Expand Up @@ -457,6 +446,42 @@ values
`
)

// queries for the delete terminated sessions job
const (
getDeleteJobParams = `
with total (to_delete) as (
select count(session_id)
from session_state
where session_state.state = 'terminated'
and lower(session_state.active_time_range) < wt_sub_seconds_from_now(@threshold_seconds)
),
params (batch_size) as (
select batch_size
from session_delete_terminated_job
)
select total.to_delete as total_to_delete,
params.batch_size as batch_size,
wt_sub_seconds_from_now(@threshold_seconds) as window_start_time
from total, params;
`
setDeleteJobBatchSize = `
update session_delete_terminated_job
set batch_size = @batch_size;
`
deleteTerminatedInBatch = `
with batch (session_id) as (
select session_id
from session_state
where state = 'terminated'
and lower(session_state.active_time_range) < @terminated_before
limit @batch_size
)
delete
from session
where public_id in (select session_id from batch);
`
)

func batchInsertSessionCredentialDynamic(creds []*DynamicCredential) (string, []any, error) {
if len(creds) <= 0 {
return "", nil, fmt.Errorf("empty slice of DynamicCredential, cannot build query")
Expand Down
72 changes: 72 additions & 0 deletions internal/session/repository_jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package session

import (
"context"
"database/sql"
"time"

"github.com/hashicorp/boundary/internal/db/timestamp"
"github.com/hashicorp/boundary/internal/errors"
)

type deleteJobParams struct {
TotalToDelete int
BatchSize int
WindowStartTime *timestamp.Timestamp
}

func (r *Repository) getDeleteJobParams(ctx context.Context, threshold time.Duration) (deleteJobParams, error) {
const op = "session.(Repository).getDeleteJobParams"

args := []any{
sql.Named("threshold_seconds", threshold.Seconds()),
}
rows, err := r.reader.Query(ctx, getDeleteJobParams, args)
if err != nil {
return deleteJobParams{}, errors.Wrap(ctx, err, op, errors.WithMsg("error getting parameters for delete terminated sessions job"))
}
defer rows.Close()

var jobParams deleteJobParams
for rows.Next() {
if err := r.reader.ScanRows(ctx, rows, &jobParams); err != nil {
return deleteJobParams{}, errors.Wrap(ctx, err, op, errors.WithMsg("scan row failed"))
}
}
if err := rows.Err(); err != nil {
return deleteJobParams{}, errors.Wrap(ctx, err, op, errors.WithMsg("next row failed"))
}
return jobParams, nil
}

func (r *Repository) setDeleteJobBatchSize(ctx context.Context, batchSize int) error {
const op = "session.(Repository).setDeleteJobBatchSize"

args := []any{
sql.Named("batch_size", batchSize),
}

_, err := r.writer.Exec(ctx, setDeleteJobBatchSize, args)
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("error setting delete job batch size"))
}
return nil
}

func (r *Repository) deleteTerminatedSessionsBatch(ctx context.Context, terminatedBefore *timestamp.Timestamp, batchSize int) (int, error) {
const op = "session.(Repository).deleteTerminatedSessionsBatch"

args := []any{
sql.Named("terminated_before", terminatedBefore),
sql.Named("batch_size", batchSize),
}

c, err := r.writer.Exec(ctx, deleteTerminatedInBatch, args)
if err != nil {
return 0, errors.Wrap(ctx, err, op, errors.WithMsg("error deleting terminated sessions"))
}
return c, nil
}
Loading

0 comments on commit 9529491

Please sign in to comment.