From ba2a641dc448cbee75f592b11899c6b80ed278bb Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Sat, 7 Oct 2023 11:13:24 +1030 Subject: [PATCH] x-pack/filebeat/input/entityanalytics/provider/azuread: avoid work on unwanted datasets (#36753) During full sync the provider may have state from a previous dataset. So in the case that the user has changed dataset from users to devices or vice versa the provider may publish already existing state in the entity graph. This change adds conditional checks to ensure that unwanted dataset records are not published. --- CHANGELOG.next.asciidoc | 1 + .../entityanalytics/provider/azuread/azure.go | 97 +++++++++++-------- .../entityanalytics/provider/azuread/conf.go | 18 ++++ 3 files changed, 73 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6932839c03f2..8ebc79340811 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -234,6 +234,7 @@ is collected by it. - Re-use buffers to optimise memory allocation in fingerprint mode of filestream {pull}36736[36736] - Allow http_endpoint input to receive PUT and PATCH requests. {pull}36734[36734] - Add cache processor. {pull}36786[36786] +- Avoid unwanted publication of Azure entity records. {pull}36753[36753] *Auditbeat* diff --git a/x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go b/x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go index 7db004237c92..30514352ebaf 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go +++ b/x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go @@ -9,7 +9,6 @@ import ( "context" "errors" "fmt" - "strings" "time" "github.com/google/uuid" @@ -151,21 +150,22 @@ func (p *azure) runFullSync(inputCtx v2.Context, store *kvstore.Store, client be return err } - if len(state.users) != 0 || len(state.devices) != 0 { + wantUsers := p.conf.wantUsers() + wantDevices := p.conf.wantDevices() + if (len(state.users) != 0 && wantUsers) || (len(state.devices) != 0 && wantDevices) { tracker := kvstore.NewTxTracker(ctx) start := time.Now() p.publishMarker(start, start, inputCtx.ID, true, client, tracker) - if len(state.users) != 0 { + if len(state.users) != 0 && wantUsers { p.logger.Debugw("publishing users", "count", len(state.devices)) for _, u := range state.users { p.publishUser(u, state, inputCtx.ID, client, tracker) } - } - if len(state.devices) != 0 { + if len(state.devices) != 0 && wantDevices { p.logger.Debugw("publishing devices", "count", len(state.devices)) for _, d := range state.devices { p.publishDevice(d, state, inputCtx.ID, client, tracker) @@ -224,7 +224,6 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, } p.publishUser(u, state, inputCtx.ID, client, tracker) }) - } if updatedDevices.Len() != 0 { @@ -236,7 +235,6 @@ func (p *azure) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, } p.publishDevice(d, state, inputCtx.ID, client, tracker) }) - } tracker.Wait() @@ -269,32 +267,32 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) ( } var ( + wantUsers = p.conf.wantUsers() changedUsers []*fetcher.User userLink string ) - switch strings.ToLower(p.conf.Dataset) { - case "", "all", "users": + if wantUsers { changedUsers, userLink, err = p.fetcher.Users(ctx, usersDeltaLink) if err != nil { return updatedUsers, updatedDevices, err } p.logger.Debugf("Received %d users from API", len(changedUsers)) - default: + } else { p.logger.Debugf("Skipping user collection from API: dataset=%s", p.conf.Dataset) } var ( + wantDevices = p.conf.wantDevices() changedDevices []*fetcher.Device deviceLink string ) - switch strings.ToLower(p.conf.Dataset) { - case "", "all", "devices": + if wantDevices { changedDevices, deviceLink, err = p.fetcher.Devices(ctx, devicesDeltaLink) if err != nil { return updatedUsers, updatedDevices, err } p.logger.Debugf("Received %d devices from API", len(changedDevices)) - default: + } else { p.logger.Debugf("Skipping device collection from API: dataset=%s", p.conf.Dataset) } @@ -337,6 +335,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) ( for _, member := range g.Members { switch member.Type { case fetcher.MemberGroup: + if !wantUsers { + break + } for _, u := range state.users { if u.TransitiveMemberOf.Contains(member.ID) { updatedUsers.Add(u.ID) @@ -349,6 +350,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) ( } case fetcher.MemberUser: + if !wantUsers { + break + } if u, ok := state.users[member.ID]; ok { updatedUsers.Add(u.ID) if member.Deleted { @@ -359,6 +363,9 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) ( } case fetcher.MemberDevice: + if !wantDevices { + break + } if d, ok := state.devices[member.ID]; ok { updatedDevices.Add(d.ID) if member.Deleted { @@ -372,42 +379,46 @@ func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) ( } // Expand user group memberships. - updatedUsers.ForEach(func(userID uuid.UUID) { - u, ok := state.users[userID] - if !ok { - p.logger.Errorf("Unable to find user %q in state", userID) - return - } - u.Modified = true - if u.Deleted { - p.logger.Debugw("not expanding membership for deleted user", "user", userID) - return - } + if wantUsers { + updatedUsers.ForEach(func(userID uuid.UUID) { + u, ok := state.users[userID] + if !ok { + p.logger.Errorf("Unable to find user %q in state", userID) + return + } + u.Modified = true + if u.Deleted { + p.logger.Debugw("not expanding membership for deleted user", "user", userID) + return + } - u.TransitiveMemberOf = u.MemberOf - state.relationships.ExpandFromSet(u.MemberOf).ForEach(func(elem uuid.UUID) { - u.TransitiveMemberOf.Add(elem) + u.TransitiveMemberOf = u.MemberOf + state.relationships.ExpandFromSet(u.MemberOf).ForEach(func(elem uuid.UUID) { + u.TransitiveMemberOf.Add(elem) + }) }) - }) + } // Expand device group memberships. - updatedDevices.ForEach(func(devID uuid.UUID) { - d, ok := state.devices[devID] - if !ok { - p.logger.Errorf("Unable to find device %q in state", devID) - return - } - d.Modified = true - if d.Deleted { - p.logger.Debugw("not expanding membership for deleted device", "device", devID) - return - } + if wantDevices { + updatedDevices.ForEach(func(devID uuid.UUID) { + d, ok := state.devices[devID] + if !ok { + p.logger.Errorf("Unable to find device %q in state", devID) + return + } + d.Modified = true + if d.Deleted { + p.logger.Debugw("not expanding membership for deleted device", "device", devID) + return + } - d.TransitiveMemberOf = d.MemberOf - state.relationships.ExpandFromSet(d.MemberOf).ForEach(func(elem uuid.UUID) { - d.TransitiveMemberOf.Add(elem) + d.TransitiveMemberOf = d.MemberOf + state.relationships.ExpandFromSet(d.MemberOf).ForEach(func(elem uuid.UUID) { + d.TransitiveMemberOf.Add(elem) + }) }) - }) + } return updatedUsers, updatedDevices, nil } diff --git a/x-pack/filebeat/input/entityanalytics/provider/azuread/conf.go b/x-pack/filebeat/input/entityanalytics/provider/azuread/conf.go index 105b05cbbbbd..137951bcc78c 100644 --- a/x-pack/filebeat/input/entityanalytics/provider/azuread/conf.go +++ b/x-pack/filebeat/input/entityanalytics/provider/azuread/conf.go @@ -52,3 +52,21 @@ func defaultConf() conf { UpdateInterval: defaultUpdateInterval, } } + +func (c *conf) wantUsers() bool { + switch strings.ToLower(c.Dataset) { + case "", "all", "users": + return true + default: + return false + } +} + +func (c *conf) wantDevices() bool { + switch strings.ToLower(c.Dataset) { + case "", "all", "devices": + return true + default: + return false + } +}