Skip to content

Commit

Permalink
feat: refactor router and add tests, use periodic ID token refresh in…
Browse files Browse the repository at this point in the history
… release CI (#38)

* feat: refactor to improve readability and coverage

* ci: add periodic id token refresh
  • Loading branch information
avtakkar authored Apr 11, 2024
1 parent d69157d commit 9fee397
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 131 deletions.
26 changes: 25 additions & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ jobs:
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}

# This is a temporary workaround. See: https://github.com/Azure/azure-cli/issues/28708#issuecomment-2049014471
- name: Fetch OID token every 4 mins
run: |
while true; do
token_request=$ACTIONS_ID_TOKEN_REQUEST_TOKEN
token_uri=$ACTIONS_ID_TOKEN_REQUEST_URL
token=$(curl -H "Authorization: bearer $token_request" "${token_uri}&audience=api://AzureADTokenExchange" | jq .value -r)
az login --service-principal -u ${{ secrets.AZURE_CLIENT_ID }} -t ${{ secrets.AZURE_TENANT_ID }} --federated-token $token --output none
# Sleep for 4 minutes
sleep 240
done &
- name: Check Out Source Code
if: ${{ success() }}
uses: actions/checkout@v2
Expand Down Expand Up @@ -96,12 +108,24 @@ jobs:
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}

# This is a temporary workaround. See: https://github.com/Azure/azure-cli/issues/28708#issuecomment-2049014471
- name: Fetch OID token every 4 mins
run: |
while true; do
token_request=$ACTIONS_ID_TOKEN_REQUEST_TOKEN
token_uri=$ACTIONS_ID_TOKEN_REQUEST_URL
token=$(curl -H "Authorization: bearer $token_request" "${token_uri}&audience=api://AzureADTokenExchange" | jq .value -r)
az login --service-principal -u ${{ secrets.AZURE_CLIENT_ID }} -t ${{ secrets.AZURE_TENANT_ID }} --federated-token $token --output none
# Sleep for 4 minutes
sleep 240
done &
- name: Check Out Source Code
if: ${{ success() }}
uses: actions/checkout@v2
with:
ref: ${{ env.TAG }}

- name: 'Make'
if: ${{ success() }}
run: |
Expand Down
2 changes: 1 addition & 1 deletion cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func serverCommand(ctx context.Context, args *ServerCmd) (err error) {

eventsRecorder.Initializing()

r, err := routing.NewRouter(ctx, clientset, args.RouterAddr, httpsPort, clientset.Namespace)
r, err := routing.NewRouter(ctx, clientset, args.RouterAddr, httpsPort)
if err != nil {
return err
}
Expand Down
12 changes: 0 additions & 12 deletions internal/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,12 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/rs/zerolog"
)

// P2P network.
const (
KeyTTL = 30 * time.Minute
)

// Cache constants.
const (
P2pLookupCacheTtl = 500 * time.Millisecond
P2pLookupNotFoundValue = "PEER_NOT_FOUND"
)

// Context keys.
const (
CorrelationIdCtxKey = "correlation_id"
Expand Down
2 changes: 1 addition & 1 deletion internal/oci/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (m *Mirror) Handle(c *gin.Context) {
}

succeeded := false
u, err := url.Parse(peer.Addr)
u, err := url.Parse(peer.HttpHost)
if err != nil {
//nolint
c.AbortWithError(http.StatusInternalServerError, err)
Expand Down
8 changes: 4 additions & 4 deletions internal/remote/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (r *reader) doP2p(log zerolog.Logger, fileChunkKey string, start, end int64

startTime := time.Now()
peerCount := 0
peersCh, negCacheCallback, err := r.router.ResolveWithCache(resolveCtx, fileChunkKey, false, r.resolveRetries)
peersCh, negCacheCallback, err := r.router.ResolveWithNegativeCacheCallback(resolveCtx, fileChunkKey, false, r.resolveRetries)
if err != nil {
//nolint:errcheck // ignore
log.Error().Err(err).Msg(p2pcontext.PeerRequestErrorLog)
Expand Down Expand Up @@ -139,11 +139,11 @@ peerLoop:

if peerCount == 0 {
// Only report the time it took to discover the first peer.
metrics.Global.RecordPeerDiscovery(peer.Addr, time.Since(startTime).Seconds())
metrics.Global.RecordPeerDiscovery(peer.HttpHost, time.Since(startTime).Seconds())
peerCount++
}

peerReq, err := r.peerRequest(peer.Addr, start, end)
peerReq, err := r.peerRequest(peer.HttpHost, start, end)
if err != nil {
log.Error().Err(err).Msg(p2pcontext.PeerRequestErrorLog)
// try next peer
Expand Down Expand Up @@ -172,7 +172,7 @@ peerLoop:
if o == operationPreadRemote {
op = "pread"
}
metrics.Global.RecordPeerResponse(peer.Addr, fileChunkKey, op, time.Since(startTime).Seconds(), count)
metrics.Global.RecordPeerResponse(peer.HttpHost, fileChunkKey, op, time.Since(startTime).Seconds(), count)
return count, nil
}
}
Expand Down
15 changes: 9 additions & 6 deletions internal/routing/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)

// Router provides an interface to a peered network.
// Router provides a content routing interface to the network.
type Router interface {
// Net returns the network interface.
Net() peernet.Network

// Resolve resolves the given key to a peer address.
Resolve(ctx context.Context, key string, allowSelf bool, count int) (<-chan PeerInfo, error)

// ResolveWithCache is like Resolve but it also returns a function callback that can be used to cache that a key could not be resolved.
ResolveWithCache(ctx context.Context, key string, allowSelf bool, count int) (<-chan PeerInfo, func(), error)
// ResolveWithNegativeCacheCallback is like Resolve but it also returns a function callback that can be used to cache that a key could not be resolved.
ResolveWithNegativeCacheCallback(ctx context.Context, key string, allowSelf bool, count int) (<-chan PeerInfo, func(), error)

// Advertise advertises the given keys to the network.
Advertise(ctx context.Context, keys []string) error
// Provide provides the given keys to the network.
// This lets the k-closest peers to the key know that we are providing it.
Provide(ctx context.Context, keys []string) error

// Close closes the router.
Close() error
Expand All @@ -30,5 +31,7 @@ type Router interface {
// PeerInfo describes a peer.
type PeerInfo struct {
peer.ID
Addr string

// HttpHost is the HTTP host of the peer.
HttpHost string
}
Loading

0 comments on commit 9fee397

Please sign in to comment.