Skip to content

Commit

Permalink
feat!: message pickup live mode support (#1638)
Browse files Browse the repository at this point in the history
Signed-off-by: Ariel Gentile <gentilester@gmail.com>

BREAKING CHANGES: 
- `MessageRepository` interface has been renamed to `MessagePickupRepository` and includes a complete new API. It can now be defined as a configuration parameter of `MessagePickupModule` rather than injecting it externally
- `MediatorPickupStrategy.PickupV2` now starts a periodic polling loop using Message Pickup V2 protocol. Use `MediatorPickupStrategy.PickupV2LiveMode` instead if you want to establish a persistent socket session without using polling
  • Loading branch information
genaris committed Jan 31, 2024
1 parent 22d5bff commit 1b70d24
Show file tree
Hide file tree
Showing 56 changed files with 1,215 additions and 366 deletions.
6 changes: 2 additions & 4 deletions packages/core/src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import { JwsService } from '../crypto/JwsService'
import { AriesFrameworkError } from '../error'
import { DependencyManager } from '../plugins'
import { DidCommMessageRepository, StorageUpdateService, StorageVersionRepository } from '../storage'
import { InMemoryMessageRepository } from '../storage/InMemoryMessageRepository'

import { AgentConfig } from './AgentConfig'
import { extendModulesWithDefaultModules } from './AgentModules'
Expand Down Expand Up @@ -90,9 +89,6 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
"Missing required dependency: 'StorageService'. You can register it using the AskarModule, or implement your own."
)
}
if (!dependencyManager.isRegistered(InjectionSymbols.MessageRepository)) {
dependencyManager.registerSingleton(InjectionSymbols.MessageRepository, InMemoryMessageRepository)
}

// TODO: contextCorrelationId for base wallet
// Bind the default agent context to the container for use in modules etc.
Expand Down Expand Up @@ -197,6 +193,8 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
)
await this.mediationRecipient.provision(mediationConnection)
}

await this.messagePickup.initialize()
await this.mediator.initialize()
await this.mediationRecipient.initialize()

