From 6a3c3e3992c44fc4e50962fe2212e94d628a39de Mon Sep 17 00:00:00 2001 From: Richard Bertozzo Date: Mon, 7 Oct 2024 15:20:13 -0300 Subject: [PATCH] Concurrently process insert db using worker pool (#23) * typo - rename file sql * open and apply sql schema before run the inserts * is error * add worker pool to run concurrently the insert job --- cmd/etl/main.go | 122 ++++++++++++++++++++------ configs/db/{squema.sql => schema.sql} | 2 +- 2 files changed, 97 insertions(+), 27 deletions(-) rename configs/db/{squema.sql => schema.sql} (93%) diff --git a/cmd/etl/main.go b/cmd/etl/main.go index 599aba5..b39d047 100644 --- a/cmd/etl/main.go +++ b/cmd/etl/main.go @@ -3,11 +3,14 @@ package main import ( "context" "encoding/csv" + "errors" "flag" "io" "log" "os" "strconv" + "sync" + "sync/atomic" "time" "github.com/google/uuid" @@ -27,7 +30,9 @@ func main() { log.Fatal("DATABASE_URL ENV is required") } - ctx := context.Background() + ctx, cancelFn := context.WithTimeout(context.Background(), time.Minute) + defer cancelFn() + dbPool, err := database.NewConnection(ctx, *dbURL) if err != nil { log.Fatal(err) @@ -37,42 +42,103 @@ func main() { log.Fatal(err) } - queries := provider.New(dbPool) - - err = run(queries, *flagFile) + // open sql table schema file and apply it + sqlSchema, err := os.ReadFile("./configs/db/schema.sql") if err != nil { + log.Fatalf("error opening SQL schema file: %v", err) + } + if _, err = dbPool.Exec(ctx, string(sqlSchema)); err != nil { log.Fatal(err) } + + queries := provider.New(dbPool) + + run(ctx, queries, *flagFile) } -func run(queries *provider.Queries, filePath string) error { - f, csvReader, err := CSVReader(filePath) +func run(ctx context.Context, queries *provider.Queries, filePath string) { + f, reader, err := csvReader(filePath) if err != nil { log.Fatal(err) } - defer f.Close() + + defer func() { + _ = f.Close() + }() isHeaderRow := true - recordsInserted := 0 + var recordsInserted atomic.Uint32 start := time.Now() - for { - record, err := csvReader.Read() - if err != nil { - // no more rows in the file - if err == io.EOF { - break + + const numWorkers = 10 + done := make(chan bool) + jobs := make(chan job, numWorkers) + results := make(chan result, numWorkers) + + go func() { + var wg sync.WaitGroup + wg.Add(numWorkers) + for w := 1; w <= numWorkers; w++ { + go workerInsert(ctx, &wg, queries, jobs, results) + } + wg.Wait() + close(results) + }() + + go func() { + for { + record, err := reader.Read() + if err != nil { + // no more rows in the file + if errors.Is(err, io.EOF) { + break + } + + log.Fatalf("error reading record row: %v", err) + return + } + + // skip the header row (first row) + if isHeaderRow { + isHeaderRow = false + continue + } + + jobs <- job{ + record: record, } - return err } - // skip the header row (first row) - if isHeaderRow { - isHeaderRow = false - continue + close(jobs) + }() + + go func() { + for r := range results { + recordsInserted.Add(1) + log.Printf("coffee inserted, id: %s - specie %s", r.ID, r.Specie) } + done <- true + }() - coffee := createCoffeeFromCSVRow(record) - id, err := queries.InsertCoffee(context.Background(), provider.InsertCoffeeParams{ + <-done + close(done) + log.Printf("finished processing rows csv file, total: %d - duration %v", recordsInserted.Load(), time.Now().Sub(start).Minutes()) +} + +type job struct { + id string + record []string +} + +type result struct { + ID string + Specie string +} + +func workerInsert(ctx context.Context, wg *sync.WaitGroup, queries *provider.Queries, jobChan chan job, results chan<- result) { + for j := range jobChan { + coffee := createCoffeeFromCSVRow(j.record) + id, err := queries.InsertCoffee(ctx, provider.InsertCoffeeParams{ Specie: coffee.Specie, Owner: coffee.Owner, CountryOfOrigin: coffee.CountryOfOrigin, @@ -88,16 +154,20 @@ func run(queries *provider.Queries, filePath string) error { log.Fatal(err) } - u, err := uuid.FromBytes(id.Bytes[0:16]) + u, err := uuid.FromBytes(id.Bytes[:]) if err != nil { log.Fatal(err) } + log.Printf("coffee inserted, id: %s - specie %s", u.String(), coffee.Specie) - recordsInserted++ + + results <- result{ + ID: u.String(), + Specie: coffee.Specie, + } } - log.Printf("finished processing rows csv file, total: %d - duration %v", recordsInserted, time.Now().Sub(start)) - return nil + wg.Done() } func parseStrToFloat(strValue string) float32 { @@ -127,7 +197,7 @@ func createCoffeeFromCSVRow(row []string) provider.Coffee { } } -func CSVReader(pathFile string) (*os.File, *csv.Reader, error) { +func csvReader(pathFile string) (*os.File, *csv.Reader, error) { file, err := os.Open(pathFile) if err != nil { return nil, nil, err diff --git a/configs/db/squema.sql b/configs/db/schema.sql similarity index 93% rename from configs/db/squema.sql rename to configs/db/schema.sql index 803e42d..6cd4424 100644 --- a/configs/db/squema.sql +++ b/configs/db/schema.sql @@ -1,4 +1,4 @@ -CREATE TABLE coffee +CREATE TABLE IF NOT EXISTS coffee ( id UUID DEFAULT gen_random_uuid() PRIMARY KEY, specie VARCHAR(50) NOT NULL,