Skip to content

Commit

Permalink
fix(db-dynamodb): tools for batch write (#4445)
Browse files Browse the repository at this point in the history
Co-authored-by: Pavel Denisjuk <pavel@webiny.com>
  • Loading branch information
brunozoric and Pavel910 authored Dec 18, 2024
1 parent edaf911 commit e359463
Show file tree
Hide file tree
Showing 80 changed files with 2,763 additions and 2,170 deletions.
14 changes: 11 additions & 3 deletions packages/api-elasticsearch-tasks/src/definitions/entry.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import { Entity, TableDef } from "@webiny/db-dynamodb/toolbox";
/**
* TODO If adding GSIs to the Elasticsearch table, add them here.
*/
import type { TableDef } from "@webiny/db-dynamodb/toolbox";
import type { IEntity } from "@webiny/db-dynamodb";
import { createEntity } from "@webiny/db-dynamodb";

interface Params {
table: TableDef;
entityName: string;
}

export const createEntry = (params: Params): Entity<any> => {
export const createEntry = (params: Params): IEntity => {
const { table, entityName } = params;
return new Entity({
return createEntity({
name: entityName,
table,
attributes: {
Expand All @@ -24,6 +29,9 @@ export const createEntry = (params: Params): Entity<any> => {
},
data: {
type: "map"
},
TYPE: {
type: "string"
}
}
});
Expand Down
34 changes: 12 additions & 22 deletions packages/api-elasticsearch-tasks/src/tasks/Manager.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import { DynamoDBDocument, getDocumentClient } from "@webiny/aws-sdk/client-dynamodb";
import { Client, createElasticsearchClient } from "@webiny/api-elasticsearch";
import { createTable } from "~/definitions";
import { Context, IManager } from "~/types";
import type { Context, IManager } from "~/types";
import { createEntry } from "~/definitions/entry";
import { Entity } from "@webiny/db-dynamodb/toolbox";
import { ITaskResponse } from "@webiny/tasks/response/abstractions";
import { IIsCloseToTimeoutCallable, ITaskManagerStore } from "@webiny/tasks/runner/abstractions";
import {
batchReadAll,
BatchReadItem,
batchWriteAll,
BatchWriteItem,
BatchWriteResult
} from "@webiny/db-dynamodb";
import { ITimer } from "@webiny/handler-aws/utils";
import type { ITaskResponse } from "@webiny/tasks/response/abstractions";
import type {
IIsCloseToTimeoutCallable,
ITaskManagerStore
} from "@webiny/tasks/runner/abstractions";
import type { BatchReadItem, IEntity } from "@webiny/db-dynamodb";
import { batchReadAll } from "@webiny/db-dynamodb";
import type { ITimer } from "@webiny/handler-aws/utils";

export interface ManagerParams<T> {
context: Context;
Expand All @@ -37,7 +34,7 @@ export class Manager<T> implements IManager<T> {
public readonly store: ITaskManagerStore<T>;
public readonly timer: ITimer;

private readonly entities: Record<string, Entity<any>> = {};
private readonly entities: Record<string, IEntity> = {};

public constructor(params: ManagerParams<T>) {
this.context = params.context;
Expand All @@ -64,7 +61,7 @@ export class Manager<T> implements IManager<T> {
this.timer = params.timer;
}

public getEntity(name: string): Entity<any> {
public getEntity(name: string): IEntity {
if (this.entities[name]) {
return this.entities[name];
}
Expand All @@ -75,17 +72,10 @@ export class Manager<T> implements IManager<T> {
}));
}

public async read<T>(items: BatchReadItem[]) {
public async read<T>(items: BatchReadItem[]): Promise<T[]> {
return await batchReadAll<T>({
table: this.table,
items
});
}

public async write(items: BatchWriteItem[]): Promise<BatchWriteResult> {
return await batchWriteAll({
table: this.table,
items
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
} from "~/types";
import { ITaskResponse, ITaskResponseResult } from "@webiny/tasks/response/abstractions";
import { scan } from "~/helpers/scan";
import { BatchWriteItem, ScanResponse } from "@webiny/db-dynamodb";
import { createTableWriteBatch, ScanResponse } from "@webiny/db-dynamodb";
import { IndexManager } from "~/settings";
import { IIndexManager } from "~/settings/types";

Expand Down Expand Up @@ -73,7 +73,10 @@ export class ReindexingTaskRunner {
return this.response.done("No more items to process.");
}

const batch: BatchWriteItem[] = [];
const tableWriteBatch = createTableWriteBatch({
table: this.manager.table
});

for (const item of results.items) {
/**
* No index defined? Impossible but let's skip if really happens.
Expand Down Expand Up @@ -110,14 +113,13 @@ export class ReindexingTaskRunner {
/**
* Reindexing will be triggered by the `putBatch` method.
*/
batch.push(
entity.putBatch({
...item,
modified: new Date().toISOString()
})
);
tableWriteBatch.put(entity.entity, {
...item,
TYPE: item.TYPE || "unknown",
modified: new Date().toISOString()
});
}
await this.manager.write(batch);
await tableWriteBatch.execute();
/**
* We always store the index settings, so we can restore them later.
* Also, we always want to store what was the last key we processed, just in case something breaks, so we can continue from this point.
Expand Down
27 changes: 14 additions & 13 deletions packages/api-elasticsearch-tasks/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { ElasticsearchContext } from "@webiny/api-elasticsearch/types";
import { Entity } from "@webiny/db-dynamodb/toolbox";
import {
import type { ElasticsearchContext } from "@webiny/api-elasticsearch/types";
import type {
Context as TasksContext,
IIsCloseToTimeoutCallable,
ITaskManagerStore,
ITaskResponse,
ITaskResponseDoneResultOutput
} from "@webiny/tasks/types";
import { DynamoDBDocument } from "@webiny/aws-sdk/client-dynamodb";
import { Client } from "@webiny/api-elasticsearch";
import type { DynamoDBDocument } from "@webiny/aws-sdk/client-dynamodb";
import type { Client } from "@webiny/api-elasticsearch";
import { createTable } from "~/definitions";
import { ITaskResponse } from "@webiny/tasks/response/abstractions";
import { ITaskManagerStore } from "@webiny/tasks/runner/abstractions";
import { BatchWriteItem, BatchWriteResult } from "@webiny/db-dynamodb";
import { ITimer } from "@webiny/handler-aws";
import type { BatchReadItem, IEntity } from "@webiny/db-dynamodb";
import type { ITimer } from "@webiny/handler-aws";
import type { GenericRecord } from "@webiny/api/types";

export interface Context extends ElasticsearchContext, TasksContext {}

Expand Down Expand Up @@ -42,17 +42,18 @@ export interface IElasticsearchIndexingTaskValues {
}

export interface AugmentedError extends Error {
data?: Record<string, any>;
data?: GenericRecord;
[key: string]: any;
}

export interface IDynamoDbElasticsearchRecord {
PK: string;
SK: string;
TYPE?: string;
index: string;
_et?: string;
entity: string;
data: Record<string, any>;
data: GenericRecord;
modified: string;
}

Expand All @@ -70,7 +71,7 @@ export interface IManager<
readonly store: ITaskManagerStore<T>;
readonly timer: ITimer;

getEntity: (name: string) => Entity<any>;
getEntity: (name: string) => IEntity;

write: (items: BatchWriteItem[]) => Promise<BatchWriteResult>;
read<T>(items: BatchReadItem[]): Promise<T[]>;
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import { DynamoDBDocument } from "@webiny/aws-sdk/client-dynamodb";
import { Entity, Table } from "@webiny/db-dynamodb/toolbox";
import {
FileManagerAliasesStorageOperations,
import type { DynamoDBDocument } from "@webiny/aws-sdk/client-dynamodb";
import type { Entity, Table } from "@webiny/db-dynamodb/toolbox";
import type {
File,
FileAlias
FileAlias,
FileManagerAliasesStorageOperations
} from "@webiny/api-file-manager/types";
import {
BatchWriteItem,
batchWriteAll,
createEntityWriteBatch,
createStandardEntity,
createTable,
DbItem,
Expand Down Expand Up @@ -39,52 +38,49 @@ export class AliasesStorageOperations implements FileManagerAliasesStorageOperat

async deleteAliases(file: File): Promise<void> {
const aliasItems = await this.getExistingAliases(file);
const items: BatchWriteItem[] = [];

aliasItems.forEach(item => {
items.push(
this.aliasEntity.deleteBatch({
const batchWrite = createEntityWriteBatch({
entity: this.aliasEntity,
delete: aliasItems.map(item => {
return {
PK: this.createPartitionKey({
id: item.fileId,
tenant: item.tenant,
locale: item.locale
}),
SK: `ALIAS#${item.alias}`
})
);
};
})
});

await batchWriteAll({ table: this.table, items });
await batchWrite.execute();
}

async storeAliases(file: File): Promise<void> {
const items: BatchWriteItem[] = [];
const existingAliases = await this.getExistingAliases(file);
const newAliases = this.createNewAliasesRecords(file, existingAliases);

newAliases.forEach(alias => {
items.push(this.aliasEntity.putBatch(alias));
const batchWrite = createEntityWriteBatch({
entity: this.aliasEntity
});
for (const alias of newAliases) {
batchWrite.put(alias);
}

// Delete aliases that are in the DB but are NOT in the file.
for (const data of existingAliases) {
if (!file.aliases.some(alias => data.alias === alias)) {
items.push(
this.aliasEntity.deleteBatch({
PK: this.createPartitionKey(file),
SK: `ALIAS#${data.alias}`
})
);
batchWrite.delete({
PK: this.createPartitionKey(file),
SK: `ALIAS#${data.alias}`
});
}
}

await batchWriteAll({
table: this.table,
items
});
await batchWrite.execute();
}

private async getExistingAliases(file: File) {
private async getExistingAliases(file: File): Promise<FileAlias[]> {
const aliases = await queryAll<{ data: FileAlias }>({
entity: this.aliasEntity,
partitionKey: this.createPartitionKey(file),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ export const createElasticsearchEntity = (params: Params) => {
TYPE: {
type: "string"
},

...(attributes || {})
}
});
Expand Down
Loading

0 comments on commit e359463

Please sign in to comment.