Skip to content

Commit

Permalink
query Addresses/ErgoTrees by chunks of 20
Browse files Browse the repository at this point in the history
  • Loading branch information
arobsn committed Aug 9, 2024
1 parent adb3a76 commit 506dfc4
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 112 deletions.
5 changes: 5 additions & 0 deletions .changeset/unlucky-numbers-sort.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@fleet-sdk/blockchain-providers": patch
---

[`ErgoGraphQLProvider`] Query Addresses/ErgoTrees by chunks of 20
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ describe("ergo-graphql provider", () => {

await _client.getBoxes({
where: {
boxIds: ["boxId_1", "boxId_2"],
boxId: "boxId_1",
ergoTrees: ["contract", "another_contract"]
}
});

call = JSON.parse(fetchSpy.mock.lastCall?.[1]?.body as string);
expect(call.variables).to.be.deep.equal({
spent: false,
boxIds: ["boxId_1", "boxId_2"],
boxIds: ["boxId_1"],
ergoTrees: ["contract", "another_contract"],
skip: 0,
take: 50
Expand All @@ -90,7 +90,6 @@ describe("ergo-graphql provider", () => {
.setBigIntMapper((v) => Number(v))
.getBoxes({
where: {
boxIds: ["boxId_0", "boxId_1"],
boxId: "boxId_0",
ergoTrees: ["ergoTree_0", "ergoTree_1", "ergoTree_1"],
ergoTree: "ergoTree_2"
Expand All @@ -103,7 +102,7 @@ describe("ergo-graphql provider", () => {
const call = JSON.parse(fetchSpy.mock.lastCall?.[1]?.body as string);
expect(call.variables).to.be.deep.equal({
spent: false,
boxIds: ["boxId_0", "boxId_1"],
boxIds: ["boxId_0"],
ergoTrees: ["ergoTree_0", "ergoTree_1", "ergoTree_2"],
skip: 0,
take: 50
Expand Down Expand Up @@ -451,7 +450,6 @@ describe("ergo-graphql provider", () => {
const response = await _client.getConfirmedTransactions({
where: {
transactionId: "txId",
transactionIds: ["txId_1", "txId_2", "txId_1"],
address: _addresses[0].encode(),
addresses: [_addresses[0], _addresses[1]],
ergoTree: _addresses[1].ergoTree,
Expand All @@ -469,7 +467,7 @@ describe("ergo-graphql provider", () => {
let callBody = JSON.parse(fetchSpy.mock.lastCall?.[1]?.body as string);
expect(callBody.query).to.be.equal(CONF_TX_QUERY);
expect(callBody.variables).to.be.deep.equal({
transactionIds: ["txId_1", "txId_2", "txId"],
transactionIds: ["txId"],
addresses: [
_addresses[0].encode(),
_addresses[1].encode(),
Expand Down Expand Up @@ -565,8 +563,6 @@ describe("ergo-graphql provider", () => {

const response = await _client.getUnconfirmedTransactions({
where: {
transactionId: "txId",
transactionIds: ["txId_1", "txId_2", "txId_1"],
address: _addresses[0].encode(),
addresses: [_addresses[0], _addresses[1]],
ergoTree: _addresses[1].ergoTree,
Expand All @@ -582,7 +578,6 @@ describe("ergo-graphql provider", () => {
let callBody = JSON.parse(fetchSpy.mock.lastCall?.[1]?.body as string);
expect(callBody.query).to.be.equal(UNCONF_TX_QUERY);
expect(callBody.variables).to.be.deep.equal({
transactionIds: ["txId_1", "txId_2", "txId"],
addresses: [
_addresses[0].encode(),
_addresses[1].encode(),
Expand Down
204 changes: 101 additions & 103 deletions packages/blockchain-providers/src/ergo-graphql/ergoGraphQLProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,9 @@ import {
UNCONF_BOXES_QUERY,
UNCONF_TX_QUERY
} from "./queries";

type GraphQLThrowableOptions = GraphQLRequestOptions & { throwOnNonNetworkErrors: true };
type OP<R, V extends GraphQLVariables> = GraphQLOperation<GraphQLSuccessResponse<R>, V>;
type BiMapper<T> = (value: string) => T;
import { chunk } from "packages/common/src";

export type GraphQLBoxWhere = BoxWhere & {
/** Base16-encoded BoxIds */
boxIds?: HexString[];

/** Base16-encoded ErgoTrees */
ergoTrees?: HexString[];

Expand All @@ -73,13 +67,11 @@ export type GraphQLBoxWhere = BoxWhere & {
};

export type GraphQLConfirmedTransactionWhere = ConfirmedTransactionWhere & {
transactionIds?: HexString[];
addresses?: (Base58String | ErgoAddress)[];
ergoTrees?: HexString[];
};

export type GraphQLUnconfirmedTransactionWhere = UnconfirmedTransactionWhere & {
transactionIds?: HexString[];
addresses?: (Base58String | ErgoAddress)[];
ergoTrees?: HexString[];
};
Expand All @@ -100,7 +92,12 @@ type CheckTransactionResponse = { checkTransaction: string };
type TransactionSubmissionResponse = { submitTransaction: string };
type SignedTxArgsResp = { signedTransaction: SignedTransaction };

type GraphQLThrowableOptions = GraphQLRequestOptions & { throwOnNonNetworkErrors: true };
type OP<R, V extends GraphQLVariables> = GraphQLOperation<GraphQLSuccessResponse<R>, V>;
type BiMapper<T> = (value: string) => T;

const PAGE_SIZE = 50;
const MAX_ARGS = 20;

export class ErgoGraphQLProvider<I = bigint> implements IBlockchainProvider<I> {
#options: GraphQLThrowableOptions;
Expand All @@ -116,15 +113,14 @@ export class ErgoGraphQLProvider<I = bigint> implements IBlockchainProvider<I> {
#getHeaders!: OP<BlockHeadersResponse, QueryBlockHeadersArgs>;

constructor(url: string);
constructor(url: ErgoGraphQLRequestOptions);
constructor(options: ErgoGraphQLRequestOptions);
constructor(optOrUrl: ErgoGraphQLRequestOptions | string) {
this.#biMapper = (value) => BigInt(value) as I;
this.#options = {
...(isRequestParam(optOrUrl) ? optOrUrl : { url: optOrUrl }),
throwOnNonNetworkErrors: true
};

this.#biMapper = (value) => BigInt(value) as I;

this.#getConfirmedBoxes = this.createOperation(CONF_BOXES_QUERY);
this.#getUnconfirmedBoxes = this.createOperation(UNCONF_BOXES_QUERY);
this.#getAllBoxes = this.createOperation(ALL_BOXES_QUERY);
Expand All @@ -137,10 +133,10 @@ export class ErgoGraphQLProvider<I = bigint> implements IBlockchainProvider<I> {

#fetchBoxes(args: QueryBoxesArgs, inclConf: boolean, inclUnconf: boolean) {
return inclConf && inclUnconf
? this.#getAllBoxes(args, this.#options.url)
? this.#getAllBoxes(args)
: inclUnconf
? this.#getUnconfirmedBoxes(args, this.#options.url)
: this.#getConfirmedBoxes(args, this.#options.url);
? this.#getUnconfirmedBoxes(args)
: this.#getConfirmedBoxes(args);
}

setUrl(url: string): ErgoGraphQLProvider<I> {
Expand All @@ -161,55 +157,57 @@ export class ErgoGraphQLProvider<I = bigint> implements IBlockchainProvider<I> {
const notBeingSpent = (box: GQLBox) => !box.beingSpent;
const returnedBoxIds = new Set<string>();
const { where, from } = query;
const args = buildGqlBoxQueryArgs(where);
const queries = buildGqlBoxQueries(where);
const isMempoolAware = from !== "blockchain";

let inclChain = from !== "mempool";
let inclPool = from !== "blockchain";
const isMempoolAware = inclPool;
for (const query of queries) {
let inclChain = from !== "mempool";
let inclPool = from !== "blockchain";

do {
const { data } = await this.#fetchBoxes(args, inclChain, inclPool);
let boxes: ChainProviderBox<I>[] = [];
while (inclChain || inclPool) {
const { data } = await this.#fetchBoxes(query, inclChain, inclPool);
let boxes: ChainProviderBox<I>[] = [];

if (inclChain && hasConfirmed(data)) {
if (some(data.boxes)) {
const confirmedBoxes = (
isMempoolAware ? data.boxes.filter(notBeingSpent) : data.boxes
).map((b) => mapConfirmedBox(b, this.#biMapper));
if (inclChain && hasConfirmed(data)) {
if (some(data.boxes)) {
const confirmedBoxes = (
isMempoolAware ? data.boxes.filter(notBeingSpent) : data.boxes
).map((b) => mapConfirmedBox(b, this.#biMapper));

boxes = boxes.concat(confirmedBoxes);
}

inclChain = data.boxes.length === PAGE_SIZE;
}
boxes = boxes.concat(confirmedBoxes);
}

if (isMempoolAware && hasMempool(data)) {
if (some(data.mempool.boxes)) {
const mempoolBoxes = data.mempool.boxes
.filter(notBeingSpent)
.map((b) => mapUnconfirmedBox(b, this.#biMapper));
boxes = boxes.concat(mempoolBoxes);
inclChain = data.boxes.length === PAGE_SIZE;
}

inclPool = data.mempool.boxes.length === PAGE_SIZE;
}
if (isMempoolAware && hasMempool(data)) {
if (some(data.mempool.boxes)) {
const mempoolBoxes = data.mempool.boxes
.filter(notBeingSpent)
.map((b) => mapUnconfirmedBox(b, this.#biMapper));
boxes = boxes.concat(mempoolBoxes);
}

if (some(boxes)) {
// boxes can be moved from the mempool to the blockchain while streaming,
// so we need to filter out boxes that have already been returned.
if (boxes.some((box) => returnedBoxIds.has(box.boxId))) {
boxes = boxes.filter((b) => !returnedBoxIds.has(b.boxId));
inclPool = data.mempool.boxes.length === PAGE_SIZE;
}

if (some(boxes)) {
boxes = uniqBy(boxes, (box) => box.boxId);
for (const box of boxes) returnedBoxIds.add(box.boxId);
yield boxes;
// boxes can be moved from the mempool to the blockchain while streaming,
// so we need to filter out boxes that have already been returned.
if (boxes.some((box) => returnedBoxIds.has(box.boxId))) {
boxes = boxes.filter((b) => !returnedBoxIds.has(b.boxId));
}

if (some(boxes)) {
boxes = uniqBy(boxes, (box) => box.boxId);
for (const box of boxes) returnedBoxIds.add(box.boxId);
yield boxes;
}
}
}

if (inclChain || inclPool) args.skip += PAGE_SIZE;
} while (inclChain || inclPool);
if (inclChain || inclPool) query.skip += PAGE_SIZE;
}
}
}

async getBoxes(query: GraphQLBoxQuery): Promise<ChainProviderBox<I>[]> {
Expand All @@ -221,19 +219,21 @@ export class ErgoGraphQLProvider<I = bigint> implements IBlockchainProvider<I> {
async *streamUnconfirmedTransactions(
query: TransactionQuery<GraphQLUnconfirmedTransactionWhere>
): AsyncIterable<ChainProviderUnconfirmedTransaction<I>[]> {
const args = buildGqlUnconfirmedTxQueryArgs(query.where);

let keepFetching = true;
while (keepFetching) {
const response = await this.#getUnconfirmedTransactions(args);
if (some(response.data?.mempool?.transactions)) {
yield response.data.mempool.transactions.map((t) =>
mapUnconfirmedTransaction(t, this.#biMapper)
);
}
const queries = buildGqlUnconfirmedTxQueries(query.where);

for (const query of queries) {
let keepFetching = true;
while (keepFetching) {
const response = await this.#getUnconfirmedTransactions(query);
if (some(response.data?.mempool?.transactions)) {
yield response.data.mempool.transactions.map((t) =>
mapUnconfirmedTransaction(t, this.#biMapper)
);
}

keepFetching = response.data?.mempool?.transactions?.length === PAGE_SIZE;
if (keepFetching) args.skip += PAGE_SIZE;
keepFetching = response.data?.mempool?.transactions?.length === PAGE_SIZE;
if (keepFetching) query.skip += PAGE_SIZE;
}
}
}

Expand All @@ -251,19 +251,21 @@ export class ErgoGraphQLProvider<I = bigint> implements IBlockchainProvider<I> {
async *streamConfirmedTransactions(
query: TransactionQuery<GraphQLConfirmedTransactionWhere>
): AsyncIterable<ChainProviderConfirmedTransaction<I>[]> {
const args = buildGqlConfirmedTxQueryArgs(query.where);

let keepFetching = true;
while (keepFetching) {
const response = await this.#getConfirmedTransactions(args);
if (some(response.data?.transactions)) {
yield response.data.transactions.map((t) =>
mapConfirmedTransaction(t, this.#biMapper)
);
}
const queries = buildGqlConfirmedTxQueries(query.where);

for (const query of queries) {
let keepFetching = true;
while (keepFetching) {
const response = await this.#getConfirmedTransactions(query);
if (some(response.data?.transactions)) {
yield response.data.transactions.map((t) =>
mapConfirmedTransaction(t, this.#biMapper)
);
}

keepFetching = response.data?.transactions?.length === PAGE_SIZE;
if (keepFetching) args.skip += PAGE_SIZE;
keepFetching = response.data?.transactions?.length === PAGE_SIZE;
if (keepFetching) query.skip += PAGE_SIZE;
}
}
}

Expand All @@ -279,7 +281,7 @@ export class ErgoGraphQLProvider<I = bigint> implements IBlockchainProvider<I> {
}

async getHeaders(query: HeaderQuery): Promise<BlockHeader[]> {
const response = await this.#getHeaders(query, this.#options.url);
const response = await this.#getHeaders(query);

return (
response.data?.blockHeaders.map((header) => ({
Expand Down Expand Up @@ -335,32 +337,28 @@ export class ErgoGraphQLProvider<I = bigint> implements IBlockchainProvider<I> {
}
}

function buildGqlBoxQueryArgs(where: GraphQLBoxWhere) {
const args = {
function buildGqlBoxQueries(where: GraphQLBoxWhere) {
const ergoTrees = uniq(
[
merge(where.ergoTrees, where.ergoTree) ?? [],
merge(where.addresses, where.address)?.map((a) =>
typeof a === "string" ? ErgoAddress.decode(a).ergoTree : a.ergoTree
) ?? []
].flat()
);

return chunk(ergoTrees, MAX_ARGS).map((chunk) => ({
spent: false,
boxIds: merge(where.boxIds, where.boxId),
ergoTrees: merge(where.ergoTrees, where.ergoTree),
boxIds: where.boxId ? [where.boxId] : undefined,
ergoTrees: chunk,
ergoTreeTemplateHash: where.templateHash,
tokenId: where.tokenId,
skip: 0,
take: PAGE_SIZE
} satisfies QueryBoxesArgs;

const addresses = merge(where.addresses, where.address);
if (some(addresses)) {
const trees = addresses.map((address) =>
typeof address === "string"
? ErgoAddress.decode(address).ergoTree
: address.ergoTree
);

args.ergoTrees = uniq(some(args.ergoTrees) ? args.ergoTrees.concat(trees) : trees);
}

return args;
}));
}

function buildGqlUnconfirmedTxQueryArgs(where: GraphQLConfirmedTransactionWhere) {
function buildGqlUnconfirmedTxQueries(where: GraphQLConfirmedTransactionWhere) {
const addresses = uniq(
[
merge(where.addresses, where.address)?.map((address): string =>
Expand All @@ -372,21 +370,21 @@ function buildGqlUnconfirmedTxQueryArgs(where: GraphQLConfirmedTransactionWhere)
].flat()
);

return {
addresses: addresses.length ? addresses : undefined,
transactionIds: merge(where.transactionIds, where.transactionId),
return chunk(addresses, MAX_ARGS).map((chunk) => ({
addresses: chunk.length ? chunk : undefined,
transactionIds: where.transactionId ? [where.transactionId] : undefined,
skip: 0,
take: PAGE_SIZE
};
}));
}

function buildGqlConfirmedTxQueryArgs(where: GraphQLConfirmedTransactionWhere) {
return {
...buildGqlUnconfirmedTxQueryArgs(where),
function buildGqlConfirmedTxQueries(where: GraphQLConfirmedTransactionWhere) {
return buildGqlUnconfirmedTxQueries(where).map((query) => ({
...query,
headerId: where.headerId,
minHeight: where.minHeight,
onlyRelevantOutputs: where.onlyRelevantOutputs
};
}));
}

function merge<T>(array?: T[], el?: T) {
Expand Down

0 comments on commit 506dfc4

Please sign in to comment.