Skip to content

Commit

Permalink
Unified Queue: tentative DB schema, start refactoring scripts (#25215)
Browse files Browse the repository at this point in the history
  • Loading branch information
mna authored Jan 8, 2025
1 parent e956220 commit 2c3fc20
Show file tree
Hide file tree
Showing 7 changed files with 417 additions and 180 deletions.
1 change: 1 addition & 0 deletions changes/23913-upcoming-activities-handle-scripts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Added script execution to the new `upcoming_activities` table.
9 changes: 6 additions & 3 deletions ee/server/service/setup_experience.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,12 @@ func (svc *Service) SetupExperienceNextStep(ctx context.Context, hostUUID string
return false, ctxerr.Errorf(ctx, "setup experience script missing content id: %d", *script.SetupExperienceScriptID)
}
req := &fleet.HostScriptRequestPayload{
HostID: host.ID,
ScriptName: script.Name,
ScriptContentID: *script.ScriptContentID,
HostID: host.ID,
ScriptName: script.Name,
ScriptContentID: *script.ScriptContentID,
// because the script execution request is associated with setup experience,
// it will be enqueued with a higher priority and will run before other
// items in the queue.
SetupExperienceScriptID: script.SetupExperienceScriptID,
}
res, err := svc.ds.NewHostScriptExecutionRequest(ctx, req)
Expand Down
1 change: 1 addition & 0 deletions server/datastore/mysql/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ var hostRefs = []string{
"host_activities",
"host_mdm_actions",
"host_calendar_events",
"upcoming_activities",
}

// NOTE: The following tables are explicity excluded from hostRefs list and accordingly are not
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package tables

import (
"database/sql"
)

func init() {
MigrationClient.AddMigration(Up_20250106162751, Down_20250106162751)
}

func Up_20250106162751(tx *sql.Tx) error {
_, err := tx.Exec(`
CREATE TABLE upcoming_activities (
id INT UNSIGNED NOT NULL AUTO_INCREMENT,
host_id INT UNSIGNED NOT NULL,
-- priority 0 is normal, > 0 is higher priority, < 0 is lower priority.
priority INT NOT NULL DEFAULT 0,
-- user_id is the user that triggered the activity, it may be null if the
-- activity is fleet-initiated or the user was deleted. Additional user
-- information (name, email, etc.) is stored in the JSON payload.
user_id INT UNSIGNED NULL,
-- type of activity to be executed, currently we only support those, but as
-- more activity types get added, we can enrich the ENUM with an ALTER TABLE.
activity_type ENUM('script', 'software_install', 'vpp_app_install') NOT NULL,
-- execution_id is the identifier of the activity that will be used when
-- executed - e.g. scripts and software installs have an execution_id, and
-- it is sometimes important to know it as soon as the activity is enqueued,
-- so we need to generate it immediately.
execution_id VARCHAR(255) NOT NULL,
-- those are all columns and not JSON fields because we need FKs on them to
-- do processing ON DELETE, otherwise we'd have to check for existence of
-- each one when executing the activity (we need the enqueue next activity
-- action to be efficient).
script_id INT UNSIGNED NULL,
script_content_id INT UNSIGNED NULL,
policy_id INT UNSIGNED NULL,
setup_experience_script_id INT UNSIGNED NULL,
payload JSON NOT NULL,
-- Using DATETIME instead of TIMESTAMP to prevent future Y2K38 issues
created_at DATETIME(6) NOT NULL DEFAULT NOW(6),
updated_at DATETIME(6) NOT NULL DEFAULT NOW(6) ON UPDATE NOW(6),
PRIMARY KEY (id),
UNIQUE KEY idx_upcoming_activities_execution_id (execution_id),
INDEX idx_upcoming_activities_host_id_activity_type (host_id, priority, created_at, activity_type),
CONSTRAINT fk_upcoming_activities_script_id
FOREIGN KEY (script_id) REFERENCES scripts (id) ON DELETE SET NULL,
CONSTRAINT fk_upcoming_activities_script_content_id
FOREIGN KEY (script_content_id) REFERENCES script_contents (id) ON DELETE CASCADE,
CONSTRAINT fk_upcoming_activities_policy_id
FOREIGN KEY (policy_id) REFERENCES policies (id) ON DELETE SET NULL,
CONSTRAINT fk_upcoming_activities_setup_experience_script_id
FOREIGN KEY (setup_experience_script_id) REFERENCES setup_experience_scripts (id) ON DELETE SET NULL,
CONSTRAINT fk_upcoming_activities_user_id
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE SET NULL
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci`,
)
return err
}

func Down_20250106162751(tx *sql.Tx) error {
return nil
}
35 changes: 33 additions & 2 deletions server/datastore/mysql/schema.sql

Large diffs are not rendered by default.

119 changes: 105 additions & 14 deletions server/datastore/mysql/scripts.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/jmoiron/sqlx"
)

