Skip to content

Commit

Permalink
Merge branch '7766-backend' into 13486-save-results-to-db
Browse files Browse the repository at this point in the history
  • Loading branch information
mostlikelee committed Oct 9, 2023
2 parents 8a68954 + 5c868c9 commit 064823a
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 52 deletions.
9 changes: 4 additions & 5 deletions cmd/fleetctl/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"crypto/sha256"
"database/sql"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -1204,9 +1203,9 @@ spec:
// Apply queries.
var appliedQueries []*fleet.Query
ds.QueryByNameFunc = func(ctx context.Context, teamID *uint, name string, opts ...fleet.OptionalArg) (*fleet.Query, error) {
return nil, sql.ErrNoRows
return nil, &notFoundError{}
}
ds.ApplyQueriesFunc = func(ctx context.Context, authorID uint, queries []*fleet.Query) error {
ds.ApplyQueriesFunc = func(ctx context.Context, authorID uint, queries []*fleet.Query, queriesToDiscardResults map[uint]bool) error {
appliedQueries = queries
return nil
}
Expand Down Expand Up @@ -1305,9 +1304,9 @@ func TestApplyQueries(t *testing.T) {

var appliedQueries []*fleet.Query
ds.QueryByNameFunc = func(ctx context.Context, teamID *uint, name string, opts ...fleet.OptionalArg) (*fleet.Query, error) {
return nil, sql.ErrNoRows
return nil, &notFoundError{}
}
ds.ApplyQueriesFunc = func(ctx context.Context, authorID uint, queries []*fleet.Query) error {
ds.ApplyQueriesFunc = func(ctx context.Context, authorID uint, queries []*fleet.Query, queriesToDiscardResults map[uint]bool) error {
appliedQueries = queries
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion server/datastore/mysql/packs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func setupPackSpecsTest(t *testing.T, ds fleet.Datastore) []*fleet.PackSpec {
{Name: "bar", Description: "do some bars", Query: "select baz from bar"},
}
// Zach creates some queries
err := ds.ApplyQueries(context.Background(), zwass.ID, queries)
err := ds.ApplyQueries(context.Background(), zwass.ID, queries, nil)
require.Nil(t, err)

labels := []*fleet.LabelSpec{
Expand Down
95 changes: 79 additions & 16 deletions server/datastore/mysql/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/jmoiron/sqlx"
)

func (ds *Datastore) ApplyQueries(ctx context.Context, authorID uint, queries []*fleet.Query) (err error) {
func (ds *Datastore) ApplyQueries(ctx context.Context, authorID uint, queries []*fleet.Query, queriesToDiscardResults map[uint]bool) (err error) {
tx, err := ds.writer(ctx).BeginTxx(ctx, nil)
if err != nil {
return ctxerr.Wrap(ctx, err, "begin ApplyQueries transaction")
Expand All @@ -29,7 +29,7 @@ func (ds *Datastore) ApplyQueries(ctx context.Context, authorID uint, queries []
}
}()

sql := `
insertSql := `
INSERT INTO queries (
name,
description,
Expand Down Expand Up @@ -62,12 +62,23 @@ func (ds *Datastore) ApplyQueries(ctx context.Context, authorID uint, queries []
logging_type = VALUES(logging_type),
discard_data = VALUES(discard_data)
`
stmt, err := tx.PrepareContext(ctx, sql)
stmt, err := tx.PrepareContext(ctx, insertSql)
if err != nil {
return ctxerr.Wrap(ctx, err, "prepare ApplyQueries insert")
}
defer stmt.Close()

var resultsStmt *sql.Stmt
if len(queriesToDiscardResults) > 0 {
resultsSql := `DELETE FROM query_results WHERE query_id = ?`
resultsStmt, err = tx.PrepareContext(ctx, resultsSql)
if err != nil {
return ctxerr.Wrap(ctx, err, "prepare ApplyQueries delete query results")
}
defer resultsStmt.Close()

}

for _, q := range queries {
if err := q.Verify(); err != nil {
return ctxerr.Wrap(ctx, err)
Expand All @@ -93,6 +104,16 @@ func (ds *Datastore) ApplyQueries(ctx context.Context, authorID uint, queries []
}
}

for id := range queriesToDiscardResults {
_, err := resultsStmt.ExecContext(
ctx,
id,
)
if err != nil {
return ctxerr.Wrap(ctx, err, "exec ApplyQueries delete query results")
}
}

err = tx.Commit()
return ctxerr.Wrap(ctx, err, "commit ApplyQueries transaction")
}
Expand Down Expand Up @@ -204,8 +225,26 @@ func (ds *Datastore) NewQuery(
}

// SaveQuery saves changes to a Query.
func (ds *Datastore) SaveQuery(ctx context.Context, q *fleet.Query) error {
sql := `
func (ds *Datastore) SaveQuery(ctx context.Context, q *fleet.Query, shouldDiscardResults bool) (err error) {
tx, err := ds.writer(ctx).BeginTxx(ctx, nil)
if err != nil {
return ctxerr.Wrap(ctx, err, "begin SaveQuery transaction")
}

defer func() {
if err != nil {
rbErr := tx.Rollback()
// It seems possible that there might be a case in
// which the error we are dealing with here was thrown
// by the call to tx.Commit(), and the docs suggest
// this call would then result in sql.ErrTxDone.
if rbErr != nil && rbErr != sql.ErrTxDone {
panic(fmt.Sprintf("got err '%s' rolling back after err '%s'", rbErr, err))
}
}
}()

updateSql := `
UPDATE queries
SET name = ?,
description = ?,
Expand All @@ -223,9 +262,26 @@ func (ds *Datastore) SaveQuery(ctx context.Context, q *fleet.Query) error {
discard_data = ?
WHERE id = ?
`
result, err := ds.writer(ctx).ExecContext(

stmt, err := tx.PrepareContext(ctx, updateSql)
if err != nil {
return ctxerr.Wrap(ctx, err, "prepare SaveQuery update")
}
defer stmt.Close()

var resultsStmt *sql.Stmt
if shouldDiscardResults {
resultsSql := `DELETE FROM query_results WHERE query_id = ?`
resultsStmt, err = tx.PrepareContext(ctx, resultsSql)
if err != nil {
return ctxerr.Wrap(ctx, err, "prepare SaveQuery delete query results")
}
defer resultsStmt.Close()

}

_, err = stmt.ExecContext(
ctx,
sql,
q.Name,
q.Description,
q.Query,
Expand All @@ -240,19 +296,24 @@ func (ds *Datastore) SaveQuery(ctx context.Context, q *fleet.Query) error {
q.AutomationsEnabled,
q.Logging,
q.DiscardData,
q.ID)
if err != nil {
return ctxerr.Wrap(ctx, err, "updating query")
}
rows, err := result.RowsAffected()
q.ID,
)
if err != nil {
return ctxerr.Wrap(ctx, err, "rows affected updating query")
return ctxerr.Wrap(ctx, err, "exec SaveQuery update")
}
if rows == 0 {
return ctxerr.Wrap(ctx, notFound("Query").WithID(q.ID))

if resultsStmt != nil {
_, err := resultsStmt.ExecContext(
ctx,
q.ID,
)
if err != nil {
return ctxerr.Wrap(ctx, err, "exec SaveQuery delete query results")
}
}

return nil
err = tx.Commit()
return ctxerr.Wrap(ctx, err, "commit SaveQuery transaction")
}

func (ds *Datastore) DeleteQuery(
Expand Down Expand Up @@ -310,6 +371,7 @@ func (ds *Datastore) Query(ctx context.Context, id uint) (*fleet.Query, error) {
q.discard_data,
q.created_at,
q.updated_at,
q.discard_data,
COALESCE(NULLIF(u.name, ''), u.email, '') AS author_name,
COALESCE(u.email, '') AS author_email,
JSON_EXTRACT(json_value, '$.user_time_p50') as user_time_p50,
Expand Down Expand Up @@ -360,6 +422,7 @@ func (ds *Datastore) ListQueries(ctx context.Context, opt fleet.ListQueryOptions
q.discard_data,
q.created_at,
q.updated_at,
q.discard_data,
COALESCE(u.name, '<deleted>') AS author_name,
COALESCE(u.email, '') AS author_email,
JSON_EXTRACT(json_value, '$.user_time_p50') as user_time_p50,
Expand Down
26 changes: 15 additions & 11 deletions server/datastore/mysql/queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,12 @@ func testQueriesApply(t *testing.T, ds *Datastore) {
Name: "bar",
Description: "do some bars",
Query: "select baz from bar",
DiscardData: true,
},
}

// Zach creates some queries
err := ds.ApplyQueries(context.Background(), zwass.ID, expectedQueries)
err := ds.ApplyQueries(context.Background(), zwass.ID, expectedQueries, nil)
require.NoError(t, err)

queries, err := ds.ListQueries(context.Background(), fleet.ListQueryOptions{})
Expand All @@ -89,8 +90,7 @@ func testQueriesApply(t *testing.T, ds *Datastore) {
// Victor modifies a query (but also pushes the same version of the
// first query)
expectedQueries[1].Query = "not really a valid query ;)"
expectedQueries[1].DiscardData = true
err = ds.ApplyQueries(context.Background(), groob.ID, expectedQueries)
err = ds.ApplyQueries(context.Background(), groob.ID, expectedQueries, nil)
require.NoError(t, err)

queries, err = ds.ListQueries(context.Background(), fleet.ListQueryOptions{})
Expand All @@ -113,9 +113,10 @@ func testQueriesApply(t *testing.T, ds *Datastore) {
Name: "trouble",
Description: "Look out!",
Query: "select * from time",
DiscardData: true,
},
)
err = ds.ApplyQueries(context.Background(), zwass.ID, []*fleet.Query{expectedQueries[2]})
err = ds.ApplyQueries(context.Background(), zwass.ID, []*fleet.Query{expectedQueries[2]}, nil)
require.NoError(t, err)

queries, err = ds.ListQueries(context.Background(), fleet.ListQueryOptions{})
Expand Down Expand Up @@ -152,7 +153,7 @@ func testQueriesApply(t *testing.T, ds *Datastore) {
Logging: "differential",
},
}
err = ds.ApplyQueries(context.Background(), zwass.ID, invalidQueries)
err = ds.ApplyQueries(context.Background(), zwass.ID, invalidQueries, nil)
require.ErrorIs(t, err, fleet.ErrQueryInvalidPlatform)
}

Expand Down Expand Up @@ -278,8 +279,9 @@ func testQueriesSave(t *testing.T, ds *Datastore) {
query.MinOsqueryVersion = "5.2.1"
query.AutomationsEnabled = true
query.Logging = "differential"
query.DiscardData = true

err = ds.SaveQuery(context.Background(), query)
err = ds.SaveQuery(context.Background(), query, true)
require.NoError(t, err)

actual, err := ds.Query(context.Background(), query.ID)
Expand All @@ -298,10 +300,11 @@ func testQueriesList(t *testing.T, ds *Datastore) {

for i := 0; i < 10; i++ {
_, err := ds.NewQuery(context.Background(), &fleet.Query{
Name: fmt.Sprintf("name%02d", i),
Query: fmt.Sprintf("query%02d", i),
Saved: true,
AuthorID: &user.ID,
Name: fmt.Sprintf("name%02d", i),
Query: fmt.Sprintf("query%02d", i),
Saved: true,
AuthorID: &user.ID,
DiscardData: true,
})
require.Nil(t, err)
}
Expand All @@ -321,6 +324,7 @@ func testQueriesList(t *testing.T, ds *Datastore) {
require.Equal(t, 10, len(results))
require.Equal(t, "Zach", results[0].AuthorName)
require.Equal(t, "zwass@fleet.co", results[0].AuthorEmail)
require.True(t, results[0].DiscardData)

idWithAgg := results[0].ID

Expand Down Expand Up @@ -353,7 +357,7 @@ func testQueriesLoadPacksForQueries(t *testing.T, ds *Datastore) {
{Name: "q1", Query: "select * from time"},
{Name: "q2", Query: "select * from osquery_info"},
}
err := ds.ApplyQueries(context.Background(), zwass.ID, queries)
err := ds.ApplyQueries(context.Background(), zwass.ID, queries, nil)
require.NoError(t, err)

specs := []*fleet.PackSpec{
Expand Down
8 changes: 4 additions & 4 deletions server/datastore/mysql/scheduled_queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func testScheduledQueriesListInPackWithStats(t *testing.T, ds *Datastore) {
{Name: "foo", Description: "get the foos", Query: "select * from foo"},
{Name: "bar", Description: "do some bars", Query: "select baz from bar"},
}
err := ds.ApplyQueries(context.Background(), zwass.ID, queries)
err := ds.ApplyQueries(context.Background(), zwass.ID, queries, nil)
require.NoError(t, err)

specs := []*fleet.PackSpec{
Expand Down Expand Up @@ -134,7 +134,7 @@ func testScheduledQueriesListInPack(t *testing.T, ds *Datastore) {
{Name: "foo", Description: "get the foos", Query: "select * from foo"},
{Name: "bar", Description: "do some bars", Query: "select baz from bar"},
}
err := ds.ApplyQueries(context.Background(), zwass.ID, queries)
err := ds.ApplyQueries(context.Background(), zwass.ID, queries, nil)
require.NoError(t, err)

specs := []*fleet.PackSpec{
Expand Down Expand Up @@ -327,7 +327,7 @@ func testScheduledQueriesCascadingDelete(t *testing.T, ds *Datastore) {
{Name: "foo", Description: "get the foos", Query: "select * from foo"},
{Name: "bar", Description: "do some bars", Query: "select baz from bar"},
}
err := ds.ApplyQueries(context.Background(), zwass.ID, queries)
err := ds.ApplyQueries(context.Background(), zwass.ID, queries, nil)
require.Nil(t, err)

specs := []*fleet.PackSpec{
Expand Down Expand Up @@ -379,7 +379,7 @@ func testScheduledQueriesIDsByName(t *testing.T, ds *Datastore) {
{Name: "foo2", Description: "get the foos", Query: "select * from foo2"},
{Name: "bar2", Description: "do some bars", Query: "select * from bar2"},
}
err := ds.ApplyQueries(ctx, user.ID, queries)
err := ds.ApplyQueries(ctx, user.ID, queries, nil)
require.NoError(t, err)

specs := []*fleet.PackSpec{
Expand Down
4 changes: 2 additions & 2 deletions server/fleet/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ type Datastore interface {

// ApplyQueries applies a list of queries (likely from a yaml file) to the datastore. Existing queries are updated,
// and new queries are created.
ApplyQueries(ctx context.Context, authorID uint, queries []*Query) error
ApplyQueries(ctx context.Context, authorID uint, queries []*Query, queriesToDiscardResults map[uint]bool) error
// NewQuery creates a new query object in thie datastore. The returned query should have the ID updated.
NewQuery(ctx context.Context, query *Query, opts ...OptionalArg) (*Query, error)
// SaveQuery saves changes to an existing query object.
SaveQuery(ctx context.Context, query *Query) error
SaveQuery(ctx context.Context, query *Query, shouldDiscardResults bool) error
// DeleteQuery deletes an existing query object on a team. If teamID is nil, then the query is
// looked up in the 'global' team.
DeleteQuery(ctx context.Context, teamID *uint, name string) error
Expand Down
12 changes: 6 additions & 6 deletions server/mock/datastore_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ type PendingEmailChangeFunc func(ctx context.Context, userID uint, newEmail stri

type ConfirmPendingEmailChangeFunc func(ctx context.Context, userID uint, token string) (string, error)

type ApplyQueriesFunc func(ctx context.Context, authorID uint, queries []*fleet.Query) error
type ApplyQueriesFunc func(ctx context.Context, authorID uint, queries []*fleet.Query, queriesToDiscardResults map[uint]bool) error

type NewQueryFunc func(ctx context.Context, query *fleet.Query, opts ...fleet.OptionalArg) (*fleet.Query, error)

type SaveQueryFunc func(ctx context.Context, query *fleet.Query) error
type SaveQueryFunc func(ctx context.Context, query *fleet.Query, shouldDiscardResults bool) error

type DeleteQueryFunc func(ctx context.Context, teamID *uint, name string) error

Expand Down Expand Up @@ -1835,11 +1835,11 @@ func (s *DataStore) ConfirmPendingEmailChange(ctx context.Context, userID uint,
return s.ConfirmPendingEmailChangeFunc(ctx, userID, token)
}

func (s *DataStore) ApplyQueries(ctx context.Context, authorID uint, queries []*fleet.Query) error {
func (s *DataStore) ApplyQueries(ctx context.Context, authorID uint, queries []*fleet.Query, queriesToDiscardResults map[uint]bool) error {
s.mu.Lock()
s.ApplyQueriesFuncInvoked = true
s.mu.Unlock()
return s.ApplyQueriesFunc(ctx, authorID, queries)
return s.ApplyQueriesFunc(ctx, authorID, queries, queriesToDiscardResults)
}

func (s *DataStore) NewQuery(ctx context.Context, query *fleet.Query, opts ...fleet.OptionalArg) (*fleet.Query, error) {
Expand All @@ -1849,11 +1849,11 @@ func (s *DataStore) NewQuery(ctx context.Context, query *fleet.Query, opts ...fl
return s.NewQueryFunc(ctx, query, opts...)
}

func (s *DataStore) SaveQuery(ctx context.Context, query *fleet.Query) error {
func (s *DataStore) SaveQuery(ctx context.Context, query *fleet.Query, shouldDiscardResults bool) error {
s.mu.Lock()
s.SaveQueryFuncInvoked = true
s.mu.Unlock()
return s.SaveQueryFunc(ctx, query)
return s.SaveQueryFunc(ctx, query, shouldDiscardResults)
}

func (s *DataStore) DeleteQuery(ctx context.Context, teamID *uint, name string) error {
Expand Down
2 changes: 1 addition & 1 deletion server/service/global_schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestGlobalScheduleAuth(t *testing.T) {
Query: "SELECT 1;",
}, nil
}
ds.SaveQueryFunc = func(ctx context.Context, query *fleet.Query) error {
ds.SaveQueryFunc = func(ctx context.Context, query *fleet.Query, shouldDiscardResults bool) error {
return nil
}
ds.NewActivityFunc = func(ctx context.Context, user *fleet.User, activity fleet.ActivityDetails) error {
Expand Down
Loading

0 comments on commit 064823a

Please sign in to comment.