Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor router and add tests, use periodic ID token refresh in release CI #38

Merged
merged 2 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ jobs:
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}

# This is a temporary workaround. See: https://github.com/Azure/azure-cli/issues/28708#issuecomment-2049014471
- name: Fetch OID token every 4 mins
run: |
while true; do
token_request=$ACTIONS_ID_TOKEN_REQUEST_TOKEN
token_uri=$ACTIONS_ID_TOKEN_REQUEST_URL
token=$(curl -H "Authorization: bearer $token_request" "${token_uri}&audience=api://AzureADTokenExchange" | jq .value -r)
az login --service-principal -u ${{ secrets.AZURE_CLIENT_ID }} -t ${{ secrets.AZURE_TENANT_ID }} --federated-token $token --output none
# Sleep for 4 minutes
sleep 240
done &

- name: Check Out Source Code
if: ${{ success() }}
uses: actions/checkout@v2
Expand Down Expand Up @@ -96,12 +108,24 @@ jobs:
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}

# This is a temporary workaround. See: https://github.com/Azure/azure-cli/issues/28708#issuecomment-2049014471
- name: Fetch OID token every 4 mins
run: |
while true; do
token_request=$ACTIONS_ID_TOKEN_REQUEST_TOKEN
token_uri=$ACTIONS_ID_TOKEN_REQUEST_URL
token=$(curl -H "Authorization: bearer $token_request" "${token_uri}&audience=api://AzureADTokenExchange" | jq .value -r)
az login --service-principal -u ${{ secrets.AZURE_CLIENT_ID }} -t ${{ secrets.AZURE_TENANT_ID }} --federated-token $token --output none
# Sleep for 4 minutes
sleep 240
done &

- name: Check Out Source Code
if: ${{ success() }}
uses: actions/checkout@v2
with:
ref: ${{ env.TAG }}

- name: 'Make'
if: ${{ success() }}
run: |
Expand Down
2 changes: 1 addition & 1 deletion cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@

eventsRecorder.Initializing()

r, err := routing.NewRouter(ctx, clientset, args.RouterAddr, httpsPort, clientset.Namespace)
r, err := routing.NewRouter(ctx, clientset, args.RouterAddr, httpsPort)

Check warning on line 101 in cmd/proxy/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/proxy/main.go#L101

Added line #L101 was not covered by tests
if err != nil {
return err
}
Expand Down
12 changes: 0 additions & 12 deletions internal/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,12 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/rs/zerolog"
)

// P2P network.
const (
KeyTTL = 30 * time.Minute
)

// Cache constants.
const (
P2pLookupCacheTtl = 500 * time.Millisecond
P2pLookupNotFoundValue = "PEER_NOT_FOUND"
)

// Context keys.
const (
CorrelationIdCtxKey = "correlation_id"
Expand Down
2 changes: 1 addition & 1 deletion internal/oci/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (m *Mirror) Handle(c *gin.Context) {
}

succeeded := false
u, err := url.Parse(peer.Addr)
u, err := url.Parse(peer.HttpHost)
if err != nil {
//nolint
c.AbortWithError(http.StatusInternalServerError, err)
Expand Down
8 changes: 4 additions & 4 deletions internal/remote/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (r *reader) doP2p(log zerolog.Logger, fileChunkKey string, start, end int64

startTime := time.Now()
peerCount := 0
peersCh, negCacheCallback, err := r.router.ResolveWithCache(resolveCtx, fileChunkKey, false, r.resolveRetries)
peersCh, negCacheCallback, err := r.router.ResolveWithNegativeCacheCallback(resolveCtx, fileChunkKey, false, r.resolveRetries)
if err != nil {
//nolint:errcheck // ignore
log.Error().Err(err).Msg(p2pcontext.PeerRequestErrorLog)
Expand Down Expand Up @@ -139,11 +139,11 @@ peerLoop:

if peerCount == 0 {
// Only report the time it took to discover the first peer.
metrics.Global.RecordPeerDiscovery(peer.Addr, time.Since(startTime).Seconds())
metrics.Global.RecordPeerDiscovery(peer.HttpHost, time.Since(startTime).Seconds())
peerCount++
}

peerReq, err := r.peerRequest(peer.Addr, start, end)
peerReq, err := r.peerRequest(peer.HttpHost, start, end)
if err != nil {
log.Error().Err(err).Msg(p2pcontext.PeerRequestErrorLog)
// try next peer
Expand Down Expand Up @@ -172,7 +172,7 @@ peerLoop:
if o == operationPreadRemote {
op = "pread"
}
metrics.Global.RecordPeerResponse(peer.Addr, fileChunkKey, op, time.Since(startTime).Seconds(), count)
metrics.Global.RecordPeerResponse(peer.HttpHost, fileChunkKey, op, time.Since(startTime).Seconds(), count)
return count, nil
}
}
Expand Down
15 changes: 9 additions & 6 deletions internal/routing/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)

// Router provides an interface to a peered network.
// Router provides a content routing interface to the network.
type Router interface {
// Net returns the network interface.
Net() peernet.Network

// Resolve resolves the given key to a peer address.
Resolve(ctx context.Context, key string, allowSelf bool, count int) (<-chan PeerInfo, error)

// ResolveWithCache is like Resolve but it also returns a function callback that can be used to cache that a key could not be resolved.
ResolveWithCache(ctx context.Context, key string, allowSelf bool, count int) (<-chan PeerInfo, func(), error)
// ResolveWithNegativeCacheCallback is like Resolve but it also returns a function callback that can be used to cache that a key could not be resolved.
ResolveWithNegativeCacheCallback(ctx context.Context, key string, allowSelf bool, count int) (<-chan PeerInfo, func(), error)

// Advertise advertises the given keys to the network.
Advertise(ctx context.Context, keys []string) error
// Provide provides the given keys to the network.
// This lets the k-closest peers to the key know that we are providing it.
Provide(ctx context.Context, keys []string) error

// Close closes the router.
Close() error
Expand All @@ -30,5 +31,7 @@ type Router interface {
// PeerInfo describes a peer.
type PeerInfo struct {
peer.ID
Addr string

// HttpHost is the HTTP host of the peer.
HttpHost string
}
Loading
Loading