From 40235cdafda64e86cd9370365e1a05b6c2a95918 Mon Sep 17 00:00:00 2001 From: cdujeu Date: Tue, 6 Apr 2021 15:04:50 +0200 Subject: [PATCH] Revert pruning versions for deleted files optimization - Add a check for versinoning datasources instead. --- data/versions/action-prune.go | 23 ++++++++++++++++++++ data/versions/grpc/handler.go | 41 +++++++++-------------------------- 2 files changed, 33 insertions(+), 31 deletions(-) diff --git a/data/versions/action-prune.go b/data/versions/action-prune.go index 56cc1a0b22..ef7b56962e 100644 --- a/data/versions/action-prune.go +++ b/data/versions/action-prune.go @@ -23,6 +23,9 @@ package versions import ( "context" + "github.com/pydio/cells/common/config" + "github.com/pydio/cells/common/proto/object" + "github.com/micro/go-micro/client" "go.uber.org/zap" @@ -79,6 +82,26 @@ func (c *PruneVersionsAction) Init(job *jobs.Job, cl client.Client, action *jobs // Run processes the actual action code. func (c *PruneVersionsAction) Run(ctx context.Context, channels *actions.RunnableChannels, input jobs.ActionMessage) (jobs.ActionMessage, error) { + // First check if versioning is enabled on any datasource + sources := config.SourceNamesForDataServices(common.ServiceDataIndex) + var versioningFound bool + for _, src := range sources { + var ds *object.DataSource + if err := config.Get("services", common.ServiceGrpcNamespace_+common.ServiceDataSync_+src).Scan(&ds); err == nil { + if ds.VersioningPolicyName != "" { + versioningFound = true + break + } + } + } + + if !versioningFound { + log.TasksLogger(ctx).Info("Ignoring action: no datasources found with versioning enabled.") + return input.WithIgnore(), nil + } else { + log.TasksLogger(ctx).Info("Starting action: one or more datasources found with versioning enabled.") + } + source, e := c.Pool.GetDataSourceInfo(common.PydioVersionsNamespace) if e != nil { return input.WithError(e), e diff --git a/data/versions/grpc/handler.go b/data/versions/grpc/handler.go index 08de431695..259d9b270b 100644 --- a/data/versions/grpc/handler.go +++ b/data/versions/grpc/handler.go @@ -23,10 +23,11 @@ package grpc import ( "context" "net/url" - "path" "sync" "time" + json "github.com/pydio/cells/x/jsonx" + "github.com/micro/go-micro/errors" "github.com/patrickmn/go-cache" "github.com/pborman/uuid" @@ -43,9 +44,7 @@ import ( "github.com/pydio/cells/common/proto/tree" "github.com/pydio/cells/common/utils/i18n" "github.com/pydio/cells/common/utils/permissions" - "github.com/pydio/cells/common/views" "github.com/pydio/cells/data/versions" - json "github.com/pydio/cells/x/jsonx" ) var policiesCache *cache.Cache @@ -172,44 +171,24 @@ func (h *Handler) PruneVersions(ctx context.Context, request *tree.PruneVersions cl := tree.NewNodeProviderClient(common.ServiceGrpcNamespace_+common.ServiceTree, defaults.NewClient()) var idsToDelete []string - router := views.NewStandardRouter(views.RouterOptions{AdminView: true}) if request.AllDeletedNodes { - var versionedDsRoots []*tree.Node - st, e := cl.ListNodes(ctx, &tree.ListNodesRequest{Node: &tree.Node{Path: "/"}, Recursive: false}) - if e != nil { - return e + // Load whole tree in memory + existingIds := make(map[string]struct{}) + streamer, sErr := cl.ListNodes(ctx, &tree.ListNodesRequest{Node: &tree.Node{Path: "/"}, Recursive: true}) + if sErr != nil { + return sErr } - defer st.Close() + defer streamer.Close() for { - r, e := st.Recv() + r, e := streamer.Recv() if e != nil { break } - dsName := path.Base(r.GetNode().GetPath()) - if loaded, e := router.GetClientsPool().GetDataSourceInfo(dsName); e == nil && loaded.VersioningPolicyName != "" { - versionedDsRoots = append(versionedDsRoots, r.GetNode()) - } + existingIds[r.Node.GetUuid()] = struct{}{} } - // Load datasource tree in memory - existingIds := make(map[string]struct{}) - for _, dsRoot := range versionedDsRoots { - log.TasksLogger(ctx).Info("Listing datasource " + dsRoot.GetPath() + " for pruning") - streamer, sErr := cl.ListNodes(ctx, &tree.ListNodesRequest{Node: dsRoot, Recursive: true}) - if sErr != nil { - return sErr - } - for { - r, e := streamer.Recv() - if e != nil { - break - } - existingIds[r.Node.GetUuid()] = struct{}{} - } - streamer.Close() - } wg := &sync.WaitGroup{} wg.Add(1) runner := func() error {