Skip to content

Commit

Permalink
Parallelize hashing and add logging to hashing
Browse files Browse the repository at this point in the history
  • Loading branch information
keybraker committed Oct 2, 2024
1 parent 03c0d14 commit f7ccad1
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 28 deletions.
58 changes: 47 additions & 11 deletions app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,31 +55,40 @@ func main() {
startLoggerHandlers(&wg, infoQueue, warnQueue, errorQueue)

logger(LoggerTypeInfo, "Counting files in path.")
totalFiles := countFiles(sourcePath, fileTypes, *organisePhotos, *organiseVideos)
totalFilesToMove := countFiles(sourcePath, fileTypes, *organisePhotos, *organiseVideos)

if totalFiles == 0 {
if totalFilesToMove == 0 {
logger(LoggerTypeInfo, "No files in path, exiting.")
return
} else {
logger(LoggerTypeInfo, fmt.Sprintf("%d files to be processed.", totalFiles))
logger(LoggerTypeInfo, fmt.Sprintf("%d files to be processed.", totalFilesToMove))
}

hashCache := &sync.Map{}

logger(LoggerTypeInfo, "Creating file hash-map.")
fileHashMap, err := hash.HashImagesInPath(destinationPath, hashCache)
// Start a spinner for hash-map creation
logger(LoggerTypeInfo, "Creating file hash-map on the destination path.")
totalFilesInDestination := countFiles(destinationPath, fileTypes, *organisePhotos, *organiseVideos)
var hashedFiles int64
stopHashSpinner := make(chan bool)
go spinner(stopHashSpinner, "Hashing:", &hashedFiles, totalFilesInDestination)

// Create the hash map with a progress counter
fileHashMap, err := hash.HashImagesInPath(destinationPath, hashCache, &hashedFiles)
if err != nil {
stopHashSpinner <- true // Stop the spinner in case of error
logger(LoggerTypeInfo, "Failed to create file hash map.")
logger(LoggerTypeFatal, err.Error())
}

stopHashSpinner <- true // Stop the spinner after hash-map creation is done
elapsed := time.Since(start)
logger(LoggerTypeInfo, fmt.Sprintf("File hash-map created in %.2f seconds.", elapsed.Seconds()))

var processedFiles int64

stopSpinner := make(chan bool)
go spinner(stopSpinner, &processedFiles, totalFiles)
go spinner(stopSpinner, "Processing:", &processedFiles, totalFilesToMove)

done := make(chan struct{})

Expand All @@ -105,7 +114,7 @@ func main() {
*geoLocation,
*format,
*verbose,
totalFiles,
totalFilesToMove,
*duplicateStrategy,
&processedFiles,
done,
Expand All @@ -114,13 +123,29 @@ func main() {
<-done
stopSpinner <- true

logger(LoggerTypeInfo, strconv.Itoa(totalFiles)+" files processed.")
logger(LoggerTypeInfo, strconv.Itoa(totalFilesToMove)+" files processed.")

elapsed = time.Since(start)
logger(LoggerTypeInfo, fmt.Sprintf("Processing completed in %.2f seconds.", elapsed.Seconds()))
elapsedString := formatElapsedTime(elapsed)
logger(LoggerTypeInfo, fmt.Sprintf("Processing completed in %s.", elapsedString))
}

func formatElapsedTime(elapsed time.Duration) string {
seconds := int(elapsed.Seconds())
minutes := seconds / 60
seconds = seconds % 60

if minutes > 0 {
if minutes == 1 {
return fmt.Sprintf("%d minute and %d seconds", minutes, seconds)
}
return fmt.Sprintf("%d minutes and %d seconds", minutes, seconds)
}

return fmt.Sprintf("%.2f seconds.", elapsed.Seconds())
}

func spinner(stopSpinner chan bool, processedFiles *int64, totalFiles int) {
func spinner(stopSpinner chan bool, verb string, processedFiles *int64, totalFiles int) {
spinChars := `-\|/`
i := 0
for {
Expand All @@ -131,7 +156,7 @@ func spinner(stopSpinner chan bool, processedFiles *int64, totalFiles int) {
default:
processed := atomic.LoadInt64(processedFiles)
percentage := float64(processed) / float64(totalFiles) * 100
fmt.Printf("\rProcessing %c | Processed: %d/%d (%.2f%%)", spinChars[i], processed, totalFiles, percentage)
fmt.Printf("\r%c | %s: %d/%d (%.2f%%)", spinChars[i], verb, processed, totalFiles, percentage)
i = (i + 1) % len(spinChars)
time.Sleep(100 * time.Millisecond)
}
Expand Down Expand Up @@ -237,6 +262,13 @@ func flagProcessor() []string {
return fileTypes
}

// directoryExists returns nil when the given path can be stat'ed, a
// "does not exist" error when the path is missing, and the underlying
// error for any other Stat failure (e.g. permission denied) — the
// previous version silently treated such failures as success.
// NOTE(review): despite the name, this does not verify the path is a
// directory (no info.IsDir() check) — confirm whether callers rely on
// accepting plain files.
func directoryExists(path string) error {
	_, err := os.Stat(path)
	switch {
	case err == nil:
		return nil
	case os.IsNotExist(err):
		return fmt.Errorf("path %s does not exist", path)
	default:
		// Surface unexpected failures instead of swallowing them.
		return fmt.Errorf("cannot access path %s: %v", path, err)
	}
}

func validatePaths(sourcePath, destinationPath string) {
if sourcePath == "" || destinationPath == "" {
logger(LoggerTypeFatal, "input and output paths must be supplied")
Expand All @@ -249,5 +281,9 @@ func validatePaths(sourcePath, destinationPath string) {
logger(LoggerTypeFatal, "input and output paths must be on drives")
} else if sourceDrive != destinationDrive {
logger(LoggerTypeFatal, fmt.Sprintf("input and output paths must be on the same drive: source drive (%s), destination drive (%s)", sourceDrive, destinationDrive))
} else if err := directoryExists(sourcePath); err != nil {
logger(LoggerTypeFatal, err.Error())
} else if err := directoryExists(destinationPath); err != nil {
logger(LoggerTypeFatal, err.Error())
}
}
74 changes: 57 additions & 17 deletions hash/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"io"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"sync/atomic"
)

// isImageFile checks if the file is an image based on its extension.
Expand Down Expand Up @@ -51,32 +53,70 @@ func GetFileHash(filePath string, hashCache *sync.Map) ([]byte, error) {
}

// hashImagesInPath hashes all images in the given path and updates the fileHashMap.
func HashImagesInPath(path string, hashCache *sync.Map) (*sync.Map, error) {
func HashImagesInPath(path string, hashCache *sync.Map, hashedFiles *int64) (*sync.Map, error) {
fileHashMap := &sync.Map{}
fileChan := make(chan string) // Channel to pass file paths to workers
errChan := make(chan error) // Channel to collect errors
var wg sync.WaitGroup // WaitGroup to track the worker goroutines

err := filepath.Walk(path, func(filePath string, info os.FileInfo, err error) error {
if err != nil {
return err
}
numWorkers := runtime.NumCPU() / 2

if info.IsDir() {
return nil
}
// Start worker goroutines
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for filePath := range fileChan {
if isImageFile(filePath) {
hashValue, err := GetFileHash(filePath, hashCache)
if err != nil {
errChan <- fmt.Errorf("failed to get file hash for %s: %v", filePath, err)
return
}

hashStr := hex.EncodeToString(hashValue)
fileHashMap.Store(hashStr, true)

if isImageFile(filePath) {
hashValue, err := GetFileHash(filePath, hashCache)
// Increment the hashed files counter
atomic.AddInt64(hashedFiles, 1)
}
}
}()
}

// Walk the directory and send file paths to the channel
go func() {
defer close(fileChan) // Close the channel when done
err := filepath.Walk(path, func(filePath string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("failed to get file hash for %s: %v", filePath, err)
errChan <- fmt.Errorf("failed to walk path %s: %v", filePath, err)
return err
}

hashStr := hex.EncodeToString(hashValue)
fileHashMap.Store(hashStr, true)
if !info.IsDir() {
fileChan <- filePath // Send file to channel for hashing
}

return nil
})

// If an error occurred during filepath walk, send it to the error channel
if err != nil {
errChan <- err
}
}()

return nil
})
if err != nil {
return nil, fmt.Errorf("failed to walk path %s: %v", path, err)
// Wait for all workers to finish
go func() {
wg.Wait()
close(errChan) // Close error channel when all workers are done
}()

// Check for errors during execution
for err := range errChan {
if err != nil {
return nil, err
}
}

return fileHashMap, nil
Expand Down

0 comments on commit f7ccad1

Please sign in to comment.