Skip to content

Commit

Permalink
[preprocessor/folder] Add folder preprocessor (#6)
Browse files Browse the repository at this point in the history
* [preprocessor/folder] add folder scanner with config and hashing

* [preprocessor/folder] add file watcher

* [preprocessor/folder] add post to webhook client

* [preprocessor/folder] change attribute key name to snake case

* [preprocessor/folder] use net/http for media_type detection

* [preprocessor/folder] add preprocessorfolder to main.go

* [preprocessor/folder] add date_created to file metadata

* [preprocessor/folder] update argument for PostFileToWebHook

* [preprocessor/folder] add function comments

* [preprocessor/folder] change date_created to time_created

* [preprocessor/folder] optimize memory usage for hash calculation

* [preprocessor/folder] update function naming, comment and package name

* [preprocessor/folder] fix lint error

* [preprocessor/folder] ensure utc timestamp

* [webhook] calculate file hashes

* [preprocessor/folder] use filepath.WalkDir instead of filepath.Walk

* [preprocessor/folder] use log instead of fmt

* [preprocessor/folder] change fsnotify to rjeczalik/notify

* [preprocessor/folder] throw error on sync folder not set and unify log format

* [preprocessor/folder] run file watcher handler in coroutine
  • Loading branch information
williamchong authored Jun 4, 2024
1 parent 66adbf5 commit e6e42f5
Show file tree
Hide file tree
Showing 10 changed files with 235 additions and 16 deletions.
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ type Config struct {
MetadataEncKeys string `toml:"metadata_enc_keys"`
FileEncKeys string `toml:"file_enc_keys"`
} `toml:"dirs"`
FolderPreprocessor struct {
SyncFolderRoot string `toml:"sync_folder_root"`
FileExtensions []string `toml:"file_extensions"`
} `toml:"folder_preprocessor"`
Bins struct {
Rclone string `toml:"rclone"`
C2patool string `toml:"c2patool"`
Expand Down
4 changes: 4 additions & 0 deletions example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ c2pa_manifest_templates = "/path/to/c2pa-manifest-templates/storage/"
metadata_enc_keys = "/path/to/metadata-encryption-key/storage/"
file_enc_keys = "/path/to/file-encryption-key/storage/"

[folder_preprocessor]
sync_folder_root = "/path/to/sync/folder/"
file_extensions = ['.jpg', '.png', '.jpeg']

[bins]
# All of these are optional.
# The CLI tools you use will tell you if they are needed.
Expand Down
25 changes: 15 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/ipfs/go-datastore v0.6.0
github.com/multiformats/go-multicodec v0.9.0
github.com/photon-storage/go-ipfs-car v0.0.0-20240530014616-17d95f03173f
lukechampine.com/blake3 v1.3.0
)

require (
Expand Down Expand Up @@ -177,39 +178,43 @@ require (
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect
go.opentelemetry.io/otel v1.26.0 // indirect
go.opentelemetry.io/otel/metric v1.26.0 // indirect
go.opentelemetry.io/otel/trace v1.26.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sys v0.20.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
)

require (
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/rjeczalik/notify v0.9.3
github.com/x448/float16 v0.8.4 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect
go.opentelemetry.io/otel v1.26.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.26.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.26.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.26.0 // indirect
go.opentelemetry.io/otel/exporters/zipkin v1.26.0 // indirect
go.opentelemetry.io/otel/metric v1.26.0 // indirect
go.opentelemetry.io/otel/sdk v1.26.0 // indirect
go.opentelemetry.io/otel/trace v1.26.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/dig v1.17.1 // indirect
go.uber.org/fx v1.21.1 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go4.org v0.0.0-20230225012048-214862532bf5 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/tools v0.21.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
gonum.org/v1/gonum v0.15.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
google.golang.org/grpc v1.64.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.3.0 // indirect
)
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiD
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/fxamacker/cbor/v2 v2.6.0 h1:sU6J2usfADwWlYDAFhZBQ6TnLFBHxgesMrQfQgk1tWA=
github.com/fxamacker/cbor/v2 v2.6.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
Expand Down Expand Up @@ -652,6 +652,8 @@ github.com/quic-go/webtransport-go v0.8.0 h1:HxSrwun11U+LlmwpgM1kEqIqH90IT4N8auv
github.com/quic-go/webtransport-go v0.8.0/go.mod h1:N99tjprW432Ut5ONql/aUhSLT0YVSlwHohQsuac9WaM=
github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk=
github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU=
github.com/rjeczalik/notify v0.9.3 h1:6rJAzHTGKXGj76sbRgDiDcYj/HniypXmSJo1SWakZeY=
github.com/rjeczalik/notify v0.9.3/go.mod h1:gF3zSOrafR9DQEWSE8TjfI9NkooDxbyT4UgRGKZA0lc=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
Expand Down Expand Up @@ -943,6 +945,7 @@ golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181029174526-d69651ed3497/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
exportproof "github.com/starlinglab/integrity-v2/export-proof"
"github.com/starlinglab/integrity-v2/getcid"
injectc2pa "github.com/starlinglab/integrity-v2/inject-c2pa"
preprocessorfolder "github.com/starlinglab/integrity-v2/preprocessor/folder"
"github.com/starlinglab/integrity-v2/register"
"github.com/starlinglab/integrity-v2/upload"
"github.com/starlinglab/integrity-v2/util"
Expand All @@ -35,6 +36,8 @@ func run(cmd string, args []string) (bool, error) {
err = webhook.Run(args)
case "upload":
err = upload.Run(args)
case "preprocessor-folder":
err = preprocessorfolder.Run(args)
case "register":
err = register.Run(args)
case "getcid":
Expand Down
12 changes: 12 additions & 0 deletions preprocessor/folder/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package main

import (
"os"

folder_preprocessor "github.com/starlinglab/integrity-v2/preprocessor/folder"
"github.com/starlinglab/integrity-v2/util"
)

func main() {
util.Fatal(folder_preprocessor.Run(os.Args[1:]))
}
74 changes: 74 additions & 0 deletions preprocessor/folder/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package folder

import (
"fmt"
"net/http"
"os"
"path/filepath"
"slices"
"time"

"github.com/starlinglab/integrity-v2/config"
"github.com/starlinglab/integrity-v2/webhook"
)

// getFileMetadata calculates and returns a map of attributes for a file
func getFileMetadata(filePath string) (map[string]any, error) {
file, err := os.Open(filePath)
if err != nil {
return nil, err
}
defer file.Close()
fileInfo, err := file.Stat()
if err != nil {
return nil, err
}

buffer := make([]byte, 512)
n, err := file.Read(buffer)
if err != nil {
return nil, err
}
mediaType := http.DetectContentType(buffer[:n])
_, err = file.Seek(0, 0)
if err != nil {
return nil, err
}

return map[string]any{
"media_type": mediaType,
"file_name": fileInfo.Name(),
"last_modified": fileInfo.ModTime().UTC().Format(time.RFC3339),
"time_created": fileInfo.ModTime().UTC().Format(time.RFC3339),
}, nil
}

// handleNewFile posts a new file and its metadata to the webhook server,
// and returns the CID of the file according to the server.
func handleNewFile(filePath string) (string, error) {
metadata, err := getFileMetadata(filePath)
if err != nil {
return "", fmt.Errorf("error getting metadata for file %s: %v", filePath, err)
}
resp, err := webhook.PostFileToWebHook(filePath, metadata, webhook.PostGenericWebhookOpt{})
if err != nil {
return "", fmt.Errorf("error posting metadata for file %s: %v", filePath, err)
}
return resp.Cid, nil
}

// shouldIncludeFile reports whether the file should be included in the processing
func shouldIncludeFile(fileName string) bool {
whiteListExtension := config.GetConfig().FolderPreprocessor.FileExtensions
if fileName[0] == '.' {
return false
}
fileExt := filepath.Ext(fileName)
if fileExt == ".partial" {
return false
}
if slices.Contains(whiteListExtension, fileExt) {
return true
}
return false
}
87 changes: 87 additions & 0 deletions preprocessor/folder/folder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package folder

import (
"fmt"
"log"
"os"
"path/filepath"

"github.com/rjeczalik/notify"
"github.com/starlinglab/integrity-v2/config"
)

// scanSyncDirectory scans a path under the sync directory and returns a list of files
func scanSyncDirectory(subPath string) ([]string, error) {
scanRoot := config.GetConfig().FolderPreprocessor.SyncFolderRoot
if scanRoot == "" {
return nil, fmt.Errorf("sync folder root not set")
}
scanPath := filepath.Join(scanRoot, subPath)
fileList := []string{}
err := filepath.WalkDir(scanPath, func(path string, info os.DirEntry, err error) error {
if err != nil {
return err
}
if shouldIncludeFile(info.Name()) {
fileList = append(fileList, path)
log.Println("found: " + path)
return nil
}
return nil
})
return fileList, err
}

func Run(args []string) error {
// Scan whole sync directory
fileList, err := scanSyncDirectory("")
if err != nil {
return err
}
for _, filePath := range fileList {
cid, err := handleNewFile(filePath)
if err != nil {
log.Println(err)
} else {
log.Printf("file %s uploaded to webhook with CID %s\n", filePath, cid)
}
}

// Init directory watcher
c := make(chan notify.EventInfo, 1)
scanRoot := config.GetConfig().FolderPreprocessor.SyncFolderRoot
err = notify.Watch(scanRoot+"/...", c, notify.Create, notify.Rename)
if err != nil {
return err
}
defer notify.Stop(c)

for {
ei := <-c
event := ei.Event()
if event == notify.Rename || event == notify.Create {
go func() {
filePath := ei.Path()
file, err := os.Open(filePath)
if err != nil {
// File may be moved away for notify.Rename
return
}
fileInfo, err := file.Stat()
if err != nil {
log.Println("error getting file info:", err)
return
}
if shouldIncludeFile(fileInfo.Name()) {
cid, err := handleNewFile(filePath)
if err != nil {
log.Println(err)
} else {
log.Printf("file %s uploaded to webhook with CID %s\n", filePath, cid)
}
}
file.Close()
}()
}
}
}
9 changes: 7 additions & 2 deletions webhook/attribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"github.com/starlinglab/integrity-v2/aa"
)

// ParseJsonToAttributes parses a JSON map to a slice of attributes for POSTing to the AA server
func ParseJsonToAttributes(jsonMap map[string]any) []aa.PostKV {
// ParseJsonToAttributes parses a JSON map and a file stat map
// to a slice of attributes for POSTing to the AA server
func ParseJsonToAttributes(jsonMap map[string]any, fileAttributes map[string]any) []aa.PostKV {

var attributes []aa.PostKV

Expand All @@ -16,5 +17,9 @@ func ParseJsonToAttributes(jsonMap map[string]any) []aa.PostKV {
}
}

for k, v := range fileAttributes {
attributes = append(attributes, aa.PostKV{Key: k, Value: v})
}

return attributes
}
26 changes: 24 additions & 2 deletions webhook/webhook.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package webhook

import (
"crypto/md5"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
Expand All @@ -13,6 +16,7 @@ import (
"github.com/starlinglab/integrity-v2/aa"
"github.com/starlinglab/integrity-v2/config"
"github.com/starlinglab/integrity-v2/util"
"lukechampine.com/blake3"
)

// Helper function to write http JSON response
Expand Down Expand Up @@ -84,6 +88,7 @@ func handleGenericFileUpload(w http.ResponseWriter, r *http.Request) {
defer tempFile.Close()
defer os.Remove(tempFile.Name())
cid := ""
fileAttributes := map[string]any{}
for {
part, err := form.NextPart()
if err == io.EOF {
Expand All @@ -109,7 +114,13 @@ func handleGenericFileUpload(w http.ResponseWriter, r *http.Request) {
cidChan <- cid
errChan <- err
}()
fileWriter := io.MultiWriter(tempFile, pw)

sha := sha256.New()
md := md5.New()
blake := blake3.New(32, nil)

fileWriter := io.MultiWriter(tempFile, pw, sha, md, blake)

_, err = io.Copy(fileWriter, part)
defer part.Close()
if err != nil {
Expand All @@ -127,6 +138,17 @@ func handleGenericFileUpload(w http.ResponseWriter, r *http.Request) {
writeJsonResponse(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
tempFileState, err := tempFile.Stat()
if err != nil {
writeJsonResponse(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
fileAttributes = map[string]any{
"sha256": hex.EncodeToString(sha.Sum(nil)),
"md5": hex.EncodeToString(md.Sum(nil)),
"blake3": hex.EncodeToString(blake.Sum(nil)),
"file_size": tempFileState.Size(),
}
}
}
if cid == "" {
Expand All @@ -145,7 +167,7 @@ func handleGenericFileUpload(w http.ResponseWriter, r *http.Request) {
writeJsonResponse(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return
}
attributes := ParseJsonToAttributes(jsonMap)
attributes := ParseJsonToAttributes(jsonMap, fileAttributes)
err = aa.SetAttestations(cid, false, attributes)
if err != nil {
fmt.Println("Error setting attestations:", err)
Expand Down

0 comments on commit e6e42f5

Please sign in to comment.