diff --git a/.changeset/silly-wombats-switch.md b/.changeset/silly-wombats-switch.md new file mode 100644 index 000000000000..74bdbc11ef1f --- /dev/null +++ b/.changeset/silly-wombats-switch.md @@ -0,0 +1,7 @@ +--- +'@solana/rpc-subscriptions-spec': patch +'@solana/rpc-subscriptions-api': patch +'@solana/rpc-subscriptions': patch +--- + +Replace subscriptionConfigurationHash with RpcRequest in RpcSubscriptionPlan diff --git a/packages/rpc-subscriptions-api/package.json b/packages/rpc-subscriptions-api/package.json index bd18672d5b45..a664ad68af4e 100644 --- a/packages/rpc-subscriptions-api/package.json +++ b/packages/rpc-subscriptions-api/package.json @@ -72,7 +72,6 @@ ], "dependencies": { "@solana/addresses": "workspace:*", - "@solana/fast-stable-stringify": "workspace:*", "@solana/keys": "workspace:*", "@solana/rpc-subscriptions-spec": "workspace:*", "@solana/rpc-transformers": "workspace:*", diff --git a/packages/rpc-subscriptions-api/src/index.ts b/packages/rpc-subscriptions-api/src/index.ts index 9a4e0d31adbc..ada574952261 100644 --- a/packages/rpc-subscriptions-api/src/index.ts +++ b/packages/rpc-subscriptions-api/src/index.ts @@ -1,4 +1,3 @@ -import fastStableStringify from '@solana/fast-stable-stringify'; import { createRpcSubscriptionsApi, executeRpcPubSubSubscriptionPlan, @@ -56,21 +55,15 @@ function createSolanaRpcSubscriptionsApi_INTERNAL({ - getSubscriptionConfigurationHash(request) { - return fastStableStringify([request.methodName, request.params]); - }, planExecutor({ request, ...rest }) { - const transformedRequest = requestTransformer(request); return executeRpcPubSubSubscriptionPlan({ ...rest, responseTransformer, - subscribeRequest: { - ...transformedRequest, - methodName: transformedRequest.methodName.replace(/Notifications$/, 'Subscribe'), - }, - unsubscribeMethodName: transformedRequest.methodName.replace(/Notifications$/, 'Unsubscribe'), + subscribeRequest: { ...request, methodName: request.methodName.replace(/Notifications$/, 'Subscribe') }, + unsubscribeMethodName: request.methodName.replace(/Notifications$/, 'Unsubscribe'), }); }, + requestTransformer, }); } diff --git a/packages/rpc-subscriptions-spec/src/__tests__/rpc-subscriptions-api-test.ts b/packages/rpc-subscriptions-spec/src/__tests__/rpc-subscriptions-api-test.ts index 1a1d658edc92..ced1444bf251 100644 --- a/packages/rpc-subscriptions-spec/src/__tests__/rpc-subscriptions-api-test.ts +++ b/packages/rpc-subscriptions-spec/src/__tests__/rpc-subscriptions-api-test.ts @@ -10,7 +10,7 @@ describe('createRpcSubscriptionsApi', () => { it('calls the plan executor with the expected params', () => { const mockPlanExecutor = jest.fn().mockResolvedValue({ executeSubscriptionPlan: jest.fn(), - subscriptionConfigurationHash: 'MOCK_HASH', + request: { methodName: 'foo', params: [] }, } as RpcSubscriptionsPlan); const api = createRpcSubscriptionsApi({ planExecutor: mockPlanExecutor }); const expectedParams = [1, 'hi', 3]; @@ -28,48 +28,19 @@ describe('createRpcSubscriptionsApi', () => { }); }); }); - describe('subscriptionConfigurationHash', () => { - it('does not call the hash creator before it is accessed', () => { - const mockGetSubscriptionConfigurationHash = jest.fn(); - const api = createRpcSubscriptionsApi({ - getSubscriptionConfigurationHash: mockGetSubscriptionConfigurationHash, - planExecutor: jest.fn(), - }); - api.foo('hi'); - expect(mockGetSubscriptionConfigurationHash).not.toHaveBeenCalled(); - }); - it('calls the hash creator when it is accessed', () => { - const mockGetSubscriptionConfigurationHash = jest.fn(); - const api = createRpcSubscriptionsApi({ - getSubscriptionConfigurationHash: mockGetSubscriptionConfigurationHash, - planExecutor: jest.fn(), - }); - const result = api.foo('hi'); - result.subscriptionConfigurationHash; - expect(mockGetSubscriptionConfigurationHash).toHaveBeenCalledWith({ - methodName: 'foo', - params: ['hi'], - }); - }); - it('memoizes the result of the hash creator', () => { - const mockGetSubscriptionConfigurationHash = jest.fn(); - const api = createRpcSubscriptionsApi({ - getSubscriptionConfigurationHash: mockGetSubscriptionConfigurationHash, - planExecutor: jest.fn(), - }); + describe('rpcRequest', () => { + it('provides the initial request object by default', () => { + const api = createRpcSubscriptionsApi({ planExecutor: jest.fn() }); const result = api.foo('hi'); - result.subscriptionConfigurationHash; - result.subscriptionConfigurationHash; - expect(mockGetSubscriptionConfigurationHash).toHaveBeenCalledTimes(1); + expect(result.request).toEqual({ methodName: 'foo', params: ['hi'] }); }); - it('returns the result of the hash creator', () => { - const mockGetSubscriptionConfigurationHash = jest.fn().mockReturnValue('MOCK_HASH'); + it('provides the transformed request object when a request transformer is provided', () => { const api = createRpcSubscriptionsApi({ - getSubscriptionConfigurationHash: mockGetSubscriptionConfigurationHash, planExecutor: jest.fn(), + requestTransformer: jest.fn().mockReturnValue({ methodName: 'bar', params: [1, 2, 3] }), }); const result = api.foo('hi'); - expect(result.subscriptionConfigurationHash).toBe('MOCK_HASH'); + expect(result.request).toEqual({ methodName: 'bar', params: [1, 2, 3] }); }); }); }); diff --git a/packages/rpc-subscriptions-spec/src/rpc-subscriptions-api.ts b/packages/rpc-subscriptions-spec/src/rpc-subscriptions-api.ts index 8476c990cfe5..58ea615dc7fd 100644 --- a/packages/rpc-subscriptions-spec/src/rpc-subscriptions-api.ts +++ b/packages/rpc-subscriptions-spec/src/rpc-subscriptions-api.ts @@ -1,12 +1,12 @@ -import { Callable, RpcRequest } from '@solana/rpc-spec-types'; +import { Callable, RpcRequest, RpcRequestTransformer } from '@solana/rpc-spec-types'; import { DataPublisher } from '@solana/subscribable'; import { RpcSubscriptionsChannel } from './rpc-subscriptions-channel'; import { RpcSubscriptionsTransportDataEvents } from './rpc-subscriptions-transport'; export type RpcSubscriptionsApiConfig = Readonly<{ - getSubscriptionConfigurationHash?: (request: RpcRequest) => string | undefined; planExecutor: RpcSubscriptionsPlanExecutor>; + requestTransformer?: RpcRequestTransformer; }>; type RpcSubscriptionsPlanExecutor = ( @@ -28,10 +28,11 @@ export type RpcSubscriptionsPlan = Readonly<{ }>, ) => Promise>>; /** - * This hash uniquely identifies the configuration of a subscription. It is typically used by - * consumers of this API to deduplicate multiple subscriptions for the same notification. + * This request is used to uniquely identify the subscription. + * It typically comes from the method name and parameters of the subscription call, + * after potentially being transformed by the RPC Subscriptions API. */ - subscriptionConfigurationHash: string | undefined; + request: RpcRequest; }>; export type RpcSubscriptionsApi = { @@ -50,8 +51,6 @@ export interface RpcSubscriptionsApiMethods { [methodName: string]: RpcSubscriptionsApiMethod; } -const UNINITIALIZED = Symbol(); - export function createRpcSubscriptionsApi( config: RpcSubscriptionsApiConfig, ): RpcSubscriptionsApi { @@ -74,18 +73,13 @@ export function createRpcSubscriptionsApi ): RpcSubscriptionsPlan> { - let _cachedSubscriptionHash: string | typeof UNINITIALIZED | undefined = UNINITIALIZED; - const request = { methodName, params }; + const rawRequest = { methodName, params }; + const request = config.requestTransformer ? config.requestTransformer(rawRequest) : rawRequest; return { executeSubscriptionPlan(planConfig) { return config.planExecutor({ ...planConfig, request }); }, - get subscriptionConfigurationHash() { - if (_cachedSubscriptionHash === UNINITIALIZED) { - _cachedSubscriptionHash = config?.getSubscriptionConfigurationHash?.(request); - } - return _cachedSubscriptionHash; - }, + request, }; }; }, diff --git a/packages/rpc-subscriptions/README.md b/packages/rpc-subscriptions/README.md index f7ee0274dbe4..39850d4cfc40 100644 --- a/packages/rpc-subscriptions/README.md +++ b/packages/rpc-subscriptions/README.md @@ -51,4 +51,4 @@ Given an `RpcSubscriptionsChannel`, will return a new channel that sends a ping ### `getRpcSubscriptionsTransportWithSubscriptionCoalescing(transport)` -Given an `RpcSubscriptionsTransport`, will return a new transport that coalesces identical subscriptions into a single subscription request to the server. The determination of whether a subscription is the same as another is based on the `subscriptionConfigurationHash` returned by its `RpcSubscriptionsPlan`. The subscription will only be aborted once all subscribers abort, or there is an error. +Given an `RpcSubscriptionsTransport`, will return a new transport that coalesces identical subscriptions into a single subscription request to the server. The determination of whether a subscription is the same as another is based on the `rpcRequest` returned by its `RpcSubscriptionsPlan`. The subscription will only be aborted once all subscribers abort, or there is an error. diff --git a/packages/rpc-subscriptions/package.json b/packages/rpc-subscriptions/package.json index edb4255a5f84..ffc74be82ae5 100644 --- a/packages/rpc-subscriptions/package.json +++ b/packages/rpc-subscriptions/package.json @@ -72,6 +72,7 @@ ], "dependencies": { "@solana/errors": "workspace:*", + "@solana/fast-stable-stringify": "workspace:*", "@solana/functional": "workspace:*", "@solana/promises": "workspace:*", "@solana/rpc-subscriptions-api": "workspace:*", diff --git a/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-coalescer-test.ts b/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-coalescer-test.ts index 0da0416a4431..98619e30a42d 100644 --- a/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-coalescer-test.ts +++ b/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-coalescer-test.ts @@ -20,8 +20,8 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => { mockInnerTransport.mockResolvedValue(expectedDataPublisher); const config = { executeSubscriptionPlan: jest.fn(), + request: { methodName: 'foo', params: [] }, signal: new AbortController().signal, - subscriptionConfigurationHash: 'MOCK_HASH', }; const transportPromise = coalescedTransport(config); await expect(transportPromise).resolves.toBe(expectedDataPublisher); @@ -29,8 +29,8 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => { it('passes the `executeSubscriptionPlan` config to the inner transport', () => { const config = { executeSubscriptionPlan: jest.fn(), + request: { methodName: 'foo', params: [] }, signal: new AbortController().signal, - subscriptionConfigurationHash: 'MOCK_HASH', }; coalescedTransport(config).catch(() => {}); expect(mockInnerTransport).toHaveBeenCalledWith( @@ -39,16 +39,16 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => { }), ); }); - it('passes the `subscriptionConfigurationHash` config to the inner transport', () => { + it('passes the `rpcRequest` config to the inner transport', () => { const config = { executeSubscriptionPlan: jest.fn(), + request: { methodName: 'foo', params: [] }, signal: new AbortController().signal, - subscriptionConfigurationHash: 'MOCK_HASH', }; coalescedTransport(config).catch(() => {}); expect(mockInnerTransport).toHaveBeenCalledWith( expect.objectContaining({ - subscriptionConfigurationHash: 'MOCK_HASH', + request: { methodName: 'foo', params: [] }, }), ); }); @@ -58,11 +58,11 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => { signal: new AbortController().signal, }; coalescedTransport({ - subscriptionConfigurationHash: 'MOCK_HASH_A', + request: { methodName: 'methodA', params: [] }, ...config, }).catch(() => {}); coalescedTransport({ - subscriptionConfigurationHash: 'MOCK_HASH_B', + request: { methodName: 'methodB', params: [] }, ...config, }).catch(() => {}); expect(mockInnerTransport).toHaveBeenCalledTimes(2); @@ -73,46 +73,15 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => { executeSubscriptionPlan: jest.fn(), signal: new AbortController().signal, }; - await coalescedTransport({ ...config, subscriptionConfigurationHash: 'MOCK_HASH_A' }); - await coalescedTransport({ ...config, subscriptionConfigurationHash: 'MOCK_HASH_B' }); - expect(mockInnerTransport).toHaveBeenCalledTimes(2); - }); - it("calls the inner transport once per subscriber when both subscribers' hashes are `undefined`, in the same runloop", () => { - const config = { - executeSubscriptionPlan: jest.fn(), - signal: new AbortController().signal, - }; - coalescedTransport({ - subscriptionConfigurationHash: undefined, - ...config, - }).catch(() => {}); - coalescedTransport({ - subscriptionConfigurationHash: undefined, - ...config, - }).catch(() => {}); - expect(mockInnerTransport).toHaveBeenCalledTimes(2); - }); - it("calls the inner transport once per subscriber when both subscribers' hashes are `undefined`, in different runloops", async () => { - expect.assertions(1); - const config = { - executeSubscriptionPlan: jest.fn(), - signal: new AbortController().signal, - }; - await coalescedTransport({ - subscriptionConfigurationHash: undefined, - ...config, - }); - await coalescedTransport({ - subscriptionConfigurationHash: undefined, - ...config, - }); + await coalescedTransport({ ...config, request: { methodName: 'methodA', params: [] } }); + await coalescedTransport({ ...config, request: { methodName: 'methodB', params: [] } }); expect(mockInnerTransport).toHaveBeenCalledTimes(2); }); it('only calls the inner transport once, in the same runloop', () => { const config = { executeSubscriptionPlan: jest.fn(), + request: { methodName: 'foo', params: [] }, signal: new AbortController().signal, - subscriptionConfigurationHash: 'MOCK_HASH', }; coalescedTransport(config).catch(() => {}); coalescedTransport(config).catch(() => {}); @@ -122,8 +91,8 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => { expect.assertions(1); const config = { executeSubscriptionPlan: jest.fn(), + request: { methodName: 'foo', params: [] }, signal: new AbortController().signal, - subscriptionConfigurationHash: 'MOCK_HASH', }; await coalescedTransport(config); await coalescedTransport(config); @@ -133,8 +102,8 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => { expect.assertions(1); const config = { executeSubscriptionPlan: jest.fn(), + request: { methodName: 'foo', params: [] }, signal: new AbortController().signal, - subscriptionConfigurationHash: 'MOCK_HASH', }; const [publisherA, publisherB] = await Promise.all([coalescedTransport(config), coalescedTransport(config)]); expect(publisherA).toBe(publisherB); @@ -143,8 +112,8 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => { expect.assertions(1); const config = { executeSubscriptionPlan: jest.fn(), + request: { methodName: 'foo', params: [] }, signal: new AbortController().signal, - subscriptionConfigurationHash: 'MOCK_HASH', }; const publisherA = await coalescedTransport(config); const publisherB = await coalescedTransport(config); @@ -154,7 +123,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => { jest.useFakeTimers(); const config = { executeSubscriptionPlan: jest.fn(), - subscriptionConfigurationHash: 'MOCK_HASH', + request: { methodName: 'foo', params: [] }, }; const abortControllerB = new AbortController(); coalescedTransport({ ...config, signal: new AbortController().signal }).catch(() => {}); @@ -167,7 +136,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => { jest.useFakeTimers(); const config = { executeSubscriptionPlan: jest.fn(), - subscriptionConfigurationHash: 'MOCK_HASH', + request: { methodName: 'foo', params: [] }, }; const abortControllerA = new AbortController(); const abortControllerB = new AbortController(); @@ -182,7 +151,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => { expect.assertions(1); const config = { executeSubscriptionPlan: jest.fn(), - subscriptionConfigurationHash: 'MOCK_HASH', + request: { methodName: 'foo', params: [] }, }; const abortControllerA = new AbortController(); const abortControllerB = new AbortController(); @@ -198,7 +167,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => { it('does not fire the inner abort signal if the subscriber count is non zero at the end of the runloop, despite having aborted all in the middle of it', () => { const config = { executeSubscriptionPlan: jest.fn(), - subscriptionConfigurationHash: 'MOCK_HASH', + request: { methodName: 'foo', params: [] }, }; const abortControllerA = new AbortController(); coalescedTransport({ ...config, signal: abortControllerA.signal }).catch(() => {}); @@ -212,7 +181,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => { jest.useFakeTimers(); const config = { executeSubscriptionPlan: jest.fn(), - subscriptionConfigurationHash: 'MOCK_HASH', + request: { methodName: 'foo', params: [] }, }; coalescedTransport({ ...config, signal: new AbortController().signal }).catch(() => {}); await jest.runAllTimersAsync(); @@ -225,7 +194,7 @@ describe('getRpcSubscriptionsTransportWithSubscriptionCoalescing', () => { jest.useFakeTimers(); const config = { executeSubscriptionPlan: jest.fn(), - subscriptionConfigurationHash: 'MOCK_HASH', + request: { methodName: 'foo', params: [] }, }; const abortControllerA = new AbortController(); /** diff --git a/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-transport-test.ts b/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-transport-test.ts index 6bd5e0926f19..6542a43e9f6c 100644 --- a/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-transport-test.ts +++ b/packages/rpc-subscriptions/src/__tests__/rpc-subscriptions-transport-test.ts @@ -10,8 +10,8 @@ describe('createRpcSubscriptionsTransportFromChannelCreator', () => { const abortSignal = new AbortController().signal; creator({ executeSubscriptionPlan: jest.fn(), + request: { methodName: 'foo', params: [] }, signal: abortSignal, - subscriptionConfigurationHash: undefined, }).catch(() => {}); expect(mockCreateChannel).toHaveBeenCalledWith({ abortSignal }); }); @@ -21,8 +21,8 @@ describe('createRpcSubscriptionsTransportFromChannelCreator', () => { const mockExecuteSubscriptionPlan = jest.fn(); creator({ executeSubscriptionPlan: mockExecuteSubscriptionPlan, + request: { methodName: 'foo', params: [] }, signal: new AbortController().signal, - subscriptionConfigurationHash: undefined, }).catch(() => {}); await jest.runAllTimersAsync(); expect(mockExecuteSubscriptionPlan).toHaveBeenCalledWith( @@ -38,8 +38,8 @@ describe('createRpcSubscriptionsTransportFromChannelCreator', () => { const signal = new AbortController().signal; creator({ executeSubscriptionPlan: mockExecuteSubscriptionPlan, + request: { methodName: 'foo', params: [] }, signal, - subscriptionConfigurationHash: undefined, }).catch(() => {}); await jest.runAllTimersAsync(); expect(mockExecuteSubscriptionPlan).toHaveBeenCalledWith( diff --git a/packages/rpc-subscriptions/src/rpc-subscriptions-coalescer.ts b/packages/rpc-subscriptions/src/rpc-subscriptions-coalescer.ts index a0677e0cc359..3dda21edd3f4 100644 --- a/packages/rpc-subscriptions/src/rpc-subscriptions-coalescer.ts +++ b/packages/rpc-subscriptions/src/rpc-subscriptions-coalescer.ts @@ -1,3 +1,4 @@ +import fastStableStringify from '@solana/fast-stable-stringify'; import { RpcSubscriptionsTransport } from '@solana/rpc-subscriptions-spec'; import { DataPublisher } from '@solana/subscribable'; @@ -12,10 +13,9 @@ export function getRpcSubscriptionsTransportWithSubscriptionCoalescing(); return function rpcSubscriptionsTransportWithSubscriptionCoalescing(config) { - const { subscriptionConfigurationHash, signal } = config; - if (subscriptionConfigurationHash === undefined) { - return transport(config); - } + const { request, signal } = config; + const subscriptionConfigurationHash = fastStableStringify([request.methodName, request.params]); + let cachedDataPublisherPromise = cache.get(subscriptionConfigurationHash); if (!cachedDataPublisherPromise) { const abortController = new AbortController(); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 285bcc86768e..9687f457da44 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -865,6 +865,9 @@ importers: '@solana/errors': specifier: workspace:* version: link:../errors + '@solana/fast-stable-stringify': + specifier: workspace:* + version: link:../fast-stable-stringify '@solana/functional': specifier: workspace:* version: link:../functional @@ -898,9 +901,6 @@ importers: '@solana/addresses': specifier: workspace:* version: link:../addresses - '@solana/fast-stable-stringify': - specifier: workspace:* - version: link:../fast-stable-stringify '@solana/keys': specifier: workspace:* version: link:../keys