diff --git a/config/config.go b/config/config.go
index dda6d3e..55c0a1f 100644
--- a/config/config.go
+++ b/config/config.go
@@ -38,6 +38,13 @@ type Config struct {
 		PrivateKey string `toml:"private_key"`
 		SignCert   string `toml:"sign_cert"`
 	} `toml:"c2pa"`
+	Database struct {
+		Host     string `toml:"host"`
+		Port     string `toml:"port"`
+		User     string `toml:"user"`
+		Password string `toml:"password"`
+		Database string `toml:"database"`
+	} `toml:"database"`
 }
 
 var conf *Config
diff --git a/database/database.go b/database/database.go
new file mode 100644
index 0000000..d1333af
--- /dev/null
+++ b/database/database.go
@@ -0,0 +1,54 @@
+package database
+
+import (
+	"context"
+	"net/url"
+	"sync"
+
+	"github.com/jackc/pgx/v5/pgxpool"
+	"github.com/starlinglab/integrity-v2/config"
+)
+
+// pgPool and pgErr hold the outcome of the one-time pool creation guarded by
+// pgOnce, so that every caller observes the same pool or the same error.
+var (
+	pgPool *pgxpool.Pool
+	pgErr  error
+	pgOnce sync.Once
+)
+
+// GetDatabaseContext returns a new context for database operations
+func GetDatabaseContext() context.Context {
+	return context.Background()
+}
+
+// GetDatabaseConnectionPool returns a thread safe connection pool singleton.
+//
+// The pool is created on the first call from the Database section of the
+// config. That attempt is made only once: its pool and its error are returned
+// to every caller, so a failed creation keeps being reported instead of
+// yielding a nil pool with a nil error on later calls.
+func GetDatabaseConnectionPool() (*pgxpool.Pool, error) {
+	pgOnce.Do(func() {
+		dbConf := config.GetConfig().Database
+		// Assemble the URL with net/url so that a user name, password or
+		// database name containing reserved characters such as '@', ':' or
+		// '/' is percent-encoded. Config values are therefore taken
+		// literally and must not be percent-encoded already.
+		connURL := url.URL{
+			Scheme: "postgres",
+			User:   url.UserPassword(dbConf.User, dbConf.Password),
+			Host:   dbConf.Host + ":" + dbConf.Port,
+			Path:   "/" + dbConf.Database,
+		}
+		pgPool, pgErr = pgxpool.New(GetDatabaseContext(), connURL.String())
+	})
+	return pgPool, pgErr
+}
+
+// CloseDatabaseConnectionPool closes the database connection pool
+func CloseDatabaseConnectionPool() {
+	if pgPool != nil {
+		pgPool.Close()
+	}
+}
diff --git a/example_config.toml b/example_config.toml
index 766ef2f..cb09ab6 100644
--- a/example_config.toml
+++ b/example_config.toml
@@ -34,3 +34,10 @@ w3 = "/usr/bin/w3" # https://web3.storage/docs/w3cli/
 [c2pa]
 private_key = "/path/to/c2pa/private.key"
 sign_cert = "/path/to/c2pa/cert.pem"
