Skip to content

Commit

Permalink
[preprocessor/folder] add file status and database related functions
Browse files Browse the repository at this point in the history
  • Loading branch information
williamchong committed May 27, 2024
1 parent c621246 commit a2c85c7
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 4 deletions.
95 changes: 95 additions & 0 deletions preprocessor/folder/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package preprocessor_folder

import (
	"errors"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	db "github.com/starlinglab/integrity-v2/database"
)

// initFileStatusTableIfNotExists runs the FILE_STATUS_TABLE statements on
// connPool and returns the error from that execution unchanged (nil on
// success).
func initFileStatusTableIfNotExists(connPool *pgxpool.Pool) error {
	ctx := db.GetDatabaseContext()
	_, execErr := connPool.Exec(ctx, FILE_STATUS_TABLE)
	return execErr
}

// initDbTableIfNotExists prepares every table this package needs. At present
// that is only the file status table.
func initDbTableIfNotExists(connPool *pgxpool.Pool) error {
	return initFileStatusTableIfNotExists(connPool)
}

// FileQueryResult holds the columns read from a file_status row by
// queryIfFileExists. The fields are pointers so that a NULL column scans to
// nil instead of failing.
type FileQueryResult struct {
// Status is the value of the status column.
Status *string
// Cid is the value of the cid column; nil when the column is NULL.
Cid *string
// ErrorMessage is the value of the error column; nil when the column is NULL.
ErrorMessage *string
}

// queryIfFileExists looks up the file_status row whose file_path equals
// filePath.
//
// It returns (nil, nil) when no such row exists, a populated *FileQueryResult
// when one does, and (nil, err) for any other query or scan failure.
func queryIfFileExists(connPool *pgxpool.Pool, filePath string) (*FileQueryResult, error) {
	var result FileQueryResult
	err := connPool.QueryRow(
		db.GetDatabaseContext(),
		"SELECT status, cid, error FROM file_status WHERE file_path = $1;",
		filePath,
	).Scan(&result.Status, &result.Cid, &result.ErrorMessage)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		// errors.Is rather than == so the "no row" case is still recognised
		// if the sentinel is ever wrapped on its way here.
		return nil, nil
	case err != nil:
		return nil, err
	}
	return &result, nil
}

// setFileStatusFound inserts a new file_status row for filePath with status
// FileStatusFound. It returns the error from the INSERT, if any.
func setFileStatusFound(connPool *pgxpool.Pool, filePath string) error {
	// Take a single timestamp so created_at and updated_at are exactly equal
	// on a freshly inserted row.
	now := time.Now().UTC()
	_, err := connPool.Exec(
		db.GetDatabaseContext(),
		"INSERT INTO file_status (file_path, status, created_at, updated_at) VALUES ($1, $2, $3, $4);",
		filePath,
		FileStatusFound,
		now,
		now,
	)
	return err
}

// setFileStatusUploading marks the file_status row for filePath as
// FileStatusUploading, stores its sha256 value and refreshes updated_at.
// The error from the UPDATE is returned as-is.
func setFileStatusUploading(connPool *pgxpool.Pool, filePath string, sha256 string) error {
	const stmt = "UPDATE file_status SET status = $1, sha256 = $2, updated_at = $3 WHERE file_path = $4;"

	ctx := db.GetDatabaseContext()
	updatedAt := time.Now().UTC()
	_, execErr := connPool.Exec(ctx, stmt, FileStatusUploading, sha256, updatedAt, filePath)
	return execErr
}

// setFileStatusDone marks the file_status row for filePath as
// FileStatusSuccess, stores the given cid and refreshes updated_at.
// The error from the UPDATE is returned as-is.
func setFileStatusDone(connPool *pgxpool.Pool, filePath string, cid string) error {
	const stmt = "UPDATE file_status SET status = $1, cid = $2, updated_at = $3 WHERE file_path = $4;"

	ctx := db.GetDatabaseContext()
	updatedAt := time.Now().UTC()
	_, execErr := connPool.Exec(ctx, stmt, FileStatusSuccess, cid, updatedAt, filePath)
	return execErr
}

