Skip to content

Commit

Permalink
chore: migration reflection groups to pg (#9514)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Krick <matt.krick@gmail.com>
  • Loading branch information
mattkrick authored Apr 11, 2024
1 parent c2a3a43 commit ddb4244
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 24 deletions.
47 changes: 31 additions & 16 deletions packages/server/graphql/private/mutations/checkRethinkPgEquality.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import fs from 'fs'
import path from 'path'
import getRethink from '../../../database/rethinkDriver'
import getMeetingTemplatesByIds from '../../../postgres/queries/getMeetingTemplatesByIds'
import getKysely from '../../../postgres/getKysely'
import {checkRowCount, checkTableEq} from '../../../postgres/utils/checkEqBase'
import {
compareDateAlmostEqual,
compareRValUndefinedAsFalse,
compareRValUndefinedAsNull,
compareRValUndefinedAsNullAndTruncateRVal,
defaultEqFn
} from '../../../postgres/utils/rethinkEqualityFns'
import {MutationResolvers} from '../resolverTypes'
Expand Down Expand Up @@ -35,22 +34,38 @@ const checkRethinkPgEquality: MutationResolvers['checkRethinkPgEquality'] = asyn
) => {
const r = await getRethink()

if (tableName === 'MeetingTemplate') {
if (tableName === 'RetroReflectionGroup') {
const rowCountResult = await checkRowCount(tableName)
const rethinkQuery = r.table('MeetingTemplate').orderBy('updatedAt', {index: 'updatedAt'})
const errors = await checkTableEq(rethinkQuery, getMeetingTemplatesByIds, {
const rethinkQuery = (updatedAt: Date, id: string | number) => {
return r
.table('RetroReflectionGroup')
.between([updatedAt, id], [r.maxval, r.maxval], {
index: 'updatedAtId',
leftBound: 'open',
rightBound: 'closed'
})
.orderBy({index: 'updatedAtId'}) as any
}
const pgQuery = (ids: string[]) => {
return getKysely()
.selectFrom('RetroReflectionGroup')
.selectAll()
.where('id', 'in', ids)
.execute()
}
const errors = await checkTableEq(rethinkQuery, pgQuery, {
id: defaultEqFn,
createdAt: defaultEqFn,
isActive: defaultEqFn,
name: defaultEqFn,
teamId: defaultEqFn,
updatedAt: compareDateAlmostEqual,
scope: defaultEqFn,
orgId: defaultEqFn,
parentTemplateId: compareRValUndefinedAsNull,
lastUsedAt: compareRValUndefinedAsNull,
type: defaultEqFn,
isStarter: compareRValUndefinedAsFalse,
isFree: compareRValUndefinedAsFalse
isActive: defaultEqFn,
meetingId: defaultEqFn,
promptId: defaultEqFn,
sortOrder: defaultEqFn,
voterIds: defaultEqFn,
smartTitle: compareRValUndefinedAsNullAndTruncateRVal(255),
title: compareRValUndefinedAsNullAndTruncateRVal(255),
summary: compareRValUndefinedAsNullAndTruncateRVal(2000),
discussionPromptQuestion: compareRValUndefinedAsNullAndTruncateRVal(2000)
})
return handleResult(tableName, rowCountResult, errors, writeToFile)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ import getPgConfig from '../getPgConfig'
export async function up() {
const client = new Client(getPgConfig())
await client.connect()
await client.query(`
try {
await client.query(`
ALTER TABLE "EmbeddingsMetadata" RENAME COLUMN "embedText" TO "fullText";
`)
} catch {
// noop
}
await client.end()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import {Kysely, PostgresDialect} from 'kysely'
import {r} from 'rethinkdb-ts'
import connectRethinkDB from '../../database/connectRethinkDB'
import getPg from '../getPg'

/**
 * Copies the RetroReflectionGroup table from RethinkDB into the Postgres table
 * of the same name.
 *
 * Rows are read in keyset-paginated batches ordered by the compound
 * `updatedAtId` index ([updatedAt, id]) and bulk-inserted into Postgres.
 * Rows whose insert conflicts with an existing PG row are left untouched
 * (`ON CONFLICT DO NOTHING`), so the migration can be re-run safely.
 *
 * @throws rethrows any insert error after logging the last row of the failing batch
 */
export async function up() {
  await connectRethinkDB()
  const pg = new Kysely<any>({
    dialect: new PostgresDialect({
      pool: getPg()
    })
  })
  try {
    await r
      .table('RetroReflectionGroup')
      .indexCreate('updatedAtId', (row: any) => [row('updatedAt'), row('id')])
      .run()
  } catch {
    // index already exists
  }
  // Wait outside the try/catch: when the index already existed (e.g. a previous
  // run was interrupted) it may still be building, and the between() query
  // below needs it to be ready.
  await r.table('RetroReflectionGroup').indexWait().run()

  // Postgres encodes the bind-parameter count as a 16-bit integer, so a single
  // statement can carry at most 65535 parameters.
  const MAX_PG_PARAMS = 65535
  const PG_COLS = [
    'id',
    'createdAt',
    'updatedAt',
    'isActive',
    'meetingId',
    'promptId',
    'sortOrder',
    'voterIds',
    'smartTitle',
    'title',
    'summary',
    'discussionPromptQuestion'
  ] as const
  type RetroReflectionGroup = {
    [K in (typeof PG_COLS)[number]]: any
  }
  // Largest number of rows whose columns still fit in one statement
  const BATCH_SIZE = Math.trunc(MAX_PG_PARAMS / PG_COLS.length)

  // Keyset cursor: the [updatedAt, id] of the last row already copied
  let curUpdatedAt = r.minval
  let curId = r.minval
  for (let i = 0; i < 1e6; i++) {
    const rawRowsToInsert = (await r
      .table('RetroReflectionGroup')
      .between([curUpdatedAt, curId], [r.maxval, r.maxval], {
        index: 'updatedAtId',
        // open left bound: resume strictly after the cursor row
        leftBound: 'open',
        rightBound: 'closed'
      })
      .orderBy({index: 'updatedAtId'})
      .limit(BATCH_SIZE)
      .pluck(...PG_COLS)
      .run()) as RetroReflectionGroup[]

    // Truncate free-text fields to the same lengths the rethink/pg equality
    // check applies (255 for titles, 2000 for summary and discussion prompt).
    const rowsToInsert = rawRowsToInsert.map((row) => ({
      ...row,
      title: row.title?.slice(0, 255),
      smartTitle: row.smartTitle?.slice(0, 255),
      summary: row.summary?.slice(0, 2000),
      discussionPromptQuestion: row.discussionPromptQuestion?.slice(0, 2000)
    }))
    if (rowsToInsert.length === 0) break
    const lastRow = rowsToInsert[rowsToInsert.length - 1]
    curUpdatedAt = lastRow.updatedAt
    curId = lastRow.id
    try {
      await pg
        .insertInto('RetroReflectionGroup')
        .values(rowsToInsert)
        .onConflict((oc) => oc.doNothing())
        .execute()
    } catch (e) {
      // Log where the failing batch ended so it can be located, then abort
      console.log({lastRow}, rowsToInsert.length)
      throw e
    }
  }
}

/**
 * Reverts the RethinkDB side of the migration by dropping the `updatedAtId`
 * index from RetroReflectionGroup. Any error raised by the drop is ignored.
 * Rows already copied to Postgres are not touched here.
 */
export async function down() {
  await connectRethinkDB()
  try {
    const reflectionGroups = r.table('RetroReflectionGroup')
    await reflectionGroups.indexDrop('updatedAtId').run()
  } catch {
    // index already dropped
  }
}
18 changes: 12 additions & 6 deletions packages/server/postgres/utils/checkEqBase.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {RSelection} from 'rethinkdb-ts'
import getRethink from '../../database/rethinkDriver'
import {RTable, TableSchema} from '../../database/stricterR'
import getPg from '../getPg'

interface DBDoc {
Expand Down Expand Up @@ -33,19 +33,25 @@ export const checkRowCount = async (tableName: string) => {
}

export async function checkTableEq(
rethinkQuery: RTable<TableSchema>,
rethinkQuery: (updatedAt: Date, id: string | number) => RSelection,
pgQuery: (ids: string[]) => Promise<PGDoc[] | null>,
equalityMap: Record<string, (a: unknown, b: unknown) => boolean>,
maxErrors = 10
) {
const batchSize = 3000
const errors = [] as Diff[]
const propsToCheck = Object.keys(equalityMap)

const r = await getRethink()
let curUpdatedDate = r.minval
let curId = r.minval
for (let i = 0; i < 1e6; i++) {
const offset = batchSize * i
const rethinkRows = (await rethinkQuery.skip(offset).limit(batchSize).run()) as RethinkDoc[]
if (!rethinkRows.length) break
const rethinkRows = (await rethinkQuery(curUpdatedDate, curId)
.limit(batchSize)
.run()) as RethinkDoc[]
if (rethinkRows.length === 0) break
const lastRow = rethinkRows[rethinkRows.length - 1]!
curUpdatedDate = lastRow.updatedAt
curId = lastRow.id
const ids = rethinkRows.map((t) => t.id)
const pgRows = (await pgQuery(ids)) ?? []
const pgRowsById = {} as {[key: string]: PGDoc}
Expand Down
8 changes: 8 additions & 0 deletions packages/server/postgres/utils/rethinkEqualityFns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import isValidDate from 'parabol-client/utils/isValidDate'

/**
 * Default equality used when comparing a RethinkDB value with its PG counterpart.
 * - two Dates are equal when their epoch milliseconds match
 * - two arrays are equal when their JSON serializations match (order-sensitive)
 * - anything else falls back to strict equality
 */
export const defaultEqFn = (a: unknown, b: unknown) => {
  if (a instanceof Date && b instanceof Date) {
    return a.getTime() === b.getTime()
  }
  const bothArrays = Array.isArray(a) && Array.isArray(b)
  return bothArrays ? JSON.stringify(a) === JSON.stringify(b) : a === b
}
export const compareDateAlmostEqual = (rVal: unknown, pgVal: unknown) => {
Expand All @@ -19,3 +20,10 @@ export const compareRValUndefinedAsFalse = (rVal: unknown, pgVal: unknown) => {
const normalizedRVal = rVal === undefined ? false : rVal
return normalizedRVal === pgVal
}

/**
 * Builds a comparator for text columns that were truncated on their way into PG.
 * Before comparing with `defaultEqFn`, the RethinkDB value is normalized:
 * a string is cut to its first `length` UTF-16 code units, and `undefined`
 * is treated as `null`.
 *
 * @param length maximum number of code units kept from a string rVal
 * @returns an (rVal, pgVal) equality function
 */
export const compareRValUndefinedAsNullAndTruncateRVal = (length: number) => {
  return (rVal: unknown, pgVal: unknown) => {
    let expected = rVal
    if (typeof expected === 'string') {
      expected = expected.slice(0, length)
    } else if (expected === undefined) {
      expected = null
    }
    return defaultEqFn(expected, pgVal)
  }
}
2 changes: 1 addition & 1 deletion packages/server/utils/PubSubPromise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import numToBase64 from './numToBase64'
import sendToSentry from './sendToSentry'

const STANDARD_TIMEOUT = ms('10s')
const ADHOC_TIMEOUT = ms('1m')
const ADHOC_TIMEOUT = ms('10m')

interface Job {
resolve: (payload: any) => void
Expand Down

0 comments on commit ddb4244

Please sign in to comment.