Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.../input/entityanalytics/provider/okta: Publish events progressively #42567

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225]
- Introduce ignore older and start timestamp filters for AWS S3 input. {pull}41804[41804]
- Journald input now can report its status to Elastic-Agent {issue}39791[39791] {pull}42462[42462]
- Publish events progressively in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42567[42567]

*Auditbeat*

Expand Down
75 changes: 39 additions & 36 deletions x-pack/filebeat/input/entityanalytics/provider/okta/okta.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,43 +315,42 @@
}
}()

ctx := ctxtool.FromCanceller(inputCtx.Cancelation)
p.logger.Debugf("Starting fetch...")
_, err = p.doFetchUsers(ctx, state, true)
if err != nil {
return err
}
_, err = p.doFetchDevices(ctx, state, true)
if err != nil {
return err
}

wantUsers := p.cfg.wantUsers()
wantDevices := p.cfg.wantDevices()
if (len(state.users) != 0 && wantUsers) || (len(state.devices) != 0 && wantDevices) {
if wantUsers || wantDevices {
ctx := ctxtool.FromCanceller(inputCtx.Cancelation)
p.logger.Debugf("Starting fetch...")

tracker := kvstore.NewTxTracker(ctx)

start := time.Now()
p.publishMarker(start, start, inputCtx.ID, true, client, tracker)

if wantUsers {
for _, u := range state.users {
_, err = p.doFetchUsers(ctx, state, true, func(u *User) {
p.publishUser(u, state, inputCtx.ID, client, tracker)
})
if err != nil {
return err
}
}
if wantDevices {
for _, d := range state.devices {
_, err = p.doFetchDevices(ctx, state, true, func(d *Device) {
p.publishDevice(d, state, inputCtx.ID, client, tracker)
})
if err != nil {
return err
}
}

end := time.Now()
p.publishMarker(end, end, inputCtx.ID, false, client, tracker)

tracker.Wait()
}

if ctx.Err() != nil {
return ctx.Err()
if ctx.Err() != nil {
return ctx.Err()
}
}

state.lastSync = time.Now()
Expand Down Expand Up @@ -381,27 +380,28 @@
}()

ctx := ctxtool.FromCanceller(inputCtx.Cancelation)
updatedUsers, err := p.doFetchUsers(ctx, state, false)
if err != nil {
return err
}
updatedDevices, err := p.doFetchDevices(ctx, state, false)
if err != nil {
return err
}
tracker := kvstore.NewTxTracker(ctx)

var tracker *kvstore.TxTracker
if len(updatedUsers) != 0 || len(updatedDevices) != 0 {
tracker = kvstore.NewTxTracker(ctx)
for _, u := range updatedUsers {
if p.cfg.wantUsers() {
p.logger.Debugf("Fetching changed users...")
_, err = p.doFetchUsers(ctx, state, false, func(u *User) {
p.publishUser(u, state, inputCtx.ID, client, tracker)
})
if err != nil {
return err
}
for _, d := range updatedDevices {
}
if p.cfg.wantDevices() {
p.logger.Debugf("Fetching changed devices...")
_, err = p.doFetchDevices(ctx, state, false, func(d *Device) {
p.publishDevice(d, state, inputCtx.ID, client, tracker)
})
if err != nil {
return err
}
tracker.Wait()
}

tracker.Wait()
if ctx.Err() != nil {
return ctx.Err()
}
Expand All @@ -417,7 +417,7 @@
// doFetchUsers handles fetching user identities from Okta. If fullSync is true, then
// any existing deltaLink will be ignored, forcing a full synchronization from Okta.
// Returns a set of modified users by ID.
func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync bool) ([]*User, error) {
func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync bool, publish func(u *User)) ([]*User, error) {
if !p.cfg.wantUsers() {
p.logger.Debugf("Skipping user collection from API: dataset=%s", p.cfg.Dataset)
return nil, nil
Expand Down Expand Up @@ -459,7 +459,7 @@

if fullSync {
for _, u := range batch {
p.addUserMetadata(ctx, u, state)
publish(p.addUserMetadata(ctx, u, state))
if u.LastUpdated.After(lastUpdated) {
lastUpdated = u.LastUpdated
}
Expand All @@ -468,6 +468,7 @@
users = grow(users, len(batch))
for _, u := range batch {
su := p.addUserMetadata(ctx, u, state)
publish(su)
users = append(users, su)
if u.LastUpdated.After(lastUpdated) {
lastUpdated = u.LastUpdated
Expand All @@ -477,7 +478,7 @@

next, err := okta.Next(h)
if err != nil {
if err == io.EOF {

Check failure on line 481 in x-pack/filebeat/input/entityanalytics/provider/okta/okta.go

View workflow job for this annotation

GitHub Actions / lint (windows)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
break
}
p.logger.Debugf("received %d users from API", len(users))
Expand Down Expand Up @@ -541,7 +542,7 @@
// If fullSync is true, then any existing deltaLink will be ignored, forcing a full
// synchronization from Okta.
// Returns a set of modified devices by ID.
func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullSync bool) ([]*Device, error) {
func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullSync bool, publish func(d *Device)) ([]*Device, error) {
if !p.cfg.wantDevices() {
p.logger.Debugf("Skipping device collection from API: dataset=%s", p.cfg.Dataset)
return nil, nil
Expand Down Expand Up @@ -614,7 +615,7 @@

next, err := okta.Next(h)
if err != nil {
if err == io.EOF {

Check failure on line 618 in x-pack/filebeat/input/entityanalytics/provider/okta/okta.go

View workflow job for this annotation

GitHub Actions / lint (windows)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
break
}
p.logger.Debugf("received %d devices from API", len(devices))
Expand All @@ -626,15 +627,17 @@

if fullSync {
for _, d := range batch {
state.storeDevice(d)
publish(state.storeDevice(d))
if d.LastUpdated.After(lastUpdated) {
lastUpdated = d.LastUpdated
}
}
} else {
devices = grow(devices, len(batch))
for _, d := range batch {
devices = append(devices, state.storeDevice(d))
sd := state.storeDevice(d)
publish(sd)
devices = append(devices, sd)
if d.LastUpdated.After(lastUpdated) {
lastUpdated = d.LastUpdated
}
Expand All @@ -643,7 +646,7 @@

next, err := okta.Next(h)
if err != nil {
if err == io.EOF {

Check failure on line 649 in x-pack/filebeat/input/entityanalytics/provider/okta/okta.go

View workflow job for this annotation

GitHub Actions / lint (windows)

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
break
}
p.logger.Debugf("received %d devices from API", len(devices))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,19 @@ func TestOktaDoFetch(t *testing.T) {

t.Run("users", func(t *testing.T) {
n = 0
published := make(map[string]struct{})

got, err := a.doFetchUsers(ctx, ss, false)
got, err := a.doFetchUsers(ctx, ss, false, func(u *User) { published[u.ID] = struct{}{} })
if err != nil {
t.Fatalf("unexpected error from doFetch: %v", err)
}

if len(got) != wantCount(repeats, test.wantUsers) {
t.Errorf("unexpected number of results: got:%d want:%d", len(got), wantCount(repeats, test.wantUsers))
}
if len(published) != len(got) {
t.Errorf("unexpected number of distinct users published: got:%d want:%d", len(published), len(got))
}
for i, g := range got {
wantID := fmt.Sprintf("userid%d", i+1)
if g.ID != wantID {
Expand All @@ -244,15 +248,19 @@ func TestOktaDoFetch(t *testing.T) {

t.Run("devices", func(t *testing.T) {
n = 0
published := make(map[string]struct{})

got, err := a.doFetchDevices(ctx, ss, false)
got, err := a.doFetchDevices(ctx, ss, false, func(d *Device) { published[d.ID] = struct{}{} })
if err != nil {
t.Fatalf("unexpected error from doFetch: %v", err)
}

if len(got) != wantCount(repeats, test.wantDevices) {
t.Errorf("unexpected number of results: got:%d want:%d", len(got), wantCount(repeats, test.wantDevices))
}
if len(published) != len(got) {
t.Errorf("unexpected number of distinct devices published: got:%d want:%d", len(published), len(got))
}
for i, g := range got {
if wantID := fmt.Sprintf("deviceid%d", i+1); g.ID != wantID {
t.Errorf("unexpected device ID for device %d: got:%s want:%s", i, g.ID, wantID)
Expand Down
Loading