Skip to content

Commit

Permalink
fix sync logic
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxkad committed Jun 8, 2024
1 parent 57cf313 commit 69d6ab0
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 42 deletions.
2 changes: 1 addition & 1 deletion handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (cr *Cluster) getRecordMiddleWare() utils.MiddleWareFunc {
case <-updateTicker.C:
cr.stats.Lock()

log.Infof("Served %d requests, total responsed body = %s, total used CPU time = %.2fs",
log.Infof("Served %d requests, total responsed body = %s, total IO waiting time = %.2fs",
total, utils.BytesToUnit(totalBytes), totalUsed)
for ua, v := range uas {
if ua == "" {
Expand Down
27 changes: 14 additions & 13 deletions lang/en/us.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,20 @@ var areaUS = map[string]string{
"error.check.open.failed": "Cannot open %q: %v",
"error.check.hash.failed": "Cannot calculate hash for %s: %v",

"info.sync.prepare": "Preparing to sync files, length of filelist is %d ...",
"hint.sync.start": "Starting sync files, count: %d, bytes: %s",
"hint.sync.done": "All files were synchronized, use time: %v, %s/s",
"error.sync.failed": "File sync failed: %v",
"info.sync.none": "All files were synchronized",
"warn.sync.interrupted": "File sync interrupted",
"info.sync.config": "Sync config: %#v",
"hint.sync.total": "Total: ",
"hint.sync.downloading": "> Downloading ",
"hint.sync.downloading.handler": "Downloading %s from handler",
"info.sync.downloaded": "Downloaded %s [%s] %.2f%%",
"error.sync.download.failed": "Download error %s:\n\t%s",
"error.sync.create.failed": "Cannot create %s/%s: %v",
"info.sync.prepare": "Preparing to sync files, length of filelist is %d ...",
"hint.sync.start": "Starting sync files, count: %d, bytes: %s",
"hint.sync.done": "All files were synchronized, use time: %v, %s/s",
"error.sync.failed": "File sync failed: %v",
"info.sync.none": "All files were synchronized",
"warn.sync.interrupted": "File sync interrupted",
"info.sync.config": "Sync config: %#v",
"hint.sync.total": "Total: ",
"hint.sync.downloading": "> Downloading ",
"hint.sync.downloading.handler": "Downloading %s from handler",
"info.sync.downloaded": "Downloaded %s [%s] %.2f%%",
"error.sync.download.failed": "Download error %s:\n\t%s",
"error.sync.download.failed.retry": "Download error %s, retry after %v:\n\t%s",
"error.sync.create.failed": "Cannot create %s/%s: %v",

"info.gc.start": "Starting garbage collector for %s",
"info.gc.done": "Garbage collect finished for %s",
Expand Down
27 changes: 14 additions & 13 deletions lang/zh/cn.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,20 @@ var areaCN = map[string]string{
"error.check.open.failed": "无法打开 %q: %v",
"error.check.hash.failed": "无法为 %s 计算哈希值: %v",

"info.sync.prepare": "准备同步中, 文件列表长度为 %d ...",
"hint.sync.start": "开始同步, 总计: %d, 字节: %s",
"hint.sync.done": "文件同步完成, 用时: %v, %s/s",
"error.sync.failed": "文件同步失败: %v",
"info.sync.none": "所有文件已同步",
"warn.sync.interrupted": "同步已中断",
"info.sync.config": "同步配置: %#v",
"hint.sync.total": "总计: ",
"hint.sync.downloading": "> 下载中 ",
"hint.sync.downloading.handler": "Downloading %s from handler",
"info.sync.downloaded": "已下载 %s [%s] %.2f%%",
"error.sync.download.failed": "下载失败 %s:\n\t%s",
"error.sync.create.failed": "无法创建 %s/%s: %v",
"info.sync.prepare": "准备同步中, 文件列表长度为 %d ...",
"hint.sync.start": "开始同步, 总计: %d, 字节: %s",
"hint.sync.done": "文件同步完成, 用时: %v, %s/s",
"error.sync.failed": "文件同步失败: %v",
"info.sync.none": "所有文件已同步",
"warn.sync.interrupted": "同步已中断",
"info.sync.config": "同步配置: %#v",
"hint.sync.total": "总计: ",
"hint.sync.downloading": "> 下载中 ",
"hint.sync.downloading.handler": "Downloading %s from handler",
"info.sync.downloaded": "已下载 %s [%s] %.2f%%",
"error.sync.download.failed": "下载失败 %s:\n\t%s",
"error.sync.download.failed.retry": "下载失败 %s, %v 后重新尝试:\n\t%s",
"error.sync.create.failed": "无法创建 %s/%s: %v",

"info.gc.start": "正在清理 %s",
"info.gc.done": "已清理 %s",
Expand Down
37 changes: 22 additions & 15 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (cr *Cluster) checkFileFor(
mpb.AppendDecorators(
decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR),
decor.NewPercentage("%d", decor.WCSyncSpaceR),
decor.EwmaETA(decor.ET_STYLE_GO, 30),
decor.EwmaETA(decor.ET_STYLE_GO, 60),
),
mpb.BarExtender((mpb.BarFillerFunc)(func(w io.Writer, _ decor.Statistics) (err error) {
if checkingHashMux.TryLock() {
Expand Down Expand Up @@ -489,7 +489,7 @@ func (cr *Cluster) syncFiles(ctx context.Context, files []FileInfo, heavyCheck b
var stats syncStats
stats.pg = pg
stats.noOpen = syncCfg.Source == "center"
stats.slots = limited.NewBufSlots(syncCfg.Concurrency)
stats.slots = limited.NewBufSlots(syncCfg.Concurrency + 1)
stats.totalFiles = totalFiles
for _, f := range missing {
stats.totalSize += f.Size
Expand Down Expand Up @@ -669,15 +669,15 @@ func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo)
defer close(pathRes)

var barUnit decor.SizeB1024
var trycount atomic.Int32
trycount.Store(1)
var tried atomic.Int32
tried.Store(1)
bar := stats.pg.AddBar(f.Size,
mpb.BarRemoveOnComplete(),
mpb.BarPriority(slotId),
mpb.PrependDecorators(
decor.Name(Tr("hint.sync.downloading")),
decor.Any(func(decor.Statistics) string {
tc := trycount.Load()
tc := tried.Load()
if tc <= 1 {
return ""
}
Expand All @@ -688,22 +688,23 @@ func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo)
mpb.AppendDecorators(
decor.NewPercentage("%d", decor.WCSyncSpace),
decor.Counters(barUnit, "[%.1f / %.1f]", decor.WCSyncSpace),
decor.EwmaSpeed(barUnit, "%.1f", 10, decor.WCSyncSpace),
decor.EwmaSpeed(barUnit, "%.1f", 30, decor.WCSyncSpace),
decor.OnComplete(
decor.EwmaETA(decor.ET_STYLE_GO, 10, decor.WCSyncSpace), "done",
decor.EwmaETA(decor.ET_STYLE_GO, 30, decor.WCSyncSpace), "done",
),
),
)
defer bar.Abort(true)

noOpen := stats.noOpen
badOpen := false
interval := time.Second
for {
bar.SetCurrent(0)
hashMethod, err := getHashMethod(len(f.Hash))
if err == nil {
var path string
if path, err = cr.fetchFileWithBuf(ctx, f, hashMethod, buf, noOpen, func(r io.Reader) io.Reader {
if path, err = cr.fetchFileWithBuf(ctx, f, hashMethod, buf, noOpen, badOpen, func(r io.Reader) io.Reader {
return ProxyReader(r, bar, stats.totalBar, &stats.lastInc)
}); err == nil {
pathRes <- path
Expand All @@ -716,14 +717,15 @@ func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo)
}
bar.SetRefill(bar.Current())

log.Errorf(Tr("error.sync.download.failed"), f.Path, err)
c := trycount.Add(1)
c := tried.Add(1)
if c > maxRetryCount {
log.Errorf(Tr("error.sync.download.failed"), f.Path, err)
break
}
if c > maxTryWithOpen {
noOpen = true
badOpen = true
}
log.Errorf(Tr("error.sync.download.failed.retry"), f.Path, interval, err)
select {
case <-time.After(interval):
interval *= 2
Expand All @@ -739,20 +741,25 @@ func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo)
func (cr *Cluster) fetchFileWithBuf(
ctx context.Context, f FileInfo,
hashMethod crypto.Hash, buf []byte,
noOpen bool,
noOpen bool, badOpen bool,
wrapper func(io.Reader) io.Reader,
) (path string, err error) {
var (
reqPath = f.Path
query url.Values
req *http.Request
res *http.Response
fd *os.File
r io.Reader
)
if noOpen {
if badOpen {
reqPath = "/openbmclapi/download/" + f.Hash
} else if noOpen {
query = url.Values{
"noopen": {"1"},
}
}
if req, err = cr.makeReqWithAuth(ctx, http.MethodGet, reqPath, nil); err != nil {
if req, err = cr.makeReqWithAuth(ctx, http.MethodGet, reqPath, query); err != nil {
return
}
req.Header.Set("Accept-Encoding", "gzip, deflate")
Expand Down Expand Up @@ -904,7 +911,7 @@ func (cr *Cluster) DownloadFile(ctx context.Context, hash string) (err error) {
}
defer free()

path, err := cr.fetchFileWithBuf(ctx, f, hashMethod, buf, true, nil)
path, err := cr.fetchFileWithBuf(ctx, f, hashMethod, buf, true, true, nil)
if err != nil {
return
}
Expand Down

0 comments on commit 69d6ab0

Please sign in to comment.