diff --git a/README.md b/README.md index 46c51fd..360082d 100644 --- a/README.md +++ b/README.md @@ -20,28 +20,51 @@ - Safely routing and transferring data and control across service and application boundaries - Coordinating transactional work that requires a high-degree of reliability -#### Installation +## Installation To start building Azure Service Bus-based microservices, first install the required packages: ```bash -$ npm i --save @azure/service-bus @niur/nestjs-service-bus +$ npm i nestjs-azure-service-bus-transporter ``` -#### Overview + +## Server To use the Azure Service Bus strategy, pass the following options object to the `createMicroservice()` method: ```typescript // main.ts -const app = await NestFactory.createMicroservice(AppModule, { +const app = await NestFactory.createMicroservice( + AppModule, + { + strategy: new AzureServiceBusServer({ + connectionString: + "Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=", + options: {}, + }), + } +); +``` + +To use in a hybrid application, pass the following options object to the `connectMicroservice()` method: + +```typescript +// main.ts +const app = await NestFactory.create(AppModule); + +await app.connectMicroservice({ strategy: new AzureServiceBusServer({ - connectionString: 'Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=', - options: {} + connectionString: + "Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=", + options: {}, }), }); +await app.startAllMicroservices(); +await app.listen(3000); ``` + #### Options The Azure Service Bus strategy exposes the properties described below. @@ -61,9 +84,117 @@ The Azure Service Bus strategy exposes the properties described -#### Client +### Queue Consumer + +To access the original Azure Service Bus message use the `Queue` decorator as follows: + +```typescript +import { Queue } from '@niur/nestjs-service-bus'; +import { ServiceBusReceiver } from '@azure/service-bus'; +import { Payload, Ctx } from '@nestjs/microservices'; +@Queue({ + queueName: 'sample-topic', + receiveMode: 'peekLock', // or receiveAndDelete + options:{ + autoCompleteMessages:true, + } + }) +getMessages( + @Payload() message: ServiceBusMessage,@Ctx() context:AzureServiceBusContext) { + const serviceBusReceiver:ServiceBusReceiver= context.getArgs()[0]; + console.log(message); +} +``` + +Options + + + + + + + + + + + + + + + + + + + + + + + + + + +
queueNameName of the queue we want to receive from.
receiveModeRepresents the receive mode for the receiver. (read more here).
subQueueTypeRepresents the sub queue that is applicable for any queue or subscription. (read more here).
maxAutoLockRenewalDurationInMsThe maximum duration in milliseconds until which the lock on the message will be renewed by the sdk automatically.
skipParsingBodyAsJsonOption to disable the client from running JSON.parse() on the message body when receiving the message.
optionsOptions used when subscribing to a Service Bus queue or subscription.
+ +### Topic Consumer + +To access the original Azure Service Bus message use the `Subscription` decorator as follows: ```typescript +import { Topic } from '@niur/nestjs-service-bus'; +import { Payload, Ctx } from '@nestjs/microservices'; +import { ServiceBusReceiver } from '@azure/service-bus'; +@Subscription({ + topic: 'sample-topic', + subscription: 'sample-subscription', + receiveMode: 'peekLock', // or receiveAndDelete + options:{ + autoCompleteMessages:true, + } + }) +getMessages(@Payload() message: ServiceBusMessage) { + const serviceBusReceiver: ServiceBusReceiver = context.getArgs()[0]; + console.log(message); +} +``` + +Options + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
topicName of the topic for the subscription we want to receive from.
subscriptionName of the subscription (under the `topic`) that we want to receive from.
receiveModeRepresents the receive mode for the receiver. (read more here).
subQueueTypeRepresents the sub queue that is applicable for any queue or subscription. (read more here).
maxAutoLockRenewalDurationInMsThe maximum duration in milliseconds until which the lock on the message will be renewed by the sdk automatically.
skipParsingBodyAsJsonOption to disable the client from running JSON.parse() on the message body when receiving the message.
optionsOptions used when subscribing to a Service Bus queue or subscription.
+ +## Client + +```typescript +// app.module.ts + @Module({ imports: [ AzureServiceBusModule.forRoot([ @@ -97,105 +228,66 @@ The Azure Service Bus strategy exposes the properties described ``` -```typescript +Since AzureServiceBusModule is a global module you can inject Clients into other modules providers and even controllers. +```typescript +//example.service.ts +//provider @Injectable() -constructor( - @Inject('SB_CLIENT') private readonly sbClient: AzureServiceBusClientProxy, -) {} +export class exampleService { + constructor( + @Inject("SB_CLIENT") private readonly sbClient: AzureServiceBusClientProxy + ) {} +} +``` +```typescript +//example.controller.ts +//controller +@Controller("example") +export class exampleController { + constructor( + @Inject("SB_CLIENT") private readonly sbClient: AzureServiceBusClientProxy + ) {} +} ``` -##### Producer +### Producer Event-based ```typescript - const pattern = { - name: 'sample-topic', // topic name - options: {} -}; // queue name + name: "sample-topic", // topic/queue name + options: {}, +}; const data = { - body: 'Example message' + body: "Example message", }; this.sbClient.send(pattern, data).subscribe((response) => { console.log(response); // reply message }); - ``` Message-based ```typescript - const pattern = { - name: 'sample-topic', // topic name - options: {} -}; // queue name + name: "sample-topic", // topic/queue name + options: {}, +}; const data = { - body: 'Example message' + body: "Example message", }; this.sbClient.emit(pattern, data); ``` - -##### Consumer - -To access the original Azure Service Bus message use the `Subscription` decorator as follows: - - -```typescript - -@Subscription({ - topic: 'sample-topic', - subscription: 'sample-subscription', - receiveMode: 'peekLock', // or receiveAndDelete - }) -getMessages(@Payload() message: ServiceBusMessage) { - console.log(message); -} -``` - -Options - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
topicName of the topic for the subscription we want to receive from.
subscriptionName of the subscription (under the `topic`) that we want to receive from.
receiveModeRepresents the receive mode for the receiver. (read more here).
subQueueTypeRepresents the sub queue that is applicable for any queue or subscription. (read more here).
maxAutoLockRenewalDurationInMsThe maximum duration in milliseconds until which the lock on the message will be renewed by the sdk automatically.
skipParsingBodyAsJsonOption to disable the client from running JSON.parse() on the message body when receiving the message.
optionsOptions used when subscribing to a Service Bus queue or subscription.
- - - ## Stay in touch -* Author - [Niurmiguel](https://github.com/Niurmiguel) +- Author - [Santiagomrn](https://github.com/Santiagomrn) +- Author - [Niurmiguel](https://github.com/Niurmiguel) ## License -Nestjs Azure Service Bus is [MIT licensed](LICENSE). \ No newline at end of file + +Nestjs Azure Service Bus is [MIT licensed](LICENSE). diff --git a/lib/azure-service-bus.context.ts b/lib/azure-service-bus.context.ts index 0672546..7e7270b 100644 --- a/lib/azure-service-bus.context.ts +++ b/lib/azure-service-bus.context.ts @@ -1,6 +1,7 @@ +import { ServiceBusReceiver } from "@azure/service-bus"; import { BaseRpcContext } from "@nestjs/microservices/ctx-host/base-rpc.context"; -type AzureServiceBusContextArgs = []; +type AzureServiceBusContextArgs = [ServiceBusReceiver]; export class AzureServiceBusContext extends BaseRpcContext { constructor(args: AzureServiceBusContextArgs) { diff --git a/lib/decorators/subscriber.decorator.ts b/lib/decorators/subscriber.decorator.ts index 4dedcd7..026f577 100644 --- a/lib/decorators/subscriber.decorator.ts +++ b/lib/decorators/subscriber.decorator.ts @@ -1,9 +1,9 @@ import { EventPattern } from "@nestjs/microservices"; - import { SB_SUBSCRIBER_METADATA } from "../azure-service-bus.constants"; import { SbSubscriberMetadata } from "../metadata"; import { MetaOrMetaFactory, + SbQueueMetadataOptions, SbSubscriptionMetadataOptions, } from "../interfaces"; @@ -44,3 +44,14 @@ export function Subscription( } }; } + +export const Queue = (metadata: SbQueueMetadataOptions) => { + const data = { ...metadata, topic: metadata.queueName }; + delete data.queueName; + return Subscription({ + ...data, + subscription: null, + }); +}; + +export const Topic = Subscription; diff --git a/lib/interfaces/subscriber.interface.ts b/lib/interfaces/subscriber.interface.ts index cf6c2d6..5151f67 100644 --- a/lib/interfaces/subscriber.interface.ts +++ b/lib/interfaces/subscriber.interface.ts @@ -62,3 +62,12 @@ export interface SbSubscriptionMetadataOptions export interface SbSubscriberTypeMap { subscription: SbSubscriptionMetadataOptions; } + + +export interface SbQueueMetadataOptions + extends Omit { + /** + * Name of the topic for the subscription we want to receive from. + */ + queueName: string; +} \ No newline at end of file diff --git a/lib/server/azure-service-bus.server.ts b/lib/server/azure-service-bus.server.ts index 42709b5..85872f4 100644 --- a/lib/server/azure-service-bus.server.ts +++ b/lib/server/azure-service-bus.server.ts @@ -9,6 +9,7 @@ import { ServiceBusClient, ServiceBusMessage, ServiceBusReceivedMessage, + ServiceBusReceiver, } from "@azure/service-bus"; import { AzureServiceBusContext } from "../azure-service-bus.context"; @@ -20,7 +21,7 @@ export class AzureServiceBusServer implements CustomTransportStrategy { private sbClient: ServiceBusClient; - + private readonly sbReceivers = new Map(); constructor(protected readonly options: AzureServiceBusOptions) { super(); @@ -63,12 +64,11 @@ export class AzureServiceBusServer skipParsingBodyAsJson, }); - receiver.subscribe(this.createMessageHandlers(pattern), options); - await receiver.close(); + await receiver.subscribe(this.createMessageHandlers(pattern), options); + this.sbReceivers.set(pattern,receiver); }; const registeredPatterns = [...this.messageHandlers.keys()]; - await Promise.all(registeredPatterns.map(subscribe)); } @@ -88,9 +88,8 @@ export class AzureServiceBusServer ): Promise { const partialPacket = { data: receivedMessage, pattern }; const packet = await this.deserializer.deserialize(partialPacket); - if (!receivedMessage.replyTo) { - const sbContext = new AzureServiceBusContext([]); + const sbContext = new AzureServiceBusContext([this.sbReceivers.get(packet.pattern)]); return this.handleEvent(packet.pattern, packet, sbContext); } @@ -124,6 +123,10 @@ export class AzureServiceBusServer } async close(): Promise { + const registeredReceivers= [...this.sbReceivers.keys()] + await Promise.all(registeredReceivers.map(async key=>{ + await this.sbReceivers.get(key).close(); + })) await this.sbClient?.close(); } } diff --git a/package.json b/package.json index ad4fecd..838d4dd 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { - "name": "@niur/nestjs-service-bus", - "version": "1.1.0", + "name": "nestjs-azure-service-bus-transporter", + "version": "0.0.2", "description": "NestJS Azure Service Bus Microservice Transport", "main": "dist/index", "types": "dist/index", @@ -11,14 +11,14 @@ }, "repository": { "type": "git", - "url": "git+https://github.com/Niurmiguel/nestjs-service-bus.git" + "url": "git+https://github.com/Santiagomrn/nestjs-service-bus" }, - "author": "Niurmiguel Gonzalez", + "author": "Santiago Rodriguez", "license": "MIT", "bugs": { - "url": "https://github.com/Niurmiguel/nestjs-service-bus/issues" + "url": "https://github.com/Santiagomrn/nestjs-service-bus/issues" }, - "homepage": "https://github.com/Niurmiguel/nestjs-service-bus#readme", + "homepage": "https://github.com/Santiagomrn/nestjs-service-bus#readme", "peerDependencies": { "@nestjs/common": "^10.0.4", "@nestjs/core": "^10.0.4",