-
Notifications
You must be signed in to change notification settings - Fork 255
/
Copy pathpipe.go
337 lines (295 loc) · 8.94 KB
/
pipe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
package command
import (
"context"
"errors"
"fmt"
"mime"
"os"
"path/filepath"
"github.com/urfave/cli/v2"
errorpkg "github.com/peak/s5cmd/v2/error"
"github.com/peak/s5cmd/v2/log"
"github.com/peak/s5cmd/v2/log/stat"
"github.com/peak/s5cmd/v2/storage"
"github.com/peak/s5cmd/v2/storage/url"
)
// pipeHelpTemplate is the CustomHelpTemplate of the pipe command (see
// NewPipeCommand). The {{...}} placeholders are Go template actions that
// urfave/cli fills in when it renders the command's help. The literal is
// emitted as written, so whitespace inside the backticks is significant.
var pipeHelpTemplate = `Name:
{{.HelpName}} - {{.Usage}}
Usage:
{{.HelpName}} [options] destination
Options:
{{range .VisibleFlags}}{{.}}
{{end}}
Examples:
01. Stream stdin to an object
> echo "content" | gzip | s5cmd {{.HelpName}} s3://bucket/prefix/object.gz
02. Pass arbitrary metadata to an object
> cat "flowers.png" | gzip | s5cmd {{.HelpName}} --metadata "imageSize=6032x4032" s3://bucket/prefix/flowers.gz
03. Download an object and stream it to a bucket
> curl https://github.com/peak/s5cmd/ | s5cmd {{.HelpName}} s3://bucket/s5cmd.html
04. Compress an object and stream it to a bucket
> gzip -c file | s5cmd {{.HelpName}} s3://bucket/file.gz
`
// NewPipeCommandFlags returns the flags accepted by the pipe command, in the
// order they appear in its help output.
//
// They cover the object's attributes (storage class, metadata, ACL and HTTP
// headers), upload tuning (concurrency, part size in MiB), server-side
// encryption settings, and behavioural switches (--raw, --no-clobber).
func NewPipeCommandFlags() []cli.Flag {
	return []cli.Flag{
		// Object attributes and transfer tuning.
		&cli.StringFlag{
			Name:  "storage-class",
			Usage: "set storage class for target ('STANDARD','REDUCED_REDUNDANCY','GLACIER','STANDARD_IA','ONEZONE_IA','INTELLIGENT_TIERING','DEEP_ARCHIVE')",
		},
		&cli.IntFlag{
			Name:    "concurrency",
			Aliases: []string{"c"},
			Value:   defaultCopyConcurrency,
			Usage:   "number of concurrent parts transferred between host and remote server",
		},
		&cli.IntFlag{
			Name:    "part-size",
			Aliases: []string{"p"},
			Value:   defaultPartSize,
			Usage:   "size of each part transferred between host and remote server, in MiB",
		},
		&MapFlag{
			Name:  "metadata",
			Usage: "set arbitrary metadata for the object",
		},

		// Server-side encryption.
		&cli.StringFlag{
			Name:  "sse",
			Usage: "perform server side encryption of the data at its destination, e.g. aws:kms",
		},
		&cli.StringFlag{
			Name:  "sse-kms-key-id",
			Usage: "customer master key (CMK) id for SSE-KMS encryption; leave it out if server-side generated key is desired",
		},

		// Access control and HTTP headers.
		&cli.StringFlag{
			Name:  "acl",
			Usage: "set acl for target: defines granted accesses and their types on different accounts/groups, e.g. pipe --acl 'public-read'",
		},
		&cli.StringFlag{
			Name:  "cache-control",
			Usage: "set cache control for target: defines cache control header for object, e.g. pipe --cache-control 'public, max-age=345600'",
		},
		&cli.StringFlag{
			Name:  "expires",
			Usage: "set expires for target (uses RFC3339 format): defines expires header for object, e.g. pipe --expires '2024-10-01T20:30:00Z'",
		},
		&cli.BoolFlag{
			Name:  "raw",
			Usage: "disable the wildcard operations, useful with filenames that contains glob characters",
		},
		&cli.StringFlag{
			Name:  "content-type",
			Usage: "set content type for target: defines content type header for object, e.g. --content-type text/plain",
		},
		&cli.StringFlag{
			Name:  "content-encoding",
			Usage: "set content encoding for target: defines content encoding header for object, e.g. --content-encoding gzip",
		},
		&cli.StringFlag{
			Name:  "content-disposition",
			Usage: "set content disposition for target: defines content disposition header for object, e.g. --content-disposition 'attachment; filename=\"filename.jpg\"'",
		},

		// Overwrite protection.
		&cli.BoolFlag{
			Name:    "no-clobber",
			Aliases: []string{"n"},
			Usage:   "do not overwrite destination if already exists",
		},
	}
}
// NewPipeCommand returns the "pipe" command, which uploads everything read
// from standard input to a single remote object.
//
// The Before hook validates the arguments with validatePipeCommand and prints
// any validation error before returning it. The Action builds a Pipe from the
// context and runs it. The command's final error is handed to stat.Collect,
// which receives a pointer to the named result so it observes the value set
// by the return statement.
func NewPipeCommand() *cli.Command {
	validate := func(c *cli.Context) error {
		if err := validatePipeCommand(c); err != nil {
			printError(commandFromContext(c), c.Command.Name, err)
			return err
		}
		return nil
	}

	run := func(c *cli.Context) (err error) {
		defer stat.Collect(c.Command.FullName(), &err)()

		p, err := NewPipe(c, false)
		if err != nil {
			return err
		}
		return p.Run(c.Context)
	}

	cmd := &cli.Command{
		Name:               "pipe",
		HelpName:           "pipe",
		Usage:              "stream to remote from stdin",
		Flags:              NewPipeCommandFlags(),
		CustomHelpTemplate: pipeHelpTemplate,
		Before:             validate,
		Action:             run,
	}
	cmd.BashComplete = getBashCompleteFn(cmd, false, false)
	return cmd
}
// Pipe holds pipe operation flags and states.
//
// A Pipe is built by NewPipe from the parsed command line and executed by
// Run, which streams standard input to dst.
type Pipe struct {
	dst         *url.URL // destination object, parsed from the single positional argument
	op          string   // command name (c.Command.Name); used as the operation in log/debug output
	fullCommand string   // full command string returned by commandFromContext
	// deleteSource is recorded from NewPipe's argument. Nothing in this file
	// reads it, and NewPipeCommand always passes false.
	deleteSource bool

	// flags
	noClobber          bool                 // --no-clobber: refuse to overwrite an existing destination
	storageClass       storage.StorageClass // --storage-class; also reported in the success log message
	encryptionMethod   string               // --sse, e.g. "aws:kms"
	encryptionKeyID    string               // --sse-kms-key-id
	acl                string               // --acl
	cacheControl       string               // --cache-control
	expires            string               // --expires (RFC3339 text, passed through as-is)
	contentType        string               // --content-type; when empty, Run guesses from dst's extension
	contentEncoding    string               // --content-encoding
	contentDisposition string               // --content-disposition
	metadata           map[string]string    // --metadata; sent as the object's user-defined metadata

	// s3 options
	concurrency int             // --concurrency: number of parts transferred concurrently
	partSize    int64           // --part-size, converted from MiB to bytes by NewPipe
	storageOpts storage.Options // remote client options built by NewStorageOpts
}
// NewPipe builds a Pipe from the parsed command line in c.
//
// The first positional argument is parsed as the destination URL, with
// wildcard handling disabled when --raw is set. Each flag declared by
// NewPipeCommandFlags is copied into the matching field, and --part-size is
// converted from MiB to bytes. deleteSource is stored as given.
//
// Every error is reported through printError and then returned.
func NewPipe(c *cli.Context, deleteSource bool) (*Pipe, error) {
	fullCommand := commandFromContext(c)

	// reject prints err for this command and returns it as the failure result.
	reject := func(err error) (*Pipe, error) {
		printError(fullCommand, c.Command.Name, err)
		return nil, err
	}

	dst, err := url.New(c.Args().Get(0), url.WithRaw(c.Bool("raw")))
	if err != nil {
		return reject(err)
	}

	metadata, ok := c.Value("metadata").(MapValue)
	if !ok {
		return reject(errors.New("metadata flag is not a map"))
	}

	p := &Pipe{
		dst:          dst,
		op:           c.Command.Name,
		fullCommand:  fullCommand,
		deleteSource: deleteSource,

		// user-supplied flags
		noClobber:          c.Bool("no-clobber"),
		storageClass:       storage.StorageClass(c.String("storage-class")),
		concurrency:        c.Int("concurrency"),
		partSize:           c.Int64("part-size") * megabytes,
		encryptionMethod:   c.String("sse"),
		encryptionKeyID:    c.String("sse-kms-key-id"),
		acl:                c.String("acl"),
		cacheControl:       c.String("cache-control"),
		expires:            c.String("expires"),
		contentType:        c.String("content-type"),
		contentEncoding:    c.String("content-encoding"),
		contentDisposition: c.String("content-disposition"),
		metadata:           metadata,

		// remote client options
		storageOpts: NewStorageOpts(c),
	}
	return p, nil
}
// Run streams standard input to the destination object.
//
// The destination must name an object, not a bucket or prefix. With
// --no-clobber, shouldOverride may reject an existing destination. If that
// error satisfies errorpkg.IsWarning, Run reports it via printDebug and returns
// nil without uploading; any other error is returned.
//
// The upload uses the configured concurrency and part size. When
// --content-type was not given, the Content-Type is guessed from the
// destination's extension. A successful upload is reported with log.Info.
func (c Pipe) Run(ctx context.Context) error {
	if c.dst.IsBucket() || c.dst.IsPrefix() {
		return fmt.Errorf("target %q must be an object", c.dst)
	}

	if err := c.shouldOverride(ctx, c.dst); err != nil {
		if !errorpkg.IsWarning(err) {
			return err
		}
		printDebug(c.op, err, nil, c.dst)
		return nil
	}

	client, err := storage.NewRemoteClient(ctx, c.dst, c.storageOpts)
	if err != nil {
		return err
	}

	// An explicit --content-type wins; otherwise infer one from the key.
	contentType := c.contentType
	if contentType == "" {
		contentType = guessContentTypeByExtension(c.dst)
	}

	metadata := storage.Metadata{
		UserDefined:        c.metadata,
		ACL:                c.acl,
		CacheControl:       c.cacheControl,
		Expires:            c.expires,
		StorageClass:       string(c.storageClass),
		ContentType:        contentType,
		ContentEncoding:    c.contentEncoding,
		ContentDisposition: c.contentDisposition,
		EncryptionMethod:   c.encryptionMethod,
		EncryptionKeyID:    c.encryptionKeyID,
	}

	// Wrap os.Stdin so the uploader only sees an io.Reader (see stdin).
	if err = client.Put(ctx, &stdin{file: os.Stdin}, c.dst, metadata, c.concurrency, c.partSize); err != nil {
		return err
	}

	log.Info(log.InfoMessage{
		Operation:   c.op,
		Source:      nil,
		Destination: c.dst,
		Object:      &storage.Object{StorageClass: c.storageClass},
	})
	return nil
}
// shouldOverride decides whether the upload may write to dsturl.
//
// It returns nil when --no-clobber is not set (overwriting is allowed, so the
// remote object is never inspected) or when dsturl does not exist yet. When
// --no-clobber is set and the destination already exists, it returns
// errorpkg.ErrObjectExists. Errors from creating the client or from
// statObject are returned unchanged.
func (c Pipe) shouldOverride(ctx context.Context, dsturl *url.URL) error {
	// Without --no-clobber an existing object is simply overwritten, so skip
	// the remote round trip entirely.
	if !c.noClobber {
		return nil
	}

	client, err := storage.NewClient(ctx, dsturl, c.storageOpts)
	if err != nil {
		return err
	}

	obj, err := statObject(ctx, dsturl, client)
	if err != nil {
		return err
	}

	// noClobber is known to be set here, so existence alone decides.
	if obj != nil {
		return errorpkg.ErrObjectExists
	}
	return nil
}
// validatePipeCommand checks the pipe command's arguments before its action
// runs.
//
// Exactly one argument is required, and it must parse as a URL (honouring
// --raw) that names a single remote object. Local paths, buckets, prefixes
// and wildcard patterns are rejected, because standard input is uploaded to
// exactly one key.
func validatePipeCommand(c *cli.Context) error {
	if c.Args().Len() != 1 {
		return fmt.Errorf("expected destination argument")
	}

	dst := c.Args().Get(0)
	dsturl, err := url.New(dst, url.WithRaw(c.Bool("raw")))
	if err != nil {
		return err
	}

	// The old message here ("must be a bucket") contradicted the very next
	// check, which rejects bucket destinations. What is required is a remote
	// object.
	if !dsturl.IsRemote() {
		return fmt.Errorf("destination must be a remote object")
	}

	if dsturl.IsBucket() || dsturl.IsPrefix() {
		return fmt.Errorf("target %q must be an object", dsturl)
	}

	// wildcard destination can not be used with pipe
	if dsturl.IsWildcard() {
		return fmt.Errorf("target %q can not contain glob characters", dst)
	}

	return nil
}
// guessContentTypeByExtension maps the extension of dsturl's absolute form to
// a MIME type using mime.TypeByExtension.
//
// A destination with no extension, or with one the mime package does not
// recognise, falls back to "application/octet-stream".
//
// NOTE(review): filepath.Ext honours OS-specific separators. On Windows a
// backslash inside an object key would cut the extension short; path.Ext would
// match URL semantics exactly.
func guessContentTypeByExtension(dsturl *url.URL) string {
	ext := filepath.Ext(dsturl.Absolute())
	if guessed := mime.TypeByExtension(ext); guessed != "" {
		return guessed
	}
	return "application/octet-stream"
}
// stdin narrows an *os.File down to its Read method, so the wrapped file is
// seen purely as an io.Reader.
//
// The AWS SDK's multipart uploader, which accepts an io.Reader, tries to seek
// when its input also implements io.Seeker. os.Stdin is a file that cannot be
// seeked, so that capability must stay hidden.
type stdin struct {
	file *os.File // underlying file; only its Read method is exposed
}

// Read forwards directly to the wrapped file's Read.
func (in *stdin) Read(buf []byte) (int, error) {
	return in.file.Read(buf)
}