diff --git a/src/env.ts b/src/env.ts index bc7aca0..e4bdc15 100644 --- a/src/env.ts +++ b/src/env.ts @@ -55,6 +55,11 @@ const schema = Type.Object({ ), /** If the API should automatically shut down when Ordhook ingestion mode is `replay` */ ORDHOOK_REPLAY_INGESTION_MODE_AUTO_SHUTDOWN: Type.Boolean({ default: true }), + /** + * If the API should check that streamed blocks received from Ordhook are contiguous (without + * inscription number gaps). + */ + ORDHOOK_STREAMED_BLOCK_CONTINUITY_CHECK: Type.Boolean({ default: true }), PGHOST: Type.String(), PGPORT: Type.Number({ default: 5432, minimum: 0, maximum: 65535 }), diff --git a/src/pg/pg-store.ts b/src/pg/pg-store.ts index 2dac87c..76be6bf 100644 --- a/src/pg/pg-store.ts +++ b/src/pg/pg-store.ts @@ -149,7 +149,8 @@ export class PgStore extends BasePgStore { } switch (direction) { case 'apply': - if (streamed) await this.assertNextBlockIsContiguous(sql, event, cache); + if (streamed && ENV.ORDHOOK_STREAMED_BLOCK_CONTINUITY_CHECK) + await this.assertNextBlockIsContiguous(sql, event, cache); await this.applyInscriptions(sql, cache, streamed); break; case 'rollback': @@ -177,7 +178,24 @@ export class PgStore extends BasePgStore { for await (const batch of batchIterate(entries, INSERT_BATCH_SIZE)) await sql` INSERT INTO inscriptions ${sql(batch)} - ON CONFLICT (genesis_id) DO NOTHING + ON CONFLICT ON CONSTRAINT inscriptions_number_unique DO UPDATE SET + genesis_id = EXCLUDED.genesis_id, + ordinal_number = EXCLUDED.ordinal_number, + classic_number = EXCLUDED.classic_number, + block_height = EXCLUDED.block_height, + block_hash = EXCLUDED.block_hash, + tx_index = EXCLUDED.tx_index, + address = EXCLUDED.address, + mime_type = EXCLUDED.mime_type, + content_type = EXCLUDED.content_type, + content_length = EXCLUDED.content_length, + content = EXCLUDED.content, + fee = EXCLUDED.fee, + curse_type = EXCLUDED.curse_type, + recursive = EXCLUDED.recursive, + metadata = EXCLUDED.metadata, + parent = EXCLUDED.parent, + timestamp = EXCLUDED.timestamp `; } if (cache.locations.length) { @@ -190,7 +208,17 @@ export class PgStore extends BasePgStore { await sql` WITH location_inserts AS ( INSERT INTO locations ${sql(batch)} - ON CONFLICT (ordinal_number, block_height, tx_index) DO NOTHING + ON CONFLICT (ordinal_number, block_height, tx_index) DO UPDATE SET + tx_id = EXCLUDED.tx_id, + block_hash = EXCLUDED.block_hash, + address = EXCLUDED.address, + output = EXCLUDED.output, + offset = EXCLUDED.offset, + prev_output = EXCLUDED.prev_output, + prev_offset = EXCLUDED.prev_offset, + value = EXCLUDED.value, + transfer_type = EXCLUDED.transfer_type, + timestamp = EXCLUDED.timestamp RETURNING ordinal_number, block_height, block_hash, tx_index ), prev_transfer_index AS ( @@ -213,7 +241,12 @@ export class PgStore extends BasePgStore { INSERT INTO inscription_transfers (genesis_id, number, ordinal_number, block_height, block_hash, tx_index, block_transfer_index) (SELECT * FROM moved_inscriptions) - ON CONFLICT (block_height, block_transfer_index) DO NOTHING + ON CONFLICT (block_height, block_transfer_index) DO UPDATE SET + genesis_id = EXCLUDED.genesis_id, + number = EXCLUDED.number, + ordinal_number = EXCLUDED.ordinal_number, + tx_index = EXCLUDED.tx_index, + block_hash = EXCLUDED.block_hash `; } if (cache.recursiveRefs.size) @@ -249,7 +282,8 @@ export class PgStore extends BasePgStore { ON CONFLICT (ordinal_number) DO UPDATE SET block_height = EXCLUDED.block_height, tx_index = EXCLUDED.tx_index, - address = EXCLUDED.address + address = EXCLUDED.address, + output = EXCLUDED.output WHERE EXCLUDED.block_height > current_locations.block_height OR (EXCLUDED.block_height = current_locations.block_height AND