// setFileStatusError marks the file_status row for filePath as
// FileStatusError, stores errorMessage in the error column and refreshes
// updated_at. The error from the UPDATE is returned as-is.
func setFileStatusError(connPool *pgxpool.Pool, filePath string, errorMessage string) error {
	const stmt = "UPDATE file_status SET status = $1, error = $2, updated_at = $3 WHERE file_path = $4;"

	ctx := db.GetDatabaseContext()
	updatedAt := time.Now().UTC()
	_, execErr := connPool.Exec(ctx, stmt, FileStatusError, errorMessage, updatedAt, filePath)
	return execErr
}
16 changes: 16 additions & 0 deletions preprocessor/folder/database_schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package preprocessor_folder

// FILE_STATUS_TABLE is the SQL that creates the file_status table and its
// indexes. Every statement uses IF NOT EXISTS, so it can be executed on each
// start-up.
//
// One row is kept per file_path, which is declared UNIQUE. The sha256, error
// and cid columns are nullable; status and both timestamps are NOT NULL.
//
// NOTE(review): file_path is already UNIQUE, which in PostgreSQL normally
// comes with its own index; idx_file_status_file_path may be redundant —
// confirm before removing.
// NOTE(review): the timestamp columns are plain TIMESTAMP; the writers in
// this package pass time.Now().UTC(), so values are presumably meant to be
// read back as UTC — confirm.
var FILE_STATUS_TABLE = `CREATE TABLE IF NOT EXISTS file_status (
id BIGSERIAL PRIMARY KEY,
file_path TEXT UNIQUE NOT NULL,
sha256 TEXT,
status TEXT NOT NULL,
error TEXT,
cid TEXT,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_file_status_file_path ON file_status (file_path);
CREATE INDEX IF NOT EXISTS idx_file_status_sha256 ON file_status (sha256);
CREATE INDEX IF NOT EXISTS idx_file_status_status ON file_status (status);
`
59 changes: 57 additions & 2 deletions preprocessor/folder/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@ import (
"path/filepath"
"slices"

"github.com/jackc/pgx/v5/pgxpool"
"github.com/starlinglab/integrity-v2/config"
"lukechampine.com/blake3"
)

// Values written to the status column of the file_status table.
var (
// FileStatusFound is set when a row is first inserted for a file.
FileStatusFound = "Found"
// FileStatusUploading is set once the file's sha256 has been recorded,
// before its metadata is posted to the webhook.
FileStatusUploading = "Uploading"
// FileStatusSuccess is set together with the cid after processing completes.
FileStatusSuccess = "Success"
// FileStatusError is set together with an error message when processing fails.
FileStatusError = "Error"
)

