Skip to content

Commit

Permalink
Speed up Client.GetDeepProcessGroups and Client.GetDeepConnections
Browse files Browse the repository at this point in the history
Use async tree walking which significantly speeds this up
  • Loading branch information
dekimsey committed Jun 28, 2022
1 parent 2d3e3e1 commit aaca7ee
Showing 1 changed file with 57 additions and 22 deletions.
79 changes: 57 additions & 22 deletions nifi/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,61 +121,96 @@ func (c *Client) GetProcessGroups(parentID string) ([]ProcessGroupEntity, error)
// GetConnections traverses the process group hierarchy starting at parentID
// and returns information about all connections found during the walk.
//
// NOTE(review): this listing is a diff rendered without +/- markers, so lines
// from the previous implementation (the ConnectionsEntity accumulator and
// `return entity.Connections, nil`) appear interleaved with their replacement.
func (c *Client) GetConnections(parentID string) ([]ConnectionEntity, error) {
var entity ConnectionsEntity
if err := c.getDeepConnections(parentID, &entity); err != nil {
return nil, err
// Channel on which the tree walk delivers every connection it finds.
results := make(chan ConnectionEntity, 1)
var entities []ConnectionEntity
// Results accumulator: a single goroutine drains the channel into entities
// until the channel is closed.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for entity := range results {
entities = append(entities, entity)
}
}()

// Start the tree walk
if err := c.getDeepConnections(results, parentID); err != nil {
// NOTE(review): the format string has one verb but two arguments, and the
// error is only logged here; the function still returns a nil error below.
log.Errorf("failed to get id %s", parentID, err)
}
return entity.Connections, nil
// Closing results ends the accumulator's range loop; wait for it to finish
// before reading entities.
close(results)
wg.Wait()
return entities, nil

}

// getDeepConnections sends the connections of process group parentID to
// results, then walks each child process group in its own goroutine and waits
// for those walks to finish before returning. Only request failures for
// parentID itself are returned as an error.
//
// NOTE(review): both the previous signature (taking a *ConnectionsEntity) and
// the channel-based replacement appear below because this is a diff rendering.
func (c *Client) getDeepConnections(parentID string, connectionsEntity *ConnectionsEntity) error {
func (c *Client) getDeepConnections(results chan ConnectionEntity, parentID string) error {
var entity ConnectionsEntity

// Get the connections for the current process group
if err := c.request("/process-groups/"+parentID+"/connections", nil, &entity); err != nil {
return errors.Trace(err)
}

// Hand each connection of this group to the accumulator on the other end of
// the channel.
for _, entity := range entity.Connections {
results <- entity
}
// And the child process groups

var pgentity ProcessGroupsEntity
if err := c.request("/process-groups/"+parentID+"/process-groups", nil, &pgentity); err != nil {
return errors.Trace(err)
}

// One goroutine per child group; wg tracks them so this call does not return
// while any child walk is still sending on results.
var wg sync.WaitGroup
for _, pg := range pgentity.ProcessGroups {
if err := c.getDeepConnections(pg.ID, connectionsEntity); err != nil {
return err
wg.Add(1)
go func(wg *sync.WaitGroup, id string) {
defer wg.Done()
// FIXME: We aren't collecting errors, that's kinda uncool
// NOTE(review): the error returned by the recursive call is discarded, so
// a failure in a child group silently yields a partial result.
c.getDeepConnections(results, id)
}(&wg, pg.ID)
}
}
connectionsEntity.Connections = append(connectionsEntity.Connections, entity.Connections...)
wg.Wait()
return nil
}

// GetDeepProcessGroups traverses the process group hierarchy starting at
// parentID and returns information about the process groups found during the
// walk.
//
// NOTE(review): the channel-based walk emits the groups listed under each
// visited group; no entity for parentID itself is sent — confirm whether the
// "this and all child process groups" wording of the original comment still
// holds.
//
// NOTE(review): this listing is a diff rendered without +/- markers, so lines
// from the previous implementation (the ProcessGroupsEntity accumulator and
// `return entity.ProcessGroups, nil`) appear interleaved with their replacement.
func (c *Client) GetDeepProcessGroups(parentID string) ([]ProcessGroupEntity, error) {
var entity ProcessGroupsEntity
if err := c.getDeepProcessGroups(parentID, &entity); err != nil {
return nil, err
// Channel on which the tree walk delivers every process group it finds.
results := make(chan ProcessGroupEntity, 1)
var entities []ProcessGroupEntity
// Results accumulator: a single goroutine drains the channel into entities
// until the channel is closed.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for entity := range results {
entities = append(entities, entity)
}
return entity.ProcessGroups, nil

}()
// Start the tree walk
if err := c.getDeepProcessGroups(results, parentID); err != nil {
// NOTE(review): the format string has one verb but two arguments, and the
// error is only logged here; the function still returns a nil error below.
log.Errorf("failed to get id %s", parentID, err)
}
// Closing results ends the accumulator's range loop; wait for it to finish
// before reading entities.
close(results)
wg.Wait()
return entities, nil
}

// getDeepProcessGroups requests the process groups listed under parentID,
// sends each one to results, and walks each of them in its own goroutine,
// waiting for those walks to finish before returning. Only a request failure
// for parentID itself is returned as an error.
//
// NOTE(review): both the previous signature (taking a *ProcessGroupsEntity)
// and the channel-based replacement appear below because this is a diff
// rendering.
func (c *Client) getDeepProcessGroups(parentID string, groupsEntity *ProcessGroupsEntity) error {
func (c *Client) getDeepProcessGroups(results chan ProcessGroupEntity, parentID string) error {
var entity ProcessGroupsEntity
if err := c.request("/process-groups/"+parentID+"/process-groups", nil, &entity); err != nil {
return errors.Trace(err)
}

// One goroutine per child group; wg tracks them so this call does not return
// while any child walk is still sending on results.
var wg sync.WaitGroup
for _, pg := range entity.ProcessGroups {
if err := c.getDeepProcessGroups(pg.ID, groupsEntity); err != nil {
return err
// Report the child itself before descending into it.
results <- pg
wg.Add(1)
go func(wg *sync.WaitGroup, id string) {
defer wg.Done()
// FIXME: We aren't collecting errors, that's kinda uncool
// NOTE(review): the error returned by the recursive call is discarded, so
// a failure in a child group silently yields a partial result.
c.getDeepProcessGroups(results, id)
}(&wg, pg.ID)
}
}
groupsEntity.ProcessGroups = append(groupsEntity.ProcessGroups, entity.ProcessGroups...)
wg.Wait()
return nil
}

Expand Down

0 comments on commit aaca7ee

Please sign in to comment.