diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index ef0904c..16505f2 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -68,6 +68,18 @@ jobs: tenant-id: ${{ secrets.AZURE_TENANT_ID }} subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }} + # This is a temporary workaround. See: https://github.com/Azure/azure-cli/issues/28708#issuecomment-2049014471 + - name: Fetch OID token every 4 mins + run: | + while true; do + token_request=$ACTIONS_ID_TOKEN_REQUEST_TOKEN + token_uri=$ACTIONS_ID_TOKEN_REQUEST_URL + token=$(curl -H "Authorization: bearer $token_request" "${token_uri}&audience=api://AzureADTokenExchange" | jq .value -r) + az login --service-principal -u ${{ secrets.AZURE_CLIENT_ID }} -t ${{ secrets.AZURE_TENANT_ID }} --federated-token $token --output none + # Sleep for 4 minutes + sleep 240 + done & + - name: Check Out Source Code if: ${{ success() }} uses: actions/checkout@v2 @@ -96,12 +108,24 @@ jobs: tenant-id: ${{ secrets.AZURE_TENANT_ID }} subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }} + # This is a temporary workaround. See: https://github.com/Azure/azure-cli/issues/28708#issuecomment-2049014471 + - name: Fetch OID token every 4 mins + run: | + while true; do + token_request=$ACTIONS_ID_TOKEN_REQUEST_TOKEN + token_uri=$ACTIONS_ID_TOKEN_REQUEST_URL + token=$(curl -H "Authorization: bearer $token_request" "${token_uri}&audience=api://AzureADTokenExchange" | jq .value -r) + az login --service-principal -u ${{ secrets.AZURE_CLIENT_ID }} -t ${{ secrets.AZURE_TENANT_ID }} --federated-token $token --output none + # Sleep for 4 minutes + sleep 240 + done & + - name: Check Out Source Code if: ${{ success() }} uses: actions/checkout@v2 with: ref: ${{ env.TAG }} - + - name: 'Make' if: ${{ success() }} run: | diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 1c48fca..5bed629 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -98,7 +98,7 @@ func serverCommand(ctx context.Context, args *ServerCmd) (err error) { eventsRecorder.Initializing() - r, err := routing.NewRouter(ctx, clientset, args.RouterAddr, httpsPort, clientset.Namespace) + r, err := routing.NewRouter(ctx, clientset, args.RouterAddr, httpsPort) if err != nil { return err } diff --git a/internal/context/context.go b/internal/context/context.go index bcbae63..16232c1 100644 --- a/internal/context/context.go +++ b/internal/context/context.go @@ -10,24 +10,12 @@ import ( "strconv" "strings" "sync" - "time" "github.com/gin-gonic/gin" "github.com/google/uuid" "github.com/rs/zerolog" ) -// P2P network. -const ( - KeyTTL = 30 * time.Minute -) - -// Cache constants. -const ( - P2pLookupCacheTtl = 500 * time.Millisecond - P2pLookupNotFoundValue = "PEER_NOT_FOUND" -) - // Context keys. const ( CorrelationIdCtxKey = "correlation_id" diff --git a/internal/oci/mirror.go b/internal/oci/mirror.go index 813b072..28c55f9 100644 --- a/internal/oci/mirror.go +++ b/internal/oci/mirror.go @@ -83,7 +83,7 @@ func (m *Mirror) Handle(c *gin.Context) { } succeeded := false - u, err := url.Parse(peer.Addr) + u, err := url.Parse(peer.HttpHost) if err != nil { //nolint c.AbortWithError(http.StatusInternalServerError, err) diff --git a/internal/remote/reader.go b/internal/remote/reader.go index e7c3247..e8b1990 100644 --- a/internal/remote/reader.go +++ b/internal/remote/reader.go @@ -111,7 +111,7 @@ func (r *reader) doP2p(log zerolog.Logger, fileChunkKey string, start, end int64 startTime := time.Now() peerCount := 0 - peersCh, negCacheCallback, err := r.router.ResolveWithCache(resolveCtx, fileChunkKey, false, r.resolveRetries) + peersCh, negCacheCallback, err := r.router.ResolveWithNegativeCacheCallback(resolveCtx, fileChunkKey, false, r.resolveRetries) if err != nil { //nolint:errcheck // ignore log.Error().Err(err).Msg(p2pcontext.PeerRequestErrorLog) @@ -139,11 +139,11 @@ peerLoop: if peerCount == 0 { // Only report the time it took to discover the first peer. - metrics.Global.RecordPeerDiscovery(peer.Addr, time.Since(startTime).Seconds()) + metrics.Global.RecordPeerDiscovery(peer.HttpHost, time.Since(startTime).Seconds()) peerCount++ } - peerReq, err := r.peerRequest(peer.Addr, start, end) + peerReq, err := r.peerRequest(peer.HttpHost, start, end) if err != nil { log.Error().Err(err).Msg(p2pcontext.PeerRequestErrorLog) // try next peer @@ -172,7 +172,7 @@ peerLoop: if o == operationPreadRemote { op = "pread" } - metrics.Global.RecordPeerResponse(peer.Addr, fileChunkKey, op, time.Since(startTime).Seconds(), count) + metrics.Global.RecordPeerResponse(peer.HttpHost, fileChunkKey, op, time.Since(startTime).Seconds(), count) return count, nil } } diff --git a/internal/routing/interface.go b/internal/routing/interface.go index 1f4e21b..9b347bb 100644 --- a/internal/routing/interface.go +++ b/internal/routing/interface.go @@ -9,7 +9,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ) -// Router provides an interface to a peered network. +// Router provides a content routing interface to the network. type Router interface { // Net returns the network interface. Net() peernet.Network @@ -17,11 +17,12 @@ type Router interface { // Resolve resolves the given key to a peer address. Resolve(ctx context.Context, key string, allowSelf bool, count int) (<-chan PeerInfo, error) - // ResolveWithCache is like Resolve but it also returns a function callback that can be used to cache that a key could not be resolved. - ResolveWithCache(ctx context.Context, key string, allowSelf bool, count int) (<-chan PeerInfo, func(), error) + // ResolveWithNegativeCacheCallback is like Resolve but it also returns a function callback that can be used to cache that a key could not be resolved. + ResolveWithNegativeCacheCallback(ctx context.Context, key string, allowSelf bool, count int) (<-chan PeerInfo, func(), error) - // Advertise advertises the given keys to the network. - Advertise(ctx context.Context, keys []string) error + // Provide provides the given keys to the network. + // This lets the k-closest peers to the key know that we are providing it. + Provide(ctx context.Context, keys []string) error // Close closes the router. Close() error @@ -30,5 +31,7 @@ type Router interface { // PeerInfo describes a peer. type PeerInfo struct { peer.ID - Addr string + + // HttpHost is the HTTP host of the peer. + HttpHost string } diff --git a/internal/routing/router.go b/internal/routing/router.go index 0325e85..54f0faa 100644 --- a/internal/routing/router.go +++ b/internal/routing/router.go @@ -7,6 +7,7 @@ import ( "fmt" "net" "sync/atomic" + "time" p2pcontext "github.com/azure/peerd/internal/context" "github.com/azure/peerd/internal/k8s/events" @@ -26,64 +27,62 @@ import ( "github.com/rs/zerolog" ) +const ( + MaxRecordAge = 30 * time.Minute + + negCacheTtl = 500 * time.Millisecond + strPeerNotFound = "PEER_NOT_FOUND" +) + type router struct { - clientset *k8s.ClientSet - p2pnet peernet.Network - host host.Host - rd *routing.RoutingDiscovery - port string - lookupCache *ristretto.Cache + // host is this libp2p host. + host host.Host + + // p2pnet provides clients for downloading content from peers. + p2pnet peernet.Network + + // content is the content discovery service. + content *routing.RoutingDiscovery + + // peerRegistryPort is the port used for the peer registry. + peerRegistryPort string + + // lookupCache is a cache for storing the results of lookups, usually used to store negative results. + lookupCache *ristretto.Cache + + // k8sClient is the k8s client. + k8sClient *k8s.ClientSet + + // k8sNamespace is the k8s namespace used for leader election and event recording. k8sNamespace string + // active is a flag that indicates if this host is actively discovering content on the network. active atomic.Bool } -// PeerNotFoundError indicates that no peer could be found for the given key. -type PeerNotFoundError struct { +var _ Router = &router{} + +// ContentNotFoundError indicates that the content for the given key was not found in the network. +type ContentNotFoundError struct { error + + // key is the key that could not be resolved. key string } -// NewRouter creates a new router. -func NewRouter(ctx context.Context, clientset *k8s.ClientSet, routerAddr, serverPort, k8sNamespace string) (Router, error) { +// NewRouter creates a new Router. +func NewRouter(ctx context.Context, clientset *k8s.ClientSet, hostAddr, peerRegistryPort string) (Router, error) { log := zerolog.Ctx(ctx).With().Str("component", "router").Logger() - h, p, err := net.SplitHostPort(routerAddr) - if err != nil { - return nil, err - } - - multiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", h, p)) - if err != nil { - return nil, fmt.Errorf("could not create host multi address: %w", err) - } - - factory := libp2p.AddrsFactory(func(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr { - for _, addr := range addrs { - v, err := addr.ValueForProtocol(multiaddr.P_IP4) - if err != nil { - continue - } - if v == "" { - continue - } - if v == "127.0.0.1" { - continue - } - return []multiaddr.Multiaddr{addr} - } - return nil - }) - - host, err := libp2p.New(libp2p.ListenAddrs(multiAddr), factory) + host, err := newHost(hostAddr) if err != nil { return nil, fmt.Errorf("could not create host: %w", err) } self := fmt.Sprintf("%s/p2p/%s", host.Addrs()[0].String(), host.ID().String()) - log.Info().Str("id", self).Msg("starting p2p router") + log.Debug().Str("id", self).Msg("starting p2p router") - leaderElection := election.New(k8sNamespace, "peerd-leader-election", p2pcontext.KubeConfigPath) + leaderElection := election.New(clientset.Namespace, "peerd-leader-election", p2pcontext.KubeConfigPath) err = leaderElection.RunOrDie(ctx, self) if err != nil { @@ -91,7 +90,7 @@ func NewRouter(ctx context.Context, clientset *k8s.ClientSet, routerAddr, server } // TODO avtakkar: reconsider the max record age for cached files. Or, ensure that the cached list is periodically advertised. - dhtOpts := []dht.Option{dht.Mode(dht.ModeServer), dht.ProtocolPrefix("/microsoft"), dht.DisableValues(), dht.MaxRecordAge(p2pcontext.KeyTTL)} + dhtOpts := []dht.Option{dht.Mode(dht.ModeServer), dht.ProtocolPrefix("/peerd"), dht.DisableValues(), dht.MaxRecordAge(MaxRecordAge)} bootstrapPeerOpt := dht.BootstrapPeersFunc(func() []peer.AddrInfo { addr, err := leaderElection.Leader() if err != nil { @@ -102,7 +101,7 @@ func NewRouter(ctx context.Context, clientset *k8s.ClientSet, routerAddr, server addrInfo, err := peer.AddrInfoFromP2pAddr(addr) if err != nil { - log.Error().Err(err).Msg("could not get leader") + log.Error().Err(err).Msg("could not get leader addr info") return nil } @@ -111,11 +110,11 @@ func NewRouter(ctx context.Context, clientset *k8s.ClientSet, routerAddr, server }() if addrInfo.ID == host.ID() { - log.Info().Msg("leader is self, skipping connection to bootstrap node") + log.Debug().Msg("bootstrapped as leader") return nil } - log.Info().Str("node", addrInfo.ID.String()).Msg("bootstrap node found") + log.Debug().Str("leader", addrInfo.ID.String()).Msg("leader found") return []peer.AddrInfo{*addrInfo} }) @@ -140,13 +139,13 @@ func NewRouter(ctx context.Context, clientset *k8s.ClientSet, routerAddr, server } return &router{ - clientset: clientset, - p2pnet: n, - host: host, - rd: rd, - port: serverPort, - lookupCache: c, - k8sNamespace: k8sNamespace, + k8sClient: clientset, + p2pnet: n, + host: host, + content: rd, + peerRegistryPort: peerRegistryPort, + lookupCache: c, + k8sNamespace: clientset.Namespace, }, nil } @@ -160,76 +159,85 @@ func (r *router) Close() error { return r.host.Close() } -// ResolveWithCache is like Resolve but it also returns a function callback that can be used to cache that a key could not be resolved. -func (r *router) ResolveWithCache(ctx context.Context, key string, allowSelf bool, count int) (<-chan PeerInfo, func(), error) { - if val, ok := r.lookupCache.Get(key); ok && val.(string) == p2pcontext.P2pLookupNotFoundValue { +// ResolveWithNegativeCacheCallback is like Resolve but it also returns a function callback that can be used to cache that a key could not be resolved. +func (r *router) ResolveWithNegativeCacheCallback(ctx context.Context, key string, allowSelf bool, count int) (<-chan PeerInfo, func(), error) { + if val, ok := r.lookupCache.Get(key); ok && val.(string) == strPeerNotFound { // TODO avtakkar: currently only doing a negative cache, this could maybe become a positive cache as well. - return nil, nil, PeerNotFoundError{key: key, error: fmt.Errorf("(cached) peer not found for key")} + return nil, nil, ContentNotFoundError{key: key, error: fmt.Errorf("(cached) peer not found for key")} } + peerCh, err := r.Resolve(ctx, key, allowSelf, count) return peerCh, func() { - r.lookupCache.SetWithTTL(key, p2pcontext.P2pLookupNotFoundValue, 1, p2pcontext.P2pLookupCacheTtl) + r.lookupCache.SetWithTTL(key, strPeerNotFound, 1, negCacheTtl) }, err } // Resolve resolves the given key to a peer address. func (r *router) Resolve(ctx context.Context, key string, allowSelf bool, count int) (<-chan PeerInfo, error) { - log := zerolog.Ctx(ctx).With().Str("host", r.host.ID().String()).Str("key", key).Logger() - c, err := createCid(key) + log := zerolog.Ctx(ctx).With().Str("selfId", r.host.ID().String()).Str("key", key).Logger() + contentId, err := createContentId(key) if err != nil { return nil, err } - addrCh := r.rd.FindProvidersAsync(ctx, c, count) - peerCh := make(chan PeerInfo, count) + + providersCh := r.content.FindProvidersAsync(ctx, contentId, count) + peersCh := make(chan PeerInfo, count) + go func() { - for info := range addrCh { + for info := range providersCh { if !allowSelf && info.ID == r.host.ID() { continue } + if len(info.Addrs) != 1 { - log.Info().Msg("expected address list to only contain a single item") + log.Debug().Msg("expected address list to only contain a single item") continue } v, err := info.Addrs[0].ValueForProtocol(multiaddr.P_IP4) if err != nil { - log.Error().Err(err).Msg("could not get IPV4 address") + log.Error().Err(err).Str("peer", info.Addrs[0].String()).Msg("could not get IPV4 address") continue } // Combine peer with registry port to create mirror endpoint. - peerCh <- PeerInfo{info.ID, fmt.Sprintf("https://%s:%s", v, r.port)} + peersCh <- PeerInfo{info.ID, fmt.Sprintf("https://%s:%s", v, r.peerRegistryPort)} if r.active.CompareAndSwap(false, true) { - er, err := events.NewRecorder(ctx, r.clientset, r.k8sNamespace) + er, err := events.NewRecorder(ctx, r.k8sClient, r.k8sNamespace) if err != nil { - log.Error().Err(err).Msg("could not create event recorder") + log.Error().Err(err).Msg("failed to create event recorder") } else { er.Active() // Report that p2p is active. } } } }() - return peerCh, nil + + return peersCh, nil } -// Advertise advertises the given keys to the network. -func (r *router) Advertise(ctx context.Context, keys []string) error { - zerolog.Ctx(ctx).Trace().Str("host", r.host.ID().String()).Strs("keys", keys).Msg("advertising keys") +// Provide advertises the given keys to the network. +func (r *router) Provide(ctx context.Context, keys []string) error { + zerolog.Ctx(ctx).Trace().Str("host", r.host.ID().String()).Strs("keys", keys).Msg("providing keys") for _, key := range keys { - c, err := createCid(key) + + contentId, err := createContentId(key) if err != nil { return err } - err = r.rd.Provide(ctx, c, true) + + err = r.content.Provide(ctx, contentId, true) if err != nil { return err } } + return nil } -func createCid(key string) (cid.Cid, error) { +// createContentId creates a deterministic content id from the given key. +func createContentId(key string) (cid.Cid, error) { pref := cid.Prefix{ Version: 1, Codec: uint64(mc.Raw), @@ -242,3 +250,35 @@ func createCid(key string) (cid.Cid, error) { } return c, nil } + +// newHost creates a new Host from the given address. +func newHost(addr string) (host.Host, error) { + h, p, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + + hostAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", h, p)) + if err != nil { + return nil, fmt.Errorf("could not create host multi address: %w", err) + } + + factory := libp2p.AddrsFactory(func(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr { + for _, addr := range addrs { + v, err := addr.ValueForProtocol(multiaddr.P_IP4) + if err != nil { + continue + } + if v == "" { + continue + } + if v == "127.0.0.1" { + continue + } + return []multiaddr.Multiaddr{addr} + } + return nil + }) + + return libp2p.New(libp2p.ListenAddrs(hostAddr), factory) +} diff --git a/internal/routing/router_test.go b/internal/routing/router_test.go index bc88f1c..2d7b3dc 100644 --- a/internal/routing/router_test.go +++ b/internal/routing/router_test.go @@ -5,10 +5,10 @@ package routing import ( "context" "errors" + "strings" "testing" "time" - p2pcontext "github.com/azure/peerd/internal/context" "github.com/azure/peerd/pkg/k8s" "github.com/dgraph-io/ristretto" cid "github.com/ipfs/go-cid" @@ -45,15 +45,15 @@ func TestResolveWithCache(t *testing.T) { } r := &router{ - clientset: &fakeClientset, - host: h, - port: "5000", - lookupCache: c, - rd: routing.NewRoutingDiscovery(tcr), + k8sClient: &fakeClientset, + host: h, + peerRegistryPort: "5000", + lookupCache: c, + content: routing.NewRoutingDiscovery(tcr), } ctx := context.Background() - _, negCacheCallback, err := r.ResolveWithCache(ctx, key, false, 2) + _, negCacheCallback, err := r.ResolveWithNegativeCacheCallback(ctx, key, false, 2) if err != nil { t.Fatal(err) } @@ -61,8 +61,8 @@ func TestResolveWithCache(t *testing.T) { negCacheCallback() time.Sleep(250 * time.Millisecond) // allow cache to flush - if val, ok := r.lookupCache.Get(key); !ok || val != p2pcontext.P2pLookupNotFoundValue { - t.Errorf("expected key to be %s, got %s", p2pcontext.P2pLookupNotFoundValue, val) + if val, ok := r.lookupCache.Get(key); !ok || val != strPeerNotFound { + t.Errorf("expected key to be %s, got %s", strPeerNotFound, val) } } @@ -78,17 +78,17 @@ func TestResolve(t *testing.T) { h := &testHost{"host-id"} key := "some-key" - contentId, err := createCid(key) + contentId, err := createContentId(key) if err != nil { t.Fatal(err) } r := &router{ - clientset: &fakeClientset, - host: h, - port: "5000", - lookupCache: c, - rd: routing.NewRoutingDiscovery(&testCr{ + k8sClient: &fakeClientset, + host: h, + peerRegistryPort: "5000", + lookupCache: c, + content: routing.NewRoutingDiscovery(&testCr{ m: map[string][]string{ contentId.String(): {"10.0.0.1", "10.0.0.2"}, }, @@ -103,7 +103,7 @@ func TestResolve(t *testing.T) { count := 0 for info := range got { - if info.Addr == "https://10.0.0.1:5000" || info.Addr == "https://10.0.0.2:5000" { + if info.HttpHost == "https://10.0.0.1:5000" || info.HttpHost == "https://10.0.0.2:5000" { count++ } else { t.Errorf("expected peer1 or peer2, got %s", info) @@ -131,7 +131,7 @@ func TestProvide(t *testing.T) { h := &testHost{"host-id"} key := "some-key" - contentId, err := createCid(key) + contentId, err := createContentId(key) if err != nil { t.Fatal(err) } @@ -140,15 +140,15 @@ func TestProvide(t *testing.T) { } r := &router{ - clientset: &fakeClientset, - host: h, - port: "5000", - lookupCache: c, - rd: routing.NewRoutingDiscovery(tcr), + k8sClient: &fakeClientset, + host: h, + peerRegistryPort: "5000", + lookupCache: c, + content: routing.NewRoutingDiscovery(tcr), } ctx := context.Background() - err = r.Advertise(ctx, []string{key}) + err = r.Provide(ctx, []string{key}) if err != nil { t.Fatal(err) } @@ -160,6 +160,56 @@ func TestProvide(t *testing.T) { } } +func TestNewHost(t *testing.T) { + for _, tc := range []struct { + name string + addr string + expectedIp string + expectedPort string + expectedErr bool + }{ + { + name: "valid address", + addr: "0.0.0.0:5000", + expectedPort: "5000", + expectedErr: false, + }, + { + name: "invalid address", + addr: "invalidaddress", + expectedPort: "", + expectedErr: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + h, err := newHost(tc.addr) + if tc.expectedErr && err == nil { + t.Fatalf("expected error, got nil") + } + + if !tc.expectedErr && err != nil { + t.Fatalf("expected no error, got %v", err) + } + + if tc.expectedErr && err != nil { + return + } + + if h == nil { + t.Fatal("expected host to be non-nil") + } + + if len(h.Addrs()) != 1 { + t.Fatalf("expected 1 address, got %d", len(h.Addrs())) + } + + if !strings.HasSuffix(h.Addrs()[0].String(), "/tcp/"+tc.expectedPort) { + t.Fatalf("expected address to end with /tcp/%s, got %s", tc.expectedPort, h.Addrs()[0].String()) + } + }) + } +} + type testCr struct { m map[string][]string provided []cid.Cid diff --git a/internal/routing/tests/mock.go b/internal/routing/tests/mock.go index 589a583..169f7c2 100644 --- a/internal/routing/tests/mock.go +++ b/internal/routing/tests/mock.go @@ -13,7 +13,7 @@ import ( ) type MockRouter struct { - net peernet.Network + p2pNet peernet.Network mx sync.RWMutex resolver map[string][]string @@ -22,11 +22,11 @@ type MockRouter struct { // Net implements routing.Router. func (m *MockRouter) Net() peernet.Network { - return m.net + return m.p2pNet } -// ResolveWithCache implements Router. -func (m *MockRouter) ResolveWithCache(ctx context.Context, key string, allowSelf bool, count int) (<-chan routing.PeerInfo, func(), error) { +// ResolveWithNegativeCacheCallback implements Router. +func (m *MockRouter) ResolveWithNegativeCacheCallback(ctx context.Context, key string, allowSelf bool, count int) (<-chan routing.PeerInfo, func(), error) { c, e := m.Resolve(ctx, key, allowSelf, count) return c, func() { m.mx.Lock() @@ -44,7 +44,7 @@ func NewMockRouter(resolver map[string][]string) *MockRouter { } return &MockRouter{ - net: n, + p2pNet: n, resolver: resolver, negCache: map[string]struct{}{}, } @@ -66,7 +66,7 @@ func (m *MockRouter) Resolve(ctx context.Context, key string, allowSelf bool, co m.mx.RLock() defer m.mx.RUnlock() for _, p := range peers { - peerCh <- routing.PeerInfo{ID: peer.ID(p), Addr: p} + peerCh <- routing.PeerInfo{ID: peer.ID(p), HttpHost: p} } close(peerCh) }() @@ -74,7 +74,7 @@ func (m *MockRouter) Resolve(ctx context.Context, key string, allowSelf bool, co return peerCh, nil } -func (m *MockRouter) Advertise(ctx context.Context, keys []string) error { +func (m *MockRouter) Provide(ctx context.Context, keys []string) error { m.mx.Lock() defer m.mx.Unlock() for _, key := range keys { diff --git a/internal/state/state.go b/internal/state/state.go index 1fb3b32..83a79d3 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -38,7 +38,7 @@ func Advertise(ctx context.Context, r routing.Router, containerdStore containerd immediate := make(chan time.Time, 1) immediate <- time.Now() - expirationTicker := time.NewTicker(p2pcontext.KeyTTL - time.Minute) + expirationTicker := time.NewTicker(routing.MaxRecordAge - time.Minute) defer expirationTicker.Stop() ticker := p2pcontext.Merge(immediate, expirationTicker.C) @@ -67,7 +67,7 @@ func Advertise(ctx context.Context, r routing.Router, containerdStore containerd case blob := <-filesChan: l.Debug().Str("blob", blob).Msg("advertising file") - err := r.Advertise(ctx, []string{blob}) + err := r.Provide(ctx, []string{blob}) if err != nil { l.Error().Err(err).Str("blob", blob).Msg("file: advertising error") continue @@ -117,7 +117,7 @@ func advertiseRef(ctx context.Context, l zerolog.Logger, containerdStore contain keys = append(keys, dgsts...) } - err = router.Advertise(ctx, keys) + err = router.Provide(ctx, keys) if err != nil { return 0, fmt.Errorf("could not advertise image %v: %w", ref, err) }