diff --git a/carstore/carreader.go b/carstore/carreader.go index 83ebc2216..5452c142e 100644 --- a/carstore/carreader.go +++ b/carstore/carreader.go @@ -157,16 +157,16 @@ func (cr CarReader) Read(ctx context.Context, adCid cid.Cid, skipEntries bool) ( return &adBlock, nil } -// ReadHead reads the advertisement CID from the publisher's head file. The -// head file contains the CID of the latest advertisement for an advertisement -// publisher. Returns fs.ErrNotExist if head file is not found. -func (cr CarReader) ReadHead(ctx context.Context, publisher peer.ID) (cid.Cid, error) { - err := publisher.Validate() +// ReadHead reads the advertisement CID from the provider's head file. The head +// file contains the CID of the latest advertisement for a provider. Returns +// fs.ErrNotExist if head file is not found. +func (cr CarReader) ReadHead(ctx context.Context, provider peer.ID) (cid.Cid, error) { + err := provider.Validate() if err != nil { return cid.Undef, err } - headPath := publisher.String() + HeadFileSuffix + headPath := provider.String() + HeadFileSuffix _, r, err := cr.fileStore.Get(ctx, headPath) if err != nil { return cid.Undef, err diff --git a/carstore/carwriter.go b/carstore/carwriter.go index 9314a0585..66096f0b1 100644 --- a/carstore/carwriter.go +++ b/carstore/carwriter.go @@ -276,16 +276,20 @@ func (cw *CarWriter) WriteChain(ctx context.Context, adCid cid.Cid, overWrite bo return count, nil } -func (cw *CarWriter) WriteHead(ctx context.Context, adCid cid.Cid, publisher peer.ID) (*filestore.File, error) { - err := publisher.Validate() +func (cw *CarWriter) WriteHead(ctx context.Context, adCid cid.Cid, provider peer.ID) (*filestore.File, error) { + err := provider.Validate() if err != nil { return nil, err } - - headName := publisher.String() + HeadFileSuffix + headName := provider.String() + HeadFileSuffix return cw.fileStore.Put(ctx, headName, strings.NewReader(adCid.String())) } +func (cw *CarWriter) DeleteHead(ctx context.Context, provider peer.ID) error { + headName := provider.String() + HeadFileSuffix + return cw.fileStore.Delete(ctx, headName) +} + func (cw *CarWriter) Delete(ctx context.Context, adCid cid.Cid) error { err := cw.fileStore.Delete(ctx, cw.CarPath(adCid)) if err != nil { diff --git a/e2e_test.go b/e2e_test.go index 71790c8ff..e8910f3db 100644 --- a/e2e_test.go +++ b/e2e_test.go @@ -355,12 +355,16 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { outStatus = e.Run(indexer, "admin", "status", "--indexer", "http://localhost:3202") require.Contains(t, string(outStatus), "Frozen: true", "expected indexer to be frozen") - outgc := string(e.Run(ipnigc, "provider", "-pid", providerID, "-ll", "debug", "--commit", + logLevel := "info" + if testing.Verbose() { + logLevel = "debug" + } + outgc := string(e.Run(ipnigc, "provider", "-pid", providerID, "-ll", logLevel, "--commit", "-i", "http://localhost:3200", "-i", "http://localhost:3000", )) t.Logf("GC Results:\n%s\n", outgc) - require.Contains(t, outgc, `{"count": 1043, "total": 1043, "source": "CAR", "adsProcessed": 2}`) + require.Contains(t, outgc, `"count": 1043, "total": 1043, "source": "CAR"`) e.Stop(cmdIndexer2, time.Second) diff --git a/internal/ingest/ingest.go b/internal/ingest/ingest.go index 5df332d64..4d8a64f42 100644 --- a/internal/ingest/ingest.go +++ b/internal/ingest/ingest.go @@ -1117,7 +1117,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as headAdCid := assignment.adInfos[0].cid if ing.mirror.canWrite() && 
!assignment.adInfos[0].resync { - _, err := ing.mirror.writeHead(ctx, headAdCid, assignment.publisher) + _, err := ing.mirror.writeHead(ctx, headAdCid, provider) if err != nil { log.Errorw("Cannot write publisher head", "err", err) } diff --git a/ipni-gc/reaper/reaper.go b/ipni-gc/reaper/reaper.go index 5c48838ab..b06d2b845 100644 --- a/ipni-gc/reaper/reaper.go +++ b/ipni-gc/reaper/reaper.go @@ -12,7 +12,6 @@ import ( "os" "path" "path/filepath" - "strings" "sync" "time" @@ -227,33 +226,17 @@ func (r *Reaper) Reap(ctx context.Context, providerID peer.ID) error { } if pinfo == nil { if r.delNotFound { - return r.removePublisher(ctx, providerID, providerID) + return r.removeProvider(ctx, providerID) } return ErrProviderNotFound } if pinfo.Publisher == nil { return errors.New("provider has no publisher") } - publisher := *pinfo.Publisher - // Check that parent directory for the gc datastore directory is writable. - if err = fsutil.DirWritable(r.dsDir); err != nil { - return err - } - // If the gc datastore dir does not already exist, try to get archive from - // filestore. - dstoreDir := filepath.Join(r.dsDir, dstoreDirName(publisher.ID)) - _, err = os.Stat(dstoreDir) - if errors.Is(err, fs.ErrNotExist) && r.fileStore != nil { - if err = unarchiveDatastore(ctx, publisher.ID, r.fileStore, r.dsDir); err != nil { - return fmt.Errorf("failed to retrieve datastore from archive: %w", err) - } - } - // Check that the extracted archive directory is readable, and if it does - // not exist create a new gc datastore directory. - dstore, err := createDatastore(dstoreDir) + dstore, err := r.openDatastore(ctx, providerID) if err != nil { - return fmt.Errorf("failed to create ipni-gc datastore: %w", err) + return err } defer func() { if dstore != nil { @@ -264,7 +247,7 @@ func (r *Reaper) Reap(ctx context.Context, providerID peer.ID) error { // Create datastore for temporary ad data. var tmpDir string if r.dsTmpDir == "" { - tmpDir, err = os.MkdirTemp("", "gc-tmp-"+publisher.ID.String()) + tmpDir, err = os.MkdirTemp("", "gc-tmp-"+providerID.String()) if err != nil { return fmt.Errorf("cannot create temp directory for gc: %w", err) } @@ -273,7 +256,7 @@ func (r *Reaper) Reap(ctx context.Context, providerID peer.ID) error { if err = fsutil.DirWritable(r.dsTmpDir); err != nil { return err } - tmpDir = filepath.Join(r.dsTmpDir, "gc-tmp-"+publisher.ID.String()) + tmpDir = filepath.Join(r.dsTmpDir, "gc-tmp-"+providerID.String()) } dstoreTmp, err := createDatastore(tmpDir) if err != nil { @@ -281,7 +264,7 @@ func (r *Reaper) Reap(ctx context.Context, providerID peer.ID) error { } defer dstoreTmp.Close() - // Create ipni dagsync Subscriber for this publisher. + // Create ipni dagsync Subscriber for the provider. 
sub, err := makeSubscriber(r.host, dstoreTmp, r.topic, r.httpTimeout) if err != nil { return fmt.Errorf("failed to start dagsync subscriber: %w", err) @@ -293,7 +276,7 @@ func (r *Reaper) Reap(ctx context.Context, providerID peer.ID) error { dstore: dstore, dstoreTmp: dstoreTmp, providerID: pinfo.AddrInfo.ID, - publisher: publisher, + publisher: *pinfo.Publisher, sub: sub, tmpDir: tmpDir, } @@ -321,75 +304,162 @@ func (r *Reaper) Reap(ctx context.Context, providerID peer.ID) error { } dstore.Close() dstore = nil - return s.archiveDatastore(ctx, dstoreDir) + return s.archiveDatastore(ctx) } -func (r *Reaper) DataArchiveName(ctx context.Context, providerID peer.ID) (string, error) { - pinfo, err := r.pcache.Get(ctx, providerID) +func (r *Reaper) datastoreExists(ctx context.Context, providerID peer.ID) (bool, error) { + dir := filepath.Join(r.dsDir, dstoreDirName(providerID)) + fi, err := os.Stat(dir) if err != nil { - return "", err + if !errors.Is(err, fs.ErrNotExist) { + return false, err + } + _, err = r.fileStore.Head(ctx, ArchiveName(providerID)) + if err != nil { + if !errors.Is(err, fs.ErrNotExist) { + return false, err + } + return false, nil + } + return true, nil } - if pinfo == nil { - return "", ErrProviderNotFound + if !fi.IsDir() { + return false, fmt.Errorf("not a directory: %s", dir) } + return true, nil +} - if pinfo.Publisher == nil { - return "", errors.New("provider has no publisher") +func (r *Reaper) openDatastore(ctx context.Context, providerID peer.ID) (datastore.Batching, error) { + // Check that parent directory for the gc datastore directory is writable. + err := fsutil.DirWritable(r.dsDir) + if err != nil { + return nil, err + } + // If the gc datastore dir does not already exist, try to get archive from + // filestore. + dstoreDir := filepath.Join(r.dsDir, dstoreDirName(providerID)) + _, err = os.Stat(dstoreDir) + if errors.Is(err, fs.ErrNotExist) && r.fileStore != nil { + if err = r.unarchiveDatastore(ctx, providerID); err != nil { + return nil, fmt.Errorf("failed to retrieve datastore from archive: %w", err) + } } - return dstoreArchiveName(pinfo.Publisher.ID), nil + // Check that the extracted archive directory is readable, and if it does + // not exist create a new gc datastore directory. + dstore, err := createDatastore(dstoreDir) + if err != nil { + return nil, fmt.Errorf("failed to create ipni-gc datastore: %w", err) + } + return dstore, nil } -func (r *Reaper) removePublisher(ctx context.Context, providerID, publisherID peer.ID) error { +func (r *Reaper) removeProvider(ctx context.Context, providerID peer.ID) error { startTime := time.Now() + exists, err := r.datastoreExists(ctx, providerID) + if err != nil { + return err + } + if exists { + dstore, err := r.openDatastore(ctx, providerID) + if err != nil { + return err + } + defer dstore.Close() + + s := &scythe{ + reaper: r, + dstore: dstore, + providerID: providerID, + } + + err = s.reapAllPrevRemaining(ctx) + if err != nil { + return fmt.Errorf("failed to remove remaining ads from previous gc: %w", err) + } + + err = s.reapRemoved(ctx) + if err != nil { + return err + } + + dstore.Close() + + // Delete gc-datastore archive. + if r.commit { + name := ArchiveName(providerID) + err = r.fileStore.Delete(ctx, name) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("Cannot delete datastore archive for provider", "err", err, "name", name) + } + // Delete gc-datastore. 
+ dstoreDir := filepath.Join(r.dsDir, dstoreDirName(providerID)) + err = os.RemoveAll(dstoreDir) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("Cannot remove datastore directory for provider", "err", err, "dir", dstoreDir) + } + } + + r.AddStats(s.stats) + } + + // Delete temporary gc-datastore. + if r.dsTmpDir != "" && r.commit { + name := filepath.Join(r.dsTmpDir, "gc-tmp-"+providerID.String()) + err = os.RemoveAll(name) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("Cannot remove datastore directory for provider", "err", err, "dir", name) + } + } + if r.carReader == nil { return ErrNoCarReader } - adCid, err := r.readHeadFile(ctx, publisherID) + carWriter, err := carstore.NewWriter(nil, r.fileStore) if err != nil { - return fmt.Errorf("cannot read head advertisement: %w", err) + return err + } + + adCid, err := r.carReader.ReadHead(ctx, providerID) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + // OK if head file does not exist. No CAR files to clean up. + return nil + } + if r.commit { + if err = carWriter.DeleteHead(ctx, providerID); err != nil { + log.Errorw("Failed to delete head file for provider", "err", err, "provider", providerID) + } + } + return fmt.Errorf("cannot read head advertisement for provider %s: %w", providerID, err) } adProviderID, err := r.providerFromCar(ctx, adCid) if err != nil { return fmt.Errorf("cannot read provider from car: %w", err) } - - if providerID == "" { + // If providerID is different from the one in the advertisement, then it + // means the publisher ID was given as providerID, since that is what head + // files were previously named with. The real providerID comes from the CAR + // file, so check that the provider does not still exist. + if adProviderID != providerID { providerID = adProviderID - } else if providerID != adProviderID { - return errors.New("requested provider does not match provider in advertisement") - } - - // Delete gc-datastore archive. - name := dstoreArchiveName(publisherID) - err = r.fileStore.Delete(ctx, name) - if err != nil && !errors.Is(err, fs.ErrNotExist) { - log.Errorw("Cannot delete datastore archive for provider", "err", err, "name", name) - } - // Delete gc-datastore. - name = filepath.Join(r.dsDir, dstoreDirName(publisherID)) - err = os.RemoveAll(name) - if err != nil && !errors.Is(err, fs.ErrNotExist) { - log.Errorw("Cannot remove datastore directory for provider", "err", err, "dir", name) - } - // Delete temporary gc-datastore. 
- if r.dsTmpDir != "" { - name = filepath.Join(r.dsTmpDir, "gc-tmp-"+publisherID.String()) - err = os.RemoveAll(name) - if err != nil && !errors.Is(err, fs.ErrNotExist) { - log.Errorw("Cannot remove datastore directory for provider", "err", err, "dir", name) + pinfo, err := r.pcache.Get(ctx, providerID) + if err != nil { + return err + } + if pinfo != nil { + return fmt.Errorf("provider %s still exists", providerID) } } - err = nil var stats GCStats var newHead cid.Cid var cleanErr error for adCid != cid.Undef { - prevAdCid, mhCount, err := r.cleanCarIndexes(ctx, adCid, providerID) + prevAdCid, mhCount, err := r.cleanCarIndexes(ctx, adCid) stats.IndexesRemoved += mhCount if err != nil { cleanErr = fmt.Errorf("stopping gc due to error removing entries in CAR: %w", err) @@ -408,19 +478,21 @@ func (r *Reaper) removePublisher(ctx context.Context, providerID, publisherID pe adCid = prevAdCid } - headName := publisherID.String() + carstore.HeadFileSuffix if adCid == cid.Undef { - err = r.fileStore.Delete(ctx, headName) - if err != nil { - err = fmt.Errorf("failed to delete head file: %w", err) + if r.commit { + if err = carWriter.DeleteHead(ctx, providerID); err != nil { + err = fmt.Errorf("failed to delete head file: %w", err) + } } stats.TimeElapsed = time.Since(startTime) log.Infow("Finished GC for removed provider", "provider", providerID, "stats", stats.String()) } else if newHead != cid.Undef { // Did not complete. Save head where GC left off. - _, err = r.fileStore.Put(ctx, headName, strings.NewReader(newHead.String())) - if err != nil { - err = fmt.Errorf("failed to update head file: %w", err) + if r.commit { + _, err = carWriter.WriteHead(ctx, newHead, providerID) + if err != nil { + err = fmt.Errorf("failed to update head file: %w", err) + } } stats.TimeElapsed = time.Since(startTime) log.Infow("Incomplete GC for removed provider", "provider", providerID, "stats", stats.String()) @@ -428,13 +500,12 @@ func (r *Reaper) removePublisher(ctx context.Context, providerID, publisherID pe if err != nil { if cleanErr != nil { log.Error(err.Error()) - } else { - cleanErr = err + err = cleanErr } } r.AddStats(stats) - return cleanErr + return err } func (r *Reaper) providerFromCar(ctx context.Context, adCid cid.Cid) (peer.ID, error) { @@ -449,7 +520,7 @@ func (r *Reaper) providerFromCar(ctx context.Context, adCid cid.Cid) (peer.ID, e return peer.Decode(ad.Provider) } -func (r *Reaper) cleanCarIndexes(ctx context.Context, adCid cid.Cid, providerID peer.ID) (cid.Cid, int, error) { +func (r *Reaper) cleanCarIndexes(ctx context.Context, adCid cid.Cid) (cid.Cid, int, error) { // Create a context to cancel the carReader reading entries. 
 	readCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -474,6 +545,11 @@
 		return prevAdCid, 0, nil
 	}
 
+	providerID, err := peer.Decode(ad.Provider)
+	if err != nil {
+		return cid.Undef, 0, fmt.Errorf("cannot get provider from advertisement: %w", err)
+	}
+
 	value := indexer.Value{
 		ProviderID: providerID,
 		ContextID:  ad.ContextID,
@@ -529,22 +605,6 @@ func (r *Reaper) deleteCarFile(ctx context.Context, adCid cid.Cid) (int64, error
 	return file.Size, nil
 }
 
-func (r *Reaper) readHeadFile(ctx context.Context, publisherID peer.ID) (cid.Cid, error) {
-	headName := publisherID.String() + carstore.HeadFileSuffix
-	_, rc, err := r.fileStore.Get(ctx, headName)
-	if err != nil {
-		return cid.Undef, err
-	}
-
-	cidData, err := io.ReadAll(rc)
-	rc.Close()
-	if err != nil {
-		return cid.Undef, err
-	}
-
-	return cid.Decode(string(cidData))
-}
-
 func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error {
 	log.Infow("Starting GC for provider", "latestAd", latestAdCid, "provider", s.providerID)
 	if latestAdCid == cid.Undef {
@@ -702,8 +762,18 @@ func (s *scythe) reapRemoved(ctx context.Context) error {
 	return nil
 }
 
+// reapAllPrevRemaining marks all previously remaining ads for removal.
+func (s *scythe) reapAllPrevRemaining(ctx context.Context) error {
+	return s.reapPrefixedAds(ctx, "/ctx/")
+}
+
+// reapPrevRemaining marks previously remaining ads for removal that have the
+// specified context ID.
 func (s *scythe) reapPrevRemaining(ctx context.Context, contextID string) error {
-	prefix := dsContextPrefix(contextID)
+	return s.reapPrefixedAds(ctx, dsContextPrefix(contextID))
+}
+
+func (s *scythe) reapPrefixedAds(ctx context.Context, prefix string) error {
 	q := query.Query{
 		Prefix:   prefix,
 		KeysOnly: true,
@@ -865,7 +935,7 @@ func (s *scythe) removeEntries(ctx context.Context, adCid cid.Cid) error {
 	} else {
 		err = ErrNoCarReader
 	}
-	if err != nil && s.reaper.entsFromPub {
+	if err != nil && s.reaper.entsFromPub && s.sub != nil {
 		err = s.removeEntriesFromPublisher(ctx, adCid)
 		if err != nil {
 			if errors.Is(err, errIndexerWrite) {
@@ -879,7 +949,7 @@ func (s *scythe) removeEntries(ctx context.Context, adCid cid.Cid) error {
 	}
 	newRemoved := s.stats.IndexesRemoved - prevRemoved
 	if newRemoved != 0 {
-		log.Infow("Removed indexes in removed ad", "adCid", adCid, "count", newRemoved, "total", s.stats.IndexesRemoved, "source", source, "adsProcessed", s.stats.AdsProcessed)
+		log.Infow("Removed indexes in removed ad", "adCid", adCid, "count", newRemoved, "total", s.stats.IndexesRemoved, "source", source)
 	}
 	return nil
 }
@@ -954,15 +1024,14 @@ func (s *scythe) removeEntriesFromPublisher(ctx context.Context, adCid cid.Cid)
 		return fmt.Errorf("failed to load advertisement %s: %w", adCid.String(), err)
 	}
 	if ad.Entries == nil || ad.Entries == schema.NoEntries {
-		log.Errorw("Advertisement expected to have entries, but has none", "adCid", adCid, "provider", s.providerID)
+		log.Errorw("Advertisement expected to have entries, but has none", "adCid", adCid, "provider", s.providerID, "publisher", s.publisher.ID)
 		return nil
 	}
 	entsCid := ad.Entries.(cidlink.Link).Cid
 
 	providerID, err := peer.Decode(ad.Provider)
 	if err != nil {
-		log.Errorw("Cannot get provider from advertisement", "err", err)
-		providerID = s.providerID
+		return fmt.Errorf("cannot get provider from advertisement %s: %w", adCid, err)
 	}
 
 	value := indexer.Value{
@@ -1013,15 +1082,50 @@ func createDatastore(dir string) (datastore.Batching, error) {
 	return ds, nil
 }
 
-func dstoreDirName(publisherID peer.ID) string {
-	return fmt.Sprint("gc-data-", publisherID.String())
+func dstoreDirName(providerID peer.ID) string {
+	return fmt.Sprint("gc-data-", providerID.String())
 }
 
-func dstoreArchiveName(publisherID peer.ID) string {
-	return dstoreDirName(publisherID) + ".tar.gz"
+func ArchiveName(providerID peer.ID) string {
+	return dstoreDirName(providerID) + ".tar.gz"
 }
 
-func (s *scythe) archiveDatastore(ctx context.Context, dstoreDir string) error {
+// unarchiveDatastore gets a gzipped tar archive of the provider gc datastore
+// from the filestore and extracts the archive.
+//
+// If the dsDir is `/data/datastore-gc/` and the archive is
+// gc-data-PID.tar.gz, then that archive is extracted as
+// `/data/datastore-gc/gc-data-PID`.
+func (r *Reaper) unarchiveDatastore(ctx context.Context, providerID peer.ID) error {
+	tarName := ArchiveName(providerID)
+	log.Debugw("Datastore directory does not exist, fetching from archive", "name", tarName)
+
+	fileInfo, rc, err := r.fileStore.Get(ctx, tarName)
+	if err != nil {
+		if errors.Is(err, fs.ErrNotExist) {
+			log.Debug("Datastore archive not found, will create new datastore")
+			return nil
+		}
+		return fmt.Errorf("cannot retrieve datastore archive from filestore: %w", err)
+	}
+	defer rc.Close()
+
+	err = targz.ExtractReader(rc, r.dsDir)
+	if err != nil {
+		return fmt.Errorf("failed to extract datastore archive: %w", err)
+	}
+
+	log.Infow("Extracted datastore archive from filestore", "name", tarName, "size", fileInfo.Size, "targetDir", r.dsDir)
+	return nil
+}
+
+// archiveDatastore creates a gzipped tar archive of the provider gc datastore
+// and puts a copy of that into the filestore.
+//
+// If the provider gc datastore is `/data/datastore-gc/gc-data-PID`, then an
+// archive of `gc-data-PID` named `gc-data-PID.tar.gz` is created and copied
+// to the filestore.
+func (s *scythe) archiveDatastore(ctx context.Context) error { if !s.reaper.commit { return nil } @@ -1029,9 +1133,10 @@ func (s *scythe) archiveDatastore(ctx context.Context, dstoreDir string) error { log.Warn("Filestore not available to save gc datastore to") return nil } - tarName := dstoreArchiveName(s.publisher.ID) - parent := filepath.Dir(dstoreDir) - tarPath := filepath.Join(parent, tarName) + + dstoreDir := filepath.Join(s.reaper.dsDir, dstoreDirName(s.providerID)) + tarName := ArchiveName(s.providerID) + tarPath := filepath.Join(s.reaper.dsDir, tarName) err := targz.Create(dstoreDir, tarPath) if err != nil { @@ -1056,29 +1161,6 @@ func (s *scythe) archiveDatastore(ctx context.Context, dstoreDir string) error { return nil } -func unarchiveDatastore(ctx context.Context, publisherID peer.ID, fileStore filestore.Interface, parentDir string) error { - tarName := dstoreArchiveName(publisherID) - log.Debugw("Datastore directory does not exist, fetching from archive", "name", tarName) - - fileInfo, rc, err := fileStore.Get(ctx, tarName) - if err != nil { - if errors.Is(err, fs.ErrNotExist) { - log.Debug("Datastore archive not found, will create new datastore") - return nil - } - return fmt.Errorf("cannot retrieve datastore archive from filestore: %w", err) - } - defer rc.Close() - - err = targz.ExtractReader(rc, parentDir) - if err != nil { - return fmt.Errorf("failed to extract datastore archive: %w", err) - } - - log.Infow("Extracted datastore archive from filestore", "name", tarName, "size", fileInfo.Size, "targetDir", parentDir) - return nil -} - func (s *scythe) loadAd(c cid.Cid) (schema.Advertisement, error) { adn, err := s.loadNode(c, schema.AdvertisementPrototype) if err != nil { diff --git a/ipni-gc/reaper/reaper_test.go b/ipni-gc/reaper/reaper_test.go index 5b3388d33..7a54174f8 100644 --- a/ipni-gc/reaper/reaper_test.go +++ b/ipni-gc/reaper/reaper_test.go @@ -8,9 +8,11 @@ import ( "testing" "time" + "github.com/ipfs/go-cid" "github.com/ipni/go-indexer-core/store/memory" "github.com/ipni/go-libipni/find/model" "github.com/ipni/go-libipni/pcache" + "github.com/ipni/storetheindex/carstore" "github.com/ipni/storetheindex/filestore" "github.com/ipni/storetheindex/ipni-gc/reaper" "github.com/libp2p/go-libp2p/core/peer" @@ -21,6 +23,7 @@ import ( const testTopic = "/indexer/ingest/test" var pid1, pid2, pid3 peer.ID +var adCid cid.Cid func init() { var err error @@ -36,6 +39,10 @@ func init() { if err != nil { panic(err) } + adCid, err = cid.Decode("bafybeigvgzoolc3drupxhlevdp2ugqcrbcsqfmcek2zxiw5wctk3xjpjwy") + if err != nil { + panic(err) + } } type mockSource struct { @@ -52,8 +59,7 @@ func TestReaper(t *testing.T) { idxr := memory.New() - src := newMockSource(pid1) - pc, err := pcache.New(pcache.WithSource(src)) + pc, err := pcache.New(pcache.WithSource(newMockSource(pid1))) require.NoError(t, err) gc, err := reaper.New(idxr, fileStore, @@ -75,9 +81,7 @@ func TestReaper(t *testing.T) { require.NoError(t, err) // Check that archive is stored in filestore. 
- archiveName, err := gc.DataArchiveName(ctx, pid1) - require.NoError(t, err) - fileInfo, err := fileStore.Head(ctx, archiveName) + fileInfo, err := fileStore.Head(ctx, reaper.ArchiveName(pid1)) require.NoError(t, err) require.NotZero(t, fileInfo.Size) @@ -95,10 +99,33 @@ func TestReaper(t *testing.T) { reaper.WithTopicName(testTopic), ) require.NoError(t, err) - err = gc2.Reap(ctx, pid3) - require.Error(t, err) - require.Error(t, fs.ErrNotExist) defer gc2.Close() + err = gc2.Reap(ctx, pid3) + require.NoError(t, err) + gc2.Close() + + carWriter, err := carstore.NewWriter(nil, fileStore) + require.NoError(t, err) + _, err = carWriter.WriteHead(context.Background(), adCid, pid1) + require.NoError(t, err) + + pc, err = pcache.New(pcache.WithSource(newMockSource(pid2))) + require.NoError(t, err) + gc2, err = reaper.New(idxr, fileStore, + reaper.WithCarDelete(true), + reaper.WithCarRead(true), + reaper.WithCommit(true), + reaper.WithDatastoreDir(dsDir), + reaper.WithDatastoreTempDir(dsTmpDir), + reaper.WithDeleteNotFound(true), + reaper.WithPCache(pc), + reaper.WithTopicName(testTopic), + ) + require.NoError(t, err) + + err = gc2.Reap(ctx, pid1) + require.ErrorIs(t, err, fs.ErrNotExist) + gc2.Close() } func newMockSource(pids ...peer.ID) *mockSource {