Skip to content

Commit

Permalink
Fix versions pruning for deleted files
Browse files Browse the repository at this point in the history
  • Loading branch information
cdujeu committed Apr 6, 2021
1 parent 690d008 commit 0dcfba5
Showing 1 changed file with 24 additions and 25 deletions.
49 changes: 24 additions & 25 deletions data/versions/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,46 +193,45 @@ func (h *Handler) PruneVersions(ctx context.Context, request *tree.PruneVersions
}
}

// Load datasource tree in memory
existingIds := make(map[string]struct{})
for _, dsRoot := range versionedDsRoots {
log.TasksLogger(ctx).Info("Listing datasource " + dsRoot.GetPath() + " for pruning")
// Load datasource tree in memory
existingIds := make(map[string]struct{})
streamer, sErr := cl.ListNodes(ctx, &tree.ListNodesRequest{Node: dsRoot, Recursive: true})
if sErr != nil {
return sErr
}
defer streamer.Close()
for {
r, e := streamer.Recv()
if e != nil {
break
}
existingIds[r.Node.GetUuid()] = struct{}{}
}

wg := &sync.WaitGroup{}
wg.Add(1)
runner := func() error {
defer wg.Done()
uuids, done, errs := h.db.ListAllVersionedNodesUuids()
for {
select {
case id := <-uuids:
if _, o := existingIds[id]; !o {
idsToDelete = append(idsToDelete, id)
}
case e := <-errs:
return e
case <-done:
return nil
streamer.Close()
}
wg := &sync.WaitGroup{}
wg.Add(1)
runner := func() error {
defer wg.Done()
uuids, done, errs := h.db.ListAllVersionedNodesUuids()
for {
select {
case id := <-uuids:
if _, o := existingIds[id]; !o {
idsToDelete = append(idsToDelete, id)
}
case e := <-errs:
return e
case <-done:
return nil
}
}
err := runner()
wg.Wait()
if err != nil {
return err
}
}
err := runner()
wg.Wait()
if err != nil {
return err
}

} else if request.UniqueNode != nil {
Expand Down Expand Up @@ -268,7 +267,7 @@ func (h *Handler) PruneVersions(ctx context.Context, request *tree.PruneVersions
return e
}

log.Logger(ctx).Debug("Responding to Prune with these versions", zap.Any("versions", resp.DeletedVersions))
log.Logger(ctx).Debug("Responding to Prune with versions", zap.Int("versions", len(resp.DeletedVersions)))

return nil
}
Expand Down

0 comments on commit 0dcfba5

Please sign in to comment.