Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unified Queue: tentative DB schema, start refactoring scripts #25215

Merged
merged 13 commits into from
Jan 8, 2025
1 change: 1 addition & 0 deletions changes/23913-upcoming-activities-handle-scripts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Added script execution to the new `upcoming_activities` table.
9 changes: 6 additions & 3 deletions ee/server/service/setup_experience.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,12 @@ func (svc *Service) SetupExperienceNextStep(ctx context.Context, hostUUID string
return false, ctxerr.Errorf(ctx, "setup experience script missing content id: %d", *script.SetupExperienceScriptID)
}
req := &fleet.HostScriptRequestPayload{
HostID: host.ID,
ScriptName: script.Name,
ScriptContentID: *script.ScriptContentID,
HostID: host.ID,
ScriptName: script.Name,
ScriptContentID: *script.ScriptContentID,
// because the script execution request is associated with setup experience,
// it will be enqueued with a higher priority and will run before other
// items in the queue.
SetupExperienceScriptID: script.SetupExperienceScriptID,
}
res, err := svc.ds.NewHostScriptExecutionRequest(ctx, req)
Expand Down
1 change: 1 addition & 0 deletions server/datastore/mysql/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ var hostRefs = []string{
"host_activities",
"host_mdm_actions",
"host_calendar_events",
"upcoming_activities",
}

// NOTE: The following tables are explicity excluded from hostRefs list and accordingly are not
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package tables

import (
"database/sql"
)

func init() {
MigrationClient.AddMigration(Up_20250106162751, Down_20250106162751)
}

func Up_20250106162751(tx *sql.Tx) error {
_, err := tx.Exec(`
CREATE TABLE upcoming_activities (
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep in mind that this will still evolve - it only handles scripts for now, as I try to take small bites at that huge story. Be ready to reset your DB with future changes to this same migration if you try that out.

id INT UNSIGNED NOT NULL AUTO_INCREMENT,
getvictor marked this conversation as resolved.
Show resolved Hide resolved
host_id INT UNSIGNED NOT NULL,
-- priority 0 is normal, > 0 is higher priority, < 0 is lower priority.
priority INT NOT NULL DEFAULT 0,
-- user_id is the user that triggered the activity, it may be null if the
-- activity is fleet-initiated or the user was deleted. Additional user
-- information (name, email, etc.) is stored in the JSON payload.
user_id INT UNSIGNED NULL,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If user was deleted, the id should point to the new deleted_users table (or whatever we call it).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I'll be able to add that feature as part of this already-huge story. I'm trying to keep the scope in check as there are still tons of unknowns, storing the deleted user's info in the JSON for now.

-- type of activity to be executed, currently we only support those, but as
-- more activity types get added, we can enrich the ENUM with an ALTER TABLE.
activity_type ENUM('script', 'software_install', 'vpp_app_install') NOT NULL,
-- execution_id is the identifier of the activity that will be used when
-- executed - e.g. scripts and software installs have an execution_id, and
-- it is sometimes important to know it as soon as the activity is enqueued,
-- so we need to generate it immediately.
execution_id VARCHAR(255) NOT NULL,
-- those are all columns and not JSON fields because we need FKs on them to
-- do processing ON DELETE, otherwise we'd have to check for existence of
-- each one when executing the activity (we need the enqueue next activity
-- action to be efficient).
script_id INT UNSIGNED NULL,
script_content_id INT UNSIGNED NULL,
policy_id INT UNSIGNED NULL,
setup_experience_script_id INT UNSIGNED NULL,
payload JSON NOT NULL,
Comment on lines +39 to +44
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't feel very clean -- a bunch of columns specific to one type of activity. What if we need to add another column -- will we keep growing this table?

What do you think about normalizing and having a separate table for this script-specific data?

We can still have FKs by having the JOIN column like script_activity_id. If we have this column, then maybe we also don't need activity_type/execution_id columns. Or at least the activity_type can be autogenerated by simply checking if script_activity_id is null or not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think a join table makes sense for specific activity stuff that is outside of the JSON payload. I think I'll keep the activity type as a) it cleanly describes the supported types and b) we may have some activity types that don't require a join table at all.

As mentioned, I'll address those things by going forward in the next PR, please approve if there's nothing else.

-- Using DATETIME instead of TIMESTAMP to prevent future Y2K38 issues
created_at DATETIME(6) NOT NULL DEFAULT NOW(6),
updated_at DATETIME(6) NOT NULL DEFAULT NOW(6) ON UPDATE NOW(6),
PRIMARY KEY (id),
UNIQUE KEY idx_upcoming_activities_execution_id (execution_id),
INDEX idx_upcoming_activities_host_id_activity_type (host_id, priority, created_at, activity_type),
CONSTRAINT fk_upcoming_activities_script_id
FOREIGN KEY (script_id) REFERENCES scripts (id) ON DELETE SET NULL,
CONSTRAINT fk_upcoming_activities_script_content_id
FOREIGN KEY (script_content_id) REFERENCES script_contents (id) ON DELETE CASCADE,
CONSTRAINT fk_upcoming_activities_policy_id
FOREIGN KEY (policy_id) REFERENCES policies (id) ON DELETE SET NULL,
CONSTRAINT fk_upcoming_activities_setup_experience_script_id
FOREIGN KEY (setup_experience_script_id) REFERENCES setup_experience_scripts (id) ON DELETE SET NULL,
CONSTRAINT fk_upcoming_activities_user_id
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE SET NULL
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci`,
)
return err
}

