Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/cache/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ func FilterTransportHeaders(headers textproto.MIMEHeader) textproto.MIMEHeader {
type Cache interface {
// String describes the Cache implementation.
String() string
// Stat returns the headers of an existing object in the cache.
//
// Expired files SHOULD not be returned.
// Must return os.ErrNotExist if the file does not exist.
Stat(ctx context.Context, key Key) (textproto.MIMEHeader, error)
// Open an existing file in the cache.
//
// Expired files SHOULD not be returned.
Expand Down
20 changes: 20 additions & 0 deletions internal/cache/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,26 @@ func (d *Disk) Delete(_ context.Context, key Key) error {
return nil
}

func (d *Disk) Stat(ctx context.Context, key Key) (textproto.MIMEHeader, error) {
path := d.keyToPath(key)
fullPath := filepath.Join(d.config.Root, path)

if _, err := os.Stat(fullPath); err != nil {
return nil, errors.Errorf("failed to stat file: %w", err)
}

expiresAt, headers, err := d.ttl.get(key)
if err != nil {
return nil, errors.Errorf("failed to get metadata: %w", err)
}

if time.Now().After(expiresAt) {
return nil, errors.Join(fs.ErrNotExist, d.Delete(ctx, key))
}

return headers, nil
}

func (d *Disk) Open(ctx context.Context, key Key) (io.ReadCloser, textproto.MIMEHeader, error) {
path := d.keyToPath(key)
fullPath := filepath.Join(d.config.Root, path)
Expand Down
16 changes: 16 additions & 0 deletions internal/cache/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ func NewMemory(ctx context.Context, config MemoryConfig) (*Memory, error) {

func (m *Memory) String() string { return fmt.Sprintf("memory:%dMB", m.config.LimitMB) }

func (m *Memory) Stat(_ context.Context, key Key) (textproto.MIMEHeader, error) {
m.mu.RLock()
defer m.mu.RUnlock()

entry, exists := m.entries[key]
if !exists {
return nil, os.ErrNotExist
}

if time.Now().After(entry.expiresAt) {
return nil, os.ErrNotExist
}

return entry.headers, nil
}

func (m *Memory) Open(_ context.Context, key Key) (io.ReadCloser, textproto.MIMEHeader, error) {
m.mu.RLock()
defer m.mu.RUnlock()
Expand Down
28 changes: 28 additions & 0 deletions internal/cache/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,34 @@ func (c *Remote) Open(ctx context.Context, key Key) (io.ReadCloser, textproto.MI
return resp.Body, headers, nil
}

// Stat retrieves headers for an object from the remote.
func (c *Remote) Stat(ctx context.Context, key Key) (textproto.MIMEHeader, error) {
url := fmt.Sprintf("%s/%s", c.baseURL, key.String())
req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
if err != nil {
return nil, errors.Wrap(err, "failed to create request")
}

resp, err := c.client.Do(req)
if err != nil {
return nil, errors.Wrap(err, "failed to execute request")
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusNotFound {
return nil, os.ErrNotExist
}

if resp.StatusCode != http.StatusOK {
return nil, errors.Errorf("unexpected status code: %d", resp.StatusCode)
}

// Filter out HTTP transport headers
headers := FilterTransportHeaders(textproto.MIMEHeader(resp.Header))

return headers, nil
}

// Create stores a new object in the remote.
func (c *Remote) Create(ctx context.Context, key Key, headers textproto.MIMEHeader, ttl time.Duration) (io.WriteCloser, error) {
pr, pw := io.Pipe()
Expand Down
21 changes: 16 additions & 5 deletions internal/cache/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,17 +157,17 @@ func (s *S3) keyToPath(key Key) string {
return hexKey[:2] + "/" + hexKey
}

func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, textproto.MIMEHeader, error) {
func (s *S3) Stat(ctx context.Context, key Key) (textproto.MIMEHeader, error) {
objectName := s.keyToPath(key)

// Get object info to check metadata
objInfo, err := s.client.StatObject(ctx, s.config.Bucket, objectName, minio.StatObjectOptions{})
if err != nil {
errResponse := minio.ToErrorResponse(err)
if errResponse.Code == "NoSuchKey" {
return nil, nil, os.ErrNotExist
return nil, os.ErrNotExist
}
return nil, nil, errors.Errorf("failed to stat object: %w", err)
return nil, errors.Errorf("failed to stat object: %w", err)
}

// Check if object has expired
Expand All @@ -178,7 +178,7 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, textproto.MIMEHe
if err := expiresAt.UnmarshalText([]byte(expiresAtStr)); err == nil {
if time.Now().After(expiresAt) {
// Object expired, delete it and return not found
return nil, nil, errors.Join(os.ErrNotExist, s.Delete(ctx, key))
return nil, errors.Join(os.ErrNotExist, s.Delete(ctx, key))
}
}
}
Expand All @@ -188,10 +188,21 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, textproto.MIMEHe
headers := make(textproto.MIMEHeader)
if headersJSON := objInfo.UserMetadata["Headers"]; headersJSON != "" {
if err := json.Unmarshal([]byte(headersJSON), &headers); err != nil {
return nil, nil, errors.Errorf("failed to unmarshal headers: %w", err)
return nil, errors.Errorf("failed to unmarshal headers: %w", err)
}
}

return headers, nil
}

func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, textproto.MIMEHeader, error) {
headers, err := s.Stat(ctx, key)
if err != nil {
return nil, nil, errors.WithStack(err)
}

objectName := s.keyToPath(key)

// Get object
obj, err := s.client.GetObject(ctx, s.config.Bucket, objectName, minio.GetObjectOptions{})
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions internal/cache/tiered.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,24 @@ func (t Tiered) Delete(ctx context.Context, key Key) error {
return errors.Join(errs...)
}

// Stat returns headers from the first cache that succeeds.
//
// If all caches fail, all errors are returned.
func (t Tiered) Stat(ctx context.Context, key Key) (textproto.MIMEHeader, error) {
errs := make([]error, len(t.caches))
for i, c := range t.caches {
headers, err := c.Stat(ctx, key)
errs[i] = err
if errors.Is(err, os.ErrNotExist) {
continue
} else if err != nil {
return nil, errors.WithStack(err)
}
return headers, nil
}
return nil, errors.Join(errs...)
}

// Open returns a reader from the first cache that succeeds.
//
// If all caches fail, all errors are returned.
Expand Down
22 changes: 22 additions & 0 deletions internal/strategy/apiv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,35 @@ func NewAPIV1(ctx context.Context, _ jobscheduler.Scheduler, _ struct{}, cache c
cache: cache,
}
mux.Handle("GET /api/v1/object/{key}", http.HandlerFunc(s.getObject))
mux.Handle("HEAD /api/v1/object/{key}", http.HandlerFunc(s.statObject))
mux.Handle("POST /api/v1/object/{key}", http.HandlerFunc(s.putObject))
mux.Handle("DELETE /api/v1/object/{key}", http.HandlerFunc(s.deleteObject))
return s, nil
}

func (d *APIV1) String() string { return "default" }

func (d *APIV1) statObject(w http.ResponseWriter, r *http.Request) {
key, err := cache.ParseKey(r.PathValue("key"))
if err != nil {
d.httpError(w, http.StatusBadRequest, err, "Invalid key")
return
}

headers, err := d.cache.Stat(r.Context(), key)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
d.httpError(w, http.StatusNotFound, err, "Cache object not found", slog.String("key", key.String()))
return
}
d.httpError(w, http.StatusInternalServerError, err, "Failed to open cache object", slog.String("key", key.String()))
return
}

maps.Copy(w.Header(), headers)
w.WriteHeader(http.StatusOK)
}

func (d *APIV1) getObject(w http.ResponseWriter, r *http.Request) {
key, err := cache.ParseKey(r.PathValue("key"))
if err != nil {
Expand Down