Skip to content

Commit

Permalink
pkg/cache: improve logging (#48)
Browse files Browse the repository at this point in the history
All un-exported functions downstream from narinfo/nar-related exported
function should receive a logger and the logger should include context
about hash/compression.
  • Loading branch information
kalbasit authored Dec 6, 2024
1 parent 8cf50a7 commit 0da51b3
Showing 1 changed file with 63 additions and 51 deletions.
114 changes: 63 additions & 51 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,12 @@ func (c *Cache) PublicKey() signature.PublicKey { return c.secretKey.ToPublicKey
// stored and finally returned.
// NOTE: It's the caller responsibility to close the body.
func (c *Cache) GetNar(hash, compression string) (int64, io.ReadCloser, error) {
if c.hasNarInStore(hash, compression) {
return c.getNarFromStore(hash, compression)
}

log := c.logger.New("hash", hash, "compression", compression)

if c.hasNarInStore(log, hash, compression) {
return c.getNarFromStore(log, hash, compression)
}

errC := make(chan error)

c.mu.Lock()
Expand All @@ -159,11 +159,11 @@ func (c *Cache) GetNar(hash, compression string) (int64, io.ReadCloser, error) {
case <-doneC:
}

if !c.hasNarInStore(hash, compression) {
if !c.hasNarInStore(log, hash, compression) {
return 0, nil, ErrNotFound
}

return c.getNarFromStore(hash, compression)
return c.getNarFromStore(log, hash, compression)
}

// PutNar records the NAR (given as an io.Reader) into the store.
Expand All @@ -175,22 +175,26 @@ func (c *Cache) PutNar(_ context.Context, hash, compression string, r io.ReadClo
r.Close()
}()

_, err := c.putNarInStore(hash, compression, r)
log := c.logger.New("hash", hash, "compression", compression)

_, err := c.putNarInStore(log, hash, compression, r)

return err
}

// DeleteNar deletes the nar from the store.
func (c *Cache) DeleteNar(_ context.Context, hash, compression string) error {
return c.deleteNarFromStore(hash, compression)
log := c.logger.New("hash", hash, "compression", compression)

return c.deleteNarFromStore(log, hash, compression)
}

func (c *Cache) pullNar(log log15.Logger, hash, compression string, doneC chan struct{}, errC chan error) {
now := time.Now()

log.Info("downloading the nar from upstream")

size, r, err := c.getNarFromUpstream(hash, compression)
size, r, err := c.getNarFromUpstream(log, hash, compression)
if err != nil {
c.mu.Lock()
delete(c.upstreamJobs, hash)
Expand All @@ -203,7 +207,7 @@ func (c *Cache) pullNar(log log15.Logger, hash, compression string, doneC chan s

defer r.Close()

written, err := c.putNarInStore(hash, compression, r)
written, err := c.putNarInStore(log, hash, compression, r)
if err != nil {
c.mu.Lock()
delete(c.upstreamJobs, hash)
Expand All @@ -227,12 +231,12 @@ func (c *Cache) pullNar(log log15.Logger, hash, compression string, doneC chan s
close(doneC)
}

func (c *Cache) hasNarInStore(hash, compression string) bool {
return c.hasInStore(helper.NarPath(hash, compression))
func (c *Cache) hasNarInStore(log log15.Logger, hash, compression string) bool {
return c.hasInStore(log, helper.NarPath(hash, compression))
}

func (c *Cache) getNarFromStore(hash, compression string) (int64, io.ReadCloser, error) {
size, r, err := c.getFromStore(helper.NarPath(hash, compression))
func (c *Cache) getNarFromStore(log log15.Logger, hash, compression string) (int64, io.ReadCloser, error) {
size, r, err := c.getFromStore(log, helper.NarPath(hash, compression))
if err != nil {
return 0, nil, fmt.Errorf("error fetching the narinfo from the store: %w", err)
}
Expand All @@ -245,7 +249,7 @@ func (c *Cache) getNarFromStore(hash, compression string) (int64, io.ReadCloser,
defer func() {
if err := tx.Rollback(); err != nil {
if !errors.Is(err, sql.ErrTxDone) {
c.logger.Error("error rolling back the transaction", "error", err)
log.Error("error rolling back the transaction", "error", err)
}
}
}()
Expand Down Expand Up @@ -273,7 +277,7 @@ func (c *Cache) getNarFromStore(hash, compression string) (int64, io.ReadCloser,
return size, r, nil
}

func (c *Cache) getNarFromUpstream(hash, compression string) (int64, io.ReadCloser, error) {
func (c *Cache) getNarFromUpstream(log log15.Logger, hash, compression string) (int64, io.ReadCloser, error) {
// create a new context not associated with any request because we don't want
// pulling from upstream to be associated with a user request.
ctx := context.Background()
Expand All @@ -282,7 +286,7 @@ func (c *Cache) getNarFromUpstream(hash, compression string) (int64, io.ReadClos
size, nar, err := uc.GetNar(ctx, hash, compression)
if err != nil {
if !errors.Is(err, upstream.ErrNotFound) {
c.logger.Error("error fetching the narInfo from upstream", "hostname", uc.GetHostname(), "error", err)
log.Error("error fetching the narInfo from upstream", "hostname", uc.GetHostname(), "error", err)
}

continue
Expand All @@ -294,7 +298,7 @@ func (c *Cache) getNarFromUpstream(hash, compression string) (int64, io.ReadClos
return 0, nil, ErrNotFound
}

func (c *Cache) putNarInStore(hash, compression string, r io.ReadCloser) (int64, error) {
func (c *Cache) putNarInStore(_ log15.Logger, hash, compression string, r io.ReadCloser) (int64, error) {
pattern := hash + "-*.nar"
if compression != "" {
pattern += "." + compression
Expand Down Expand Up @@ -326,8 +330,8 @@ func (c *Cache) putNarInStore(hash, compression string, r io.ReadCloser) (int64,
return written, nil
}

func (c *Cache) deleteNarFromStore(hash, compression string) error {
if !c.hasNarInStore(hash, compression) {
func (c *Cache) deleteNarFromStore(log log15.Logger, hash, compression string) error {
if !c.hasNarInStore(log, hash, compression) {
return ErrNotFound
}

Expand All @@ -338,27 +342,29 @@ func (c *Cache) deleteNarFromStore(hash, compression string) error {
// is not found in the store, it's pulled from an upstream, stored in the
// stored and finally returned.
func (c *Cache) GetNarInfo(hash string) (*narinfo.NarInfo, error) {
if c.hasNarInfoInStore(hash) {
return c.getNarInfoFromStore(hash)
log := log15.New("hash", hash)

if c.hasNarInfoInStore(log, hash) {
return c.getNarInfoFromStore(log, hash)
}

narInfo, err := c.getNarInfoFromUpstream(hash)
narInfo, err := c.getNarInfoFromUpstream(log, hash)
if err != nil {
return nil, fmt.Errorf("error getting the narInfo from upstream caches: %w", err)
}

// start a job to also pull the nar
go c.prePullNar(narInfo.URL)
go c.prePullNar(log, narInfo.URL)

if err := c.signNarInfo(narInfo); err != nil {
if err := c.signNarInfo(log, narInfo); err != nil {
return nil, fmt.Errorf("error signing the narinfo: %w", err)
}

if err := c.putNarInfoInStore(hash, narInfo); err != nil {
if err := c.putNarInfoInStore(log, hash, narInfo); err != nil {
return nil, fmt.Errorf("error storing the narInfo in the store: %w", err)
}

return narInfo, c.storeInDatabase(hash, narInfo)
return narInfo, c.storeInDatabase(log, hash, narInfo)
}

// PutNarInfo records the narInfo (given as an io.Reader) into the store and signs it.
Expand All @@ -370,48 +376,54 @@ func (c *Cache) PutNarInfo(_ context.Context, hash string, r io.ReadCloser) erro
r.Close()
}()

log := log15.New("hash", hash)

narInfo, err := narinfo.Parse(r)
if err != nil {
return fmt.Errorf("error parsing narinfo: %w", err)
}

if err := c.signNarInfo(narInfo); err != nil {
if err := c.signNarInfo(log, narInfo); err != nil {
return fmt.Errorf("error signing the narinfo: %w", err)
}

if err := c.putNarInfoInStore(hash, narInfo); err != nil {
if err := c.putNarInfoInStore(log, hash, narInfo); err != nil {
return fmt.Errorf("error storing the narInfo in the store: %w", err)
}

return c.storeInDatabase(hash, narInfo)
return c.storeInDatabase(log, hash, narInfo)
}

// DeleteNarInfo deletes the narInfo from the store.
func (c *Cache) DeleteNarInfo(_ context.Context, hash string) error {
return c.deleteNarInfoFromStore(hash)
log := c.logger.New("hash", hash)

return c.deleteNarInfoFromStore(log, hash)
}

func (c *Cache) prePullNar(url string) {
func (c *Cache) prePullNar(log log15.Logger, url string) {
hash, compression, err := helper.ParseNarURL(url)
if err != nil {
c.logger.Error("error parsing the nar URL %q: %w", url, err)
c.logger.Error("error parsing the nar URL", "url", url, "error", err)

return
}

c.logger.Info("pre-caching NAR ahead of time", "URL", url, "hash", hash, "compression", compression)
log = log.New("hash", hash, "compression", compression)

log.Info("pre-caching NAR ahead of time", "URL", url)

_, nar, err := c.GetNar(hash, compression)
if err != nil {
c.logger.Error("error fetching the NAR: %w", err)
log.Error("error fetching the NAR", "error", err)

return
}

nar.Close()
}

func (c *Cache) signNarInfo(narInfo *narinfo.NarInfo) error {
func (c *Cache) signNarInfo(_ log15.Logger, narInfo *narinfo.NarInfo) error {
sig, err := c.secretKey.Sign(nil, narInfo.Fingerprint())
if err != nil {
return fmt.Errorf("error signing the fingerprint: %w", err)
Expand All @@ -422,12 +434,12 @@ func (c *Cache) signNarInfo(narInfo *narinfo.NarInfo) error {
return nil
}

func (c *Cache) hasNarInfoInStore(hash string) bool {
return c.hasInStore(helper.NarInfoPath(hash))
func (c *Cache) hasNarInfoInStore(log log15.Logger, hash string) bool {
return c.hasInStore(log, helper.NarInfoPath(hash))
}

func (c *Cache) getNarInfoFromStore(hash string) (*narinfo.NarInfo, error) {
_, r, err := c.getFromStore(helper.NarInfoPath(hash))
func (c *Cache) getNarInfoFromStore(log log15.Logger, hash string) (*narinfo.NarInfo, error) {
_, r, err := c.getFromStore(log, helper.NarInfoPath(hash))
if err != nil {
return nil, fmt.Errorf("error fetching the narinfo from the store: %w", err)
}
Expand All @@ -447,7 +459,7 @@ func (c *Cache) getNarInfoFromStore(hash string) (*narinfo.NarInfo, error) {
defer func() {
if err := tx.Rollback(); err != nil {
if !errors.Is(err, sql.ErrTxDone) {
c.logger.Error("error rolling back the transaction", "error", err)
log.Error("error rolling back the transaction", "error", err)
}
}
}()
Expand Down Expand Up @@ -475,7 +487,7 @@ func (c *Cache) getNarInfoFromStore(hash string) (*narinfo.NarInfo, error) {
return ni, nil
}

func (c *Cache) getNarInfoFromUpstream(hash string) (*narinfo.NarInfo, error) {
func (c *Cache) getNarInfoFromUpstream(log log15.Logger, hash string) (*narinfo.NarInfo, error) {
// create a new context not associated with any request because we don't want
// pulling from upstream to be associated with a user request.
ctx := context.Background()
Expand All @@ -484,7 +496,7 @@ func (c *Cache) getNarInfoFromUpstream(hash string) (*narinfo.NarInfo, error) {
narInfo, err := uc.GetNarInfo(ctx, hash)
if err != nil {
if !errors.Is(err, upstream.ErrNotFound) {
c.logger.Error("error fetching the narInfo from upstream", "hostname", uc.GetHostname(), "error", err)
log.Error("error fetching the narInfo from upstream", "hash", hash, "hostname", uc.GetHostname(), "error", err)
}

continue
Expand All @@ -496,7 +508,7 @@ func (c *Cache) getNarInfoFromUpstream(hash string) (*narinfo.NarInfo, error) {
return nil, ErrNotFound
}

func (c *Cache) putNarInfoInStore(hash string, narInfo *narinfo.NarInfo) error {
func (c *Cache) putNarInfoInStore(_ log15.Logger, hash string, narInfo *narinfo.NarInfo) error {
f, err := os.CreateTemp(c.storeTMPPath(), hash+"-*.narinfo")
if err != nil {
return fmt.Errorf("error creating the temporary directory: %w", err)
Expand All @@ -522,8 +534,8 @@ func (c *Cache) putNarInfoInStore(hash string, narInfo *narinfo.NarInfo) error {
return nil
}

func (c *Cache) storeInDatabase(hash string, narInfo *narinfo.NarInfo) error {
log := c.logger.New("hash", hash, "nar-url", narInfo.URL)
func (c *Cache) storeInDatabase(log log15.Logger, hash string, narInfo *narinfo.NarInfo) error {
log = log.New("nar-url", narInfo.URL)

log.Info("storing narinfo and nar record in the database")

Expand All @@ -535,7 +547,7 @@ func (c *Cache) storeInDatabase(hash string, narInfo *narinfo.NarInfo) error {
defer func() {
if err := tx.Rollback(); err != nil {
if !errors.Is(err, sql.ErrTxDone) {
c.logger.Error("error rolling back the transaction", "error", err)
log.Error("error rolling back the transaction", "error", err)
}
}
}()
Expand Down Expand Up @@ -578,23 +590,23 @@ func (c *Cache) storeInDatabase(hash string, narInfo *narinfo.NarInfo) error {
return nil
}

func (c *Cache) deleteNarInfoFromStore(hash string) error {
if !c.hasNarInfoInStore(hash) {
func (c *Cache) deleteNarInfoFromStore(log log15.Logger, hash string) error {
if !c.hasNarInfoInStore(log, hash) {
return ErrNotFound
}

return os.Remove(filepath.Join(c.storePath(), helper.NarInfoPath(hash)))
}

func (c *Cache) hasInStore(key string) bool {
func (c *Cache) hasInStore(_ log15.Logger, key string) bool {
_, err := os.Stat(filepath.Join(c.storePath(), key))

return err == nil
}

// GetFile returns the file define by its key
// NOTE: It's the caller responsibility to close the file after using it.
func (c *Cache) getFromStore(key string) (int64, io.ReadCloser, error) {
func (c *Cache) getFromStore(_ log15.Logger, key string) (int64, io.ReadCloser, error) {
p := filepath.Join(c.storePath(), key)

f, err := os.Open(p)
Expand Down

0 comments on commit 0da51b3

Please sign in to comment.