From bbd9f10a84402c2088340470e12cc72ea5d83a05 Mon Sep 17 00:00:00 2001 From: Andrew Gillis Date: Fri, 15 Dec 2023 00:34:55 -0800 Subject: [PATCH] Identify ipni-gc resources by provider ID (#2449) * Identify ipni-gc resources by provider ID Previously the publisher ID was used as this was more immediately associated with an advertisement chain, and because head files in the CAR mirror were identified by publisher ID. It is more correct to change these things to be identified by provider ID, since the same provider's chain can be served by different publishers. This PR also allows non-existent provider resources to be cleaned up even if GC has been previously done. --- carstore/carreader.go | 12 +- carstore/carwriter.go | 12 +- e2e_test.go | 8 +- internal/ingest/ingest.go | 2 +- ipni-gc/reaper/reaper.go | 334 +++++++++++++++++++++------------- ipni-gc/reaper/reaper_test.go | 43 ++++- 6 files changed, 264 insertions(+), 147 deletions(-) diff --git a/carstore/carreader.go b/carstore/carreader.go index 83ebc2216..5452c142e 100644 --- a/carstore/carreader.go +++ b/carstore/carreader.go @@ -157,16 +157,16 @@ func (cr CarReader) Read(ctx context.Context, adCid cid.Cid, skipEntries bool) ( return &adBlock, nil } -// ReadHead reads the advertisement CID from the publisher's head file. The -// head file contains the CID of the latest advertisement for an advertisement -// publisher. Returns fs.ErrNotExist if head file is not found. -func (cr CarReader) ReadHead(ctx context.Context, publisher peer.ID) (cid.Cid, error) { - err := publisher.Validate() +// ReadHead reads the advertisement CID from the provider's head file. The head +// file contains the CID of the latest advertisement for a provider. Returns +// fs.ErrNotExist if head file is not found. +func (cr CarReader) ReadHead(ctx context.Context, provider peer.ID) (cid.Cid, error) { + err := provider.Validate() if err != nil { return cid.Undef, err } - headPath := publisher.String() + HeadFileSuffix + headPath := provider.String() + HeadFileSuffix _, r, err := cr.fileStore.Get(ctx, headPath) if err != nil { return cid.Undef, err diff --git a/carstore/carwriter.go b/carstore/carwriter.go index 9314a0585..66096f0b1 100644 --- a/carstore/carwriter.go +++ b/carstore/carwriter.go @@ -276,16 +276,20 @@ func (cw *CarWriter) WriteChain(ctx context.Context, adCid cid.Cid, overWrite bo return count, nil } -func (cw *CarWriter) WriteHead(ctx context.Context, adCid cid.Cid, publisher peer.ID) (*filestore.File, error) { - err := publisher.Validate() +func (cw *CarWriter) WriteHead(ctx context.Context, adCid cid.Cid, provider peer.ID) (*filestore.File, error) { + err := provider.Validate() if err != nil { return nil, err } - - headName := publisher.String() + HeadFileSuffix + headName := provider.String() + HeadFileSuffix return cw.fileStore.Put(ctx, headName, strings.NewReader(adCid.String())) } +func (cw *CarWriter) DeleteHead(ctx context.Context, provider peer.ID) error { + headName := provider.String() + HeadFileSuffix + return cw.fileStore.Delete(ctx, headName) +} + func (cw *CarWriter) Delete(ctx context.Context, adCid cid.Cid) error { err := cw.fileStore.Delete(ctx, cw.CarPath(adCid)) if err != nil { diff --git a/e2e_test.go b/e2e_test.go index 71790c8ff..e8910f3db 100644 --- a/e2e_test.go +++ b/e2e_test.go @@ -355,12 +355,16 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) { outStatus = e.Run(indexer, "admin", "status", "--indexer", "http://localhost:3202") require.Contains(t, string(outStatus), "Frozen: true", "expected 
indexer to be frozen") - outgc := string(e.Run(ipnigc, "provider", "-pid", providerID, "-ll", "debug", "--commit", + logLevel := "info" + if testing.Verbose() { + logLevel = "debug" + } + outgc := string(e.Run(ipnigc, "provider", "-pid", providerID, "-ll", logLevel, "--commit", "-i", "http://localhost:3200", "-i", "http://localhost:3000", )) t.Logf("GC Results:\n%s\n", outgc) - require.Contains(t, outgc, `{"count": 1043, "total": 1043, "source": "CAR", "adsProcessed": 2}`) + require.Contains(t, outgc, `"count": 1043, "total": 1043, "source": "CAR"`) e.Stop(cmdIndexer2, time.Second) diff --git a/internal/ingest/ingest.go b/internal/ingest/ingest.go index 5df332d64..4d8a64f42 100644 --- a/internal/ingest/ingest.go +++ b/internal/ingest/ingest.go @@ -1117,7 +1117,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as headAdCid := assignment.adInfos[0].cid if ing.mirror.canWrite() && !assignment.adInfos[0].resync { - _, err := ing.mirror.writeHead(ctx, headAdCid, assignment.publisher) + _, err := ing.mirror.writeHead(ctx, headAdCid, provider) if err != nil { log.Errorw("Cannot write publisher head", "err", err) } diff --git a/ipni-gc/reaper/reaper.go b/ipni-gc/reaper/reaper.go index 5c48838ab..b06d2b845 100644 --- a/ipni-gc/reaper/reaper.go +++ b/ipni-gc/reaper/reaper.go @@ -12,7 +12,6 @@ import ( "os" "path" "path/filepath" - "strings" "sync" "time" @@ -227,33 +226,17 @@ func (r *Reaper) Reap(ctx context.Context, providerID peer.ID) error { } if pinfo == nil { if r.delNotFound { - return r.removePublisher(ctx, providerID, providerID) + return r.removeProvider(ctx, providerID) } return ErrProviderNotFound } if pinfo.Publisher == nil { return errors.New("provider has no publisher") } - publisher := *pinfo.Publisher - // Check that parent directory for the gc datastore directory is writable. - if err = fsutil.DirWritable(r.dsDir); err != nil { - return err - } - // If the gc datastore dir does not already exist, try to get archive from - // filestore. - dstoreDir := filepath.Join(r.dsDir, dstoreDirName(publisher.ID)) - _, err = os.Stat(dstoreDir) - if errors.Is(err, fs.ErrNotExist) && r.fileStore != nil { - if err = unarchiveDatastore(ctx, publisher.ID, r.fileStore, r.dsDir); err != nil { - return fmt.Errorf("failed to retrieve datastore from archive: %w", err) - } - } - // Check that the extracted archive directory is readable, and if it does - // not exist create a new gc datastore directory. - dstore, err := createDatastore(dstoreDir) + dstore, err := r.openDatastore(ctx, providerID) if err != nil { - return fmt.Errorf("failed to create ipni-gc datastore: %w", err) + return err } defer func() { if dstore != nil { @@ -264,7 +247,7 @@ func (r *Reaper) Reap(ctx context.Context, providerID peer.ID) error { // Create datastore for temporary ad data. 
var tmpDir string if r.dsTmpDir == "" { - tmpDir, err = os.MkdirTemp("", "gc-tmp-"+publisher.ID.String()) + tmpDir, err = os.MkdirTemp("", "gc-tmp-"+providerID.String()) if err != nil { return fmt.Errorf("cannot create temp directory for gc: %w", err) } @@ -273,7 +256,7 @@ func (r *Reaper) Reap(ctx context.Context, providerID peer.ID) error { if err = fsutil.DirWritable(r.dsTmpDir); err != nil { return err } - tmpDir = filepath.Join(r.dsTmpDir, "gc-tmp-"+publisher.ID.String()) + tmpDir = filepath.Join(r.dsTmpDir, "gc-tmp-"+providerID.String()) } dstoreTmp, err := createDatastore(tmpDir) if err != nil { @@ -281,7 +264,7 @@ func (r *Reaper) Reap(ctx context.Context, providerID peer.ID) error { } defer dstoreTmp.Close() - // Create ipni dagsync Subscriber for this publisher. + // Create ipni dagsync Subscriber for the provider. sub, err := makeSubscriber(r.host, dstoreTmp, r.topic, r.httpTimeout) if err != nil { return fmt.Errorf("failed to start dagsync subscriber: %w", err) @@ -293,7 +276,7 @@ func (r *Reaper) Reap(ctx context.Context, providerID peer.ID) error { dstore: dstore, dstoreTmp: dstoreTmp, providerID: pinfo.AddrInfo.ID, - publisher: publisher, + publisher: *pinfo.Publisher, sub: sub, tmpDir: tmpDir, } @@ -321,75 +304,162 @@ func (r *Reaper) Reap(ctx context.Context, providerID peer.ID) error { } dstore.Close() dstore = nil - return s.archiveDatastore(ctx, dstoreDir) + return s.archiveDatastore(ctx) } -func (r *Reaper) DataArchiveName(ctx context.Context, providerID peer.ID) (string, error) { - pinfo, err := r.pcache.Get(ctx, providerID) +func (r *Reaper) datastoreExists(ctx context.Context, providerID peer.ID) (bool, error) { + dir := filepath.Join(r.dsDir, dstoreDirName(providerID)) + fi, err := os.Stat(dir) if err != nil { - return "", err + if !errors.Is(err, fs.ErrNotExist) { + return false, err + } + _, err = r.fileStore.Head(ctx, ArchiveName(providerID)) + if err != nil { + if !errors.Is(err, fs.ErrNotExist) { + return false, err + } + return false, nil + } + return true, nil } - if pinfo == nil { - return "", ErrProviderNotFound + if !fi.IsDir() { + return false, fmt.Errorf("not a directory: %s", dir) } + return true, nil +} - if pinfo.Publisher == nil { - return "", errors.New("provider has no publisher") +func (r *Reaper) openDatastore(ctx context.Context, providerID peer.ID) (datastore.Batching, error) { + // Check that parent directory for the gc datastore directory is writable. + err := fsutil.DirWritable(r.dsDir) + if err != nil { + return nil, err + } + // If the gc datastore dir does not already exist, try to get archive from + // filestore. + dstoreDir := filepath.Join(r.dsDir, dstoreDirName(providerID)) + _, err = os.Stat(dstoreDir) + if errors.Is(err, fs.ErrNotExist) && r.fileStore != nil { + if err = r.unarchiveDatastore(ctx, providerID); err != nil { + return nil, fmt.Errorf("failed to retrieve datastore from archive: %w", err) + } } - return dstoreArchiveName(pinfo.Publisher.ID), nil + // Check that the extracted archive directory is readable, and if it does + // not exist create a new gc datastore directory. 
+ dstore, err := createDatastore(dstoreDir) + if err != nil { + return nil, fmt.Errorf("failed to create ipni-gc datastore: %w", err) + } + return dstore, nil } -func (r *Reaper) removePublisher(ctx context.Context, providerID, publisherID peer.ID) error { +func (r *Reaper) removeProvider(ctx context.Context, providerID peer.ID) error { startTime := time.Now() + exists, err := r.datastoreExists(ctx, providerID) + if err != nil { + return err + } + if exists { + dstore, err := r.openDatastore(ctx, providerID) + if err != nil { + return err + } + defer dstore.Close() + + s := &scythe{ + reaper: r, + dstore: dstore, + providerID: providerID, + } + + err = s.reapAllPrevRemaining(ctx) + if err != nil { + return fmt.Errorf("failed to remove remaining ads from previous gc: %w", err) + } + + err = s.reapRemoved(ctx) + if err != nil { + return err + } + + dstore.Close() + + // Delete gc-datastore archive. + if r.commit { + name := ArchiveName(providerID) + err = r.fileStore.Delete(ctx, name) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("Cannot delete datastore archive for provider", "err", err, "name", name) + } + // Delete gc-datastore. + dstoreDir := filepath.Join(r.dsDir, dstoreDirName(providerID)) + err = os.RemoveAll(dstoreDir) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("Cannot remove datastore directory for provider", "err", err, "dir", dstoreDir) + } + } + + r.AddStats(s.stats) + } + + // Delete temporary gc-datastore. + if r.dsTmpDir != "" && r.commit { + name := filepath.Join(r.dsTmpDir, "gc-tmp-"+providerID.String()) + err = os.RemoveAll(name) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("Cannot remove datastore directory for provider", "err", err, "dir", name) + } + } + if r.carReader == nil { return ErrNoCarReader } - adCid, err := r.readHeadFile(ctx, publisherID) + carWriter, err := carstore.NewWriter(nil, r.fileStore) if err != nil { - return fmt.Errorf("cannot read head advertisement: %w", err) + return err + } + + adCid, err := r.carReader.ReadHead(ctx, providerID) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + // OK if head file does not exist. No CAR files to clean up. + return nil + } + if r.commit { + if err = carWriter.DeleteHead(ctx, providerID); err != nil { + log.Errorw("Failed to delete head file for provider", "err", err, "provider", providerID) + } + } + return fmt.Errorf("cannot read head advertisement for provider %s: %w", providerID, err) } adProviderID, err := r.providerFromCar(ctx, adCid) if err != nil { return fmt.Errorf("cannot read provider from car: %w", err) } - - if providerID == "" { + // If providerID is different from the one in the advertisement, then it + // means the publisher ID was given as providerID, since that is what head + // files were previously named with. The real providerID comes from the CAR + // file, so check that the provider does not still exist. + if adProviderID != providerID { providerID = adProviderID - } else if providerID != adProviderID { - return errors.New("requested provider does not match provider in advertisement") - } - - // Delete gc-datastore archive. - name := dstoreArchiveName(publisherID) - err = r.fileStore.Delete(ctx, name) - if err != nil && !errors.Is(err, fs.ErrNotExist) { - log.Errorw("Cannot delete datastore archive for provider", "err", err, "name", name) - } - // Delete gc-datastore. 
- name = filepath.Join(r.dsDir, dstoreDirName(publisherID)) - err = os.RemoveAll(name) - if err != nil && !errors.Is(err, fs.ErrNotExist) { - log.Errorw("Cannot remove datastore directory for provider", "err", err, "dir", name) - } - // Delete temporary gc-datastore. - if r.dsTmpDir != "" { - name = filepath.Join(r.dsTmpDir, "gc-tmp-"+publisherID.String()) - err = os.RemoveAll(name) - if err != nil && !errors.Is(err, fs.ErrNotExist) { - log.Errorw("Cannot remove datastore directory for provider", "err", err, "dir", name) + pinfo, err := r.pcache.Get(ctx, providerID) + if err != nil { + return err + } + if pinfo != nil { + return fmt.Errorf("provider %s still exists", providerID) } } - err = nil var stats GCStats var newHead cid.Cid var cleanErr error for adCid != cid.Undef { - prevAdCid, mhCount, err := r.cleanCarIndexes(ctx, adCid, providerID) + prevAdCid, mhCount, err := r.cleanCarIndexes(ctx, adCid) stats.IndexesRemoved += mhCount if err != nil { cleanErr = fmt.Errorf("stopping gc due to error removing entries in CAR: %w", err) @@ -408,19 +478,21 @@ func (r *Reaper) removePublisher(ctx context.Context, providerID, publisherID pe adCid = prevAdCid } - headName := publisherID.String() + carstore.HeadFileSuffix if adCid == cid.Undef { - err = r.fileStore.Delete(ctx, headName) - if err != nil { - err = fmt.Errorf("failed to delete head file: %w", err) + if r.commit { + if err = carWriter.DeleteHead(ctx, providerID); err != nil { + err = fmt.Errorf("failed to delete head file: %w", err) + } } stats.TimeElapsed = time.Since(startTime) log.Infow("Finished GC for removed provider", "provider", providerID, "stats", stats.String()) } else if newHead != cid.Undef { // Did not complete. Save head where GC left off. - _, err = r.fileStore.Put(ctx, headName, strings.NewReader(newHead.String())) - if err != nil { - err = fmt.Errorf("failed to update head file: %w", err) + if r.commit { + _, err = carWriter.WriteHead(ctx, newHead, providerID) + if err != nil { + err = fmt.Errorf("failed to update head file: %w", err) + } } stats.TimeElapsed = time.Since(startTime) log.Infow("Incomplete GC for removed provider", "provider", providerID, "stats", stats.String()) @@ -428,13 +500,12 @@ func (r *Reaper) removePublisher(ctx context.Context, providerID, publisherID pe if err != nil { if cleanErr != nil { log.Error(err.Error()) - } else { - cleanErr = err + err = cleanErr } } r.AddStats(stats) - return cleanErr + return err } func (r *Reaper) providerFromCar(ctx context.Context, adCid cid.Cid) (peer.ID, error) { @@ -449,7 +520,7 @@ func (r *Reaper) providerFromCar(ctx context.Context, adCid cid.Cid) (peer.ID, e return peer.Decode(ad.Provider) } -func (r *Reaper) cleanCarIndexes(ctx context.Context, adCid cid.Cid, providerID peer.ID) (cid.Cid, int, error) { +func (r *Reaper) cleanCarIndexes(ctx context.Context, adCid cid.Cid) (cid.Cid, int, error) { // Create a context to cancel the carReader reading entries. 
 	readCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -474,6 +545,11 @@ func (r *Reaper) cleanCarIndexes(ctx context.Context, adCid cid.Cid, providerID
 		return prevAdCid, 0, nil
 	}
 
+	providerID, err := peer.Decode(ad.Provider)
+	if err != nil {
+		return cid.Undef, 0, fmt.Errorf("cannot get provider from advertisement: %w", err)
+	}
+
 	value := indexer.Value{
 		ProviderID: providerID,
 		ContextID:  ad.ContextID,
@@ -529,22 +605,6 @@ func (r *Reaper) deleteCarFile(ctx context.Context, adCid cid.Cid) (int64, error
 	return file.Size, nil
 }
 
-func (r *Reaper) readHeadFile(ctx context.Context, publisherID peer.ID) (cid.Cid, error) {
-	headName := publisherID.String() + carstore.HeadFileSuffix
-	_, rc, err := r.fileStore.Get(ctx, headName)
-	if err != nil {
-		return cid.Undef, err
-	}
-
-	cidData, err := io.ReadAll(rc)
-	rc.Close()
-	if err != nil {
-		return cid.Undef, err
-	}
-
-	return cid.Decode(string(cidData))
-}
-
 func (s *scythe) reap(ctx context.Context, latestAdCid cid.Cid) error {
 	log.Infow("Starting GC for provider", "latestAd", latestAdCid, "provider", s.providerID)
 	if latestAdCid == cid.Undef {
@@ -702,8 +762,18 @@ func (s *scythe) reapRemoved(ctx context.Context) error {
 	return nil
 }
 
+// reapAllPrevRemaining marks all previously remaining ads for removal.
+func (s *scythe) reapAllPrevRemaining(ctx context.Context) error {
+	return s.reapPrefixedAds(ctx, "/ctx/")
+}
+
+// reapPrevRemaining marks previously remaining ads for removal that have the
+// specified context ID.
 func (s *scythe) reapPrevRemaining(ctx context.Context, contextID string) error {
-	prefix := dsContextPrefix(contextID)
+	return s.reapPrefixedAds(ctx, dsContextPrefix(contextID))
+}
+
+func (s *scythe) reapPrefixedAds(ctx context.Context, prefix string) error {
 	q := query.Query{
 		Prefix:   prefix,
 		KeysOnly: true,
@@ -865,7 +935,7 @@ func (s *scythe) removeEntries(ctx context.Context, adCid cid.Cid) error {
 	} else {
 		err = ErrNoCarReader
 	}
-	if err != nil && s.reaper.entsFromPub {
+	if err != nil && s.reaper.entsFromPub && s.sub != nil {
 		err = s.removeEntriesFromPublisher(ctx, adCid)
 		if err != nil {
 			if errors.Is(err, errIndexerWrite) {
@@ -879,7 +949,7 @@ func (s *scythe) removeEntries(ctx context.Context, adCid cid.Cid) error {
 	}
 	newRemoved := s.stats.IndexesRemoved - prevRemoved
 	if newRemoved != 0 {
-		log.Infow("Removed indexes in removed ad", "adCid", adCid, "count", newRemoved, "total", s.stats.IndexesRemoved, "source", source, "adsProcessed", s.stats.AdsProcessed)
+		log.Infow("Removed indexes in removed ad", "adCid", adCid, "count", newRemoved, "total", s.stats.IndexesRemoved, "source", source)
 	}
 	return nil
 }
@@ -954,15 +1024,14 @@ func (s *scythe) removeEntriesFromPublisher(ctx context.Context, adCid cid.Cid)
 		return fmt.Errorf("failed to load advertisement %s: %w", adCid.String(), err)
 	}
 	if ad.Entries == nil || ad.Entries == schema.NoEntries {
-		log.Errorw("Advertisement expected to have entries, but has none", "adCid", adCid, "provider", s.providerID)
+		log.Errorw("Advertisement expected to have entries, but has none", "adCid", adCid, "provider", s.providerID, "publisher", s.publisher.ID)
 		return nil
 	}
 	entsCid := ad.Entries.(cidlink.Link).Cid
 
 	providerID, err := peer.Decode(ad.Provider)
 	if err != nil {
-		log.Errorw("Cannot get provider from advertisement", "err", err)
-		providerID = s.providerID
+		return fmt.Errorf("cannot get provider from advertisement %s: %w", adCid, err)
 	}
 
 	value := indexer.Value{
@@ -1013,15 +1082,50 @@ func createDatastore(dir string) (datastore.Batching, error) {
 	return ds, nil
 }
 
-func 
dstoreDirName(publisherID peer.ID) string {
-	return fmt.Sprint("gc-data-", publisherID.String())
+func dstoreDirName(providerID peer.ID) string {
+	return fmt.Sprint("gc-data-", providerID.String())
 }
 
-func dstoreArchiveName(publisherID peer.ID) string {
-	return dstoreDirName(publisherID) + ".tar.gz"
+func ArchiveName(providerID peer.ID) string {
+	return dstoreDirName(providerID) + ".tar.gz"
 }
 
-func (s *scythe) archiveDatastore(ctx context.Context, dstoreDir string) error {
+// unarchiveDatastore gets a gzipped tar archive of the provider gc datastore
+// from the filestore and extracts the archive.
+//
+// If the dsDir is `/data/datastore-gc/` and the archive is
+// gc-data-PID.tar.gz, then that archive is extracted as
+// `/data/datastore-gc/gc-data-PID`.
+func (r *Reaper) unarchiveDatastore(ctx context.Context, providerID peer.ID) error {
+	tarName := ArchiveName(providerID)
+	log.Debugw("Datastore directory does not exist, fetching from archive", "name", tarName)
+
+	fileInfo, rc, err := r.fileStore.Get(ctx, tarName)
+	if err != nil {
+		if errors.Is(err, fs.ErrNotExist) {
+			log.Debug("Datastore archive not found, will create new datastore")
+			return nil
+		}
+		return fmt.Errorf("cannot retrieve datastore archive from filestore: %w", err)
+	}
+	defer rc.Close()
+
+	err = targz.ExtractReader(rc, r.dsDir)
+	if err != nil {
+		return fmt.Errorf("failed to extract datastore archive: %w", err)
+	}
+
+	log.Infow("Extracted datastore archive from filestore", "name", tarName, "size", fileInfo.Size, "targetDir", r.dsDir)
+	return nil
+}
+
+// archiveDatastore creates a gzipped tar archive of the provider gc datastore
+// and puts a copy of that into the filestore.
+//
+// If the provider gc datastore is `/data/datastore-gc/gc-data-PID`, then an
+// archive of `gc-data-PID` named `gc-data-PID.tar.gz` is created and copied
+// to the filestore. 
+func (s *scythe) archiveDatastore(ctx context.Context) error { if !s.reaper.commit { return nil } @@ -1029,9 +1133,10 @@ func (s *scythe) archiveDatastore(ctx context.Context, dstoreDir string) error { log.Warn("Filestore not available to save gc datastore to") return nil } - tarName := dstoreArchiveName(s.publisher.ID) - parent := filepath.Dir(dstoreDir) - tarPath := filepath.Join(parent, tarName) + + dstoreDir := filepath.Join(s.reaper.dsDir, dstoreDirName(s.providerID)) + tarName := ArchiveName(s.providerID) + tarPath := filepath.Join(s.reaper.dsDir, tarName) err := targz.Create(dstoreDir, tarPath) if err != nil { @@ -1056,29 +1161,6 @@ func (s *scythe) archiveDatastore(ctx context.Context, dstoreDir string) error { return nil } -func unarchiveDatastore(ctx context.Context, publisherID peer.ID, fileStore filestore.Interface, parentDir string) error { - tarName := dstoreArchiveName(publisherID) - log.Debugw("Datastore directory does not exist, fetching from archive", "name", tarName) - - fileInfo, rc, err := fileStore.Get(ctx, tarName) - if err != nil { - if errors.Is(err, fs.ErrNotExist) { - log.Debug("Datastore archive not found, will create new datastore") - return nil - } - return fmt.Errorf("cannot retrieve datastore archive from filestore: %w", err) - } - defer rc.Close() - - err = targz.ExtractReader(rc, parentDir) - if err != nil { - return fmt.Errorf("failed to extract datastore archive: %w", err) - } - - log.Infow("Extracted datastore archive from filestore", "name", tarName, "size", fileInfo.Size, "targetDir", parentDir) - return nil -} - func (s *scythe) loadAd(c cid.Cid) (schema.Advertisement, error) { adn, err := s.loadNode(c, schema.AdvertisementPrototype) if err != nil { diff --git a/ipni-gc/reaper/reaper_test.go b/ipni-gc/reaper/reaper_test.go index 5b3388d33..7a54174f8 100644 --- a/ipni-gc/reaper/reaper_test.go +++ b/ipni-gc/reaper/reaper_test.go @@ -8,9 +8,11 @@ import ( "testing" "time" + "github.com/ipfs/go-cid" "github.com/ipni/go-indexer-core/store/memory" "github.com/ipni/go-libipni/find/model" "github.com/ipni/go-libipni/pcache" + "github.com/ipni/storetheindex/carstore" "github.com/ipni/storetheindex/filestore" "github.com/ipni/storetheindex/ipni-gc/reaper" "github.com/libp2p/go-libp2p/core/peer" @@ -21,6 +23,7 @@ import ( const testTopic = "/indexer/ingest/test" var pid1, pid2, pid3 peer.ID +var adCid cid.Cid func init() { var err error @@ -36,6 +39,10 @@ func init() { if err != nil { panic(err) } + adCid, err = cid.Decode("bafybeigvgzoolc3drupxhlevdp2ugqcrbcsqfmcek2zxiw5wctk3xjpjwy") + if err != nil { + panic(err) + } } type mockSource struct { @@ -52,8 +59,7 @@ func TestReaper(t *testing.T) { idxr := memory.New() - src := newMockSource(pid1) - pc, err := pcache.New(pcache.WithSource(src)) + pc, err := pcache.New(pcache.WithSource(newMockSource(pid1))) require.NoError(t, err) gc, err := reaper.New(idxr, fileStore, @@ -75,9 +81,7 @@ func TestReaper(t *testing.T) { require.NoError(t, err) // Check that archive is stored in filestore. 
- archiveName, err := gc.DataArchiveName(ctx, pid1) - require.NoError(t, err) - fileInfo, err := fileStore.Head(ctx, archiveName) + fileInfo, err := fileStore.Head(ctx, reaper.ArchiveName(pid1)) require.NoError(t, err) require.NotZero(t, fileInfo.Size) @@ -95,10 +99,33 @@ func TestReaper(t *testing.T) { reaper.WithTopicName(testTopic), ) require.NoError(t, err) - err = gc2.Reap(ctx, pid3) - require.Error(t, err) - require.Error(t, fs.ErrNotExist) defer gc2.Close() + err = gc2.Reap(ctx, pid3) + require.NoError(t, err) + gc2.Close() + + carWriter, err := carstore.NewWriter(nil, fileStore) + require.NoError(t, err) + _, err = carWriter.WriteHead(context.Background(), adCid, pid1) + require.NoError(t, err) + + pc, err = pcache.New(pcache.WithSource(newMockSource(pid2))) + require.NoError(t, err) + gc2, err = reaper.New(idxr, fileStore, + reaper.WithCarDelete(true), + reaper.WithCarRead(true), + reaper.WithCommit(true), + reaper.WithDatastoreDir(dsDir), + reaper.WithDatastoreTempDir(dsTmpDir), + reaper.WithDeleteNotFound(true), + reaper.WithPCache(pc), + reaper.WithTopicName(testTopic), + ) + require.NoError(t, err) + + err = gc2.Reap(ctx, pid1) + require.ErrorIs(t, err, fs.ErrNotExist) + gc2.Close() } func newMockSource(pids ...peer.ID) *mockSource {
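
Note (not part of the patch): the test above exercises the new provider-keyed naming. The sketch below is built only from APIs visible in this diff (`carstore.NewWriter`/`WriteHead`, `reaper.ArchiveName`, and the `reaper.New` options used in `reaper_test.go`) and shows how GC for a provider that is no longer registered might be driven. The helper name `reapRemovedProvider` and its `dsDir`/`dsTmpDir` parameters are illustrative assumptions, not part of the PR.

```go
// Illustrative sketch only; not part of this patch. Option names and call
// signatures follow reaper_test.go above; the function name and the
// dsDir/dsTmpDir parameters are hypothetical.
package gcexample

import (
	"context"
	"fmt"

	"github.com/ipfs/go-cid"
	"github.com/ipni/go-indexer-core/store/memory"
	"github.com/ipni/go-libipni/pcache"
	"github.com/ipni/storetheindex/carstore"
	"github.com/ipni/storetheindex/filestore"
	"github.com/ipni/storetheindex/ipni-gc/reaper"
	"github.com/libp2p/go-libp2p/core/peer"
)

// reapRemovedProvider shows the provider-keyed resource naming introduced by
// this change: the head file in the CAR mirror and the gc datastore archive
// are both identified by providerID, so GC can clean up a provider that is
// no longer registered by passing its ID to Reap with WithDeleteNotFound set.
func reapRemovedProvider(ctx context.Context, fileStore filestore.Interface, pc *pcache.ProviderCache, providerID peer.ID, headAd cid.Cid, dsDir, dsTmpDir string) error {
	// Head files are now written under the provider ID rather than the
	// publisher ID (see CarWriter.WriteHead above).
	carWriter, err := carstore.NewWriter(nil, fileStore)
	if err != nil {
		return err
	}
	if _, err = carWriter.WriteHead(ctx, headAd, providerID); err != nil {
		return err
	}

	// The gc datastore archive in the filestore is likewise keyed by the
	// provider ID.
	fmt.Println("gc archive name:", reaper.ArchiveName(providerID))

	// WithDeleteNotFound tells the reaper to remove all resources for a
	// provider that no longer appears in the provider cache, even if GC was
	// previously run for it.
	gc, err := reaper.New(memory.New(), fileStore,
		reaper.WithCarRead(true),
		reaper.WithCarDelete(true),
		reaper.WithCommit(true),
		reaper.WithDatastoreDir(dsDir),
		reaper.WithDatastoreTempDir(dsTmpDir),
		reaper.WithDeleteNotFound(true),
		reaper.WithPCache(pc),
	)
	if err != nil {
		return err
	}
	defer gc.Close()

	return gc.Reap(ctx, providerID)
}
```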