Skip to content

Commit

Permalink
Replace subscriptionConfigurationHash with RpcRequest in RpcSubscript…
Browse files Browse the repository at this point in the history
…ionsPlan
  • Loading branch information
lorisleiva committed Oct 23, 2024
1 parent 10b08ac commit 4eab8d5
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 124 deletions.
7 changes: 7 additions & 0 deletions .changeset/silly-wombats-switch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@solana/rpc-subscriptions-spec': patch
'@solana/rpc-subscriptions-api': patch
'@solana/rpc-subscriptions': patch
---

Replace subscriptionConfigurationHash with RpcRequest in RpcSubscriptionPlan
1 change: 0 additions & 1 deletion packages/rpc-subscriptions-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
],
"dependencies": {
"@solana/addresses": "workspace:*",
"@solana/fast-stable-stringify": "workspace:*",
"@solana/keys": "workspace:*",
"@solana/rpc-subscriptions-spec": "workspace:*",
"@solana/rpc-transformers": "workspace:*",
Expand Down
13 changes: 3 additions & 10 deletions packages/rpc-subscriptions-api/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import fastStableStringify from '@solana/fast-stable-stringify';
import {
createRpcSubscriptionsApi,
executeRpcPubSubSubscriptionPlan,
Expand Down Expand Up @@ -56,21 +55,15 @@ function createSolanaRpcSubscriptionsApi_INTERNAL<TApi extends RpcSubscriptionsA
allowedNumericKeyPaths: getAllowedNumericKeypaths(),
});
return createRpcSubscriptionsApi<TApi>({
getSubscriptionConfigurationHash(request) {
return fastStableStringify([request.methodName, request.params]);
},
planExecutor({ request, ...rest }) {
const transformedRequest = requestTransformer(request);
return executeRpcPubSubSubscriptionPlan({
...rest,
responseTransformer,
subscribeRequest: {
...transformedRequest,
methodName: transformedRequest.methodName.replace(/Notifications$/, 'Subscribe'),
},
unsubscribeMethodName: transformedRequest.methodName.replace(/Notifications$/, 'Unsubscribe'),
subscribeRequest: { ...request, methodName: request.methodName.replace(/Notifications$/, 'Subscribe') },
unsubscribeMethodName: request.methodName.replace(/Notifications$/, 'Unsubscribe'),
});
},
requestTransformer,
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ describe('createRpcSubscriptionsApi', () => {
it('calls the plan executor with the expected params', () => {
const mockPlanExecutor = jest.fn().mockResolvedValue({
executeSubscriptionPlan: jest.fn(),
subscriptionConfigurationHash: 'MOCK_HASH',
request: { methodName: 'foo', params: [] },
} as RpcSubscriptionsPlan<unknown>);
const api = createRpcSubscriptionsApi({ planExecutor: mockPlanExecutor });
const expectedParams = [1, 'hi', 3];
Expand All @@ -28,48 +28,19 @@ describe('createRpcSubscriptionsApi', () => {
});
});
});
describe('subscriptionConfigurationHash', () => {
it('does not call the hash creator before it is accessed', () => {
const mockGetSubscriptionConfigurationHash = jest.fn();
const api = createRpcSubscriptionsApi({
getSubscriptionConfigurationHash: mockGetSubscriptionConfigurationHash,
planExecutor: jest.fn(),
});
api.foo('hi');
expect(mockGetSubscriptionConfigurationHash).not.toHaveBeenCalled();
});
it('calls the hash creator when it is accessed', () => {
const mockGetSubscriptionConfigurationHash = jest.fn();
const api = createRpcSubscriptionsApi({
getSubscriptionConfigurationHash: mockGetSubscriptionConfigurationHash,
planExecutor: jest.fn(),
});
const result = api.foo('hi');
result.subscriptionConfigurationHash;
expect(mockGetSubscriptionConfigurationHash).toHaveBeenCalledWith({
methodName: 'foo',
params: ['hi'],
});
});
it('memoizes the result of the hash creator', () => {
const mockGetSubscriptionConfigurationHash = jest.fn();
const api = createRpcSubscriptionsApi({
getSubscriptionConfigurationHash: mockGetSubscriptionConfigurationHash,
planExecutor: jest.fn(),
});
describe('rpcRequest', () => {
it('provides the initial request object by default', () => {
const api = createRpcSubscriptionsApi({ planExecutor: jest.fn() });
const result = api.foo('hi');
result.subscriptionConfigurationHash;
result.subscriptionConfigurationHash;
expect(mockGetSubscriptionConfigurationHash).toHaveBeenCalledTimes(1);
expect(result.request).toEqual({ methodName: 'foo', params: ['hi'] });
});
it('returns the result of the hash creator', () => {
const mockGetSubscriptionConfigurationHash = jest.fn().mockReturnValue('MOCK_HASH');
it('provides the transformed request object when a request transformer is provided', () => {
const api = createRpcSubscriptionsApi({
getSubscriptionConfigurationHash: mockGetSubscriptionConfigurationHash,
planExecutor: jest.fn(),
requestTransformer: jest.fn().mockReturnValue({ methodName: 'bar', params: [1, 2, 3] }),
});
const result = api.foo('hi');
expect(result.subscriptionConfigurationHash).toBe('MOCK_HASH');
expect(result.request).toEqual({ methodName: 'bar', params: [1, 2, 3] });
});
});
});
24 changes: 9 additions & 15 deletions packages/rpc-subscriptions-spec/src/rpc-subscriptions-api.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { Callable, RpcRequest } from '@solana/rpc-spec-types';
import { Callable, RpcRequest, RpcRequestTransformer } from '@solana/rpc-spec-types';
import { DataPublisher } from '@solana/subscribable';

import { RpcSubscriptionsChannel } from './rpc-subscriptions-channel';
import { RpcSubscriptionsTransportDataEvents } from './rpc-subscriptions-transport';

export type RpcSubscriptionsApiConfig<TApiMethods extends RpcSubscriptionsApiMethods> = Readonly<{
getSubscriptionConfigurationHash?: (request: RpcRequest) => string | undefined;
planExecutor: RpcSubscriptionsPlanExecutor<ReturnType<TApiMethods[keyof TApiMethods]>>;
requestTransformer?: RpcRequestTransformer;
}>;

type RpcSubscriptionsPlanExecutor<TNotification> = (
Expand All @@ -28,10 +28,11 @@ export type RpcSubscriptionsPlan<TNotification> = Readonly<{
}>,
) => Promise<DataPublisher<RpcSubscriptionsTransportDataEvents<TNotification>>>;
/**
* This hash uniquely identifies the configuration of a subscription. It is typically used by
* consumers of this API to deduplicate multiple subscriptions for the same notification.
* This request is used to uniquely identify the subscription.
* It typically comes from the method name and parameters of the subscription call,
* after potentially being transformed by the RPC Subscriptions API.
*/
subscriptionConfigurationHash: string | undefined;
request: RpcRequest;
}>;

export type RpcSubscriptionsApi<TRpcSubscriptionMethods> = {
Expand All @@ -50,8 +51,6 @@ export interface RpcSubscriptionsApiMethods {
[methodName: string]: RpcSubscriptionsApiMethod;
}

const UNINITIALIZED = Symbol();

export function createRpcSubscriptionsApi<TRpcSubscriptionsApiMethods extends RpcSubscriptionsApiMethods>(
config: RpcSubscriptionsApiConfig<TRpcSubscriptionsApiMethods>,
): RpcSubscriptionsApi<TRpcSubscriptionsApiMethods> {
Expand All @@ -74,18 +73,13 @@ export function createRpcSubscriptionsApi<TRpcSubscriptionsApiMethods extends Rp
: never
>
): RpcSubscriptionsPlan<ReturnType<TRpcSubscriptionsApiMethods[TNotificationName]>> {
let _cachedSubscriptionHash: string | typeof UNINITIALIZED | undefined = UNINITIALIZED;
const request = { methodName, params };
const rawRequest = { methodName, params };
const request = config.requestTransformer ? config.requestTransformer(rawRequest) : rawRequest;
return {
executeSubscriptionPlan(planConfig) {
return config.planExecutor({ ...planConfig, request });
},
get subscriptionConfigurationHash() {
if (_cachedSubscriptionHash === UNINITIALIZED) {
_cachedSubscriptionHash = config?.getSubscriptionConfigurationHash?.(request);
}
return _cachedSubscriptionHash;
},
request,
};
};
},
Expand Down
2 changes: 1 addition & 1 deletion packages/rpc-subscriptions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,4 @@ Given an `RpcSubscriptionsChannel`, will return a new channel that sends a ping

### `getRpcSubscriptionsTransportWithSubscriptionCoalescing(transport)`

Given an `RpcSubscriptionsTransport`, will return a new transport that coalesces identical subscriptions into a single subscription request to the server. The determination of whether a subscription is the same as another is based on the `subscriptionConfigurationHash` returned by its `RpcSubscriptionsPlan`. The subscription will only be aborted once all subscribers abort, or there is an error.
Given an `RpcSubscriptionsTransport`, will return a new transport that coalesces identical subscriptions into a single subscription request to the server. The determination of whether a subscription is the same as another is based on the `rpcRequest` returned by its `RpcSubscriptionsPlan`. The subscription will only be aborted once all subscribers abort, or there is an error.
1 change: 1 addition & 0 deletions packages/rpc-subscriptions/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
],
"dependencies": {
"@solana/errors": "workspace:*",
"@solana/fast-stable-stringify": "workspace:*",
"@solana/functional": "workspace:*",
"@solana/promises": "workspace:*",
"@solana/rpc-subscriptions-api": "workspace:*",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
mockInnerTransport.mockResolvedValue(expectedDataPublisher);
const config = {
executeSubscriptionPlan: jest.fn(),
request: { methodName: 'foo', params: [] },
signal: new AbortController().signal,
subscriptionConfigurationHash: 'MOCK_HASH',
};
const transportPromise = coalescedTransport(config);
await expect(transportPromise).resolves.toBe(expectedDataPublisher);
});
it('passes the `executeSubscriptionPlan` config to the inner transport', () => {
const config = {
executeSubscriptionPlan: jest.fn(),
request: { methodName: 'foo', params: [] },
signal: new AbortController().signal,
subscriptionConfigurationHash: 'MOCK_HASH',
};
coalescedTransport(config).catch(() => {});
expect(mockInnerTransport).toHaveBeenCalledWith(
Expand All @@ -39,16 +39,16 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
}),
);
});
it('passes the `subscriptionConfigurationHash` config to the inner transport', () => {
it('passes the `rpcRequest` config to the inner transport', () => {
const config = {
executeSubscriptionPlan: jest.fn(),
request: { methodName: 'foo', params: [] },
signal: new AbortController().signal,
subscriptionConfigurationHash: 'MOCK_HASH',
};
coalescedTransport(config).catch(() => {});
expect(mockInnerTransport).toHaveBeenCalledWith(
expect.objectContaining({
subscriptionConfigurationHash: 'MOCK_HASH',
request: { methodName: 'foo', params: [] },
}),
);
});
Expand All @@ -58,11 +58,11 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
signal: new AbortController().signal,
};
coalescedTransport({
subscriptionConfigurationHash: 'MOCK_HASH_A',
request: { methodName: 'methodA', params: [] },
...config,
}).catch(() => {});
coalescedTransport({
subscriptionConfigurationHash: 'MOCK_HASH_B',
request: { methodName: 'methodB', params: [] },
...config,
}).catch(() => {});
expect(mockInnerTransport).toHaveBeenCalledTimes(2);
Expand All @@ -73,46 +73,15 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
executeSubscriptionPlan: jest.fn(),
signal: new AbortController().signal,
};
await coalescedTransport({ ...config, subscriptionConfigurationHash: 'MOCK_HASH_A' });
await coalescedTransport({ ...config, subscriptionConfigurationHash: 'MOCK_HASH_B' });
expect(mockInnerTransport).toHaveBeenCalledTimes(2);
});
it("calls the inner transport once per subscriber when both subscribers' hashes are `undefined`, in the same runloop", () => {
const config = {
executeSubscriptionPlan: jest.fn(),
signal: new AbortController().signal,
};
coalescedTransport({
subscriptionConfigurationHash: undefined,
...config,
}).catch(() => {});
coalescedTransport({
subscriptionConfigurationHash: undefined,
...config,
}).catch(() => {});
expect(mockInnerTransport).toHaveBeenCalledTimes(2);
});
it("calls the inner transport once per subscriber when both subscribers' hashes are `undefined`, in different runloops", async () => {
expect.assertions(1);
const config = {
executeSubscriptionPlan: jest.fn(),
signal: new AbortController().signal,
};
await coalescedTransport({
subscriptionConfigurationHash: undefined,
...config,
});
await coalescedTransport({
subscriptionConfigurationHash: undefined,
...config,
});
await coalescedTransport({ ...config, request: { methodName: 'methodA', params: [] } });
await coalescedTransport({ ...config, request: { methodName: 'methodB', params: [] } });
expect(mockInnerTransport).toHaveBeenCalledTimes(2);
});
it('only calls the inner transport once, in the same runloop', () => {
const config = {
executeSubscriptionPlan: jest.fn(),
request: { methodName: 'foo', params: [] },
signal: new AbortController().signal,
subscriptionConfigurationHash: 'MOCK_HASH',
};
coalescedTransport(config).catch(() => {});
coalescedTransport(config).catch(() => {});
Expand All @@ -122,8 +91,8 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
expect.assertions(1);
const config = {
executeSubscriptionPlan: jest.fn(),
request: { methodName: 'foo', params: [] },
signal: new AbortController().signal,
subscriptionConfigurationHash: 'MOCK_HASH',
};
await coalescedTransport(config);
await coalescedTransport(config);
Expand All @@ -133,8 +102,8 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
expect.assertions(1);
const config = {
executeSubscriptionPlan: jest.fn(),
request: { methodName: 'foo', params: [] },
signal: new AbortController().signal,
subscriptionConfigurationHash: 'MOCK_HASH',
};
const [publisherA, publisherB] = await Promise.all([coalescedTransport(config), coalescedTransport(config)]);
expect(publisherA).toBe(publisherB);
Expand All @@ -143,8 +112,8 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
expect.assertions(1);
const config = {
executeSubscriptionPlan: jest.fn(),
request: { methodName: 'foo', params: [] },
signal: new AbortController().signal,
subscriptionConfigurationHash: 'MOCK_HASH',
};
const publisherA = await coalescedTransport(config);
const publisherB = await coalescedTransport(config);
Expand All @@ -154,7 +123,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
jest.useFakeTimers();
const config = {
executeSubscriptionPlan: jest.fn(),
subscriptionConfigurationHash: 'MOCK_HASH',
request: { methodName: 'foo', params: [] },
};
const abortControllerB = new AbortController();
coalescedTransport({ ...config, signal: new AbortController().signal }).catch(() => {});
Expand All @@ -167,7 +136,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
jest.useFakeTimers();
const config = {
executeSubscriptionPlan: jest.fn(),
subscriptionConfigurationHash: 'MOCK_HASH',
request: { methodName: 'foo', params: [] },
};
const abortControllerA = new AbortController();
const abortControllerB = new AbortController();
Expand All @@ -182,7 +151,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
expect.assertions(1);
const config = {
executeSubscriptionPlan: jest.fn(),
subscriptionConfigurationHash: 'MOCK_HASH',
request: { methodName: 'foo', params: [] },
};
const abortControllerA = new AbortController();
const abortControllerB = new AbortController();
Expand All @@ -198,7 +167,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
it('does not fire the inner abort signal if the subscriber count is non zero at the end of the runloop, despite having aborted all in the middle of it', () => {
const config = {
executeSubscriptionPlan: jest.fn(),
subscriptionConfigurationHash: 'MOCK_HASH',
request: { methodName: 'foo', params: [] },
};
const abortControllerA = new AbortController();
coalescedTransport({ ...config, signal: abortControllerA.signal }).catch(() => {});
Expand All @@ -212,7 +181,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
jest.useFakeTimers();
const config = {
executeSubscriptionPlan: jest.fn(),
subscriptionConfigurationHash: 'MOCK_HASH',
request: { methodName: 'foo', params: [] },
};
coalescedTransport({ ...config, signal: new AbortController().signal }).catch(() => {});
await jest.runAllTimersAsync();
Expand All @@ -225,7 +194,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => {
jest.useFakeTimers();
const config = {
executeSubscriptionPlan: jest.fn(),
subscriptionConfigurationHash: 'MOCK_HASH',
request: { methodName: 'foo', params: [] },
};
const abortControllerA = new AbortController();
/**
Expand Down
Loading

0 comments on commit 4eab8d5

Please sign in to comment.