Skip to content

Commit

Permalink
[preprocessor/folder] add project based folder watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
williamchong committed May 27, 2024
1 parent 7827af0 commit 91ae53e
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 55 deletions.
31 changes: 28 additions & 3 deletions preprocessor/folder/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ func getFileMetadata(filePath string) (map[string]any, error) {
}, nil
}

func handleNewFile(pgPool *pgxpool.Pool, filePath string) (string, error) {
func handleNewFile(pgPool *pgxpool.Pool, filePath string, project *ProjectQueryResult) (string, error) {
result, err := queryIfFileExists(pgPool, filePath)
if err != nil {
return "", fmt.Errorf("error checking if file exists in database: %v", err)
}

status, errorMessage, cid := "", "", ""
if result != nil {
if result.Status != nil {
Expand All @@ -77,22 +78,25 @@ func handleNewFile(pgPool *pgxpool.Pool, filePath string) (string, error) {
cid = *result.Cid
}
}

switch status {
case FileStatusFound:
return "", fmt.Errorf("file %s is already found", filePath)
fmt.Println("retrying found file:", filePath)
case FileStatusUploading:
return "", fmt.Errorf("file %s is already uploading", filePath)
fmt.Println("retrying uploading file:", filePath)
case FileStatusSuccess:
return cid, nil
case FileStatusError:
return "", fmt.Errorf("file %s has error: %s", filePath, errorMessage)
default:
// noop
}

err = setFileStatusFound(pgPool, filePath)
if err != nil {
return "", fmt.Errorf("error setting file status to found: %v", err)
}

metadata, err := getFileMetadata(filePath)
if err != nil {
e := setFileStatusError(pgPool, filePath, err.Error())
Expand All @@ -101,10 +105,30 @@ func handleNewFile(pgPool *pgxpool.Pool, filePath string) (string, error) {
}
return "", fmt.Errorf("error getting metadata for file %s: %v", filePath, err)
}

if project != nil {
metadata["project_id"] = *project.ProjectId
metadata["project_path"] = *project.ProjectPath
if project.AuthorType != nil || project.AuthorName != nil || project.AuthorIdentifier != nil {
author := map[string]string{}
if project.AuthorType != nil {
author["@type"] = *project.AuthorType
}
if project.AuthorName != nil {
author["name"] = *project.AuthorName
}
if project.AuthorIdentifier != nil {
author["identifier"] = *project.AuthorIdentifier
}
metadata["author"] = author
}
}

err = setFileStatusUploading(pgPool, filePath, metadata["sha256"].(string))
if err != nil {
return "", fmt.Errorf("error setting file status to uploading: %v", err)
}

cid, err = postFileMetadataToWebHook(filePath, metadata)
if err != nil {
e := setFileStatusError(pgPool, filePath, err.Error())
Expand All @@ -113,6 +137,7 @@ func handleNewFile(pgPool *pgxpool.Pool, filePath string) (string, error) {
}
return "", fmt.Errorf("error posting metadata for file %s: %v", filePath, err)
}

err = setFileStatusDone(pgPool, filePath, cid)
if err != nil {
return "", fmt.Errorf("error setting file status to done: %v", err)
Expand Down
126 changes: 74 additions & 52 deletions preprocessor/folder/folder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,71 @@ import (
"path/filepath"

"github.com/fsnotify/fsnotify"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/starlinglab/integrity-v2/config"
"github.com/starlinglab/integrity-v2/database"
)

// scanSyncDirectory walks the directory formed by joining the configured
// FolderPreprocessor.SyncFolderRoot (or "." when that setting is empty) with
// subPath. In its three-value form it returns the paths of the non-directory
// entries accepted by checkShouldIncludeFile, the paths of every directory
// encountered during the walk, and the error reported by filepath.Walk.
//
// NOTE(review): this listing looks like a commit diff rendered without +/-
// markers, so superseded and replacement lines appear side by side (two
// signatures, two Walk calls, two return statements) — confirm against the
// actual folder.go before relying on it.
func scanSyncDirectory(subPath string) ([]string, error) {
func scanSyncDirectory(subPath string) (fileList []string, dirList []string, err error) {
scanRoot := config.GetConfig().FolderPreprocessor.SyncFolderRoot
if scanRoot == "" {
scanRoot = "."
}
scanPath := filepath.Join(scanRoot, subPath)
fileList := []string{}
err := filepath.Walk(scanPath, func(path string, info fs.FileInfo, err error) error {
fmt.Println("Scanning: " + scanPath)
err = filepath.Walk(scanPath, func(path string, info fs.FileInfo, err error) error {
// Returning the error handed to the callback stops the walk.
if err != nil {
return err
}
if checkShouldIncludeFile(info) {
// Directories are collected separately from files.
if info.IsDir() {
dirList = append(dirList, path)
} else if checkShouldIncludeFile(info) {
fileList = append(fileList, path)
fmt.Println("Found: " + path)
return nil
}
return nil
})
return fileList, err
return fileList, dirList, err
}

// watchLoop consumes events and errors from w until either channel is closed.
//
// For every Create or Rename event it stats the named path and, when
// checkShouldIncludeFile accepts it, passes the file to handleNewFile together
// with the project registered for the file's parent directory in
// dirPathToProject. Failures are printed and the loop keeps running.
//
// The function blocks, so callers are expected to run it in its own goroutine.
func watchLoop(w *fsnotify.Watcher, pgPool *pgxpool.Pool, dirPathToProject map[string]ProjectQueryResult) {
	for {
		select {
		case event, ok := <-w.Events:
			if !ok {
				return
			}
			if !event.Has(fsnotify.Create) && !event.Has(fsnotify.Rename) {
				continue
			}

			filePath := event.Name
			file, err := os.Open(filePath)
			if err != nil {
				// File may be moved away for fsnotify.Rename
				continue
			}
			fileInfo, err := file.Stat()
			// Close right away instead of deferring: a defer inside this
			// endless loop would only run when watchLoop returns, leaking one
			// descriptor per event. The handle was opened read-only, so the
			// Close error carries no useful information.
			_ = file.Close()
			if err != nil {
				fmt.Println("error getting file info:", err)
				continue
			}
			if !checkShouldIncludeFile(fileInfo) {
				continue
			}

			// Pass a nil project when the parent directory is not registered.
			// A pointer to the map's zero value would be non-nil while its
			// pointer fields are nil, which handleNewFile dereferences.
			var project *ProjectQueryResult
			if p, found := dirPathToProject[filepath.Dir(filePath)]; found {
				project = &p
			}

			cid, err := handleNewFile(pgPool, filePath, project)
			if err != nil {
				fmt.Println(err)
				continue
			}
			fmt.Printf("File %s uploaded to webhook with CID %s\n", filePath, cid)
		case err, ok := <-w.Errors:
			if !ok {
				return
			}
			fmt.Println("error:", err)
		}
	}
}

func Run(args []string) error {
Expand All @@ -43,17 +85,31 @@ func Run(args []string) error {
return err
}

// Scan whole sync directory
fileList, err := scanSyncDirectory("")
projects, err := queryAllProjects(pgPool)
if err != nil {
return err
}
for _, filePath := range fileList {
cid, err := handleNewFile(pgPool, filePath)

dirPathToProject := map[string]ProjectQueryResult{}
var dirPaths []string

for _, project := range projects {
projectPath := *project.ProjectPath
fileList, dirList, err := scanSyncDirectory(projectPath)
if err != nil {
fmt.Println(err)
} else {
fmt.Printf("File %s uploaded to webhook with CID %s\n", filePath, cid)
}
for _, filePath := range fileList {
cid, err := handleNewFile(pgPool, filePath, &project)
if err != nil {
fmt.Println(err)
} else {
fmt.Printf("File %s uploaded to webhook with CID %s\n", filePath, cid)
}
}
for _, dirPath := range dirList {
dirPaths = append(dirPaths, dirPath)
dirPathToProject[dirPath] = project
}
}

Expand All @@ -64,48 +120,14 @@ func Run(args []string) error {
}
defer watcher.Close()

go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
if event.Has(fsnotify.Create) || event.Has(fsnotify.Rename) {
filePath := event.Name
file, err := os.Open(filePath)
if err != nil {
// File may be moved away for fsnotify.Rename
continue
}
defer file.Close()
fileInfo, err := file.Stat()
if err != nil {
fmt.Println("error getting file info:", err)
continue
}
if checkShouldIncludeFile(fileInfo) {
cid, err := handleNewFile(pgPool, filePath)
if err != nil {
fmt.Println(err)
} else {
fmt.Printf("File %s uploaded to webhook with CID %s\n", filePath, cid)
}
}
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
fmt.Println("error:", err)
}
}
}()
go watchLoop(watcher, pgPool, dirPathToProject)

scanRoot := config.GetConfig().FolderPreprocessor.SyncFolderRoot
err = watcher.Add(scanRoot)
if err != nil {
return err
for _, dirPath := range dirPaths {
err = watcher.Add(dirPath)
if err != nil {
return err
}
fmt.Println("Watching folder changes: " + dirPath)
}

// Block main goroutine forever.
Expand Down

0 comments on commit 91ae53e

Please sign in to comment.