Skip to content

Commit

Permalink
Fix workspace creation with index enabled (#7013)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
  • Loading branch information
haiodo committed Oct 22, 2024
1 parent 418d810 commit 32d148f
Show file tree
Hide file tree
Showing 12 changed files with 104 additions and 127 deletions.
6 changes: 3 additions & 3 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@
"args": ["src/__start.ts"],
"env": {
"MONGO_URL": "mongodb://localhost:27017",
// "DB_URL": "mongodb://localhost:27017",
"REGION": "pg",
"DB_URL": "postgresql://postgres:example@localhost:5432",
"DB_URL": "mongodb://localhost:27017",
"REGION": "",
// "DB_URL": "postgresql://postgres:example@localhost:5432",
"SERVER_SECRET": "secret",
"TRANSACTOR_URL": "ws://localhost:3333",
"ACCOUNTS_URL": "http://localhost:3000",
Expand Down
7 changes: 3 additions & 4 deletions dev/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,9 @@ services:
- MODEL_ENABLED=*
- ACCOUNTS_URL=http://host.docker.internal:3000
- BRANDING_PATH=/var/cfg/branding.json
- NOTIFY_INBOX_ONLY=true
# - PARALLEL=2
# - INIT_SCRIPT_URL=https://raw.githubusercontent.com/hcengineering/init/main/script.yaml
# - INIT_WORKSPACE=onboarding
- INIT_SCRIPT_URL=https://raw.githubusercontent.com/hcengineering/init/main/script.yaml
- INIT_WORKSPACE=test
restart: unless-stopped
workspacepg:
image: hardcoreeng/workspace
Expand All @@ -144,7 +143,7 @@ services:
- ACCOUNTS_URL=http://host.docker.internal:3000
- BRANDING_PATH=/var/cfg/branding.json
# - PARALLEL=2
# - INIT_SCRIPT_URL=https://raw.githubusercontent.com/hcengineering/init/main/script.yaml
- INIT_SCRIPT_URL=https://raw.githubusercontent.com/hcengineering/init/main/script.yaml
# - INIT_WORKSPACE=onboarding
restart: unless-stopped
collaborator:
Expand Down
7 changes: 0 additions & 7 deletions dev/tool/src/markup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ async function processFixJsonMarkupFor (
db: Db,
storageAdapter: StorageAdapter
): Promise<void> {
console.log('processing', domain, _class)

const collection = db.collection<Doc>(domain)
const docs = await collection.find({ _class }).toArray()
for (const doc of docs) {
Expand Down Expand Up @@ -119,8 +117,6 @@ async function processFixJsonMarkupFor (
}
}
}

console.log('...processed', docs.length)
}

export async function migrateMarkup (
Expand Down Expand Up @@ -151,12 +147,9 @@ export async function migrateMarkup (
const collection = workspaceDb.collection(domain)

const filter = hierarchy.isMixin(_class) ? { [_class]: { $exists: true } } : { _class }

const count = await collection.countDocuments(filter)
const iterator = collection.find<Doc>(filter)

try {
console.log('processing', _class, '->', count)
await processMigrateMarkupFor(ctx, hierarchy, storageAdapter, workspaceId, attributes, iterator, concurrency)
} finally {
await iterator.close()
Expand Down
4 changes: 0 additions & 4 deletions models/activity/src/migration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ async function processMigrateMarkupFor (
client: MigrationClient,
iterator: MigrationIterator<DocUpdateMessage>
): Promise<void> {
let processed = 0
while (true) {
const docs = await iterator.next(1000)
if (docs === null || docs.length === 0) {
Expand Down Expand Up @@ -104,9 +103,6 @@ async function processMigrateMarkupFor (
if (ops.length > 0) {
await client.bulk(DOMAIN_ACTIVITY, ops)
}

processed += docs.length
console.log('...processed', processed)
}
}

Expand Down
8 changes: 0 additions & 8 deletions models/text-editor/src/migration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ async function processMigrateMarkupFor (
client: MigrationClient,
iterator: MigrationIterator<Doc>
): Promise<void> {
let processed = 0
while (true) {
const docs = await iterator.next(1000)
if (docs === null || docs.length === 0) {
Expand Down Expand Up @@ -88,9 +87,6 @@ async function processMigrateMarkupFor (
if (operations.length > 0) {
await client.bulk(domain, operations)
}

processed += docs.length
console.log('...processed', processed)
}
}

Expand Down Expand Up @@ -122,7 +118,6 @@ async function processFixMigrateMarkupFor (
client: MigrationClient,
iterator: MigrationIterator<Doc>
): Promise<void> {
let processed = 0
while (true) {
const docs = await iterator.next(1000)
if (docs === null || docs.length === 0) {
Expand Down Expand Up @@ -164,9 +159,6 @@ async function processFixMigrateMarkupFor (
if (operations.length > 0) {
await client.bulk(domain, operations)
}

processed += docs.length
console.log('...processed', processed)
}
}

Expand Down
3 changes: 3 additions & 0 deletions server/indexer/src/fulltext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@ export class FullTextIndex implements WithFind {
}

async close (): Promise<void> {
this.indexer.triggerIndexing()
if (!this.upgrade) {
await this.indexer.cancel()
} else {
await this.indexer.processUpload(this.indexer.metrics)
}
}

Expand Down
4 changes: 3 additions & 1 deletion server/server/src/sessionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,9 @@ class TSessionManager implements SessionManager {
version: this.modelVersion,
workspaceVersion: versionToString(workspaceInfo.version),
workspace: workspaceInfo.workspaceId,
workspaceUrl: workspaceInfo.workspaceUrl
workspaceUrl: workspaceInfo.workspaceUrl,
email: token.email,
extra: JSON.stringify(token.extra ?? {})
})
// Version mismatch, return upgrading.
return { upgrade: true, upgradeInfo: workspaceInfo.upgrade }
Expand Down
133 changes: 57 additions & 76 deletions server/tool/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
//

import core, {
BackupClient,
Branding,
Client as CoreClient,
coreId,
DOMAIN_BENCHMARK,
DOMAIN_MIGRATION,
Expand All @@ -34,12 +32,13 @@ import core, {
TxOperations,
WorkspaceId,
WorkspaceIdWithUrl,
type Client,
type Doc,
type Ref
type Ref,
type WithLookup
} from '@hcengineering/core'
import { consoleModelLogger, MigrateOperation, ModelLogger, tryMigrate } from '@hcengineering/model'
import { DomainIndexHelperImpl, Pipeline, StorageAdapter, type DbAdapter } from '@hcengineering/server-core'
import { connect } from './connect'
import { InitScript, WorkspaceInitializer } from './initializer'
import toolPlugin from './plugin'
import { MigrateClientImpl } from './upgrade'
Expand Down Expand Up @@ -162,23 +161,15 @@ export async function updateModel (
try {
let i = 0
for (const op of migrateOperations) {
logger.log('Migrate', { name: op[0] })
const st = Date.now()
await op[1].upgrade(migrateState, async () => connection as any, logger)
const tdelta = Date.now() - st
if (tdelta > 0) {
logger.log('Create', { name: op[0], time: tdelta })
}
i++
await progress((((100 / migrateOperations.length) * i) / 100) * 30)
await progress((((100 / migrateOperations.length) * i) / 100) * 100)
}

// Create update indexes
await createUpdateIndexes(
ctx,
connection.getHierarchy(),
connection.getModel(),
pipeline,
async (value) => {
await progress(30 + (Math.min(value, 100) / 100) * 70)
},
workspaceId
)
await progress(100)
} catch (e: any) {
logger.error('error', { error: e })
Expand All @@ -200,6 +191,7 @@ export async function initializeWorkspace (
): Promise<void> {
const initWS = branding?.initWorkspace ?? getMetadata(toolPlugin.metadata.InitWorkspace)
const scriptUrl = getMetadata(toolPlugin.metadata.InitScriptURL)
ctx.info('Init script details', { scriptUrl, initWS })
if (initWS === undefined || scriptUrl === undefined) return
try {
// `https://raw.githubusercontent.com/hcengineering/init/main/script.yaml`
Expand Down Expand Up @@ -234,11 +226,12 @@ export async function upgradeModel (
workspaceId: WorkspaceIdWithUrl,
txes: Tx[],
pipeline: Pipeline,
connection: Client,
storageAdapter: StorageAdapter,
migrateOperations: [string, MigrateOperation][],
logger: ModelLogger = consoleModelLogger,
progress: (value: number) => Promise<void>,
forceIndexes: boolean = false
updateIndexes: 'perform' | 'skip' | 'disable' = 'skip'
): Promise<Tx[]> {
if (txes.some((tx) => tx.objectSpace !== core.space.Model)) {
throw Error('Model txes must target only core.space.Model')
Expand Down Expand Up @@ -305,87 +298,69 @@ export async function upgradeModel (
workspaceId
)
}
if (forceIndexes) {
if (updateIndexes === 'perform') {
await upgradeIndexes()
}

await ctx.with('migrate', {}, async (ctx) => {
let i = 0
for (const op of migrateOperations) {
const t = Date.now()
try {
const t = Date.now()
await ctx.with(op[0], {}, async () => {
await op[1].migrate(migrateClient, logger)
})
const tdelta = Date.now() - t
if (tdelta > 0) {
logger.log('migrate:', { workspaceId: workspaceId.name, operation: op[0], time: Date.now() - t })
}
} catch (err: any) {
logger.error(`error during migrate: ${op[0]} ${err.message}`, err)
throw err
}
logger.log('migrate:', { workspaceId: workspaceId.name, operation: op[0], time: Date.now() - t })
await progress(20 + ((100 / migrateOperations.length) * i * 20) / 100)
i++
}

await tryMigrate(migrateClient, coreId, [
{
state: 'indexes-v5',
func: upgradeIndexes
}
])
if (updateIndexes === 'skip') {
await tryMigrate(migrateClient, coreId, [
{
state: 'indexes-v5',
func: upgradeIndexes
}
])
}
})

logger.log('Apply upgrade operations', { workspaceId: workspaceId.name })

let connection: (CoreClient & BackupClient) | undefined
const getUpgradeClient = async (): Promise<CoreClient & BackupClient> =>
await ctx.with('connect-platform', {}, async (ctx) => {
if (connection !== undefined) {
return connection
}
connection = (await connect(
transactorUrl,
workspaceId,
undefined,
{
mode: 'backup',
model: 'upgrade',
admin: 'true'
},
model
)) as CoreClient & BackupClient
return connection
})
try {
await ctx.with('upgrade', {}, async (ctx) => {
let i = 0
for (const op of migrateOperations) {
const t = Date.now()
await ctx.with(op[0], {}, () => op[1].upgrade(migrateState, getUpgradeClient, logger))
logger.log('upgrade:', { operation: op[0], time: Date.now() - t, workspaceId: workspaceId.name })
await progress(60 + ((100 / migrateOperations.length) * i * 30) / 100)
i++
await ctx.with('upgrade', {}, async (ctx) => {
let i = 0
for (const op of migrateOperations) {
const t = Date.now()
await ctx.with(op[0], {}, () => op[1].upgrade(migrateState, async () => connection, logger))
const tdelta = Date.now() - t
if (tdelta > 0) {
logger.log('upgrade:', { operation: op[0], time: tdelta, workspaceId: workspaceId.name })
}
})
await progress(60 + ((100 / migrateOperations.length) * i * 30) / 100)
i++
}
})

if (connection === undefined) {
// We need to send reboot for workspace
ctx.info('send force close', { workspace: workspaceId.name, transactorUrl })
const serverEndpoint = transactorUrl.replaceAll('wss://', 'https://').replace('ws://', 'http://')
const token = generateToken(systemAccountEmail, workspaceId, { admin: 'true' })
try {
await fetch(
serverEndpoint + `/api/v1/manage?token=${token}&operation=force-close&wsId=${toWorkspaceString(workspaceId)}`,
{
method: 'PUT'
}
)
} catch (err: any) {
// Ignore error if transactor is not yet ready
// We need to send reboot for workspace
ctx.info('send force close', { workspace: workspaceId.name, transactorUrl })
const serverEndpoint = transactorUrl.replaceAll('wss://', 'https://').replace('ws://', 'http://')
const token = generateToken(systemAccountEmail, workspaceId, { admin: 'true' })
try {
await fetch(
serverEndpoint + `/api/v1/manage?token=${token}&operation=force-close&wsId=${toWorkspaceString(workspaceId)}`,
{
method: 'PUT'
}
}
} finally {
await connection?.sendForceClose()
await connection?.close()
)
} catch (err: any) {
// Ignore error if transactor is not yet ready
}
return model
}
Expand All @@ -404,7 +379,13 @@ async function prepareMigrationClient (
const migrateClient = new MigrateClientImpl(pipeline, hierarchy, model, logger, storageAdapter, workspaceId)
const states = await migrateClient.find<MigrationState>(DOMAIN_MIGRATION, { _class: core.class.MigrationState })
const sts = Array.from(groupByArray(states, (it) => it.plugin).entries())
const migrateState = new Map(sts.map((it) => [it[0], new Set(it[1].map((q) => q.state))]))

const _toSet = (vals: WithLookup<MigrationState>[]): Set<string> => {
return new Set(vals.map((q) => q.state))
}

const migrateState = new Map<string, Set<string>>(sts.map((it) => [it[0], _toSet(it[1])]))
// const migrateState = new Map(sts.map((it) => [it[0], new Set(it[1].map((q) => q.state))]))
migrateClient.migrateState = migrateState

return { migrateClient, migrateState }
Expand Down
Loading

0 comments on commit 32d148f

Please sign in to comment.