Skip to content

Commit

Permalink
.../input/entityanalytics/provider/okta: Publish events progressively (
Browse files Browse the repository at this point in the history
…#42567)

Instead of waiting for a full sync (which may take hours) to finish
before publishing anything, we now publish data as it is received, so
that users have immediate feedback.

Incremental updates are also published more frequently: page by page
instead of at the end of the pagination sequence.

For the full sync, the previous behavior of adding everything to the
store and publishing it at the end would theoretically de-duplicate
repeated items. However, within a pagination sequence we use opaque
cursors from the API that should avoid overlap between pages. Whether an
item modified after an earlier page in a sequence can appear again in a
later page of the same sequence depends on the API's implementation.
Incremental updates begin with a previously seen timestamp value, so
there is overlap between updates there, but that is unaffected by this
change.

For the full sync, markers for the start and end were previously
published only if data was retrieved; now the markers are published
regardless of how much data is received.

Some extra checks of configuration are done to decide whether to fetch
and publish items of a given type (user or device).

---------

Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>
  • Loading branch information
chrisberkhout and efd6 authored Feb 2, 2025
1 parent 2b8d554 commit 0e76511
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 68 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225]
- Introduce ignore older and start timestamp filters for AWS S3 input. {pull}41804[41804]
- Journald input now can report its status to Elastic-Agent {issue}39791[39791] {pull}42462[42462]
- Publish events progressively in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42567[42567]

*Auditbeat*

Expand Down
125 changes: 59 additions & 66 deletions x-pack/filebeat/input/entityanalytics/provider/okta/okta.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,43 +315,42 @@ func (p *oktaInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, clien
}
}()

ctx := ctxtool.FromCanceller(inputCtx.Cancelation)
p.logger.Debugf("Starting fetch...")
_, err = p.doFetchUsers(ctx, state, true)
if err != nil {
return err
}
_, err = p.doFetchDevices(ctx, state, true)
if err != nil {
return err
}

