Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: message pickup live mode support #1638

Merged
merged 19 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
7e6970d
feat!: initial live mode support + message repository refactor
genaris Nov 15, 2023
2f40c6a
fix(core): allow string for did document controller (#1644)
berendsliedrecht Nov 21, 2023
664c891
feat(anoncreds): issue revocable credentials (#1427)
genaris Nov 22, 2023
e5cb74e
refactor(anoncreds)!: move supportRevocation to options (#1648)
TimoGlastra Nov 27, 2023
ff54595
refactor: move message-pickup directory (#1650)
genaris Nov 27, 2023
55ba318
feat: addressing some feedback and fixes
genaris Nov 28, 2023
8e7c1f8
fix: properly register session service
genaris Nov 29, 2023
c198993
fix: don't send trust ping in V2 pickup protocol
genaris Nov 29, 2023
5afcd33
fix: only remove session if it's not the new one
genaris Dec 13, 2023
ea27b62
Merge branch 'main' into feat/pickup-live-mode
genaris Jan 29, 2024
dfbb1b9
fix: merge issues
genaris Jan 29, 2024
1816dbb
fix: default to `DeliverOnly` as it is current behaviour
genaris Jan 29, 2024
8af1d33
Merge branch 'main' into feat/pickup-live-mode
genaris Jan 30, 2024
b6f952d
feat: add flag to avoid message-pickup messages to be queued
genaris Jan 30, 2024
f39fe5c
refactor: some setting naming and params to be more clear
genaris Jan 31, 2024
4cf3aa2
Merge branch 'main' into feat/pickup-live-mode
genaris Jan 31, 2024
b41e818
fix: use recipientDid instead of recipientKey
genaris Jan 31, 2024
180695c
Merge branch 'main' into feat/pickup-live-mode
genaris Jan 31, 2024
b22dd3e
Merge branch 'main' into feat/pickup-live-mode
genaris Jan 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions packages/core/src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import { JwsService } from '../crypto/JwsService'
import { AriesFrameworkError } from '../error'
import { DependencyManager } from '../plugins'
import { DidCommMessageRepository, StorageUpdateService, StorageVersionRepository } from '../storage'
import { InMemoryMessageRepository } from '../storage/InMemoryMessageRepository'

import { AgentConfig } from './AgentConfig'
import { extendModulesWithDefaultModules } from './AgentModules'
Expand Down Expand Up @@ -90,9 +89,6 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
"Missing required dependency: 'StorageService'. You can register it using the AskarModule, or implement your own."
)
}
if (!dependencyManager.isRegistered(InjectionSymbols.MessageRepository)) {
dependencyManager.registerSingleton(InjectionSymbols.MessageRepository, InMemoryMessageRepository)
}

// TODO: contextCorrelationId for base wallet
// Bind the default agent context to the container for use in modules etc.
Expand Down Expand Up @@ -197,6 +193,8 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
)
await this.mediationRecipient.provision(mediationConnection)
}

await this.messagePickup.initialize()
await this.mediator.initialize()
await this.mediationRecipient.initialize()