// TODO(mna): does it still make sense to have that sync/async distinction?
// A pending script is a pending script, no?
const whereFilterPendingScript = `
exit_code IS NULL
-- async requests + sync requests created within the given interval
Expand Down Expand Up @@ -60,21 +62,55 @@ func (ds *Datastore) NewInternalScriptExecutionRequest(ctx context.Context, requ

func newHostScriptExecutionRequest(ctx context.Context, tx sqlx.ExtContext, request *fleet.HostScriptRequestPayload, isInternal bool) (*fleet.HostScriptResult, error) {
const (
insStmt = `INSERT INTO host_script_results (host_id, execution_id, script_content_id, output, script_id, policy_id, user_id, sync_request, setup_experience_script_id, is_internal) VALUES (?, ?, ?, '', ?, ?, ?, ?, ?, ?)`
getStmt = `SELECT hsr.id, hsr.host_id, hsr.execution_id, hsr.created_at, hsr.script_id, hsr.policy_id, hsr.user_id, hsr.sync_request, sc.contents as script_contents, hsr.setup_experience_script_id FROM host_script_results hsr JOIN script_contents sc WHERE sc.id = hsr.script_content_id AND hsr.id = ?`
insStmt = `
INSERT INTO upcoming_activities
(
host_id, user_id, activity_type, execution_id, script_id, script_content_id,
policy_id, setup_experience_script_id, priority, payload
)
VALUES
(?, ?, 'script', ?, ?, ?, ?, ?, ?,
JSON_OBJECT(
'sync_request', ?,
'is_internal', ?,
'user', (SELECT JSON_OBJECT('name', name, 'email', email) FROM users WHERE id = ?)
)
)`

getStmt = `
SELECT
ua.id, ua.host_id, ua.execution_id, ua.created_at, ua.script_id, ua.policy_id, ua.user_id,
JSON_EXTRACT(payload, '$.sync_request') AS sync_request,
sc.contents as script_contents, ua.setup_experience_script_id
FROM
upcoming_activities ua
INNER JOIN script_contents sc
ON ua.script_content_id = sc.id
WHERE
ua.id = ?
`
)

var priority int
if request.SetupExperienceScriptID != nil {
// a bit naive/simplistic for now, but we'll support user-provided
// priorities in a future story and we can improve on how we manage those.
priority = 100
}

execID := uuid.New().String()
result, err := tx.ExecContext(ctx, insStmt,
request.HostID,
request.UserID,
execID,
request.ScriptContentID,
request.ScriptID,
request.ScriptContentID,
request.PolicyID,
request.UserID,
request.SyncRequest,
request.SetupExperienceScriptID,
priority,
request.SyncRequest,
isInternal,
request.UserID,
)
if err != nil {
return nil, ctxerr.Wrap(ctx, err, "new host script execution request")
Expand All @@ -86,7 +122,6 @@ func newHostScriptExecutionRequest(ctx context.Context, tx sqlx.ExtContext, requ
if err != nil {
return nil, ctxerr.Wrap(ctx, err, "getting the created host script result to return")
}

return &script, nil
}

Expand Down Expand Up @@ -226,12 +261,14 @@ func (ds *Datastore) SetHostScriptExecutionResult(ctx context.Context, result *f
}

func (ds *Datastore) ListPendingHostScriptExecutions(ctx context.Context, hostID uint, onlyShowInternal bool) ([]*fleet.HostScriptResult, error) {
// pending host script executions are those without results in
// host_script_results UNION those in the upcoming activities queue
internalWhere := ""
if onlyShowInternal {
internalWhere = " AND is_internal = TRUE"
}

listStmt := fmt.Sprintf(`
listHSRStmt := fmt.Sprintf(`
SELECT
id,
host_id,
Expand All @@ -246,9 +283,33 @@ func (ds *Datastore) ListPendingHostScriptExecutions(ctx context.Context, hostID
ORDER BY
created_at ASC`, whereFilterPendingScript, internalWhere)

