-
Notifications
You must be signed in to change notification settings - Fork 0
/
sum.go
351 lines (319 loc) · 8.59 KB
/
sum.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
package xsum
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"runtime"
"sync"
"golang.org/x/sync/semaphore"
"github.com/sclevine/xsum/encoding"
)
// Package-level sentinel errors and shared defaults.
var (
// ErrDirectory is reported for a directory when Sum.NoDirs is set.
ErrDirectory = errors.New("is a directory")
// ErrNoStat is the error walkFile checks for after getSys; it is tolerated
// when no UID, GID, special, mtime, or ctime attribute is requested.
ErrNoStat = errors.New("stat data unavailable")
// ErrNoXattr is not referenced in the code visible here; presumably it is
// returned when extended attributes cannot be read -- TODO confirm.
ErrNoXattr = errors.New("xattr data unavailable")
// DefaultSemaphore has runtime.NumCPU() slots and is used whenever a Sum
// has no Semaphore of its own.
DefaultSemaphore = semaphore.NewWeighted(int64(runtime.NumCPU()))
// DefaultSum is a ready-to-use Sum backed by DefaultSemaphore.
DefaultSum = &Sum{Semaphore: DefaultSemaphore}
)
// Sum may be used to calculate checksums of files and directories.
// Directory checksums use Merkle trees to hash their contents.
// If NoDirs is true, Files that refer to directories will return ErrDirectory.
// If Semaphore is not provided, DefaultSemaphore is used.
type Sum struct {
// Semaphore is acquired and released around checksum work; nil selects DefaultSemaphore.
Semaphore *semaphore.Weighted
// NoDirs makes directories fail with ErrDirectory instead of being hashed.
NoDirs bool
}
// Find checksums every File in files and returns the resulting *Nodes.
// Unlike Each and EachList, Find gives up on the first failure: it returns
// that error together with a nil slice. On success every returned *Node has
// a nil Node.Err.
func (s *Sum) Find(files []File) ([]*Node, error) {
	var found []*Node
	collect := func(n *Node) error {
		if n.Err == nil {
			found = append(found, n)
			return nil
		}
		// Surfacing the node's error makes EachList stop early.
		return n.Err
	}
	if err := s.EachList(files, collect); err != nil {
		return nil, err
	}
	return found, nil
}
// EachList feeds a slice of Files to Each and invokes fn for each resulting *Node.
// Each *Node contains either a checksum or an error.
// EachList returns immediately if fn returns an error.
func (s *Sum) EachList(files []File, fn func(*Node) error) error {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	feed := make(chan File)
	go func() {
		// Closing the channel on every exit path stops Each from processing
		// after it returns (eventually unnecessary?).
		defer close(feed)
		for i := range files {
			select {
			case feed <- files[i]:
			case <-ctx.Done():
				return
			}
		}
	}()
	return s.Each(feed, fn)
}
// Each reads Files from a channel and invokes fn for each resulting *Node.
// Each *Node contains either a checksum or an error.
// Each returns immediately if fn returns an error.
func (s *Sum) Each(files <-chan File, fn func(*Node) error) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	queue := newNodeQueue()
	go func() {
		defer queue.close()
		for {
			select {
			case <-ctx.Done(): // fast exit via ctx cancel is better for CLI
				return
			case file, ok := <-files:
				if !ok {
					return
				}
				if file.Path != "" {
					file.Path = filepath.Clean(file.Path)
				}
				// Enqueue the result slot before the work starts.
				result := make(chan *Node, 1)
				queue.enqueue(result)

				// Do not pick up the next File until walkFile has called
				// its sched callback for this one.
				var scheduled sync.WaitGroup
				scheduled.Add(1)
				go func() {
					result <- s.walkFile(file, false, scheduled.Done)
				}()
				scheduled.Wait()
			}
		}
	}()

	// TODO: err does not shutdown all goroutines, need to thread ctx
	for {
		node := queue.dequeue()
		if node == nil {
			return nil
		}
		if err := fn(node); err != nil {
			return err
		}
	}
}
// acquireCPU takes one slot from s.Semaphore, or from DefaultSemaphore when
// s.Semaphore is nil, blocking until a slot is free.
func (s *Sum) acquireCPU() {
	sem := s.Semaphore
	if sem == nil {
		sem = DefaultSemaphore
	}
	// Acquire only fails when its context is done, and Background never is,
	// so the result is deliberately discarded.
	_ = sem.Acquire(context.Background(), 1)
}
// releaseCPU returns one slot to s.Semaphore, or to DefaultSemaphore when
// s.Semaphore is nil. It is the counterpart of acquireCPU.
func (s *Sum) releaseCPU() {
	sem := s.Semaphore
	if sem == nil {
		sem = DefaultSemaphore
	}
	sem.Release(1)
}
// walkFile computes the Node for a single file, directory, or symlink.
//
// subdir is true when file was produced by walkDir for a directory entry and
// false when it comes directly from Each.
//
// If passed, sched is called exactly once when all remaining work has acquired locks on the CPU:
// after the children of a directory have all called their own sched, on entry
// to the symlink and default branches, or on return at the latest.
//
// Failures are reported through the returned Node's Err field; walkFile never
// returns nil.
func (s *Sum) walkFile(file File, subdir bool, sched func()) *Node {
// Hold a CPU slot for the whole call unless it is handed back early below.
s.acquireCPU()
rOnce := newOnce()
defer rOnce.Do(s.releaseCPU)
// Guarantee sched fires on every return path, but never more than once.
sOnce := newOnce()
defer sOnce.Do(sched)
if err := validateMask(file.Mask); err != nil {
return newFileErrorNode("validate mask for file", file, subdir, err)
}
// The xattr flag is cleared for stdin input, so no xattr lookup happens below.
if file.Stdin {
file.Mask.Attr &= ^AttrX
}
inclusive := file.Mask.Attr&AttrInclusive != 0
// AttrNoData only takes effect for inclusive masks or for entries found inside a directory.
noData := file.Mask.Attr&AttrNoData != 0 && (inclusive || subdir)
fi, err := file.stat(subdir)
// A missing file is reported without an action, so the message is just "path: error".
if os.IsNotExist(err) {
return newFileErrorNode("", file, subdir, err)
}
if err != nil {
return newFileErrorNode("stat", file, subdir, err)
}
sys, err := getSys(fi)
// ErrNoStat is tolerated as long as no attribute that needs sys was requested.
if err == ErrNoStat &&
file.Mask.Attr&(AttrUID|AttrGID|AttrSpecial|AttrMtime|AttrCtime) == 0 {
// sys not needed
} else if err != nil {
return newFileErrorNode("stat", file, subdir, err)
}
var xattr *Xattr
if file.Mask.Attr&AttrX != 0 {
xattr, err = file.xattr(subdir)
if err != nil {
return newFileErrorNode("get xattr", file, subdir, err)
}
}
var sum []byte
switch {
case fi.IsDir():
if s.NoDirs {
return newFileErrorNode("", file, subdir, ErrDirectory)
}
// names and err are scoped to this case; the outer err is shadowed until the switch ends.
names, err := readDirUnordered(file.Path)
if err != nil {
return newFileErrorNode("read dir", file, subdir, err)
}
// Give the CPU slot back before walking children: each child acquires its
// own slot, and walkDir blocks until every child has called its sched.
rOnce.Do(s.releaseCPU)
nodes := s.walkDir(file, names)
sOnce.Do(sched)
// Locking on the following operation would prevent short, in-memory checksum operations from bypassing the NumCPU limit.
// However, it would also prevent some earlier entries from finishing before later entries and lead to excessive contention.
// Instead, we rely on preemption to schedule these operations.
// NOTE(review): nodes arrive in completion order and names come from
// readDirUnordered, so a stable directory hash presumably relies on
// encoding.TreeASN1DER ordering its input -- confirm.
hashes := make([]encoding.NamedHash, 0, len(names))
// Returning early from this loop does not block the producers: walkDir
// buffers one channel slot per child.
for n := range nodes {
if n.Err != nil {
if subdir { // preserve bottom-level and top-level FileError only
// error from walkFile has adequate context
return &Node{File: file, Err: n.Err}
}
return newFileErrorNode("", file, subdir, n.Err)
}
// With AttrNoName set, every entry is hashed under an empty name.
var name string
if file.Mask.Attr&AttrNoName == 0 {
// safe because subdir nodes have generated bases
name = filepath.Base(n.Path)
}
b, err := hashFileAttr(n)
if err != nil {
return newFileErrorNode("hash metadata for file", file, subdir, err)
}
hashes = append(hashes, encoding.NamedHash{
Hash: b,
Name: []byte(name),
})
}
// The directory checksum is the hash of the encoded list of child hashes.
der, err := encoding.TreeASN1DER(hashToEncoding(file.Hash.String()), hashes)
if err != nil {
return newFileErrorNode("encode", file, subdir, err)
}
sum, err = file.Hash.Metadata(der)
if err != nil {
return newFileErrorNode("hash", file, subdir, err)
}
case fi.Mode()&os.ModeSymlink != 0:
// No children to wait for, so sched can fire immediately.
sOnce.Do(sched)
file.Mask.Attr &= ^AttrNoName // not directory
if noData {
file.Mask.Attr |= AttrNoData
} else {
// A symlink's data is its target path, not the content it points to.
link, err := os.Readlink(file.Path)
if err != nil {
return newFileErrorNode("read link", file, subdir, err)
}
file.Mask.Attr &= ^AttrNoData
sum, err = file.Hash.Metadata([]byte(link))
if err != nil {
return newFileErrorNode("hash link", file, subdir, err)
}
}
default:
// No children to wait for, so sched can fire immediately.
sOnce.Do(sched)
file.Mask.Attr &= ^AttrNoName // not directory
// Non-regular files are not read when the mask is inclusive or the entry
// came from a directory walk; they are recorded as having no data.
if noData || (!fi.Mode().IsRegular() && (inclusive || subdir)) {
file.Mask.Attr |= AttrNoData
} else {
file.Mask.Attr &= ^AttrNoData
sum, err = file.sum()
if err != nil {
return newFileErrorNode("hash", file, subdir, err)
}
}
}
// file carries the mask as adjusted above, so the Node records the flags that were actually applied.
n := &Node{
File: file,
Sum: sum,
Mode: fi.Mode(),
Sys: sys,
Xattr: xattr,
}
// For an inclusive top-level entry, Sum is replaced by hashFileAttr of the whole node.
if inclusive && !subdir {
n.Sum, err = hashFileAttr(n)
if err != nil {
return newFileErrorNode("hash metadata for file", file, subdir, err)
}
}
return n
}
// walkDir starts one walkFile goroutine per entry in names, each for the
// path file.Path/<name> with file's Hash and Mask and with subdir set.
//
// It blocks until every child has invoked its sched callback, then returns a
// channel that yields one *Node per entry in completion order. The channel
// is buffered for all entries and is closed once the last child finishes.
func (s *Sum) walkDir(file File, names []string) <-chan *Node {
	total := len(names)
	out := make(chan *Node, total)

	var scheduled, finished sync.WaitGroup
	scheduled.Add(total)
	finished.Add(total)

	for _, entry := range names {
		child := File{
			Hash: file.Hash,
			Path: filepath.Join(file.Path, entry),
			Mask: file.Mask,
		}
		go func(f File) {
			defer finished.Done()
			out <- s.walkFile(f, true, scheduled.Done)
		}(child)
	}

	// Close the channel only after every child has delivered its node.
	go func() {
		finished.Wait()
		close(out)
	}()

	scheduled.Wait()
	return out
}
// newOnce returns an armed doOnce whose first Do call will run.
func newOnce() doOnce {
	return doOnce(true)
}