Expand Down
9 changes: 9 additions & 0 deletions packages/core/src/agent/AgentMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ export class AgentMessage extends Decorated {
@Exclude()
public readonly allowDidSovPrefix: boolean = false

/**
* Whether to use Queue Transport in case the recipient of this message does not have a reliable
* endpoint available
*
* @see https://github.com/decentralized-identity/didcomm-messaging/blob/main/extensions/return_route/main.md#queue-transport
*/
@Exclude()
public readonly allowQueueTransport: boolean = true

public toJSON({ useDidSovPrefixWhereAllowed }: { useDidSovPrefixWhereAllowed?: boolean } = {}): PlaintextMessage {
const json = JsonTransformer.toJSON(this)

Expand Down
30 changes: 20 additions & 10 deletions packages/core/src/agent/MessageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ import type { TransportSession } from './TransportService'
import type { AgentContext } from './context'
import type { ConnectionRecord } from '../modules/connections'
import type { ResolvedDidCommService } from '../modules/didcomm'
import type { DidDocument } from '../modules/dids'
import type { OutOfBandRecord } from '../modules/oob/repository'
import type { OutboundTransport } from '../transport/OutboundTransport'
import type { OutboundPackage, EncryptedMessage } from '../types'
import type { EncryptedMessage, OutboundPackage } from '../types'

import { DID_COMM_TRANSPORT_QUEUE, InjectionSymbols } from '../constants'
import { ReturnRouteTypes } from '../decorators/transport/TransportDecorator'
import { AriesFrameworkError, MessageSendingError } from '../error'
import { Logger } from '../logger'
import { DidCommDocumentService } from '../modules/didcomm'
import { DidKey, type DidDocument } from '../modules/dids'
import { getKeyFromVerificationMethod } from '../modules/dids/domain/key-type'
import { didKeyToInstanceOfKey } from '../modules/dids/helpers'
import { didKeyToInstanceOfKey, verkeyToDidKey } from '../modules/dids/helpers'
import { DidResolverService } from '../modules/dids/services/DidResolverService'
import { MessagePickupRepository } from '../modules/message-pickup/storage'
import { inject, injectable } from '../plugins'
import { MessageRepository } from '../storage/MessageRepository'
import { MessageValidator } from '../utils/MessageValidator'
import { getProtocolScheme } from '../utils/uri'

Expand All @@ -38,7 +38,7 @@ export interface TransportPriorityOptions {
export class MessageSender {
private envelopeService: EnvelopeService
private transportService: TransportService
private messageRepository: MessageRepository
private messagePickupRepository: MessagePickupRepository
private logger: Logger
private didResolverService: DidResolverService
private didCommDocumentService: DidCommDocumentService
Expand All @@ -48,15 +48,15 @@ export class MessageSender {
public constructor(
envelopeService: EnvelopeService,
transportService: TransportService,
@inject(InjectionSymbols.MessageRepository) messageRepository: MessageRepository,
@inject(InjectionSymbols.MessagePickupRepository) messagePickupRepository: MessagePickupRepository,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should rename repository, as it conflicts with all the other repositories that are based on the storage service.

Maybe we can use messagePickupQueue? Or messagePickupStorage?

Or maybe I should change my thinking on what a repository can be. As repository does make sense

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the beginning I named it MessagePickupQueue but discussing about it with my colleagues we noticed that we are not using it as a queue, in the sense that we are enqueuing data to it but not exactly dequeuing data when retrieving it (as we must store until it is actually acknowledged by the mediatee). Also, the data on it is somewhat organized (i.e. by connectionId, recipientKey, state, etc.) so the term 'Repository' feels to me more appropriate than 'Storage', which is usually used to generically save data.

Having said this, I'm not happy because the unfortunate name clash with Repository interface, so I'm more than open to opinions about how to improve the naming!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think MessageRepository is best i.e. it's the place where you store and fetch messages. I think xxxQueue would tend to incorrectly enforce the underlying technology, when any persistent storage may actually work. Pickup is just a fragment of the interface (we're not 'picking up' messages received that cannot be delivered, rather we're queuing them for later pickup), so I would just drop it to avoid narrowing the exhibited responsibility.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with MessageRepository is that it collides with Repository classes used within the framework, most notably DidCommMessageRepository, so leaving such generic name would be more confusing. The 'Pickup' word tries to specify that it is used for message pickup protocol.

It seems it is still not clear, as this MessagePickupRepository is an interface rather than an implementation, like the other Repository.

@inject(InjectionSymbols.Logger) logger: Logger,
didResolverService: DidResolverService,
didCommDocumentService: DidCommDocumentService,
eventEmitter: EventEmitter
) {
this.envelopeService = envelopeService
this.transportService = transportService
this.messageRepository = messageRepository
this.messagePickupRepository = messagePickupRepository
this.logger = logger
this.didResolverService = didResolverService
this.didCommDocumentService = didCommDocumentService
Expand Down Expand Up @@ -113,9 +113,11 @@ export class MessageSender {
{
connection,
encryptedMessage,
recipientKey,
options,
}: {
connection: ConnectionRecord
recipientKey: string
encryptedMessage: EncryptedMessage
options?: { transportPriority?: TransportPriorityOptions }
}
Expand Down Expand Up @@ -176,7 +178,11 @@ export class MessageSender {
// If the other party shared a queue service endpoint in their did doc we queue the message
if (queueService) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we at some point change this to only queue when it is allowed? So a connection must have queueing enabled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, I remember this from #1199. I think it will be important to find a way to configure this. I'm trying to find the best place to update the code to support this, as I don't want to make MessageSender dependent on Mediator or Pickup modules API (we'll have some circular dependencies).

this.logger.debug(`Queue packed message for connection ${connection.id} (${connection.theirLabel})`)
await this.messageRepository.add(connection.id, encryptedMessage)
await this.messagePickupRepository.addMessage({
connectionId: connection.id,
recipientDids: [verkeyToDidKey(recipientKey)],
payload: encryptedMessage,
})
return
}

Expand Down Expand Up @@ -318,7 +324,7 @@ export class MessageSender {

// We didn't succeed to send the message over open session, or directly to serviceEndpoint
// If the other party shared a queue service endpoint in their did doc we queue the message
if (queueService) {
if (queueService && message.allowQueueTransport) {
this.logger.debug(`Queue message for connection ${connection.id} (${connection.theirLabel})`)

const keys = {
Expand All @@ -328,7 +334,11 @@ export class MessageSender {
}

const encryptedMessage = await this.envelopeService.packMessage(agentContext, message, keys)
await this.messageRepository.add(connection.id, encryptedMessage)
await this.messagePickupRepository.addMessage({
connectionId: connection.id,
recipientDids: keys.recipientKeys.map((item) => new DidKey(item).did),
payload: encryptedMessage,
})

this.emitMessageSentEvent(outboundMessageContext, OutboundMessageSendStatus.QueuedForPickup)

Expand Down
28 changes: 26 additions & 2 deletions packages/core/src/agent/TransportService.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,45 @@
import type { AgentMessage } from './AgentMessage'
import type { EnvelopeKeys } from './EnvelopeService'
import type { AgentContext } from './context'
import type { DidDocument } from '../modules/dids'
import type { TransportSessionRemovedEvent, TransportSessionSavedEvent } from '../transport'
import type { EncryptedMessage } from '../types'

import { DID_COMM_TRANSPORT_QUEUE } from '../constants'
import { AriesFrameworkError } from '../error'
import { injectable } from '../plugins'
import { TransportEventTypes } from '../transport'

import { EventEmitter } from './EventEmitter'
import { AgentContext } from './context'

@injectable()
export class TransportService {
public transportSessionTable: TransportSessionTable = {}
private agentContext: AgentContext
private eventEmitter: EventEmitter

public constructor(agentContext: AgentContext, eventEmitter: EventEmitter) {
this.agentContext = agentContext
this.eventEmitter = eventEmitter
}

public saveSession(session: TransportSession) {
if (session.connectionId) {
const oldSessions = this.getExistingSessionsForConnectionIdAndType(session.connectionId, session.type)
oldSessions.forEach((oldSession) => {
if (oldSession) {
if (oldSession && oldSession.id !== session.id) {
this.removeSession(oldSession)
}
})
}
this.transportSessionTable[session.id] = session

this.eventEmitter.emit<TransportSessionSavedEvent>(this.agentContext, {
type: TransportEventTypes.TransportSessionSaved,
payload: {
session,
},
})
}

public findSessionByConnectionId(connectionId: string) {
Expand All @@ -47,6 +65,12 @@ export class TransportService {

public removeSession(session: TransportSession) {
delete this.transportSessionTable[session.id]
this.eventEmitter.emit<TransportSessionRemovedEvent>(this.agentContext, {
type: TransportEventTypes.TransportSessionRemoved,
payload: {
session,
},
})
}

private getExistingSessionsForConnectionIdAndType(connectionId: string, type: string) {
Expand Down
11 changes: 6 additions & 5 deletions packages/core/src/agent/__tests__/Agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { ConnectionService } from '../../modules/connections/services/Connection
import { TrustPingService } from '../../modules/connections/services/TrustPingService'
import { CredentialRepository } from '../../modules/credentials'
import { CredentialsApi } from '../../modules/credentials/CredentialsApi'
import { MessagePickupApi } from '../../modules/message-pickup'
import { MessagePickupApi, InMemoryMessagePickupRepository } from '../../modules/message-pickup'
import { ProofRepository } from '../../modules/proofs'
import { ProofsApi } from '../../modules/proofs/ProofsApi'
import {
Expand All @@ -25,7 +25,6 @@ import {
MediationRecipientApi,
MediationRecipientModule,
} from '../../modules/routing'
import { InMemoryMessageRepository } from '../../storage/InMemoryMessageRepository'
import { WalletError } from '../../wallet/error'
import { Agent } from '../Agent'
import { Dispatcher } from '../Dispatcher'
Expand Down Expand Up @@ -181,7 +180,9 @@ describe('Agent', () => {

// Symbols, interface based
expect(container.resolve(InjectionSymbols.Logger)).toBe(agentOptions.config.logger)
expect(container.resolve(InjectionSymbols.MessageRepository)).toBeInstanceOf(InMemoryMessageRepository)
expect(container.resolve(InjectionSymbols.MessagePickupRepository)).toBeInstanceOf(
InMemoryMessagePickupRepository
)

// Agent
expect(container.resolve(MessageSender)).toBeInstanceOf(MessageSender)
Expand Down Expand Up @@ -220,8 +221,8 @@ describe('Agent', () => {

// Symbols, interface based
expect(container.resolve(InjectionSymbols.Logger)).toBe(container.resolve(InjectionSymbols.Logger))
expect(container.resolve(InjectionSymbols.MessageRepository)).toBe(
container.resolve(InjectionSymbols.MessageRepository)
expect(container.resolve(InjectionSymbols.MessagePickupRepository)).toBe(
container.resolve(InjectionSymbols.MessagePickupRepository)
)
expect(container.resolve(InjectionSymbols.StorageService)).toBe(
container.resolve(InjectionSymbols.StorageService)
Expand Down
18 changes: 9 additions & 9 deletions packages/core/src/agent/__tests__/MessageSender.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import type { ConnectionRecord } from '../../modules/connections'
import type { ResolvedDidCommService } from '../../modules/didcomm'
import type { DidDocumentService } from '../../modules/dids'
import type { MessageRepository } from '../../storage/MessageRepository'
import type { MessagePickupRepository } from '../../modules/message-pickup/storage'
import type { OutboundTransport } from '../../transport'
import type { EncryptedMessage } from '../../types'
import type { AgentMessageSentEvent } from '../Events'
Expand All @@ -24,7 +24,7 @@ import { DidCommDocumentService } from '../../modules/didcomm'
import { DidResolverService, DidDocument, VerificationMethod } from '../../modules/dids'
import { DidCommV1Service } from '../../modules/dids/domain/service/DidCommV1Service'
import { verkeyToInstanceOfKey } from '../../modules/dids/helpers'
import { InMemoryMessageRepository } from '../../storage/InMemoryMessageRepository'
import { InMemoryMessagePickupRepository } from '../../modules/message-pickup/storage'
import { EnvelopeService as EnvelopeServiceImpl } from '../EnvelopeService'
import { EventEmitter } from '../EventEmitter'
import { AgentEventTypes } from '../Events'
Expand Down Expand Up @@ -114,7 +114,7 @@ describe('MessageSender', () => {
sessionWithoutKeys.inboundMessage = inboundMessage
sessionWithoutKeys.send = jest.fn()

const transportService = new TransportService()
const transportService = new TransportService(getAgentContext(), eventEmitter)
const transportServiceFindSessionMock = mockFunction(transportService.findSessionByConnectionId)
const transportServiceFindSessionByIdMock = mockFunction(transportService.findSessionById)
const transportServiceHasInboundEndpoint = mockFunction(transportService.hasInboundEndpoint)
Expand All @@ -132,7 +132,7 @@ describe('MessageSender', () => {

let messageSender: MessageSender
let outboundTransport: OutboundTransport
let messageRepository: MessageRepository
let messagePickupRepository: MessagePickupRepository
let connection: ConnectionRecord
let outboundMessageContext: OutboundMessageContext
const agentConfig = getAgentConfig('MessageSender')
Expand All @@ -147,11 +147,11 @@ describe('MessageSender', () => {
eventEmitter.on<AgentMessageSentEvent>(AgentEventTypes.AgentMessageSent, eventListenerMock)

outboundTransport = new DummyHttpOutboundTransport()
messageRepository = new InMemoryMessageRepository(agentConfig.logger)
messagePickupRepository = new InMemoryMessagePickupRepository(agentConfig.logger)
messageSender = new MessageSender(
enveloperService,
transportService,
messageRepository,
messagePickupRepository,
logger,
didResolverService,
didCommDocumentService,
Expand Down Expand Up @@ -497,7 +497,7 @@ describe('MessageSender', () => {
messageSender = new MessageSender(
enveloperService,
transportService,
new InMemoryMessageRepository(agentConfig.logger),
new InMemoryMessagePickupRepository(agentConfig.logger),
logger,
didResolverService,
didCommDocumentService,
Expand Down Expand Up @@ -636,11 +636,11 @@ describe('MessageSender', () => {
describe('packMessage', () => {
beforeEach(() => {
outboundTransport = new DummyHttpOutboundTransport()
messageRepository = new InMemoryMessageRepository(agentConfig.logger)
messagePickupRepository = new InMemoryMessagePickupRepository(agentConfig.logger)
messageSender = new MessageSender(
enveloperService,
transportService,
messageRepository,
messagePickupRepository,
logger,
didResolverService,
didCommDocumentService,
Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/agent/__tests__/TransportService.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { getMockConnection } from '../../../tests/helpers'
import { Subject } from 'rxjs'

import { agentDependencies, getAgentContext, getMockConnection } from '../../../tests/helpers'
import { DidExchangeRole } from '../../modules/connections'
import { EventEmitter } from '../EventEmitter'
import { TransportService } from '../TransportService'

import { DummyTransportSession } from './stubs'
Expand All @@ -9,7 +12,7 @@ describe('TransportService', () => {
let transportService: TransportService

beforeEach(() => {
transportService = new TransportService()
transportService = new TransportService(getAgentContext(), new EventEmitter(agentDependencies, new Subject()))
})

test(`remove session saved for a given connection`, () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export const InjectionSymbols = {
MessageRepository: Symbol('MessageRepository'),
MessagePickupRepository: Symbol('MessagePickupRepository'),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we make this a module config option, and expose the config on the new API, we don't need the injection symbol anymore (as you can inject the api and then do api.config.messagePickupRepository)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. However I think I'll need to update a bit the logic in order to avoid dependency cycles between MessageSender and MessagePickupApi. Probably adding some parameter to sendMessage or even triggering an internal event that can be listened in Message Pickup.

StorageService: Symbol('StorageService'),
Logger: Symbol('Logger'),
AgentContextProvider: Symbol('AgentContextProvider'),
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ export { DidCommMimeType, KeyDerivationMethod } from './types'
export type { FileSystem, DownloadToFileOptions } from './storage/FileSystem'
export * from './storage/BaseRecord'
export { DidCommMessageRecord, DidCommMessageRole, DidCommMessageRepository } from './storage/didcomm'
export { InMemoryMessageRepository } from './storage/InMemoryMessageRepository'
export { Repository } from './storage/Repository'
export * from './storage/RepositoryEvents'
export { StorageService, Query, SimpleQuery, BaseRecordConstructor } from './storage/StorageService'
Expand All @@ -53,6 +52,7 @@ export * from './modules/basic-messages'
export * from './modules/common'
export * from './modules/credentials'
export * from './modules/discover-features'
export * from './modules/message-pickup'
export * from './modules/problem-reports'
export * from './modules/proofs'
export * from './modules/connections'
Expand Down
Loading
Loading