Skip to content

Commit adabb5a

Browse files
committed
fix download consistency
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
1 parent e4191d8 commit adabb5a

File tree

4 files changed

+158
-17
lines changed

4 files changed

+158
-17
lines changed

pkg/rhttp/datatx/utils/download/download.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,15 @@ import (
3030
"strings"
3131

3232
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
33+
"github.com/rs/zerolog"
34+
3335
"github.com/cs3org/reva/v2/internal/grpc/services/storageprovider"
3436
"github.com/cs3org/reva/v2/internal/http/services/owncloud/ocdav/net"
3537
"github.com/cs3org/reva/v2/pkg/appctx"
3638
"github.com/cs3org/reva/v2/pkg/errtypes"
3739
"github.com/cs3org/reva/v2/pkg/storage"
3840
"github.com/cs3org/reva/v2/pkg/storagespace"
3941
"github.com/cs3org/reva/v2/pkg/utils"
40-
"github.com/rs/zerolog"
4142
)
4243

4344
type contextKey struct{}
@@ -91,13 +92,26 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI
9192
// TODO check preconditions like If-Range, If-Match ...
9293

9394
var md *provider.ResourceInfo
95+
var getContent func(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error)
9496
var err error
9597

96-
// do a stat to set a Content-Length header
98+
// do a stat to set Content-Length and etag headers
9799

98-
if md, err = fs.GetMD(ctx, ref, nil, []string{"size", "mimetype", "etag"}); err != nil {
99-
handleError(w, &sublog, err, "stat")
100-
return
100+
if fscd, ok := fs.(storage.ConsistentDownloader); ok {
101+
md, getContent, err = fscd.ConsistentDownload(ctx, ref)
102+
if err != nil {
103+
handleError(w, &sublog, err, "download")
104+
return
105+
}
106+
} else {
107+
// There is a race condition here: between the stat and the download the file could have been updated again
108+
// this is ok if the HeaderIfNoneMatch header is set but we should then also update the etag
109+
// maybe we can read the etag from the download response headers?
110+
if md, err = fs.GetMD(ctx, ref, nil, []string{"size", "mimetype", "etag"}); err != nil {
111+
handleError(w, &sublog, err, "stat")
112+
return
113+
}
114+
getContent = fs.Download
101115
}
102116

103117
// check etag, see https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-None-Match
@@ -141,8 +155,11 @@ func GetOrHeadFile(w http.ResponseWriter, r *http.Request, fs storage.FS, spaceI
141155
}
142156
}
143157

