Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

convertFlappingRows(): fix foreign key error history -> flapping_history #554

Merged
merged 3 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions cmd/icingadb-migrate/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type commentRow = struct {
func convertCommentRows(
env string, envId icingadbTypes.Binary,
_ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []commentRow,
) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) {
) (stages []icingaDbOutputStage, checkpoint any) {
var commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck []contracts.Entity

for _, row := range idoRows {
Expand Down Expand Up @@ -207,7 +207,9 @@ func convertCommentRows(
}
}

icingaDbInserts = [][]contracts.Entity{commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck}
stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{
commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck,
}}}
return
}

Expand Down Expand Up @@ -236,7 +238,7 @@ type downtimeRow = struct {
func convertDowntimeRows(
env string, envId icingadbTypes.Binary,
_ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []downtimeRow,
) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) {
) (stages []icingaDbOutputStage, checkpoint any) {
var downtimeHistory, allHistory, sla []contracts.Entity

for _, row := range idoRows {
Expand Down Expand Up @@ -357,7 +359,7 @@ func convertDowntimeRows(
sla = append(sla, s)
}

icingaDbInserts = [][]contracts.Entity{downtimeHistory, allHistory, sla}
stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{downtimeHistory, allHistory, sla}}}
return
}

Expand All @@ -377,7 +379,7 @@ type flappingRow = struct {
func convertFlappingRows(
env string, envId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []flappingRow,
) (icingaDbInserts, icingaDbUpserts [][]contracts.Entity, checkpoint any) {
) (stages []icingaDbOutputStage, checkpoint any) {
if len(idoRows) < 1 {
return
}
Expand Down Expand Up @@ -506,8 +508,13 @@ func convertFlappingRows(
}
}

icingaDbInserts = [][]contracts.Entity{flappingHistory, allHistory}
icingaDbUpserts = [][]contracts.Entity{flappingHistoryUpserts}
stages = []icingaDbOutputStage{
{
inserts: [][]contracts.Entity{flappingHistory},
upserts: [][]contracts.Entity{flappingHistoryUpserts},
},
{inserts: [][]contracts.Entity{allHistory}},
}
return
}

Expand All @@ -528,7 +535,7 @@ type notificationRow = struct {
func convertNotificationRows(
env string, envId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, idoRows []notificationRow,
) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) {
) (stages []icingaDbOutputStage, checkpoint any) {
if len(idoRows) < 1 {
return
}
Expand Down Expand Up @@ -665,7 +672,9 @@ func convertNotificationRows(
}
}

icingaDbInserts = [][]contracts.Entity{notificationHistory, userNotificationHistory, allHistory}
stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{
notificationHistory, userNotificationHistory, allHistory,
}}}
return
}

Expand Down Expand Up @@ -721,7 +730,7 @@ type stateRow = struct {
func convertStateRows(
env string, envId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []stateRow,
) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) {
) (stages []icingaDbOutputStage, checkpoint any) {
if len(idoRows) < 1 {
return
}
Expand Down Expand Up @@ -819,6 +828,6 @@ func convertStateRows(
}
}