wantUsers := p.cfg.wantUsers()
wantDevices := p.cfg.wantDevices()
if (len(state.users) != 0 && wantUsers) || (len(state.devices) != 0 && wantDevices) {
if wantUsers || wantDevices {
ctx := ctxtool.FromCanceller(inputCtx.Cancelation)
p.logger.Debugf("Starting fetch...")

tracker := kvstore.NewTxTracker(ctx)

start := time.Now()
p.publishMarker(start, start, inputCtx.ID, true, client, tracker)

if wantUsers {
for _, u := range state.users {
err = p.doFetchUsers(ctx, state, true, func(u *User) {
p.publishUser(u, state, inputCtx.ID, client, tracker)
})
if err != nil {
return err
}
}
if wantDevices {
for _, d := range state.devices {
err = p.doFetchDevices(ctx, state, true, func(d *Device) {
p.publishDevice(d, state, inputCtx.ID, client, tracker)
})
if err != nil {
return err
}
}

end := time.Now()
p.publishMarker(end, end, inputCtx.ID, false, client, tracker)

tracker.Wait()
}

if ctx.Err() != nil {
return ctx.Err()
if ctx.Err() != nil {
return ctx.Err()
}
}

state.lastSync = time.Now()
Expand Down Expand Up @@ -381,27 +380,28 @@ func (p *oktaInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Sto
}()

ctx := ctxtool.FromCanceller(inputCtx.Cancelation)
updatedUsers, err := p.doFetchUsers(ctx, state, false)
if err != nil {
return err
}
updatedDevices, err := p.doFetchDevices(ctx, state, false)
if err != nil {
return err
}
tracker := kvstore.NewTxTracker(ctx)

var tracker *kvstore.TxTracker
if len(updatedUsers) != 0 || len(updatedDevices) != 0 {
tracker = kvstore.NewTxTracker(ctx)
for _, u := range updatedUsers {
if p.cfg.wantUsers() {
p.logger.Debugf("Fetching changed users...")
err = p.doFetchUsers(ctx, state, false, func(u *User) {
p.publishUser(u, state, inputCtx.ID, client, tracker)
})
if err != nil {
return err
}
for _, d := range updatedDevices {
}
if p.cfg.wantDevices() {
p.logger.Debugf("Fetching changed devices...")
err = p.doFetchDevices(ctx, state, false, func(d *Device) {
p.publishDevice(d, state, inputCtx.ID, client, tracker)
})
if err != nil {
return err
}
tracker.Wait()
}

tracker.Wait()
if ctx.Err() != nil {
return ctx.Err()
}
Expand All @@ -417,10 +417,10 @@ func (p *oktaInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Sto
// doFetchUsers handles fetching user identities from Okta. If fullSync is true, then
// any existing deltaLink will be ignored, forcing a full synchronization from Okta.
// Returns a set of modified users by ID.
func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync bool) ([]*User, error) {
func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync bool, publish func(u *User)) error {
if !p.cfg.wantUsers() {
p.logger.Debugf("Skipping user collection from API: dataset=%s", p.cfg.Dataset)
return nil, nil
return nil
}

var (
Expand All @@ -446,29 +446,29 @@ func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSyn
const omit = okta.OmitCredentials | okta.OmitCredentialsLinks | okta.OmitTransitioningToStatus

var (
users []*User
n int
lastUpdated time.Time
)
for {
batch, h, err := okta.GetUserDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, "", query, omit, p.lim, p.logger)
if err != nil {
p.logger.Debugf("received %d users from API", len(users))
return nil, err
p.logger.Debugf("received %d users from API", n)
return err
}
p.logger.Debugf("received batch of %d users from API", len(batch))

if fullSync {
for _, u := range batch {
p.addUserMetadata(ctx, u, state)
publish(p.addUserMetadata(ctx, u, state))
if u.LastUpdated.After(lastUpdated) {
lastUpdated = u.LastUpdated
}
}
} else {
users = grow(users, len(batch))
for _, u := range batch {
su := p.addUserMetadata(ctx, u, state)
users = append(users, su)
publish(su)
n++
if u.LastUpdated.After(lastUpdated) {
lastUpdated = u.LastUpdated
}
Expand All @@ -480,8 +480,8 @@ func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSyn
if err == io.EOF {
break
}
p.logger.Debugf("received %d users from API", len(users))
return users, err
p.logger.Debugf("received %d users from API", n)
return err
}
query = next
}
Expand All @@ -495,8 +495,8 @@ func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSyn
query.Add("search", fmt.Sprintf(`lastUpdated ge "%s" and status pr`, lastUpdated.Format(okta.ISO8601)))
state.nextUsers = query.Encode()

p.logger.Debugf("received %d users from API", len(users))
return users, nil
p.logger.Debugf("received %d users from API", n)
return nil
}

func (p *oktaInput) addUserMetadata(ctx context.Context, u okta.User, state *stateStore) *User {
Expand Down Expand Up @@ -541,10 +541,10 @@ func (p *oktaInput) addUserMetadata(ctx context.Context, u okta.User, state *sta
// If fullSync is true, then any existing deltaLink will be ignored, forcing a full
// synchronization from Okta.
// Returns a set of modified devices by ID.
func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullSync bool) ([]*Device, error) {
func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullSync bool, publish func(d *Device)) error {
if !p.cfg.wantDevices() {
p.logger.Debugf("Skipping device collection from API: dataset=%s", p.cfg.Dataset)
return nil, nil
return nil
}

var (
Expand Down Expand Up @@ -575,14 +575,14 @@ func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullS
userQueryInit = cloneURLValues(deviceQuery)

var (
devices []*Device
n int
lastUpdated time.Time
)
for {
batch, h, err := okta.GetDeviceDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, "", deviceQuery, p.lim, p.logger)
if err != nil {
p.logger.Debugf("received %d devices from API", len(devices))
return nil, err
p.logger.Debugf("received %d devices from API", n)
return err
}
p.logger.Debugf("received batch of %d devices from API", len(batch))

Expand All @@ -601,7 +601,7 @@ func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullS
users, h, err := okta.GetDeviceUsers(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, d.ID, userQuery, omit, p.lim, p.logger)
if err != nil {
p.logger.Debugf("received %d device users from API", len(users))
return nil, err
return err
}
p.logger.Debugf("received batch of %d device users from API", len(users))

Expand All @@ -617,24 +617,25 @@ func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullS
if err == io.EOF {
break
}
p.logger.Debugf("received %d devices from API", len(devices))
return devices, err
p.logger.Debugf("received %d devices from API", n)
return err
}
userQuery = next
}
}

if fullSync {
for _, d := range batch {
state.storeDevice(d)
publish(state.storeDevice(d))
if d.LastUpdated.After(lastUpdated) {
lastUpdated = d.LastUpdated
}
}
} else {
devices = grow(devices, len(batch))
for _, d := range batch {
devices = append(devices, state.storeDevice(d))
sd := state.storeDevice(d)
publish(sd)
n++
if d.LastUpdated.After(lastUpdated) {
lastUpdated = d.LastUpdated
}
Expand All @@ -646,8 +647,8 @@ func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullS
if err == io.EOF {
break
}
p.logger.Debugf("received %d devices from API", len(devices))
return devices, err
p.logger.Debugf("received %d devices from API", n)
return err
}
deviceQuery = next
}
Expand All @@ -661,8 +662,8 @@ func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullS
deviceQuery.Add("search", fmt.Sprintf(`lastUpdated ge "%s" and status pr`, lastUpdated.Format(okta.ISO8601)))
state.nextDevices = deviceQuery.Encode()

p.logger.Debugf("received %d devices from API", len(devices))
return devices, nil
p.logger.Debugf("received %d devices from API", n)
return nil
}

func cloneURLValues(a url.Values) url.Values {
Expand All @@ -677,14 +678,6 @@ type entity interface {
*User | *Device | okta.User
}

// grow returns e with capacity for at least n additional elements,
// preserving e's length and contents. If e already has enough spare
// capacity it is returned unchanged; otherwise a larger backing array
// is allocated and the existing elements are copied into it.
func grow[T entity](e []T, n int) []T {
	if len(e)+n <= cap(e) {
		return e
	}
	// Append n zero values to force reallocation to at least len(e)+n,
	// then reslice back to the original length. (Renamed from `new`,
	// which shadowed the builtin of the same name.)
	grown := append(e, make([]T, n)...)
	return grown[:len(e)]
}

// publishMarker will publish a write marker document using the given beat.Client.
// If start is true, then it will be a start marker, otherwise an end marker.
func (p *oktaInput) publishMarker(ts, eventTime time.Time, inputID string, start bool, client beat.Client, tracker *kvstore.TxTracker) {
Expand Down
20 changes: 18 additions & 2 deletions x-pack/filebeat/input/entityanalytics/provider/okta/okta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,23 @@ func TestOktaDoFetch(t *testing.T) {

t.Run("users", func(t *testing.T) {
n = 0
var got []*User
published := make(map[string]struct{})

got, err := a.doFetchUsers(ctx, ss, false)
err := a.doFetchUsers(ctx, ss, false, func(u *User) {
got = append(got, u)
published[u.ID] = struct{}{}
})
if err != nil {
t.Fatalf("unexpected error from doFetch: %v", err)
}

if len(got) != wantCount(repeats, test.wantUsers) {
t.Errorf("unexpected number of results: got:%d want:%d", len(got), wantCount(repeats, test.wantUsers))
}
if len(published) != len(got) {
t.Errorf("unexpected number of distinct users published: got:%d want:%d", len(published), len(got))
}
for i, g := range got {
wantID := fmt.Sprintf("userid%d", i+1)
if g.ID != wantID {
Expand All @@ -244,15 +252,23 @@ func TestOktaDoFetch(t *testing.T) {

t.Run("devices", func(t *testing.T) {
n = 0
var got []*Device
published := make(map[string]struct{})

got, err := a.doFetchDevices(ctx, ss, false)
err := a.doFetchDevices(ctx, ss, false, func(d *Device) {
got = append(got, d)
published[d.ID] = struct{}{}
})
if err != nil {
t.Fatalf("unexpected error from doFetch: %v", err)
}

if len(got) != wantCount(repeats, test.wantDevices) {
t.Errorf("unexpected number of results: got:%d want:%d", len(got), wantCount(repeats, test.wantDevices))
}
if len(published) != len(got) {
t.Errorf("unexpected number of distinct devices published: got:%d want:%d", len(published), len(got))
}
for i, g := range got {
if wantID := fmt.Sprintf("deviceid%d", i+1); g.ID != wantID {
t.Errorf("unexpected device ID for device %d: got:%s want:%s", i, g.ID, wantID)
Expand Down

0 comments on commit 0e76511

Please sign in to comment.