func getFileMetadata(filePath string) (map[string]any, error) {
file, err := os.Open(filePath)
if err != nil {
Expand Down Expand Up @@ -52,15 +60,62 @@ func getFileMetadata(filePath string) (map[string]any, error) {
}, nil
}

// handleNewFile records filePath in the file_status table, posts its metadata
// to the webhook and returns the resulting CID.
//
// The row already stored for filePath (if any) decides what happens:
//   - Found or Uploading: an error is returned and nothing else is done.
//   - Success: the stored CID is returned.
//   - Error: an error carrying the stored message is returned.
//   - no row, or any other status: a row with status Found is inserted and
//     processing continues.
//
// Nothing in this function moves a row out of Found, Uploading or Error once
// it is there, so such a file is rejected on every later call.
//
// NOTE(review): the two signature lines below, and the two
// postFileMetadataToWebHook calls further down, look like the removed and
// added sides of a diff hunk sitting next to each other — confirm which
// lines are current before compiling.
func handleNewFile(filePath string) (string, error) {
func handleNewFile(pgPool *pgxpool.Pool, filePath string) (string, error) {
result, err := queryIfFileExists(pgPool, filePath)
if err != nil {
return "", fmt.Errorf("error checking if file exists in database: %v", err)
}
// Flatten the nullable columns to plain strings; a missing row or a NULL
// column leaves the corresponding value empty.
status, errorMessage, cid := "", "", ""
if result != nil {
if result.Status != nil {
status = *result.Status
}
if result.ErrorMessage != nil {
errorMessage = *result.ErrorMessage
}
if result.Cid != nil {
cid = *result.Cid
}
}
switch status {
case FileStatusFound:
return "", fmt.Errorf("file %s is already found", filePath)
case FileStatusUploading:
return "", fmt.Errorf("file %s is already uploading", filePath)
case FileStatusSuccess:
return cid, nil
case FileStatusError:
return "", fmt.Errorf("file %s has error: %s", filePath, errorMessage)
default:
// NOTE(review): this branch also runs when a row exists with an
// unrecognised status; the INSERT would then presumably fail on the
// UNIQUE file_path constraint — confirm.
err = setFileStatusFound(pgPool, filePath)
if err != nil {
return "", fmt.Errorf("error setting file status to found: %v", err)
}
}
metadata, err := getFileMetadata(filePath)
if err != nil {
// Best effort: a failure to record the error status is only printed so
// that the original metadata error is the one returned.
e := setFileStatusError(pgPool, filePath, err.Error())
if e != nil {
fmt.Println("error setting file status to error:", e)
}
return "", fmt.Errorf("error getting metadata for file %s: %v", filePath, err)
}
cid, err := postFileMetadataToWebHook(filePath, metadata)
// The type assertion is unchecked and panics if metadata["sha256"] is
// absent or not a string.
// NOTE(review): assumes getFileMetadata always sets a string "sha256" — confirm.
err = setFileStatusUploading(pgPool, filePath, metadata["sha256"].(string))
if err != nil {
return "", fmt.Errorf("error setting file status to uploading: %v", err)
}
cid, err = postFileMetadataToWebHook(filePath, metadata)
if err != nil {
// Best effort, as above: keep the webhook error as the returned error.
e := setFileStatusError(pgPool, filePath, err.Error())
if e != nil {
fmt.Println("error setting file status to error:", e)
}
return "", fmt.Errorf("error posting metadata for file %s: %v", filePath, err)
}
err = setFileStatusDone(pgPool, filePath, cid)
if err != nil {
return "", fmt.Errorf("error setting file status to done: %v", err)
}
return cid, nil
}

Expand Down
15 changes: 13 additions & 2 deletions preprocessor/folder/folder.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/fsnotify/fsnotify"
"github.com/starlinglab/integrity-v2/config"
"github.com/starlinglab/integrity-v2/database"
)

func scanSyncDirectory(subPath string) ([]string, error) {
Expand All @@ -32,13 +33,23 @@ func scanSyncDirectory(subPath string) ([]string, error) {
}

func Run(args []string) error {
pgPool, err := database.GetDatabaseConnectionPool()
if err != nil {
return err
}
defer database.CloseDatabaseConnectionPool()
err = initDbTableIfNotExists(pgPool)
if err != nil {
return err
}

// Scan whole sync directory
fileList, err := scanSyncDirectory("")
if err != nil {
return err
}
for _, filePath := range fileList {
cid, err := handleNewFile(filePath)
cid, err := handleNewFile(pgPool, filePath)
if err != nil {
fmt.Println(err)
} else {
Expand Down Expand Up @@ -74,7 +85,7 @@ func Run(args []string) error {
continue
}
if checkShouldIncludeFile(fileInfo) {
cid, err := handleNewFile(filePath)
cid, err := handleNewFile(pgPool, filePath)
if err != nil {
fmt.Println(err)
} else {
Expand Down

0 comments on commit a2c85c7

Please sign in to comment.