diff --git a/preprocessor/folder/file.go b/preprocessor/folder/file.go index ca8d81f..2d1c0ce 100644 --- a/preprocessor/folder/file.go +++ b/preprocessor/folder/file.go @@ -60,11 +60,12 @@ func getFileMetadata(filePath string) (map[string]any, error) { }, nil } -func handleNewFile(pgPool *pgxpool.Pool, filePath string) (string, error) { +func handleNewFile(pgPool *pgxpool.Pool, filePath string, project *ProjectQueryResult) (string, error) { result, err := queryIfFileExists(pgPool, filePath) if err != nil { return "", fmt.Errorf("error checking if file exists in database: %v", err) } + status, errorMessage, cid := "", "", "" if result != nil { if result.Status != nil { @@ -77,11 +78,12 @@ func handleNewFile(pgPool *pgxpool.Pool, filePath string) (string, error) { cid = *result.Cid } } + switch status { case FileStatusFound: - return "", fmt.Errorf("file %s is already found", filePath) + fmt.Println("retrying found file:", filePath) case FileStatusUploading: - return "", fmt.Errorf("file %s is already uploading", filePath) + fmt.Println("retrying uploading file:", filePath) case FileStatusSuccess: return cid, nil case FileStatusError: @@ -89,10 +91,12 @@ func handleNewFile(pgPool *pgxpool.Pool, filePath string) (string, error) { default: // noop } + err = setFileStatusFound(pgPool, filePath) if err != nil { return "", fmt.Errorf("error setting file status to found: %v", err) } + metadata, err := getFileMetadata(filePath) if err != nil { e := setFileStatusError(pgPool, filePath, err.Error()) @@ -101,10 +105,30 @@ func handleNewFile(pgPool *pgxpool.Pool, filePath string) (string, error) { } return "", fmt.Errorf("error getting metadata for file %s: %v", filePath, err) } + + if project != nil { + metadata["project_id"] = *project.ProjectId + metadata["project_path"] = *project.ProjectPath + if project.AuthorType != nil || project.AuthorName != nil || project.AuthorIdentifier != nil { + author := map[string]string{} + if project.AuthorType != nil { + author["@type"] = *project.AuthorType + } + if project.AuthorName != nil { + author["name"] = *project.AuthorName + } + if project.AuthorIdentifier != nil { + author["identifier"] = *project.AuthorIdentifier + } + metadata["author"] = author + } + } + err = setFileStatusUploading(pgPool, filePath, metadata["sha256"].(string)) if err != nil { return "", fmt.Errorf("error setting file status to uploading: %v", err) } + cid, err = postFileMetadataToWebHook(filePath, metadata) if err != nil { e := setFileStatusError(pgPool, filePath, err.Error()) @@ -113,6 +137,7 @@ func handleNewFile(pgPool *pgxpool.Pool, filePath string) (string, error) { } return "", fmt.Errorf("error posting metadata for file %s: %v", filePath, err) } + err = setFileStatusDone(pgPool, filePath, cid) if err != nil { return "", fmt.Errorf("error setting file status to done: %v", err) diff --git a/preprocessor/folder/folder.go b/preprocessor/folder/folder.go index 1587133..1181a2a 100644 --- a/preprocessor/folder/folder.go +++ b/preprocessor/folder/folder.go @@ -7,29 +7,71 @@ import ( "path/filepath" "github.com/fsnotify/fsnotify" + "github.com/jackc/pgx/v5/pgxpool" "github.com/starlinglab/integrity-v2/config" "github.com/starlinglab/integrity-v2/database" ) -func scanSyncDirectory(subPath string) ([]string, error) { +func scanSyncDirectory(subPath string) (fileList []string, dirList []string, err error) { scanRoot := config.GetConfig().FolderPreprocessor.SyncFolderRoot if scanRoot == "" { scanRoot = "." } scanPath := filepath.Join(scanRoot, subPath) - fileList := []string{} - err := filepath.Walk(scanPath, func(path string, info fs.FileInfo, err error) error { + fmt.Println("Scanning: " + scanPath) + err = filepath.Walk(scanPath, func(path string, info fs.FileInfo, err error) error { if err != nil { return err } - if checkShouldIncludeFile(info) { + if info.IsDir() { + dirList = append(dirList, path) + } else if checkShouldIncludeFile(info) { fileList = append(fileList, path) fmt.Println("Found: " + path) return nil } return nil }) - return fileList, err + return fileList, dirList, err +} + +func watchLoop(w *fsnotify.Watcher, pgPool *pgxpool.Pool, dirPathToProject map[string]ProjectQueryResult) { + for { + select { + case event, ok := <-w.Events: + if !ok { + return + } + if event.Has(fsnotify.Create) || event.Has(fsnotify.Rename) { + filePath := event.Name + file, err := os.Open(filePath) + if err != nil { + // File may be moved away for fsnotify.Rename + continue + } + defer file.Close() + fileInfo, err := file.Stat() + if err != nil { + fmt.Println("error getting file info:", err) + continue + } + if checkShouldIncludeFile(fileInfo) { + project := dirPathToProject[filepath.Dir(filePath)] + cid, err := handleNewFile(pgPool, filePath, &project) + if err != nil { + fmt.Println(err) + } else { + fmt.Printf("File %s uploaded to webhook with CID %s\n", filePath, cid) + } + } + } + case err, ok := <-w.Errors: + if !ok { + return + } + fmt.Println("error:", err) + } + } } func Run(args []string) error { @@ -43,17 +85,31 @@ func Run(args []string) error { return err } - // Scan whole sync directory - fileList, err := scanSyncDirectory("") + projects, err := queryAllProjects(pgPool) if err != nil { return err } - for _, filePath := range fileList { - cid, err := handleNewFile(pgPool, filePath) + + dirPathToProject := map[string]ProjectQueryResult{} + var dirPaths []string + + for _, project := range projects { + projectPath := *project.ProjectPath + fileList, dirList, err := scanSyncDirectory(projectPath) if err != nil { fmt.Println(err) - } else { - fmt.Printf("File %s uploaded to webhook with CID %s\n", filePath, cid) + } + for _, filePath := range fileList { + cid, err := handleNewFile(pgPool, filePath, &project) + if err != nil { + fmt.Println(err) + } else { + fmt.Printf("File %s uploaded to webhook with CID %s\n", filePath, cid) + } + } + for _, dirPath := range dirList { + dirPaths = append(dirPaths, dirPath) + dirPathToProject[dirPath] = project } } @@ -64,48 +120,14 @@ func Run(args []string) error { } defer watcher.Close() - go func() { - for { - select { - case event, ok := <-watcher.Events: - if !ok { - return - } - if event.Has(fsnotify.Create) || event.Has(fsnotify.Rename) { - filePath := event.Name - file, err := os.Open(filePath) - if err != nil { - // File may be moved away for fsnotify.Rename - continue - } - defer file.Close() - fileInfo, err := file.Stat() - if err != nil { - fmt.Println("error getting file info:", err) - continue - } - if checkShouldIncludeFile(fileInfo) { - cid, err := handleNewFile(pgPool, filePath) - if err != nil { - fmt.Println(err) - } else { - fmt.Printf("File %s uploaded to webhook with CID %s\n", filePath, cid) - } - } - } - case err, ok := <-watcher.Errors: - if !ok { - return - } - fmt.Println("error:", err) - } - } - }() + go watchLoop(watcher, pgPool, dirPathToProject) - scanRoot := config.GetConfig().FolderPreprocessor.SyncFolderRoot - err = watcher.Add(scanRoot) - if err != nil { - return err + for _, dirPath := range dirPaths { + err = watcher.Add(dirPath) + if err != nil { + return err + } + fmt.Println("Watching folder changes: " + dirPath) } // Block main goroutine forever.