Skip to content

Commit

Permalink
refact(session): Replace state start_time/end_time with time range
Browse files Browse the repository at this point in the history
This replaces the start_time, end_time, and previous_end_time columns on
the session_state table with a time range column that records when the
state was active. This allowed for dropping several constraints on the
session_state table that were used to ensure that states for a session
did not overlap. Instead this can be ensured with a single exclusion
constraint. By reducing the number of constraints, it improves the
performance for a number of queries involved with session state. Most
notably when resolving delete cascades of resources related to sessions,
such as targets, where as part of the delete transaction large numbers
of sessions get canceled. Similarly, when sessions get deleted, and the
corresponding session_state rows are deleted, these constraints all must
be checked during the transaction.

The domain layer continues to expose StartTime and EndTime fields on
session.State. However it drops PreviousEndTime, which was never used by
anything calling the domain layer.

See:
    https://www.postgresql.org/docs/current/ddl-constraints.html#DDL-CONSTRAINTS-EXCLUSION
    https://www.postgresql.org/docs/current/rangetypes.html
  • Loading branch information
tmessi committed Sep 30, 2024
1 parent f6bdbd1 commit 142b80f
Show file tree
Hide file tree
Showing 14 changed files with 283 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ begin;
references session_state (session_id, end_time)
);

-- Replaced in 91/06_session_state_tstzrange.up.sql
create trigger immutable_columns before update on session_state
for each row execute procedure immutable_columns('session_id', 'state', 'start_time', 'previous_end_time');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ begin;
end;
$$ language plpgsql;

-- Replaced in 91/06_session_state_tstzrange.up.sql
create trigger wh_insert_session_state after insert on session_state
for each row execute function wh_insert_session_state();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ begin
end; $$;

-- Replaces trigger from 0/50_session.up.sql
-- Replaced in 91/06_session_state_tstzrange.up.sql
-- Update insert session state transition trigger
drop trigger insert_session_state on session_state;
drop function insert_session_state();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
begin;

-- Replaces the view created in 69/02_session_worker_protocol.up.sql
-- Replaced in 91/06_session_state_tstzrange.up.sql
drop view session_list;
create view session_list as
select s.public_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ begin;
'for the user that corresponds to the provided auth_token_id.';

-- Replaces function from 60/03_wh_sessions.up.sql
-- Replaced in 91/06_session_state_tstzrange.up.sql
create function wh_insert_session() returns trigger
as $$
declare
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
-- Copyright (c) HashiCorp, Inc.
-- SPDX-License-Identifier: BUSL-1.1

begin;
-- Add new active_time_range column that will replace two start_time, end_time columns.
-- Also drop a number of constraints on the start_time, end_time columns. This will allow
-- from dropping these columns after the new column has been set with the correct data.
alter table session_state
add column active_time_range tstzrange not null default tstzrange(now(), null, '[]'),
drop constraint end_times_in_sequence,
drop constraint previous_end_time_and_start_time_in_sequence,
drop constraint start_and_end_times_in_sequence,
drop constraint session_state_session_id_previous_end_time_fkey;

-- Set the new active_time_range column for any existing rows using start_time and end_time.
update session_state
set active_time_range = tstzrange(start_time, end_time, '[)');

-- Replaces view from 72/03/session_list_perf_fix.up.sql
-- Switch view to tuse the new column. This also eliminates the previous_end_time column
-- from the view, since it also will be dropped.
drop view session_list;
create view session_list as
select s.public_id,
s.user_id,
shsh.host_id,
shsh.host_set_id,
s.target_id,
s.auth_token_id,
s.project_id,
s.certificate,
s.expiration_time,
s.termination_reason,
s.create_time,
s.update_time,
s.version,
s.endpoint,
s.connection_limit,
ss.state,
lower(ss.active_time_range) as start_time,
upper(ss.active_time_range) as end_time
from session s
join session_state ss on s.public_id = ss.session_id
left join session_host_set_host shsh on s.public_id = shsh.session_id;

-- Now we can finally drop the old columns and add a constraint on the new column
-- that ensures there are no overlaps on the active_time_range for a given session.
alter table session_state
drop column start_time,
drop column end_time,
drop column previous_end_time,
add constraint session_state_active_time_range_excl
exclude using gist (session_id with =,
active_time_range with &&),
add constraint active_time_range_not_empty
check (not isempty(active_time_range));