// doOnce is a single-goroutine run-once flag: true means "not yet run".
// sync.Once is concurrent, not needed here
type doOnce bool

// Do runs f on the first call made on an armed doOnce and does nothing on
// later calls. A nil f still consumes the single run. The flag is cleared
// before f is invoked, so a nested Do from inside f is a no-op.
func (o *doOnce) Do(f func()) {
	if !*o {
		return
	}
	*o = false
	if f == nil {
		return
	}
	f()
}
// newFileErrorNode builds a failed *Node for file: Err holds a *FileError
// made from action, file.Path, subdir, and err, and no checksum is set.
func newFileErrorNode(action string, file File, subdir bool, err error) *Node {
	wrapped := newFileError(action, file.Path, subdir, err)
	return &Node{File: file, Err: wrapped}
}
// newFileError wraps err in a *FileError describing the failed action on path.
func newFileError(action, path string, subdir bool, err error) error {
	fe := new(FileError)
	fe.Action = action
	fe.Path = path
	fe.Subdir = subdir
	fe.Err = err
	return fe
}

// FileError is similar to os.PathError, but contains extra information such as Subdir.
type FileError struct {
	Action string // failed action
	Path   string
	Subdir bool // error applies to a file/dir in a subdir of the specified path
	Err    error
}

// Error formats the failure. The layout depends on whether an Action is set
// and whether the error belongs to a subdirectory entry.
func (e *FileError) Error() string {
	cause := e.Err

	var (
		pathErr *os.PathError
		fileErr *FileError
	)
	// Remove an intermediate *PathError -- its path is already covered by this
	// FileError -- unless a nested FileError is present in the chain.
	if errors.As(cause, &pathErr) && !errors.As(cause, &fileErr) {
		cause = pathErr.Err
	}

	switch {
	case e.Action == "":
		return fmt.Sprintf("%s: %s", e.Path, cause)
	case e.Subdir:
		return fmt.Sprintf("failed to %s `%s': %s", e.Action, e.Path, cause)
	default:
		return fmt.Sprintf("%[2]s: failed to %[1]s: %[3]s", e.Action, e.Path, cause)
	}
}

// Unwrap returns the underlying error.
func (e *FileError) Unwrap() error {
	return e.Err
}