Skip to content

Commit

Permalink
cleanup factory
Browse files Browse the repository at this point in the history
  • Loading branch information
kyscott18 committed Feb 27, 2025
1 parent cafde16 commit f43b6ac
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 54 deletions.
20 changes: 12 additions & 8 deletions packages/core/src/sync-historical/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,21 +424,23 @@ export const createHistoricalSync = async (
};

/** Extract and insert the log-based addresses that match `filter` + `interval`. */
const syncLogFactory = async (filter: LogFactory, interval: Interval) => {
const syncLogFactory = async (factory: LogFactory, interval: Interval) => {
const logs = await syncLogsDynamic({
filter,
filter: factory,
interval,
address: filter.address,
address: factory.address,
});

const childAddresses = new Map<Address, number>();
for (const log of logs) {
const address = getChildAddress({ log, factory: filter });
const address = getChildAddress({ log, factory });
childAddresses.set(address, hexToNumber(log.blockNumber));
}

// Note: `factory` must refer to the same original `factory` in `filter`
// and not be a recovered factory from `recoverFilter`.
await args.syncStore.insertChildAddresses({
childAddresses: new Map([[filter, childAddresses]]),
childAddresses: new Map([[factory, childAddresses]]),
chainId: args.network.chainId,
});
};
Expand All @@ -449,11 +451,13 @@ export const createHistoricalSync = async (
* child addresses is above the limit.
*/
const syncAddressFactory = async (
filter: Factory,
factory: Factory,
interval: Interval,
): Promise<Map<Address, number>> => {
await syncLogFactory(filter, interval);
return args.syncStore.getChildAddresses({ filter });
await syncLogFactory(factory, interval);
// Note: `factory` must refer to the same original `factory` in `filter`
// and not be a recovered factory from `recoverFilter`.
return args.syncStore.getChildAddresses({ factory });
};

////////
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/sync-store/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ test("getChildAddresses()", async (context) => {
});

const addresses = await syncStore.getChildAddresses({
filter: filter.address,
factory: filter.address,
});

expect(addresses).toMatchInlineSnapshot(`
Expand All @@ -512,7 +512,7 @@ test("getChildAddresses() empty", async (context) => {
const filter = sources[0]!.filter as LogFilter<Factory>;

const addresses = await syncStore.getChildAddresses({
filter: filter.address,
factory: filter.address,
});

expect(addresses).toMatchInlineSnapshot("Map {}");
Expand Down Expand Up @@ -544,7 +544,7 @@ test("getChildAddresses() distinct", async (context) => {
});

const addresses = await syncStore.getChildAddresses({
filter: filter.address,
factory: filter.address,
});

expect(addresses).toMatchInlineSnapshot(`
Expand Down
51 changes: 35 additions & 16 deletions packages/core/src/sync-store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ import type {
SyncTransactionReceipt,
} from "@/internal/types.js";
import { shouldGetTransactionReceipt } from "@/sync/filter.js";
import { fragmentToId, getFragments } from "@/sync/fragments.js";
import {
fragmentAddressToId,
fragmentToId,
getAddressFragments,
getFragments,
} from "@/sync/fragments.js";
import type { Interval } from "@/utils/interval.js";
import { type SelectQueryBuilder, sql as ksql, sql } from "kysely";
import type { InsertObject } from "kysely";
Expand Down Expand Up @@ -50,7 +55,7 @@ export type SyncStore = {
childAddresses: Map<Factory, Map<Address, number>>;
chainId: number;
}): Promise<void>;
getChildAddresses(args: { filter: Factory }): Promise<Map<Address, number>>;
getChildAddresses(args: { factory: Factory }): Promise<Map<Address, number>>;
insertLogs(args: { logs: SyncLog[]; chainId: number }): Promise<void>;
insertBlocks(args: { blocks: SyncBlock[]; chainId: number }): Promise<void>;
insertTransactions(args: {
Expand Down Expand Up @@ -231,42 +236,56 @@ export const createSyncStore = ({
return result;
},
),
insertChildAddresses: ({ childAddresses, chainId }) =>
database.wrap(
insertChildAddresses: async ({ childAddresses, chainId }) => {
if (
childAddresses.size === 0 ||
Array.from(childAddresses.values()).every(
(addresses) => addresses.size === 0,
)
) {
return;
}
await database.wrap(
{ method: "insertChildAddresses", includeTraceLogs: true },
async () => {
const values: InsertObject<PonderSyncSchema, "factories">[] = [];
for (const [factory, addresses] of childAddresses) {
const filterId = JSON.stringify(factory);
const fragmentIds = getAddressFragments(factory)
.map(({ fragment }) => fragmentAddressToId(fragment))
.sort((a, b) =>
a === null ? -1 : b === null ? 1 : a < b ? -1 : 1,
);
const factoryId = fragmentIds.join("_");

// Note: factories must be keyed by fragment — but then how do we know which address belongs to which fragment?

for (const [address, blockNumber] of addresses) {
values.push({
factory_hash: sql`MD5(${filterId})`,
factory_hash: sql`MD5(${factoryId})`,
chain_id: chainId,
block_number: blockNumber,
address: address,
});
}
}

if (values.length > 0) {
await database.qb.sync
.insertInto("factories")
.values(values)
.execute();
}
await database.qb.sync.insertInto("factories").values(values).execute();
},
),
getChildAddresses: ({ filter }) =>
);
},
getChildAddresses: ({ factory }) =>
database.wrap(
{ method: "getChildAddresses", includeTraceLogs: true },
() => {
const filterId = JSON.stringify(filter);
const fragmentIds = getAddressFragments(factory)
.map(({ fragment }) => fragmentAddressToId(fragment))
.sort((a, b) => (a === null ? -1 : b === null ? 1 : a < b ? -1 : 1));
const factoryId = fragmentIds.join("_");

return database.qb.sync
.selectFrom("factories")
.select(["address", sql<number>`block_number`.as("blockNumber")])
.where("factory_hash", "=", sql`MD5(${filterId})`)
.where("factory_hash", "=", sql`MD5(${factoryId})`)
.execute()
.then((rows) => {
const result = new Map<Address, number>();
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/sync/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ export const isAddressMatched = ({
}) => {
if (address === undefined) return false;
if (
childAddresses.has(address) &&
childAddresses.get(address)! <= blockNumber
childAddresses.has(toLowerCase(address)) &&
childAddresses.get(toLowerCase(address))! <= blockNumber
) {
return true;
}
Expand Down
27 changes: 8 additions & 19 deletions packages/core/src/sync/fragments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type FragmentReturnType = {
adjacentIds: FragmentId[];
}[];

const getAddressFragments = (
export const getAddressFragments = (
address: Address | Address[] | Factory | undefined,
) => {
const fragments: {
Expand Down Expand Up @@ -302,7 +302,7 @@ export const getTransferFilterFragments = ({
return fragments;
};

const fragmentAddressToId = (
export const fragmentAddressToId = (
fragmentAddress: FragmentAddress,
): FragmentAddressId => {
if (fragmentAddress === null) return null;
Expand Down Expand Up @@ -332,23 +332,12 @@ const recoverAddress = (
if (baseAddress === undefined) return undefined;
if (typeof baseAddress === "string") return baseAddress;
if (Array.isArray(baseAddress)) return dedupe(fragmentAddresses) as Address[];
if (typeof baseAddress.address === "string") return baseAddress;

const address = {
type: "log",
chainId: baseAddress.chainId,
address: [] as Address[],
eventSelector: baseAddress.eventSelector,
childAddressLocation: baseAddress.childAddressLocation,
} satisfies Factory;

address.address = dedupe(
(fragmentAddresses as Extract<FragmentAddress, { address: Address }>[]).map(
({ address }) => address,
),
);

return address;

// Note: At this point, `baseAddress` is a factory. We explicitly don't try to recover the factory
// address from the fragments, because we want `insertChildAddresses` and `getChildAddresses` to
// use the factory as a stable key.

return baseAddress;
};

const recoverSelector = (
Expand Down
12 changes: 6 additions & 6 deletions packages/core/src/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ export const createSync = async (params: {
if (isAddressFactory(filter.address)) {
const childAddresses =
await params.syncStore.getChildAddresses({
filter: filter.address,
factory: filter.address,
});

initialChildAddresses.set(
Expand All @@ -893,7 +893,7 @@ export const createSync = async (params: {
if (isAddressFactory(filter.fromAddress)) {
const childAddresses =
await params.syncStore.getChildAddresses({
filter: filter.fromAddress,
factory: filter.fromAddress,
});

initialChildAddresses.set(
Expand All @@ -905,7 +905,7 @@ export const createSync = async (params: {
if (isAddressFactory(filter.toAddress)) {
const childAddresses =
await params.syncStore.getChildAddresses({
filter: filter.toAddress,
factory: filter.toAddress,
});

initialChildAddresses.set(
Expand Down Expand Up @@ -1153,7 +1153,7 @@ export async function* getLocalEventGenerator(params: {
case "log":
if (isAddressFactory(filter.address)) {
const childAddresses = await params.syncStore.getChildAddresses({
filter: filter.address,
factory: filter.address,
});

initialChildAddresses.set(filter.address, new Map(childAddresses));
Expand All @@ -1165,7 +1165,7 @@ export async function* getLocalEventGenerator(params: {
case "trace":
if (isAddressFactory(filter.fromAddress)) {
const childAddresses = await params.syncStore.getChildAddresses({
filter: filter.fromAddress,
factory: filter.fromAddress,
});

initialChildAddresses.set(
Expand All @@ -1176,7 +1176,7 @@ export async function* getLocalEventGenerator(params: {

if (isAddressFactory(filter.toAddress)) {
const childAddresses = await params.syncStore.getChildAddresses({
filter: filter.toAddress,
factory: filter.toAddress,
});

initialChildAddresses.set(
Expand Down

0 comments on commit f43b6ac

Please sign in to comment.