From 9fb6ae0005f11197eefdb864aa8a7cf3b79357f0 Mon Sep 17 00:00:00 2001 From: Ariel Gentile Date: Thu, 28 Mar 2024 09:08:17 -0300 Subject: [PATCH] fix: unsubscribe from emitter after pickup completion (#1806) Signed-off-by: Ariel Gentile --- .../src/modules/discover-features/DiscoverFeaturesApi.ts | 4 +++- packages/core/src/modules/message-pickup/MessagePickupApi.ts | 5 +++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/core/src/modules/discover-features/DiscoverFeaturesApi.ts b/packages/core/src/modules/discover-features/DiscoverFeaturesApi.ts index e473dd97b5..9b1df670e7 100644 --- a/packages/core/src/modules/discover-features/DiscoverFeaturesApi.ts +++ b/packages/core/src/modules/discover-features/DiscoverFeaturesApi.ts @@ -8,7 +8,7 @@ import type { DiscoverFeaturesService } from './services' import type { Feature } from '../../agent/models' import { firstValueFrom, of, ReplaySubject, Subject } from 'rxjs' -import { catchError, filter, map, takeUntil, timeout } from 'rxjs/operators' +import { catchError, filter, first, map, takeUntil, timeout } from 'rxjs/operators' import { AgentContext } from '../../agent' import { EventEmitter } from '../../agent/EventEmitter' @@ -120,6 +120,8 @@ export class DiscoverFeaturesApi< filter((e) => e.payload.connection?.id === connection.id), // Return disclosures map((e) => e.payload.disclosures), + // Only wait for first event that matches the criteria + first(), // If we don't have an answer in timeoutMs miliseconds (no response, not supported, etc...) error timeout({ first: options.awaitDisclosuresTimeoutMs ?? 7000, diff --git a/packages/core/src/modules/message-pickup/MessagePickupApi.ts b/packages/core/src/modules/message-pickup/MessagePickupApi.ts index 871fce42f2..a5033c8008 100644 --- a/packages/core/src/modules/message-pickup/MessagePickupApi.ts +++ b/packages/core/src/modules/message-pickup/MessagePickupApi.ts @@ -16,7 +16,7 @@ import type { V1MessagePickupProtocol, V2MessagePickupProtocol } from './protoco import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol' import type { MessagePickupRepository } from './storage/MessagePickupRepository' -import { ReplaySubject, Subject, filter, firstValueFrom, takeUntil, timeout } from 'rxjs' +import { ReplaySubject, Subject, filter, first, firstValueFrom, takeUntil, timeout } from 'rxjs' import { AgentContext } from '../../agent' import { EventEmitter } from '../../agent/EventEmitter' @@ -222,7 +222,6 @@ export class MessagePickupApi(MessagePickupEventTypes.MessagePickupCompleted) .pipe( @@ -230,6 +229,8 @@ export class MessagePickupApi e.payload.connection.id === connectionRecord.id), + // Only wait for first event that matches the criteria + first(), // If we don't receive all messages within timeoutMs miliseconds (no response, not supported, etc...) error timeout({ first: options.awaitCompletionTimeoutMs ?? 10000,