Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion api/pkg/controller/knowledge/knowledge_extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,15 @@ func (r *Reconciler) getFilestoreFiles(ctx context.Context, fs filestore.FileSto
Str("knowledge_id", k.ID).
Str("file", item.Path).
Interface("metadata", metadata).
Msgf("Added metadata to file")
Msg("Added metadata to file")
} else if metadataErr != nil && !errors.Is(metadataErr, filestore.ErrNotFound) {
// Only log unexpected errors, not file not found errors
log.Debug().
Err(metadataErr).
Str("knowledge_id", k.ID).
Str("file", item.Path).
Str("metadata_file", metadataFilePath).
Msg("Metadata file not available")
}

result = append(result, indexerItem)
Expand Down
30 changes: 22 additions & 8 deletions api/pkg/controller/knowledge/knowledge_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"gopkg.in/yaml.v3"

"github.com/helixml/helix/api/pkg/dataprep/text"
"github.com/helixml/helix/api/pkg/filestore"
"github.com/helixml/helix/api/pkg/rag"
"github.com/helixml/helix/api/pkg/store"
"github.com/helixml/helix/api/pkg/system"
Expand Down Expand Up @@ -505,12 +506,20 @@ func (r *Reconciler) convertTextSplitterChunks(ctx context.Context, k *types.Kno
var err error
metadata, err = r.getMetadataFromFilestore(ctx, metadataFilePath)
if err != nil {
// Log but continue - metadata is optional
log.Warn().
Err(err).
Str("knowledge_id", k.ID).
Str("metadata_file", metadataFilePath).
Msg("Failed to load metadata file")
// Only log as a warning for unexpected errors, not for "not found" errors
if !errors.Is(err, filestore.ErrNotFound) {
log.Warn().
Err(err).
Str("knowledge_id", k.ID).
Str("metadata_file", metadataFilePath).
Msg("Failed to load metadata file due to unexpected error")
} else {
// Not found is an expected case - log at debug level
log.Debug().
Str("knowledge_id", k.ID).
Str("metadata_file", metadataFilePath).
Msg("No metadata file found")
}
}

// Cache the result, even if nil
Expand Down Expand Up @@ -567,8 +576,13 @@ func (r *Reconciler) getMetadataFromFilestore(ctx context.Context, metadataFileP
// Check if the metadata file exists
_, err := r.filestore.Get(ctx, metadataFilePath)
if err != nil {
// If the file doesn't exist, just return nil with no error
return nil, nil
// Check specifically if this is a not found error
if errors.Is(err, filestore.ErrNotFound) {
log.Debug().Str("path", metadataFilePath).Msg("Metadata file not found")
return nil, fmt.Errorf("metadata file not found: %w", err)
}
// For other types of errors, return the error as is
return nil, fmt.Errorf("error checking metadata file: %w", err)
}

log.Info().Str("path", metadataFilePath).Msg("Metadata file exists, opening...")
Expand Down
4 changes: 4 additions & 0 deletions api/pkg/filestore/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package filestore

import (
"context"
"errors"
"io"
"path/filepath"
)

// ErrNotFound is the sentinel error reported when a file or directory doesn't exist.
// The storage backends wrap it with %w together with the missing path or prefix,
// so callers should test for it with errors.Is rather than comparing errors directly.
var ErrNotFound = errors.New("file not found")

type Item struct {
// timestamp
Created int64 `json:"created"`
Expand Down
11 changes: 10 additions & 1 deletion api/pkg/filestore/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ func (s *FileSystemStorage) List(_ context.Context, prefix string) ([]Item, erro

files, err := os.ReadDir(fullPath)
if err != nil {
return []Item{}, nil
if os.IsNotExist(err) {
return nil, fmt.Errorf("%w: %s", ErrNotFound, prefix)
}
return nil, fmt.Errorf("error reading directory: %w", err)
}

items := []Item{}
Expand Down Expand Up @@ -73,6 +76,9 @@ func (s *FileSystemStorage) Get(_ context.Context, path string) (Item, error) {

info, err := os.Stat(fullPath)
if err != nil {
if os.IsNotExist(err) {
return Item{}, fmt.Errorf("%w: %s", ErrNotFound, path)
}
return Item{}, fmt.Errorf("error fetching file info: %w", err)
}
return Item{
Expand Down Expand Up @@ -125,6 +131,9 @@ func (s *FileSystemStorage) OpenFile(_ context.Context, path string) (io.ReadClo

file, err := os.Open(fullPath)
if err != nil {
if os.IsNotExist(err) {
return nil, fmt.Errorf("%w: %s", ErrNotFound, path)
}
return nil, fmt.Errorf("error opening file: %w", err)
}

Expand Down
29 changes: 27 additions & 2 deletions api/pkg/filestore/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"archive/tar"
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -34,6 +35,15 @@ func NewGCSStorage(ctx context.Context, bucketName, serviceAccountKeyFile string
}

func (s *GCSStorage) List(ctx context.Context, prefix string) ([]Item, error) {
// First check if the prefix exists as an object
if prefix != "" {
_, err := s.Get(ctx, prefix)
if err != nil && !errors.Is(err, ErrNotFound) {
// If there's an error other than not found, return it
return nil, err
}
}

it := s.bucket.Objects(ctx, &storage.Query{Prefix: prefix})
items := []Item{}

Expand All @@ -43,7 +53,7 @@ func (s *GCSStorage) List(ctx context.Context, prefix string) ([]Item, error) {
break
}
if err != nil {
return []Item{}, nil
return nil, fmt.Errorf("error listing GCS objects: %w", err)
}

item := Item{
Expand All @@ -57,12 +67,22 @@ func (s *GCSStorage) List(ctx context.Context, prefix string) ([]Item, error) {
items = append(items, item)
}

// If we didn't find any items and the prefix is not empty,
// it means the directory doesn't exist
if len(items) == 0 && prefix != "" {
return nil, fmt.Errorf("%w: %s", ErrNotFound, prefix)
}

return items, nil
}

func (s *GCSStorage) Get(ctx context.Context, path string) (Item, error) {
attrs, err := s.bucket.Object(path).Attrs(ctx)
if err != nil {
// Check for 404 not found responses from GCS
if errors.Is(err, storage.ErrObjectNotExist) {
return Item{}, fmt.Errorf("%w: %s", ErrNotFound, path)
}
return Item{}, fmt.Errorf("error fetching GCS object attributes: %w", err)
}

Expand Down Expand Up @@ -110,6 +130,9 @@ func (s *GCSStorage) OpenFile(ctx context.Context, path string) (io.ReadCloser,
obj := s.bucket.Object(path)
reader, err := obj.NewReader(ctx)
if err != nil {
if errors.Is(err, storage.ErrObjectNotExist) {
return nil, fmt.Errorf("%w: %s", ErrNotFound, path)
}
return nil, fmt.Errorf("failed to create GCS object reader: %w", err)
}
return reader, nil
Expand Down Expand Up @@ -293,7 +316,9 @@ func (s *GCSStorage) CopyFile(ctx context.Context, fromPath string, toPath strin
// Check if the fromPath exists
_, err := s.Get(ctx, fromPath)
if err != nil {
return fmt.Errorf("failed to get source file: %w", err)
// If the source file doesn't exist, return the error directly
// It will already be wrapped with ErrNotFound if that's the case
return err
}

// Create the folder for the toPath if it doesn't exist
Expand Down