diff --git a/firehose.js b/firehose.js index 1ba2f81..c0e9e13 100644 --- a/firehose.js +++ b/firehose.js @@ -8,6 +8,17 @@ const loadGrpcPackageDefinition = package => grpc.loadPackageDefinition(protoLoa })); const ProtoBuf = require("protobufjs"); const grpc = require("@grpc/grpc-js"); + +function getMetadata(useBootFirehose) { + const metadata = new grpc.Metadata(); + const apiKey = useBootFirehose ? process.env.BOOT_GRPC_API_KEY : process.env.GRPC_API_KEY; + + if (apiKey) { + metadata.add('X-Api-Key', apiKey); + } + return metadata; +} + const eosioProto = loadProto("sf/antelope/type/v1/type.proto") const firehoseV1Service = loadGrpcPackageDefinition("dfuse/bstream/v1/bstream.proto").dfuse.bstream.v1; const firehoseV2Service = loadGrpcPackageDefinition("sf/firehose/v2/firehose.proto").sf.firehose.v2; @@ -36,7 +47,8 @@ const streamFirehose = forceStartBlock => new Promise( async (resolve, reject)=> console.log("start_block_num",start_block_num) console.log("Starting stream from firehose at "+ start_block_num); const client = getClient(); - let stream = client.Blocks({ start_block_num, fork_steps: ["STEP_NEW", "STEP_IRREVERSIBLE"]}); + const metadata = getMetadata(true); // Pass 'true' if using BOOT_GRPC_ADDRESS + let stream = client.Blocks({ start_block_num, fork_steps: ["STEP_NEW", "STEP_IRREVERSIBLE"] }, metadata); // let stream = client.Blocks({ start_block_num, fork_steps: ["STEP_IRREVERSIBLE"]});//for testing stream.on("data", async (data) => { @@ -87,7 +99,14 @@ const streamFirehose = forceStartBlock => new Promise( async (resolve, reject)=> blockMerkle.activeNodes.forEach((node,index) => blockMerkle.activeNodes[index] = toHex(node) ); - const { blockToEdit } = annotateIncrementalMerkleTree(JSON.parse(JSON.stringify(blockMerkle)), false); + const result = annotateIncrementalMerkleTree(JSON.parse(JSON.stringify(blockMerkle)), false); + console.log('annotateIncrementalMerkleTree result:', result); + + if (!result || !result.blockToEdit) { + console.error('annotateIncrementalMerkleTree did not return blockToEdit. Block number:', block.number); + return; // Or handle the error as appropriate + } + const { blockToEdit } = result; const buffer = await serialize(block.id, blockMerkle.activeNodes, blockToEdit.aliveUntil); blocksDB.put(block.number, asBinary(buffer)); @@ -100,7 +119,8 @@ const streamFirehose = forceStartBlock => new Promise( async (resolve, reject)=> const getBlock = req => new Promise((resolve,reject) => { if (!req.retries && req.retires!==0) req.retries = 10; const client = getClient(req.useBootFirehose); - let stream = client.Blocks(req.firehoseOptions) + const metadata = getMetadata(req.useBootFirehose); + let stream = client.Blocks(req.firehoseOptions, metadata) stream.on("data", (data) => { const { block: rawBlock } = data;