Skip to content

Commit

Permalink
Fix edge case of AppConfig changes getting lost in cached mysql. (#15352
Browse files Browse the repository at this point in the history
)
  • Loading branch information
mna authored Nov 29, 2023
1 parent 1609c0f commit 0b5eedb
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 35 deletions.
1 change: 1 addition & 0 deletions changes/issue-14714-fix-cached-appconfig-race
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Fixed an edge-case where the caching of data could lead to some organization settings changes being lost when running with multiple Fleet instances.
12 changes: 6 additions & 6 deletions cmd/fleetctl/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,15 +387,15 @@ func TestApplyAppConfig(t *testing.T) {
}

defaultAgentOpts := json.RawMessage(`{"config":{"foo":"bar"}}`)
savedAppConfig := &fleet.AppConfig{
OrgInfo: fleet.OrgInfo{OrgName: "Fleet"},
ServerSettings: fleet.ServerSettings{ServerURL: "https://example.org"},
AgentOptions: &defaultAgentOpts,
}
ds.AppConfigFunc = func(ctx context.Context) (*fleet.AppConfig, error) {
return &fleet.AppConfig{
OrgInfo: fleet.OrgInfo{OrgName: "Fleet"},
ServerSettings: fleet.ServerSettings{ServerURL: "https://example.org"},
AgentOptions: &defaultAgentOpts,
}, nil
return savedAppConfig, nil
}

var savedAppConfig *fleet.AppConfig
ds.SaveAppConfigFunc = func(ctx context.Context, config *fleet.AppConfig) error {
savedAppConfig = config
return nil
Expand Down
20 changes: 19 additions & 1 deletion server/contexts/ctxdb/ctxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (

type key int

const requirePrimaryKey key = 0
const (
requirePrimaryKey key = 0
bypassCachedMysqlKey key = 1
)

// RequirePrimary returns a new context that indicates to the database layer if
// the primary instance must always be used instead of the replica, even for
Expand All @@ -21,3 +24,18 @@ func IsPrimaryRequired(ctx context.Context) bool {
v, _ := ctx.Value(requirePrimaryKey).(bool)
return v
}

// BypassCachedMysql returns a new context that indicates to the mysql cache
// layer if the cache should be bypassed. This is required when reading data
// with the intention of writing it back with changes, to avoid reading stale
// data from the cache.
func BypassCachedMysql(ctx context.Context, bypass bool) context.Context {
return context.WithValue(ctx, bypassCachedMysqlKey, bypass)
}

// IsCachedMysqlBypassed returns true if the context indicates that the mysql
// cache must be bypassed, false otherwise.
func IsCachedMysqlBypassed(ctx context.Context) bool {
v, _ := ctx.Value(bypassCachedMysqlKey).(bool)
return v
}
19 changes: 19 additions & 0 deletions server/contexts/ctxdb/ctxdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,22 @@ func TestIsPrimaryRequired(t *testing.T) {
})
}
}

func TestIsCachedMysqlBypassed(t *testing.T) {
cases := []struct {
desc string
ctx context.Context
want bool
}{
{"not set", context.Background(), false},
{"set to true", BypassCachedMysql(context.Background(), true), true},
{"set to false", BypassCachedMysql(context.Background(), false), false},
{"set to true then false", BypassCachedMysql(BypassCachedMysql(context.Background(), true), false), false},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
got := IsCachedMysqlBypassed(c.ctx)
require.Equal(t, c.want, got)
})
}
}
6 changes: 4 additions & 2 deletions server/datastore/cached_mysql/bench_cached_mysql_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cached_mysql

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -39,14 +40,15 @@ func BenchmarkCacheGetCustomClone(b *testing.B) {
}

