Skip to content

Commit

Permalink
Merge pull request #554 from Icinga/553
Browse files Browse the repository at this point in the history
convertFlappingRows(): fix foreign key error history -> flapping_history
  • Loading branch information
julianbrost authored Jul 31, 2023
2 parents 62f7ae9 + 71c1d2f commit 336ee4a
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 29 deletions.
42 changes: 31 additions & 11 deletions cmd/icingadb-migrate/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type commentRow = struct {
func convertCommentRows(
env string, envId icingadbTypes.Binary,
_ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []commentRow,
) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) {
) (stages []icingaDbOutputStage, checkpoint any) {
var commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck []contracts.Entity

for _, row := range idoRows {
Expand Down Expand Up @@ -211,7 +211,12 @@ func convertCommentRows(
}
}

icingaDbInserts = [][]contracts.Entity{commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck}
stages = []icingaDbOutputStage{
{insert: commentHistory},
{insert: acknowledgementHistory},
{insert: allHistoryComment},
{insert: allHistoryAck},
}
return
}

Expand Down Expand Up @@ -241,7 +246,7 @@ type downtimeRow = struct {
func convertDowntimeRows(
env string, envId icingadbTypes.Binary,
_ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []downtimeRow,
) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) {
) (stages []icingaDbOutputStage, checkpoint any) {
var downtimeHistory, allHistory, sla []contracts.Entity

for _, row := range idoRows {
Expand Down Expand Up @@ -366,7 +371,11 @@ func convertDowntimeRows(
sla = append(sla, s)
}

icingaDbInserts = [][]contracts.Entity{downtimeHistory, allHistory, sla}
stages = []icingaDbOutputStage{
{insert: downtimeHistory},
{insert: allHistory},
{insert: sla},
}
return
}

Expand All @@ -386,7 +395,7 @@ type flappingRow = struct {
func convertFlappingRows(
env string, envId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []flappingRow,
) (icingaDbInserts, icingaDbUpserts [][]contracts.Entity, checkpoint any) {
) (stages []icingaDbOutputStage, checkpoint any) {
if len(idoRows) < 1 {
return
}
Expand Down Expand Up @@ -519,8 +528,11 @@ func convertFlappingRows(
}
}

icingaDbInserts = [][]contracts.Entity{flappingHistory, allHistory}
icingaDbUpserts = [][]contracts.Entity{flappingHistoryUpserts}
stages = []icingaDbOutputStage{
{insert: flappingHistory},
{upsert: flappingHistoryUpserts},
{insert: allHistory},
}
return
}

Expand All @@ -541,7 +553,7 @@ type notificationRow = struct {
func convertNotificationRows(
env string, envId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, idoRows []notificationRow,
) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) {
) (stages []icingaDbOutputStage, checkpoint any) {
if len(idoRows) < 1 {
return
}
Expand Down Expand Up @@ -682,7 +694,11 @@ func convertNotificationRows(
}
}

icingaDbInserts = [][]contracts.Entity{notificationHistory, userNotificationHistory, allHistory}
stages = []icingaDbOutputStage{
{insert: notificationHistory},
{insert: userNotificationHistory},
{insert: allHistory},
}
return
}

Expand Down Expand Up @@ -738,7 +754,7 @@ type stateRow = struct {
func convertStateRows(
env string, envId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []stateRow,
) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) {
) (stages []icingaDbOutputStage, checkpoint any) {
if len(idoRows) < 1 {
return
}
Expand Down Expand Up @@ -840,6 +856,10 @@ func convertStateRows(
}
}

