@@ -443,7 +443,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
443
443
this . _sendStreamComplete ( streamId ) ;
444
444
} ,
445
445
onError : error => {
446
- this . _sendStreamError ( streamId , error . message ) ;
446
+ this . _sendStreamError ( streamId , error ) ;
447
447
} ,
448
448
//Subscriber methods
449
449
onNext : payload => {
@@ -677,7 +677,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
677
677
if ( this . _isRequest ( frame . type ) ) {
678
678
const leaseError = this . _useLeaseOrError ( this . _responderLeaseHandler ) ;
679
679
if ( leaseError ) {
680
- this . _sendStreamError ( streamId , leaseError ) ;
680
+ this . _sendStreamError ( streamId , new Error ( leaseError ) ) ;
681
681
return ;
682
682
}
683
683
}
@@ -758,7 +758,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
758
758
onComplete : payload => {
759
759
this . _sendStreamPayload ( streamId , payload , true ) ;
760
760
} ,
761
- onError : error => this . _sendStreamError ( streamId , error . message ) ,
761
+ onError : error => this . _sendStreamError ( streamId , error ) ,
762
762
onSubscribe : cancel => {
763
763
const subscription = {
764
764
cancel,
@@ -773,7 +773,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
773
773
const payload = this . _deserializePayload ( frame ) ;
774
774
this . _requestHandler . requestStream ( payload ) . subscribe ( {
775
775
onComplete : ( ) => this . _sendStreamComplete ( streamId ) ,
776
- onError : error => this . _sendStreamError ( streamId , error . message ) ,
776
+ onError : error => this . _sendStreamError ( streamId , error ) ,
777
777
onNext : payload => this . _sendStreamPayload ( streamId , payload ) ,
778
778
onSubscribe : subscription => {
779
779
this . _subscriptions . set ( streamId , subscription ) ;
@@ -835,7 +835,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
835
835
836
836
this . _requestHandler . requestChannel ( framesToPayloads ) . subscribe ( {
837
837
onComplete : ( ) => this . _sendStreamComplete ( streamId ) ,
838
- onError : error => this . _sendStreamError ( streamId , error . message ) ,
838
+ onError : error => this . _sendStreamError ( streamId , error ) ,
839
839
onNext : payload => this . _sendStreamPayload ( streamId , payload ) ,
840
840
onSubscribe : subscription => {
841
841
this . _subscriptions . set ( streamId , subscription ) ;
@@ -864,16 +864,16 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
864
864
} ) ;
865
865
}
866
866
867
- _sendStreamError ( streamId : number , errorMessage : string ) : void {
867
+ _sendStreamError ( streamId : number , err : Error ) : void {
868
868
this . _subscriptions . delete ( streamId ) ;
869
869
this . _connection . sendOne ( {
870
- code : ERROR_CODES . APPLICATION_ERROR ,
870
+ code : err instanceof RSocketError ? err . errorCode : ERROR_CODES . APPLICATION_ERROR ,
871
871
flags : 0 ,
872
- message : errorMessage ,
872
+ message : err . message ,
873
873
streamId,
874
874
type : FRAME_TYPES . ERROR ,
875
875
} ) ;
876
- const error = new Error ( `terminated from the requester: ${ errorMessage } ` ) ;
876
+ const error = new Error ( `terminated from the requester: ${ err . message } ` ) ;
877
877
this . _handleStreamError ( streamId , error ) ;
878
878
}
879
879
@@ -943,3 +943,11 @@ function deserializeMetadataPushPayload<D, M>(
943
943
metadata : serializers . metadata . deserialize ( frame . metadata ) ,
944
944
} ;
945
945
}
946
+
947
+ export class RSocketError extends Error {
948
+ + errorCode : number ;
949
+ constructor ( errorCode : number , message : string ) {
950
+ super ( message ) ;
951
+ this . errorCode = errorCode ;
952
+ }
953
+ }
0 commit comments