Skip to content

Commit

Permalink
downloader: added downloaded torrents notifier (#11850)
Browse files Browse the repository at this point in the history
Added notifier which notify that torrent downloading completed.

---------

Co-authored-by: Mark Holt <mark@distributed.vision>
  • Loading branch information
dvovk and mh0lt authored Sep 5, 2024
1 parent 950200a commit 7a7fbec
Show file tree
Hide file tree
Showing 7 changed files with 439 additions and 63 deletions.
63 changes: 63 additions & 0 deletions erigon-lib/direct/downloader_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package direct

import (
"context"
"io"

proto_downloader "github.com/erigontech/erigon-lib/gointerfaces/downloaderproto"
"google.golang.org/grpc"
Expand Down Expand Up @@ -51,3 +52,65 @@ func (c *DownloaderClient) SetLogPrefix(ctx context.Context, in *proto_downloade
func (c *DownloaderClient) Completed(ctx context.Context, in *proto_downloader.CompletedRequest, opts ...grpc.CallOption) (*proto_downloader.CompletedReply, error) {
return c.server.Completed(ctx, in)
}

func (c *DownloaderClient) TorrentCompleted(ctx context.Context, in *proto_downloader.TorrentCompletedRequest, opts ...grpc.CallOption) (proto_downloader.Downloader_TorrentCompletedClient, error) {
ch := make(chan *downloadedReply, 1<<16)
streamServer := &DownloadeSubscribeS{ch: ch, ctx: ctx}

go func() {
streamServer.Err(c.server.TorrentCompleted(in, streamServer))
}()

return &DownloadeSubscribeC{ch: ch, ctx: ctx}, nil
}

type DownloadeSubscribeC struct {
ch chan *downloadedReply
ctx context.Context
grpc.ClientStream
}

func (c *DownloadeSubscribeC) Recv() (*proto_downloader.TorrentCompletedReply, error) {
if c.ctx.Err() != nil {
return nil, io.EOF
}

m, ok := <-c.ch
if !ok || m == nil {
return nil, io.EOF
}
return m.r, m.err
}
func (c *DownloadeSubscribeC) Context() context.Context { return c.ctx }

type DownloadeSubscribeS struct {
ch chan *downloadedReply
ctx context.Context
grpc.ServerStream
}

type downloadedReply struct {
r *proto_downloader.TorrentCompletedReply
err error
}

func (s *DownloadeSubscribeS) Send(m *proto_downloader.TorrentCompletedReply) error {
if s.ctx.Err() != nil {
if s.ch != nil {
ch := s.ch
s.ch = nil
close(ch)
}
return s.ctx.Err()
}

s.ch <- &downloadedReply{r: m}
return nil
}
func (s *DownloadeSubscribeS) Context() context.Context { return s.ctx }
func (s *DownloadeSubscribeS) Err(err error) {
if err == nil {
return
}
s.ch <- &downloadedReply{err: err}
}
57 changes: 49 additions & 8 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/erigontech/erigon-lib/diagnostics"
"github.com/erigontech/erigon-lib/downloader/downloadercfg"
"github.com/erigontech/erigon-lib/downloader/snaptype"
prototypes "github.com/erigontech/erigon-lib/gointerfaces/typesproto"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/kv/mdbx"
"github.com/erigontech/erigon-lib/log/v3"
Expand Down Expand Up @@ -97,8 +98,15 @@ type Downloader struct {

stuckFileDetailedLogs bool

logPrefix string
startTime time.Time
logPrefix string
startTime time.Time
onTorrentComplete func(name string, hash *prototypes.H160)
completedTorrents map[string]completedTorrentInfo
}

type completedTorrentInfo struct {
path string
hash *prototypes.H160
}

type downloadInfo struct {
Expand Down Expand Up @@ -345,6 +353,7 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosi
downloading: map[string]*downloadInfo{},
webseedsDiscover: discover,
logPrefix: "",
completedTorrents: make(map[string]completedTorrentInfo),
}
d.webseeds.SetTorrent(d.torrentFS, snapLock.Downloads, cfg.DownloadTorrentFilesFromWebseed)

Expand Down Expand Up @@ -877,7 +886,6 @@ func (d *Downloader) mainLoop(silent bool) error {
go func() {
defer d.wg.Done()

complete := map[string]struct{}{}
checking := map[string]struct{}{}
failed := map[string]struct{}{}
waiting := map[string]struct{}{}
Expand All @@ -904,7 +912,7 @@ func (d *Downloader) mainLoop(silent bool) error {
var dlist []string

for _, t := range torrents {
if _, ok := complete[t.Name()]; ok {
if _, ok := d.completedTorrents[t.Name()]; ok {
clist = append(clist, t.Name())
continue
}
Expand Down Expand Up @@ -958,7 +966,7 @@ func (d *Downloader) mainLoop(silent bool) error {

} else {
clist = append(clist, t.Name())
complete[t.Name()] = struct{}{}
d.torrentCompleted(t.Name(), t.InfoHash())
continue
}
}
Expand Down Expand Up @@ -1023,8 +1031,9 @@ func (d *Downloader) mainLoop(silent bool) error {
d.lock.Lock()
delete(d.downloading, t.Name())
d.lock.Unlock()
complete[t.Name()] = struct{}{}
clist = append(clist, t.Name())
d.torrentCompleted(t.Name(), t.InfoHash())

continue
}

Expand Down Expand Up @@ -1091,10 +1100,12 @@ func (d *Downloader) mainLoop(silent bool) error {
d.logger.Warn("[snapshots] Failed to update file info", "file", status.name, "err", err)
}

complete[status.name] = struct{}{}
d.torrentCompleted(status.name, status.infoHash)
continue
} else {
delete(complete, status.name)
d.lock.Lock()
delete(d.completedTorrents, status.name)
d.lock.Unlock()
}

default:
Expand Down Expand Up @@ -2762,6 +2773,7 @@ func (d *Downloader) BuildTorrentFilesIfNeed(ctx context.Context, chain string,
_, err := BuildTorrentFilesIfNeed(ctx, d.cfg.Dirs, d.torrentFS, chain, ignore, false)
return err
}

func (d *Downloader) Stats() AggStats {
d.lock.RLock()
defer d.lock.RUnlock()
Expand Down Expand Up @@ -2946,3 +2958,32 @@ func calculateTime(amountLeft, rate uint64) string {
func (d *Downloader) Completed() bool {
return d.stats.Completed
}

// Store completed torrents in order to notify GrpcServer subscribers when they subscribe and there is already downloaded files
func (d *Downloader) torrentCompleted(tName string, tHash metainfo.Hash) {
d.lock.Lock()
defer d.lock.Unlock()
hash := InfoHashes2Proto(tHash)

//check is torrent already completed cause some funcs may call this method multiple times
if _, ok := d.completedTorrents[tName]; !ok {
d.notifyCompleted(tName, hash)
}

d.completedTorrents[tName] = completedTorrentInfo{
path: tName,
hash: hash,
}
}

// Notify GrpcServer subscribers about completed torrent
func (d *Downloader) notifyCompleted(tName string, tHash *prototypes.H160) {
d.onTorrentComplete(tName, tHash)
}

func (d *Downloader) getCompletedTorrents() map[string]completedTorrentInfo {
d.lock.RLock()
defer d.lock.RUnlock()

return d.completedTorrents
}
56 changes: 54 additions & 2 deletions erigon-lib/downloader/downloader_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"fmt"
"os"
"path/filepath"
"slices"
"sync"
"time"

"github.com/anacrolix/torrent/metainfo"
Expand All @@ -38,12 +40,20 @@ var (
)

func NewGrpcServer(d *Downloader) (*GrpcServer, error) {
return &GrpcServer{d: d}, nil
svr := &GrpcServer{
d: d,
}

d.onTorrentComplete = svr.onTorrentComplete

return svr, nil
}

type GrpcServer struct {
proto_downloader.UnimplementedDownloaderServer
d *Downloader
d *Downloader
mu sync.RWMutex
subscribers []proto_downloader.Downloader_TorrentCompletedServer
}

func (s *GrpcServer) ProhibitNewDownloads(ctx context.Context, req *proto_downloader.ProhibitNewDownloadsRequest) (*emptypb.Empty, error) {
Expand Down Expand Up @@ -129,6 +139,10 @@ func Proto2InfoHash(in *prototypes.H160) metainfo.Hash {
return gointerfaces.ConvertH160toAddress(in)
}

func InfoHashes2Proto(in metainfo.Hash) *prototypes.H160 {
return gointerfaces.ConvertAddressToH160(in)
}

func (s *GrpcServer) SetLogPrefix(ctx context.Context, request *proto_downloader.SetLogPrefixRequest) (*emptypb.Empty, error) {
s.d.SetLogPrefix(request.Prefix)

Expand All @@ -138,3 +152,41 @@ func (s *GrpcServer) SetLogPrefix(ctx context.Context, request *proto_downloader
func (s *GrpcServer) Completed(ctx context.Context, request *proto_downloader.CompletedRequest) (*proto_downloader.CompletedReply, error) {
return &proto_downloader.CompletedReply{Completed: s.d.Completed()}, nil
}

func (s *GrpcServer) TorrentCompleted(req *proto_downloader.TorrentCompletedRequest, stream proto_downloader.Downloader_TorrentCompletedServer) error {
// Register the new subscriber
s.mu.Lock()
s.subscribers = append(s.subscribers, stream)
s.mu.Unlock()

//Notifying about all completed torrents to the new subscriber
cmp := s.d.getCompletedTorrents()
for _, cmpInfo := range cmp {
s.onTorrentComplete(cmpInfo.path, cmpInfo.hash)
}

return nil
}

func (s *GrpcServer) onTorrentComplete(name string, hash *prototypes.H160) {
s.mu.RLock()
defer s.mu.RUnlock()

var unsub []int

for i, s := range s.subscribers {
if s.Context().Err() != nil {
unsub = append(unsub, i)
continue
}

s.Send(&proto_downloader.TorrentCompletedReply{
Name: name,
Hash: hash,
})
}

for i := len(unsub) - 1; i >= 0; i-- {
s.subscribers = slices.Delete(s.subscribers, unsub[i], unsub[i])
}
}
5 changes: 5 additions & 0 deletions erigon-lib/downloader/downloadergrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,8 @@ func String2Proto(in string) *prototypes.H160 {
copy(infoHash[:], inHex)
return gointerfaces.ConvertAddressToH160(infoHash)
}

func Proto2String(in *prototypes.H160) string {
addr := gointerfaces.ConvertH160toAddress(in)
return hex.EncodeToString(addr[:])
}
Loading

0 comments on commit 7a7fbec

Please sign in to comment.