Skip to content

Commit

Permalink
Concurrently process DB inserts using a worker pool (#23)
Browse files Browse the repository at this point in the history
* typo - rename file sql

* open and apply the SQL schema before running the inserts

* is error

* add a worker pool to run the insert jobs concurrently
  • Loading branch information
richardbertozzo authored Oct 7, 2024
1 parent 2584417 commit 6a3c3e3
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 27 deletions.
122 changes: 96 additions & 26 deletions cmd/etl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package main
import (
"context"
"encoding/csv"
"errors"
"flag"
"io"
"log"
"os"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/google/uuid"
Expand All @@ -27,7 +30,9 @@ func main() {
log.Fatal("DATABASE_URL ENV is required")
}

ctx := context.Background()
ctx, cancelFn := context.WithTimeout(context.Background(), time.Minute)
defer cancelFn()

dbPool, err := database.NewConnection(ctx, *dbURL)
if err != nil {
log.Fatal(err)
Expand All @@ -37,42 +42,103 @@ func main() {
log.Fatal(err)
}

queries := provider.New(dbPool)

err = run(queries, *flagFile)
// open sql table schema file and apply it
sqlSchema, err := os.ReadFile("./configs/db/schema.sql")
if err != nil {
log.Fatalf("error opening SQL schema file: %v", err)
}
if _, err = dbPool.Exec(ctx, string(sqlSchema)); err != nil {
log.Fatal(err)
}

queries := provider.New(dbPool)

run(ctx, queries, *flagFile)
}

func run(queries *provider.Queries, filePath string) error {
f, csvReader, err := CSVReader(filePath)
func run(ctx context.Context, queries *provider.Queries, filePath string) {
f, reader, err := csvReader(filePath)
if err != nil {
log.Fatal(err)
}
defer f.Close()

defer func() {
_ = f.Close()
}()

isHeaderRow := true
recordsInserted := 0
var recordsInserted atomic.Uint32
start := time.Now()
for {
record, err := csvReader.Read()
if err != nil {
// no more rows in the file
if err == io.EOF {
break

const numWorkers = 10
done := make(chan bool)
jobs := make(chan job, numWorkers)
results := make(chan result, numWorkers)

go func() {
var wg sync.WaitGroup
wg.Add(numWorkers)
for w := 1; w <= numWorkers; w++ {
go workerInsert(ctx, &wg, queries, jobs, results)
}
wg.Wait()
close(results)
}()

go func() {
for {
record, err := reader.Read()
if err != nil {
// no more rows in the file
if errors.Is(err, io.EOF) {
break
}

log.Fatalf("error reading record row: %v", err)
return
}

// skip the header row (first row)
if isHeaderRow {
isHeaderRow = false
continue
}

jobs <- job{
record: record,
}
return err
}

// skip the header row (first row)
if isHeaderRow {
isHeaderRow = false
continue
close(jobs)
}()

go func() {
for r := range results {
recordsInserted.Add(1)
log.Printf("coffee inserted, id: %s - specie %s", r.ID, r.Specie)
}
done <- true
}()

coffee := createCoffeeFromCSVRow(record)
id, err := queries.InsertCoffee(context.Background(), provider.InsertCoffeeParams{
<-done
close(done)
log.Printf("finished processing rows csv file, total: %d - duration %v", recordsInserted.Load(), time.Now().Sub(start).Minutes())
}

// job carries one CSV data row to an insert worker.
// NOTE(review): the original also declared an `id string` field, but nothing
// in this file ever set or read it (run builds job{record: ...} and
// workerInsert uses only j.record), so the dead field was removed.
type job struct {
	// record holds the raw CSV columns of a single coffee row.
	record []string
}

// result reports one successfully inserted coffee back to the logging
// goroutine.
type result struct {
	ID     string // identifier of the inserted row, rendered as a string
	Specie string // specie column of the coffee, echoed for the log line
}

func workerInsert(ctx context.Context, wg *sync.WaitGroup, queries *provider.Queries, jobChan chan job, results chan<- result) {
for j := range jobChan {
coffee := createCoffeeFromCSVRow(j.record)
id, err := queries.InsertCoffee(ctx, provider.InsertCoffeeParams{
Specie: coffee.Specie,
Owner: coffee.Owner,
CountryOfOrigin: coffee.CountryOfOrigin,
Expand All @@ -88,16 +154,20 @@ func run(queries *provider.Queries, filePath string) error {
log.Fatal(err)
}

u, err := uuid.FromBytes(id.Bytes[0:16])
u, err := uuid.FromBytes(id.Bytes[:])
if err != nil {
log.Fatal(err)
}

log.Printf("coffee inserted, id: %s - specie %s", u.String(), coffee.Specie)
recordsInserted++

results <- result{
ID: u.String(),
Specie: coffee.Specie,
}
}

log.Printf("finished processing rows csv file, total: %d - duration %v", recordsInserted, time.Now().Sub(start))
return nil
wg.Done()
}

func parseStrToFloat(strValue string) float32 {
Expand Down Expand Up @@ -127,7 +197,7 @@ func createCoffeeFromCSVRow(row []string) provider.Coffee {
}
}

func CSVReader(pathFile string) (*os.File, *csv.Reader, error) {
func csvReader(pathFile string) (*os.File, *csv.Reader, error) {
file, err := os.Open(pathFile)
if err != nil {
return nil, nil, err
Expand Down
2 changes: 1 addition & 1 deletion configs/db/squema.sql → configs/db/schema.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE TABLE coffee
CREATE TABLE IF NOT EXISTS coffee
(
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
specie VARCHAR(50) NOT NULL,
Expand Down

0 comments on commit 6a3c3e3

Please sign in to comment.