+
+[database]
+user = "user" +password = "password" +host = "postgres" +port = "5432" +database = "database" diff --git a/go.mod b/go.mod index 2c18045..6b4cec8 100644 --- a/go.mod +++ b/go.mod @@ -7,13 +7,63 @@ require ( github.com/fxamacker/cbor/v2 v2.6.0 github.com/go-chi/chi/v5 v5.0.12 github.com/go-chi/jwtauth/v5 v5.3.1 + github.com/google/uuid v1.6.0 // indirect github.com/ipfs/boxo v0.20.0 + lukechampine.com/blake3 v1.3.0 +) + +require ( + github.com/crackcomm/go-gitignore v0.0.0-20231225121904-e25f5bc08668 // indirect + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect + github.com/ipfs/bbloom v0.0.4 // indirect + github.com/ipfs/go-block-format v0.2.0 // indirect + github.com/ipfs/go-ipfs-util v0.0.3 // indirect + github.com/ipfs/go-ipld-format v0.6.0 // indirect + github.com/ipfs/go-ipld-legacy v0.2.1 // indirect + github.com/ipfs/go-log/v2 v2.5.1 // indirect + github.com/ipfs/go-metrics-interface v0.0.1 // indirect + github.com/ipld/go-codec-dagpb v1.6.0 // indirect + github.com/ipld/go-ipld-prime v0.21.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/jbenet/goprocess v0.1.4 // indirect + github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/libp2p/go-buffer-pool v0.1.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/minio/sha256-simd v1.0.1 // indirect + github.com/mr-tron/base58 v1.2.0 // indirect + github.com/multiformats/go-base32 v0.1.0 // indirect + github.com/multiformats/go-base36 v0.2.0 // indirect + github.com/multiformats/go-multibase v0.2.0 // indirect + github.com/multiformats/go-multihash v0.2.3 // indirect + github.com/multiformats/go-varint v0.0.7 // indirect + github.com/polydawn/refmt v0.89.0 // indirect + 
github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect + go.opentelemetry.io/otel v1.26.0 // indirect + go.opentelemetry.io/otel/metric v1.26.0 // indirect + go.opentelemetry.io/otel/trace v1.26.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect + golang.org/x/crypto v0.23.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect + google.golang.org/protobuf v1.34.1 // indirect +) + +require ( + github.com/fsnotify/fsnotify v1.7.0 github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-datastore v0.6.0 + github.com/jackc/pgx/v5 v5.5.5 github.com/mitchellh/mapstructure v1.5.0 github.com/multiformats/go-multicodec v0.9.0 github.com/photon-storage/go-ipfs-car v0.0.0-20240530014616-17d95f03173f - lukechampine.com/blake3 v1.3.0 ) require ( @@ -27,7 +77,6 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/containerd/cgroups v1.1.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect - github.com/crackcomm/go-gitignore v0.0.0-20231225121904-e25f5bc08668 // indirect github.com/cskr/pubsub v1.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect @@ -40,25 +89,18 @@ require ( github.com/flynn/noise v1.1.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect - github.com/go-logr/logr v1.4.1 // indirect - github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect - github.com/gogo/protobuf v1.3.2 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20240509144519-723abb6459b7 // indirect - github.com/google/uuid v1.6.0 // indirect 
github.com/gorilla/websocket v1.5.1 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect - github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/huin/goupnp v1.3.0 // indirect - github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect - github.com/ipfs/go-block-format v0.2.0 // indirect github.com/ipfs/go-blockservice v0.5.2 // indirect github.com/ipfs/go-cidutil v0.1.0 // indirect github.com/ipfs/go-ds-measure v0.2.0 // indirect @@ -69,28 +111,19 @@ require ( github.com/ipfs/go-ipfs-exchange-interface v0.2.1 // indirect github.com/ipfs/go-ipfs-pq v0.0.3 // indirect github.com/ipfs/go-ipfs-redirects-file v0.1.1 // indirect - github.com/ipfs/go-ipfs-util v0.0.3 // indirect github.com/ipfs/go-ipld-cbor v0.1.0 // indirect - github.com/ipfs/go-ipld-format v0.6.0 // indirect - github.com/ipfs/go-ipld-legacy v0.2.1 // indirect github.com/ipfs/go-log v1.0.5 // indirect - github.com/ipfs/go-log/v2 v2.5.1 // indirect github.com/ipfs/go-merkledag v0.11.0 // indirect - github.com/ipfs/go-metrics-interface v0.0.1 // indirect github.com/ipfs/go-peertaskqueue v0.8.1 // indirect github.com/ipfs/go-unixfsnode v1.9.0 // indirect github.com/ipfs/go-verifcid v0.0.3 // indirect github.com/ipfs/kubo v0.28.0 // indirect github.com/ipld/go-car v0.6.2 // indirect github.com/ipld/go-car/v2 v2.13.1 // indirect - github.com/ipld/go-codec-dagpb v1.6.0 // indirect - github.com/ipld/go-ipld-prime v0.21.0 // indirect github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230628024246-65bfa53512f2 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect - github.com/jbenet/goprocess v0.1.4 // indirect github.com/klauspost/compress v1.17.8 // indirect - github.com/klauspost/cpuid/v2 v2.2.7 // indirect 
github.com/koron/go-ssdp v0.0.4 // indirect github.com/lestrrat-go/blackmagic v1.0.2 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect @@ -98,7 +131,6 @@ require ( github.com/lestrrat-go/iter v1.0.2 // indirect github.com/lestrrat-go/jwx/v2 v2.0.20 // indirect github.com/lestrrat-go/option v1.0.1 // indirect - github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/libp2p/go-cidranger v1.1.0 // indirect github.com/libp2p/go-doh-resolver v0.4.0 // indirect github.com/libp2p/go-flow-metrics v0.1.0 // indirect @@ -118,22 +150,14 @@ require ( github.com/libp2p/go-yamux/v4 v4.0.1 // indirect github.com/libp2p/zeroconf/v2 v2.2.0 // indirect github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect - github.com/mattn/go-isatty v0.0.20 // indirect github.com/miekg/dns v1.1.59 // indirect github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect - github.com/minio/sha256-simd v1.0.1 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect - github.com/mr-tron/base58 v1.2.0 // indirect - github.com/multiformats/go-base32 v0.1.0 // indirect - github.com/multiformats/go-base36 v0.2.0 // indirect github.com/multiformats/go-multiaddr v0.12.4 // indirect github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect - github.com/multiformats/go-multibase v0.2.0 // indirect - github.com/multiformats/go-multihash v0.2.3 // indirect github.com/multiformats/go-multistream v0.5.0 // indirect - github.com/multiformats/go-varint v0.0.7 // indirect github.com/onsi/ginkgo/v2 v2.17.3 // indirect github.com/opencontainers/runtime-spec v1.2.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect @@ -158,7 +182,6 @@ require ( github.com/pion/webrtc/v3 v3.2.40 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/polydawn/refmt v0.89.0 
// indirect github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.53.0 // indirect @@ -169,28 +192,17 @@ require ( github.com/raulk/go-watchdog v1.3.0 // indirect github.com/samber/lo v1.39.0 // indirect github.com/segmentio/asm v1.2.0 // indirect - github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/stretchr/testify v1.9.0 // indirect github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb // indirect github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc // indirect github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect github.com/whyrusleeping/cbor-gen v0.1.1 // indirect - github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect - go.opentelemetry.io/otel v1.26.0 // indirect - go.opentelemetry.io/otel/metric v1.26.0 // indirect - go.opentelemetry.io/otel/trace v1.26.0 // indirect - go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.23.0 // indirect golang.org/x/net v0.25.0 // indirect - golang.org/x/sys v0.20.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect ) require ( - github.com/fsnotify/fsnotify v1.7.0 github.com/x448/float16 v0.8.4 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect @@ -208,8 +220,6 @@ require ( go4.org v0.0.0-20230225012048-214862532bf5 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/mod v0.17.0 // indirect - golang.org/x/sync v0.7.0 // indirect - golang.org/x/text v0.15.0 // indirect golang.org/x/tools v0.21.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect gonum.org/v1/gonum v0.15.0 // indirect 
diff --git a/go.sum b/go.sum
index bd78b04..352fb79 100644
--- a/go.sum
+++ b/go.sum
@@ -364,6 +364,14 @@ github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH
 github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ=
 github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230628024246-65bfa53512f2 h1:p4WuvmJI4kz4OkT6aC+FK3nzweyT8S8mg8dLOQoUENs=
 github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230628024246-65bfa53512f2/go.mod h1:+1hLvc7foaeTxwYLaZ1AoAbyzbKnOXST8+lh1MiLOAc=
+github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
+github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
+github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
+github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
+github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
+github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
+github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
+github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
 github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
 github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
 github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc=
diff --git a/preprocessor/folder/database.go b/preprocessor/folder/database.go
new file mode 100644
index 0000000..eec0b96
--- /dev/null
+++ b/preprocessor/folder/database.go
@@ -0,0 +1,153 @@
+package preprocessor_folder
+
+import (
+	"errors"
+	"time"
+
+	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
+	db "github.com/starlinglab/integrity-v2/database"
+)
+
+// initFileStatusTableIfNotExists creates the file_status table if it does not exist
+func initFileStatusTableIfNotExists(connPool *pgxpool.Pool) error {
+	_, err := connPool.Exec(
+		db.GetDatabaseContext(),
+		FILE_STATUS_TABLE,
+	)
+	return err
+}
+
+// initProjectDataTableIfNotExists creates the project_metadata table if it does not exist
+func initProjectDataTableIfNotExists(connPool *pgxpool.Pool) error {
+	_, err := connPool.Exec(
+		db.GetDatabaseContext(),
+		PROJECT_METADATA_TABLE,
+	)
+	return err
+}
+
+// initDbTableIfNotExists initializes the database tables if they do not exist
+func initDbTableIfNotExists(connPool *pgxpool.Pool) error {
+	if err := initFileStatusTableIfNotExists(connPool); err != nil {
+		return err
+	}
+	return initProjectDataTableIfNotExists(connPool)
+}
+
+// ProjectQueryResult represents the result of a project metadata query
+type ProjectQueryResult struct {
+	ProjectId        *string
+	ProjectPath      *string
+	AuthorType       *string
+	AuthorName       *string
+	AuthorIdentifier *string
+}
+
+// queryAllProjects queries all project metadata from the database.
+// It returns an error, and no rows, if the query or the iteration fails.
+func queryAllProjects(connPool *pgxpool.Pool) ([]ProjectQueryResult, error) {
+	var result []ProjectQueryResult
+	rows, err := connPool.Query(
+		db.GetDatabaseContext(),
+		"SELECT project_id, project_path, author_type, author_name, author_identifier FROM project_metadata;",
+	)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	for rows.Next() {
+		var row ProjectQueryResult
+		err := rows.Scan(&row.ProjectId, &row.ProjectPath, &row.AuthorType, &row.AuthorName, &row.AuthorIdentifier)
+		if err != nil {
+			return nil, err
+		}
+		result = append(result, row)
+	}
+	// Next also returns false when reading fails part-way through, so the
+	// error must be checked here or a truncated list would look complete.
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return result, nil
+}
+
+// FileQueryResult represents the result of a file query
+type FileQueryResult struct {
+	Status       *string
+	Cid          *string
+	ErrorMessage *string
+}
+
+// queryIfFileExists checks if a file is already found in the database.
+// It returns (nil, nil) when no row matches filePath.
+func queryIfFileExists(connPool *pgxpool.Pool, filePath string) (*FileQueryResult, error) {
+	var result FileQueryResult
+	err := connPool.QueryRow(
+		db.GetDatabaseContext(),
+		"SELECT status, cid, error FROM file_status WHERE file_path = $1;",
+		filePath,
+	).Scan(&result.Status, &result.Cid, &result.ErrorMessage)
+	if err != nil {
+		// errors.Is also matches ErrNoRows when it has been wrapped.
+		if errors.Is(err, pgx.ErrNoRows) {
+			return nil, nil
+		}
+		return nil, err
+	}
+	return &result, nil
+}
+
+// setFileStatusFound adds a file to database with status found
+func setFileStatusFound(connPool *pgxpool.Pool, filePath string) error {
+	// One timestamp for both columns so a new row has created_at == updated_at.
+	now := time.Now().UTC()
+	_, err := connPool.Exec(
+		db.GetDatabaseContext(),
+		"INSERT INTO file_status (file_path, status, created_at, updated_at) VALUES ($1, $2, $3, $4);",
+		filePath,
+		FileStatusFound,
+		now,
+		now,
+	)
+	return err
+}
+
+// setFileStatusUploading sets the status of a file to uploading
+func setFileStatusUploading(connPool *pgxpool.Pool, filePath string, sha256 string) error {
+	_, err := connPool.Exec(
+		db.GetDatabaseContext(),
+		"UPDATE file_status SET status = $1, sha256 = $2, updated_at = $3 WHERE file_path = $4;",
+		FileStatusUploading,
+		sha256,
+		time.Now().UTC(),
+		filePath,
+	)
+	return err
+}
+
+// setFileStatusDone sets the status of a file to done with cid
+func setFileStatusDone(connPool *pgxpool.Pool, filePath string, cid string) error {
+	_, err := connPool.Exec(
+		db.GetDatabaseContext(),
+		"UPDATE file_status SET status = $1, cid = $2, updated_at = $3 WHERE file_path = $4;",
+		FileStatusSuccess,
+		cid,
+		time.Now().UTC(),
+		filePath,
+	)
+	return err
+}
+
+// setFileStatusError sets the status of a file to error with the error message
+func setFileStatusError(connPool *pgxpool.Pool, filePath string, errorMessage string) error {
+	_, err := connPool.Exec(
+		db.GetDatabaseContext(),
+		"UPDATE file_status SET status = $1, error = $2, updated_at = $3 WHERE file_path = $4;",
+		FileStatusError,
+		errorMessage,
+		time.Now().UTC(),
+		filePath,
+	)
+	return err
+}
diff --git
a/preprocessor/folder/database_schema.go b/preprocessor/folder/database_schema.go
new file mode 100644
index 0000000..53a7a74
--- /dev/null
+++ b/preprocessor/folder/database_schema.go
@@ -0,0 +1,30 @@
+package preprocessor_folder
+
+// FILE_STATUS_TABLE is the schema for the file_status table.
+// file_path needs no explicit index: PostgreSQL backs a UNIQUE constraint
+// with a unique index of its own.
+const FILE_STATUS_TABLE = `CREATE TABLE IF NOT EXISTS file_status (
+	id BIGSERIAL PRIMARY KEY,
+	file_path TEXT UNIQUE NOT NULL,
+	sha256 TEXT,
+	status TEXT NOT NULL,
+	error TEXT,
+	cid TEXT,
+	created_at TIMESTAMP NOT NULL,
+	updated_at TIMESTAMP NOT NULL
+);
+CREATE INDEX IF NOT EXISTS idx_file_status_sha256 ON file_status (sha256);
+CREATE INDEX IF NOT EXISTS idx_file_status_status ON file_status (status);
+`
+
+// PROJECT_METADATA_TABLE is the schema for the project_metadata table.
+// project_id and project_path are indexed through their UNIQUE constraints.
+const PROJECT_METADATA_TABLE = `CREATE TABLE IF NOT EXISTS project_metadata (
+	id BIGSERIAL PRIMARY KEY,
+	project_id TEXT UNIQUE NOT NULL,
+	project_path TEXT UNIQUE NOT NULL,
+	author_type TEXT,
+	author_name TEXT,
+	author_identifier TEXT
+);
+`
diff --git a/preprocessor/folder/file.go b/preprocessor/folder/file.go
index 0fa3e74..f2175fa 100644
--- a/preprocessor/folder/file.go
+++ b/preprocessor/folder/file.go
@@ -11,13 +11,23 @@ import (
 	"os"
 	"path/filepath"
 	"slices"
+	"strings"
 	"time"
 
+	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/starlinglab/integrity-v2/config"
 	"github.com/starlinglab/integrity-v2/webhook"
 	"lukechampine.com/blake3"
 )
 
+// File status values stored in the status column of the file_status table
+const (
+	FileStatusFound     = "Found"
+	FileStatusUploading = "Uploading"
+	FileStatusSuccess   = "Success"
+	FileStatusError     = "Error"
+)
+
 // getFileMetadata calculates and returns a map of attributes for a file
 func getFileMetadata(filePath string) (map[string]any, error) {
 	file, err := os.Open(filePath)
@@ -44,6 +54,8 @@ func getFileMetadata(filePath string) (map[string]any, error) {
 	}
 	mediaType :=
http.DetectContentType(bytes)
 
+	syncRoot := config.GetConfig().FolderPreprocessor.SyncFolderRoot
+
 	return map[string]any{
 		"sha256":        hex.EncodeToString(sha.Sum(nil)),
 		"md5":           hex.EncodeToString(md.Sum(nil)),
@@ -53,19 +65,105 @@ func getFileMetadata(filePath string) (map[string]any, error) {
 		"file_name":     fileInfo.Name(),
 		"last_modified": fileInfo.ModTime().Format(time.RFC3339),
 		"time_created":  fileInfo.ModTime().Format(time.RFC3339),
+		"asset_origin":  strings.TrimPrefix(filePath, syncRoot),
 	}, nil
 }
 
-// handleNewFile posts a new file and its metadata to the webhook server
-func handleNewFile(filePath string) (string, error) {
+// handleNewFile takes a discovered file, updates its status in the database,
+// and posts the file and its metadata to the webhook server.
+//
+// A file already recorded as Success returns its stored CID without a new
+// upload, and a file recorded as Error returns that error. Files recorded as
+// Found or Uploading are retried; any other file is first inserted as Found.
+// project may be nil; when given, its non-nil fields are added to the metadata.
+// On success it returns the CID reported by the webhook.
+func handleNewFile(pgPool *pgxpool.Pool, filePath string, project *ProjectQueryResult) (string, error) {
+	result, err := queryIfFileExists(pgPool, filePath)
+	if err != nil {
+		return "", fmt.Errorf("error checking if file exists in database: %v", err)
+	}
+
+	status, errorMessage, storedCid := "", "", ""
+	if result != nil {
+		if result.Status != nil {
+			status = *result.Status
+		}
+		if result.ErrorMessage != nil {
+			errorMessage = *result.ErrorMessage
+		}
+		if result.Cid != nil {
+			storedCid = *result.Cid
+		}
+	}
+
+	switch status {
+	case FileStatusFound:
+		fmt.Println("retrying found file:", filePath)
+	case FileStatusUploading:
+		fmt.Println("retrying uploading file:", filePath)
+	case FileStatusSuccess:
+		return storedCid, nil
+	case FileStatusError:
+		return "", fmt.Errorf("file %s has error: %s", filePath, errorMessage)
+	default:
+		err = setFileStatusFound(pgPool, filePath)
+		if err != nil {
+			return "", fmt.Errorf("error setting file status to found: %v", err)
+		}
+	}
+
 	metadata, err := getFileMetadata(filePath)
 	if err != nil {
+		e := setFileStatusError(pgPool, filePath, err.Error())
+		if e != nil {
+			fmt.Println("error setting file status to error:", e)
+		}
 		return "", fmt.Errorf("error getting metadata for file %s: %v", filePath, err)
 	}
+
+	if project != nil {
+		// ProjectId and ProjectPath are pointers and may be nil (for
+		// example on a zero-value ProjectQueryResult); guard each dereference.
+		if project.ProjectId != nil {
+			metadata["project_id"] = *project.ProjectId
+		}
+		if project.ProjectPath != nil {
+			metadata["project_path"] = *project.ProjectPath
+		}
+		if project.AuthorType != nil || project.AuthorName != nil || project.AuthorIdentifier != nil {
+			author := map[string]string{}
+			if project.AuthorType != nil {
+				author["@type"] = *project.AuthorType
+			}
+			if project.AuthorName != nil {
+				author["name"] = *project.AuthorName
+			}
+			if project.AuthorIdentifier != nil {
+				author["identifier"] = *project.AuthorIdentifier
+			}
+			metadata["author"] = author
+		}
+	}
+
+	err = setFileStatusUploading(pgPool, filePath, metadata["sha256"].(string))
+	if err != nil {
+		return "", fmt.Errorf("error setting file status to uploading: %v", err)
+	}
 	resp, err := webhook.PostFileToWebHook(filePath, metadata, webhook.PostGenericWebhookOpt{})
 	if err != nil {
+		e := setFileStatusError(pgPool, filePath, err.Error())
+		if e != nil {
+			fmt.Println("error setting file status to error:", e)
+		}
 		return "", fmt.Errorf("error posting metadata for file %s: %v", filePath, err)
 	}
+
+	// Record the CID returned by this upload; the value read from the database
+	// earlier is only set for files that were already marked Success.
+	err = setFileStatusDone(pgPool, filePath, resp.Cid)
+	if err != nil {
+		return "", fmt.Errorf("error setting file status to done: %v", err)
+	}
 	return resp.Cid, nil
 }
 
diff --git a/preprocessor/folder/folder.go b/preprocessor/folder/folder.go
index bd4463e..7787c27 100644
--- a/preprocessor/folder/folder.go
+++ b/preprocessor/folder/folder.go
@@ -7,43 +7,112 @@ import (
 	"path/filepath"
 
 	"github.com/fsnotify/fsnotify"
+	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/starlinglab/integrity-v2/config"
+	"github.com/starlinglab/integrity-v2/database"
 )
 
-// scanSyncDirectory scans a path under the sync directory and returns a list of files
-func scanSyncDirectory(subPath string) ([]string, error) {
+// scanSyncDirectory walks a path under the sync directory and returns the files to include and every directory found
+func scanSyncDirectory(subPath string) (fileList []string, dirList []string, err error) {
 	scanRoot := config.GetConfig().FolderPreprocessor.SyncFolderRoot
 	if scanRoot == "" {
 		scanRoot = "."
}
 	scanPath := filepath.Join(scanRoot, subPath)
-	fileList := []string{}
-	err := filepath.Walk(scanPath, func(path string, info fs.FileInfo, err error) error {
+	fmt.Println("Scanning: " + scanPath)
+	err = filepath.Walk(scanPath, func(path string, info fs.FileInfo, err error) error {
 		if err != nil {
 			return err
 		}
-		if checkShouldIncludeFile(info) {
+		if info.IsDir() {
+			dirList = append(dirList, path)
+		} else if checkShouldIncludeFile(info) {
 			fileList = append(fileList, path)
 			fmt.Println("Found: " + path)
 			return nil
 		}
 		return nil
 	})
-	return fileList, err
+	return fileList, dirList, err
 }
 
+// watchLoop handles Create and Rename events from w until its Events or Errors channel is closed, passing each included file to handleNewFile
+func watchLoop(w *fsnotify.Watcher, pgPool *pgxpool.Pool, dirPathToProject map[string]ProjectQueryResult) {
+	for {
+		select {
+		case event, ok := <-w.Events:
+			if !ok {
+				return
+			}
+			if event.Has(fsnotify.Create) || event.Has(fsnotify.Rename) {
+				filePath := event.Name
+				file, err := os.Open(filePath)
+				if err != nil {
+					// File may be moved away for fsnotify.Rename
+					continue
+				}
+				fileInfo, err := file.Stat()
+				file.Close() // close now: a defer in this endless loop would leak one descriptor per event
+				if err != nil {
+					fmt.Println("error getting file info:", err)
+					continue
+				}
+				if checkShouldIncludeFile(fileInfo) {
+					project := dirPathToProject[filepath.Dir(filePath)]
+					cid, err := handleNewFile(pgPool, filePath, &project)
+					if err != nil {
+						fmt.Println(err)
+					} else {
+						fmt.Printf("File %s uploaded to webhook with CID %s\n", filePath, cid)
+					}
+				}
+			}
+		case err, ok := <-w.Errors:
+			if !ok {
+				return
+			}
+			fmt.Println("error:", err)
+		}
+	}
+}
+
+// Run scans the sync directory and watches for file changes
 func Run(args []string) error {
-	// Scan whole sync directory
-	fileList, err := scanSyncDirectory("")
+	pgPool, err := database.GetDatabaseConnectionPool()
+	if err != nil {
+		return err
+	}
+	defer database.CloseDatabaseConnectionPool()
+	err = initDbTableIfNotExists(pgPool)
 	if err != nil {
 		return err
 	}
-	for _, filePath := range fileList {
-		cid, err :=
handleNewFile(filePath) + + projects, err := queryAllProjects(pgPool) + if err != nil { + return err + } + + dirPathToProject := map[string]ProjectQueryResult{} + var dirPaths []string + + for _, project := range projects { + projectPath := *project.ProjectPath + fileList, dirList, err := scanSyncDirectory(projectPath) if err != nil { fmt.Println(err) - } else { - fmt.Printf("File %s uploaded to webhook with CID %s\n", filePath, cid) + } + for _, filePath := range fileList { + cid, err := handleNewFile(pgPool, filePath, &project) + if err != nil { + fmt.Println(err) + } else { + fmt.Printf("File %s uploaded to webhook with CID %s\n", filePath, cid) + } + } + for _, dirPath := range dirList { + dirPaths = append(dirPaths, dirPath) + dirPathToProject[dirPath] = project } } @@ -54,48 +123,14 @@ func Run(args []string) error { } defer watcher.Close() - go func() { - for { - select { - case event, ok := <-watcher.Events: - if !ok { - return - } - if event.Has(fsnotify.Create) || event.Has(fsnotify.Rename) { - filePath := event.Name - file, err := os.Open(filePath) - if err != nil { - // File may be moved away for fsnotify.Rename - continue - } - defer file.Close() - fileInfo, err := file.Stat() - if err != nil { - fmt.Println("error getting file info:", err) - continue - } - if checkShouldIncludeFile(fileInfo) { - cid, err := handleNewFile(filePath) - if err != nil { - fmt.Println(err) - } else { - fmt.Printf("File %s uploaded to webhook with CID %s\n", filePath, cid) - } - } - } - case err, ok := <-watcher.Errors: - if !ok { - return - } - fmt.Println("error:", err) - } - } - }() + go watchLoop(watcher, pgPool, dirPathToProject) - scanRoot := config.GetConfig().FolderPreprocessor.SyncFolderRoot - err = watcher.Add(scanRoot) - if err != nil { - return err + for _, dirPath := range dirPaths { + err = watcher.Add(dirPath) + if err != nil { + return err + } + fmt.Println("Watching folder changes: " + dirPath) } // Block main goroutine forever.