158+
// There is a race condition here: between the stat and the download the file could have been updated again
159+
// this is ok if the HeaderIfNoneMatch header is set but we should then also update the etag
160+
// maybe we can read the etag from the download response headers?
144161
ctx = ContextWithEtag(ctx, md.Etag)
145-
content, err := fs.Download(ctx, ref)
162+
content, err := getContent(ctx, ref)
146163
if err != nil {
147164
handleError(w, &sublog, err, "download")
148165
return
@@ -259,6 +276,9 @@ func handleError(w http.ResponseWriter, log *zerolog.Logger, err error, action s
259276
case errtypes.IsPermissionDenied:
260277
log.Debug().Err(err).Str("action", action).Msg("permission denied")
261278
w.WriteHeader(http.StatusForbidden)
279+
case errtypes.Aborted:
280+
log.Debug().Err(err).Str("action", action).Msg("etags do not match")
281+
w.WriteHeader(http.StatusPreconditionFailed)
262282
default:
263283
log.Error().Err(err).Str("action", action).Msg("unexpected error")
264284
w.WriteHeader(http.StatusInternalServerError)

pkg/storage/storage.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,9 @@ import (
2323
"io"
2424
"net/url"
2525

26-
tusd "github.com/tus/tusd/v2/pkg/handler"
27-
2826
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
2927
registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1"
28+
tusd "github.com/tus/tusd/v2/pkg/handler"
3029
)
3130

3231
// FS is the interface to implement access to the storage.
@@ -169,3 +168,8 @@ type PathWrapper interface {
169168
Unwrap(ctx context.Context, rp string) (string, error)
170169
Wrap(ctx context.Context, rp string) (string, error)
171170
}
171+
172+
type ConsistentDownloader interface {
173+
// ConsistentDownload returns the metadata for a resource and a callback to get the content stream matching the etag
174+
ConsistentDownload(ctx context.Context, ref *provider.Reference) (*provider.ResourceInfo, func(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error), error)
175+
}

pkg/storage/utils/decomposedfs/decomposedfs.go

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ import (
3333
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
3434
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
3535
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
36+
"github.com/jellydator/ttlcache/v2"
37+
"github.com/pkg/errors"
38+
tusd "github.com/tus/tusd/v2/pkg/handler"
39+
microstore "go-micro.dev/v4/store"
40+
"go.opentelemetry.io/otel"
41+
"go.opentelemetry.io/otel/trace"
42+
"golang.org/x/sync/errgroup"
43+
3644
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
3745
"github.com/cs3org/reva/v2/pkg/errtypes"
3846
"github.com/cs3org/reva/v2/pkg/events"
@@ -60,13 +68,6 @@ import (
6068
"github.com/cs3org/reva/v2/pkg/storagespace"
6169
"github.com/cs3org/reva/v2/pkg/store"
6270
"github.com/cs3org/reva/v2/pkg/utils"
63-
"github.com/jellydator/ttlcache/v2"
64-
"github.com/pkg/errors"
65-
tusd "github.com/tus/tusd/v2/pkg/handler"
66-
microstore "go-micro.dev/v4/store"
67-
"go.opentelemetry.io/otel"
68-
"go.opentelemetry.io/otel/trace"
69-
"golang.org/x/sync/errgroup"
7071
)
7172

7273
type CtxKey int
@@ -1053,6 +1054,62 @@ func (fs *Decomposedfs) Delete(ctx context.Context, ref *provider.Reference) (er
10531054
return fs.tp.Delete(ctx, node)
10541055
}
10551056

1057+
// ConsistentDownload returns the metadata for a resource and a callback to get the content stream matching the etag
1058+
func (fs *Decomposedfs) ConsistentDownload(ctx context.Context, ref *provider.Reference) (*provider.ResourceInfo, func(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error), error) {
1059+
1060+
ctx, span := tracer.Start(ctx, "ConsistentDownload")
1061+
defer span.End()
1062+
// check if we are trying to download a revision
1063+
// TODO the CS3 api should allow initiating a revision download
1064+
if ref.ResourceId != nil && strings.Contains(ref.ResourceId.OpaqueId, node.RevisionIDDelimiter) {
1065+
return fs.DownloadRevisionConsistent(ctx, ref, ref.ResourceId.OpaqueId)
1066+
}
1067+
1068+
n, err := fs.lu.NodeFromResource(ctx, ref)
1069+
if err != nil {
1070+
return nil, nil, errors.Wrap(err, "Decomposedfs: error resolving ref")
1071+
}
1072+
1073+
if !n.Exists {
1074+
err = errtypes.NotFound(filepath.Join(n.ParentID, n.Name))
1075+
return nil, nil, err
1076+
}
1077+
1078+
rp, err := fs.p.AssemblePermissions(ctx, n)
1079+
switch {
1080+
case err != nil:
1081+
return nil, nil, err
1082+
case !rp.InitiateFileDownload:
1083+
f, _ := storagespace.FormatReference(ref)
1084+
if rp.Stat {
1085+
return nil, nil, errtypes.PermissionDenied(f)
1086+
}
1087+
return nil, nil, errtypes.NotFound(f)
1088+
}
1089+
1090+
mtime, err := n.GetMTime(ctx)
1091+
if err != nil {
1092+
return nil, nil, errors.Wrap(err, "Decomposedfs: error getting mtime for '"+n.ID+"'")
1093+
}
1094+
currentEtag, err := node.CalculateEtag(n.ID, mtime)
1095+
if err != nil {
1096+
return nil, nil, errors.Wrap(err, "Decomposedfs: error calculating etag for '"+n.ID+"'")
1097+
}
1098+
expectedEtag := download.EtagFromContext(ctx)
1099+
if currentEtag != expectedEtag {
1100+
return nil, nil, errtypes.Aborted(fmt.Sprintf("file changed from etag %s to %s", expectedEtag, currentEtag))
1101+
}
1102+
md, err := n.AsResourceInfo(ctx, rp, []string{}, []string{}, true)
1103+
1104+
return md, func(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) {
1105+
reader, err := fs.tp.ReadBlob(n)
1106+
if err != nil {
1107+
return nil, errors.Wrap(err, "Decomposedfs: error download blob '"+n.ID+"'")
1108+
}
1109+
return reader, nil
1110+
}, err
1111+
}
1112+
10561113
// Download returns a reader to the specified resource
10571114
func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) {
10581115
ctx, span := tracer.Start(ctx, "Download")

pkg/storage/utils/decomposedfs/revisions.go

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,14 @@ import (
2727
"time"
2828

2929
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
30+
"github.com/pkg/errors"
31+
"github.com/rogpeppe/go-internal/lockedfile"
32+
3033
"github.com/cs3org/reva/v2/pkg/appctx"
3134
"github.com/cs3org/reva/v2/pkg/errtypes"
3235
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
3336
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
3437
"github.com/cs3org/reva/v2/pkg/storagespace"
35-
"github.com/pkg/errors"
36-
"github.com/rogpeppe/go-internal/lockedfile"
3738
)
3839

3940
// Revision entries are stored inside the node folder and start with the same uuid as the current version.
@@ -110,6 +111,65 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen
110111
return
111112
}
112113

114+
// DownloadRevision returns a reader for the specified revision
115+
// FIXME the CS3 api should explicitly allow initiating revision and trash download, a related issue is https://github.com/cs3org/reva/issues/1813
116+
func (fs *Decomposedfs) DownloadRevisionConsistent(ctx context.Context, ref *provider.Reference, revisionKey string) (*provider.ResourceInfo, func(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error), error) {
117+
log := appctx.GetLogger(ctx)
118+
119+
// verify revision key format
120+
kp := strings.SplitN(revisionKey, node.RevisionIDDelimiter, 2)
121+
if len(kp) != 2 {
122+
log.Error().Str("revisionKey", revisionKey).Msg("malformed revisionKey")
123+
return nil, nil, errtypes.NotFound(revisionKey)
124+
}
125+
log.Debug().Str("revisionKey", revisionKey).Msg("DownloadRevision")
126+
127+
spaceID := ref.ResourceId.SpaceId
128+
// check if the node is available and has not been deleted
129+
n, err := node.ReadNode(ctx, fs.lu, spaceID, kp[0], false, nil, false)
130+
if err != nil {
131+
return nil, nil, err
132+
}
133+
if !n.Exists {
134+
err = errtypes.NotFound(filepath.Join(n.ParentID, n.Name))
135+
return nil, nil, err
136+
}
137+
138+
rp, err := fs.p.AssemblePermissions(ctx, n)
139+
switch {
140+
case err != nil:
141+
return nil, nil, err
142+
case !rp.ListFileVersions || !rp.InitiateFileDownload: // TODO add explicit permission in the CS3 api?
143+
f, _ := storagespace.FormatReference(ref)
144+
if rp.Stat {
145+
return nil, nil, errtypes.PermissionDenied(f)
146+
}
147+
return nil, nil, errtypes.NotFound(f)
148+
}
149+
150+
contentPath := fs.lu.InternalPath(spaceID, revisionKey)
151+
152+
blobid, blobsize, err := fs.lu.ReadBlobIDAndSizeAttr(ctx, contentPath, nil)
153+
if err != nil {
154+
return nil, nil, errors.Wrapf(err, "Decomposedfs: could not read blob id and size for revision '%s' of node '%s'", n.ID, revisionKey)
155+
}
156+
157+
revisionNode := node.Node{SpaceID: spaceID, BlobID: blobid, Blobsize: blobsize} // blobsize is needed for the s3ng blobstore
158+
159+
md, err := revisionNode.AsResourceInfo(ctx, rp, []string{}, []string{}, true)
160+
if err != nil {
161+
return nil, nil, err
162+
}
163+
return md, func(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) {
164+
165+
reader, err := fs.tp.ReadBlob(&revisionNode)
166+
if err != nil {
167+
return nil, errors.Wrapf(err, "Decomposedfs: could not download blob of revision '%s' for node '%s'", n.ID, revisionKey)
168+
}
169+
return reader, nil
170+
}, nil
171+
}
172+
113173
// DownloadRevision returns a reader for the specified revision
114174
// FIXME the CS3 api should explicitly allow initiating revision and trash download, a related issue is https://github.com/cs3org/reva/issues/1813
115175
func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Reference, revisionKey string) (io.ReadCloser, error) {

0 commit comments

Comments
 (0)