Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ GOLINT = golangci-lint run
# Source repository variables.
ROOT_DIR := $(shell git rev-parse --show-toplevel)
BIN_DIR = $(ROOT_DIR)/bin
TEST_PKGS = $(shell go list ./... | grep -v 'github.com/azure/peerd/api\|github.com/azure/peerd/pkg/mocks') # Exclude generated and mock code.
TEST_PKGS = $(shell go list ./... | grep -v 'github.com/azure/peerd/api\|github.com/azure/peerd/pkg/discovery/routing/mocks') # Exclude generated and mock code.
TESTS_BIN_DIR = $(BIN_DIR)/tests
COVERAGE_DIR=$(BIN_DIR)/coverage
SCRIPTS_DIR=$(ROOT_DIR)/build/ci/scripts
Expand Down
14 changes: 7 additions & 7 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
"time"

"github.com/alexflint/go-arg"
p2pcontext "github.com/azure/peerd/internal/context"
"github.com/azure/peerd/internal/files/store"
"github.com/azure/peerd/internal/handlers"
"github.com/azure/peerd/pkg/containerd"
"github.com/azure/peerd/pkg/discovery"
pcontext "github.com/azure/peerd/pkg/context"
"github.com/azure/peerd/pkg/discovery/content/provider"
"github.com/azure/peerd/pkg/discovery/routing"
"github.com/azure/peerd/pkg/files/store"
"github.com/azure/peerd/pkg/k8s"
"github.com/azure/peerd/pkg/k8s/events"
"github.com/azure/peerd/pkg/metrics"
Expand All @@ -43,10 +43,10 @@ func main() {
zerolog.SetGlobalLevel(ll)
zerolog.TimeFieldFormat = time.RFC3339Nano

l := zerolog.New(os.Stdout).With().Timestamp().Str("self", p2pcontext.NodeName).Str("version", version).Logger()
l := zerolog.New(os.Stdout).With().Timestamp().Str("self", pcontext.NodeName).Str("version", version).Logger()
ctx := l.WithContext(context.Background())

ctx, err = metrics.WithContext(ctx, p2pcontext.NodeName, "peerd")
ctx, err = metrics.WithContext(ctx, pcontext.NodeName, "peerd")
if err != nil {
l.Error().Err(err).Msg("failed to initialize metrics")
os.Exit(1)
Expand Down Expand Up @@ -86,7 +86,7 @@ func serverCommand(ctx context.Context, args *ServerCmd) (err error) {
return err
}

clientset, err := k8s.NewKubernetesInterface(p2pcontext.KubeConfigPath, p2pcontext.NodeName)
clientset, err := k8s.NewKubernetesInterface(pcontext.KubeConfigPath, pcontext.NodeName)
if err != nil {
return err
}
Expand Down Expand Up @@ -157,7 +157,7 @@ func serverCommand(ctx context.Context, args *ServerCmd) (err error) {
g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
discovery.Provide(ctx, r, containerdStore, filesStore.Subscribe())
provider.Provide(ctx, r, containerdStore, filesStore.Subscribe())
return nil
})

Expand Down
25 changes: 11 additions & 14 deletions internal/handlers/files/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import (
"os"
"time"

p2pcontext "github.com/azure/peerd/internal/context"
"github.com/azure/peerd/internal/files/store"
pcontext "github.com/azure/peerd/pkg/context"
"github.com/azure/peerd/pkg/files/store"
"github.com/azure/peerd/pkg/metrics"
"github.com/gin-gonic/gin"
)

// FilesHandler describes a handler for files.
Expand All @@ -20,11 +19,9 @@ type FilesHandler struct {
metricsRecorder metrics.Metrics
}

var _ gin.HandlerFunc = (&FilesHandler{}).Handle

// Handle handles a request for a file.
func (h *FilesHandler) Handle(c *gin.Context) {
log := p2pcontext.Logger(c).With().Str("blob", p2pcontext.BlobUrl(c)).Bool("p2p", p2pcontext.IsRequestFromAPeer(c)).Logger()
func (h *FilesHandler) Handle(c pcontext.Context) {
log := pcontext.Logger(c).With().Str("blob", pcontext.BlobUrl(c)).Bool("p2p", pcontext.IsRequestFromAPeer(c)).Logger()
log.Debug().Msg("files handler start")
s := time.Now()
defer func() {
Expand Down Expand Up @@ -56,25 +53,25 @@ func (h *FilesHandler) Handle(c *gin.Context) {

w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Del("Content-Length")
w.Header().Set(p2pcontext.NodeHeaderKey, p2pcontext.NodeName)
w.Header().Set(p2pcontext.CorrelationHeaderKey, c.GetString(p2pcontext.CorrelationIdCtxKey))
w.Header().Set(pcontext.NodeHeaderKey, pcontext.NodeName)
w.Header().Set(pcontext.CorrelationHeaderKey, c.GetString(pcontext.CorrelationIdCtxKey))

http.ServeContent(w, c.Request, "file", time.Now(), f)
}

// fill fills the context with handler specific information.
func (h *FilesHandler) fill(c *gin.Context) error {
func (h *FilesHandler) fill(c pcontext.Context) error {
c.Set("handler", "files")

key, d, err := h.store.Key(c)
if err != nil {
return err
}

c.Set(p2pcontext.DigestCtxKey, d.String())
c.Set(p2pcontext.FileChunkCtxKey, key)
c.Set(p2pcontext.BlobUrlCtxKey, p2pcontext.BlobUrl(c))
c.Set(p2pcontext.BlobRangeCtxKey, c.Request.Header.Get("Range"))
c.Set(pcontext.DigestCtxKey, d.String())
c.Set(pcontext.FileChunkCtxKey, key)
c.Set(pcontext.BlobUrlCtxKey, pcontext.BlobUrl(c))
c.Set(pcontext.BlobRangeCtxKey, c.Request.Header.Get("Range"))

return nil
}
Expand Down
46 changes: 26 additions & 20 deletions internal/handlers/files/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"net/http/httptest"
"testing"

p2pcontext "github.com/azure/peerd/internal/context"
"github.com/azure/peerd/internal/files"
"github.com/azure/peerd/internal/files/store"
"github.com/azure/peerd/pkg/discovery/routing/tests"
pcontext "github.com/azure/peerd/pkg/context"
"github.com/azure/peerd/pkg/discovery/routing/mocks"
"github.com/azure/peerd/pkg/files"
"github.com/azure/peerd/pkg/files/store"
"github.com/azure/peerd/pkg/metrics"
"github.com/gin-gonic/gin"
)
Expand All @@ -34,7 +34,7 @@ func TestPartialContentResponseInP2PMode(t *testing.T) {
}
expRange := fmt.Sprintf("bytes=%v-%v", 12, 100)
req.Header.Set("Range", expRange)
req.Header.Set(p2pcontext.P2PHeaderKey, "true")
req.Header.Set(pcontext.P2PHeaderKey, "true")

expD := "sha256:d18c7a64c5158179bdee531a663c5b487de57ff17cff3af29a51c7e70b491d9d"

Expand All @@ -47,7 +47,7 @@ func TestPartialContentResponseInP2PMode(t *testing.T) {
}

store.PrefetchWorkers = 0 // turn off prefetching
s, err := store.NewMockStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
s, err := store.NewMockStore(ctxWithMetrics, mocks.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}
Expand All @@ -63,11 +63,13 @@ func TestPartialContentResponseInP2PMode(t *testing.T) {
return []byte(content), nil
})

h.Handle(ctx)
pctx := pcontext.FromContext(ctx)

h.Handle(pctx)
resp := recorder.Result()

if resp.StatusCode != http.StatusPartialContent {
t.Errorf("expected %v, got %v", http.StatusOK, ctx.Writer.Status())
t.Errorf("expected %v, got %v", http.StatusOK, pctx.Writer.Status())
}

ret, err := io.ReadAll(resp.Body)
Expand All @@ -87,7 +89,7 @@ func TestNotFoundInP2PMode(t *testing.T) {
}
expRange := fmt.Sprintf("bytes=%v-%v", files.CacheBlockSize, files.CacheBlockSize+172)
req.Header.Set("Range", expRange)
req.Header.Set(p2pcontext.P2PHeaderKey, "true")
req.Header.Set(pcontext.P2PHeaderKey, "true")

// Create a new context with the request.
ctx, _ := gin.CreateTestContext(httptest.NewRecorder())
Expand All @@ -97,14 +99,16 @@ func TestNotFoundInP2PMode(t *testing.T) {
}

store.PrefetchWorkers = 0 // turn off prefetching
s, err := store.NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
s, err := store.NewFilesStore(ctxWithMetrics, mocks.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}

h := New(ctxWithMetrics, s)

h.Handle(ctx)
pmc := pcontext.FromContext(ctx)

h.Handle(pmc)
if ctx.Writer.Status() != http.StatusNotFound {
t.Errorf("expected %v, got %v", http.StatusNotFound, ctx.Writer.Status())
}
Expand All @@ -118,7 +122,7 @@ func TestFill(t *testing.T) {
}
expRange := fmt.Sprintf("bytes=%v-%v", files.CacheBlockSize, files.CacheBlockSize+172)
req.Header.Set("Range", expRange)
req.Header.Set(p2pcontext.P2PHeaderKey, "true")
req.Header.Set(pcontext.P2PHeaderKey, "true")

expD := "sha256:d18c7a64c5158179bdee531a663c5b487de57ff17cff3af29a51c7e70b491d9d"
expK := fmt.Sprintf("%v_%v", expD, files.CacheBlockSize)
Expand All @@ -131,26 +135,28 @@ func TestFill(t *testing.T) {
}

store.PrefetchWorkers = 0 // turn off prefetching
s, err := store.NewFilesStore(ctxWithMetrics, tests.NewMockRouter(make(map[string][]string)))
s, err := store.NewFilesStore(ctxWithMetrics, mocks.NewMockRouter(make(map[string][]string)))
if err != nil {
t.Fatal(err)
}

h := New(ctxWithMetrics, s)

err = h.fill(ctx)
pmc := pcontext.FromContext(ctx)

err = h.fill(pmc)
if err != nil {
t.Fatal(err)
}
if ctx.GetString(p2pcontext.FileChunkCtxKey) != expK {
t.Errorf("expected %v, got %v", expK, ctx.GetString(p2pcontext.FileChunkCtxKey))
if ctx.GetString(pcontext.FileChunkCtxKey) != expK {
t.Errorf("expected %v, got %v", expK, ctx.GetString(pcontext.FileChunkCtxKey))
}

if ctx.GetString(p2pcontext.BlobRangeCtxKey) != expRange {
t.Errorf("expected %v, got %v", expRange, ctx.GetString(p2pcontext.BlobRangeCtxKey))
if ctx.GetString(pcontext.BlobRangeCtxKey) != expRange {
t.Errorf("expected %v, got %v", expRange, ctx.GetString(pcontext.BlobRangeCtxKey))
}

if ctx.GetString(p2pcontext.BlobUrlCtxKey) != hostAndPath+query {
t.Errorf("expected %v, got %v", hostAndPath+query, ctx.GetString(p2pcontext.BlobUrlCtxKey))
if ctx.GetString(pcontext.BlobUrlCtxKey) != hostAndPath+query {
t.Errorf("expected %v, got %v", hostAndPath+query, ctx.GetString(pcontext.BlobUrlCtxKey))
}
}
17 changes: 10 additions & 7 deletions internal/handlers/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"net/http"
"time"

p2pcontext "github.com/azure/peerd/internal/context"
filesStore "github.com/azure/peerd/internal/files/store"
"github.com/azure/peerd/internal/handlers/files"
v2 "github.com/azure/peerd/internal/handlers/v2"
"github.com/azure/peerd/pkg/containerd"
pcontext "github.com/azure/peerd/pkg/context"
"github.com/azure/peerd/pkg/discovery/routing"
filesStore "github.com/azure/peerd/pkg/files/store"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog"
)
Expand Down Expand Up @@ -44,10 +44,13 @@ func newEngine(ctx context.Context) *gin.Engine {
baseLog := zerolog.Ctx(ctx)

engine.Use(func(c *gin.Context) {
p2pcontext.FillCorrelationId(c)
c.Set(p2pcontext.LoggerCtxKey, baseLog)

l := p2pcontext.Logger(c)
pc := pcontext.FromContext(c)

pcontext.FillCorrelationId(pc)
c.Set(pcontext.LoggerCtxKey, baseLog)

l := pcontext.Logger(pc)
l.Debug().Msg("request start")
s := time.Now()

Expand Down Expand Up @@ -94,7 +97,7 @@ func registerRoutes(engine *gin.Engine, f, v gin.HandlerFunc) {
// @Failure 404 {string} string "Not Found"
// @Router /blobs/{url} [get]
func fileHandler(c *gin.Context) {
fh.Handle(c)
fh.Handle(pcontext.FromContext(c))
}

// v2Handler is a handler function for the /v2 API
Expand All @@ -107,5 +110,5 @@ func fileHandler(c *gin.Context) {
// @Router /v2/{repo}/manifests/{reference} [get]
// @Router /v2/{repo}/blobs/{digest} [get]
func v2Handler(c *gin.Context) {
v2h.Handle(c)
v2h.Handle(pcontext.FromContext(c))
}
6 changes: 3 additions & 3 deletions internal/handlers/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"net/http/httptest"
"testing"

"github.com/azure/peerd/internal/files/store"
"github.com/azure/peerd/pkg/containerd"
"github.com/azure/peerd/pkg/discovery/routing/tests"
"github.com/azure/peerd/pkg/discovery/routing/mocks"
"github.com/azure/peerd/pkg/files/store"
"github.com/azure/peerd/pkg/metrics"
"github.com/gin-gonic/gin"
)
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestNewEngine(t *testing.T) {
}

func TestHandler(t *testing.T) {
mr := tests.NewMockRouter(map[string][]string{})
mr := mocks.NewMockRouter(map[string][]string{})
ms := containerd.NewMockContainerdStore(nil)
mfs, err := store.NewMockStore(ctxWithMetrics, mr)
if err != nil {
Expand Down
23 changes: 10 additions & 13 deletions internal/handlers/v2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import (
"path"
"time"

p2pcontext "github.com/azure/peerd/internal/context"
"github.com/azure/peerd/pkg/containerd"
pcontext "github.com/azure/peerd/pkg/context"
"github.com/azure/peerd/pkg/discovery/routing"
"github.com/azure/peerd/pkg/metrics"
"github.com/azure/peerd/pkg/oci/distribution"
"github.com/gin-gonic/gin"
)

// V2Handler describes a handler for OCI content.
Expand All @@ -23,17 +22,15 @@ type V2Handler struct {
metricsRecorder metrics.Metrics
}

var _ gin.HandlerFunc = (&V2Handler{}).Handle

// Handle handles a request for a file.
func (h *V2Handler) Handle(c *gin.Context) {
l := p2pcontext.Logger(c).With().Bool("p2p", p2pcontext.IsRequestFromAPeer(c)).Logger()
func (h *V2Handler) Handle(c pcontext.Context) {
l := pcontext.Logger(c).With().Bool("p2p", pcontext.IsRequestFromAPeer(c)).Logger()
l.Debug().Msg("v2 handler start")
s := time.Now()
defer func() {
dur := time.Since(s)
h.metricsRecorder.RecordRequest(c.Request.Method, "oci", dur.Seconds())
l.Debug().Dur("duration", dur).Str("ns", c.GetString(p2pcontext.NamespaceCtxKey)).Str("ref", c.GetString(p2pcontext.ReferenceCtxKey)).Str("digest", c.GetString(p2pcontext.DigestCtxKey)).Msg("v2 handler stop")
l.Debug().Dur("duration", dur).Str("ns", c.GetString(pcontext.NamespaceCtxKey)).Str("ref", c.GetString(pcontext.ReferenceCtxKey)).Str("digest", c.GetString(pcontext.DigestCtxKey)).Msg("v2 handler stop")
}()

p := path.Clean(c.Request.URL.Path)
Expand All @@ -54,7 +51,7 @@ func (h *V2Handler) Handle(c *gin.Context) {
return
}

if p2pcontext.IsRequestFromAPeer(c) {
if pcontext.IsRequestFromAPeer(c) {
h.registry.Handle(c)
return
} else {
Expand All @@ -64,24 +61,24 @@ func (h *V2Handler) Handle(c *gin.Context) {
}

// fill fills the context with handler specific information.
func (h *V2Handler) fill(c *gin.Context) error {
func (h *V2Handler) fill(c pcontext.Context) error {
c.Set("handler", "v2")

ns := c.Query("ns")
if ns == "" {
ns = "docker.io"
}

c.Set(p2pcontext.NamespaceCtxKey, ns)
c.Set(pcontext.NamespaceCtxKey, ns)

ref, dgst, refType, err := distribution.ParsePathComponents(ns, c.Request.URL.Path)
if err != nil {
return err
}

c.Set(p2pcontext.ReferenceCtxKey, ref)
c.Set(p2pcontext.DigestCtxKey, dgst.String())
c.Set(p2pcontext.RefTypeCtxKey, refType)
c.Set(pcontext.ReferenceCtxKey, ref)
c.Set(pcontext.DigestCtxKey, dgst.String())
c.Set(pcontext.RefTypeCtxKey, refType)

return nil
}
Expand Down
Loading