
.../input/entityanalytics/provider/okta: Publish events progressively #42567

Merged

Conversation

chrisberkhout
Contributor

@chrisberkhout chrisberkhout commented Feb 1, 2025

Proposed commit message

.../input/entityanalytics/provider/okta: Publish events progressively

Instead of waiting for a full sync (which may take hours) to finish
before publishing anything, we now publish data as it is received, so
that users have immediate feedback.

Incremental updates are also published more frequently: page by page
instead of at the end of the pagination sequence.

For the full sync, the previous behavior of adding everything to the
store and publishing it at the end would theoretically de-duplicate
repeated items. However, within a pagination sequence we use opaque
cursors from the API that should avoid overlap between pages. Whether an
item modified after an earlier page in a sequence can appear again in a
later page of the same sequence depends on the API's implementation.
Incremental updates begin with a previously seen timestamp value, so
there is overlap between updates there, but that is unaffected by this
change.

For the full sync, the start and end markers were previously published only if
data was retrieved; now the markers are published regardless of how much data
is received.

Some extra checks of configuration are done to decide whether to fetch
and publish items of a given type (user or device).

---------

Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>
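As a rough illustration of the publishing pattern described above (a minimal, hypothetical Go sketch; names such as fetchPage and item are invented for the example and are not the Beats code itself), each item is published as soon as its page is received, and the start and end markers are published regardless of how much data arrives:

package main

import "fmt"

// item stands in for an Okta user or device record.
type item struct{ ID string }

// fetchPage simulates one page of a pagination sequence; the real provider
// follows opaque cursors returned by the Okta API.
func fetchPage(cursor int) (batch []item, next int, done bool) {
	pages := [][]item{
		{{ID: "u1"}, {ID: "u2"}},
		{{ID: "u3"}},
	}
	if cursor >= len(pages) {
		return nil, cursor, true
	}
	return pages[cursor], cursor + 1, cursor+1 >= len(pages)
}

// fullSync publishes a start marker, publishes each item as soon as its page
// arrives (instead of buffering everything until the end of the sync), and
// publishes an end marker even if no data was received.
func fullSync(publish func(string)) {
	publish("write marker: start")
	for cursor, done := 0, false; !done; {
		var batch []item
		batch, cursor, done = fetchPage(cursor)
		for _, it := range batch {
			publish("item " + it.ID) // progressive: page by page
		}
	}
	publish("write marker: end")
}

func main() {
	fullSync(func(ev string) { fmt.Println("publish:", ev) })
}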

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Related issues

@chrisberkhout chrisberkhout added enhancement Team:Security-Service Integrations Security Service Integrations Team labels Feb 1, 2025
@chrisberkhout chrisberkhout self-assigned this Feb 1, 2025
@chrisberkhout chrisberkhout requested a review from a team as a code owner February 1, 2025 00:34
@elasticmachine
Collaborator

Pinging @elastic/security-service-integrations (Team:Security-Service Integrations)

@botelastic botelastic bot added needs_team Indicates that the issue/PR needs a Team:* label and removed needs_team Indicates that the issue/PR needs a Team:* label labels Feb 1, 2025
Contributor

mergify bot commented Feb 1, 2025

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @chrisberkhout? 🙏.
To do so, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fix up this pull request, add the backport labels for the needed branches, such as:

  • backport-8.\d is the label to automatically backport to the 8.\d branch, where \d is the version digit

… we got anything.

The fetching was redundant if !wantUsers and !wantDevices.
Always publishing the markers is better when publishing progressively,
and arguably it's better in general.
Also, now the configuration for each type (user or device) is the
condition for both fetching and publishing, in both full and incremental
sync.

Previously, publishing of a given type during a full sync was
conditional on being configured to do so, but the fetching would be done
regardless. In the incremental sync, the configuration was not consulted
at all.
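
A minimal sketch of that gating (simplified and hypothetical; the dataset values and helper names here are assumptions for illustration, not the exact provider API): the same configuration check is the condition for both fetching and publishing each entity type, in full and incremental syncs alike.

package main

import "fmt"

// conf is a simplified stand-in for the provider configuration.
type conf struct{ dataset string } // assumed values: "all", "users", "devices"

func (c conf) wantUsers() bool   { return c.dataset == "all" || c.dataset == "users" }
func (c conf) wantDevices() bool { return c.dataset == "all" || c.dataset == "devices" }

// runSync gates both the fetch and the publish of each entity type on the
// same configuration check.
func runSync(c conf, fetchAndPublishUsers, fetchAndPublishDevices func()) {
	if c.wantUsers() {
		fetchAndPublishUsers()
	}
	if c.wantDevices() {
		fetchAndPublishDevices()
	}
}

func main() {
	runSync(conf{dataset: "devices"},
		func() { fmt.Println("fetch and publish users") },
		func() { fmt.Println("fetch and publish devices") },
	)
	// Only "fetch and publish devices" is printed.
}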
@chrisberkhout chrisberkhout force-pushed the ea-okta-publish-progressively branch from 1467968 to 0cf8e9f Compare February 1, 2025 00:40
@elastic elastic deleted a comment from mergify bot Feb 1, 2025
Contributor

@efd6 efd6 left a comment


We're not using the returned slices anymore, so we can unwrite some code and reduce allocations with

diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/okta.go b/x-pack/filebeat/input/entityanalytics/provider/okta/okta.go
index aec511a1d5..1ed7b4c6ee 100644
--- a/x-pack/filebeat/input/entityanalytics/provider/okta/okta.go
+++ b/x-pack/filebeat/input/entityanalytics/provider/okta/okta.go
@@ -327,7 +327,7 @@ func (p *oktaInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, clien
                p.publishMarker(start, start, inputCtx.ID, true, client, tracker)
 
                if wantUsers {
-                       _, err = p.doFetchUsers(ctx, state, true, func(u *User) {
+                       err = p.doFetchUsers(ctx, state, true, func(u *User) {
                                p.publishUser(u, state, inputCtx.ID, client, tracker)
                        })
                        if err != nil {
@@ -335,7 +335,7 @@ func (p *oktaInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, clien
                        }
                }
                if wantDevices {
-                       _, err = p.doFetchDevices(ctx, state, true, func(d *Device) {
+                       err = p.doFetchDevices(ctx, state, true, func(d *Device) {
                                p.publishDevice(d, state, inputCtx.ID, client, tracker)
                        })
                        if err != nil {
@@ -384,7 +384,7 @@ func (p *oktaInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Sto
 
        if p.cfg.wantUsers() {
                p.logger.Debugf("Fetching changed users...")
-               _, err = p.doFetchUsers(ctx, state, false, func(u *User) {
+               err = p.doFetchUsers(ctx, state, false, func(u *User) {
                        p.publishUser(u, state, inputCtx.ID, client, tracker)
                })
                if err != nil {
@@ -393,7 +393,7 @@ func (p *oktaInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Sto
        }
        if p.cfg.wantDevices() {
                p.logger.Debugf("Fetching changed devices...")
-               _, err = p.doFetchDevices(ctx, state, false, func(d *Device) {
+               err = p.doFetchDevices(ctx, state, false, func(d *Device) {
                        p.publishDevice(d, state, inputCtx.ID, client, tracker)
                })
                if err != nil {
@@ -417,10 +417,10 @@ func (p *oktaInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Sto
 // doFetchUsers handles fetching user identities from Okta. If fullSync is true, then
 // any existing deltaLink will be ignored, forcing a full synchronization from Okta.
 // Returns a set of modified users by ID.
-func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync bool, publish func(u *User)) ([]*User, error) {
+func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync bool, publish func(u *User)) error {
        if !p.cfg.wantUsers() {
                p.logger.Debugf("Skipping user collection from API: dataset=%s", p.cfg.Dataset)
-               return nil, nil
+               return nil
        }
 
        var (
@@ -446,14 +446,14 @@ func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSyn
        const omit = okta.OmitCredentials | okta.OmitCredentialsLinks | okta.OmitTransitioningToStatus
 
        var (
-               users       []*User
+               n           int
                lastUpdated time.Time
        )
        for {
                batch, h, err := okta.GetUserDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, "", query, omit, p.lim, p.logger)
                if err != nil {
-                       p.logger.Debugf("received %d users from API", len(users))
-                       return nil, err
+                       p.logger.Debugf("received %d users from API", n)
+                       return err
                }
                p.logger.Debugf("received batch of %d users from API", len(batch))
 
@@ -465,11 +465,10 @@ func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSyn
                                }
                        }
                } else {
-                       users = grow(users, len(batch))
                        for _, u := range batch {
                                su := p.addUserMetadata(ctx, u, state)
                                publish(su)
-                               users = append(users, su)
+                               n++
                                if u.LastUpdated.After(lastUpdated) {
                                        lastUpdated = u.LastUpdated
                                }
@@ -481,8 +480,8 @@ func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSyn
                        if err == io.EOF {
                                break
                        }
-                       p.logger.Debugf("received %d users from API", len(users))
-                       return users, err
+                       p.logger.Debugf("received %d users from API", n)
+                       return err
                }
                query = next
        }
@@ -496,8 +495,8 @@ func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSyn
        query.Add("search", fmt.Sprintf(`lastUpdated ge "%s" and status pr`, lastUpdated.Format(okta.ISO8601)))
        state.nextUsers = query.Encode()
 
-       p.logger.Debugf("received %d users from API", len(users))
-       return users, nil
+       p.logger.Debugf("received %d users from API", n)
+       return nil
 }
 
 func (p *oktaInput) addUserMetadata(ctx context.Context, u okta.User, state *stateStore) *User {
@@ -542,10 +541,10 @@ func (p *oktaInput) addUserMetadata(ctx context.Context, u okta.User, state *sta
 // If fullSync is true, then any existing deltaLink will be ignored, forcing a full
 // synchronization from Okta.
 // Returns a set of modified devices by ID.
-func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullSync bool, publish func(d *Device)) ([]*Device, error) {
+func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullSync bool, publish func(d *Device)) error {
        if !p.cfg.wantDevices() {
                p.logger.Debugf("Skipping device collection from API: dataset=%s", p.cfg.Dataset)
-               return nil, nil
+               return nil
        }
 
        var (
@@ -576,14 +575,14 @@ func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullS
        userQueryInit = cloneURLValues(deviceQuery)
 
        var (
-               devices     []*Device
+               n           int
                lastUpdated time.Time
        )
        for {
                batch, h, err := okta.GetDeviceDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, "", deviceQuery, p.lim, p.logger)
                if err != nil {
-                       p.logger.Debugf("received %d devices from API", len(devices))
-                       return nil, err
+                       p.logger.Debugf("received %d devices from API", n)
+                       return err
                }
                p.logger.Debugf("received batch of %d devices from API", len(batch))
 
@@ -602,7 +601,7 @@ func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullS
                                users, h, err := okta.GetDeviceUsers(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, d.ID, userQuery, omit, p.lim, p.logger)
                                if err != nil {
                                        p.logger.Debugf("received %d device users from API", len(users))
-                                       return nil, err
+                                       return err
                                }
                                p.logger.Debugf("received batch of %d device users from API", len(users))
 
@@ -618,8 +617,8 @@ func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullS
                                        if err == io.EOF {
                                                break
                                        }
-                                       p.logger.Debugf("received %d devices from API", len(devices))
-                                       return devices, err
+                                       p.logger.Debugf("received %d devices from API", n)
+                                       return err
                                }
                                userQuery = next
                        }
@@ -633,11 +632,10 @@ func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullS
                                }
                        }
                } else {
-                       devices = grow(devices, len(batch))
                        for _, d := range batch {
                                sd := state.storeDevice(d)
                                publish(sd)
-                               devices = append(devices, sd)
+                               n++
                                if d.LastUpdated.After(lastUpdated) {
                                        lastUpdated = d.LastUpdated
                                }
@@ -649,8 +647,8 @@ func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullS
                        if err == io.EOF {
                                break
                        }
-                       p.logger.Debugf("received %d devices from API", len(devices))
-                       return devices, err
+                       p.logger.Debugf("received %d devices from API", n)
+                       return err
                }
                deviceQuery = next
        }
@@ -664,8 +662,8 @@ func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullS
        deviceQuery.Add("search", fmt.Sprintf(`lastUpdated ge "%s" and status pr`, lastUpdated.Format(okta.ISO8601)))
        state.nextDevices = deviceQuery.Encode()
 
-       p.logger.Debugf("received %d devices from API", len(devices))
-       return devices, nil
+       p.logger.Debugf("received %d devices from API", n)
+       return nil
 }
 
 func cloneURLValues(a url.Values) url.Values {
@@ -680,14 +678,6 @@ type entity interface {
        *User | *Device | okta.User
 }
 
-func grow[T entity](e []T, n int) []T {
-       if len(e)+n <= cap(e) {
-               return e
-       }
-       new := append(e, make([]T, n)...)
-       return new[:len(e)]
-}
-
 // publishMarker will publish a write marker document using the given beat.Client.
 // If start is true, then it will be a start marker, otherwise an end marker.
 func (p *oktaInput) publishMarker(ts, eventTime time.Time, inputID string, start bool, client beat.Client, tracker *kvstore.TxTracker) {
diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go b/x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go
index 9eeaf54412..f7bc7c45d7 100644
--- a/x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go
+++ b/x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go
@@ -211,9 +211,13 @@ func TestOktaDoFetch(t *testing.T) {
 
                        t.Run("users", func(t *testing.T) {
                                n = 0
+                               var got []*User
                                published := make(map[string]struct{})
 
-                               got, err := a.doFetchUsers(ctx, ss, false, func(u *User) { published[u.ID] = struct{}{} })
+                               err := a.doFetchUsers(ctx, ss, false, func(u *User) {
+                                       got = append(got, u)
+                                       published[u.ID] = struct{}{}
+                               })
                                if err != nil {
                                        t.Fatalf("unexpected error from doFetch: %v", err)
                                }
@@ -248,9 +252,13 @@ func TestOktaDoFetch(t *testing.T) {
 
                        t.Run("devices", func(t *testing.T) {
                                n = 0
+                               var got []*Device
                                published := make(map[string]struct{})
 
-                               got, err := a.doFetchDevices(ctx, ss, false, func(d *Device) { published[d.ID] = struct{}{} })
+                               err := a.doFetchDevices(ctx, ss, false, func(d *Device) {
+                                       got = append(got, d)
+                                       published[d.ID] = struct{}{}
+                               })
                                if err != nil {
                                        t.Fatalf("unexpected error from doFetch: %v", err)
                                }

Contributor

@efd6 efd6 left a comment


Thanks

@chrisberkhout chrisberkhout merged commit 0e76511 into elastic:main Feb 2, 2025
20 of 22 checks passed
Labels
enhancement Team:Security-Service Integrations Security Service Integrations Team
Projects
None yet
Development

Successfully merging this pull request may close these issues.

x-pack/filebeat/input/entityanalytics/provider/okta: Rate limiting hangs
3 participants