Expand Down
9 changes: 9 additions & 0 deletions packages/core/src/agent/AgentMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ export class AgentMessage extends Decorated {
@Exclude()
public readonly allowDidSovPrefix: boolean = false

/**
* Whether to use Queue Transport in case the recipient of this message does not have a reliable
* endpoint available
*
* @see https://github.com/decentralized-identity/didcomm-messaging/blob/main/extensions/return_route/main.md#queue-transport
*/
@Exclude()
public readonly allowQueueTransport: boolean = true

public toJSON({ useDidSovPrefixWhereAllowed }: { useDidSovPrefixWhereAllowed?: boolean } = {}): PlaintextMessage {
const json = JsonTransformer.toJSON(this)

Expand Down
30 changes: 20 additions & 10 deletions packages/core/src/agent/MessageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ import type { TransportSession } from './TransportService'
import type { AgentContext } from './context'
import type { ConnectionRecord } from '../modules/connections'
import type { ResolvedDidCommService } from '../modules/didcomm'
import type { DidDocument } from '../modules/dids'
import type { OutOfBandRecord } from '../modules/oob/repository'
import type { OutboundTransport } from '../transport/OutboundTransport'
import type { OutboundPackage, EncryptedMessage } from '../types'
import type { EncryptedMessage, OutboundPackage } from '../types'

import { DID_COMM_TRANSPORT_QUEUE, InjectionSymbols } from '../constants'
import { ReturnRouteTypes } from '../decorators/transport/TransportDecorator'
import { AriesFrameworkError, MessageSendingError } from '../error'
import { Logger } from '../logger'
import { DidCommDocumentService } from '../modules/didcomm'
import { DidKey, type DidDocument } from '../modules/dids'
import { getKeyFromVerificationMethod } from '../modules/dids/domain/key-type'
import { didKeyToInstanceOfKey } from '../modules/dids/helpers'
import { didKeyToInstanceOfKey, verkeyToDidKey } from '../modules/dids/helpers'
import { DidResolverService } from '../modules/dids/services/DidResolverService'
import { MessagePickupRepository } from '../modules/message-pickup/storage'
import { inject, injectable } from '../plugins'
import { MessageRepository } from '../storage/MessageRepository'
import { MessageValidator } from '../utils/MessageValidator'
import { getProtocolScheme } from '../utils/uri'

Expand All @@ -38,7 +38,7 @@ export interface TransportPriorityOptions {
export class MessageSender {
private envelopeService: EnvelopeService
private transportService: TransportService
private messageRepository: MessageRepository
private messagePickupRepository: MessagePickupRepository
private logger: Logger
private didResolverService: DidResolverService
private didCommDocumentService: DidCommDocumentService
Expand All @@ -48,15 +48,15 @@ export class MessageSender {
public constructor(
envelopeService: EnvelopeService,
transportService: TransportService,
@inject(InjectionSymbols.MessageRepository) messageRepository: MessageRepository,
@inject(InjectionSymbols.MessagePickupRepository) messagePickupRepository: MessagePickupRepository,
@inject(InjectionSymbols.Logger) logger: Logger,
didResolverService: DidResolverService,
didCommDocumentService: DidCommDocumentService,
eventEmitter: EventEmitter
) {
this.envelopeService = envelopeService
this.transportService = transportService
this.messageRepository = messageRepository
this.messagePickupRepository = messagePickupRepository
this.logger = logger
this.didResolverService = didResolverService
this.didCommDocumentService = didCommDocumentService
Expand Down Expand Up @@ -113,9 +113,11 @@ export class MessageSender {
{
connection,
encryptedMessage,
recipientKey,
options,
}: {
connection: ConnectionRecord
recipientKey: string
encryptedMessage: EncryptedMessage
options?: { transportPriority?: TransportPriorityOptions }
}
Expand Down Expand Up @@ -176,7 +178,11 @@ export class MessageSender {
// If the other party shared a queue service endpoint in their did doc we queue the message
if (queueService) {
this.logger.debug(`Queue packed message for connection ${connection.id} (${connection.theirLabel})`)
await this.messageRepository.add(connection.id, encryptedMessage)
await this.messagePickupRepository.addMessage({
connectionId: connection.id,
recipientDids: [verkeyToDidKey(recipientKey)],
payload: encryptedMessage,
})
return
}

Expand Down Expand Up @@ -318,7 +324,7 @@ export class MessageSender {

// We didn't succeed to send the message over open session, or directly to serviceEndpoint
// If the other party shared a queue service endpoint in their did doc we queue the message
if (queueService) {
if (queueService && message.allowQueueTransport) {
this.logger.debug(`Queue message for connection ${connection.id} (${connection.theirLabel})`)

const keys = {
Expand All @@ -328,7 +334,11 @@ export class MessageSender {
}

const encryptedMessage = await this.envelopeService.packMessage(agentContext, message, keys)
await this.messageRepository.add(connection.id, encryptedMessage)
await this.messagePickupRepository.addMessage({
connectionId: connection.id,
recipientDids: keys.recipientKeys.map((item) => new DidKey(item).did),
payload: encryptedMessage,
})

this.emitMessageSentEvent(outboundMessageContext, OutboundMessageSendStatus.QueuedForPickup)

Expand Down
28 changes: 26 additions & 2 deletions packages/core/src/agent/TransportService.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,45 @@
import type { AgentMessage } from './AgentMessage'
import type { EnvelopeKeys } from './EnvelopeService'
import type { AgentContext } from './context'
import type { DidDocument } from '../modules/dids'
import type { TransportSessionRemovedEvent, TransportSessionSavedEvent } from '../transport'
import type { EncryptedMessage } from '../types'

import { DID_COMM_TRANSPORT_QUEUE } from '../constants'
import { AriesFrameworkError } from '../error'
import { injectable } from '../plugins'
import { TransportEventTypes } from '../transport'

import { EventEmitter } from './EventEmitter'
import { AgentContext } from './context'

@injectable()
export class TransportService {
public transportSessionTable: TransportSessionTable = {}
private agentContext: AgentContext
private eventEmitter: EventEmitter

public constructor(agentContext: AgentContext, eventEmitter: EventEmitter) {
this.agentContext = agentContext
this.eventEmitter = eventEmitter
}

public saveSession(session: TransportSession) {
if (session.connectionId) {
const oldSessions = this.getExistingSessionsForConnectionIdAndType(session.connectionId, session.type)
oldSessions.forEach((oldSession) => {
if (oldSession) {
if (oldSession && oldSession.id !== session.id) {
this.removeSession(oldSession)
}
})
}
this.transportSessionTable[session.id] = session

this.eventEmitter.emit<TransportSessionSavedEvent>(this.agentContext, {
type: TransportEventTypes.TransportSessionSaved,
payload: {
session,
},
})
}

public findSessionByConnectionId(connectionId: string) {
Expand All @@ -47,6 +65,12 @@ export class TransportService {

public removeSession(session: TransportSession) {
delete this.transportSessionTable[session.id]
this.eventEmitter.emit<TransportSessionRemovedEvent>(this.agentContext, {
type: TransportEventTypes.TransportSessionRemoved,
payload: {
session,
},
})
}

private getExistingSessionsForConnectionIdAndType(connectionId: string, type: string) {
Expand Down
11 changes: 6 additions & 5 deletions packages/core/src/agent/__tests__/Agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { ConnectionService } from '../../modules/connections/services/Connection
import { TrustPingService } from '../../modules/connections/services/TrustPingService'
import { CredentialRepository } from '../../modules/credentials'
import { CredentialsApi } from '../../modules/credentials/CredentialsApi'
import { MessagePickupApi } from '../../modules/message-pickup'
import { MessagePickupApi, InMemoryMessagePickupRepository } from '../../modules/message-pickup'
import { ProofRepository } from '../../modules/proofs'
import { ProofsApi } from '../../modules/proofs/ProofsApi'
import {
Expand All @@ -25,7 +25,6 @@ import {
MediationRecipientApi,
MediationRecipientModule,
} from '../../modules/routing'
import { InMemoryMessageRepository } from '../../storage/InMemoryMessageRepository'
import { WalletError } from '../../wallet/error'
import { Agent } from '../Agent'
import { Dispatcher } from '../Dispatcher'
Expand Down Expand Up @@ -181,7 +180,9 @@ describe('Agent', () => {

// Symbols, interface based
expect(container.resolve(InjectionSymbols.Logger)).toBe(agentOptions.config.logger)
expect(container.resolve(InjectionSymbols.MessageRepository)).toBeInstanceOf(InMemoryMessageRepository)
expect(container.resolve(InjectionSymbols.MessagePickupRepository)).toBeInstanceOf(
InMemoryMessagePickupRepository
)

// Agent
expect(container.resolve(MessageSender)).toBeInstanceOf(MessageSender)
Expand Down Expand Up @@ -220,8 +221,8 @@ describe('Agent', () => {

// Symbols, interface based
expect(container.resolve(InjectionSymbols.Logger)).toBe(container.resolve(InjectionSymbols.Logger))
expect(container.resolve(InjectionSymbols.MessageRepository)).toBe(
container.resolve(InjectionSymbols.MessageRepository)
expect(container.resolve(InjectionSymbols.MessagePickupRepository)).toBe(
container.resolve(InjectionSymbols.MessagePickupRepository)
)
expect(container.resolve(InjectionSymbols.StorageService)).toBe(
container.resolve(InjectionSymbols.StorageService)
Expand Down
18 changes: 9 additions & 9 deletions packages/core/src/agent/__tests__/MessageSender.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import type { ConnectionRecord } from '../../modules/connections'
import type { ResolvedDidCommService } from '../../modules/didcomm'
import type { DidDocumentService } from '../../modules/dids'
import type { MessageRepository } from '../../storage/MessageRepository'
import type { MessagePickupRepository } from '../../modules/message-pickup/storage'
import type { OutboundTransport } from '../../transport'
import type { EncryptedMessage } from '../../types'
import type { AgentMessageSentEvent } from '../Events'
Expand All @@ -24,7 +24,7 @@ import { DidCommDocumentService } from '../../modules/didcomm'
import { DidResolverService, DidDocument, VerificationMethod } from '../../modules/dids'
import { DidCommV1Service } from '../../modules/dids/domain/service/DidCommV1Service'
import { verkeyToInstanceOfKey } from '../../modules/dids/helpers'
import { InMemoryMessageRepository } from '../../storage/InMemoryMessageRepository'
import { InMemoryMessagePickupRepository } from '../../modules/message-pickup/storage'
import { EnvelopeService as EnvelopeServiceImpl } from '../EnvelopeService'
import { EventEmitter } from '../EventEmitter'
import { AgentEventTypes } from '../Events'
Expand Down Expand Up @@ -114,7 +114,7 @@ describe('MessageSender', () => {
sessionWithoutKeys.inboundMessage = inboundMessage
sessionWithoutKeys.send = jest.fn()

const transportService = new TransportService()
const transportService = new TransportService(getAgentContext(), eventEmitter)
const transportServiceFindSessionMock = mockFunction(transportService.findSessionByConnectionId)
const transportServiceFindSessionByIdMock = mockFunction(transportService.findSessionById)
const transportServiceHasInboundEndpoint = mockFunction(transportService.hasInboundEndpoint)
Expand All @@ -132,7 +132,7 @@ describe('MessageSender', () => {

let messageSender: MessageSender
let outboundTransport: OutboundTransport
let messageRepository: MessageRepository
let messagePickupRepository: MessagePickupRepository
let connection: ConnectionRecord
let outboundMessageContext: OutboundMessageContext
const agentConfig = getAgentConfig('MessageSender')
Expand All @@ -147,11 +147,11 @@ describe('MessageSender', () => {
eventEmitter.on<AgentMessageSentEvent>(AgentEventTypes.AgentMessageSent, eventListenerMock)

outboundTransport = new DummyHttpOutboundTransport()
messageRepository = new InMemoryMessageRepository(agentConfig.logger)
messagePickupRepository = new InMemoryMessagePickupRepository(agentConfig.logger)
messageSender = new MessageSender(
enveloperService,
transportService,
messageRepository,
messagePickupRepository,
logger,
didResolverService,
didCommDocumentService,
Expand Down Expand Up @@ -497,7 +497,7 @@ describe('MessageSender', () => {
messageSender = new MessageSender(
enveloperService,
transportService,
new InMemoryMessageRepository(agentConfig.logger),
new InMemoryMessagePickupRepository(agentConfig.logger),
logger,
didResolverService,
didCommDocumentService,
Expand Down Expand Up @@ -636,11 +636,11 @@ describe('MessageSender', () => {
describe('packMessage', () => {
beforeEach(() => {
outboundTransport = new DummyHttpOutboundTransport()
messageRepository = new InMemoryMessageRepository(agentConfig.logger)
messagePickupRepository = new InMemoryMessagePickupRepository(agentConfig.logger)
messageSender = new MessageSender(
enveloperService,
transportService,
messageRepository,
messagePickupRepository,
logger,
didResolverService,
didCommDocumentService,
Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/agent/__tests__/TransportService.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { getMockConnection } from '../../../tests/helpers'
import { Subject } from 'rxjs'

import { agentDependencies, getAgentContext, getMockConnection } from '../../../tests/helpers'
import { DidExchangeRole } from '../../modules/connections'
import { EventEmitter } from '../EventEmitter'
import { TransportService } from '../TransportService'

import { DummyTransportSession } from './stubs'
Expand All @@ -9,7 +12,7 @@ describe('TransportService', () => {
let transportService: TransportService

beforeEach(() => {
transportService = new TransportService()
transportService = new TransportService(getAgentContext(), new EventEmitter(agentDependencies, new Subject()))
})

test(`remove session saved for a given connection`, () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export const InjectionSymbols = {
MessageRepository: Symbol('MessageRepository'),
MessagePickupRepository: Symbol('MessagePickupRepository'),
StorageService: Symbol('StorageService'),
Logger: Symbol('Logger'),
AgentContextProvider: Symbol('AgentContextProvider'),
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ export { DidCommMimeType, KeyDerivationMethod } from './types'
export type { FileSystem, DownloadToFileOptions } from './storage/FileSystem'
export * from './storage/BaseRecord'
export { DidCommMessageRecord, DidCommMessageRole, DidCommMessageRepository } from './storage/didcomm'
export { InMemoryMessageRepository } from './storage/InMemoryMessageRepository'
export { Repository } from './storage/Repository'
export * from './storage/RepositoryEvents'
export { StorageService, Query, SimpleQuery, BaseRecordConstructor } from './storage/StorageService'
Expand All @@ -53,6 +52,7 @@ export * from './modules/basic-messages'
export * from './modules/common'
export * from './modules/credentials'
export * from './modules/discover-features'
export * from './modules/message-pickup'
export * from './modules/problem-reports'
export * from './modules/proofs'
export * from './modules/connections'
Expand Down
Loading

0 comments on commit 1b70d24

Please sign in to comment.