icingaDbInserts = [][]contracts.Entity{stateHistory, allHistory, sla}
stages = []icingaDbOutputStage{
{insert: stateHistory},
{insert: allHistory},
{insert: sla},
}
return
}
31 changes: 13 additions & 18 deletions cmd/icingadb-migrate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/creasty/defaults"
"github.com/goccy/go-yaml"
"github.com/icinga/icingadb/pkg/config"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/logging"
icingadbTypes "github.com/icinga/icingadb/pkg/types"
Expand Down Expand Up @@ -368,7 +367,7 @@ func migrateOneType[IdoRow any](
c *Config, idb *icingadb.DB, envId []byte, ht *historyType,
convertRows func(env string, envId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx,
idoRows []IdoRow) (icingaDbInserts, icingaDbUpserts [][]contracts.Entity, checkpoint any),
idoRows []IdoRow) (stages []icingaDbOutputStage, checkpoint any),
) {
var lastQuery string
var lastStmt *sqlx.Stmt
Expand Down Expand Up @@ -423,29 +422,25 @@ func migrateOneType[IdoRow any](
ht, ht.migrationQuery, args, ht.lastId,
func(idoRows []IdoRow) (checkpoint interface{}) {
// ... convert them, ...
inserts, upserts, lastIdoId := convertRows(c.Icinga2.Env, envId, selectCache, ht.snapshot, idoRows)
stages, lastIdoId := convertRows(c.Icinga2.Env, envId, selectCache, ht.snapshot, idoRows)

// ... and insert them:

for _, op := range []struct {
kind string
data [][]contracts.Entity
streamer func(context.Context, <-chan contracts.Entity, ...icingadb.OnSuccess[contracts.Entity]) error
}{{"INSERT IGNORE", inserts, idb.CreateIgnoreStreamed}, {"UPSERT", upserts, idb.UpsertStreamed}} {
for _, table := range op.data {
if len(table) < 1 {
continue
}
for _, stage := range stages {
if len(stage.insert) > 0 {
ch := utils.ChanFromSlice(stage.insert)

ch := make(chan contracts.Entity, len(table))
for _, row := range table {
ch <- row
if err := idb.CreateIgnoreStreamed(context.Background(), ch); err != nil {
log.With("backend", "Icinga DB", "op", "INSERT IGNORE", "table", utils.TableName(stage.insert[0])).
Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
}
}

close(ch)
if len(stage.upsert) > 0 {
ch := utils.ChanFromSlice(stage.upsert)

if err := op.streamer(context.Background(), ch); err != nil {
log.With("backend", "Icinga DB", "op", op.kind, "table", utils.TableName(table[0])).
if err := idb.UpsertStreamed(context.Background(), ch); err != nil {
log.With("backend", "Icinga DB", "op", "UPSERT", "table", utils.TableName(stage.upsert[0])).
Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
}
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/icingadb-migrate/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ func (hts historyTypes) forEach(f func(*historyType)) {
_ = eg.Wait()
}

type icingaDbOutputStage struct {
insert, upsert []contracts.Entity
}

var types = historyTypes{
{
name: "ack & comment",
Expand Down
13 changes: 13 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,16 @@ func JoinHostPort(host string, port int) string {

return net.JoinHostPort(host, fmt.Sprint(port))
}

// ChanFromSlice takes a slice of values and returns a channel from which these values can be received.
// This channel is closed after the last value was sent.
func ChanFromSlice[T any](values []T) <-chan T {
ch := make(chan T, len(values))
for _, value := range values {
ch <- value
}

close(ch)

return ch
}
54 changes: 54 additions & 0 deletions pkg/utils/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package utils

import (
"github.com/stretchr/testify/require"
"testing"
)

func TestChanFromSlice(t *testing.T) {
t.Run("Nil", func(t *testing.T) {
ch := ChanFromSlice[int](nil)
require.NotNil(t, ch)
requireClosedEmpty(t, ch)
})

t.Run("Empty", func(t *testing.T) {
ch := ChanFromSlice([]int{})
require.NotNil(t, ch)
requireClosedEmpty(t, ch)
})

t.Run("NonEmpty", func(t *testing.T) {
ch := ChanFromSlice([]int{42, 23, 1337})
require.NotNil(t, ch)
requireReceive(t, ch, 42)
requireReceive(t, ch, 23)
requireReceive(t, ch, 1337)
requireClosedEmpty(t, ch)
})
}

// requireReceive is a helper function to check if a value can immediately be received from a channel.
func requireReceive(t *testing.T, ch <-chan int, expected int) {
t.Helper()

select {
case v, ok := <-ch:
require.True(t, ok, "receiving should return a value")
require.Equal(t, expected, v)
default:
require.Fail(t, "receiving should not block")
}
}

// requireReceive is a helper function to check if the channel is closed and empty.
func requireClosedEmpty(t *testing.T, ch <-chan int) {
t.Helper()

select {
case _, ok := <-ch:
require.False(t, ok, "receiving from channel should not return anything")
default:
require.Fail(t, "receiving should not block")
}
}

0 comments on commit 336ee4a

Please sign in to comment.