From 860ddd5a81b4bea6872fa4e71de3029a7356e4f7 Mon Sep 17 00:00:00 2001 From: Palani C Date: Wed, 21 Feb 2024 13:11:34 +1100 Subject: [PATCH] Add support for throwing errors with custom error code (#272) * Added support for custom RSocketError Signed-off-by: palani --------- Signed-off-by: palani --- packages/rsocket-core/src/RSocketError.js | 26 ++++++++++ packages/rsocket-core/src/RSocketMachine.js | 22 ++++---- .../src/__tests__/RSocketServer-test.js | 51 +++++++++++++++++++ packages/rsocket-core/src/index.js | 4 ++ 4 files changed, 94 insertions(+), 9 deletions(-) create mode 100644 packages/rsocket-core/src/RSocketError.js diff --git a/packages/rsocket-core/src/RSocketError.js b/packages/rsocket-core/src/RSocketError.js new file mode 100644 index 00000000..dd5632b0 --- /dev/null +++ b/packages/rsocket-core/src/RSocketError.js @@ -0,0 +1,26 @@ +/** Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @flow + */ + +'use strict'; + +export default class RSocketError extends Error { + +errorCode: number; + constructor(errorCode: number, message: string) { + super(message); + this.errorCode = errorCode; + } +} diff --git a/packages/rsocket-core/src/RSocketMachine.js b/packages/rsocket-core/src/RSocketMachine.js index 02bbb3cc..a1dcb524 100644 --- a/packages/rsocket-core/src/RSocketMachine.js +++ b/packages/rsocket-core/src/RSocketMachine.js @@ -58,6 +58,7 @@ import { ResponderLeaseHandler, Disposable, } from './RSocketLease'; +import RSocketError from './RSocketError'; type Role = 'CLIENT' | 'SERVER'; @@ -443,7 +444,7 @@ class RSocketMachineImpl implements RSocketMachine { this._sendStreamComplete(streamId); }, onError: error => { - this._sendStreamError(streamId, error.message); + this._sendStreamError(streamId, error); }, //Subscriber methods onNext: payload => { @@ -677,7 +678,7 @@ class RSocketMachineImpl implements RSocketMachine { if (this._isRequest(frame.type)) { const leaseError = this._useLeaseOrError(this._responderLeaseHandler); if (leaseError) { - this._sendStreamError(streamId, leaseError); + this._sendStreamError(streamId, new Error(leaseError)); return; } } @@ -758,7 +759,7 @@ class RSocketMachineImpl implements RSocketMachine { onComplete: payload => { this._sendStreamPayload(streamId, payload, true); }, - onError: error => this._sendStreamError(streamId, error.message), + onError: error => this._sendStreamError(streamId, error), onSubscribe: cancel => { const subscription = { cancel, @@ -773,7 +774,7 @@ class RSocketMachineImpl implements RSocketMachine { const payload = this._deserializePayload(frame); this._requestHandler.requestStream(payload).subscribe({ onComplete: () => this._sendStreamComplete(streamId), - onError: error => this._sendStreamError(streamId, error.message), + onError: error => this._sendStreamError(streamId, error), onNext: payload => this._sendStreamPayload(streamId, payload), onSubscribe: subscription => { this._subscriptions.set(streamId, subscription); @@ -835,7 +836,7 @@ class RSocketMachineImpl implements RSocketMachine { this._requestHandler.requestChannel(framesToPayloads).subscribe({ onComplete: () => this._sendStreamComplete(streamId), - onError: error => this._sendStreamError(streamId, error.message), + onError: error => this._sendStreamError(streamId, error), onNext: payload => this._sendStreamPayload(streamId, payload), onSubscribe: subscription => { this._subscriptions.set(streamId, subscription); @@ -864,16 +865,19 @@ class RSocketMachineImpl implements RSocketMachine { }); } - _sendStreamError(streamId: number, errorMessage: string): void { + _sendStreamError(streamId: number, err: Error): void { this._subscriptions.delete(streamId); this._connection.sendOne({ - code: ERROR_CODES.APPLICATION_ERROR, + code: + err instanceof RSocketError + ? err.errorCode + : ERROR_CODES.APPLICATION_ERROR, flags: 0, - message: errorMessage, + message: err.message, streamId, type: FRAME_TYPES.ERROR, }); - const error = new Error(`terminated from the requester: ${errorMessage}`); + const error = new Error(`terminated from the requester: ${err.message}`); this._handleStreamError(streamId, error); } diff --git a/packages/rsocket-core/src/__tests__/RSocketServer-test.js b/packages/rsocket-core/src/__tests__/RSocketServer-test.js index 0b0bd843..4ebaab9f 100644 --- a/packages/rsocket-core/src/__tests__/RSocketServer-test.js +++ b/packages/rsocket-core/src/__tests__/RSocketServer-test.js @@ -31,6 +31,7 @@ import {genMockConnection} from 'MockDuplexConnection'; import {genMockSubscriber} from 'MockFlowableSubscriber'; import {genMockPublisher} from 'MockFlowableSubscription'; import {Single, Flowable} from 'rsocket-flowable'; +import RSocketError from '../RSocketError'; jest.useFakeTimers(); @@ -226,6 +227,56 @@ describe('RSocketServer', () => { expect(console.error).toHaveBeenCalled(); }); + it('sends custom error code if request handler throws RSocketError', () => { + console.error = jest.fn(); + const transport = genMockTransportServer(); + const server = new RSocketServer({ + getRequestHandler: () => { + return { + requestResponse: () => { + throw new RSocketError(1234, 'Custom Error'); + }, + }; + }, + transport, + }); + server.start(); + transport.mock.connect(); + connection.receive.mock.publisher.onNext({ + type: FRAME_TYPES.SETUP, + data: undefined, + dataMimeType: '', + flags: 0, + keepAlive: 42, + lifetime: 2017, + metadata: undefined, + metadataMimeType: '', + resumeToken: null, + streamId: 0, + majorVersion: 1, + minorVersion: 0, + }); + jest.runOnlyPendingTimers(); + connection.receive.mock.publisher.onNext({ + type: FRAME_TYPES.REQUEST_RESPONSE, + data: undefined, + dataMimeType: '', + flags: 0, + metadata: undefined, + metadataMimeType: '', + streamId: 1, + }); + expect(connection.sendOne.mock.calls.length).toBe(1); + expect(connection.sendOne.mock.frame).toEqual({ + code: 1234, + flags: 0, + message: 'Custom Error', + streamId: 1, + type: FRAME_TYPES.ERROR, + }); + expect(console.error).toHaveBeenCalled(); + }); + it('call subscription.cancel() for all active subscriptions', () => { let cancelled = false; const transport = genMockTransportServer(); diff --git a/packages/rsocket-core/src/index.js b/packages/rsocket-core/src/index.js index 0b31e5ef..dbeeb825 100644 --- a/packages/rsocket-core/src/index.js +++ b/packages/rsocket-core/src/index.js @@ -39,6 +39,10 @@ import RSocketServer from './RSocketServer'; export {RSocketServer}; +import RSocketError from './RSocketError'; + +export {RSocketError}; + import RSocketResumableTransport from './RSocketResumableTransport'; export {RSocketResumableTransport};