func benchmarkCacheGet(b *testing.B, v any) {
ctx := context.Background()
c := &cloneCache{cache.New(time.Minute, time.Minute)}
c.Set("k", v, cache.DefaultExpiration)
c.Set(ctx, "k", v, cache.DefaultExpiration)

b.ResetTimer()

var ok bool
for i := 0; i < b.N; i++ {
Result, ok = c.Get("k")
Result, ok = c.Get(ctx, "k")
if !ok {
b.Fatal("expected ok")
}
Expand Down
65 changes: 41 additions & 24 deletions server/datastore/cached_mysql/cached_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"reflect"
"time"

"github.com/fleetdm/fleet/v4/server/contexts/ctxdb"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/jinzhu/copier"
"github.com/patrickmn/go-cache"
Expand Down Expand Up @@ -84,7 +85,12 @@ type cloneCache struct {
*cache.Cache
}

func (c *cloneCache) Get(k string) (interface{}, bool) {
func (c *cloneCache) Get(ctx context.Context, k string) (interface{}, bool) {
if ctxdb.IsCachedMysqlBypassed(ctx) {
// cache miss if the caller explicitly asked to bypass the cache
return nil, false
}

x, found := c.Cache.Get(k)
if !found {
return nil, false
Expand All @@ -98,10 +104,13 @@ func (c *cloneCache) Get(k string) (interface{}, bool) {
return clone, true
}

func (c *cloneCache) Set(k string, x interface{}, d time.Duration) {
func (c *cloneCache) Set(ctx context.Context, k string, x interface{}, d time.Duration) {
clone, err := clone(x)
if err != nil {
// Unfortunately, we can't return an error here. Skip caching it if clone fails.
// Unfortunately, we can't return an error here. Skip caching it if clone
// fails, but ensure that we clear any existing cached item for this key,
// as the call to Set indicates the cache is now stale.
c.Cache.Delete(k)
return
}

Expand All @@ -113,6 +122,7 @@ type cachedMysql struct {

c *cloneCache

appConfigExp time.Duration
packsExp time.Duration
scheduledQueriesExp time.Duration
teamAgentOptionsExp time.Duration
Expand All @@ -124,6 +134,12 @@ type cachedMysql struct {

type Option func(*cachedMysql)

func WithAppConfigExpiration(d time.Duration) Option {
return func(o *cachedMysql) {
o.appConfigExp = d
}
}

func WithPacksExpiration(d time.Duration) Option {
return func(o *cachedMysql) {
o.packsExp = d
Expand Down Expand Up @@ -158,6 +174,7 @@ func New(ds fleet.Datastore, opts ...Option) fleet.Datastore {
c := &cachedMysql{
Datastore: ds,
c: &cloneCache{cache.New(5*time.Minute, 10*time.Minute)},
appConfigExp: defaultAppConfigExpiration,
packsExp: defaultPacksExpiration,
scheduledQueriesExp: defaultScheduledQueriesExpiration,
teamAgentOptionsExp: defaultTeamAgentOptionsExpiration,
Expand All @@ -178,13 +195,13 @@ func (ds *cachedMysql) NewAppConfig(ctx context.Context, info *fleet.AppConfig)
return nil, err
}

ds.c.Set(appConfigKey, ac, defaultAppConfigExpiration)
ds.c.Set(ctx, appConfigKey, ac, ds.appConfigExp)

return ac, nil
}

func (ds *cachedMysql) AppConfig(ctx context.Context) (*fleet.AppConfig, error) {
if x, found := ds.c.Get(appConfigKey); found {
if x, found := ds.c.Get(ctx, appConfigKey); found {
ac, ok := x.(*fleet.AppConfig)
if ok {
return ac, nil
Expand All @@ -196,7 +213,7 @@ func (ds *cachedMysql) AppConfig(ctx context.Context) (*fleet.AppConfig, error)
return nil, err
}

ds.c.Set(appConfigKey, ac, defaultAppConfigExpiration)
ds.c.Set(ctx, appConfigKey, ac, ds.appConfigExp)

return ac, nil
}
Expand All @@ -207,14 +224,14 @@ func (ds *cachedMysql) SaveAppConfig(ctx context.Context, info *fleet.AppConfig)
return err
}

ds.c.Set(appConfigKey, info, defaultAppConfigExpiration)
ds.c.Set(ctx, appConfigKey, info, ds.appConfigExp)

return nil
}

func (ds *cachedMysql) ListPacksForHost(ctx context.Context, hid uint) ([]*fleet.Pack, error) {
key := fmt.Sprintf(packsHostKey, hid)
if x, found := ds.c.Get(key); found {
if x, found := ds.c.Get(ctx, key); found {
cachedPacks, ok := x.([]*fleet.Pack)
if ok {
return cachedPacks, nil
Expand All @@ -226,14 +243,14 @@ func (ds *cachedMysql) ListPacksForHost(ctx context.Context, hid uint) ([]*fleet
return nil, err
}

ds.c.Set(key, packs, ds.packsExp)
ds.c.Set(ctx, key, packs, ds.packsExp)

return packs, nil
}

func (ds *cachedMysql) ListScheduledQueriesInPack(ctx context.Context, packID uint) (fleet.ScheduledQueryList, error) {
key := fmt.Sprintf(scheduledQueriesKey, packID)
if x, found := ds.c.Get(key); found {
if x, found := ds.c.Get(ctx, key); found {
scheduledQueries, ok := x.(fleet.ScheduledQueryList)
if ok {
return scheduledQueries, nil
Expand All @@ -245,14 +262,14 @@ func (ds *cachedMysql) ListScheduledQueriesInPack(ctx context.Context, packID ui
return nil, err
}

ds.c.Set(key, scheduledQueries, ds.scheduledQueriesExp)
ds.c.Set(ctx, key, scheduledQueries, ds.scheduledQueriesExp)

return scheduledQueries, nil
}

func (ds *cachedMysql) TeamAgentOptions(ctx context.Context, teamID uint) (*json.RawMessage, error) {
key := fmt.Sprintf(teamAgentOptionsKey, teamID)
if x, found := ds.c.Get(key); found {
if x, found := ds.c.Get(ctx, key); found {
if agentOptions, ok := x.(*json.RawMessage); ok {
return agentOptions, nil
}
Expand All @@ -263,14 +280,14 @@ func (ds *cachedMysql) TeamAgentOptions(ctx context.Context, teamID uint) (*json
return nil, err
}

ds.c.Set(key, agentOptions, ds.teamAgentOptionsExp)
ds.c.Set(ctx, key, agentOptions, ds.teamAgentOptionsExp)

return agentOptions, nil
}

func (ds *cachedMysql) TeamFeatures(ctx context.Context, teamID uint) (*fleet.Features, error) {
key := fmt.Sprintf(teamFeaturesKey, teamID)
if x, found := ds.c.Get(key); found {
if x, found := ds.c.Get(ctx, key); found {
if features, ok := x.(*fleet.Features); ok {
return features, nil
}
Expand All @@ -281,14 +298,14 @@ func (ds *cachedMysql) TeamFeatures(ctx context.Context, teamID uint) (*fleet.Fe
return nil, err
}

ds.c.Set(key, features, ds.teamFeaturesExp)
ds.c.Set(ctx, key, features, ds.teamFeaturesExp)

return features, nil
}

func (ds *cachedMysql) TeamMDMConfig(ctx context.Context, teamID uint) (*fleet.TeamMDM, error) {
key := fmt.Sprintf(teamMDMConfigKey, teamID)
if x, found := ds.c.Get(key); found {
if x, found := ds.c.Get(ctx, key); found {
if cfg, ok := x.(*fleet.TeamMDM); ok {
return cfg, nil
}
Expand All @@ -299,7 +316,7 @@ func (ds *cachedMysql) TeamMDMConfig(ctx context.Context, teamID uint) (*fleet.T
return nil, err
}

ds.c.Set(key, cfg, ds.teamMDMConfigExp)
ds.c.Set(ctx, key, cfg, ds.teamMDMConfigExp)

return cfg, nil
}
Expand All @@ -314,9 +331,9 @@ func (ds *cachedMysql) SaveTeam(ctx context.Context, team *fleet.Team) (*fleet.T
featuresKey := fmt.Sprintf(teamFeaturesKey, team.ID)
mdmConfigKey := fmt.Sprintf(teamMDMConfigKey, team.ID)

ds.c.Set(agentOptionsKey, team.Config.AgentOptions, ds.teamAgentOptionsExp)
ds.c.Set(featuresKey, &team.Config.Features, ds.teamFeaturesExp)
ds.c.Set(mdmConfigKey, &team.Config.MDM, ds.teamMDMConfigExp)
ds.c.Set(ctx, agentOptionsKey, team.Config.AgentOptions, ds.teamAgentOptionsExp)
ds.c.Set(ctx, featuresKey, &team.Config.Features, ds.teamFeaturesExp)
ds.c.Set(ctx, mdmConfigKey, &team.Config.MDM, ds.teamMDMConfigExp)

return team, nil
}
Expand Down Expand Up @@ -345,7 +362,7 @@ func (ds *cachedMysql) QueryByName(ctx context.Context, teamID *uint, name strin
}
key := fmt.Sprintf(queryByNameKey, teamID_, name)

if x, found := ds.c.Get(key); found {
if x, found := ds.c.Get(ctx, key); found {
if query, ok := x.(*fleet.Query); ok {
return query, nil
}
Expand All @@ -356,15 +373,15 @@ func (ds *cachedMysql) QueryByName(ctx context.Context, teamID *uint, name strin
return nil, err
}

ds.c.Set(key, query, ds.queryByNameExp)
ds.c.Set(ctx, key, query, ds.queryByNameExp)

return query, nil
}

func (ds *cachedMysql) ResultCountForQuery(ctx context.Context, queryID uint) (int, error) {
key := fmt.Sprintf(queryResultsCountKey, queryID)

if x, found := ds.c.Get(key); found {
if x, found := ds.c.Get(ctx, key); found {
if count, ok := x.(int); ok {
return count, nil
}
Expand All @@ -375,7 +392,7 @@ func (ds *cachedMysql) ResultCountForQuery(ctx context.Context, queryID uint) (i
return 0, err
}

ds.c.Set(key, count, ds.queryResultsCountExp)
ds.c.Set(ctx, key, count, ds.queryResultsCountExp)

return count, nil
}
Loading

0 comments on commit 0b5eedb

Please sign in to comment.