Skip to content

Commit

Permalink
migrateOneType(): allow multiple stages of INSERT IGNORE, UPSERT
Browse files Browse the repository at this point in the history
Refactor the converters so they can signal the caller to behave not as usual
i.e. not to INSERT IGNORE everything, then UPSERT everything.
  • Loading branch information
Al2Klimov committed Dec 19, 2022
1 parent 4ed4db3 commit a778fc8
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 33 deletions.
28 changes: 17 additions & 11 deletions cmd/icingadb-migrate/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type commentRow = struct {
func convertCommentRows(
env string, envId icingadbTypes.Binary,
_ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []commentRow,
) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) {
) (stages []icingaDbOutputStage, checkpoint any) {
var commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck []contracts.Entity

for _, row := range idoRows {
Expand Down Expand Up @@ -207,7 +207,9 @@ func convertCommentRows(
}
}

icingaDbInserts = [][]contracts.Entity{commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck}
stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{
commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck,
}}}
return
}

Expand Down Expand Up @@ -236,7 +238,7 @@ type downtimeRow = struct {
func convertDowntimeRows(
env string, envId icingadbTypes.Binary,
_ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []downtimeRow,
) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) {
) (stages []icingaDbOutputStage, checkpoint any) {
var downtimeHistory, allHistory, sla []contracts.Entity

for _, row := range idoRows {
Expand Down Expand Up @@ -357,7 +359,7 @@ func convertDowntimeRows(
sla = append(sla, s)
}

icingaDbInserts = [][]contracts.Entity{downtimeHistory, allHistory, sla}
stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{downtimeHistory, allHistory, sla}}}
return
}

Expand All @@ -377,7 +379,7 @@ type flappingRow = struct {
func convertFlappingRows(
env string, envId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []flappingRow,
) (icingaDbInserts, icingaDbUpserts [][]contracts.Entity, checkpoint any) {
) (stages []icingaDbOutputStage, checkpoint any) {
if len(idoRows) < 1 {
return
}
Expand Down Expand Up @@ -506,8 +508,10 @@ func convertFlappingRows(
}
}

icingaDbInserts = [][]contracts.Entity{flappingHistory, allHistory}
icingaDbUpserts = [][]contracts.Entity{flappingHistoryUpserts}
stages = []icingaDbOutputStage{{
inserts: [][]contracts.Entity{flappingHistory, allHistory},
upserts: [][]contracts.Entity{flappingHistoryUpserts},
}}
return
}

Expand All @@ -528,7 +532,7 @@ type notificationRow = struct {
func convertNotificationRows(
env string, envId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, idoRows []notificationRow,
) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) {
) (stages []icingaDbOutputStage, checkpoint any) {
if len(idoRows) < 1 {
return
}
Expand Down Expand Up @@ -665,7 +669,9 @@ func convertNotificationRows(
}
}

icingaDbInserts = [][]contracts.Entity{notificationHistory, userNotificationHistory, allHistory}
stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{
notificationHistory, userNotificationHistory, allHistory,
}}}
return
}

Expand Down Expand Up @@ -721,7 +727,7 @@ type stateRow = struct {
func convertStateRows(
env string, envId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []stateRow,
) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) {
) (stages []icingaDbOutputStage, checkpoint any) {
if len(idoRows) < 1 {
return
}
Expand Down Expand Up @@ -819,6 +825,6 @@ func convertStateRows(
}
}

icingaDbInserts = [][]contracts.Entity{stateHistory, allHistory, sla}
stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{stateHistory, allHistory, sla}}}
return
}
49 changes: 27 additions & 22 deletions cmd/icingadb-migrate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func migrateOneType[IdoRow any](
c *Config, idb *icingadb.DB, envId []byte, ht *historyType,
convertRows func(env string, envId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx,
idoRows []IdoRow) (icingaDbInserts, icingaDbUpserts [][]contracts.Entity, checkpoint any),
idoRows []IdoRow) (stages []icingaDbOutputStage, checkpoint any),
) {
var lastQuery string
var lastStmt *sqlx.Stmt
Expand Down Expand Up @@ -423,30 +423,35 @@ func migrateOneType[IdoRow any](
ht, ht.migrationQuery, args, ht.lastId,
func(idoRows []IdoRow) (checkpoint interface{}) {
// ... convert them, ...
inserts, upserts, lastIdoId := convertRows(c.Icinga2.Env, envId, selectCache, ht.snapshot, idoRows)
stages, lastIdoId := convertRows(c.Icinga2.Env, envId, selectCache, ht.snapshot, idoRows)

// ... and insert them:

for _, op := range []struct {
kind string
data [][]contracts.Entity
streamer func(context.Context, <-chan contracts.Entity, ...icingadb.OnSuccess[contracts.Entity]) error
}{{"INSERT IGNORE", inserts, idb.CreateIgnoreStreamed}, {"UPSERT", upserts, idb.UpsertStreamed}} {
for _, table := range op.data {
if len(table) < 1 {
continue
}

ch := make(chan contracts.Entity, len(table))
for _, row := range table {
ch <- row
}

close(ch)

if err := op.streamer(context.Background(), ch); err != nil {
log.With("backend", "Icinga DB", "op", op.kind, "table", utils.TableName(table[0])).
Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
for _, stage := range stages {
for _, op := range []struct {
kind string
data [][]contracts.Entity
streamer func(context.Context, <-chan contracts.Entity, ...icingadb.OnSuccess[contracts.Entity]) error
}{
{"INSERT IGNORE", stage.inserts, idb.CreateIgnoreStreamed},
{"UPSERT", stage.upserts, idb.UpsertStreamed},
} {
for _, table := range op.data {
if len(table) < 1 {
continue
}

ch := make(chan contracts.Entity, len(table))
for _, row := range table {
ch <- row
}

close(ch)

if err := op.streamer(context.Background(), ch); err != nil {
log.With("backend", "Icinga DB", "op", op.kind, "table", utils.TableName(table[0])).
Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/icingadb-migrate/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ func (hts historyTypes) forEach(f func(*historyType)) {
_ = eg.Wait()
}

type icingaDbOutputStage struct {
inserts, upserts [][]contracts.Entity
}

var types = historyTypes{
{
name: "ack & comment",
Expand Down

0 comments on commit a778fc8

Please sign in to comment.