if onlyShowInternal {
internalWhere = " AND JSON_EXTRACT(payload, '$.is_internal') = 1"
}
listUAStmt := fmt.Sprintf(`
SELECT
id,
host_id,
execution_id,
script_id
FROM
upcoming_activities
WHERE
host_id = ? AND
activity_type = 'script' AND
(
JSON_EXTRACT(payload, '$.sync_request') = 0 OR
created_at >= DATE_SUB(NOW(), INTERVAL ? SECOND)
)
%s
ORDER BY
priority DESC, created_at ASC`, internalWhere)

stmt := fmt.Sprintf(`(%s) UNION (%s)`, listHSRStmt, listUAStmt)

var results []*fleet.HostScriptResult
seconds := int(constants.MaxServerWaitTime.Seconds())
if err := sqlx.SelectContext(ctx, ds.reader(ctx), &results, listStmt, hostID, seconds); err != nil {
if err := sqlx.SelectContext(ctx, ds.reader(ctx), &results, stmt, hostID, seconds, hostID, seconds); err != nil {
return nil, ctxerr.Wrap(ctx, err, "list pending host script executions")
}
return results, nil
Expand All @@ -264,10 +325,19 @@ func (ds *Datastore) IsExecutionPendingForHost(ctx context.Context, hostID uint,
host_id = ? AND
script_id = ? AND
exit_code IS NULL
UNION
SELECT
1
FROM
upcoming_activities
WHERE
host_id = ? AND
activity_type = 'script' AND
script_id = ?
`

var results []*uint
if err := sqlx.SelectContext(ctx, ds.reader(ctx), &results, getStmt, hostID, scriptID); err != nil {
if err := sqlx.SelectContext(ctx, ds.reader(ctx), &results, getStmt, hostID, scriptID, hostID, scriptID); err != nil {
return false, ctxerr.Wrap(ctx, err, "is execution pending for host")
}
return len(results) > 0, nil
Expand Down Expand Up @@ -501,10 +571,15 @@ func (ds *Datastore) deletePendingHostScriptExecutionsForPolicy(ctx context.Cont
globalOrTeamID = *teamID
}

deleteStmt := fmt.Sprintf(`
deletePendingFunc := func(stmt string, args ...any) error {
_, err := ds.writer(ctx).ExecContext(ctx, stmt, args...)
return ctxerr.Wrap(ctx, err, "delete pending host script executions for policy")
}

deleteHSRStmt := fmt.Sprintf(`
DELETE FROM
host_script_results
WHERE
WHERE
policy_id = ? AND
script_id IN (
SELECT id FROM scripts WHERE scripts.global_or_team_id = ?
Expand All @@ -513,9 +588,22 @@ func (ds *Datastore) deletePendingHostScriptExecutionsForPolicy(ctx context.Cont
`, whereFilterPendingScript)

seconds := int(constants.MaxServerWaitTime.Seconds())
_, err := ds.writer(ctx).ExecContext(ctx, deleteStmt, policyID, globalOrTeamID, seconds)
if err != nil {
return ctxerr.Wrap(ctx, err, "delete pending host script executions for policy")
if err := deletePendingFunc(deleteHSRStmt, policyID, globalOrTeamID, seconds); err != nil {
return err
}

deleteUAStmt := `
DELETE FROM
upcoming_activities
WHERE
policy_id = ? AND
activity_type = 'script' AND
script_id IN (
SELECT id FROM scripts WHERE scripts.global_or_team_id = ?
)
`
if err := deletePendingFunc(deleteUAStmt, policyID, globalOrTeamID); err != nil {
return err
}

return nil
Expand Down Expand Up @@ -1312,6 +1400,9 @@ WHERE
AND NOT EXISTS (
SELECT 1 FROM setup_experience_scripts WHERE script_content_id = script_contents.id
)
AND NOT EXISTS (
SELECT 1 FROM upcoming_activities WHERE script_content_id = script_contents.id
)
`
_, err := ds.writer(ctx).ExecContext(ctx, deleteStmt)
if err != nil {
Expand Down
Loading

0 comments on commit 2c3fc20

Please sign in to comment.