From a9dfc31f4d359483295b14ca2af6774dd7c79de9 Mon Sep 17 00:00:00 2001 From: Vikram Venkataranan <14256527+viktree@users.noreply.github.com> Date: Mon, 1 Apr 2024 15:53:50 -0400 Subject: [PATCH] change to save data --- src/microservice/gcloud-pub-sub.server.ts | 23 ++++++------- src/module/gcloud-pub-sub.service.spec.ts | 42 +++++++++++------------ src/module/gcloud-pub-sub.service.ts | 14 +++++--- 3 files changed, 40 insertions(+), 39 deletions(-) diff --git a/src/microservice/gcloud-pub-sub.server.ts b/src/microservice/gcloud-pub-sub.server.ts index a0cc265..3852a69 100644 --- a/src/microservice/gcloud-pub-sub.server.ts +++ b/src/microservice/gcloud-pub-sub.server.ts @@ -2,14 +2,11 @@ import { PubSub, Subscription, Message } from '@google-cloud/pubsub' import { Server, CustomTransportStrategy } from '@nestjs/microservices' import { MESSAGE, ERROR, PUB_SUB_DEFAULT_RETRY_CODES } from '../helpers/constants' -import { GCloudPubSubServerOptions } from '../interfaces/gcloud-pub-sub.interface' +import type { GCloudPubSubServerOptions } from '../interfaces/gcloud-pub-sub.interface' const RETRY_INTERVAL = 5000 -export class GCloudPubSubServer - extends Server - implements CustomTransportStrategy -{ +export class GCloudPubSubServer extends Server implements CustomTransportStrategy { public client: PubSub = null public subscriptions: Subscription[] = [] public isShuttingDown: boolean = false @@ -21,13 +18,13 @@ export class GCloudPubSubServer public listen(callback: () => void) { this.isShuttingDown = false this.client = new PubSub(this.options.authOptions) - this.options.subscriptionIds.forEach((subcriptionName) => { + this.options.subscriptionIds.forEach((subscriptionName) => { const subscription = this.client.subscription( - subcriptionName, + subscriptionName, this.options.subscriberOptions || {} ) - const handleMessage = this.handleMessageFactory(subcriptionName) - const handleError = this.handleErrorFactory(subscription, subcriptionName) + const handleMessage = this.handleMessageFactory(subscriptionName) + const handleError = this.handleErrorFactory(subscription, subscriptionName) subscription.on(MESSAGE, handleMessage.bind(this)) subscription.on(ERROR, handleError) this.subscriptions.push(subscription) @@ -35,14 +32,14 @@ export class GCloudPubSubServer callback() } - public handleErrorFactory(subscription: Subscription, subcriptionName: string) { - return (error): void => { + public handleErrorFactory(subscription: Subscription, subscriptionName: string) { + return (error: any): void => { this.handleError(error) if (!this.isShuttingDown && PUB_SUB_DEFAULT_RETRY_CODES.includes(error.code)) { - this.logger.warn(`Closing subscription: ${subcriptionName}`) + this.logger.warn(`Closing subscription: ${subscriptionName}`) subscription.close() setTimeout(() => { - this.logger.warn(`Opening subscription: ${subcriptionName}`) + this.logger.warn(`Opening subscription: ${subscriptionName}`) subscription.open() }, RETRY_INTERVAL) } diff --git a/src/module/gcloud-pub-sub.service.spec.ts b/src/module/gcloud-pub-sub.service.spec.ts index 54c58f1..9d47e0b 100644 --- a/src/module/gcloud-pub-sub.service.spec.ts +++ b/src/module/gcloud-pub-sub.service.spec.ts @@ -38,84 +38,84 @@ describe('GcloudPubSubService', () => { const data = 'You Tried Your Best and You Failed Miserably. The Lesson Is Never Try' const gcloudPubSubLibMock = { topic: jest.fn().mockReturnThis(), - publish: jest.fn((buffer) => { - expect(buffer).toMatchSnapshot() + publishMessage: jest.fn(({ data }) => { + expect(data).toMatchSnapshot() }), } service.gcloudPubSubLib = gcloudPubSubLibMock as any service.publishMessage(topic, data) - expect(gcloudPubSubLibMock.publish).toHaveBeenCalled() + expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled() }) it('handles Buffer as data', () => { const topic = 'Homer' const data = 'You Tried Your Best and You Failed Miserably. The Lesson Is Never Try' const gcloudPubSubLibMock = { topic: jest.fn().mockReturnThis(), - publish: jest.fn((buffer) => { - expect(buffer).toMatchSnapshot() + publishMessage: jest.fn(({ data }) => { + expect(data).toMatchSnapshot() }), } service.gcloudPubSubLib = gcloudPubSubLibMock as any service.publishMessage(topic, Buffer.from(data)) - expect(gcloudPubSubLibMock.publish).toHaveBeenCalled() + expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled() }) it('handles an array of numbers as data', () => { const topic = 'Homer' const data = [10, 20, 30, 40, 50] const gcloudPubSubLibMock = { topic: jest.fn().mockReturnThis(), - publish: jest.fn((buffer) => { - expect(buffer).toMatchSnapshot() + publishMessage: jest.fn(({ data }) => { + expect(data).toMatchSnapshot() }), } service.gcloudPubSubLib = gcloudPubSubLibMock as any service.publishMessage(topic, data) - expect(gcloudPubSubLibMock.publish).toHaveBeenCalled() + expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled() }) it('handles an ArrayBuffer as data', () => { const topic = 'Homer' const data = new ArrayBuffer(1) const gcloudPubSubLibMock = { topic: jest.fn().mockReturnThis(), - publish: jest.fn((buffer) => { - expect(buffer).toMatchSnapshot() + publishMessage: jest.fn(({ data }) => { + expect(data).toMatchSnapshot() }), } service.gcloudPubSubLib = gcloudPubSubLibMock as any service.publishMessage(topic, data) - expect(gcloudPubSubLibMock.publish).toHaveBeenCalled() + expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled() }) it('handles a SharedArrayBuffer as data', () => { const topic = 'Homer' const data = new SharedArrayBuffer(1) const gcloudPubSubLibMock = { topic: jest.fn().mockReturnThis(), - publish: jest.fn((buffer) => { - expect(buffer).toMatchSnapshot() + publishMessage: jest.fn(({ data }) => { + expect(data).toMatchSnapshot() }), } service.gcloudPubSubLib = gcloudPubSubLibMock as any service.publishMessage(topic, data) - expect(gcloudPubSubLibMock.publish).toHaveBeenCalled() + expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled() }) it('handles a Uint8Array as data', () => { const topic = 'Homer' const data = new Uint8Array([1, 2, 3]) const gcloudPubSubLibMock = { topic: jest.fn().mockReturnThis(), - publish: jest.fn((buffer) => { - expect(buffer).toMatchSnapshot() + publishMessage: jest.fn(({ data }) => { + expect(data).toMatchSnapshot() }), } service.gcloudPubSubLib = gcloudPubSubLibMock as any service.publishMessage(topic, data) - expect(gcloudPubSubLibMock.publish).toHaveBeenCalled() + expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled() }) it('handles a string and binary encoding', () => { const topic = 'Homer' @@ -123,14 +123,14 @@ describe('GcloudPubSubService', () => { const encoding = 'binary' const gcloudPubSubLibMock = { topic: jest.fn().mockReturnThis(), - publish: jest.fn((buffer) => { - expect(buffer).toMatchSnapshot() + publishMessage: jest.fn(({ data }) => { + expect(data).toMatchSnapshot() }), } service.gcloudPubSubLib = gcloudPubSubLibMock as any service.publishMessage(topic, data, {}, encoding) - expect(gcloudPubSubLibMock.publish).toHaveBeenCalled() + expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled() }) }) }) diff --git a/src/module/gcloud-pub-sub.service.ts b/src/module/gcloud-pub-sub.service.ts index 8a8f617..50dd692 100644 --- a/src/module/gcloud-pub-sub.service.ts +++ b/src/module/gcloud-pub-sub.service.ts @@ -1,7 +1,8 @@ import { Injectable } from '@nestjs/common' import { PubSub } from '@google-cloud/pubsub' -import { GoogleAuthOptions } from '../interfaces/gcloud-pub-sub.interface' -import { PublishOptions } from '@google-cloud/pubsub/build/src/topic' +import type { PublishOptions, Topic } from '@google-cloud/pubsub/build/src/topic' + +import type { GoogleAuthOptions } from '../interfaces/gcloud-pub-sub.interface' @Injectable() export class GcloudPubSubService { @@ -11,11 +12,11 @@ export class GcloudPubSubService { private readonly googleAuthOptions: GoogleAuthOptions, private readonly publishOptions: PublishOptions ) { - this.gcloudPubSubLib = new PubSub(googleAuthOptions) + this.gcloudPubSubLib = new PubSub(this.googleAuthOptions) } public publishMessage( - topic: string, + topicName: string, data: string | Uint8Array | number[] | ArrayBuffer | SharedArrayBuffer, attributes: { [key: string]: string } = {}, encoding?: BufferEncoding @@ -34,6 +35,9 @@ export class GcloudPubSubService { } else { dataBuffer = Buffer.from(data as string) } - return this.gcloudPubSubLib.topic(topic, this.publishOptions).publish(dataBuffer, attributes) + + const topic: Topic = this.gcloudPubSubLib.topic(topicName) + // const topic = this.gcloudPubSubLib.topic(topicName, this.publishOptions) + return topic.publishMessage({ data: dataBuffer, attributes }) } }