func Down_20250106162751(tx *sql.Tx) error {
return nil
}
35 changes: 33 additions & 2 deletions server/datastore/mysql/schema.sql

Large diffs are not rendered by default.

119 changes: 105 additions & 14 deletions server/datastore/mysql/scripts.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/jmoiron/sqlx"
)

// TODO(mna): does it still make sense to have that sync/async distinction?
// A pending script is a pending script, no?
const whereFilterPendingScript = `
exit_code IS NULL
-- async requests + sync requests created within the given interval
Expand Down Expand Up @@ -60,21 +62,55 @@ func (ds *Datastore) NewInternalScriptExecutionRequest(ctx context.Context, requ

func newHostScriptExecutionRequest(ctx context.Context, tx sqlx.ExtContext, request *fleet.HostScriptRequestPayload, isInternal bool) (*fleet.HostScriptResult, error) {
const (
insStmt = `INSERT INTO host_script_results (host_id, execution_id, script_content_id, output, script_id, policy_id, user_id, sync_request, setup_experience_script_id, is_internal) VALUES (?, ?, ?, '', ?, ?, ?, ?, ?, ?)`
getStmt = `SELECT hsr.id, hsr.host_id, hsr.execution_id, hsr.created_at, hsr.script_id, hsr.policy_id, hsr.user_id, hsr.sync_request, sc.contents as script_contents, hsr.setup_experience_script_id FROM host_script_results hsr JOIN script_contents sc WHERE sc.id = hsr.script_content_id AND hsr.id = ?`
insStmt = `
INSERT INTO upcoming_activities
(
host_id, user_id, activity_type, execution_id, script_id, script_content_id,
policy_id, setup_experience_script_id, priority, payload
)
VALUES
(?, ?, 'script', ?, ?, ?, ?, ?, ?,
JSON_OBJECT(
'sync_request', ?,
'is_internal', ?,
'user', (SELECT JSON_OBJECT('name', name, 'email', email) FROM users WHERE id = ?)
)
)`

getStmt = `
SELECT
ua.id, ua.host_id, ua.execution_id, ua.created_at, ua.script_id, ua.policy_id, ua.user_id,
JSON_EXTRACT(payload, '$.sync_request') AS sync_request,
sc.contents as script_contents, ua.setup_experience_script_id
FROM
upcoming_activities ua
INNER JOIN script_contents sc
ON ua.script_content_id = sc.id
WHERE
ua.id = ?
`
)

var priority int
if request.SetupExperienceScriptID != nil {
// a bit naive/simplistic for now, but we'll support user-provided
// priorities in a future story and we can improve on how we manage those.
priority = 100
}

execID := uuid.New().String()
result, err := tx.ExecContext(ctx, insStmt,
request.HostID,
request.UserID,
execID,
request.ScriptContentID,
request.ScriptID,
request.ScriptContentID,
request.PolicyID,
request.UserID,
request.SyncRequest,
request.SetupExperienceScriptID,
priority,
request.SyncRequest,
isInternal,
request.UserID,
)
if err != nil {
return nil, ctxerr.Wrap(ctx, err, "new host script execution request")
Expand All @@ -86,7 +122,6 @@ func newHostScriptExecutionRequest(ctx context.Context, tx sqlx.ExtContext, requ
if err != nil {
return nil, ctxerr.Wrap(ctx, err, "getting the created host script result to return")
}

return &script, nil
}

Expand Down Expand Up @@ -226,12 +261,14 @@ func (ds *Datastore) SetHostScriptExecutionResult(ctx context.Context, result *f
}

func (ds *Datastore) ListPendingHostScriptExecutions(ctx context.Context, hostID uint, onlyShowInternal bool) ([]*fleet.HostScriptResult, error) {
// pending host script executions are those without results in
// host_script_results UNION those in the upcoming activities queue
internalWhere := ""
if onlyShowInternal {
internalWhere = " AND is_internal = TRUE"
}

listStmt := fmt.Sprintf(`
listHSRStmt := fmt.Sprintf(`
SELECT
id,
host_id,
Expand All @@ -246,9 +283,33 @@ func (ds *Datastore) ListPendingHostScriptExecutions(ctx context.Context, hostID
ORDER BY
created_at ASC`, whereFilterPendingScript, internalWhere)