icingaDbInserts = [][]contracts.Entity{stateHistory, allHistory, sla}
stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{stateHistory, allHistory, sla}}}
return
}
49 changes: 27 additions & 22 deletions cmd/icingadb-migrate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func migrateOneType[IdoRow any](
c *Config, idb *icingadb.DB, envId []byte, ht *historyType,
convertRows func(env string, envId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx,
idoRows []IdoRow) (icingaDbInserts, icingaDbUpserts [][]contracts.Entity, checkpoint any),
idoRows []IdoRow) (stages []icingaDbOutputStage, checkpoint any),
) {
var lastQuery string
var lastStmt *sqlx.Stmt
Expand Down Expand Up @@ -423,30 +423,35 @@ func migrateOneType[IdoRow any](
ht, ht.migrationQuery, args, ht.lastId,
func(idoRows []IdoRow) (checkpoint interface{}) {
// ... convert them, ...
inserts, upserts, lastIdoId := convertRows(c.Icinga2.Env, envId, selectCache, ht.snapshot, idoRows)
stages, lastIdoId := convertRows(c.Icinga2.Env, envId, selectCache, ht.snapshot, idoRows)

// ... and insert them:

for _, op := range []struct {
kind string
data [][]contracts.Entity
streamer func(context.Context, <-chan contracts.Entity, ...icingadb.OnSuccess[contracts.Entity]) error
}{{"INSERT IGNORE", inserts, idb.CreateIgnoreStreamed}, {"UPSERT", upserts, idb.UpsertStreamed}} {
for _, table := range op.data {
if len(table) < 1 {
continue
}

ch := make(chan contracts.Entity, len(table))
for _, row := range table {
ch <- row
}

close(ch)

if err := op.streamer(context.Background(), ch); err != nil {
log.With("backend", "Icinga DB", "op", op.kind, "table", utils.TableName(table[0])).
Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
for _, stage := range stages {
for _, op := range []struct {
kind string
data [][]contracts.Entity
streamer func(context.Context, <-chan contracts.Entity, ...icingadb.OnSuccess[contracts.Entity]) error
}{
{"INSERT IGNORE", stage.inserts, idb.CreateIgnoreStreamed},
{"UPSERT", stage.upserts, idb.UpsertStreamed},
} {
for _, table := range op.data {
if len(table) < 1 {
continue
}

ch := make(chan contracts.Entity, len(table))
for _, row := range table {
ch <- row
}

close(ch)

if err := op.streamer(context.Background(), ch); err != nil {
log.With("backend", "Icinga DB", "op", op.kind, "table", utils.TableName(table[0])).
Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
}
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loop triggered my "I think I can make it look nicer" instinct, I played around in my editor a bit and ended up in a state where I could basically type git commit, so please have a look at 5bdaaa3 and let me know what you think.

(It should basically do the same as before, but I'm still waiting for the filling cache stage with the current code, so consider that commit untested for now.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not bad on first sight, feel free to make a PR into this one (to be rebase-and-merged to avoid additional merge commits).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... or into master w/ this one included. I'd prefer this if your complaint doesn't actually block anything here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My preferred way would be that if you approve of that change, you just manually push my commit to this PR (so more similar to the typical "include proposed changes from a review" workflow).

Not bad on first sight, feel free to make a PR into this one (to be rebase-and-merged to avoid additional merge commits).

That would leave a weird merged PR that wasn't actually merged into any "productive" branch, i.e. show up in "merged PRs without milestone" searches and so on.

... or into master w/ this one included. I'd prefer this if your complaint doesn't actually block anything here.

That would mean some merge/approval with "but actually I didn't look at this code in detail because (hopefully) it will be replaced anyways".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My preferred way would be that if you approve of that change, you just manually push my commit to this PR (so more similar to the typical "include proposed changes from a review" workflow).

Yeees... but... this isn't just a typo, you know?

That would leave a weird merged PR that wasn't actually merged into any "productive" branch, i.e. show up in "merged PRs without milestone" searches and so on.

No, I think it wouldn't, e.g.

is on the 2.14 milestone, but not in the changelog. I think such and buildfix PRs can also be in the changelog in the future, possibly starting with

which is admittedly a bad example as it touches an out-of-changelog file, but you get the point.

That would mean some merge/approval with "but actually I didn't look at this code in detail because (hopefully) it will be replaced anyways".

Huh? You didn't look at this code in detail, and/but replaced it?

Anyway.

If you really don't want an additional PR, feel free to push on top of my branch, I have just a few non-critical things about the unit tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeees... but... this isn't just a typo, you know?

Reviews often include things way more complex than typos.

That would leave a weird merged PR that wasn't actually merged into any "productive" branch, i.e. show up in "merged PRs without milestone" searches and so on.

No, I think it wouldn't, e.g.

is on the 2.14 milestone, but not in the changelog.

Well, I already didn't like that PR at that time but you merged it faster than I could say please don't. The main reason I don't like this is that typically, if you see that a PR is merged, you expect that the change was included in the software. But that's only the case if you merge it into master/support/*. If it's merged into another branch of some PR, and that PR is discarded, the other PR would still show as merged even though the change never made it into the software.

Huh? You didn't look at this code in detail, and/but replaced it?

I replaced it with simpler code that does what I'd expect to happen there and what I hope your code does. If my code does something different than your code, please tell me.

Anyway.

If you really don't want an additional PR, feel free to push on top of my branch, I have just a few non-critical things about the unit tests.

Is that anything complex that would need the review tools from GH PRs? I mean I can create a PR to discuss it, I'd just want to avoid having it show up as merged in the end.

Expand Down
4 changes: 4 additions & 0 deletions cmd/icingadb-migrate/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ func (hts historyTypes) forEach(f func(*historyType)) {
_ = eg.Wait()
}

type icingaDbOutputStage struct {
inserts, upserts [][]contracts.Entity
}

var types = historyTypes{
{
name: "ack & comment",
Expand Down