-- There are still a number of functions that reference the old columns.
-- These all need to be updated to use the new column instead.

-- Replaces trigger from 0/50_session.up.sql
drop trigger immutable_columns on session_state;
create trigger immutable_columns before update on session_state
for each row execute procedure immutable_columns('session_id', 'state');

-- Replaces function from 28/02_prior_session_trigger.up.sql
drop trigger insert_session_state on session_state;
drop function insert_session_state();
create function insert_session_state() returns trigger
as $$
declare
old_col_state text;
begin
update session_state
set active_time_range = tstzrange(lower(active_time_range), now(), '[)')
where session_id = new.session_id
and upper(active_time_range) is null
returning state
into old_col_state;

if not found then
new.prior_state = 'pending';
else
new.prior_state = old_col_state;
end if;

new.active_time_range = tstzrange(now(), null, '[]');

return new;
end;
$$ language plpgsql;

create trigger insert_session_state before insert on session_state
for each row execute procedure insert_session_state();

-- Replaces function from 84/02_wh_upsert_user_refact.up.sql
drop trigger wh_insert_session on session;
drop function wh_insert_session;
create function wh_insert_session() returns trigger
as $$
declare
new_row wh_session_accumulating_fact%rowtype;
begin
with
pending_timestamp (date_dim_key, time_dim_key, ts) as (
select wh_date_key(lower(active_time_range)), wh_time_key(lower(active_time_range)), lower(active_time_range)
from session_state
where session_id = new.public_id
and state = 'pending'
)
insert into wh_session_accumulating_fact (
session_id,
auth_token_id,
host_key,
user_key,
credential_group_key,
session_pending_date_key,
session_pending_time_key,
session_pending_time
)
select new.public_id,
new.auth_token_id,
'no host source', -- will be updated by wh_upsert_host
wh_upsert_user(new.auth_token_id),
'no credentials', -- will be updated by wh_upsert_credential_group
pending_timestamp.date_dim_key,
pending_timestamp.time_dim_key,
pending_timestamp.ts
from pending_timestamp
returning * into strict new_row;
return null;
end;
$$ language plpgsql;

create trigger wh_insert_session after insert on session
for each row execute procedure wh_insert_session();

-- Replaces function from 15/01_wh_rename_key_columns.up.sql
drop trigger wh_insert_session_state on session_state;
drop function wh_insert_session_state;

create function wh_insert_session_state() returns trigger
as $$
declare
date_col text;
time_col text;
ts_col text;
q text;
session_row wh_session_accumulating_fact%rowtype;
begin
if new.state = 'pending' then
-- The pending state is the first state which is handled by the
-- wh_insert_session trigger. The update statement in this trigger will
-- fail for the pending state because the row for the session has not yet
-- been inserted into the wh_session_accumulating_fact table.
return null;
end if;

date_col = 'session_' || new.state || '_date_key';
time_col = 'session_' || new.state || '_time_key';
ts_col = 'session_' || new.state || '_time';

q = format(' update wh_session_accumulating_fact
set (%I, %I, %I) = (select wh_date_key(%L), wh_time_key(%L), %L::timestamptz)
where session_id = %L
returning *',
date_col, time_col, ts_col,
lower(new.active_time_range), lower(new.active_time_range), lower(new.active_time_range),
new.session_id);
execute q into strict session_row;

return null;
end;
$$ language plpgsql;

create trigger wh_insert_session_state after insert on session_state
for each row execute function wh_insert_session_state();
commit;
62 changes: 32 additions & 30 deletions internal/session/immutable_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package session

