gc: improve the performance of Juicefs gc command #5683
In the Redis metadata engine (redisMeta), scanPendingFiles is changed to feed pending files to a pool of worker goroutines instead of processing them inline:

```diff
@@ -3341,36 +3341,59 @@ func (m *redisMeta) scanPendingFiles(ctx Context, scan pendingFileScan) error {
 	visited := make(map[Ino]bool)
 	start := int64(0)
 	const batchSize = 1000
+
+	threads := m.conf.MaxCleanups
+	deleteFileChan := make(chan redis.Z, threads)
+	var wg sync.WaitGroup
+
+	for i := 0; i < threads; i++ {
```
Review comment (on the worker goroutine loop): Can we move this part into base.go to reduce the duplicated code? (A sketch of one possible shared helper follows this hunk.)
```diff
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for p := range deleteFileChan {
+				v := p.Member.(string)
+				ps := strings.Split(v, ":")
+				if len(ps) != 2 { // will be cleaned up as legacy
+					continue
+				}
+				inode, _ := strconv.ParseUint(ps[0], 10, 64)
+				if visited[Ino(inode)] {
+					continue
+				}
+				visited[Ino(inode)] = true
+				size, _ := strconv.ParseUint(ps[1], 10, 64)
+				clean, err := scan(Ino(inode), size, int64(p.Score))
+				if err != nil {
+					logger.Errorf("scan pending deleted files: %s", err)
+					continue
+				}
+				if clean {
+					m.doDeleteFileData_(Ino(inode), size, v)
+				}
+			}
+		}()
+	}
+
 	for {
 		pairs, err := m.rdb.ZRangeWithScores(Background(), m.delfiles(), start, start+batchSize).Result()
 		if err != nil {
+			close(deleteFileChan)
+			wg.Wait()
 			return err
 		}
-		start += batchSize
 		for _, p := range pairs {
-			v := p.Member.(string)
-			ps := strings.Split(v, ":")
-			if len(ps) != 2 { // will be cleaned up as legacy
-				continue
-			}
-			inode, _ := strconv.ParseUint(ps[0], 10, 64)
-			if visited[Ino(inode)] {
-				continue
-			}
-			visited[Ino(inode)] = true
-			size, _ := strconv.ParseUint(ps[1], 10, 64)
-			clean, err := scan(Ino(inode), size, int64(p.Score))
-			if err != nil {
-				return err
-			}
-			if clean {
-				m.doDeleteFileData_(Ino(inode), size, v)
-			}
+			deleteFileChan <- p
 		}
+
+		start += batchSize
+
 		if len(pairs) < batchSize {
 			break
 		}
 	}
+
+	close(deleteFileChan)
+	wg.Wait()
 	return nil
 }
```
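Regarding the review suggestion above to move this logic into base.go: below is a minimal sketch of what a shared worker-pool helper could look like. The helper name spawnScanWorkers, its generic signature, and its placement are assumptions made for illustration only; they are not part of this PR. The idea is that base.go would own the goroutine and WaitGroup bookkeeping, while each metadata engine keeps its own per-item parsing and deletion logic in the handler it passes in.

```go
package main

import (
	"fmt"
	"sync"
)

// spawnScanWorkers is a hypothetical shared helper (e.g. in base.go) that
// starts `threads` goroutines consuming items from a channel. It returns the
// channel to feed and a drain function that closes the channel and waits for
// all workers to finish. T stands in for redis.Z or the kv `pair` type.
func spawnScanWorkers[T any](threads int, handle func(T)) (chan<- T, func()) {
	ch := make(chan T, threads)
	var wg sync.WaitGroup
	for i := 0; i < threads; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range ch {
				handle(item)
			}
		}()
	}
	return ch, func() {
		close(ch)
		wg.Wait()
	}
}

func main() {
	// Usage example: the producer feeds items, then drains the pool.
	ch, drain := spawnScanWorkers(4, func(n int) {
		fmt.Println("processing", n) // per-engine scan/delete logic goes here
	})
	for i := 0; i < 10; i++ {
		ch <- i
	}
	drain()
}
```

Each engine would then feed its channel and call the returned drain function in place of the explicit close and wg.Wait pairs that currently appear in both hunks.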
In the transactional KV metadata engine (kvMeta), scanPendingFiles gets the same worker pool, and the single full scan is replaced by a batched key-range scan:

```diff
@@ -2574,28 +2574,65 @@ func (m *kvMeta) scanPendingFiles(ctx Context, scan pendingFileScan) error {
 	}
 	// deleted files: Diiiiiiiissssssss
 	klen := 1 + 8 + 8
-	pairs, err := m.scanValues(m.fmtKey("D"), -1, func(k, v []byte) bool {
-		return len(k) == klen
-	})
-	if err != nil {
-		return err
+	batchSize := 100000
+
+	threads := m.conf.MaxCleanups
+	deleteFileChan := make(chan pair, threads)
+	var wg sync.WaitGroup
+
+	for i := 0; i < threads; i++ {
+		wg.Add(1)
+		go func() {
```
Review comment (on the worker goroutine): If an error occurs in the middle, the gc command should print it; the same applies to the Redis and SQL implementations. (A minimal sketch of one way to surface such an error appears at the end of this page.)
```diff
+			defer wg.Done()
+			for pair := range deleteFileChan {
+				key, value := pair.key, pair.value
+				if len(key) != klen {
+					logger.Errorf("invalid key %x", key)
+					continue
+				}
+				ino := m.decodeInode([]byte(key)[1:9])
+				size := binary.BigEndian.Uint64([]byte(key)[9:])
+				ts := m.parseInt64(value)
+				clean, err := scan(ino, size, ts)
+				if err != nil {
+					logger.Errorf("scan pending deleted files: %s", err)
+					continue
+				}
+				if clean {
+					m.doDeleteFileData(ino, size)
+				}
+			}
+		}()
 	}
-	for key, value := range pairs {
-		if len(key) != klen {
-			return fmt.Errorf("invalid key %x", key)
+
+	startKey := m.fmtKey("D")
+	endKey := nextKey(startKey)
+	for {
+		keys, values, err := m.scan(startKey, endKey, batchSize, func(k, v []byte) bool {
```
Review comment: use client.scan directly.
Reply: The scan implementation in TiKV and FDB already handles this situation by fetching in batches, but etcd doesn't. Let's keep it like this then.
```diff
+			return len(k) == klen
+		})
+		if len(keys) == 0 {
+			break
+		}
-		ino := m.decodeInode([]byte(key)[1:9])
-		size := binary.BigEndian.Uint64([]byte(key)[9:])
-		ts := m.parseInt64(value)
-		clean, err := scan(ino, size, ts)
 		if err != nil {
+			logger.Errorf("scan pending deleted files: %s", err)
+			close(deleteFileChan)
+			wg.Wait()
 			return err
 		}
-		if clean {
-			m.doDeleteFileData(ino, size)
+		startKey = nextKey(keys[len(keys)-1])
+
+		for index, key := range keys {
+			deleteFileChan <- pair{key, values[index]}
 		}
+
+		if len(keys) < batchSize {
+			break
+		}
 	}
+
+	close(deleteFileChan)
+	wg.Wait()
 	return nil
 }
```
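For context on the batched scan loop above: each round fetches at most batchSize keys in [startKey, endKey), then moves startKey to just past the last key returned, so consecutive batches never overlap. The standalone sketch below shows that pagination pattern against a simplified in-memory key list; the successor, prefixEnd, and scanRange helpers are simplified stand-ins and do not reproduce the exact semantics of JuiceFS's nextKey or the kv client's scan.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// successor returns the smallest key strictly greater than k in bytewise
// order (append a zero byte); used to advance the scan past the last key seen.
func successor(k []byte) []byte {
	return append(append([]byte(nil), k...), 0)
}

// prefixEnd returns an exclusive upper bound for keys starting with p by
// incrementing the last byte (simplified; ignores 0xff carry).
func prefixEnd(p []byte) []byte {
	end := append([]byte(nil), p...)
	end[len(end)-1]++
	return end
}

// scanRange returns up to limit keys from the sorted slice in [start, end),
// standing in for the kv client's range scan.
func scanRange(sorted [][]byte, start, end []byte, limit int) [][]byte {
	var out [][]byte
	for _, k := range sorted {
		if bytes.Compare(k, start) >= 0 && bytes.Compare(k, end) < 0 {
			out = append(out, k)
			if len(out) == limit {
				break
			}
		}
	}
	return out
}

func main() {
	keys := [][]byte{[]byte("Da"), []byte("Db"), []byte("Dc"), []byte("Dd"), []byte("De")}
	sort.Slice(keys, func(i, j int) bool { return bytes.Compare(keys[i], keys[j]) < 0 })

	const batchSize = 2
	start, end := []byte("D"), prefixEnd([]byte("D"))
	for {
		batch := scanRange(keys, start, end, batchSize)
		if len(batch) == 0 {
			break
		}
		start = successor(batch[len(batch)-1]) // next round begins just past the last key
		for _, k := range batch {
			fmt.Printf("process %s\n", k)
		}
		if len(batch) < batchSize {
			break // a short batch means the range is exhausted
		}
	}
}
```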
Review comment: I think we can reuse the `threads` for cleanup.
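On the earlier review note that an error hit mid-scan should be surfaced by the gc command rather than only logged inside a worker, one possible shape is to record the first worker error and return it after the pool drains. The sketch below is an illustration with made-up names, not what the PR currently does.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// firstErr records the first error reported by any worker so the caller
// (e.g. the gc command) can print and return it after the pool is drained.
type firstErr struct {
	once sync.Once
	err  error
}

func (f *firstErr) set(err error) { f.once.Do(func() { f.err = err }) }

func main() {
	items := []int{1, 2, 3, 4, 5}
	ch := make(chan int)
	var wg sync.WaitGroup
	var fe firstErr

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for it := range ch {
				if it == 4 { // stand-in for scan() failing mid-way
					fe.set(errors.New("scan pending deleted files: simulated failure"))
					continue // keep draining; alternatively stop early
				}
				fmt.Println("cleaned", it)
			}
		}()
	}

	for _, it := range items {
		ch <- it
	}
	close(ch)
	wg.Wait()

	// The caller surfaces the first worker error instead of silently succeeding.
	if fe.err != nil {
		fmt.Println("gc:", fe.err)
	}
}
```

The same pattern would fit all three metadata engines, since each already waits on its worker pool before returning from scanPendingFiles.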