Skip to content

Commit

Permalink
Migration: refactor output/processing of converted entities
Browse files Browse the repository at this point in the history
This commit simplifies the `icingaDbOutputStage` type to contain only one
entity slice to be insert/upsert. This allows to simplify the handling in
`migrateOneType()` by removing nested loops.

Additionally, a bit of code inside that function is outsourced into a new
`utils.ChanFromSlice()` function. This makes the body of the loop over the
insert/upsert operation (the loop using the `op` variable) simple enough so
that it can just be unrolled which saves the inline struct and slice definition
for that loop.
  • Loading branch information
julianbrost committed Jul 31, 2023
1 parent 03b0e84 commit 71c1d2f
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 39 deletions.
37 changes: 24 additions & 13 deletions cmd/icingadb-migrate/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,12 @@ func convertCommentRows(
}
}

stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{
commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck,
}}}
stages = []icingaDbOutputStage{
{insert: commentHistory},
{insert: acknowledgementHistory},
{insert: allHistoryComment},
{insert: allHistoryAck},
}
return
}

Expand Down Expand Up @@ -359,7 +362,11 @@ func convertDowntimeRows(
sla = append(sla, s)
}

stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{downtimeHistory, allHistory, sla}}}
stages = []icingaDbOutputStage{
{insert: downtimeHistory},
{insert: allHistory},
{insert: sla},
}
return
}

Expand Down Expand Up @@ -509,11 +516,9 @@ func convertFlappingRows(
}

stages = []icingaDbOutputStage{
{
inserts: [][]contracts.Entity{flappingHistory},
upserts: [][]contracts.Entity{flappingHistoryUpserts},
},
{inserts: [][]contracts.Entity{allHistory}},
{insert: flappingHistory},
{upsert: flappingHistoryUpserts},
{insert: allHistory},
}
return
}
Expand Down Expand Up @@ -672,9 +677,11 @@ func convertNotificationRows(
}
}

stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{
notificationHistory, userNotificationHistory, allHistory,
}}}
stages = []icingaDbOutputStage{
{insert: notificationHistory},
{insert: userNotificationHistory},
{insert: allHistory},
}
return
}

Expand Down Expand Up @@ -828,6 +835,10 @@ func convertStateRows(
}
}

stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{stateHistory, allHistory, sla}}}
stages = []icingaDbOutputStage{
{insert: stateHistory},
{insert: allHistory},
{insert: sla},
}
return
}
40 changes: 15 additions & 25 deletions cmd/icingadb-migrate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/creasty/defaults"
"github.com/goccy/go-yaml"
"github.com/icinga/icingadb/pkg/config"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/logging"
icingadbTypes "github.com/icinga/icingadb/pkg/types"
Expand Down Expand Up @@ -428,30 +427,21 @@ func migrateOneType[IdoRow any](
// ... and insert them:

for _, stage := range stages {
for _, op := range []struct {
kind string
data [][]contracts.Entity
streamer func(context.Context, <-chan contracts.Entity, ...icingadb.OnSuccess[contracts.Entity]) error
}{
{"INSERT IGNORE", stage.inserts, idb.CreateIgnoreStreamed},
{"UPSERT", stage.upserts, idb.UpsertStreamed},
} {
for _, table := range op.data {
if len(table) < 1 {
continue
}

ch := make(chan contracts.Entity, len(table))
for _, row := range table {
ch <- row
}

close(ch)

if err := op.streamer(context.Background(), ch); err != nil {
log.With("backend", "Icinga DB", "op", op.kind, "table", utils.TableName(table[0])).
Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
}
if len(stage.insert) > 0 {
ch := utils.ChanFromSlice(stage.insert)

if err := idb.CreateIgnoreStreamed(context.Background(), ch); err != nil {
log.With("backend", "Icinga DB", "op", "INSERT IGNORE", "table", utils.TableName(stage.insert[0])).
Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
}
}

if len(stage.upsert) > 0 {
ch := utils.ChanFromSlice(stage.upsert)

if err := idb.UpsertStreamed(context.Background(), ch); err != nil {
log.With("backend", "Icinga DB", "op", "UPSERT", "table", utils.TableName(stage.upsert[0])).
Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/icingadb-migrate/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (hts historyTypes) forEach(f func(*historyType)) {
}

type icingaDbOutputStage struct {
inserts, upserts [][]contracts.Entity
insert, upsert []contracts.Entity
}

var types = historyTypes{
Expand Down
13 changes: 13 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,16 @@ func JoinHostPort(host string, port int) string {

return net.JoinHostPort(host, fmt.Sprint(port))
}

// ChanFromSlice takes a slice of values and returns a channel from which these values can be received.
// This channel is closed after the last value was sent.
func ChanFromSlice[T any](values []T) <-chan T {
ch := make(chan T, len(values))
for _, value := range values {
ch <- value
}

close(ch)

return ch
}
54 changes: 54 additions & 0 deletions pkg/utils/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package utils

import (
"github.com/stretchr/testify/require"
"testing"
)

func TestChanFromSlice(t *testing.T) {
t.Run("Nil", func(t *testing.T) {
ch := ChanFromSlice[int](nil)
require.NotNil(t, ch)
requireClosedEmpty(t, ch)
})

t.Run("Empty", func(t *testing.T) {
ch := ChanFromSlice([]int{})
require.NotNil(t, ch)
requireClosedEmpty(t, ch)
})

t.Run("NonEmpty", func(t *testing.T) {
ch := ChanFromSlice([]int{42, 23, 1337})
require.NotNil(t, ch)
requireReceive(t, ch, 42)
requireReceive(t, ch, 23)
requireReceive(t, ch, 1337)
requireClosedEmpty(t, ch)
})
}

// requireReceive is a helper function to check if a value can immediately be received from a channel.
func requireReceive(t *testing.T, ch <-chan int, expected int) {
t.Helper()

select {
case v, ok := <-ch:
require.True(t, ok, "receiving should return a value")
require.Equal(t, expected, v)
default:
require.Fail(t, "receiving should not block")
}
}

// requireReceive is a helper function to check if the channel is closed and empty.
func requireClosedEmpty(t *testing.T, ch <-chan int) {
t.Helper()

select {
case _, ok := <-ch:
require.False(t, ok, "receiving from channel should not return anything")
default:
require.Fail(t, "receiving should not block")
}
}

0 comments on commit 71c1d2f

Please sign in to comment.