import (
"context"
"fmt"
"testing"

"github.com/hashicorp/boundary/internal/db"
Expand Down Expand Up @@ -97,15 +98,35 @@ func TestState_ImmutableFields(t *testing.T) {
wrapper := db.TestWrapper(t)
iamRepo := iam.TestRepo(t, conn, wrapper)

ts := timestamp.Timestamp{Timestamp: &timestamppb.Timestamp{Seconds: 0, Nanos: 0}}

_, _ = iam.TestScopes(t, iam.TestRepo(t, conn, wrapper))
session := TestDefaultSession(t, conn, wrapper, iamRepo)
state := TestState(t, conn, session.PublicId, StatusActive)

var new State
err := rw.LookupWhere(context.Background(), &new, "session_id = ? and state = ?", []any{state.SessionId, state.Status})
require.NoError(t, err)
fetchSession := func(ctx context.Context, rw *db.Db, sessionId string, startTime *timestamp.Timestamp) (*State, error) {
const selectQuery = `
select session_id,
state,
lower(active_time_range) as start_time,
upper(active_time_range) as end_time
from session_state
where session_id = ?
and lower(active_time_range) = ?;`
var states []*State
rows, err := rw.Query(ctx, selectQuery, []any{sessionId, startTime})
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
if err := rw.ScanRows(ctx, rows, &states); err != nil {
return nil, err
}
}
if len(states) != 1 {
return nil, fmt.Errorf("found %d states, expected 1", len(states))
}
return states[0], nil
}

tests := []struct {
name string
Expand All @@ -115,7 +136,7 @@ func TestState_ImmutableFields(t *testing.T) {
{
name: "session_id",
update: func() *State {
s := new.Clone().(*State)
s := state.Clone().(*State)
s.SessionId = "s_thisIsNotAValidId"
return s
}(),
Expand All @@ -124,47 +145,28 @@ func TestState_ImmutableFields(t *testing.T) {
{
name: "status",
update: func() *State {
s := new.Clone().(*State)
s := state.Clone().(*State)
s.Status = "canceling"
return s
}(),
fieldMask: []string{"Status"},
},
{
name: "start time",
update: func() *State {
s := new.Clone().(*State)
s.StartTime = &ts
return s
}(),
fieldMask: []string{"StartTime"},
},
{
name: "previous_end_time",
update: func() *State {
s := new.Clone().(*State)
s.PreviousEndTime = &ts
return s
}(),
fieldMask: []string{"PreviousEndTime"},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
assert, require := assert.New(t), require.New(t)
orig := new.Clone()
err := rw.LookupWhere(context.Background(), orig, "session_id = ? and start_time = ?", []any{new.SessionId, new.StartTime})
orig, err := fetchSession(ctx, rw, state.SessionId, state.StartTime)
require.NoError(err)

rowsUpdated, err := rw.Update(context.Background(), tt.update, tt.fieldMask, nil, db.WithSkipVetForWrite(true))
require.Error(err)
assert.Equal(0, rowsUpdated)

after := new.Clone()
err = rw.LookupWhere(context.Background(), after, "session_id = ? and start_time = ?", []any{new.SessionId, new.StartTime})
after, err := fetchSession(ctx, rw, state.SessionId, state.StartTime)
require.NoError(err)
assert.Equal(orig.(*State), after)
assert.Equal(orig, after)
})
}
}
Expand Down
18 changes: 14 additions & 4 deletions internal/session/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ active_session as (
where
ss.session_id in (select * from unexpired_session) and
ss.state = 'active' and
ss.end_time is null
upper(ss.active_time_range) is null
)
insert into session_connection (
session_id,
Expand Down Expand Up @@ -150,7 +150,7 @@ from
where
ss.session_id = @public_id and
ss.state = 'canceling' and
ss.end_time is null
upper(ss.active_time_range) is null
)
update session us
set version = version +1,
Expand Down Expand Up @@ -226,7 +226,7 @@ with canceling_session(session_id) as
session_state ss
where
ss.state = 'canceling' and
ss.end_time is null
upper(ss.active_time_range) is null
)
update session us
set termination_reason =
Expand Down Expand Up @@ -371,7 +371,7 @@ where
and
session_state.state = 'terminated'
and
session_state.start_time < wt_sub_seconds_from_now(@threshold_seconds)
lower(session_state.active_time_range) < wt_sub_seconds_from_now(@threshold_seconds)
;
`
sessionCredentialRewrapQuery = `
Expand Down Expand Up @@ -451,6 +451,16 @@ order by update_time desc, public_id desc;
`
estimateCountSessions = `
select reltuples::bigint as estimate from pg_class where oid in ('session'::regclass)
`

selectStates = `
select session_id,
state,
lower(active_time_range) as start_time,
upper(active_time_range) as end_time
from session_state
where session_id = ?
order by active_time_range desc;
`
)

Expand Down
Loading

0 comments on commit 142b80f

Please sign in to comment.