Skip to content

Commit

Permalink
Identify ipni-gc resources by provider ID (#2449)
Browse files Browse the repository at this point in the history
* Identify ipni-gc resources by provider ID

Previously the publisher ID was used as this was more immediately associated with an advertisement chain, and because head files in the CAR mirror were identified by publisher ID. It is more correct to change these things to be identified by provider ID, since the same provider's chain can be served by different publishers.

This PR also allows non-existent provider resources to be cleaned up even if GC has been previously done.
  • Loading branch information
gammazero authored Dec 15, 2023
1 parent e37030e commit bbd9f10
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 147 deletions.
12 changes: 6 additions & 6 deletions carstore/carreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,16 @@ func (cr CarReader) Read(ctx context.Context, adCid cid.Cid, skipEntries bool) (
return &adBlock, nil
}

// ReadHead reads the advertisement CID from the publisher's head file. The
// head file contains the CID of the latest advertisement for an advertisement
// publisher. Returns fs.ErrNotExist if head file is not found.
func (cr CarReader) ReadHead(ctx context.Context, publisher peer.ID) (cid.Cid, error) {
err := publisher.Validate()
// ReadHead reads the advertisement CID from the provider's head file. The head
// file contains the CID of the latest advertisement for a provider. Returns
// fs.ErrNotExist if head file is not found.
func (cr CarReader) ReadHead(ctx context.Context, provider peer.ID) (cid.Cid, error) {
err := provider.Validate()
if err != nil {
return cid.Undef, err
}

headPath := publisher.String() + HeadFileSuffix
headPath := provider.String() + HeadFileSuffix
_, r, err := cr.fileStore.Get(ctx, headPath)
if err != nil {
return cid.Undef, err
Expand Down
12 changes: 8 additions & 4 deletions carstore/carwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,16 +276,20 @@ func (cw *CarWriter) WriteChain(ctx context.Context, adCid cid.Cid, overWrite bo
return count, nil
}

func (cw *CarWriter) WriteHead(ctx context.Context, adCid cid.Cid, publisher peer.ID) (*filestore.File, error) {
err := publisher.Validate()
func (cw *CarWriter) WriteHead(ctx context.Context, adCid cid.Cid, provider peer.ID) (*filestore.File, error) {
err := provider.Validate()
if err != nil {
return nil, err
}

headName := publisher.String() + HeadFileSuffix
headName := provider.String() + HeadFileSuffix
return cw.fileStore.Put(ctx, headName, strings.NewReader(adCid.String()))
}

func (cw *CarWriter) DeleteHead(ctx context.Context, provider peer.ID) error {
headName := provider.String() + HeadFileSuffix
return cw.fileStore.Delete(ctx, headName)
}

func (cw *CarWriter) Delete(ctx context.Context, adCid cid.Cid) error {
err := cw.fileStore.Delete(ctx, cw.CarPath(adCid))
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,16 @@ func testEndToEndWithReferenceProvider(t *testing.T, publisherProto string) {
outStatus = e.Run(indexer, "admin", "status", "--indexer", "http://localhost:3202")
require.Contains(t, string(outStatus), "Frozen: true", "expected indexer to be frozen")

outgc := string(e.Run(ipnigc, "provider", "-pid", providerID, "-ll", "debug", "--commit",
logLevel := "info"
if testing.Verbose() {
logLevel = "debug"
}
outgc := string(e.Run(ipnigc, "provider", "-pid", providerID, "-ll", logLevel, "--commit",
"-i", "http://localhost:3200",
"-i", "http://localhost:3000",
))
t.Logf("GC Results:\n%s\n", outgc)
require.Contains(t, outgc, `{"count": 1043, "total": 1043, "source": "CAR", "adsProcessed": 2}`)
require.Contains(t, outgc, `"count": 1043, "total": 1043, "source": "CAR"`)

e.Stop(cmdIndexer2, time.Second)

Expand Down
2 changes: 1 addition & 1 deletion internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1117,7 +1117,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as
headAdCid := assignment.adInfos[0].cid

if ing.mirror.canWrite() && !assignment.adInfos[0].resync {
_, err := ing.mirror.writeHead(ctx, headAdCid, assignment.publisher)
_, err := ing.mirror.writeHead(ctx, headAdCid, provider)
if err != nil {
log.Errorw("Cannot write publisher head", "err", err)
}
Expand Down
Loading

0 comments on commit bbd9f10

Please sign in to comment.