if onlyShowInternal {
internalWhere = " AND JSON_EXTRACT(payload, '$.is_internal') = 1"
}
listUAStmt := fmt.Sprintf(`
SELECT
id,
host_id,
execution_id,
script_id
FROM
upcoming_activities
WHERE
host_id = ? AND
activity_type = 'script' AND
(
JSON_EXTRACT(payload, '$.sync_request') = 0 OR
created_at >= DATE_SUB(NOW(), INTERVAL ? SECOND)
)
%s
ORDER BY
priority DESC, created_at ASC`, internalWhere)

stmt := fmt.Sprintf(`(%s) UNION (%s)`, listHSRStmt, listUAStmt)

var results []*fleet.HostScriptResult
seconds := int(constants.MaxServerWaitTime.Seconds())
if err := sqlx.SelectContext(ctx, ds.reader(ctx), &results, listStmt, hostID, seconds); err != nil {
if err := sqlx.SelectContext(ctx, ds.reader(ctx), &results, stmt, hostID, seconds, hostID, seconds); err != nil {
return nil, ctxerr.Wrap(ctx, err, "list pending host script executions")
}
return results, nil
Expand All @@ -264,10 +325,19 @@ func (ds *Datastore) IsExecutionPendingForHost(ctx context.Context, hostID uint,
host_id = ? AND
script_id = ? AND
exit_code IS NULL
UNION
SELECT
1
FROM
upcoming_activities
WHERE
host_id = ? AND
activity_type = 'script' AND
script_id = ?
`

var results []*uint
if err := sqlx.SelectContext(ctx, ds.reader(ctx), &results, getStmt, hostID, scriptID); err != nil {
if err := sqlx.SelectContext(ctx, ds.reader(ctx), &results, getStmt, hostID, scriptID, hostID, scriptID); err != nil {
return false, ctxerr.Wrap(ctx, err, "is execution pending for host")
}
return len(results) > 0, nil
Expand Down Expand Up @@ -501,10 +571,15 @@ func (ds *Datastore) deletePendingHostScriptExecutionsForPolicy(ctx context.Cont
globalOrTeamID = *teamID
}

deleteStmt := fmt.Sprintf(`
deletePendingFunc := func(stmt string, args ...any) error {
_, err := ds.writer(ctx).ExecContext(ctx, stmt, args...)
return ctxerr.Wrap(ctx, err, "delete pending host script executions for policy")
}

deleteHSRStmt := fmt.Sprintf(`
DELETE FROM
host_script_results
WHERE
WHERE
policy_id = ? AND
script_id IN (
SELECT id FROM scripts WHERE scripts.global_or_team_id = ?
Expand All @@ -513,9 +588,22 @@ func (ds *Datastore) deletePendingHostScriptExecutionsForPolicy(ctx context.Cont
`, whereFilterPendingScript)

seconds := int(constants.MaxServerWaitTime.Seconds())
_, err := ds.writer(ctx).ExecContext(ctx, deleteStmt, policyID, globalOrTeamID, seconds)
if err != nil {
return ctxerr.Wrap(ctx, err, "delete pending host script executions for policy")
if err := deletePendingFunc(deleteHSRStmt, policyID, globalOrTeamID, seconds); err != nil {
return err
}

deleteUAStmt := `
DELETE FROM
upcoming_activities
WHERE
policy_id = ? AND
activity_type = 'script' AND
script_id IN (
SELECT id FROM scripts WHERE scripts.global_or_team_id = ?
)
`
if err := deletePendingFunc(deleteUAStmt, policyID, globalOrTeamID); err != nil {
return err
}

return nil
Expand Down Expand Up @@ -1312,6 +1400,9 @@ WHERE
AND NOT EXISTS (
SELECT 1 FROM setup_experience_scripts WHERE script_content_id = script_contents.id
)
AND NOT EXISTS (
SELECT 1 FROM upcoming_activities WHERE script_content_id = script_contents.id
)
`
_, err := ds.writer(ctx).ExecContext(ctx, deleteStmt)
if err != nil {
Expand Down
Loading
Loading