From a778fc8d34409b4e830a643ee00f6e4066974389 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 19 Dec 2022 15:54:31 +0100 Subject: [PATCH 1/3] migrateOneType(): allow multiple stages of INSERT IGNORE, UPSERT Refactor the converters so they can signal the caller to behave not as usual i.e. not to INSERT IGNORE everything, then UPSERT everything. --- cmd/icingadb-migrate/convert.go | 28 +++++++++++-------- cmd/icingadb-migrate/main.go | 49 ++++++++++++++++++--------------- cmd/icingadb-migrate/misc.go | 4 +++ 3 files changed, 48 insertions(+), 33 deletions(-) diff --git a/cmd/icingadb-migrate/convert.go b/cmd/icingadb-migrate/convert.go index 965077e56..c061c3407 100644 --- a/cmd/icingadb-migrate/convert.go +++ b/cmd/icingadb-migrate/convert.go @@ -50,7 +50,7 @@ type commentRow = struct { func convertCommentRows( env string, envId icingadbTypes.Binary, _ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []commentRow, -) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) { +) (stages []icingaDbOutputStage, checkpoint any) { var commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck []contracts.Entity for _, row := range idoRows { @@ -207,7 +207,9 @@ func convertCommentRows( } } - icingaDbInserts = [][]contracts.Entity{commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck} + stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{ + commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck, + }}} return } @@ -236,7 +238,7 @@ type downtimeRow = struct { func convertDowntimeRows( env string, envId icingadbTypes.Binary, _ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []downtimeRow, -) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) { +) (stages []icingaDbOutputStage, checkpoint any) { var downtimeHistory, allHistory, sla []contracts.Entity for _, row := range idoRows { @@ -357,7 +359,7 @@ func convertDowntimeRows( sla = append(sla, s) } - icingaDbInserts = [][]contracts.Entity{downtimeHistory, allHistory, sla} + stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{downtimeHistory, allHistory, sla}}} return } @@ -377,7 +379,7 @@ type flappingRow = struct { func convertFlappingRows( env string, envId icingadbTypes.Binary, selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []flappingRow, -) (icingaDbInserts, icingaDbUpserts [][]contracts.Entity, checkpoint any) { +) (stages []icingaDbOutputStage, checkpoint any) { if len(idoRows) < 1 { return } @@ -506,8 +508,10 @@ func convertFlappingRows( } } - icingaDbInserts = [][]contracts.Entity{flappingHistory, allHistory} - icingaDbUpserts = [][]contracts.Entity{flappingHistoryUpserts} + stages = []icingaDbOutputStage{{ + inserts: [][]contracts.Entity{flappingHistory, allHistory}, + upserts: [][]contracts.Entity{flappingHistoryUpserts}, + }} return } @@ -528,7 +532,7 @@ type notificationRow = struct { func convertNotificationRows( env string, envId icingadbTypes.Binary, selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, idoRows []notificationRow, -) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) { +) (stages []icingaDbOutputStage, checkpoint any) { if len(idoRows) < 1 { return } @@ -665,7 +669,9 @@ func convertNotificationRows( } } - icingaDbInserts = [][]contracts.Entity{notificationHistory, userNotificationHistory, allHistory} + stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{ + notificationHistory, userNotificationHistory, allHistory, + }}} return } @@ -721,7 +727,7 @@ type stateRow = struct { func convertStateRows( env string, envId icingadbTypes.Binary, selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []stateRow, -) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) { +) (stages []icingaDbOutputStage, checkpoint any) { if len(idoRows) < 1 { return } @@ -819,6 +825,6 @@ func convertStateRows( } } - icingaDbInserts = [][]contracts.Entity{stateHistory, allHistory, sla} + stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{stateHistory, allHistory, sla}}} return } diff --git a/cmd/icingadb-migrate/main.go b/cmd/icingadb-migrate/main.go index 9618ec2f8..b567db756 100644 --- a/cmd/icingadb-migrate/main.go +++ b/cmd/icingadb-migrate/main.go @@ -368,7 +368,7 @@ func migrateOneType[IdoRow any]( c *Config, idb *icingadb.DB, envId []byte, ht *historyType, convertRows func(env string, envId icingadbTypes.Binary, selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, - idoRows []IdoRow) (icingaDbInserts, icingaDbUpserts [][]contracts.Entity, checkpoint any), + idoRows []IdoRow) (stages []icingaDbOutputStage, checkpoint any), ) { var lastQuery string var lastStmt *sqlx.Stmt @@ -423,30 +423,35 @@ func migrateOneType[IdoRow any]( ht, ht.migrationQuery, args, ht.lastId, func(idoRows []IdoRow) (checkpoint interface{}) { // ... convert them, ... - inserts, upserts, lastIdoId := convertRows(c.Icinga2.Env, envId, selectCache, ht.snapshot, idoRows) + stages, lastIdoId := convertRows(c.Icinga2.Env, envId, selectCache, ht.snapshot, idoRows) // ... and insert them: - for _, op := range []struct { - kind string - data [][]contracts.Entity - streamer func(context.Context, <-chan contracts.Entity, ...icingadb.OnSuccess[contracts.Entity]) error - }{{"INSERT IGNORE", inserts, idb.CreateIgnoreStreamed}, {"UPSERT", upserts, idb.UpsertStreamed}} { - for _, table := range op.data { - if len(table) < 1 { - continue - } - - ch := make(chan contracts.Entity, len(table)) - for _, row := range table { - ch <- row - } - - close(ch) - - if err := op.streamer(context.Background(), ch); err != nil { - log.With("backend", "Icinga DB", "op", op.kind, "table", utils.TableName(table[0])). - Fatalf("%+v", errors.Wrap(err, "can't perform DML")) + for _, stage := range stages { + for _, op := range []struct { + kind string + data [][]contracts.Entity + streamer func(context.Context, <-chan contracts.Entity, ...icingadb.OnSuccess[contracts.Entity]) error + }{ + {"INSERT IGNORE", stage.inserts, idb.CreateIgnoreStreamed}, + {"UPSERT", stage.upserts, idb.UpsertStreamed}, + } { + for _, table := range op.data { + if len(table) < 1 { + continue + } + + ch := make(chan contracts.Entity, len(table)) + for _, row := range table { + ch <- row + } + + close(ch) + + if err := op.streamer(context.Background(), ch); err != nil { + log.With("backend", "Icinga DB", "op", op.kind, "table", utils.TableName(table[0])). + Fatalf("%+v", errors.Wrap(err, "can't perform DML")) + } } } } diff --git a/cmd/icingadb-migrate/misc.go b/cmd/icingadb-migrate/misc.go index c228f3a8e..c1130a975 100644 --- a/cmd/icingadb-migrate/misc.go +++ b/cmd/icingadb-migrate/misc.go @@ -237,6 +237,10 @@ func (hts historyTypes) forEach(f func(*historyType)) { _ = eg.Wait() } +type icingaDbOutputStage struct { + inserts, upserts [][]contracts.Entity +} + var types = historyTypes{ { name: "ack & comment", From 03b0e848083aa67727ade092d6ec94641899af69 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 19 Dec 2022 16:01:04 +0100 Subject: [PATCH 2/3] convertFlappingRows(): fix foreign key error history -> flapping_history Don't INSERT IGNORE everything, then UPSERT everything. Instead INSERT IGNORE flapping_history, UPSERT flapping_history and finally INSERT IGNORE history. --- cmd/icingadb-migrate/convert.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/cmd/icingadb-migrate/convert.go b/cmd/icingadb-migrate/convert.go index c061c3407..1d5dd5bad 100644 --- a/cmd/icingadb-migrate/convert.go +++ b/cmd/icingadb-migrate/convert.go @@ -508,10 +508,13 @@ func convertFlappingRows( } } - stages = []icingaDbOutputStage{{ - inserts: [][]contracts.Entity{flappingHistory, allHistory}, - upserts: [][]contracts.Entity{flappingHistoryUpserts}, - }} + stages = []icingaDbOutputStage{ + { + inserts: [][]contracts.Entity{flappingHistory}, + upserts: [][]contracts.Entity{flappingHistoryUpserts}, + }, + {inserts: [][]contracts.Entity{allHistory}}, + } return } From 71c1d2fa4dbb8c3042edf2ac51265f21159a6e3e Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Wed, 26 Jul 2023 15:46:22 +0200 Subject: [PATCH 3/3] Migration: refactor output/processing of converted entities This commit simplifies the `icingaDbOutputStage` type to contain only one entity slice to be insert/upsert. This allows to simplify the handling in `migrateOneType()` by removing nested loops. Additionally, a bit of code inside that function is outsourced into a new `utils.ChanFromSlice()` function. This makes the body of the loop over the insert/upsert operation (the loop using the `op` variable) simple enough so that it can just be unrolled which saves the inline struct and slice definition for that loop. --- cmd/icingadb-migrate/convert.go | 37 ++++++++++++++-------- cmd/icingadb-migrate/main.go | 40 +++++++++--------------- cmd/icingadb-migrate/misc.go | 2 +- pkg/utils/utils.go | 13 ++++++++ pkg/utils/utils_test.go | 54 +++++++++++++++++++++++++++++++++ 5 files changed, 107 insertions(+), 39 deletions(-) create mode 100644 pkg/utils/utils_test.go diff --git a/cmd/icingadb-migrate/convert.go b/cmd/icingadb-migrate/convert.go index 1d5dd5bad..b5ed47a88 100644 --- a/cmd/icingadb-migrate/convert.go +++ b/cmd/icingadb-migrate/convert.go @@ -207,9 +207,12 @@ func convertCommentRows( } } - stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{ - commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck, - }}} + stages = []icingaDbOutputStage{ + {insert: commentHistory}, + {insert: acknowledgementHistory}, + {insert: allHistoryComment}, + {insert: allHistoryAck}, + } return } @@ -359,7 +362,11 @@ func convertDowntimeRows( sla = append(sla, s) } - stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{downtimeHistory, allHistory, sla}}} + stages = []icingaDbOutputStage{ + {insert: downtimeHistory}, + {insert: allHistory}, + {insert: sla}, + } return } @@ -509,11 +516,9 @@ func convertFlappingRows( } stages = []icingaDbOutputStage{ - { - inserts: [][]contracts.Entity{flappingHistory}, - upserts: [][]contracts.Entity{flappingHistoryUpserts}, - }, - {inserts: [][]contracts.Entity{allHistory}}, + {insert: flappingHistory}, + {upsert: flappingHistoryUpserts}, + {insert: allHistory}, } return } @@ -672,9 +677,11 @@ func convertNotificationRows( } } - stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{ - notificationHistory, userNotificationHistory, allHistory, - }}} + stages = []icingaDbOutputStage{ + {insert: notificationHistory}, + {insert: userNotificationHistory}, + {insert: allHistory}, + } return } @@ -828,6 +835,10 @@ func convertStateRows( } } - stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{stateHistory, allHistory, sla}}} + stages = []icingaDbOutputStage{ + {insert: stateHistory}, + {insert: allHistory}, + {insert: sla}, + } return } diff --git a/cmd/icingadb-migrate/main.go b/cmd/icingadb-migrate/main.go index b567db756..0bb219106 100644 --- a/cmd/icingadb-migrate/main.go +++ b/cmd/icingadb-migrate/main.go @@ -9,7 +9,6 @@ import ( "github.com/creasty/defaults" "github.com/goccy/go-yaml" "github.com/icinga/icingadb/pkg/config" - "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" icingadbTypes "github.com/icinga/icingadb/pkg/types" @@ -428,30 +427,21 @@ func migrateOneType[IdoRow any]( // ... and insert them: for _, stage := range stages { - for _, op := range []struct { - kind string - data [][]contracts.Entity - streamer func(context.Context, <-chan contracts.Entity, ...icingadb.OnSuccess[contracts.Entity]) error - }{ - {"INSERT IGNORE", stage.inserts, idb.CreateIgnoreStreamed}, - {"UPSERT", stage.upserts, idb.UpsertStreamed}, - } { - for _, table := range op.data { - if len(table) < 1 { - continue - } - - ch := make(chan contracts.Entity, len(table)) - for _, row := range table { - ch <- row - } - - close(ch) - - if err := op.streamer(context.Background(), ch); err != nil { - log.With("backend", "Icinga DB", "op", op.kind, "table", utils.TableName(table[0])). - Fatalf("%+v", errors.Wrap(err, "can't perform DML")) - } + if len(stage.insert) > 0 { + ch := utils.ChanFromSlice(stage.insert) + + if err := idb.CreateIgnoreStreamed(context.Background(), ch); err != nil { + log.With("backend", "Icinga DB", "op", "INSERT IGNORE", "table", utils.TableName(stage.insert[0])). + Fatalf("%+v", errors.Wrap(err, "can't perform DML")) + } + } + + if len(stage.upsert) > 0 { + ch := utils.ChanFromSlice(stage.upsert) + + if err := idb.UpsertStreamed(context.Background(), ch); err != nil { + log.With("backend", "Icinga DB", "op", "UPSERT", "table", utils.TableName(stage.upsert[0])). + Fatalf("%+v", errors.Wrap(err, "can't perform DML")) } } } diff --git a/cmd/icingadb-migrate/misc.go b/cmd/icingadb-migrate/misc.go index c1130a975..f1db20cbe 100644 --- a/cmd/icingadb-migrate/misc.go +++ b/cmd/icingadb-migrate/misc.go @@ -238,7 +238,7 @@ func (hts historyTypes) forEach(f func(*historyType)) { } type icingaDbOutputStage struct { - inserts, upserts [][]contracts.Entity + insert, upsert []contracts.Entity } var types = historyTypes{ diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index b5a963f9a..8ccf29b33 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -216,3 +216,16 @@ func JoinHostPort(host string, port int) string { return net.JoinHostPort(host, fmt.Sprint(port)) } + +// ChanFromSlice takes a slice of values and returns a channel from which these values can be received. +// This channel is closed after the last value was sent. +func ChanFromSlice[T any](values []T) <-chan T { + ch := make(chan T, len(values)) + for _, value := range values { + ch <- value + } + + close(ch) + + return ch +} diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go new file mode 100644 index 000000000..b0ea54b8f --- /dev/null +++ b/pkg/utils/utils_test.go @@ -0,0 +1,54 @@ +package utils + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +func TestChanFromSlice(t *testing.T) { + t.Run("Nil", func(t *testing.T) { + ch := ChanFromSlice[int](nil) + require.NotNil(t, ch) + requireClosedEmpty(t, ch) + }) + + t.Run("Empty", func(t *testing.T) { + ch := ChanFromSlice([]int{}) + require.NotNil(t, ch) + requireClosedEmpty(t, ch) + }) + + t.Run("NonEmpty", func(t *testing.T) { + ch := ChanFromSlice([]int{42, 23, 1337}) + require.NotNil(t, ch) + requireReceive(t, ch, 42) + requireReceive(t, ch, 23) + requireReceive(t, ch, 1337) + requireClosedEmpty(t, ch) + }) +} + +// requireReceive is a helper function to check if a value can immediately be received from a channel. +func requireReceive(t *testing.T, ch <-chan int, expected int) { + t.Helper() + + select { + case v, ok := <-ch: + require.True(t, ok, "receiving should return a value") + require.Equal(t, expected, v) + default: + require.Fail(t, "receiving should not block") + } +} + +// requireReceive is a helper function to check if the channel is closed and empty. +func requireClosedEmpty(t *testing.T, ch <-chan int) { + t.Helper() + + select { + case _, ok := <-ch: + require.False(t, ok, "receiving from channel should not return anything") + default: + require.Fail(t, "receiving should not block") + } +}