diff --git a/packages/core/src/agent/Agent.ts b/packages/core/src/agent/Agent.ts index 9c25006433..891965b0a9 100644 --- a/packages/core/src/agent/Agent.ts +++ b/packages/core/src/agent/Agent.ts @@ -16,7 +16,6 @@ import { JwsService } from '../crypto/JwsService' import { AriesFrameworkError } from '../error' import { DependencyManager } from '../plugins' import { DidCommMessageRepository, StorageUpdateService, StorageVersionRepository } from '../storage' -import { InMemoryMessageRepository } from '../storage/InMemoryMessageRepository' import { AgentConfig } from './AgentConfig' import { extendModulesWithDefaultModules } from './AgentModules' @@ -90,9 +89,6 @@ export class Agent extends BaseAge "Missing required dependency: 'StorageService'. You can register it using the AskarModule, or implement your own." ) } - if (!dependencyManager.isRegistered(InjectionSymbols.MessageRepository)) { - dependencyManager.registerSingleton(InjectionSymbols.MessageRepository, InMemoryMessageRepository) - } // TODO: contextCorrelationId for base wallet // Bind the default agent context to the container for use in modules etc. @@ -197,6 +193,8 @@ export class Agent extends BaseAge ) await this.mediationRecipient.provision(mediationConnection) } + + await this.messagePickup.initialize() await this.mediator.initialize() await this.mediationRecipient.initialize() diff --git a/packages/core/src/agent/AgentMessage.ts b/packages/core/src/agent/AgentMessage.ts index 320be216c4..fdca23daa9 100644 --- a/packages/core/src/agent/AgentMessage.ts +++ b/packages/core/src/agent/AgentMessage.ts @@ -32,6 +32,15 @@ export class AgentMessage extends Decorated { @Exclude() public readonly allowDidSovPrefix: boolean = false + /** + * Whether to use Queue Transport in case the recipient of this message does not have a reliable + * endpoint available + * + * @see https://github.com/decentralized-identity/didcomm-messaging/blob/main/extensions/return_route/main.md#queue-transport + */ + @Exclude() + public readonly allowQueueTransport: boolean = true + public toJSON({ useDidSovPrefixWhereAllowed }: { useDidSovPrefixWhereAllowed?: boolean } = {}): PlaintextMessage { const json = JsonTransformer.toJSON(this) diff --git a/packages/core/src/agent/MessageSender.ts b/packages/core/src/agent/MessageSender.ts index 82efd2129d..bcf23f93b8 100644 --- a/packages/core/src/agent/MessageSender.ts +++ b/packages/core/src/agent/MessageSender.ts @@ -5,21 +5,21 @@ import type { TransportSession } from './TransportService' import type { AgentContext } from './context' import type { ConnectionRecord } from '../modules/connections' import type { ResolvedDidCommService } from '../modules/didcomm' -import type { DidDocument } from '../modules/dids' import type { OutOfBandRecord } from '../modules/oob/repository' import type { OutboundTransport } from '../transport/OutboundTransport' -import type { OutboundPackage, EncryptedMessage } from '../types' +import type { EncryptedMessage, OutboundPackage } from '../types' import { DID_COMM_TRANSPORT_QUEUE, InjectionSymbols } from '../constants' import { ReturnRouteTypes } from '../decorators/transport/TransportDecorator' import { AriesFrameworkError, MessageSendingError } from '../error' import { Logger } from '../logger' import { DidCommDocumentService } from '../modules/didcomm' +import { DidKey, type DidDocument } from '../modules/dids' import { getKeyFromVerificationMethod } from '../modules/dids/domain/key-type' -import { didKeyToInstanceOfKey } from '../modules/dids/helpers' +import { didKeyToInstanceOfKey, verkeyToDidKey } from '../modules/dids/helpers' import { DidResolverService } from '../modules/dids/services/DidResolverService' +import { MessagePickupRepository } from '../modules/message-pickup/storage' import { inject, injectable } from '../plugins' -import { MessageRepository } from '../storage/MessageRepository' import { MessageValidator } from '../utils/MessageValidator' import { getProtocolScheme } from '../utils/uri' @@ -38,7 +38,7 @@ export interface TransportPriorityOptions { export class MessageSender { private envelopeService: EnvelopeService private transportService: TransportService - private messageRepository: MessageRepository + private messagePickupRepository: MessagePickupRepository private logger: Logger private didResolverService: DidResolverService private didCommDocumentService: DidCommDocumentService @@ -48,7 +48,7 @@ export class MessageSender { public constructor( envelopeService: EnvelopeService, transportService: TransportService, - @inject(InjectionSymbols.MessageRepository) messageRepository: MessageRepository, + @inject(InjectionSymbols.MessagePickupRepository) messagePickupRepository: MessagePickupRepository, @inject(InjectionSymbols.Logger) logger: Logger, didResolverService: DidResolverService, didCommDocumentService: DidCommDocumentService, @@ -56,7 +56,7 @@ export class MessageSender { ) { this.envelopeService = envelopeService this.transportService = transportService - this.messageRepository = messageRepository + this.messagePickupRepository = messagePickupRepository this.logger = logger this.didResolverService = didResolverService this.didCommDocumentService = didCommDocumentService @@ -113,9 +113,11 @@ export class MessageSender { { connection, encryptedMessage, + recipientKey, options, }: { connection: ConnectionRecord + recipientKey: string encryptedMessage: EncryptedMessage options?: { transportPriority?: TransportPriorityOptions } } @@ -176,7 +178,11 @@ export class MessageSender { // If the other party shared a queue service endpoint in their did doc we queue the message if (queueService) { this.logger.debug(`Queue packed message for connection ${connection.id} (${connection.theirLabel})`) - await this.messageRepository.add(connection.id, encryptedMessage) + await this.messagePickupRepository.addMessage({ + connectionId: connection.id, + recipientDids: [verkeyToDidKey(recipientKey)], + payload: encryptedMessage, + }) return } @@ -318,7 +324,7 @@ export class MessageSender { // We didn't succeed to send the message over open session, or directly to serviceEndpoint // If the other party shared a queue service endpoint in their did doc we queue the message - if (queueService) { + if (queueService && message.allowQueueTransport) { this.logger.debug(`Queue message for connection ${connection.id} (${connection.theirLabel})`) const keys = { @@ -328,7 +334,11 @@ export class MessageSender { } const encryptedMessage = await this.envelopeService.packMessage(agentContext, message, keys) - await this.messageRepository.add(connection.id, encryptedMessage) + await this.messagePickupRepository.addMessage({ + connectionId: connection.id, + recipientDids: keys.recipientKeys.map((item) => new DidKey(item).did), + payload: encryptedMessage, + }) this.emitMessageSentEvent(outboundMessageContext, OutboundMessageSendStatus.QueuedForPickup) diff --git a/packages/core/src/agent/TransportService.ts b/packages/core/src/agent/TransportService.ts index 4095cb7e9b..bd42f892a5 100644 --- a/packages/core/src/agent/TransportService.ts +++ b/packages/core/src/agent/TransportService.ts @@ -1,27 +1,45 @@ import type { AgentMessage } from './AgentMessage' import type { EnvelopeKeys } from './EnvelopeService' -import type { AgentContext } from './context' import type { DidDocument } from '../modules/dids' +import type { TransportSessionRemovedEvent, TransportSessionSavedEvent } from '../transport' import type { EncryptedMessage } from '../types' import { DID_COMM_TRANSPORT_QUEUE } from '../constants' import { AriesFrameworkError } from '../error' import { injectable } from '../plugins' +import { TransportEventTypes } from '../transport' + +import { EventEmitter } from './EventEmitter' +import { AgentContext } from './context' @injectable() export class TransportService { public transportSessionTable: TransportSessionTable = {} + private agentContext: AgentContext + private eventEmitter: EventEmitter + + public constructor(agentContext: AgentContext, eventEmitter: EventEmitter) { + this.agentContext = agentContext + this.eventEmitter = eventEmitter + } public saveSession(session: TransportSession) { if (session.connectionId) { const oldSessions = this.getExistingSessionsForConnectionIdAndType(session.connectionId, session.type) oldSessions.forEach((oldSession) => { - if (oldSession) { + if (oldSession && oldSession.id !== session.id) { this.removeSession(oldSession) } }) } this.transportSessionTable[session.id] = session + + this.eventEmitter.emit(this.agentContext, { + type: TransportEventTypes.TransportSessionSaved, + payload: { + session, + }, + }) } public findSessionByConnectionId(connectionId: string) { @@ -47,6 +65,12 @@ export class TransportService { public removeSession(session: TransportSession) { delete this.transportSessionTable[session.id] + this.eventEmitter.emit(this.agentContext, { + type: TransportEventTypes.TransportSessionRemoved, + payload: { + session, + }, + }) } private getExistingSessionsForConnectionIdAndType(connectionId: string, type: string) { diff --git a/packages/core/src/agent/__tests__/Agent.test.ts b/packages/core/src/agent/__tests__/Agent.test.ts index 662dd2ccc6..3c2e3d4dcd 100644 --- a/packages/core/src/agent/__tests__/Agent.test.ts +++ b/packages/core/src/agent/__tests__/Agent.test.ts @@ -14,7 +14,7 @@ import { ConnectionService } from '../../modules/connections/services/Connection import { TrustPingService } from '../../modules/connections/services/TrustPingService' import { CredentialRepository } from '../../modules/credentials' import { CredentialsApi } from '../../modules/credentials/CredentialsApi' -import { MessagePickupApi } from '../../modules/message-pickup' +import { MessagePickupApi, InMemoryMessagePickupRepository } from '../../modules/message-pickup' import { ProofRepository } from '../../modules/proofs' import { ProofsApi } from '../../modules/proofs/ProofsApi' import { @@ -25,7 +25,6 @@ import { MediationRecipientApi, MediationRecipientModule, } from '../../modules/routing' -import { InMemoryMessageRepository } from '../../storage/InMemoryMessageRepository' import { WalletError } from '../../wallet/error' import { Agent } from '../Agent' import { Dispatcher } from '../Dispatcher' @@ -181,7 +180,9 @@ describe('Agent', () => { // Symbols, interface based expect(container.resolve(InjectionSymbols.Logger)).toBe(agentOptions.config.logger) - expect(container.resolve(InjectionSymbols.MessageRepository)).toBeInstanceOf(InMemoryMessageRepository) + expect(container.resolve(InjectionSymbols.MessagePickupRepository)).toBeInstanceOf( + InMemoryMessagePickupRepository + ) // Agent expect(container.resolve(MessageSender)).toBeInstanceOf(MessageSender) @@ -220,8 +221,8 @@ describe('Agent', () => { // Symbols, interface based expect(container.resolve(InjectionSymbols.Logger)).toBe(container.resolve(InjectionSymbols.Logger)) - expect(container.resolve(InjectionSymbols.MessageRepository)).toBe( - container.resolve(InjectionSymbols.MessageRepository) + expect(container.resolve(InjectionSymbols.MessagePickupRepository)).toBe( + container.resolve(InjectionSymbols.MessagePickupRepository) ) expect(container.resolve(InjectionSymbols.StorageService)).toBe( container.resolve(InjectionSymbols.StorageService) diff --git a/packages/core/src/agent/__tests__/MessageSender.test.ts b/packages/core/src/agent/__tests__/MessageSender.test.ts index 4312591684..6c12cf1e01 100644 --- a/packages/core/src/agent/__tests__/MessageSender.test.ts +++ b/packages/core/src/agent/__tests__/MessageSender.test.ts @@ -2,7 +2,7 @@ import type { ConnectionRecord } from '../../modules/connections' import type { ResolvedDidCommService } from '../../modules/didcomm' import type { DidDocumentService } from '../../modules/dids' -import type { MessageRepository } from '../../storage/MessageRepository' +import type { MessagePickupRepository } from '../../modules/message-pickup/storage' import type { OutboundTransport } from '../../transport' import type { EncryptedMessage } from '../../types' import type { AgentMessageSentEvent } from '../Events' @@ -24,7 +24,7 @@ import { DidCommDocumentService } from '../../modules/didcomm' import { DidResolverService, DidDocument, VerificationMethod } from '../../modules/dids' import { DidCommV1Service } from '../../modules/dids/domain/service/DidCommV1Service' import { verkeyToInstanceOfKey } from '../../modules/dids/helpers' -import { InMemoryMessageRepository } from '../../storage/InMemoryMessageRepository' +import { InMemoryMessagePickupRepository } from '../../modules/message-pickup/storage' import { EnvelopeService as EnvelopeServiceImpl } from '../EnvelopeService' import { EventEmitter } from '../EventEmitter' import { AgentEventTypes } from '../Events' @@ -114,7 +114,7 @@ describe('MessageSender', () => { sessionWithoutKeys.inboundMessage = inboundMessage sessionWithoutKeys.send = jest.fn() - const transportService = new TransportService() + const transportService = new TransportService(getAgentContext(), eventEmitter) const transportServiceFindSessionMock = mockFunction(transportService.findSessionByConnectionId) const transportServiceFindSessionByIdMock = mockFunction(transportService.findSessionById) const transportServiceHasInboundEndpoint = mockFunction(transportService.hasInboundEndpoint) @@ -132,7 +132,7 @@ describe('MessageSender', () => { let messageSender: MessageSender let outboundTransport: OutboundTransport - let messageRepository: MessageRepository + let messagePickupRepository: MessagePickupRepository let connection: ConnectionRecord let outboundMessageContext: OutboundMessageContext const agentConfig = getAgentConfig('MessageSender') @@ -147,11 +147,11 @@ describe('MessageSender', () => { eventEmitter.on(AgentEventTypes.AgentMessageSent, eventListenerMock) outboundTransport = new DummyHttpOutboundTransport() - messageRepository = new InMemoryMessageRepository(agentConfig.logger) + messagePickupRepository = new InMemoryMessagePickupRepository(agentConfig.logger) messageSender = new MessageSender( enveloperService, transportService, - messageRepository, + messagePickupRepository, logger, didResolverService, didCommDocumentService, @@ -497,7 +497,7 @@ describe('MessageSender', () => { messageSender = new MessageSender( enveloperService, transportService, - new InMemoryMessageRepository(agentConfig.logger), + new InMemoryMessagePickupRepository(agentConfig.logger), logger, didResolverService, didCommDocumentService, @@ -636,11 +636,11 @@ describe('MessageSender', () => { describe('packMessage', () => { beforeEach(() => { outboundTransport = new DummyHttpOutboundTransport() - messageRepository = new InMemoryMessageRepository(agentConfig.logger) + messagePickupRepository = new InMemoryMessagePickupRepository(agentConfig.logger) messageSender = new MessageSender( enveloperService, transportService, - messageRepository, + messagePickupRepository, logger, didResolverService, didCommDocumentService, diff --git a/packages/core/src/agent/__tests__/TransportService.test.ts b/packages/core/src/agent/__tests__/TransportService.test.ts index f46707fa3d..fdfbc57bf9 100644 --- a/packages/core/src/agent/__tests__/TransportService.test.ts +++ b/packages/core/src/agent/__tests__/TransportService.test.ts @@ -1,5 +1,8 @@ -import { getMockConnection } from '../../../tests/helpers' +import { Subject } from 'rxjs' + +import { agentDependencies, getAgentContext, getMockConnection } from '../../../tests/helpers' import { DidExchangeRole } from '../../modules/connections' +import { EventEmitter } from '../EventEmitter' import { TransportService } from '../TransportService' import { DummyTransportSession } from './stubs' @@ -9,7 +12,7 @@ describe('TransportService', () => { let transportService: TransportService beforeEach(() => { - transportService = new TransportService() + transportService = new TransportService(getAgentContext(), new EventEmitter(agentDependencies, new Subject())) }) test(`remove session saved for a given connection`, () => { diff --git a/packages/core/src/constants.ts b/packages/core/src/constants.ts index 0c67d367f6..f7d43876ab 100644 --- a/packages/core/src/constants.ts +++ b/packages/core/src/constants.ts @@ -1,5 +1,5 @@ export const InjectionSymbols = { - MessageRepository: Symbol('MessageRepository'), + MessagePickupRepository: Symbol('MessagePickupRepository'), StorageService: Symbol('StorageService'), Logger: Symbol('Logger'), AgentContextProvider: Symbol('AgentContextProvider'), diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 9ec5248ac4..4b86373206 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -33,7 +33,6 @@ export { DidCommMimeType, KeyDerivationMethod } from './types' export type { FileSystem, DownloadToFileOptions } from './storage/FileSystem' export * from './storage/BaseRecord' export { DidCommMessageRecord, DidCommMessageRole, DidCommMessageRepository } from './storage/didcomm' -export { InMemoryMessageRepository } from './storage/InMemoryMessageRepository' export { Repository } from './storage/Repository' export * from './storage/RepositoryEvents' export { StorageService, Query, SimpleQuery, BaseRecordConstructor } from './storage/StorageService' @@ -53,6 +52,7 @@ export * from './modules/basic-messages' export * from './modules/common' export * from './modules/credentials' export * from './modules/discover-features' +export * from './modules/message-pickup' export * from './modules/problem-reports' export * from './modules/proofs' export * from './modules/connections' diff --git a/packages/core/src/modules/message-pickup/MessagePickupApi.ts b/packages/core/src/modules/message-pickup/MessagePickupApi.ts index d47521dc44..9878cc2684 100644 --- a/packages/core/src/modules/message-pickup/MessagePickupApi.ts +++ b/packages/core/src/modules/message-pickup/MessagePickupApi.ts @@ -1,12 +1,19 @@ import type { + DeliverMessagesOptions, + DeliverMessagesFromQueueOptions, PickupMessagesOptions, PickupMessagesReturnType, QueueMessageOptions, QueueMessageReturnType, + SetLiveDeliveryModeOptions, + SetLiveDeliveryModeReturnType, + DeliverMessagesReturnType, + DeliverMessagesFromQueueReturnType, } from './MessagePickupApiOptions' +import type { MessagePickupSession, MessagePickupSessionRole } from './MessagePickupSession' import type { V1MessagePickupProtocol, V2MessagePickupProtocol } from './protocol' import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol' -import type { MessageRepository } from '../../storage/MessageRepository' +import type { MessagePickupRepository } from './storage/MessagePickupRepository' import { AgentContext } from '../../agent' import { MessageSender } from '../../agent/MessageSender' @@ -18,10 +25,18 @@ import { inject, injectable } from '../../plugins' import { ConnectionService } from '../connections/services' import { MessagePickupModuleConfig } from './MessagePickupModuleConfig' +import { MessagePickupSessionService } from './services/MessagePickupSessionService' export interface MessagePickupApi { queueMessage(options: QueueMessageOptions): Promise pickupMessages(options: PickupMessagesOptions): Promise + getLiveModeSession(options: { + connectionId: string + role?: MessagePickupSessionRole + }): Promise + deliverMessages(options: DeliverMessagesOptions): Promise + deliverMessagesFromQueue(options: DeliverMessagesFromQueueOptions): Promise + setLiveDeliveryMode(options: SetLiveDeliveryModeOptions): Promise } @injectable() @@ -33,12 +48,14 @@ export class MessagePickupApi, @inject(InjectionSymbols.Logger) logger: Logger ) { @@ -46,9 +63,14 @@ export class MessagePickupApi(protocolVersion: MPP): MessagePickupProtocol { const protocol = this.config.protocols.find((protocol) => protocol.version === protocolVersion) @@ -66,13 +88,99 @@ export class MessagePickupApi { this.logger.debug('Queuing message...') - const connectionRecord = await this.connectionService.getById(this.agentContext, options.connectionId) + const { connectionId, message, recipientDids } = options + const connectionRecord = await this.connectionService.getById(this.agentContext, connectionId) - const messageRepository = this.agentContext.dependencyManager.resolve( - InjectionSymbols.MessageRepository + const messagePickupRepository = this.agentContext.dependencyManager.resolve( + InjectionSymbols.MessagePickupRepository ) - await messageRepository.add(connectionRecord.id, options.message) + await messagePickupRepository.addMessage({ connectionId: connectionRecord.id, recipientDids, payload: message }) + } + + /** + * Get current active live mode message pickup session for a given connection. Undefined if no active session found + * + * @param options connection id and optional role + * @returns live mode session + */ + public async getLiveModeSession(options: { connectionId: string; role?: MessagePickupSessionRole }) { + const { connectionId, role } = options + return this.messagePickupSessionService.getLiveSessionByConnectionId(this.agentContext, { connectionId, role }) + } + + /** + * Deliver specific messages to an active live mode pickup session through message pickup protocol. + * + * This will deliver the messages regardless of the state of the message pickup queue, meaning that + * any message stuck there should be sent separately (e.g. using deliverQU). + * + * @param options: pickup session id and the messages to deliver + */ + public async deliverMessages(options: DeliverMessagesOptions) { + const { pickupSessionId, messages } = options + + const session = this.messagePickupSessionService.getLiveSession(this.agentContext, pickupSessionId) + + if (!session) { + this.logger.debug(`No active live mode session found with id ${pickupSessionId}`) + return + } + const connectionRecord = await this.connectionService.getById(this.agentContext, session.connectionId) + + const protocol = this.getProtocol(session.protocolVersion) + + const createDeliveryReturn = await protocol.createDeliveryMessage(this.agentContext, { + connectionRecord, + messages, + }) + + if (createDeliveryReturn) { + await this.messageSender.sendMessage( + new OutboundMessageContext(createDeliveryReturn.message, { + agentContext: this.agentContext, + connection: connectionRecord, + }) + ) + } + } + + /** + * Deliver messages in the Message Pickup Queue for a given live mode session and key (if specified). + * + * This will retrieve messages up to 'batchSize' messages from the queue and deliver it through the + * corresponding Message Pickup protocol. If there are more than 'batchSize' messages in the queue, + * the recipient may request remaining messages after receiving the first batch of messages. + * + */ + public async deliverMessagesFromQueue(options: DeliverMessagesFromQueueOptions) { + this.logger.debug('Deliverying queued messages') + + const { pickupSessionId, recipientDid: recipientKey, batchSize } = options + + const session = this.messagePickupSessionService.getLiveSession(this.agentContext, pickupSessionId) + + if (!session) { + throw new AriesFrameworkError(`No active live mode session found with id ${pickupSessionId}`) + } + const connectionRecord = await this.connectionService.getById(this.agentContext, session.connectionId) + + const protocol = this.getProtocol(session.protocolVersion) + + const deliverMessagesReturn = await protocol.createDeliveryMessage(this.agentContext, { + connectionRecord, + recipientKey, + batchSize, + }) + + if (deliverMessagesReturn) { + await this.messageSender.sendMessage( + new OutboundMessageContext(deliverMessagesReturn.message, { + agentContext: this.agentContext, + connection: connectionRecord, + }) + ) + } } /** @@ -85,10 +193,33 @@ export class MessagePickupApi { + const connectionRecord = await this.connectionService.getById(this.agentContext, options.connectionId) + + const protocol = this.getProtocol(options.protocolVersion) + const { message } = await protocol.setLiveDeliveryMode(this.agentContext, { + connectionRecord, + liveDelivery: options.liveDelivery, }) await this.messageSender.sendMessage( diff --git a/packages/core/src/modules/message-pickup/MessagePickupApiOptions.ts b/packages/core/src/modules/message-pickup/MessagePickupApiOptions.ts index 1f8d54e264..7700915017 100644 --- a/packages/core/src/modules/message-pickup/MessagePickupApiOptions.ts +++ b/packages/core/src/modules/message-pickup/MessagePickupApiOptions.ts @@ -1,23 +1,48 @@ import type { MessagePickupProtocol } from './protocol/MessagePickupProtocol' +import type { QueuedMessage } from './storage' import type { EncryptedMessage } from '../../types' /** - * Get the supported protocol versions based on the provided discover features services. + * Get the supported protocol versions based on the provided message pickup protocols */ export type MessagePickupProtocolVersionType = MPPs[number]['version'] export interface QueueMessageOptions { connectionId: string + recipientDids: string[] message: EncryptedMessage } +export interface DeliverMessagesFromQueueOptions { + pickupSessionId: string + recipientDid?: string + batchSize?: number +} + +export interface DeliverMessagesOptions { + pickupSessionId: string + messages: QueuedMessage[] +} + export interface PickupMessagesOptions { connectionId: string protocolVersion: MessagePickupProtocolVersionType - recipientKey?: string + recipientDid?: string batchSize?: number } +export interface SetLiveDeliveryModeOptions { + connectionId: string + protocolVersion: MessagePickupProtocolVersionType + liveDelivery: boolean +} + export type QueueMessageReturnType = void export type PickupMessagesReturnType = void + +export type DeliverMessagesReturnType = void + +export type DeliverMessagesFromQueueReturnType = void + +export type SetLiveDeliveryModeReturnType = void diff --git a/packages/core/src/modules/message-pickup/MessagePickupEvents.ts b/packages/core/src/modules/message-pickup/MessagePickupEvents.ts new file mode 100644 index 0000000000..ea12ad5131 --- /dev/null +++ b/packages/core/src/modules/message-pickup/MessagePickupEvents.ts @@ -0,0 +1,21 @@ +import type { MessagePickupSession } from './MessagePickupSession' +import type { BaseEvent } from '../../agent/Events' + +export enum MessagePickupEventTypes { + LiveSessionSaved = 'LiveSessionSaved', + LiveSessionRemoved = 'LiveSessionRemoved', +} + +export interface MessagePickupLiveSessionSavedEvent extends BaseEvent { + type: typeof MessagePickupEventTypes.LiveSessionSaved + payload: { + session: MessagePickupSession + } +} + +export interface MessagePickupLiveSessionRemovedEvent extends BaseEvent { + type: typeof MessagePickupEventTypes.LiveSessionRemoved + payload: { + session: MessagePickupSession + } +} diff --git a/packages/core/src/modules/message-pickup/MessagePickupModule.ts b/packages/core/src/modules/message-pickup/MessagePickupModule.ts index 5cf4540625..5c8d869694 100644 --- a/packages/core/src/modules/message-pickup/MessagePickupModule.ts +++ b/packages/core/src/modules/message-pickup/MessagePickupModule.ts @@ -10,6 +10,8 @@ import { InjectionSymbols } from '../../constants' import { MessagePickupApi } from './MessagePickupApi' import { MessagePickupModuleConfig } from './MessagePickupModuleConfig' import { V1MessagePickupProtocol, V2MessagePickupProtocol } from './protocol' +import { MessagePickupSessionService } from './services' +import { InMemoryMessagePickupRepository } from './storage' /** * Default protocols that will be registered if the `protocols` property is not configured. @@ -38,7 +40,7 @@ export class MessagePickupModule { @@ -50,8 +50,8 @@ export class MessagePickupModuleConfig = { + id: string + connectionId: string + protocolVersion: MessagePickupProtocolVersionType + role: MessagePickupSessionRole +} diff --git a/packages/core/src/modules/message-pickup/__tests__/MessagePickupModule.test.ts b/packages/core/src/modules/message-pickup/__tests__/MessagePickupModule.test.ts index 141d8f81af..aceb1b8000 100644 --- a/packages/core/src/modules/message-pickup/__tests__/MessagePickupModule.test.ts +++ b/packages/core/src/modules/message-pickup/__tests__/MessagePickupModule.test.ts @@ -1,9 +1,12 @@ import { FeatureRegistry } from '../../../agent/FeatureRegistry' import { Protocol } from '../../../agent/models' +import { InjectionSymbols } from '../../../constants' import { DependencyManager } from '../../../plugins/DependencyManager' import { MessagePickupApi } from '../MessagePickupApi' import { MessagePickupModule } from '../MessagePickupModule' import { MessagePickupModuleConfig } from '../MessagePickupModuleConfig' +import { MessagePickupSessionService } from '../services' +import { InMemoryMessagePickupRepository } from '../storage' jest.mock('../../../plugins/DependencyManager') const DependencyManagerMock = DependencyManager as jest.Mock @@ -25,6 +28,12 @@ describe('MessagePickupModule', () => { expect(dependencyManager.registerInstance).toHaveBeenCalledTimes(1) expect(dependencyManager.registerInstance).toHaveBeenCalledWith(MessagePickupModuleConfig, module.config) + expect(dependencyManager.registerSingleton).toHaveBeenCalledTimes(2) + expect(dependencyManager.registerSingleton).toHaveBeenCalledWith( + InjectionSymbols.MessagePickupRepository, + InMemoryMessagePickupRepository + ) + expect(dependencyManager.registerSingleton).toHaveBeenCalledWith(MessagePickupSessionService) expect(featureRegistry.register).toHaveBeenCalledTimes(2) expect(featureRegistry.register).toHaveBeenCalledWith( new Protocol({ diff --git a/packages/core/src/modules/message-pickup/__tests__/pickup.test.ts b/packages/core/src/modules/message-pickup/__tests__/pickup.test.ts index beb622c7fd..c0ca0c1271 100644 --- a/packages/core/src/modules/message-pickup/__tests__/pickup.test.ts +++ b/packages/core/src/modules/message-pickup/__tests__/pickup.test.ts @@ -5,15 +5,36 @@ import { Subject } from 'rxjs' import { SubjectInboundTransport } from '../../../../../../tests/transport/SubjectInboundTransport' import { SubjectOutboundTransport } from '../../../../../../tests/transport/SubjectOutboundTransport' -import { getInMemoryAgentOptions, waitForBasicMessage, waitForTrustPingReceivedEvent } from '../../../../tests/helpers' +import { askarModule } from '../../../../../askar/tests/helpers' +import { getAgentOptions, waitForAgentMessageProcessedEvent, waitForBasicMessage } from '../../../../tests/helpers' import { Agent } from '../../../agent/Agent' import { HandshakeProtocol } from '../../connections' - -const recipientOptions = getInMemoryAgentOptions('Mediation: Recipient Pickup') -const mediatorOptions = getInMemoryAgentOptions('Mediation: Mediator Pickup', { +import { MediatorModule } from '../../routing' +import { MessageForwardingStrategy } from '../../routing/MessageForwardingStrategy' +import { V2MessagesReceivedMessage, V2StatusMessage } from '../protocol' + +const recipientOptions = getAgentOptions( + 'Mediation Pickup Loop Recipient', + {}, + { + askar: askarModule, + }, // Agent is shutdown during test, so we can't use in-memory wallet - endpoints: ['wss://mediator'], -}) + false +) +const mediatorOptions = getAgentOptions( + 'Mediation Pickup Loop Mediator', + { + endpoints: ['wss://mediator'], + }, + { + askar: askarModule, + mediator: new MediatorModule({ + autoAcceptMediationRequests: true, + messageForwardingStrategy: MessageForwardingStrategy.QueueAndLiveModeDelivery, + }), + } +) describe('E2E Pick Up protocol', () => { let recipientAgent: Agent @@ -26,7 +47,7 @@ describe('E2E Pick Up protocol', () => { await mediatorAgent.wallet.delete() }) - test('E2E Pick Up V1 protocol', async () => { + test('E2E manual Pick Up V1 loop', async () => { const mediatorMessages = new Subject() const subjectMap = { @@ -85,7 +106,7 @@ describe('E2E Pick Up protocol', () => { expect(basicMessage.content).toBe(message) }) - test('E2E Pick Up V2 protocol', async () => { + test('E2E manual Pick Up V2 loop', async () => { const mediatorMessages = new Subject() // FIXME: we harcoded that pickup of messages MUST be using ws(s) scheme when doing implicit pickup @@ -129,6 +150,10 @@ describe('E2E Pick Up protocol', () => { mediatorRecipientConnection = await mediatorAgent.connections.returnWhenIsConnected(mediatorRecipientConnection!.id) + // Now they are connected, reinitialize recipient agent in order to lose the session (as with SubjectTransport it remains open) + await recipientAgent.shutdown() + await recipientAgent.initialize() + const message = 'hello pickup V2' await mediatorAgent.basicMessages.sendMessage(mediatorRecipientConnection.id, message) @@ -136,17 +161,31 @@ describe('E2E Pick Up protocol', () => { const basicMessagePromise = waitForBasicMessage(recipientAgent, { content: message, }) - const trustPingPromise = waitForTrustPingReceivedEvent(mediatorAgent, {}) await recipientAgent.messagePickup.pickupMessages({ connectionId: recipientMediatorConnection.id, protocolVersion: 'v2', }) + const firstStatusMessage = await waitForAgentMessageProcessedEvent(recipientAgent, { + messageType: V2StatusMessage.type.messageTypeUri, + }) + + expect((firstStatusMessage as V2StatusMessage).messageCount).toBe(1) const basicMessage = await basicMessagePromise expect(basicMessage.content).toBe(message) - // Wait for trust ping to be received and stop message pickup - await trustPingPromise + const messagesReceived = await waitForAgentMessageProcessedEvent(mediatorAgent, { + messageType: V2MessagesReceivedMessage.type.messageTypeUri, + }) + + expect((messagesReceived as V2MessagesReceivedMessage).messageIdList.length).toBe(1) + + const secondStatusMessage = await waitForAgentMessageProcessedEvent(recipientAgent, { + messageType: V2StatusMessage.type.messageTypeUri, + }) + + expect((secondStatusMessage as V2StatusMessage).messageCount).toBe(0) + await recipientAgent.mediationRecipient.stopMessagePickup() }) }) diff --git a/packages/core/src/modules/message-pickup/index.ts b/packages/core/src/modules/message-pickup/index.ts index b4745b6037..b2b05ba8ee 100644 --- a/packages/core/src/modules/message-pickup/index.ts +++ b/packages/core/src/modules/message-pickup/index.ts @@ -1,5 +1,8 @@ export * from './MessagePickupApi' export * from './MessagePickupApiOptions' +export * from './MessagePickupEvents' export * from './MessagePickupModule' export * from './MessagePickupModuleConfig' export * from './protocol' +export * from './storage' +export { MessagePickupSessionService } from './services' diff --git a/packages/core/src/modules/message-pickup/protocol/BaseMessagePickupProtocol.ts b/packages/core/src/modules/message-pickup/protocol/BaseMessagePickupProtocol.ts index ebbd6fde39..686cdccc90 100644 --- a/packages/core/src/modules/message-pickup/protocol/BaseMessagePickupProtocol.ts +++ b/packages/core/src/modules/message-pickup/protocol/BaseMessagePickupProtocol.ts @@ -1,5 +1,12 @@ import type { MessagePickupProtocol } from './MessagePickupProtocol' -import type { PickupMessagesProtocolOptions, PickupMessagesProtocolReturnType } from './MessagePickupProtocolOptions' +import type { + DeliverMessagesProtocolOptions, + DeliverMessagesProtocolReturnType, + PickupMessagesProtocolOptions, + PickupMessagesProtocolReturnType, + SetLiveDeliveryModeProtocolOptions, + SetLiveDeliveryModeProtocolReturnType, +} from './MessagePickupProtocolOptions' import type { AgentContext } from '../../../agent' import type { AgentMessage } from '../../../agent/AgentMessage' import type { FeatureRegistry } from '../../../agent/FeatureRegistry' @@ -12,10 +19,20 @@ import type { DependencyManager } from '../../../plugins' export abstract class BaseMessagePickupProtocol implements MessagePickupProtocol { public abstract readonly version: string - public abstract pickupMessages( + public abstract createPickupMessage( agentContext: AgentContext, options: PickupMessagesProtocolOptions ): Promise> + public abstract createDeliveryMessage( + agentContext: AgentContext, + options: DeliverMessagesProtocolOptions + ): Promise | void> + + public abstract setLiveDeliveryMode( + agentContext: AgentContext, + options: SetLiveDeliveryModeProtocolOptions + ): Promise> + public abstract register(dependencyManager: DependencyManager, featureRegistry: FeatureRegistry): void } diff --git a/packages/core/src/modules/message-pickup/protocol/MessagePickupProtocol.ts b/packages/core/src/modules/message-pickup/protocol/MessagePickupProtocol.ts index 9acdcf5e4d..df11b80547 100644 --- a/packages/core/src/modules/message-pickup/protocol/MessagePickupProtocol.ts +++ b/packages/core/src/modules/message-pickup/protocol/MessagePickupProtocol.ts @@ -1,4 +1,11 @@ -import type { PickupMessagesProtocolOptions, PickupMessagesProtocolReturnType } from './MessagePickupProtocolOptions' +import type { + DeliverMessagesProtocolOptions, + DeliverMessagesProtocolReturnType, + PickupMessagesProtocolOptions, + PickupMessagesProtocolReturnType, + SetLiveDeliveryModeProtocolOptions, + SetLiveDeliveryModeProtocolReturnType, +} from './MessagePickupProtocolOptions' import type { AgentContext } from '../../../agent' import type { AgentMessage } from '../../../agent/AgentMessage' import type { FeatureRegistry } from '../../../agent/FeatureRegistry' @@ -7,10 +14,20 @@ import type { DependencyManager } from '../../../plugins' export interface MessagePickupProtocol { readonly version: string - pickupMessages( + createPickupMessage( agentContext: AgentContext, options: PickupMessagesProtocolOptions ): Promise> + createDeliveryMessage( + agentContext: AgentContext, + options: DeliverMessagesProtocolOptions + ): Promise | void> + + setLiveDeliveryMode( + agentContext: AgentContext, + options: SetLiveDeliveryModeProtocolOptions + ): Promise> + register(dependencyManager: DependencyManager, featureRegistry: FeatureRegistry): void } diff --git a/packages/core/src/modules/message-pickup/protocol/MessagePickupProtocolOptions.ts b/packages/core/src/modules/message-pickup/protocol/MessagePickupProtocolOptions.ts index 9f3f252c6a..4f4409c501 100644 --- a/packages/core/src/modules/message-pickup/protocol/MessagePickupProtocolOptions.ts +++ b/packages/core/src/modules/message-pickup/protocol/MessagePickupProtocolOptions.ts @@ -1,12 +1,33 @@ import type { AgentMessage } from '../../../agent/AgentMessage' import type { ConnectionRecord } from '../../connections' +import type { QueuedMessage } from '../storage' export interface PickupMessagesProtocolOptions { connectionRecord: ConnectionRecord + recipientDid?: string + batchSize?: number +} + +export interface DeliverMessagesProtocolOptions { + connectionRecord: ConnectionRecord + messages?: QueuedMessage[] recipientKey?: string batchSize?: number } +export interface SetLiveDeliveryModeProtocolOptions { + connectionRecord: ConnectionRecord + liveDelivery: boolean +} + export type PickupMessagesProtocolReturnType = { message: MessageType } + +export type DeliverMessagesProtocolReturnType = { + message: MessageType +} + +export type SetLiveDeliveryModeProtocolReturnType = { + message: MessageType +} diff --git a/packages/core/src/modules/message-pickup/protocol/v1/V1MessagePickupProtocol.ts b/packages/core/src/modules/message-pickup/protocol/v1/V1MessagePickupProtocol.ts index 581d0d31a7..b0994478a9 100644 --- a/packages/core/src/modules/message-pickup/protocol/v1/V1MessagePickupProtocol.ts +++ b/packages/core/src/modules/message-pickup/protocol/v1/V1MessagePickupProtocol.ts @@ -3,11 +3,19 @@ import type { AgentMessage } from '../../../../agent/AgentMessage' import type { FeatureRegistry } from '../../../../agent/FeatureRegistry' import type { InboundMessageContext } from '../../../../agent/models/InboundMessageContext' import type { DependencyManager } from '../../../../plugins' -import type { MessageRepository } from '../../../../storage/MessageRepository' -import type { PickupMessagesProtocolOptions, PickupMessagesProtocolReturnType } from '../MessagePickupProtocolOptions' +import type { MessagePickupRepository } from '../../storage/MessagePickupRepository' +import type { + DeliverMessagesProtocolOptions, + DeliverMessagesProtocolReturnType, + PickupMessagesProtocolOptions, + PickupMessagesProtocolReturnType, + SetLiveDeliveryModeProtocolOptions, + SetLiveDeliveryModeProtocolReturnType, +} from '../MessagePickupProtocolOptions' import { OutboundMessageContext, Protocol } from '../../../../agent/models' import { InjectionSymbols } from '../../../../constants' +import { AriesFrameworkError } from '../../../../error' import { injectable } from '../../../../plugins' import { MessagePickupModuleConfig } from '../../MessagePickupModuleConfig' import { BaseMessagePickupProtocol } from '../BaseMessagePickupProtocol' @@ -17,10 +25,6 @@ import { V1BatchMessage, BatchMessageMessage, V1BatchPickupMessage } from './mes @injectable() export class V1MessagePickupProtocol extends BaseMessagePickupProtocol { - public constructor() { - super() - } - /** * The version of the message pickup protocol this class supports */ @@ -40,7 +44,7 @@ export class V1MessagePickupProtocol extends BaseMessagePickupProtocol { ) } - public async pickupMessages( + public async createPickupMessage( agentContext: AgentContext, options: PickupMessagesProtocolOptions ): Promise> { @@ -55,24 +59,66 @@ export class V1MessagePickupProtocol extends BaseMessagePickupProtocol { return { message } } + public async createDeliveryMessage( + agentContext: AgentContext, + options: DeliverMessagesProtocolOptions + ): Promise | void> { + const { connectionRecord, batchSize, messages } = options + connectionRecord.assertReady() + + const pickupMessageQueue = agentContext.dependencyManager.resolve( + InjectionSymbols.MessagePickupRepository + ) + + const messagesToDeliver = + messages ?? + (await pickupMessageQueue.takeFromQueue({ + connectionId: connectionRecord.id, + limit: batchSize, // TODO: Define as config parameter for message holder side + })) + + const batchMessages = messagesToDeliver.map( + (msg) => + new BatchMessageMessage({ + id: msg.id, + message: msg.encryptedMessage, + }) + ) + + if (messagesToDeliver.length > 0) { + const message = new V1BatchMessage({ + messages: batchMessages, + }) + + return { message } + } + } + + public async setLiveDeliveryMode(): Promise> { + throw new AriesFrameworkError('Live Delivery mode not supported in Message Pickup V1 protocol') + } + public async processBatchPickup(messageContext: InboundMessageContext) { // Assert ready connection const connection = messageContext.assertReadyConnection() const { message } = messageContext - const messageRepository = messageContext.agentContext.dependencyManager.resolve( - InjectionSymbols.MessageRepository + const pickupMessageQueue = messageContext.agentContext.dependencyManager.resolve( + InjectionSymbols.MessagePickupRepository ) - const messages = await messageRepository.takeFromQueue(connection.id, message.batchSize) + const messages = await pickupMessageQueue.takeFromQueue({ + connectionId: connection.id, + limit: message.batchSize, + deleteMessages: true, + }) - // TODO: each message should be stored with an id. to be able to conform to the id property - // of batch message const batchMessages = messages.map( (msg) => new BatchMessageMessage({ - message: msg, + id: msg.id, + message: msg.encryptedMessage, }) ) diff --git a/packages/core/src/modules/message-pickup/protocol/v1/messages/V1BatchMessage.ts b/packages/core/src/modules/message-pickup/protocol/v1/messages/V1BatchMessage.ts index 91e0b5debc..bf01bea731 100644 --- a/packages/core/src/modules/message-pickup/protocol/v1/messages/V1BatchMessage.ts +++ b/packages/core/src/modules/message-pickup/protocol/v1/messages/V1BatchMessage.ts @@ -33,6 +33,8 @@ export interface BatchMessageOptions { * @see https://github.com/hyperledger/aries-rfcs/blob/master/features/0212-pickup/README.md#batch */ export class V1BatchMessage extends AgentMessage { + public readonly allowQueueTransport = false + public constructor(options: BatchMessageOptions) { super() diff --git a/packages/core/src/modules/message-pickup/protocol/v1/messages/V1BatchPickupMessage.ts b/packages/core/src/modules/message-pickup/protocol/v1/messages/V1BatchPickupMessage.ts index aa5e7ff646..950c700b3d 100644 --- a/packages/core/src/modules/message-pickup/protocol/v1/messages/V1BatchPickupMessage.ts +++ b/packages/core/src/modules/message-pickup/protocol/v1/messages/V1BatchPickupMessage.ts @@ -15,6 +15,8 @@ export interface BatchPickupMessageOptions { * @see https://github.com/hyperledger/aries-rfcs/blob/master/features/0212-pickup/README.md#batch-pickup */ export class V1BatchPickupMessage extends AgentMessage { + public readonly allowQueueTransport = false + /** * Create new BatchPickupMessage instance. * diff --git a/packages/core/src/modules/message-pickup/protocol/v2/V2MessagePickupProtocol.ts b/packages/core/src/modules/message-pickup/protocol/v2/V2MessagePickupProtocol.ts index 171db0bb97..c0b927b039 100644 --- a/packages/core/src/modules/message-pickup/protocol/v2/V2MessagePickupProtocol.ts +++ b/packages/core/src/modules/message-pickup/protocol/v2/V2MessagePickupProtocol.ts @@ -4,26 +4,34 @@ import type { AgentMessageReceivedEvent } from '../../../../agent/Events' import type { FeatureRegistry } from '../../../../agent/FeatureRegistry' import type { InboundMessageContext } from '../../../../agent/models/InboundMessageContext' import type { DependencyManager } from '../../../../plugins' -import type { MessageRepository } from '../../../../storage/MessageRepository' import type { EncryptedMessage } from '../../../../types' -import type { PickupMessagesProtocolOptions, PickupMessagesProtocolReturnType } from '../MessagePickupProtocolOptions' +import type { MessagePickupRepository } from '../../storage/MessagePickupRepository' +import type { + DeliverMessagesProtocolOptions, + DeliverMessagesProtocolReturnType, + PickupMessagesProtocolOptions, + PickupMessagesProtocolReturnType, + SetLiveDeliveryModeProtocolOptions, + SetLiveDeliveryModeProtocolReturnType, +} from '../MessagePickupProtocolOptions' import { EventEmitter } from '../../../../agent/EventEmitter' import { AgentEventTypes } from '../../../../agent/Events' -import { MessageSender } from '../../../../agent/MessageSender' import { OutboundMessageContext, Protocol } from '../../../../agent/models' import { InjectionSymbols } from '../../../../constants' import { Attachment } from '../../../../decorators/attachment/Attachment' -import { AriesFrameworkError } from '../../../../error' import { injectable } from '../../../../plugins' -import { ConnectionService } from '../../../connections' +import { verkeyToDidKey } from '../../../dids/helpers' import { ProblemReportError } from '../../../problem-reports' import { RoutingProblemReportReason } from '../../../routing/error' import { MessagePickupModuleConfig } from '../../MessagePickupModuleConfig' +import { MessagePickupSessionRole } from '../../MessagePickupSession' +import { MessagePickupSessionService } from '../../services' import { BaseMessagePickupProtocol } from '../BaseMessagePickupProtocol' import { V2DeliveryRequestHandler, + V2LiveDeliveryChangeHandler, V2MessageDeliveryHandler, V2MessagesReceivedHandler, V2StatusHandler, @@ -35,6 +43,7 @@ import { V2DeliveryRequestMessage, V2MessagesReceivedMessage, V2StatusRequestMessage, + V2LiveDeliveryChangeMessage, } from './messages' @injectable() @@ -54,6 +63,7 @@ export class V2MessagePickupProtocol extends BaseMessagePickupProtocol { new V2MessagesReceivedHandler(this), new V2StatusHandler(this), new V2MessageDeliveryHandler(this), + new V2LiveDeliveryChangeHandler(this), ]) featureRegistry.register( @@ -64,11 +74,11 @@ export class V2MessagePickupProtocol extends BaseMessagePickupProtocol { ) } - public async pickupMessages( + public async createPickupMessage( agentContext: AgentContext, options: PickupMessagesProtocolOptions ): Promise> { - const { connectionRecord, recipientKey } = options + const { connectionRecord, recipientDid: recipientKey } = options connectionRecord.assertReady() const message = new V2StatusRequestMessage({ @@ -78,21 +88,76 @@ export class V2MessagePickupProtocol extends BaseMessagePickupProtocol { return { message } } + public async createDeliveryMessage( + agentContext: AgentContext, + options: DeliverMessagesProtocolOptions + ): Promise | void> { + const { connectionRecord, recipientKey, messages } = options + connectionRecord.assertReady() + + const messagePickupRepository = agentContext.dependencyManager.resolve( + InjectionSymbols.MessagePickupRepository + ) + + // Get available messages from queue, but don't delete them + const messagesToDeliver = + messages ?? + (await messagePickupRepository.takeFromQueue({ + connectionId: connectionRecord.id, + recipientDid: recipientKey, + limit: 10, // TODO: Define as config parameter + })) + + if (messagesToDeliver.length === 0) { + return + } + + const attachments = messagesToDeliver.map( + (msg) => + new Attachment({ + id: msg.id, + data: { + json: msg.encryptedMessage, + }, + }) + ) + + return { + message: new V2MessageDeliveryMessage({ + attachments, + }), + } + } + + public async setLiveDeliveryMode( + agentContext: AgentContext, + options: SetLiveDeliveryModeProtocolOptions + ): Promise> { + const { connectionRecord, liveDelivery } = options + connectionRecord.assertReady() + return { + message: new V2LiveDeliveryChangeMessage({ + liveDelivery, + }), + } + } + public async processStatusRequest(messageContext: InboundMessageContext) { // Assert ready connection const connection = messageContext.assertReadyConnection() + const recipientKey = messageContext.message.recipientKey - const messageRepository = messageContext.agentContext.dependencyManager.resolve( - InjectionSymbols.MessageRepository + const messagePickupRepository = messageContext.agentContext.dependencyManager.resolve( + InjectionSymbols.MessagePickupRepository ) - if (messageContext.message.recipientKey) { - throw new AriesFrameworkError('recipient_key parameter not supported') - } - const statusMessage = new V2StatusMessage({ threadId: messageContext.message.threadId, - messageCount: await messageRepository.getAvailableMessageCount(connection.id), + recipientKey, + messageCount: await messagePickupRepository.getAvailableMessageCount({ + connectionId: connection.id, + recipientDid: recipientKey ? verkeyToDidKey(recipientKey) : undefined, + }), }) return new OutboundMessageContext(statusMessage, { @@ -104,27 +169,27 @@ export class V2MessagePickupProtocol extends BaseMessagePickupProtocol { public async processDeliveryRequest(messageContext: InboundMessageContext) { // Assert ready connection const connection = messageContext.assertReadyConnection() - - if (messageContext.message.recipientKey) { - throw new AriesFrameworkError('recipient_key parameter not supported') - } + const recipientKey = messageContext.message.recipientKey const { message } = messageContext - const messageRepository = messageContext.agentContext.dependencyManager.resolve( - InjectionSymbols.MessageRepository + const messagePickupRepository = messageContext.agentContext.dependencyManager.resolve( + InjectionSymbols.MessagePickupRepository ) // Get available messages from queue, but don't delete them - const messages = await messageRepository.takeFromQueue(connection.id, message.limit, true) + const messages = await messagePickupRepository.takeFromQueue({ + connectionId: connection.id, + recipientDid: recipientKey ? verkeyToDidKey(recipientKey) : undefined, + limit: message.limit, + }) - // TODO: each message should be stored with an id. to be able to conform to the id property - // of delivery message const attachments = messages.map( (msg) => new Attachment({ + id: msg.id, data: { - json: msg, + json: msg.encryptedMessage, }, }) ) @@ -133,10 +198,12 @@ export class V2MessagePickupProtocol extends BaseMessagePickupProtocol { messages.length > 0 ? new V2MessageDeliveryMessage({ threadId: messageContext.message.threadId, + recipientKey, attachments, }) : new V2StatusMessage({ threadId: messageContext.message.threadId, + recipientKey, messageCount: 0, }) @@ -152,19 +219,17 @@ export class V2MessagePickupProtocol extends BaseMessagePickupProtocol { const { message } = messageContext - const messageRepository = messageContext.agentContext.dependencyManager.resolve( - InjectionSymbols.MessageRepository + const messageRepository = messageContext.agentContext.dependencyManager.resolve( + InjectionSymbols.MessagePickupRepository ) - // TODO: Add Queued Message ID - await messageRepository.takeFromQueue( - connection.id, - message.messageIdList ? message.messageIdList.length : undefined - ) + if (message.messageIdList.length) { + await messageRepository.removeMessages({ connectionId: connection.id, messageIds: message.messageIdList }) + } const statusMessage = new V2StatusMessage({ threadId: messageContext.message.threadId, - messageCount: await messageRepository.getAvailableMessageCount(connection.id), + messageCount: await messageRepository.getAvailableMessageCount({ connectionId: connection.id }), }) return new OutboundMessageContext(statusMessage, { @@ -174,45 +239,13 @@ export class V2MessagePickupProtocol extends BaseMessagePickupProtocol { } public async processStatus(messageContext: InboundMessageContext) { - const connection = messageContext.assertReadyConnection() const { message: statusMessage } = messageContext const { messageCount, recipientKey } = statusMessage - const connectionService = messageContext.agentContext.dependencyManager.resolve(ConnectionService) - const messageSender = messageContext.agentContext.dependencyManager.resolve(MessageSender) const messagePickupModuleConfig = messageContext.agentContext.dependencyManager.resolve(MessagePickupModuleConfig) - //No messages to be sent + //No messages to be retrieved if (messageCount === 0) { - const { message, connectionRecord } = await connectionService.createTrustPing( - messageContext.agentContext, - connection, - { - responseRequested: false, - } - ) - - // FIXME: check where this flow fits, as it seems very particular for the Credo-ACA-Py combination - const websocketSchemes = ['ws', 'wss'] - - await messageSender.sendMessage( - new OutboundMessageContext(message, { - agentContext: messageContext.agentContext, - connection: connectionRecord, - }), - { - transportPriority: { - schemes: websocketSchemes, - restrictive: true, - // TODO: add keepAlive: true to enforce through the public api - // we need to keep the socket alive. It already works this way, but would - // be good to make more explicit from the public facing API. - // This would also make it easier to change the internal API later on. - // keepAlive: true, - }, - } - ) - return null } const { maximumBatchSize: maximumMessagePickup } = messagePickupModuleConfig @@ -226,6 +259,35 @@ export class V2MessagePickupProtocol extends BaseMessagePickupProtocol { return deliveryRequestMessage } + public async processLiveDeliveryChange(messageContext: InboundMessageContext) { + const { agentContext, message } = messageContext + + const connection = messageContext.assertReadyConnection() + + const messagePickupRepository = messageContext.agentContext.dependencyManager.resolve( + InjectionSymbols.MessagePickupRepository + ) + const sessionService = messageContext.agentContext.dependencyManager.resolve(MessagePickupSessionService) + + if (message.liveDelivery) { + sessionService.saveLiveSession(agentContext, { + connectionId: connection.id, + protocolVersion: 'v2', + role: MessagePickupSessionRole.MessageHolder, + }) + } else { + sessionService.removeLiveSession(agentContext, { connectionId: connection.id }) + } + + const statusMessage = new V2StatusMessage({ + threadId: message.threadId, + liveDelivery: message.liveDelivery, + messageCount: await messagePickupRepository.getAvailableMessageCount({ connectionId: connection.id }), + }) + + return new OutboundMessageContext(statusMessage, { agentContext: messageContext.agentContext, connection }) + } + public async processDelivery(messageContext: InboundMessageContext) { messageContext.assertReadyConnection() diff --git a/packages/core/src/modules/message-pickup/protocol/v2/__tests__/V2MessagePickupProtocol.test.ts b/packages/core/src/modules/message-pickup/protocol/v2/__tests__/V2MessagePickupProtocol.test.ts index 50476217f9..bf0d6b2f0a 100644 --- a/packages/core/src/modules/message-pickup/protocol/v2/__tests__/V2MessagePickupProtocol.test.ts +++ b/packages/core/src/modules/message-pickup/protocol/v2/__tests__/V2MessagePickupProtocol.test.ts @@ -8,11 +8,12 @@ import { InboundMessageContext } from '../../../../../agent/models/InboundMessag import { InjectionSymbols } from '../../../../../constants' import { Attachment } from '../../../../../decorators/attachment/Attachment' import { AriesFrameworkError } from '../../../../../error' -import { InMemoryMessageRepository } from '../../../../../storage/InMemoryMessageRepository' import { uuid } from '../../../../../utils/uuid' import { DidExchangeState, TrustPingMessage } from '../../../../connections' import { ConnectionService } from '../../../../connections/services/ConnectionService' +import { verkeyToDidKey } from '../../../../dids/helpers' import { MessagePickupModuleConfig } from '../../../MessagePickupModuleConfig' +import { InMemoryMessagePickupRepository } from '../../../storage/InMemoryMessagePickupRepository' import { V1MessagePickupProtocol } from '../../v1' import { V2MessagePickupProtocol } from '../V2MessagePickupProtocol' import { @@ -28,13 +29,13 @@ const mockConnection = getMockConnection({ }) // Mock classes -jest.mock('../../../../../storage/InMemoryMessageRepository') +jest.mock('../../../storage/InMemoryMessagePickupRepository') jest.mock('../../../../../agent/EventEmitter') jest.mock('../../../../../agent/MessageSender') jest.mock('../../../../connections/services/ConnectionService') // Mock typed object -const InMessageRepositoryMock = InMemoryMessageRepository as jest.Mock +const InMessageRepositoryMock = InMemoryMessagePickupRepository as jest.Mock const EventEmitterMock = EventEmitter as jest.Mock const MessageSenderMock = MessageSender as jest.Mock const ConnectionServiceMock = ConnectionService as jest.Mock @@ -46,11 +47,11 @@ const messagePickupModuleConfig = new MessagePickupModuleConfig({ const messageSender = new MessageSenderMock() const eventEmitter = new EventEmitterMock() const connectionService = new ConnectionServiceMock() -const messageRepository = new InMessageRepositoryMock() +const messagePickupRepository = new InMessageRepositoryMock() const agentContext = getAgentContext({ registerInstances: [ - [InjectionSymbols.MessageRepository, messageRepository], + [InjectionSymbols.MessagePickupRepository, messagePickupRepository], [EventEmitter, eventEmitter], [MessageSender, messageSender], [ConnectionService, connectionService], @@ -64,9 +65,13 @@ const encryptedMessage: EncryptedMessage = { ciphertext: 'base64url', tag: 'base64url', } -const queuedMessages = [encryptedMessage, encryptedMessage, encryptedMessage] +const queuedMessages = [ + { id: '1', encryptedMessage }, + { id: '2', encryptedMessage }, + { id: '3', encryptedMessage }, +] -describe('V2MessagePickupService', () => { +describe('V2MessagePickupProtocol', () => { let pickupProtocol: V2MessagePickupProtocol beforeEach(async () => { @@ -75,7 +80,7 @@ describe('V2MessagePickupService', () => { describe('processStatusRequest', () => { test('no available messages in queue', async () => { - mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(0) + mockFunction(messagePickupRepository.getAvailableMessageCount).mockResolvedValue(0) const statusRequest = new V2StatusRequestMessage({}) @@ -91,11 +96,11 @@ describe('V2MessagePickupService', () => { messageCount: 0, }) ) - expect(messageRepository.getAvailableMessageCount).toHaveBeenCalledWith(mockConnection.id) + expect(messagePickupRepository.getAvailableMessageCount).toHaveBeenCalledWith({ connectionId: mockConnection.id }) }) test('multiple messages in queue', async () => { - mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(5) + mockFunction(messagePickupRepository.getAvailableMessageCount).mockResolvedValue(5) const statusRequest = new V2StatusRequestMessage({}) const messageContext = new InboundMessageContext(statusRequest, { connection: mockConnection, agentContext }) @@ -110,27 +115,26 @@ describe('V2MessagePickupService', () => { messageCount: 5, }) ) - expect(messageRepository.getAvailableMessageCount).toHaveBeenCalledWith(mockConnection.id) + expect(messagePickupRepository.getAvailableMessageCount).toHaveBeenCalledWith({ connectionId: mockConnection.id }) }) test('status request specifying recipient key', async () => { - mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(10) + mockFunction(messagePickupRepository.getAvailableMessageCount).mockResolvedValue(10) const statusRequest = new V2StatusRequestMessage({ - recipientKey: 'recipientKey', + recipientKey: '79CXkde3j8TNuMXxPdV7nLUrT2g7JAEjH5TreyVY7GEZ', }) const messageContext = new InboundMessageContext(statusRequest, { connection: mockConnection, agentContext }) - await expect(pickupProtocol.processStatusRequest(messageContext)).rejects.toThrowError( - 'recipient_key parameter not supported' - ) + await pickupProtocol.processStatusRequest(messageContext) + expect(messagePickupRepository.getAvailableMessageCount).toHaveBeenCalledWith({ connectionId: mockConnection.id }) }) }) describe('processDeliveryRequest', () => { test('no available messages in queue', async () => { - mockFunction(messageRepository.takeFromQueue).mockReturnValue([]) + mockFunction(messagePickupRepository.takeFromQueue).mockReturnValue([]) const deliveryRequest = new V2DeliveryRequestMessage({ limit: 10 }) @@ -146,11 +150,14 @@ describe('V2MessagePickupService', () => { messageCount: 0, }) ) - expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(mockConnection.id, 10, true) + expect(messagePickupRepository.takeFromQueue).toHaveBeenCalledWith({ + connectionId: mockConnection.id, + limit: 10, + }) }) test('less messages in queue than limit', async () => { - mockFunction(messageRepository.takeFromQueue).mockReturnValue(queuedMessages) + mockFunction(messagePickupRepository.takeFromQueue).mockReturnValue(queuedMessages) const deliveryRequest = new V2DeliveryRequestMessage({ limit: 10 }) @@ -166,18 +173,22 @@ describe('V2MessagePickupService', () => { expect.arrayContaining( queuedMessages.map((msg) => expect.objectContaining({ + id: msg.id, data: { - json: msg, + json: msg.encryptedMessage, }, }) ) ) ) - expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(mockConnection.id, 10, true) + expect(messagePickupRepository.takeFromQueue).toHaveBeenCalledWith({ + connectionId: mockConnection.id, + limit: 10, + }) }) test('more messages in queue than limit', async () => { - mockFunction(messageRepository.takeFromQueue).mockReturnValue(queuedMessages.slice(0, 2)) + mockFunction(messagePickupRepository.takeFromQueue).mockReturnValue(queuedMessages.slice(0, 2)) const deliveryRequest = new V2DeliveryRequestMessage({ limit: 2 }) @@ -193,36 +204,44 @@ describe('V2MessagePickupService', () => { expect.arrayContaining( queuedMessages.slice(0, 2).map((msg) => expect.objectContaining({ + id: msg.id, data: { - json: msg, + json: msg.encryptedMessage, }, }) ) ) ) - expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(mockConnection.id, 2, true) + expect(messagePickupRepository.takeFromQueue).toHaveBeenCalledWith({ + connectionId: mockConnection.id, + limit: 2, + }) }) test('delivery request specifying recipient key', async () => { - mockFunction(messageRepository.takeFromQueue).mockReturnValue(queuedMessages) + mockFunction(messagePickupRepository.takeFromQueue).mockReturnValue(queuedMessages) - const statusRequest = new V2DeliveryRequestMessage({ + const deliveryRequest = new V2DeliveryRequestMessage({ limit: 10, recipientKey: 'recipientKey', }) - const messageContext = new InboundMessageContext(statusRequest, { connection: mockConnection, agentContext }) + const messageContext = new InboundMessageContext(deliveryRequest, { connection: mockConnection, agentContext }) - await expect(pickupProtocol.processStatusRequest(messageContext)).rejects.toThrowError( - 'recipient_key parameter not supported' - ) + await pickupProtocol.processDeliveryRequest(messageContext) + + expect(messagePickupRepository.takeFromQueue).toHaveBeenCalledWith({ + connectionId: mockConnection.id, + limit: 10, + recipientDid: verkeyToDidKey('recipientKey'), + }) }) }) describe('processMessagesReceived', () => { test('messages received partially', async () => { - mockFunction(messageRepository.takeFromQueue).mockReturnValue(queuedMessages) - mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(4) + mockFunction(messagePickupRepository.takeFromQueue).mockReturnValue(queuedMessages) + mockFunction(messagePickupRepository.getAvailableMessageCount).mockResolvedValue(4) const messagesReceived = new V2MessagesReceivedMessage({ messageIdList: ['1', '2'], @@ -240,13 +259,16 @@ describe('V2MessagePickupService', () => { messageCount: 4, }) ) - expect(messageRepository.getAvailableMessageCount).toHaveBeenCalledWith(mockConnection.id) - expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(mockConnection.id, 2) + expect(messagePickupRepository.getAvailableMessageCount).toHaveBeenCalledWith({ connectionId: mockConnection.id }) + expect(messagePickupRepository.removeMessages).toHaveBeenCalledWith({ + connectionId: mockConnection.id, + messageIds: ['1', '2'], + }) }) test('all messages have been received', async () => { - mockFunction(messageRepository.takeFromQueue).mockReturnValue(queuedMessages) - mockFunction(messageRepository.getAvailableMessageCount).mockResolvedValue(0) + mockFunction(messagePickupRepository.takeFromQueue).mockReturnValue(queuedMessages) + mockFunction(messagePickupRepository.getAvailableMessageCount).mockResolvedValue(0) const messagesReceived = new V2MessagesReceivedMessage({ messageIdList: ['1', '2'], @@ -265,16 +287,19 @@ describe('V2MessagePickupService', () => { }) ) - expect(messageRepository.getAvailableMessageCount).toHaveBeenCalledWith(mockConnection.id) - expect(messageRepository.takeFromQueue).toHaveBeenCalledWith(mockConnection.id, 2) + expect(messagePickupRepository.getAvailableMessageCount).toHaveBeenCalledWith({ connectionId: mockConnection.id }) + expect(messagePickupRepository.removeMessages).toHaveBeenCalledWith({ + connectionId: mockConnection.id, + messageIds: ['1', '2'], + }) }) }) - describe('pickupMessages', () => { + describe('createPickupMessage', () => { it('creates a status request message', async () => { - const { message: statusRequestMessage } = await pickupProtocol.pickupMessages(agentContext, { + const { message: statusRequestMessage } = await pickupProtocol.createPickupMessage(agentContext, { connectionRecord: mockConnection, - recipientKey: 'a-key', + recipientDid: 'a-key', }) expect(statusRequestMessage).toMatchObject({ diff --git a/packages/core/src/modules/message-pickup/protocol/v2/handlers/V2LiveDeliveryChangeHandler.ts b/packages/core/src/modules/message-pickup/protocol/v2/handlers/V2LiveDeliveryChangeHandler.ts new file mode 100644 index 0000000000..30eeaf035f --- /dev/null +++ b/packages/core/src/modules/message-pickup/protocol/v2/handlers/V2LiveDeliveryChangeHandler.ts @@ -0,0 +1,19 @@ +import type { MessageHandler } from '../../../../../agent/MessageHandler' +import type { InboundMessageContext } from '../../../../../agent/models/InboundMessageContext' +import type { V2MessagePickupProtocol } from '../V2MessagePickupProtocol' + +import { V2LiveDeliveryChangeMessage } from '../messages' + +export class V2LiveDeliveryChangeHandler implements MessageHandler { + public supportedMessages = [V2LiveDeliveryChangeMessage] + private messagePickupService: V2MessagePickupProtocol + + public constructor(messagePickupService: V2MessagePickupProtocol) { + this.messagePickupService = messagePickupService + } + + public async handle(messageContext: InboundMessageContext) { + messageContext.assertReadyConnection() + return this.messagePickupService.processLiveDeliveryChange(messageContext) + } +} diff --git a/packages/core/src/modules/message-pickup/protocol/v2/handlers/index.ts b/packages/core/src/modules/message-pickup/protocol/v2/handlers/index.ts index 5f54b56ac7..f8a173669b 100644 --- a/packages/core/src/modules/message-pickup/protocol/v2/handlers/index.ts +++ b/packages/core/src/modules/message-pickup/protocol/v2/handlers/index.ts @@ -1,4 +1,5 @@ export * from './V2DeliveryRequestHandler' +export * from './V2LiveDeliveryChangeHandler' export * from './V2MessageDeliveryHandler' export * from './V2MessagesReceivedHandler' export * from './V2StatusHandler' diff --git a/packages/core/src/modules/message-pickup/protocol/v2/messages/V2DeliveryRequestMessage.ts b/packages/core/src/modules/message-pickup/protocol/v2/messages/V2DeliveryRequestMessage.ts index b7c37bf426..2a1e73f867 100644 --- a/packages/core/src/modules/message-pickup/protocol/v2/messages/V2DeliveryRequestMessage.ts +++ b/packages/core/src/modules/message-pickup/protocol/v2/messages/V2DeliveryRequestMessage.ts @@ -12,6 +12,8 @@ export interface V2DeliveryRequestMessageOptions { } export class V2DeliveryRequestMessage extends AgentMessage { + public readonly allowQueueTransport = false + public constructor(options: V2DeliveryRequestMessageOptions) { super() diff --git a/packages/core/src/modules/message-pickup/protocol/v2/messages/V2LiveDeliveryChangeMessage.ts b/packages/core/src/modules/message-pickup/protocol/v2/messages/V2LiveDeliveryChangeMessage.ts new file mode 100644 index 0000000000..3b14501f6b --- /dev/null +++ b/packages/core/src/modules/message-pickup/protocol/v2/messages/V2LiveDeliveryChangeMessage.ts @@ -0,0 +1,33 @@ +import { Expose } from 'class-transformer' +import { IsBoolean } from 'class-validator' + +import { AgentMessage } from '../../../../../agent/AgentMessage' +import { ReturnRouteTypes } from '../../../../../decorators/transport/TransportDecorator' +import { IsValidMessageType, parseMessageType } from '../../../../../utils/messageType' + +export interface V2LiveDeliveryChangeMessageOptions { + id?: string + liveDelivery: boolean +} + +export class V2LiveDeliveryChangeMessage extends AgentMessage { + public readonly allowQueueTransport = false + + public constructor(options: V2LiveDeliveryChangeMessageOptions) { + super() + + if (options) { + this.id = options.id || this.generateId() + this.liveDelivery = options.liveDelivery + } + this.setReturnRouting(ReturnRouteTypes.all) + } + + @IsValidMessageType(V2LiveDeliveryChangeMessage.type) + public readonly type = V2LiveDeliveryChangeMessage.type.messageTypeUri + public static readonly type = parseMessageType('https://didcomm.org/messagepickup/2.0/live-delivery-change') + + @IsBoolean() + @Expose({ name: 'live_delivery' }) + public liveDelivery!: boolean +} diff --git a/packages/core/src/modules/message-pickup/protocol/v2/messages/V2MessageDeliveryMessage.ts b/packages/core/src/modules/message-pickup/protocol/v2/messages/V2MessageDeliveryMessage.ts index 48783f634b..4523c5d54b 100644 --- a/packages/core/src/modules/message-pickup/protocol/v2/messages/V2MessageDeliveryMessage.ts +++ b/packages/core/src/modules/message-pickup/protocol/v2/messages/V2MessageDeliveryMessage.ts @@ -10,11 +10,13 @@ import { IsValidMessageType, parseMessageType } from '../../../../../utils/messa export interface V2MessageDeliveryMessageOptions { id?: string recipientKey?: string - threadId: string + threadId?: string attachments: Attachment[] } export class V2MessageDeliveryMessage extends AgentMessage { + public readonly allowQueueTransport = false + public constructor(options: V2MessageDeliveryMessageOptions) { super() @@ -22,9 +24,11 @@ export class V2MessageDeliveryMessage extends AgentMessage { this.id = options.id || this.generateId() this.recipientKey = options.recipientKey this.appendedAttachments = options.attachments - this.setThread({ - threadId: options.threadId, - }) + if (this.threadId) { + this.setThread({ + threadId: options.threadId, + }) + } } this.setReturnRouting(ReturnRouteTypes.all) } diff --git a/packages/core/src/modules/message-pickup/protocol/v2/messages/V2MessagesReceivedMessage.ts b/packages/core/src/modules/message-pickup/protocol/v2/messages/V2MessagesReceivedMessage.ts index 23da433de6..889e08853c 100644 --- a/packages/core/src/modules/message-pickup/protocol/v2/messages/V2MessagesReceivedMessage.ts +++ b/packages/core/src/modules/message-pickup/protocol/v2/messages/V2MessagesReceivedMessage.ts @@ -1,5 +1,5 @@ import { Expose } from 'class-transformer' -import { IsArray, IsOptional } from 'class-validator' +import { IsArray } from 'class-validator' import { AgentMessage } from '../../../../../agent/AgentMessage' import { ReturnRouteTypes } from '../../../../../decorators/transport/TransportDecorator' @@ -11,6 +11,8 @@ export interface V2MessagesReceivedMessageOptions { } export class V2MessagesReceivedMessage extends AgentMessage { + public readonly allowQueueTransport = false + public constructor(options: V2MessagesReceivedMessageOptions) { super() @@ -26,7 +28,6 @@ export class V2MessagesReceivedMessage extends AgentMessage { public static readonly type = parseMessageType('https://didcomm.org/messagepickup/2.0/messages-received') @IsArray() - @IsOptional() @Expose({ name: 'message_id_list' }) - public messageIdList?: string[] + public messageIdList!: string[] } diff --git a/packages/core/src/modules/message-pickup/protocol/v2/messages/V2StatusMessage.ts b/packages/core/src/modules/message-pickup/protocol/v2/messages/V2StatusMessage.ts index a28296742e..46d3a8c226 100644 --- a/packages/core/src/modules/message-pickup/protocol/v2/messages/V2StatusMessage.ts +++ b/packages/core/src/modules/message-pickup/protocol/v2/messages/V2StatusMessage.ts @@ -19,6 +19,8 @@ export interface V2StatusMessageOptions { } export class V2StatusMessage extends AgentMessage { + public readonly allowQueueTransport = false + public constructor(options: V2StatusMessageOptions) { super() if (options) { diff --git a/packages/core/src/modules/message-pickup/protocol/v2/messages/V2StatusRequestMessage.ts b/packages/core/src/modules/message-pickup/protocol/v2/messages/V2StatusRequestMessage.ts index eb6908bae2..c10acf8b75 100644 --- a/packages/core/src/modules/message-pickup/protocol/v2/messages/V2StatusRequestMessage.ts +++ b/packages/core/src/modules/message-pickup/protocol/v2/messages/V2StatusRequestMessage.ts @@ -10,6 +10,8 @@ export interface V2StatusRequestMessageOptions { } export class V2StatusRequestMessage extends AgentMessage { + public readonly allowQueueTransport = false + public constructor(options: V2StatusRequestMessageOptions) { super() diff --git a/packages/core/src/modules/message-pickup/protocol/v2/messages/index.ts b/packages/core/src/modules/message-pickup/protocol/v2/messages/index.ts index 4746216ec0..df70290a6f 100644 --- a/packages/core/src/modules/message-pickup/protocol/v2/messages/index.ts +++ b/packages/core/src/modules/message-pickup/protocol/v2/messages/index.ts @@ -1,4 +1,5 @@ export * from './V2DeliveryRequestMessage' +export * from './V2LiveDeliveryChangeMessage' export * from './V2MessageDeliveryMessage' export * from './V2MessagesReceivedMessage' export * from './V2StatusMessage' diff --git a/packages/core/src/modules/message-pickup/services/MessagePickupSessionService.ts b/packages/core/src/modules/message-pickup/services/MessagePickupSessionService.ts new file mode 100644 index 0000000000..7e726c7c8a --- /dev/null +++ b/packages/core/src/modules/message-pickup/services/MessagePickupSessionService.ts @@ -0,0 +1,103 @@ +import type { AgentContext } from '../../../agent' +import type { TransportSessionRemovedEvent } from '../../../transport' +import type { MessagePickupLiveSessionRemovedEvent, MessagePickupLiveSessionSavedEvent } from '../MessagePickupEvents' +import type { MessagePickupSession, MessagePickupSessionRole } from '../MessagePickupSession' + +import { takeUntil, type Subject } from 'rxjs' + +import { EventEmitter } from '../../../agent/EventEmitter' +import { InjectionSymbols } from '../../../constants' +import { injectable } from '../../../plugins' +import { TransportEventTypes } from '../../../transport' +import { uuid } from '../../../utils/uuid' +import { MessagePickupEventTypes } from '../MessagePickupEvents' + +/** + * @internal + * The Message Pickup session service keeps track of all {@link MessagePickupSession} + * + * It is initially intended for Message Holder/Mediator role, where only Live Mode sessions are + * considered. + */ +@injectable() +export class MessagePickupSessionService { + private sessions: MessagePickupSession[] + + public constructor() { + this.sessions = [] + } + + public start(agentContext: AgentContext) { + const stop$ = agentContext.dependencyManager.resolve>(InjectionSymbols.Stop$) + const eventEmitter = agentContext.dependencyManager.resolve(EventEmitter) + this.sessions = [] + + eventEmitter + .observable(TransportEventTypes.TransportSessionRemoved) + .pipe(takeUntil(stop$)) + .subscribe({ + next: (e) => { + const connectionId = e.payload.session.connectionId + if (connectionId) this.removeLiveSession(agentContext, { connectionId }) + }, + }) + } + + public getLiveSession(agentContext: AgentContext, sessionId: string) { + return this.sessions.find((session) => session.id === sessionId) + } + + public getLiveSessionByConnectionId( + agentContext: AgentContext, + options: { connectionId: string; role?: MessagePickupSessionRole } + ) { + const { connectionId, role } = options + + return this.sessions.find( + (session) => session.connectionId === connectionId && (role === undefined || role === session.role) + ) + } + + public saveLiveSession( + agentContext: AgentContext, + options: { connectionId: string; protocolVersion: string; role: MessagePickupSessionRole } + ) { + const { connectionId, protocolVersion, role } = options + + // First remove any live session for the given connection Id + this.removeLiveSession(agentContext, { connectionId }) + + const session = { + id: uuid(), + connectionId, + protocolVersion, + role, + } + + this.sessions.push(session) + + const eventEmitter = agentContext.dependencyManager.resolve(EventEmitter) + eventEmitter.emit(agentContext, { + type: MessagePickupEventTypes.LiveSessionSaved, + payload: { + session, + }, + }) + } + + public removeLiveSession(agentContext: AgentContext, options: { connectionId: string }) { + const itemIndex = this.sessions.findIndex((session) => session.connectionId === options.connectionId) + + if (itemIndex > -1) { + const [session] = this.sessions.splice(itemIndex, 1) + const eventEmitter = agentContext.dependencyManager.resolve(EventEmitter) + + eventEmitter.emit(agentContext, { + type: MessagePickupEventTypes.LiveSessionRemoved, + payload: { + session, + }, + }) + } + } +} diff --git a/packages/core/src/modules/message-pickup/services/index.ts b/packages/core/src/modules/message-pickup/services/index.ts new file mode 100644 index 0000000000..e91435bfaf --- /dev/null +++ b/packages/core/src/modules/message-pickup/services/index.ts @@ -0,0 +1 @@ +export * from './MessagePickupSessionService' diff --git a/packages/core/src/modules/message-pickup/storage/InMemoryMessagePickupRepository.ts b/packages/core/src/modules/message-pickup/storage/InMemoryMessagePickupRepository.ts new file mode 100644 index 0000000000..f066899369 --- /dev/null +++ b/packages/core/src/modules/message-pickup/storage/InMemoryMessagePickupRepository.ts @@ -0,0 +1,95 @@ +import type { MessagePickupRepository } from './MessagePickupRepository' +import type { + AddMessageOptions, + GetAvailableMessageCountOptions, + RemoveMessagesOptions, + TakeFromQueueOptions, +} from './MessagePickupRepositoryOptions' +import type { QueuedMessage } from './QueuedMessage' + +import { InjectionSymbols } from '../../../constants' +import { Logger } from '../../../logger' +import { injectable, inject } from '../../../plugins' +import { uuid } from '../../../utils/uuid' + +interface InMemoryQueuedMessage extends QueuedMessage { + connectionId: string + recipientDids: string[] + state: 'pending' | 'sending' +} + +@injectable() +export class InMemoryMessagePickupRepository implements MessagePickupRepository { + private logger: Logger + private messages: InMemoryQueuedMessage[] + + public constructor(@inject(InjectionSymbols.Logger) logger: Logger) { + this.logger = logger + this.messages = [] + } + + public getAvailableMessageCount(options: GetAvailableMessageCountOptions): number | Promise { + const { connectionId, recipientDid } = options + + const messages = this.messages.filter( + (msg) => + msg.connectionId === connectionId && + (recipientDid === undefined || msg.recipientDids.includes(recipientDid)) && + msg.state === 'pending' + ) + return messages.length + } + + public takeFromQueue(options: TakeFromQueueOptions): QueuedMessage[] { + const { connectionId, recipientDid, limit, deleteMessages } = options + + let messages = this.messages.filter( + (msg) => + msg.connectionId === connectionId && + msg.state === 'pending' && + (recipientDid === undefined || msg.recipientDids.includes(recipientDid)) + ) + + const messagesToTake = limit ?? messages.length + + messages = messages.slice(0, messagesToTake) + + this.logger.debug(`Taking ${messagesToTake} messages from queue for connection ${connectionId}`) + + // Mark taken messages in order to prevent them of being retrieved again + messages.forEach((msg) => { + const index = this.messages.findIndex((item) => item.id === msg.id) + if (index !== -1) this.messages[index].state = 'sending' + }) + + if (deleteMessages) { + this.removeMessages({ connectionId, messageIds: messages.map((msg) => msg.id) }) + } + + return messages + } + + public addMessage(options: AddMessageOptions) { + const { connectionId, recipientDids, payload } = options + + const id = uuid() + this.messages.push({ + id, + connectionId, + encryptedMessage: payload, + recipientDids, + state: 'pending', + }) + + return id + } + + public removeMessages(options: RemoveMessagesOptions) { + const { messageIds } = options + + for (const messageId of messageIds) { + const messageIndex = this.messages.findIndex((item) => item.id === messageId) + if (messageIndex > -1) this.messages.splice(messageIndex, 1) + } + } +} diff --git a/packages/core/src/modules/message-pickup/storage/MessagePickupRepository.ts b/packages/core/src/modules/message-pickup/storage/MessagePickupRepository.ts new file mode 100644 index 0000000000..6b234918ce --- /dev/null +++ b/packages/core/src/modules/message-pickup/storage/MessagePickupRepository.ts @@ -0,0 +1,14 @@ +import type { + AddMessageOptions, + GetAvailableMessageCountOptions, + RemoveMessagesOptions, + TakeFromQueueOptions, +} from './MessagePickupRepositoryOptions' +import type { QueuedMessage } from './QueuedMessage' + +export interface MessagePickupRepository { + getAvailableMessageCount(options: GetAvailableMessageCountOptions): number | Promise + takeFromQueue(options: TakeFromQueueOptions): QueuedMessage[] | Promise + addMessage(options: AddMessageOptions): string | Promise + removeMessages(options: RemoveMessagesOptions): void | Promise +} diff --git a/packages/core/src/modules/message-pickup/storage/MessagePickupRepositoryOptions.ts b/packages/core/src/modules/message-pickup/storage/MessagePickupRepositoryOptions.ts new file mode 100644 index 0000000000..e586d5756a --- /dev/null +++ b/packages/core/src/modules/message-pickup/storage/MessagePickupRepositoryOptions.ts @@ -0,0 +1,24 @@ +import type { EncryptedMessage } from '../../../types' + +export interface GetAvailableMessageCountOptions { + connectionId: string + recipientDid?: string +} + +export interface TakeFromQueueOptions { + connectionId: string + recipientDid?: string + limit?: number + deleteMessages?: boolean +} + +export interface AddMessageOptions { + connectionId: string + recipientDids: string[] + payload: EncryptedMessage +} + +export interface RemoveMessagesOptions { + connectionId: string + messageIds: string[] +} diff --git a/packages/core/src/modules/message-pickup/storage/QueuedMessage.ts b/packages/core/src/modules/message-pickup/storage/QueuedMessage.ts new file mode 100644 index 0000000000..b554e08184 --- /dev/null +++ b/packages/core/src/modules/message-pickup/storage/QueuedMessage.ts @@ -0,0 +1,6 @@ +import type { EncryptedMessage } from '../../../types' + +export type QueuedMessage = { + id: string + encryptedMessage: EncryptedMessage +} diff --git a/packages/core/src/modules/message-pickup/storage/index.ts b/packages/core/src/modules/message-pickup/storage/index.ts new file mode 100644 index 0000000000..1894b67d72 --- /dev/null +++ b/packages/core/src/modules/message-pickup/storage/index.ts @@ -0,0 +1,4 @@ +export * from './InMemoryMessagePickupRepository' +export * from './MessagePickupRepository' +export * from './MessagePickupRepositoryOptions' +export * from './QueuedMessage' diff --git a/packages/core/src/modules/routing/MediationRecipientApi.ts b/packages/core/src/modules/routing/MediationRecipientApi.ts index f3a97681b9..be8e5d137d 100644 --- a/packages/core/src/modules/routing/MediationRecipientApi.ts +++ b/packages/core/src/modules/routing/MediationRecipientApi.ts @@ -149,7 +149,14 @@ export class MediationRecipientApi { ) } - private async openWebSocketAndPickUp(mediator: MediationRecord, pickupStrategy: MediatorPickupStrategy) { + /** + * Keep track of a persistent transport session with a mediator, trying to reconnect to it as + * soon as it is disconnected, using a recursive back-off strategy + * + * @param mediator mediation record + * @param pickupStrategy chosen pick up strategy (should be Implicit or PickUp in Live Mode) + */ + private async monitorMediatorWebSocketEvents(mediator: MediationRecord, pickupStrategy: MediatorPickupStrategy) { const { baseMediatorReconnectionIntervalMs, maximumMediatorReconnectionIntervalMs } = this.config let interval = baseMediatorReconnectionIntervalMs @@ -197,11 +204,11 @@ export class MediationRecipientApi { `Websocket connection to mediator with connectionId '${mediator.connectionId}' is closed, attempting to reconnect...` ) try { - if (pickupStrategy === MediatorPickupStrategy.PickUpV2) { - // Start Pickup v2 protocol to receive messages received while websocket offline - await this.messagePickupApi.pickupMessages({ + if (pickupStrategy === MediatorPickupStrategy.PickUpV2LiveMode) { + // Start Pickup v2 protocol in live mode (retrieve any queued message before) + await this.messagePickupApi.setLiveDeliveryMode({ connectionId: mediator.connectionId, - batchSize: this.config.maximumMessagePickup, + liveDelivery: true, protocolVersion: 'v2', }) } else { @@ -213,13 +220,6 @@ export class MediationRecipientApi { }, complete: () => this.logger.info(`Stopping pickup of messages from mediator '${mediator.id}'`), }) - try { - if (pickupStrategy === MediatorPickupStrategy.Implicit) { - await this.openMediationWebSocket(mediator) - } - } catch (error) { - this.logger.warn('Unable to open websocket connection to mediator', { error }) - } } /** @@ -242,18 +242,10 @@ export class MediationRecipientApi { const mediatorConnection = await this.connectionService.getById(this.agentContext, mediatorRecord.connectionId) switch (mediatorPickupStrategy) { - case MediatorPickupStrategy.PickUpV2: - this.logger.info(`Starting pickup of messages from mediator '${mediatorRecord.id}'`) - await this.openWebSocketAndPickUp(mediatorRecord, mediatorPickupStrategy) - await this.messagePickupApi.pickupMessages({ - connectionId: mediatorConnection.id, - batchSize: this.config.maximumMessagePickup, - protocolVersion: 'v2', - }) - break - case MediatorPickupStrategy.PickUpV1: { + case MediatorPickupStrategy.PickUpV1: + case MediatorPickupStrategy.PickUpV2: { const stopConditions$ = merge(this.stop$, this.stopMessagePickup$).pipe() - // Explicit means polling every X seconds with batch message + // PickUpV1/PickUpV2 means polling every X seconds with batch message this.logger.info(`Starting explicit (batch) pickup of messages from mediator '${mediatorRecord.id}'`) const subscription = interval(mediatorPollingInterval) .pipe(takeUntil(stopConditions$)) @@ -262,18 +254,30 @@ export class MediationRecipientApi { await this.messagePickupApi.pickupMessages({ connectionId: mediatorConnection.id, batchSize: this.config.maximumMessagePickup, - protocolVersion: 'v1', + protocolVersion: mediatorPickupStrategy === MediatorPickupStrategy.PickUpV2 ? 'v2' : 'v1', }) }, complete: () => this.logger.info(`Stopping pickup of messages from mediator '${mediatorRecord.id}'`), }) return subscription } + case MediatorPickupStrategy.PickUpV2LiveMode: + // PickUp V2 in Live Mode will retrieve queued messages and then set up live delivery mode + this.logger.info(`Starting pickup of messages from mediator '${mediatorRecord.id}'`) + await this.monitorMediatorWebSocketEvents(mediatorRecord, mediatorPickupStrategy) + await this.messagePickupApi.setLiveDeliveryMode({ + connectionId: mediatorConnection.id, + liveDelivery: true, + protocolVersion: 'v2', + }) + + break case MediatorPickupStrategy.Implicit: // Implicit means sending ping once and keeping connection open. This requires a long-lived transport // such as WebSockets to work this.logger.info(`Starting implicit pickup of messages from mediator '${mediatorRecord.id}'`) - await this.openWebSocketAndPickUp(mediatorRecord, mediatorPickupStrategy) + await this.monitorMediatorWebSocketEvents(mediatorRecord, mediatorPickupStrategy) + await this.openMediationWebSocket(mediatorRecord) break default: this.logger.info(`Skipping pickup of messages from mediator '${mediatorRecord.id}' due to pickup strategy none`) @@ -329,20 +333,6 @@ export class MediationRecipientApi { return this.mediationRecipientService.discoverMediation(this.agentContext) } - /** - * @deprecated Use `MessagePickupApi.pickupMessages` instead. - * */ - public async pickupMessages(mediatorConnection: ConnectionRecord, pickupStrategy?: MediatorPickupStrategy) { - mediatorConnection.assertReady() - - const messagePickupApi = this.agentContext.dependencyManager.resolve(MessagePickupApi) - - await messagePickupApi.pickupMessages({ - connectionId: mediatorConnection.id, - protocolVersion: pickupStrategy === MediatorPickupStrategy.PickUpV2 ? 'v2' : 'v1', - }) - } - public async setDefaultMediator(mediatorRecord: MediationRecord) { return this.mediationRecipientService.setDefaultMediator(this.agentContext, mediatorRecord) } diff --git a/packages/core/src/modules/routing/MediationRecipientModuleConfig.ts b/packages/core/src/modules/routing/MediationRecipientModuleConfig.ts index 6f94234fc5..cab467c18e 100644 --- a/packages/core/src/modules/routing/MediationRecipientModuleConfig.ts +++ b/packages/core/src/modules/routing/MediationRecipientModuleConfig.ts @@ -10,28 +10,28 @@ export interface MediationRecipientModuleConfigOptions { * features protocol to determine the best strategy. * * - * - `MediatorPickupStrategy.PickUpV1` - explicitly pick up messages from the mediator according to [RFC 0212 Pickup Protocol](https://github.com/hyperledger/aries-rfcs/blob/main/features/0212-pickup/README.md) - * - `MediatorPickupStrategy.PickUpV2` - pick up messages from the mediator according to [RFC 0685 Pickup V2 Protocol](https://github.com/hyperledger/aries-rfcs/tree/main/features/0685-pickup-v2/README.md). - * - `MediatorPickupStrategy.Implicit` - Open a WebSocket with the mediator to implicitly receive messages. (currently used by Aries Cloud Agent Python) - * - `MediatorPickupStrategy.None` - Do not retrieve messages from the mediator. + * - `MediatorPickupStrategy.PickUpV1` - explicitly pick up messages from the mediator in periodic loops according to [RFC 0212 Pickup Protocol](https://github.com/hyperledger/aries-rfcs/blob/main/features/0212-pickup/README.md) + * - `MediatorPickupStrategy.PickUpV2` - pick up messages from the mediator in periodic loops according to [RFC 0685 Pickup V2 Protocol](https://github.com/hyperledger/aries-rfcs/tree/main/features/0685-pickup-v2/README.md). + * - `MediatorPickupStrategy.PickUpV2LiveMode` - pick up messages from the mediator using Live Mode as specified in [RFC 0685 Pickup V2 Protocol](https://github.com/hyperledger/aries-rfcs/tree/main/features/0685-pickup-v2/README.md). + * - `MediatorPickupStrategy.Implicit` - Open a WebSocket with the mediator to implicitly receive messages. (currently used by Aries Cloud Agent Python) + * - `MediatorPickupStrategy.None` - Do not retrieve messages from the mediator automatically. You can launch manual pickup flows afterwards. * * @default undefined */ mediatorPickupStrategy?: MediatorPickupStrategy /** - * Interval in milliseconds between picking up message from the mediator. This is only applicable when the pickup protocol v1 - * is used. + * Interval in milliseconds between picking up message from the mediator. This is only applicable when the pickup protocol v1 or v2 in polling mode + * are used. * * @default 5000 */ mediatorPollingInterval?: number /** - * Maximum number of messages to retrieve from the mediator in a single batch. This is only applicable when the pickup protocol v2 + * Maximum number of messages to retrieve from the mediator in a single batch. This is applicable for both pickup protocol v1 and v2 * is used. * - * @todo integrate with pickup protocol v1 * @default 10 */ maximumMessagePickup?: number diff --git a/packages/core/src/modules/routing/MediatorApi.ts b/packages/core/src/modules/routing/MediatorApi.ts index 7249bca19e..62c456b31c 100644 --- a/packages/core/src/modules/routing/MediatorApi.ts +++ b/packages/core/src/modules/routing/MediatorApi.ts @@ -1,5 +1,4 @@ import type { MediationRecord } from './repository' -import type { EncryptedMessage } from '../../types' import { AgentContext } from '../../agent' import { MessageHandlerRegistry } from '../../agent/MessageHandlerRegistry' @@ -7,7 +6,6 @@ import { MessageSender } from '../../agent/MessageSender' import { OutboundMessageContext } from '../../agent/models' import { injectable } from '../../plugins' import { ConnectionService } from '../connections/services' -import { MessagePickupApi } from '../message-pickup' import { MediatorModuleConfig } from './MediatorModuleConfig' import { ForwardHandler, KeylistUpdateHandler } from './handlers' @@ -52,8 +50,8 @@ export class MediatorApi { } } - public async grantRequestedMediation(mediatorId: string): Promise { - const record = await this.mediatorService.getById(this.agentContext, mediatorId) + public async grantRequestedMediation(mediationRecordId: string): Promise { + const record = await this.mediatorService.getById(this.agentContext, mediationRecordId) const connectionRecord = await this.connectionService.getById(this.agentContext, record.connectionId) const { message, mediationRecord } = await this.mediatorService.createGrantMediationMessage( @@ -71,19 +69,9 @@ export class MediatorApi { return mediationRecord } - /** - * @deprecated Use `MessagePickupApi.queueMessage` instead. - * */ - public queueMessage(connectionId: string, message: EncryptedMessage) { - const messagePickupApi = this.agentContext.dependencyManager.resolve(MessagePickupApi) - return messagePickupApi.queueMessage({ connectionId, message }) - } - private registerMessageHandlers(messageHandlerRegistry: MessageHandlerRegistry) { messageHandlerRegistry.registerMessageHandler(new KeylistUpdateHandler(this.mediatorService)) - messageHandlerRegistry.registerMessageHandler( - new ForwardHandler(this.mediatorService, this.connectionService, this.messageSender) - ) + messageHandlerRegistry.registerMessageHandler(new ForwardHandler(this.mediatorService)) messageHandlerRegistry.registerMessageHandler(new MediationRequestHandler(this.mediatorService, this.config)) } } diff --git a/packages/core/src/modules/routing/MediatorModuleConfig.ts b/packages/core/src/modules/routing/MediatorModuleConfig.ts index 8b70d9591a..e20fc8422c 100644 --- a/packages/core/src/modules/routing/MediatorModuleConfig.ts +++ b/packages/core/src/modules/routing/MediatorModuleConfig.ts @@ -1,3 +1,5 @@ +import { MessageForwardingStrategy } from './MessageForwardingStrategy' + /** * MediatorModuleConfigOptions defines the interface for the options of the MediatorModuleConfig class. * This can contain optional parameters that have default values in the config class itself. @@ -9,6 +11,19 @@ export interface MediatorModuleConfigOptions { * @default false */ autoAcceptMediationRequests?: boolean + + /** + * Strategy to use when a Forward message is received. + * + * + * - `MessageForwardingStrategy.QueueOnly` - simply queue encrypted message into MessagePickupRepository. It will be in charge of manually trigering MessagePickupApi.deliver() afterwards. + * - `MessageForwardingStrategy.QueueAndLiveModeDelivery` - Queue message into MessagePickupRepository and deliver it (along any other queued message). + * - `MessageForwardingStrategy.DirectDelivery` - Deliver message directly. Do not add into queue (it might be manually added after, e.g. in case of failure) + * + * @default MessageForwardingStrategy.DirectDelivery + * @todo Update default to QueueAndLiveModeDelivery + */ + messageForwardingStrategy?: MessageForwardingStrategy } export class MediatorModuleConfig { @@ -22,4 +37,9 @@ export class MediatorModuleConfig { public get autoAcceptMediationRequests() { return this.options.autoAcceptMediationRequests ?? false } + + /** See {@link MediatorModuleConfigOptions.messageForwardingStrategy} */ + public get messageForwardingStrategy() { + return this.options.messageForwardingStrategy ?? MessageForwardingStrategy.DirectDelivery + } } diff --git a/packages/core/src/modules/routing/MediatorPickupStrategy.ts b/packages/core/src/modules/routing/MediatorPickupStrategy.ts index d4889b6ac9..1104abf7cb 100644 --- a/packages/core/src/modules/routing/MediatorPickupStrategy.ts +++ b/packages/core/src/modules/routing/MediatorPickupStrategy.ts @@ -1,10 +1,14 @@ export enum MediatorPickupStrategy { - // Explicit pickup strategy means picking up messages using the pickup protocol + // Use PickUp v1 protocol to periodically retrieve messages PickUpV1 = 'PickUpV1', - // Supports pickup v2 + // Use PickUp v2 protocol to periodically retrieve messages PickUpV2 = 'PickUpV2', + // Use PickUp v2 protocol in Live Mode to get incoming messages as soon as they arrive + // to mediator + PickUpV2LiveMode = 'PickUpV2LiveMode', + // Implicit pickup strategy means picking up messages only using return route // decorator. This is what ACA-Py currently uses Implicit = 'Implicit', diff --git a/packages/core/src/modules/routing/MessageForwardingStrategy.ts b/packages/core/src/modules/routing/MessageForwardingStrategy.ts new file mode 100644 index 0000000000..06ce1e05c9 --- /dev/null +++ b/packages/core/src/modules/routing/MessageForwardingStrategy.ts @@ -0,0 +1,13 @@ +export enum MessageForwardingStrategy { + // When a forward is received, simply queue encrypted message. MessagePickupRepository + // will be in charge of manually triggering MessagePickupApi.deliverMessages() + QueueOnly = 'QueueOnly', + + // Queue message into MessagePickupRepository and, if a Message Pickup Live mode session is active, + // deliver it along any other queued message + QueueAndLiveModeDelivery = 'QueueAndLiveModeDelivery', + + // Attempt to deliver message directly if a transport session is available. It will eventually added + // into pickup queue in case of failure on the delivery + DirectDelivery = 'DirectDelivery', +} diff --git a/packages/core/src/modules/routing/handlers/ForwardHandler.ts b/packages/core/src/modules/routing/handlers/ForwardHandler.ts index 960237fc45..2ff27a0dae 100644 --- a/packages/core/src/modules/routing/handlers/ForwardHandler.ts +++ b/packages/core/src/modules/routing/handlers/ForwardHandler.ts @@ -1,40 +1,17 @@ import type { MessageHandler, MessageHandlerInboundMessage } from '../../../agent/MessageHandler' -import type { MessageSender } from '../../../agent/MessageSender' -import type { ConnectionService } from '../../connections/services' import type { MediatorService } from '../services' import { ForwardMessage } from '../messages' export class ForwardHandler implements MessageHandler { private mediatorService: MediatorService - private connectionService: ConnectionService - private messageSender: MessageSender - public supportedMessages = [ForwardMessage] - public constructor( - mediatorService: MediatorService, - connectionService: ConnectionService, - messageSender: MessageSender - ) { + public constructor(mediatorService: MediatorService) { this.mediatorService = mediatorService - this.connectionService = connectionService - this.messageSender = messageSender } public async handle(messageContext: MessageHandlerInboundMessage) { - const { encryptedMessage, mediationRecord } = await this.mediatorService.processForwardMessage(messageContext) - - const connectionRecord = await this.connectionService.getById( - messageContext.agentContext, - mediationRecord.connectionId - ) - - // The message inside the forward message is packed so we just send the packed - // message to the connection associated with it - await this.messageSender.sendPackage(messageContext.agentContext, { - connection: connectionRecord, - encryptedMessage, - }) + await this.mediatorService.processForwardMessage(messageContext) } } diff --git a/packages/core/src/modules/routing/services/MediatorService.ts b/packages/core/src/modules/routing/services/MediatorService.ts index 7b3df9b861..d7cdf433f1 100644 --- a/packages/core/src/modules/routing/services/MediatorService.ts +++ b/packages/core/src/modules/routing/services/MediatorService.ts @@ -1,12 +1,12 @@ import type { AgentContext } from '../../../agent' import type { InboundMessageContext } from '../../../agent/models/InboundMessageContext' import type { Query } from '../../../storage/StorageService' -import type { EncryptedMessage } from '../../../types' import type { ConnectionRecord } from '../../connections' import type { MediationStateChangedEvent } from '../RoutingEvents' import type { ForwardMessage, MediationRequestMessage } from '../messages' import { EventEmitter } from '../../../agent/EventEmitter' +import { MessageSender } from '../../../agent/MessageSender' import { InjectionSymbols } from '../../../constants' import { KeyType } from '../../../crypto' import { AriesFrameworkError, RecordDuplicateError } from '../../../error' @@ -15,6 +15,10 @@ import { injectable, inject } from '../../../plugins' import { ConnectionService } from '../../connections' import { ConnectionMetadataKeys } from '../../connections/repository/ConnectionMetadataTypes' import { didKeyToVerkey, isDidKey, verkeyToDidKey } from '../../dids/helpers' +import { MessagePickupApi } from '../../message-pickup' +import { MessagePickupSessionRole } from '../../message-pickup/MessagePickupSession' +import { MediatorModuleConfig } from '../MediatorModuleConfig' +import { MessageForwardingStrategy } from '../MessageForwardingStrategy' import { RoutingEventTypes } from '../RoutingEvents' import { KeylistUpdateMessage, @@ -36,18 +40,21 @@ export class MediatorService { private logger: Logger private mediationRepository: MediationRepository private mediatorRoutingRepository: MediatorRoutingRepository + private messagePickupApi: MessagePickupApi private eventEmitter: EventEmitter private connectionService: ConnectionService public constructor( mediationRepository: MediationRepository, mediatorRoutingRepository: MediatorRoutingRepository, + messagePickupApi: MessagePickupApi, eventEmitter: EventEmitter, @inject(InjectionSymbols.Logger) logger: Logger, connectionService: ConnectionService ) { this.mediationRepository = mediationRepository this.mediatorRoutingRepository = mediatorRoutingRepository + this.messagePickupApi = messagePickupApi this.eventEmitter = eventEmitter this.logger = logger this.connectionService = connectionService @@ -64,28 +71,61 @@ export class MediatorService { throw new AriesFrameworkError(`Mediator has not been initialized yet.`) } - public async processForwardMessage( - messageContext: InboundMessageContext - ): Promise<{ mediationRecord: MediationRecord; encryptedMessage: EncryptedMessage }> { - const { message } = messageContext + public async processForwardMessage(messageContext: InboundMessageContext): Promise { + const { message, agentContext } = messageContext // TODO: update to class-validator validation if (!message.to) { throw new AriesFrameworkError('Invalid Message: Missing required attribute "to"') } - const mediationRecord = await this.mediationRepository.getSingleByRecipientKey( - messageContext.agentContext, - message.to - ) + const mediationRecord = await this.mediationRepository.getSingleByRecipientKey(agentContext, message.to) // Assert mediation record is ready to be used mediationRecord.assertReady() mediationRecord.assertRole(MediationRole.Mediator) - return { - encryptedMessage: message.message, - mediationRecord, + const connection = await this.connectionService.getById(agentContext, mediationRecord.connectionId) + connection.assertReady() + + const messageForwardingStrategy = + agentContext.dependencyManager.resolve(MediatorModuleConfig).messageForwardingStrategy + const messageSender = agentContext.dependencyManager.resolve(MessageSender) + + switch (messageForwardingStrategy) { + case MessageForwardingStrategy.QueueOnly: + await this.messagePickupApi.queueMessage({ + connectionId: mediationRecord.connectionId, + recipientDids: [verkeyToDidKey(message.to)], + message: message.message, + }) + break + case MessageForwardingStrategy.QueueAndLiveModeDelivery: { + await this.messagePickupApi.queueMessage({ + connectionId: mediationRecord.connectionId, + recipientDids: [verkeyToDidKey(message.to)], + message: message.message, + }) + const session = await this.messagePickupApi.getLiveModeSession({ + connectionId: mediationRecord.connectionId, + role: MessagePickupSessionRole.MessageHolder, + }) + if (session) { + await this.messagePickupApi.deliverMessagesFromQueue({ + pickupSessionId: session.id, + recipientDid: verkeyToDidKey(message.to), + }) + } + break + } + case MessageForwardingStrategy.DirectDelivery: + // The message inside the forward message is packed so we just send the packed + // message to the connection associated with it + await messageSender.sendPackage(agentContext, { + connection, + recipientKey: verkeyToDidKey(message.to), + encryptedMessage: message.message, + }) } } diff --git a/packages/core/src/modules/routing/services/__tests__/MediatorService.test.ts b/packages/core/src/modules/routing/services/__tests__/MediatorService.test.ts index 017de44042..a2741fc29a 100644 --- a/packages/core/src/modules/routing/services/__tests__/MediatorService.test.ts +++ b/packages/core/src/modules/routing/services/__tests__/MediatorService.test.ts @@ -5,6 +5,7 @@ import { EventEmitter } from '../../../../agent/EventEmitter' import { InboundMessageContext } from '../../../../agent/models/InboundMessageContext' import { ConnectionService, DidExchangeState } from '../../../connections' import { isDidKey } from '../../../dids/helpers' +import { MessagePickupApi } from '../../../message-pickup' import { KeylistUpdateAction, KeylistUpdateMessage, KeylistUpdateResult } from '../../messages' import { MediationRole, MediationState } from '../../models' import { MediationRecord, MediatorRoutingRecord } from '../../repository' @@ -21,9 +22,13 @@ const MediatorRoutingRepositoryMock = MediatorRoutingRepository as jest.Mock +jest.mock('../../../connections/services/ConnectionService') +const MessagePickupApiMock = MessagePickupApi as jest.Mock + const mediationRepository = new MediationRepositoryMock() const mediatorRoutingRepository = new MediatorRoutingRepositoryMock() const connectionService = new ConnectionServiceMock() +const mediationPickupApi = new MessagePickupApiMock() const mockConnection = getMockConnection({ state: DidExchangeState.Completed, @@ -39,6 +44,7 @@ describe('MediatorService - default config', () => { const mediatorService = new MediatorService( mediationRepository, mediatorRoutingRepository, + mediationPickupApi, new EventEmitter(agentConfig.agentDependencies, new Subject()), agentConfig.logger, connectionService @@ -165,6 +171,7 @@ describe('MediatorService - useDidKeyInProtocols set to false', () => { const mediatorService = new MediatorService( mediationRepository, mediatorRoutingRepository, + mediationPickupApi, new EventEmitter(agentConfig.agentDependencies, new Subject()), agentConfig.logger, connectionService diff --git a/packages/core/src/storage/InMemoryMessageRepository.ts b/packages/core/src/storage/InMemoryMessageRepository.ts deleted file mode 100644 index cf98440a7d..0000000000 --- a/packages/core/src/storage/InMemoryMessageRepository.ts +++ /dev/null @@ -1,41 +0,0 @@ -import type { MessageRepository } from './MessageRepository' -import type { EncryptedMessage } from '../types' - -import { InjectionSymbols } from '../constants' -import { Logger } from '../logger' -import { injectable, inject } from '../plugins' - -@injectable() -export class InMemoryMessageRepository implements MessageRepository { - private logger: Logger - private messages: { [key: string]: EncryptedMessage[] } = {} - - public constructor(@inject(InjectionSymbols.Logger) logger: Logger) { - this.logger = logger - } - - public getAvailableMessageCount(connectionId: string): number | Promise { - return this.messages[connectionId] ? this.messages[connectionId].length : 0 - } - - public takeFromQueue(connectionId: string, limit?: number, keepMessages?: boolean) { - if (!this.messages[connectionId]) { - return [] - } - - const messagesToTake = limit ?? this.messages[connectionId].length - this.logger.debug(`Taking ${messagesToTake} messages from queue for connection ${connectionId}`) - - return keepMessages - ? this.messages[connectionId].slice(0, messagesToTake) - : this.messages[connectionId].splice(0, messagesToTake) - } - - public add(connectionId: string, payload: EncryptedMessage) { - if (!this.messages[connectionId]) { - this.messages[connectionId] = [] - } - - this.messages[connectionId].push(payload) - } -} diff --git a/packages/core/src/storage/MessageRepository.ts b/packages/core/src/storage/MessageRepository.ts deleted file mode 100644 index d12c7b6c07..0000000000 --- a/packages/core/src/storage/MessageRepository.ts +++ /dev/null @@ -1,11 +0,0 @@ -import type { EncryptedMessage } from '../types' - -export interface MessageRepository { - getAvailableMessageCount(connectionId: string): number | Promise - takeFromQueue( - connectionId: string, - limit?: number, - keepMessages?: boolean - ): EncryptedMessage[] | Promise - add(connectionId: string, payload: EncryptedMessage): void | Promise -} diff --git a/packages/core/src/transport/TransportEventTypes.ts b/packages/core/src/transport/TransportEventTypes.ts index 8916724e86..1dd34674d3 100644 --- a/packages/core/src/transport/TransportEventTypes.ts +++ b/packages/core/src/transport/TransportEventTypes.ts @@ -1,8 +1,11 @@ import type { BaseEvent } from '../agent/Events' +import type { TransportSession } from '../agent/TransportService' export enum TransportEventTypes { OutboundWebSocketClosedEvent = 'OutboundWebSocketClosedEvent', OutboundWebSocketOpenedEvent = 'OutboundWebSocketOpenedEvent', + TransportSessionSaved = 'TransportSessionSaved', + TransportSessionRemoved = 'TransportSessionRemoved', } export interface OutboundWebSocketClosedEvent extends BaseEvent { @@ -20,3 +23,17 @@ export interface OutboundWebSocketOpenedEvent extends BaseEvent { connectionId?: string } } + +export interface TransportSessionSavedEvent extends BaseEvent { + type: typeof TransportEventTypes.TransportSessionSaved + payload: { + session: TransportSession + } +} + +export interface TransportSessionRemovedEvent extends BaseEvent { + type: typeof TransportEventTypes.TransportSessionRemoved + payload: { + session: TransportSession + } +} diff --git a/packages/core/tests/helpers.ts b/packages/core/tests/helpers.ts index 6372d3caaa..260bb9fc27 100644 --- a/packages/core/tests/helpers.ts +++ b/packages/core/tests/helpers.ts @@ -99,12 +99,13 @@ export function getAskarWalletConfig( export function getAgentOptions( name: string, extraConfig: Partial = {}, - inputModules?: AgentModules -): { config: InitConfig; modules: AgentModules; dependencies: AgentDependencies } { + inputModules?: AgentModules, + inMemoryWallet = true +): { config: InitConfig; modules: AgentModules; dependencies: AgentDependencies; inMemory?: boolean } { const random = uuid().slice(0, 4) const config: InitConfig = { label: `Agent: ${name} - ${random}`, - walletConfig: getAskarWalletConfig(name, { inMemory: true, random }), + walletConfig: getAskarWalletConfig(name, { inMemory: inMemoryWallet, random }), // TODO: determine the log level based on an environment variable. This will make it // possible to run e.g. failed github actions in debug mode for extra logs logger: TestLogger.fromLogger(testLogger, name), diff --git a/tests/e2e-ws-pickup-v2.test.ts b/tests/e2e-ws-pickup-v2.test.ts index 3a61a0ff2a..281fae0d67 100644 --- a/tests/e2e-ws-pickup-v2.test.ts +++ b/tests/e2e-ws-pickup-v2.test.ts @@ -2,6 +2,7 @@ import type { AnonCredsTestsAgent } from '../packages/anoncreds/tests/legacyAnon import { getAnonCredsIndyModules } from '../packages/anoncreds/tests/legacyAnonCredsSetup' import { askarModule } from '../packages/askar/tests/helpers' +import { MessageForwardingStrategy } from '../packages/core/src/modules/routing/MessageForwardingStrategy' import { getAgentOptions } from '../packages/core/tests/helpers' import { e2eTest } from './e2e-test' @@ -18,19 +19,6 @@ import { WsInboundTransport } from '@credo-ts/node' // FIXME: somehow if we use the in memory wallet and storage service in the WS test it will fail, // but it succeeds with Askar. We should look into this at some point -const recipientOptions = getAgentOptions( - 'E2E WS Pickup V2 Recipient ', - {}, - { - ...getAnonCredsIndyModules({ - autoAcceptCredentials: AutoAcceptCredential.ContentApproved, - }), - mediationRecipient: new MediationRecipientModule({ - mediatorPickupStrategy: MediatorPickupStrategy.PickUpV2, - }), - askar: askarModule, - } -) // FIXME: port numbers should not depend on availability from other test suites that use web sockets const mediatorPort = 4100 @@ -43,7 +31,10 @@ const mediatorOptions = getAgentOptions( ...getAnonCredsIndyModules({ autoAcceptCredentials: AutoAcceptCredential.ContentApproved, }), - mediator: new MediatorModule({ autoAcceptMediationRequests: true }), + mediator: new MediatorModule({ + autoAcceptMediationRequests: true, + messageForwardingStrategy: MessageForwardingStrategy.QueueAndLiveModeDelivery, + }), askar: askarModule, } ) @@ -72,7 +63,6 @@ describe('E2E WS Pickup V2 tests', () => { let senderAgent: AnonCredsTestsAgent beforeEach(async () => { - recipientAgent = new Agent(recipientOptions) as unknown as AnonCredsTestsAgent mediatorAgent = new Agent(mediatorOptions) as unknown as AnonCredsTestsAgent senderAgent = new Agent(senderOptions) as unknown as AnonCredsTestsAgent }) @@ -86,7 +76,62 @@ describe('E2E WS Pickup V2 tests', () => { await senderAgent.wallet.delete() }) - test('Full WS flow (connect, request mediation, issue, verify) using Message Pickup V2', async () => { + test('Full WS flow (connect, request mediation, issue, verify) using Message Pickup V2 polling mode', async () => { + const recipientOptions = getAgentOptions( + 'E2E WS Pickup V2 Recipient polling mode', + {}, + { + ...getAnonCredsIndyModules({ + autoAcceptCredentials: AutoAcceptCredential.ContentApproved, + }), + mediationRecipient: new MediationRecipientModule({ + mediatorPickupStrategy: MediatorPickupStrategy.PickUpV2, + mediatorPollingInterval: 1000, + }), + askar: askarModule, + } + ) + + recipientAgent = new Agent(recipientOptions) as unknown as AnonCredsTestsAgent + + // Recipient Setup + recipientAgent.registerOutboundTransport(new WsOutboundTransport()) + await recipientAgent.initialize() + + // Mediator Setup + mediatorAgent.registerInboundTransport(new WsInboundTransport({ port: mediatorPort })) + mediatorAgent.registerOutboundTransport(new WsOutboundTransport()) + await mediatorAgent.initialize() + + // Sender Setup + senderAgent.registerInboundTransport(new WsInboundTransport({ port: senderPort })) + senderAgent.registerOutboundTransport(new WsOutboundTransport()) + await senderAgent.initialize() + + await e2eTest({ + mediatorAgent, + senderAgent, + recipientAgent, + }) + }) + + test('Full WS flow (connect, request mediation, issue, verify) using Message Pickup V2 live mode', async () => { + const recipientOptions = getAgentOptions( + 'E2E WS Pickup V2 Recipient live mode', + {}, + { + ...getAnonCredsIndyModules({ + autoAcceptCredentials: AutoAcceptCredential.ContentApproved, + }), + mediationRecipient: new MediationRecipientModule({ + mediatorPickupStrategy: MediatorPickupStrategy.PickUpV2LiveMode, + }), + askar: askarModule, + } + ) + + recipientAgent = new Agent(recipientOptions) as unknown as AnonCredsTestsAgent + // Recipient Setup recipientAgent.registerOutboundTransport(new WsOutboundTransport()) await recipientAgent.initialize()