From 045be371c121fe7a8741be9aa4f0ab2437ec62f6 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Tue, 23 May 2023 11:09:56 +1000 Subject: [PATCH] refactor: converting agent handlers * Related #512 * Related #495 [ci skip] --- package.json | 1 + src/agent/GRPCClientAgent.ts | 249 ------------------ src/agent/errors.ts | 25 +- src/agent/handlers/clientManifest.ts | 96 +++++++ src/agent/handlers/echo.ts | 23 ++ src/agent/handlers/index.ts | 2 + src/agent/handlers/nodesChainDataGet.ts | 38 +++ src/agent/handlers/nodesClaimsGet.ts | 24 ++ .../handlers/nodesClosestLocalNodesGet.ts | 60 +++++ src/agent/handlers/nodesCrossSignClaim.ts | 42 +++ .../handlers/nodesHolePunchMessageSend.ts | 109 ++++++++ src/agent/handlers/notificationsSend.ts | 34 +++ src/agent/handlers/serverManifest.ts | 48 ++++ src/agent/handlers/types.ts | 62 +++++ src/agent/handlers/vaultsGitInfoGet.ts | 107 ++++++++ src/agent/handlers/vaultsGitPackGet.ts | 109 ++++++++ src/agent/handlers/vaultsScan.ts | 50 ++++ src/agent/index.ts | 5 +- src/agent/service/echo.ts | 17 -- src/agent/service/index.ts | 70 ----- src/agent/service/nodesChainDataGet.ts | 57 ---- src/agent/service/nodesClaimsGet.ts | 22 -- .../service/nodesClosestLocalNodesGet.ts | 72 ----- src/agent/service/nodesCrossSignClaim.ts | 58 ---- .../service/nodesHolePunchMessageSend.ts | 127 --------- src/agent/service/notificationsSend.ts | 58 ---- src/agent/service/vaultsGitInfoGet.ts | 120 --------- src/agent/service/vaultsGitPackGet.ts | 133 ---------- src/agent/service/vaultsScan.ts | 65 ----- src/agent/types.ts | 32 ++- src/agent/utils.ts | 85 ------ 31 files changed, 829 insertions(+), 1171 deletions(-) delete mode 100644 src/agent/GRPCClientAgent.ts create mode 100644 src/agent/handlers/clientManifest.ts create mode 100644 src/agent/handlers/echo.ts create mode 100644 src/agent/handlers/index.ts create mode 100644 src/agent/handlers/nodesChainDataGet.ts create mode 100644 src/agent/handlers/nodesClaimsGet.ts create mode 100644 src/agent/handlers/nodesClosestLocalNodesGet.ts create mode 100644 src/agent/handlers/nodesCrossSignClaim.ts create mode 100644 src/agent/handlers/nodesHolePunchMessageSend.ts create mode 100644 src/agent/handlers/notificationsSend.ts create mode 100644 src/agent/handlers/serverManifest.ts create mode 100644 src/agent/handlers/types.ts create mode 100644 src/agent/handlers/vaultsGitInfoGet.ts create mode 100644 src/agent/handlers/vaultsGitPackGet.ts create mode 100644 src/agent/handlers/vaultsScan.ts delete mode 100644 src/agent/service/echo.ts delete mode 100644 src/agent/service/index.ts delete mode 100644 src/agent/service/nodesChainDataGet.ts delete mode 100644 src/agent/service/nodesClaimsGet.ts delete mode 100644 src/agent/service/nodesClosestLocalNodesGet.ts delete mode 100644 src/agent/service/nodesCrossSignClaim.ts delete mode 100644 src/agent/service/nodesHolePunchMessageSend.ts delete mode 100644 src/agent/service/notificationsSend.ts delete mode 100644 src/agent/service/vaultsGitInfoGet.ts delete mode 100644 src/agent/service/vaultsGitPackGet.ts delete mode 100644 src/agent/service/vaultsScan.ts delete mode 100644 src/agent/utils.ts diff --git a/package.json b/package.json index e45eda844..0f8ceaede 100644 --- a/package.json +++ b/package.json @@ -91,6 +91,7 @@ "@matrixai/resources": "^1.1.5", "@matrixai/timer": "^1.1.1", "@matrixai/workers": "^1.3.7", + "@matrixai/quic": "0.0.7-alpha.0", "@peculiar/asn1-pkcs8": "^2.3.0", "@peculiar/asn1-schema": "^2.3.0", "@peculiar/asn1-x509": "^2.3.0", diff --git a/src/agent/GRPCClientAgent.ts b/src/agent/GRPCClientAgent.ts deleted file mode 100644 index 1de9c1e60..000000000 --- a/src/agent/GRPCClientAgent.ts +++ /dev/null @@ -1,249 +0,0 @@ -import type { ClientDuplexStream } from '@grpc/grpc-js'; -import type { ClientReadableStream } from '@grpc/grpc-js/build/src/call'; -import type { - AsyncGeneratorReadableStreamClient, - AsyncGeneratorDuplexStreamClient, -} from '../grpc/types'; -import type { NodeId } from '../ids/types'; -import type { Host, Port, ProxyConfig, TLSConfig } from '../network/types'; -import type * as utilsPB from '../proto/js/polykey/v1/utils/utils_pb'; -import type * as vaultsPB from '../proto/js/polykey/v1/vaults/vaults_pb'; -import type * as nodesPB from '../proto/js/polykey/v1/nodes/nodes_pb'; -import type * as notificationsPB from '../proto/js/polykey/v1/notifications/notifications_pb'; -import type { Timer } from '../types'; -import Logger from '@matrixai/logger'; -import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy'; -import * as agentErrors from './errors'; -import * as grpcUtils from '../grpc/utils'; -import GRPCClient from '../grpc/GRPCClient'; -import { AgentServiceClient } from '../proto/js/polykey/v1/agent_service_grpc_pb'; - -interface GRPCClientAgent extends CreateDestroy {} -@CreateDestroy() -class GRPCClientAgent extends GRPCClient { - /** - * Creates GRPCClientAgent - * This connects to the agent service - * This connection should not be encrypted with TLS because it - * will go through the network proxies - */ - static async createGRPCClientAgent({ - nodeId, - host, - port, - tlsConfig, - proxyConfig, - timer, - destroyCallback = async () => {}, - logger = new Logger(this.name), - }: { - nodeId: NodeId; - host: Host; - port: Port; - tlsConfig?: Partial; - proxyConfig?: ProxyConfig; - timer?: Timer; - destroyCallback?: () => Promise; - logger?: Logger; - }): Promise { - const { client, serverCertChain, flowCountInterceptor } = - await super.createClient({ - clientConstructor: AgentServiceClient, - nodeId, - host, - port, - tlsConfig, - proxyConfig, - timer, - logger, - }); - const grpcClientAgent = new this({ - client, - nodeId, - host, - port, - tlsConfig, - proxyConfig, - serverCertChain, - flowCountInterceptor, - destroyCallback, - logger, - }); - return grpcClientAgent; - } - - public async destroy({ - timeout, - }: { - timeout?: number; - } = {}) { - await super.destroy({ timeout }); - } - - @ready(new agentErrors.ErrorAgentClientDestroyed()) - public echo(...args) { - return grpcUtils.promisifyUnaryCall( - this.client, - { - nodeId: this.nodeId, - host: this.host, - port: this.port, - command: this.echo.name, - }, - this.client.echo, - )(...args); - } - - @ready(new agentErrors.ErrorAgentClientDestroyed()) - public vaultsGitInfoGet( - ...args - ): AsyncGeneratorReadableStreamClient< - vaultsPB.PackChunk, - ClientReadableStream - > { - return grpcUtils.promisifyReadableStreamCall( - this.client, - { - nodeId: this.nodeId, - host: this.host, - port: this.port, - command: this.vaultsGitInfoGet.name, - }, - this.client.vaultsGitInfoGet, - )(...args); - } - - @ready(new agentErrors.ErrorAgentClientDestroyed()) - public vaultsGitPackGet( - ...args - ): AsyncGeneratorDuplexStreamClient< - vaultsPB.PackChunk, - vaultsPB.PackChunk, - ClientDuplexStream - > { - return grpcUtils.promisifyDuplexStreamCall( - this.client, - { - nodeId: this.nodeId, - host: this.host, - port: this.port, - command: this.vaultsGitPackGet.name, - }, - this.client.vaultsGitPackGet, - )(...args); - } - - @ready(new agentErrors.ErrorAgentClientDestroyed()) - public vaultsScan( - ...args - ): AsyncGeneratorReadableStreamClient< - vaultsPB.List, - ClientReadableStream - > { - return grpcUtils.promisifyReadableStreamCall( - this.client, - { - nodeId: this.nodeId, - host: this.host, - port: this.port, - command: this.vaultsScan.name, - }, - this.client.vaultsScan, - )(...args); - } - - @ready(new agentErrors.ErrorAgentClientDestroyed()) - public nodesClosestLocalNodesGet(...args) { - return grpcUtils.promisifyUnaryCall( - this.client, - { - nodeId: this.nodeId, - host: this.host, - port: this.port, - command: this.nodesClosestLocalNodesGet.name, - }, - this.client.nodesClosestLocalNodesGet, - )(...args); - } - - @ready(new agentErrors.ErrorAgentClientDestroyed()) - public nodesClaimsGet(...args) { - return grpcUtils.promisifyUnaryCall( - this.client, - { - nodeId: this.nodeId, - host: this.host, - port: this.port, - command: this.nodesClaimsGet.name, - }, - this.client.nodesClaimsGet, - )(...args); - } - - @ready(new agentErrors.ErrorAgentClientDestroyed()) - public nodesChainDataGet(...args) { - return grpcUtils.promisifyReadableStreamCall( - this.client, - { - nodeId: this.nodeId, - host: this.host, - port: this.port, - command: this.nodesChainDataGet.name, - }, - this.client.nodesChainDataGet, - )(...args); - } - - @ready(new agentErrors.ErrorAgentClientDestroyed()) - public nodesHolePunchMessageSend(...args) { - return grpcUtils.promisifyUnaryCall( - this.client, - { - nodeId: this.nodeId, - host: this.host, - port: this.port, - command: this.nodesHolePunchMessageSend.name, - }, - this.client.nodesHolePunchMessageSend, - )(...args); - } - - @ready(new agentErrors.ErrorAgentClientDestroyed()) - public notificationsSend(...args) { - return grpcUtils.promisifyUnaryCall( - this.client, - { - nodeId: this.nodeId, - host: this.host, - port: this.port, - command: this.notificationsSend.name, - }, - this.client.notificationsSend, - )(...args); - } - - @ready(new agentErrors.ErrorAgentClientDestroyed()) - public nodesCrossSignClaim( - ...args - ): AsyncGeneratorDuplexStreamClient< - nodesPB.AgentClaim, - nodesPB.AgentClaim, - ClientDuplexStream - > { - return grpcUtils.promisifyDuplexStreamCall< - nodesPB.AgentClaim, - nodesPB.AgentClaim - >( - this.client, - { - nodeId: this.nodeId, - host: this.host, - port: this.port, - command: this.nodesCrossSignClaim.name, - }, - this.client.nodesCrossSignClaim, - )(...args); - } -} - -export default GRPCClientAgent; diff --git a/src/agent/errors.ts b/src/agent/errors.ts index b0460055c..c5eac7392 100644 --- a/src/agent/errors.ts +++ b/src/agent/errors.ts @@ -2,30 +2,9 @@ import { ErrorPolykey, sysexits } from '../errors'; class ErrorAgent extends ErrorPolykey {} -class ErrorAgentRunning extends ErrorPolykey { - static description = 'Agent Client is running'; - exitCode = sysexits.USAGE; -} - -class ErrorAgentClientNotStarted extends ErrorAgent { - static description = 'Agent Client is not started'; - exitCode = sysexits.USAGE; -} - -class ErrorAgentClientDestroyed extends ErrorAgent { - static description = 'Agent Client is destroyed'; - exitCode = sysexits.USAGE; -} - class ErrorConnectionInfoMissing extends ErrorAgent { - static description = 'Vault already exists'; + static description = 'Connection info was missing from connection metadata'; exitCode = sysexits.UNAVAILABLE; } -export { - ErrorAgent, - ErrorAgentClientNotStarted, - ErrorAgentRunning, - ErrorAgentClientDestroyed, - ErrorConnectionInfoMissing, -}; +export { ErrorConnectionInfoMissing }; diff --git a/src/agent/handlers/clientManifest.ts b/src/agent/handlers/clientManifest.ts new file mode 100644 index 000000000..0652d0727 --- /dev/null +++ b/src/agent/handlers/clientManifest.ts @@ -0,0 +1,96 @@ +import type { AgentRPCRequestParams, AgentRPCResponseResult } from '../types'; +import type { EchoMessage } from './types'; +import type { + AgentClaimMessage, + ClaimIdMessage, + GitPackMessage, + HolePunchRelayMessage, + NodeAddressMessage, + NodeIdMessage, + SignedNotificationEncoded, + VaultInfo, + VaultsGitInfoGetMessage, + VaultsGitPackGetMessage, + VaultsScanMessage, +} from './types'; +import { DuplexCaller, ServerCaller, UnaryCaller } from '../../rpc/callers'; + +const echo = new UnaryCaller< + AgentRPCRequestParams, + AgentRPCResponseResult +>(); + +const nodeChainDataGet = new ServerCaller< + AgentRPCRequestParams, + AgentRPCResponseResult +>(); + +// Const nodesClaimsGet = new UnaryCaller< +// AgentRPCRequestParams, +// AgentRPCResponseResult +// >(); + +const nodesClosestLocalNodesGet = new ServerCaller< + AgentRPCRequestParams, + AgentRPCResponseResult +>(); + +// TODO: still to be completed +const nodesCrossSignClaim = new DuplexCaller< + AgentRPCRequestParams, + AgentRPCResponseResult +>(); + +// TODO: still to be completed +const nodesHolePunchMessageSend = new UnaryCaller< + AgentRPCRequestParams, + AgentRPCResponseResult +>(); + +const notificationsSend = new UnaryCaller< + AgentRPCRequestParams, + AgentRPCResponseResult +>(); + +const vaultsGitInfoGet = new ServerCaller< + AgentRPCRequestParams, + AgentRPCResponseResult +>(); + +const vaultsGitPackGet = new ServerCaller< + AgentRPCRequestParams, + AgentRPCResponseResult +>(); + +const vaultsScan = new ServerCaller< + AgentRPCRequestParams, + AgentRPCResponseResult +>(); + +// No type used here, it will override type inference +const clientManifest = { + echo, + nodeChainDataGet, + // NodeClaimsGet, + nodesClosestLocalNodesGet, + nodesCrossSignClaim, + nodesHolePunchMessageSend, + notificationsSend, + vaultsGitInfoGet, + vaultsGitPackGet, + vaultsScan, +}; + +export { + clientManifest, + echo, + nodeChainDataGet, + // NodeClaimsGet, + nodesClosestLocalNodesGet, + nodesCrossSignClaim, + nodesHolePunchMessageSend, + notificationsSend, + vaultsGitInfoGet, + vaultsGitPackGet, + vaultsScan, +}; diff --git a/src/agent/handlers/echo.ts b/src/agent/handlers/echo.ts new file mode 100644 index 000000000..7656888cd --- /dev/null +++ b/src/agent/handlers/echo.ts @@ -0,0 +1,23 @@ +import type { EchoMessage } from './types'; +import type { + AgentRPCRequestParams, + AgentRPCResponseResult, + NoData, +} from '../types'; +import { UnaryHandler } from '../../rpc/handlers'; + +class EchoHandler extends UnaryHandler< + NoData, + AgentRPCRequestParams, + AgentRPCResponseResult +> { + public async handle( + input: AgentRPCRequestParams, + ): Promise> { + return { + message: input.message, + }; + } +} + +export { EchoHandler }; diff --git a/src/agent/handlers/index.ts b/src/agent/handlers/index.ts new file mode 100644 index 000000000..65357aabe --- /dev/null +++ b/src/agent/handlers/index.ts @@ -0,0 +1,2 @@ +export * from './clientManifest'; +export * from './serverManifest'; diff --git a/src/agent/handlers/nodesChainDataGet.ts b/src/agent/handlers/nodesChainDataGet.ts new file mode 100644 index 000000000..dec634015 --- /dev/null +++ b/src/agent/handlers/nodesChainDataGet.ts @@ -0,0 +1,38 @@ +import type Sigchain from '../../sigchain/Sigchain'; +import type { DB } from '@matrixai/db'; +import type { ClaimIdMessage, AgentClaimMessage } from './types'; +import type { AgentRPCRequestParams, AgentRPCResponseResult } from '../types'; +import * as claimsUtils from '../../claims/utils'; +import { ServerHandler } from '../../rpc/handlers'; + +class NodesChainDataGetHandler extends ServerHandler< + { + sigchain: Sigchain; + db: DB; + }, + AgentRPCRequestParams, + AgentRPCResponseResult +> { + public async *handle( + _input: ClaimIdMessage, + ): AsyncGenerator> { + const { sigchain, db } = this.container; + yield* db.withTransactionG(async function* ( + tran, + ): AsyncGenerator> { + for await (const [claimId, signedClaim] of sigchain.getSignedClaims( + { /* seek: seekClaimId,*/ order: 'asc' }, + tran, + )) { + const encodedClaim = claimsUtils.generateSignedClaim(signedClaim); + const response: AgentClaimMessage = { + claimIdEncoded: claimsUtils.encodeClaimId(claimId), + signedTokenEncoded: encodedClaim, + }; + yield response; + } + }); + } +} + +export { NodesChainDataGetHandler }; diff --git a/src/agent/handlers/nodesClaimsGet.ts b/src/agent/handlers/nodesClaimsGet.ts new file mode 100644 index 000000000..86933281a --- /dev/null +++ b/src/agent/handlers/nodesClaimsGet.ts @@ -0,0 +1,24 @@ +import type { + AgentRPCRequestParams, + AgentRPCResponseResult, + NoData, +} from '../types'; +import { UnaryHandler } from '../../rpc/handlers'; + +/** + * Retrieves all claims (of a specific type) of this node (within its sigchain). + * TODO: Currently not required. Will need to refactor once we filter on what + * claims we desire from the sigchain (e.g. in discoverGestalt). + */ + +class NodesClaimsGetHandler extends UnaryHandler< + NoData, + AgentRPCRequestParams, + AgentRPCResponseResult +> { + public async handle(): Promise { + return {}; + } +} + +export { NodesClaimsGetHandler }; diff --git a/src/agent/handlers/nodesClosestLocalNodesGet.ts b/src/agent/handlers/nodesClosestLocalNodesGet.ts new file mode 100644 index 000000000..5f6d487db --- /dev/null +++ b/src/agent/handlers/nodesClosestLocalNodesGet.ts @@ -0,0 +1,60 @@ +import type { NodeAddressMessage, NodeIdMessage } from './types'; +import type { NodeGraph } from '../../nodes'; +import type { DB } from '@matrixai/db'; +import type { AgentRPCRequestParams, AgentRPCResponseResult } from '../types'; +import type { NodeId } from '../../ids'; +import { validateSync } from '../../validation'; +import { matchSync } from '../../utils'; +import * as validationUtils from '../../validation/utils'; +import * as nodesUtils from '../../nodes/utils'; +import { ServerHandler } from '../../rpc/handlers'; + +class NodesClosestLocalNodesGetHandler extends ServerHandler< + { + nodeGraph: NodeGraph; + db: DB; + }, + AgentRPCRequestParams, + AgentRPCResponseResult +> { + public async *handle( + input: AgentRPCRequestParams, + ): AsyncGenerator> { + const { nodeGraph, db } = this.container; + + const { + nodeId, + }: { + nodeId: NodeId; + } = validateSync( + (keyPath, value) => { + return matchSync(keyPath)( + [['nodeId'], () => validationUtils.parseNodeId(value)], + () => value, + ); + }, + { + nodeId: input.nodeIdEncoded, + }, + ); + // Get all local nodes that are closest to the target node from the request + return yield* db.withTransactionG(async function* ( + tran, + ): AsyncGenerator> { + const closestNodes = await nodeGraph.getClosestNodes( + nodeId, + undefined, + tran, + ); + for (const [nodeId, nodeData] of closestNodes) { + yield { + nodeIdEncoded: nodesUtils.encodeNodeId(nodeId), + host: nodeData.address.host, + port: nodeData.address.port, + }; + } + }); + } +} + +export { NodesClosestLocalNodesGetHandler }; diff --git a/src/agent/handlers/nodesCrossSignClaim.ts b/src/agent/handlers/nodesCrossSignClaim.ts new file mode 100644 index 000000000..f8258fb6b --- /dev/null +++ b/src/agent/handlers/nodesCrossSignClaim.ts @@ -0,0 +1,42 @@ +import type { EchoMessage } from './types'; +import type { AgentRPCRequestParams, AgentRPCResponseResult } from '../types'; +import type KeyRing from '../../keys/KeyRing'; +import type { NodeId } from '../../ids'; +import type ACL from '../../acl/ACL'; +import type NodeManager from '../../nodes/NodeManager'; +import * as nodesErrors from '../../nodes/errors'; +import * as nodesUtils from '../../nodes/utils'; +import { DuplexHandler } from '../../rpc/handlers'; + +// TODO: come back to this! +class NodesCrossSignClaimHandler extends DuplexHandler< + { + keyRing: KeyRing; + acl: ACL; + nodeManager: NodeManager; + }, + AgentRPCRequestParams, + AgentRPCResponseResult +> { + public async *handle( + input: AsyncIterable>, + _, + meta, + ): AsyncGenerator> { + const { acl } = this.container; + // TODO: get remote info from metadata. dependent on js-quic meta types + const requestingNodeId: NodeId | undefined = nodesUtils.decodeNodeId( + meta?.remoteNodeId, + ); + if (requestingNodeId == null) throw Error('TMP invalid nodeId'); + // Check the ACL for permissions + const permissions = await acl.getNodePerm(requestingNodeId); + if (permissions?.gestalt.claim !== null) { + throw new nodesErrors.ErrorNodePermissionDenied(); + } + // Handle claiming the node + await nodeManager.handleClaimNode(requestingNodeId, genClaims); + } +} + +export { NodesCrossSignClaimHandler }; diff --git a/src/agent/handlers/nodesHolePunchMessageSend.ts b/src/agent/handlers/nodesHolePunchMessageSend.ts new file mode 100644 index 000000000..711ebccb0 --- /dev/null +++ b/src/agent/handlers/nodesHolePunchMessageSend.ts @@ -0,0 +1,109 @@ +import type { DB } from '@matrixai/db'; +import type NodeConnectionManager from '../../nodes/NodeConnectionManager'; +import type KeyRing from '../../keys/KeyRing'; +import type Logger from '@matrixai/logger'; +import type { Host, Port } from '../../network/types'; +import type NodeManager from '../../nodes/NodeManager'; +import type { AgentRPCRequestParams, AgentRPCResponseResult } from '../types'; +import type { NodeId } from '../../ids'; +import type { HolePunchRelayMessage } from './types'; +import { validateSync } from '../../validation'; +import { matchSync } from '../../utils'; +import * as validationUtils from '../../validation/utils'; +import * as nodesUtils from '../../nodes/utils'; +import { UnaryHandler } from '../../rpc/handlers'; + +class NodesHolePunchMessageSendHandler extends UnaryHandler< + { + db: DB; + nodeConnectionManager: NodeConnectionManager; + keyRing: KeyRing; + nodeManager: NodeManager; + logger: Logger; + }, + AgentRPCRequestParams, + AgentRPCResponseResult +> { + public async handle( + input: AgentRPCRequestParams, + _, + meta, + ): Promise { + const { db, nodeConnectionManager, keyRing, nodeManager, logger } = + this.container; + const { + targetId, + sourceId, + }: { + targetId: NodeId; + sourceId: NodeId; + } = validateSync( + (keyPath, value) => { + return matchSync(keyPath)( + [ + ['targetId'], + ['sourceId'], + () => validationUtils.parseNodeId(value), + ], + () => value, + ); + }, + { + targetId: input.dstIdEncoded, + sourceId: input.srcIdEncoded, + }, + ); + const connectionInfo = meta; + const srcNodeId = nodesUtils.encodeNodeId(connectionInfo!.remoteNodeId); + // Firstly, check if this node is the desired node + // If so, then we want to make this node start sending hole punching packets + // back to the source node. + await db.withTransactionF(async (tran) => { + if (keyRing.getNodeId().equals(targetId)) { + if (input.address != null) { + const host = input.address.host as Host; + const port = input.address.port as Port; + logger.debug( + `Received signaling message to target ${input.srcIdEncoded}@${host}:${port}`, + ); + // Ignore failure + try { + await nodeConnectionManager.holePunchReverse(host, port); + } catch { + // Do nothing + } + } else { + logger.error( + 'Received signaling message, target information was missing, skipping reverse hole punch', + ); + } + } else if (await nodeManager.knowsNode(sourceId, tran)) { + // Otherwise, find if node in table + // If so, ask the nodeManager to relay to the node + const targetNodeId = input.dstIdEncoded; + const proxyAddress = { + host: connectionInfo!.remoteHost, + port: connectionInfo!.remotePort, + }; + // Checking if the source and destination are the same + if (sourceId?.equals(targetId)) { + // Logging and silently dropping operation + logger.warn('Signaling relay message requested signal to itself'); + return {}; + } + logger.debug( + `Relaying signaling message from ${srcNodeId}@${proxyAddress.host}:${proxyAddress.port} to ${targetNodeId} with information ${proxyAddress}`, + ); + // TODO: fix + call.request.setProxyAddress(proxyAddress); + await nodeConnectionManager.relaySignalingMessage(call.request, { + host: connectionInfo!.remoteHost, + port: connectionInfo!.remotePort, + }); + } + }); + return {}; + } +} + +export { NodesHolePunchMessageSendHandler }; diff --git a/src/agent/handlers/notificationsSend.ts b/src/agent/handlers/notificationsSend.ts new file mode 100644 index 000000000..7abc3502f --- /dev/null +++ b/src/agent/handlers/notificationsSend.ts @@ -0,0 +1,34 @@ +import type { DB } from '@matrixai/db'; +import type KeyRing from '../../keys/KeyRing'; +import type NotificationsManager from '../../notifications/NotificationsManager'; +import type { SignedNotification } from '../../notifications/types'; +import type { SignedNotificationEncoded } from './types'; +import type { AgentRPCRequestParams, AgentRPCResponseResult } from '../types'; +import { UnaryHandler } from '../../rpc/handlers'; +import * as notificationsUtils from '../../notifications/utils'; + +class NotificationsSendHandler extends UnaryHandler< + { + db: DB; + keyRing: KeyRing; + notificationsManager: NotificationsManager; + }, + AgentRPCRequestParams, + AgentRPCResponseResult +> { + public async handle( + input: AgentRPCRequestParams, + ): Promise { + const { db, keyRing, notificationsManager } = this.container; + const notification = await notificationsUtils.verifyAndDecodeNotif( + input.signedNotificationEncoded as SignedNotification, + keyRing.getNodeId(), + ); + await db.withTransactionF((tran) => + notificationsManager.receiveNotification(notification, tran), + ); + return {}; + } +} + +export { NotificationsSendHandler }; diff --git a/src/agent/handlers/serverManifest.ts b/src/agent/handlers/serverManifest.ts new file mode 100644 index 000000000..3831d7091 --- /dev/null +++ b/src/agent/handlers/serverManifest.ts @@ -0,0 +1,48 @@ +import type { DB } from '@matrixai/db'; +import type Sigchain from '../../sigchain/Sigchain'; +import type NodeGraph from '../../nodes/NodeGraph'; +import type ACL from '../../acl/ACL'; +import type NodeManager from '../../nodes/NodeManager'; +import type KeyRing from '../../keys/KeyRing'; +import type NodeConnectionManager from '../../nodes/NodeConnectionManager'; +import type Logger from '@matrixai/logger'; +import type { NotificationsManager } from '../../notifications'; +import type { VaultManager } from '../../vaults'; +import { NodesClosestLocalNodesGetHandler } from './nodesClosestLocalNodesGet'; +import { NodesHolePunchMessageSendHandler } from './nodesHolePunchMessageSend'; +import { NodesCrossSignClaimHandler } from './nodesCrossSignClaim'; +// Import {NodesClaimsGetHandler} from "./nodesClaimsGet"; +import { NotificationsSendHandler } from './notificationsSend'; +import { NodesChainDataGetHandler } from './nodesChainDataGet'; +import { EchoHandler } from './echo'; +import { VaultsScanHandler } from './vaultsScan'; +import { VaultsGitInfoGetHandler } from './vaultsGitInfoGet'; +import { VaultsGitPackGetHandler } from './vaultsGitPackGet'; + +const serverManifest = (container: { + db: DB; + sigchain: Sigchain; + nodeGraph: NodeGraph; + acl: ACL; + nodeManager: NodeManager; + nodeConnectionManager: NodeConnectionManager; + keyRing: KeyRing; + logger: Logger; + notificationsManager: NotificationsManager; + vaultManager: VaultManager; +}) => { + return { + echo: new EchoHandler(container), + nodesChainDataGet: new NodesChainDataGetHandler(container), + // NodesClaimsGet: new NodesClaimsGetHandler(container), + nodesClosestLocalNodesGet: new NodesClosestLocalNodesGetHandler(container), + nodesCrossSignClaim: new NodesCrossSignClaimHandler(container), + nodesHolePunchMessageSend: new NodesHolePunchMessageSendHandler(container), + notificationsSend: new NotificationsSendHandler(container), + VaultsGitInfoGet: new VaultsGitInfoGetHandler(container), + VaultsGitPackGet: new VaultsGitPackGetHandler(container), + vaultsScan: new VaultsScanHandler(container), + }; +}; + +export { serverManifest }; diff --git a/src/agent/handlers/types.ts b/src/agent/handlers/types.ts new file mode 100644 index 000000000..a9ff573f0 --- /dev/null +++ b/src/agent/handlers/types.ts @@ -0,0 +1,62 @@ +import type { SignedTokenEncoded } from '../../tokens/types'; +import type { ClaimIdEncoded, NodeIdEncoded, VaultIdEncoded } from '../../ids'; +import type { VaultAction, VaultName } from '../../vaults/types'; +import type { SignedNotification } from '../../notifications/types'; + +export type EchoMessage = { + message: string; +}; + +export type ClaimIdMessage = { + claimIdEncoded: ClaimIdEncoded; +}; + +export type AgentClaimMessage = ClaimIdMessage & { + signedTokenEncoded: SignedTokenEncoded; +}; + +export type NodeIdMessage = { + nodeIdEncoded: NodeIdEncoded; +}; + +export type AddressMessage = { + host: string; + port: number; +}; + +export type NodeAddressMessage = NodeIdMessage & AddressMessage; + +export type HolePunchRelayMessage = { + srcIdEncoded: NodeIdEncoded; + dstIdEncoded: NodeIdEncoded; + address: AddressMessage; +}; + +export type SignedNotificationEncoded = { + signedNotificationEncoded: SignedNotification; +}; + +export type VaultInfo = { + vaultIdEncoded: VaultIdEncoded; + vaultName: VaultName; +}; + +export type VaultsScanMessage = VaultInfo & { + vaultPermissions: Array; +}; + +export type VaultsGitInfoGetMessage = { + vaultNameOrId: VaultIdEncoded | VaultName; + action: VaultAction; +}; + +export type GitPackMessage = { + /** + * Chunk of data in binary form; + */ + chunk: string; +}; + +export type VaultsGitPackGetMessage = { + body: string; +}; diff --git a/src/agent/handlers/vaultsGitInfoGet.ts b/src/agent/handlers/vaultsGitInfoGet.ts new file mode 100644 index 000000000..4187514fb --- /dev/null +++ b/src/agent/handlers/vaultsGitInfoGet.ts @@ -0,0 +1,107 @@ +import type { GitPackMessage, VaultInfo } from './types'; +import type { AgentRPCRequestParams, AgentRPCResponseResult } from '../types'; +import type { DB } from '@matrixai/db'; +import type { VaultManager } from '../../vaults'; +import type { ACL } from '../../acl'; +import type Logger from '@matrixai/logger'; +import type { VaultsGitInfoGetMessage } from './types'; +import type { VaultAction } from '../../vaults/types'; +import * as vaultsUtils from '../../vaults/utils'; +import * as vaultsErrors from '../../vaults/errors'; +import { ServerHandler } from '../../rpc/handlers'; +import { validateSync } from '../../validation'; +import { matchSync } from '../../utils'; +import * as validationUtils from '../../validation/utils'; +import * as agentErrors from '../errors'; +import * as nodesUtils from '../../nodes/utils'; + +class VaultsGitInfoGetHandler extends ServerHandler< + { + db: DB; + vaultManager: VaultManager; + acl: ACL; + logger: Logger; + }, + AgentRPCRequestParams, + AgentRPCResponseResult +> { + public async *handle( + input: AgentRPCRequestParams, + _, + meta, + ): AsyncGenerator { + const { db, vaultManager, acl } = this.container; + yield* db.withTransactionG(async function* ( + tran, + ): AsyncGenerator { + const vaultIdFromName = await vaultManager.getVaultId( + input.vaultNameOrId, + tran, + ); + const vaultId = + vaultIdFromName ?? vaultsUtils.decodeVaultId(input.vaultNameOrId); + if (vaultId == null) { + throw new vaultsErrors.ErrorVaultsVaultUndefined(); + } + const { + actionType, + }: { + actionType: VaultAction; + } = validateSync( + (keyPath, value) => { + return matchSync(keyPath)( + [['actionType'], () => validationUtils.parseVaultAction(value)], + () => value, + ); + }, + { + actionType: input.action, + }, + ); + const vaultName = (await vaultManager.getVaultMeta(vaultId, tran)) + ?.vaultName; + if (vaultName == null) { + throw new vaultsErrors.ErrorVaultsVaultUndefined(); + } + // Getting the NodeId from the ReverseProxy connection info + const connectionInfo = meta; + // If this is getting run the connection exists + // It SHOULD exist here + if (connectionInfo == null) { + throw new agentErrors.ErrorConnectionInfoMissing(); + } + const nodeId = connectionInfo.remoteNodeId; + const nodeIdEncoded = nodesUtils.encodeNodeId(nodeId); + const permissions = await acl.getNodePerm(nodeId, tran); + if (permissions == null) { + throw new vaultsErrors.ErrorVaultsPermissionDenied( + `No permissions found for ${nodeIdEncoded}`, + ); + } + const vaultPerms = permissions.vaults[vaultId]; + if (vaultPerms?.[actionType] !== null) { + throw new vaultsErrors.ErrorVaultsPermissionDenied( + `${nodeIdEncoded} does not have permission to ${actionType} from vault ${vaultsUtils.encodeVaultId( + vaultId, + )}`, + ); + } + + yield { + vaultName: vaultName, + vaultIdEncoded: vaultsUtils.encodeVaultId(vaultId), + }; + for await (const byte of vaultManager.handleInfoRequest(vaultId, tran)) { + if (byte !== null) { + yield { + chunk: byte.toString('binary'), + }; + } else { + return; + } + } + }); + } +} + +export { VaultsGitInfoGetHandler }; diff --git a/src/agent/handlers/vaultsGitPackGet.ts b/src/agent/handlers/vaultsGitPackGet.ts new file mode 100644 index 000000000..2a0787171 --- /dev/null +++ b/src/agent/handlers/vaultsGitPackGet.ts @@ -0,0 +1,109 @@ +import type { VaultAction, VaultName } from '../../vaults/types'; +import type VaultManager from '../../vaults/VaultManager'; +import type ACL from '../../acl/ACL'; +import type { DB } from '@matrixai/db'; +import type { GitPackMessage, VaultsGitPackGetMessage } from './types'; +import type { AgentRPCRequestParams, AgentRPCResponseResult } from '../types'; +import * as nodesUtils from '../../nodes/utils'; +import * as vaultsUtils from '../../vaults/utils'; +import * as vaultsErrors from '../../vaults/errors'; +import { validateSync } from '../../validation'; +import { matchSync } from '../../utils'; +import * as validationUtils from '../../validation/utils'; +import * as agentErrors from '../errors'; +import { ServerHandler } from '../../rpc/handlers'; + +class VaultsGitPackGetHandler extends ServerHandler< + { + vaultManager: VaultManager; + acl: ACL; + db: DB; + }, + AgentRPCRequestParams, + AgentRPCResponseResult +> { + public async *handle( + input: AgentRPCRequestParams, + _, + meta, + ): AsyncGenerator> { + const { vaultManager, acl, db } = this.container; + // Getting the NodeId from the ReverseProxy connection info + const connectionInfo = meta; + // If this is getting run the connection exists + // It SHOULD exist here + if (connectionInfo == null) { + throw new agentErrors.ErrorConnectionInfoMissing(); + } + const nodeId = connectionInfo.remoteNodeId; + const nodeIdEncoded = nodesUtils.encodeNodeId(nodeId); + const nameOrId = meta.get('vaultNameOrId').pop()!.toString(); + yield* db.withTransactionG(async function* ( + tran, + ): AsyncGenerator> { + const vaultIdFromName = await vaultManager.getVaultId( + nameOrId as VaultName, + tran, + ); + const vaultId = vaultIdFromName ?? vaultsUtils.decodeVaultId(nameOrId); + if (vaultId == null) { + throw new vaultsErrors.ErrorVaultsVaultUndefined(); + } + const { + actionType, + }: { + actionType: VaultAction; + } = validateSync( + (keyPath, value) => { + return matchSync(keyPath)( + [['actionType'], () => validationUtils.parseVaultAction(value)], + () => value, + ); + }, + { + actionType: meta.get('vaultAction').pop()!.toString(), + }, + ); + // Checking permissions + const permissions = await acl.getNodePerm(nodeId, tran); + const vaultPerms = permissions?.vaults[vaultId]; + if (vaultPerms?.[actionType] !== null) { + throw new vaultsErrors.ErrorVaultsPermissionDenied( + `${nodeIdEncoded} does not have permission to ${actionType} from vault ${vaultsUtils.encodeVaultId( + vaultId, + )}`, + ); + } + const [sideBand, progressStream] = await vaultManager.handlePackRequest( + vaultId, + Buffer.from(input.body, 'utf-8'), + tran, + ); + yield { + chunk: Buffer.from('0008NAK\n').toString('binary'), + }; + const responseBuffers: Uint8Array[] = []; + // FIXME: this WHOLE thing needs to change, why are we streaming when we send monolithic messages? + const result = await new Promise((resolve, reject) => { + sideBand.on('data', async (data: Uint8Array) => { + responseBuffers.push(data); + }); + sideBand.on('end', async () => { + const result = Buffer.concat(responseBuffers).toString('binary'); + resolve(result); + }); + sideBand.on('error', (err) => { + reject(err); + }); + progressStream.write(Buffer.from('0014progress is at 50%\n')); + progressStream.end(); + }); + yield { + chunk: result, + }; + }); + return; + } +} + +export { VaultsGitPackGetHandler }; diff --git a/src/agent/handlers/vaultsScan.ts b/src/agent/handlers/vaultsScan.ts new file mode 100644 index 000000000..8fe8ea6e5 --- /dev/null +++ b/src/agent/handlers/vaultsScan.ts @@ -0,0 +1,50 @@ +import type { VaultsScanMessage } from './types'; +import type { AgentRPCRequestParams, AgentRPCResponseResult } from '../types'; +import type VaultManager from '../../vaults/VaultManager'; +import type { DB } from '@matrixai/db'; +import { ServerHandler } from '../../rpc/handlers'; +import * as agentErrors from '../errors'; +import * as vaultsUtils from '../../vaults/utils'; + +class VaultsScanHandler extends ServerHandler< + { + vaultManager: VaultManager; + db: DB; + }, + AgentRPCRequestParams, + AgentRPCResponseResult +> { + public async *handle( + input: AgentRPCRequestParams, + _, + meta, + ): AsyncGenerator> { + const { vaultManager, db } = this.container; + // Getting the NodeId from the ReverseProxy connection info + const connectionInfo = meta; + // If this is getting run the connection exists + // It SHOULD exist here + if (connectionInfo == null) { + throw new agentErrors.ErrorConnectionInfoMissing(); + } + const nodeId = connectionInfo.remoteNodeId; + yield* db.withTransactionG(async function* ( + tran, + ): AsyncGenerator> { + const listResponse = vaultManager.handleScanVaults(nodeId, tran); + for await (const { + vaultId, + vaultName, + vaultPermissions, + } of listResponse) { + yield { + vaultIdEncoded: vaultsUtils.encodeVaultId(vaultId), + vaultName, + vaultPermissions, + }; + } + }); + } +} + +export { VaultsScanHandler }; diff --git a/src/agent/index.ts b/src/agent/index.ts index 4e55eb824..4c80b200f 100644 --- a/src/agent/index.ts +++ b/src/agent/index.ts @@ -1,5 +1,2 @@ -export { default as createAgentService, AgentServiceService } from './service'; -export { default as GRPCClientAgent } from './GRPCClientAgent'; -export * as errors from './errors'; +export * from './handlers'; export * as types from './types'; -export * as utils from './utils'; diff --git a/src/agent/service/echo.ts b/src/agent/service/echo.ts deleted file mode 100644 index b99923bbb..000000000 --- a/src/agent/service/echo.ts +++ /dev/null @@ -1,17 +0,0 @@ -import type * as grpc from '@grpc/grpc-js'; -import type { ConnectionInfoGet } from 'agent/types'; -import * as utilsPB from '../../proto/js/polykey/v1/utils/utils_pb'; - -function echo({ connectionInfoGet }: { connectionInfoGet: ConnectionInfoGet }) { - return async ( - call: grpc.ServerUnaryCall, - callback: grpc.sendUnaryData, - ): Promise => { - connectionInfoGet(call); - const response = new utilsPB.EchoMessage(); - response.setChallenge(call.request.getChallenge()); - callback(null, response); - }; -} - -export default echo; diff --git a/src/agent/service/index.ts b/src/agent/service/index.ts deleted file mode 100644 index 8637e45fa..000000000 --- a/src/agent/service/index.ts +++ /dev/null @@ -1,70 +0,0 @@ -import type { DB } from '@matrixai/db'; -import type KeyRing from '../../keys/KeyRing'; -import type VaultManager from '../../vaults/VaultManager'; -import type NodeGraph from '../../nodes/NodeGraph'; -import type NodeManager from '../../nodes/NodeManager'; -import type NodeConnectionManager from '../../nodes/NodeConnectionManager'; -import type NotificationsManager from '../../notifications/NotificationsManager'; -import type Sigchain from '../../sigchain/Sigchain'; -import type ACL from '../../acl/ACL'; -import type GestaltGraph from '../../gestalts/GestaltGraph'; -import type { IAgentServiceServer } from '../../proto/js/polykey/v1/agent_service_grpc_pb'; -import type Proxy from '../../network/Proxy'; -import Logger from '@matrixai/logger'; -import echo from './echo'; -import nodesChainDataGet from './nodesChainDataGet'; -import nodesClaimsGet from './nodesClaimsGet'; -import nodesClosestLocalNodesGet from './nodesClosestLocalNodesGet'; -import nodesCrossSignClaim from './nodesCrossSignClaim'; -import nodesHolePunchMessageSend from './nodesHolePunchMessageSend'; -import notificationsSend from './notificationsSend'; -import vaultsGitInfoGet from './vaultsGitInfoGet'; -import vaultsGitPackGet from './vaultsGitPackGet'; -import vaultsScan from './vaultsScan'; -import { AgentServiceService } from '../../proto/js/polykey/v1/agent_service_grpc_pb'; -import * as agentUtils from '../utils'; - -function createService({ - proxy, - db, - logger = new Logger('GRPCClientAgentService'), - ...containerRest -}: { - db: DB; - keyRing: KeyRing; - vaultManager: VaultManager; - nodeConnectionManager: NodeConnectionManager; - nodeManager: NodeManager; - nodeGraph: NodeGraph; - notificationsManager: NotificationsManager; - sigchain: Sigchain; - acl: ACL; - gestaltGraph: GestaltGraph; - proxy: Proxy; - logger?: Logger; -}): IAgentServiceServer { - const connectionInfoGet = agentUtils.connectionInfoGetter(proxy); - const container = { - ...containerRest, - db, - logger, - connectionInfoGet: connectionInfoGet, - }; - const service: IAgentServiceServer = { - echo: echo(container), - nodesChainDataGet: nodesChainDataGet(container), - nodesClaimsGet: nodesClaimsGet(container), - nodesClosestLocalNodesGet: nodesClosestLocalNodesGet(container), - nodesCrossSignClaim: nodesCrossSignClaim(container), - nodesHolePunchMessageSend: nodesHolePunchMessageSend(container), - notificationsSend: notificationsSend(container), - vaultsGitInfoGet: vaultsGitInfoGet(container), - vaultsGitPackGet: vaultsGitPackGet(container), - vaultsScan: vaultsScan(container), - }; - return service; -} - -export default createService; - -export { AgentServiceService }; diff --git a/src/agent/service/nodesChainDataGet.ts b/src/agent/service/nodesChainDataGet.ts deleted file mode 100644 index e20074d40..000000000 --- a/src/agent/service/nodesChainDataGet.ts +++ /dev/null @@ -1,57 +0,0 @@ -import type * as grpc from '@grpc/grpc-js'; -import type { DB } from '@matrixai/db'; -import type Sigchain from '../../sigchain/Sigchain'; -import type Logger from '@matrixai/logger'; -import * as grpcUtils from '../../grpc/utils'; -import * as nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb'; -import * as agentUtils from '../utils'; -import * as claimsUtils from '../../claims/utils'; -import { encodeClaimId } from '../../ids'; - -/** - * Retrieves the ChainDataEncoded of this node. - */ -function nodesChainDataGet({ - sigchain, - db, - logger, -}: { - sigchain: Sigchain; - db: DB; - logger: Logger; -}) { - return async ( - call: grpc.ServerWritableStream, - ): Promise => { - const genClaims = grpcUtils.generatorWritable(call, false); - try { - // Const seekClaimId = decodeClaimId(call.request.getClaimId()); - await db.withTransactionF(async (tran) => { - for await (const [claimId, signedClaim] of sigchain.getSignedClaims( - { /* seek: seekClaimId,*/ order: 'asc' }, - tran, - )) { - const encodedClaim = claimsUtils.generateSignedClaim(signedClaim); - const response = new nodesPB.AgentClaim(); - response.setClaimId(encodeClaimId(claimId)); - response.setPayload(encodedClaim.payload); - const signatureMessages = encodedClaim.signatures.map((item) => { - return new nodesPB.Signature() - .setSignature(item.signature) - .setProtected(item.protected); - }); - response.setSignaturesList(signatureMessages); - await genClaims.next(response); - } - }); - await genClaims.next(null); - } catch (e) { - await genClaims.throw(e); - !agentUtils.isAgentClientError(e) && - logger.error(`${nodesChainDataGet.name}:${e}`); - return; - } - }; -} - -export default nodesChainDataGet; diff --git a/src/agent/service/nodesClaimsGet.ts b/src/agent/service/nodesClaimsGet.ts deleted file mode 100644 index 920555388..000000000 --- a/src/agent/service/nodesClaimsGet.ts +++ /dev/null @@ -1,22 +0,0 @@ -import type * as grpc from '@grpc/grpc-js'; -import * as nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb'; - -/** - * Retrieves all claims (of a specific type) of this node (within its sigchain). - * TODO: Currently not required. Will need to refactor once we filter on what - * claims we desire from the sigchain (e.g. in discoverGestalt). - */ -function nodesClaimsGet(_) { - return async ( - call: grpc.ServerUnaryCall, - callback: grpc.sendUnaryData, - ): Promise => { - const response = new nodesPB.Claims(); - // Response.setClaimsList( - // await sigchain.getClaims(call.request.getClaimtype() as ClaimType) - // ); - callback(null, response); - }; -} - -export default nodesClaimsGet; diff --git a/src/agent/service/nodesClosestLocalNodesGet.ts b/src/agent/service/nodesClosestLocalNodesGet.ts deleted file mode 100644 index d8ad1a729..000000000 --- a/src/agent/service/nodesClosestLocalNodesGet.ts +++ /dev/null @@ -1,72 +0,0 @@ -import type * as grpc from '@grpc/grpc-js'; -import type { NodeGraph } from '../../nodes'; -import type { DB } from '@matrixai/db'; -import type { NodeId } from '../../ids/types'; -import type Logger from '@matrixai/logger'; -import * as grpcUtils from '../../grpc/utils'; -import * as nodesUtils from '../../nodes/utils'; -import { validateSync } from '../../validation'; -import * as validationUtils from '../../validation/utils'; -import { matchSync } from '../../utils'; -import * as nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb'; -import * as agentUtils from '../utils'; - -/** - * Retrieves the local nodes (i.e. from the current node) that are closest - * to some provided node ID. - */ -function nodesClosestLocalNodesGet({ - nodeGraph, - db, - logger, -}: { - nodeGraph: NodeGraph; - db: DB; - logger: Logger; -}) { - return async ( - call: grpc.ServerUnaryCall, - callback: grpc.sendUnaryData, - ): Promise => { - try { - const response = new nodesPB.NodeTable(); - const { - nodeId, - }: { - nodeId: NodeId; - } = validateSync( - (keyPath, value) => { - return matchSync(keyPath)( - [['nodeId'], () => validationUtils.parseNodeId(value)], - () => value, - ); - }, - { - nodeId: call.request.getNodeId(), - }, - ); - // Get all local nodes that are closest to the target node from the request - const closestNodes = await db.withTransactionF((tran) => - nodeGraph.getClosestNodes(nodeId, undefined, tran), - ); - for (const [nodeId, nodeData] of closestNodes) { - const addressMessage = new nodesPB.Address(); - addressMessage.setHost(nodeData.address.host); - addressMessage.setPort(nodeData.address.port); - // Add the node to the response's map (mapping of node ID -> node address) - response - .getNodeTableMap() - .set(nodesUtils.encodeNodeId(nodeId), addressMessage); - } - callback(null, response); - return; - } catch (e) { - callback(grpcUtils.fromError(e, true)); - !agentUtils.isAgentClientError(e) && - logger.error(`${nodesClosestLocalNodesGet.name}:${e}`); - return; - } - }; -} - -export default nodesClosestLocalNodesGet; diff --git a/src/agent/service/nodesCrossSignClaim.ts b/src/agent/service/nodesCrossSignClaim.ts deleted file mode 100644 index 48341005f..000000000 --- a/src/agent/service/nodesCrossSignClaim.ts +++ /dev/null @@ -1,58 +0,0 @@ -import type * as grpc from '@grpc/grpc-js'; -import type NodeManager from '../../nodes/NodeManager'; -import type KeyRing from '../../keys/KeyRing'; -import type * as nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb'; -import type Logger from '@matrixai/logger'; -import type { ConnectionInfoGet } from '../types'; -import type ACL from '../../acl/ACL'; -import * as grpcUtils from '../../grpc/utils'; -import * as claimsErrors from '../../claims/errors'; -import * as agentUtils from '../utils'; -import * as nodesErrors from '../../nodes/errors'; - -function nodesCrossSignClaim({ - keyRing, - nodeManager, - acl, - connectionInfoGet, - logger, -}: { - keyRing: KeyRing; - nodeManager: NodeManager; - acl: ACL; - connectionInfoGet: ConnectionInfoGet; - logger: Logger; -}) { - return async ( - call: grpc.ServerDuplexStream, - ) => { - const requestingNodeId = connectionInfoGet(call)!.remoteNodeId; - const nodeId = keyRing.getNodeId(); - const genClaims = grpcUtils.generatorDuplex( - call, - { nodeId, command: nodesCrossSignClaim.name }, - true, - ); - try { - // Check the ACL for permissions - const permissions = await acl.getNodePerm(requestingNodeId); - if (permissions?.gestalt.claim !== null) { - throw new nodesErrors.ErrorNodePermissionDenied(); - } - // Handle claiming the node - await nodeManager.handleClaimNode(requestingNodeId, genClaims); - } catch (e) { - await genClaims.throw(e); - !agentUtils.isAgentClientError(e, [ - claimsErrors.ErrorEmptyStream, - claimsErrors.ErrorUndefinedSinglySignedClaim, - claimsErrors.ErrorUndefinedSignature, - claimsErrors.ErrorNodesClaimType, - claimsErrors.ErrorUndefinedDoublySignedClaim, - ]) && logger.error(`${nodesCrossSignClaim.name}:${e}`); - return; - } - }; -} - -export default nodesCrossSignClaim; diff --git a/src/agent/service/nodesHolePunchMessageSend.ts b/src/agent/service/nodesHolePunchMessageSend.ts deleted file mode 100644 index bbdded6c8..000000000 --- a/src/agent/service/nodesHolePunchMessageSend.ts +++ /dev/null @@ -1,127 +0,0 @@ -import type * as grpc from '@grpc/grpc-js'; -import type { DB } from '@matrixai/db'; -import type NodeManager from '../../nodes/NodeManager'; -import type NodeConnectionManager from '../../nodes/NodeConnectionManager'; -import type KeyRing from '../../keys/KeyRing'; -import type { NodeId } from '../../ids/types'; -import type Logger from '@matrixai/logger'; -import type { ConnectionInfoGet } from 'agent/types'; -import type * as nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb'; -import * as networkUtils from '../../network/utils'; -import * as grpcUtils from '../../grpc/utils'; -import { validateSync } from '../../validation'; -import * as validationUtils from '../../validation/utils'; -import * as nodesUtils from '../../nodes/utils'; -import { matchSync } from '../../utils'; -import * as utilsPB from '../../proto/js/polykey/v1/utils/utils_pb'; -import * as agentUtils from '../utils'; - -function nodesHolePunchMessageSend({ - keyRing, - nodeManager, - nodeConnectionManager, - db, - connectionInfoGet, - logger, -}: { - keyRing: KeyRing; - nodeManager: NodeManager; - nodeConnectionManager: NodeConnectionManager; - db: DB; - connectionInfoGet: ConnectionInfoGet; - logger: Logger; -}) { - return async ( - call: grpc.ServerUnaryCall, - callback: grpc.sendUnaryData, - ): Promise => { - try { - const response = new utilsPB.EmptyMessage(); - const { - targetId, - sourceId, - }: { - targetId: NodeId; - sourceId: NodeId; - } = validateSync( - (keyPath, value) => { - return matchSync(keyPath)( - [ - ['targetId'], - ['sourceId'], - () => validationUtils.parseNodeId(value), - ], - () => value, - ); - }, - { - targetId: call.request.getTargetId(), - sourceId: call.request.getSrcId(), - }, - ); - const connectionInfo = connectionInfoGet(call); - const srcNodeId = nodesUtils.encodeNodeId(connectionInfo!.remoteNodeId); - // Firstly, check if this node is the desired node - // If so, then we want to make this node start sending hole punching packets - // back to the source node. - await db.withTransactionF(async (tran) => { - if (keyRing.getNodeId().equals(targetId)) { - if (call.request.getProxyAddress() !== '') { - const [host, port] = networkUtils.parseAddress( - call.request.getProxyAddress(), - ); - logger.debug( - `Received signaling message to target ${call.request.getSrcId()}@${host}:${port}`, - ); - // Ignore failure - try { - await nodeConnectionManager.holePunchReverse(host, port); - } catch { - // Do nothing - } - } else { - logger.error( - 'Received signaling message, target information was missing, skipping reverse hole punch', - ); - } - } else if (await nodeManager.knowsNode(sourceId, tran)) { - // Otherwise, find if node in table - // If so, ask the nodeManager to relay to the node - const targetNodeId = call.request.getTargetId(); - const proxyAddress = networkUtils.buildAddress( - connectionInfo!.remoteHost, - connectionInfo!.remotePort, - ); - // Checking if the source and destination are the same - if (sourceId?.equals(targetId)) { - // Logging and silently dropping operation - logger.warn('Signaling relay message requested signal to itself'); - callback(null, response); - return; - } - call.request.setProxyAddress(proxyAddress); - logger.debug( - `Relaying signaling message from ${srcNodeId}@${ - connectionInfo!.remoteHost - }:${ - connectionInfo!.remotePort - } to ${targetNodeId} with information ${proxyAddress}`, - ); - await nodeConnectionManager.relaySignalingMessage(call.request, { - host: connectionInfo!.remoteHost, - port: connectionInfo!.remotePort, - }); - } - }); - callback(null, response); - return; - } catch (e) { - callback(grpcUtils.fromError(e, true)); - !agentUtils.isAgentClientError(e) && - logger.error(`${nodesHolePunchMessageSend.name}:${e}`); - return; - } - }; -} - -export default nodesHolePunchMessageSend; diff --git a/src/agent/service/notificationsSend.ts b/src/agent/service/notificationsSend.ts deleted file mode 100644 index ea5cca0e4..000000000 --- a/src/agent/service/notificationsSend.ts +++ /dev/null @@ -1,58 +0,0 @@ -import type * as grpc from '@grpc/grpc-js'; -import type NotificationsManager from '../../notifications/NotificationsManager'; -import type * as notificationsPB from '../../proto/js/polykey/v1/notifications/notifications_pb'; -import type Logger from '@matrixai/logger'; -import type { DB } from '@matrixai/db'; -import type { SignedNotification } from '../../notifications/types'; -import type KeyRing from '../../keys/KeyRing'; -import * as grpcUtils from '../../grpc/utils'; -import * as notificationsUtils from '../../notifications/utils'; -import * as notificationsErrors from '../../notifications/errors'; -import * as utilsPB from '../../proto/js/polykey/v1/utils/utils_pb'; -import * as agentUtils from '../utils'; - -function notificationsSend({ - notificationsManager, - db, - keyRing, - logger, -}: { - notificationsManager: NotificationsManager; - db: DB; - keyRing: KeyRing; - logger: Logger; -}) { - return async ( - call: grpc.ServerUnaryCall< - notificationsPB.AgentNotification, - utilsPB.EmptyMessage - >, - callback: grpc.sendUnaryData, - ): Promise => { - try { - const signedNotification = - call.request.getContent() as SignedNotification; - const notification = await notificationsUtils.verifyAndDecodeNotif( - signedNotification, - keyRing.getNodeId(), - ); - await db.withTransactionF((tran) => - notificationsManager.receiveNotification(notification, tran), - ); - const response = new utilsPB.EmptyMessage(); - callback(null, response); - return; - } catch (e) { - callback(grpcUtils.fromError(e, true)); - !agentUtils.isAgentClientError(e, [ - notificationsErrors.ErrorNotificationsInvalidType, - notificationsErrors.ErrorNotificationsValidationFailed, - notificationsErrors.ErrorNotificationsParse, - notificationsErrors.ErrorNotificationsPermissionsNotFound, - ]) && logger.error(`${notificationsSend.name}:${e}`); - return; - } - }; -} - -export default notificationsSend; diff --git a/src/agent/service/vaultsGitInfoGet.ts b/src/agent/service/vaultsGitInfoGet.ts deleted file mode 100644 index 0fb18c96a..000000000 --- a/src/agent/service/vaultsGitInfoGet.ts +++ /dev/null @@ -1,120 +0,0 @@ -import type { DB } from '@matrixai/db'; -import type { VaultName, VaultAction } from '../../vaults/types'; -import type VaultManager from '../../vaults/VaultManager'; -import type ACL from '../../acl/ACL'; -import type { ConnectionInfoGet } from '../../agent/types'; -import type Logger from '@matrixai/logger'; -import * as grpc from '@grpc/grpc-js'; -import * as grpcUtils from '../../grpc/utils'; -import * as vaultsUtils from '../../vaults/utils'; -import * as vaultsErrors from '../../vaults/errors'; -import * as vaultsPB from '../../proto/js/polykey/v1/vaults/vaults_pb'; -import { validateSync } from '../../validation'; -import * as validationUtils from '../../validation/utils'; -import * as nodesUtils from '../../nodes/utils'; -import { matchSync } from '../../utils'; -import * as agentErrors from '../errors'; -import * as agentUtils from '../utils'; - -function vaultsGitInfoGet({ - vaultManager, - acl, - db, - logger, - connectionInfoGet, -}: { - vaultManager: VaultManager; - acl: ACL; - db: DB; - logger: Logger; - connectionInfoGet: ConnectionInfoGet; -}) { - return async ( - call: grpc.ServerWritableStream, - ): Promise => { - const genWritable = grpcUtils.generatorWritable(call, true); - try { - await db.withTransactionF(async (tran) => { - const vaultIdFromName = await vaultManager.getVaultId( - call.request.getVault()?.getNameOrId() as VaultName, - tran, - ); - const vaultId = - vaultIdFromName ?? - vaultsUtils.decodeVaultId(call.request.getVault()?.getNameOrId()); - if (vaultId == null) { - throw new vaultsErrors.ErrorVaultsVaultUndefined(); - } - const { - actionType, - }: { - actionType: VaultAction; - } = validateSync( - (keyPath, value) => { - return matchSync(keyPath)( - [['actionType'], () => validationUtils.parseVaultAction(value)], - () => value, - ); - }, - { - actionType: call.request.getAction(), - }, - ); - const vaultName = (await vaultManager.getVaultMeta(vaultId, tran)) - ?.vaultName; - if (vaultName == null) { - throw new vaultsErrors.ErrorVaultsVaultUndefined(); - } - // Getting the NodeId from the ReverseProxy connection info - const connectionInfo = connectionInfoGet(call); - // If this is getting run the connection exists - // It SHOULD exist here - if (connectionInfo == null) { - throw new agentErrors.ErrorConnectionInfoMissing(); - } - const nodeId = connectionInfo.remoteNodeId; - const nodeIdEncoded = nodesUtils.encodeNodeId(nodeId); - const permissions = await acl.getNodePerm(nodeId, tran); - if (permissions == null) { - throw new vaultsErrors.ErrorVaultsPermissionDenied( - `No permissions found for ${nodeIdEncoded}`, - ); - } - const vaultPerms = permissions.vaults[vaultId]; - if (vaultPerms?.[actionType] !== null) { - throw new vaultsErrors.ErrorVaultsPermissionDenied( - `${nodeIdEncoded} does not have permission to ${actionType} from vault ${vaultsUtils.encodeVaultId( - vaultId, - )}`, - ); - } - const meta = new grpc.Metadata(); - meta.set('vaultName', vaultName); - meta.set('vaultId', vaultsUtils.encodeVaultId(vaultId)); - genWritable.stream.sendMetadata(meta); - const response = new vaultsPB.PackChunk(); - const responseGen = vaultManager.handleInfoRequest(vaultId, tran); - for await (const byte of responseGen) { - if (byte !== null) { - response.setChunk(byte); - await genWritable.next(response); - } else { - await genWritable.next(null); - } - } - }); - await genWritable.next(null); - return; - } catch (e) { - await genWritable.throw(e); - !agentUtils.isAgentClientError(e, [ - vaultsErrors.ErrorVaultsVaultUndefined, - agentErrors.ErrorConnectionInfoMissing, - vaultsErrors.ErrorVaultsPermissionDenied, - ]) && logger.error(`${vaultsGitInfoGet.name}:${e}`); - return; - } - }; -} - -export default vaultsGitInfoGet; diff --git a/src/agent/service/vaultsGitPackGet.ts b/src/agent/service/vaultsGitPackGet.ts deleted file mode 100644 index 3646fda07..000000000 --- a/src/agent/service/vaultsGitPackGet.ts +++ /dev/null @@ -1,133 +0,0 @@ -import type * as grpc from '@grpc/grpc-js'; -import type { DB } from '@matrixai/db'; -import type { VaultName, VaultAction } from '../../vaults/types'; -import type VaultManager from '../../vaults/VaultManager'; -import type { ConnectionInfoGet } from '../../agent/types'; -import type ACL from '../../acl/ACL'; -import type KeyRing from '../../keys/KeyRing'; -import type Logger from '@matrixai/logger'; -import * as nodesUtils from '../../nodes/utils'; -import * as grpcUtils from '../../grpc/utils'; -import * as vaultsErrors from '../../vaults/errors'; -import * as vaultsUtils from '../../vaults/utils'; -import * as vaultsPB from '../../proto/js/polykey/v1/vaults/vaults_pb'; -import { validateSync } from '../../validation'; -import * as validationUtils from '../../validation/utils'; -import { matchSync } from '../../utils'; -import * as agentErrors from '../errors'; -import * as agentUtils from '../utils'; - -function vaultsGitPackGet({ - vaultManager, - acl, - db, - keyRing, - logger, - connectionInfoGet, -}: { - vaultManager: VaultManager; - acl: ACL; - db: DB; - keyRing: KeyRing; - logger: Logger; - connectionInfoGet: ConnectionInfoGet; -}) { - return async ( - call: grpc.ServerDuplexStream, - ): Promise => { - const nodeId = keyRing.getNodeId(); - const genDuplex = grpcUtils.generatorDuplex( - call, - { nodeId, command: vaultsGitPackGet.name }, - true, - ); - try { - const clientBodyBuffers: Uint8Array[] = []; - const clientRequest = (await genDuplex.read()).value; - clientBodyBuffers.push(clientRequest!.getChunk_asU8()); - const body = Buffer.concat(clientBodyBuffers); - const meta = call.metadata; - // Getting the NodeId from the ReverseProxy connection info - const connectionInfo = connectionInfoGet(call); - // If this is getting run the connection exists - // It SHOULD exist here - if (connectionInfo == null) { - throw new agentErrors.ErrorConnectionInfoMissing(); - } - const nodeId = connectionInfo.remoteNodeId; - const nodeIdEncoded = nodesUtils.encodeNodeId(nodeId); - const nameOrId = meta.get('vaultNameOrId').pop()!.toString(); - await db.withTransactionF(async (tran) => { - const vaultIdFromName = await vaultManager.getVaultId( - nameOrId as VaultName, - tran, - ); - const vaultId = vaultIdFromName ?? vaultsUtils.decodeVaultId(nameOrId); - if (vaultId == null) { - throw new vaultsErrors.ErrorVaultsVaultUndefined(); - } - const { - actionType, - }: { - actionType: VaultAction; - } = validateSync( - (keyPath, value) => { - return matchSync(keyPath)( - [['actionType'], () => validationUtils.parseVaultAction(value)], - () => value, - ); - }, - { - actionType: meta.get('vaultAction').pop()!.toString(), - }, - ); - // Checking permissions - const permissions = await acl.getNodePerm(nodeId, tran); - const vaultPerms = permissions?.vaults[vaultId]; - if (vaultPerms?.[actionType] !== null) { - throw new vaultsErrors.ErrorVaultsPermissionDenied( - `${nodeIdEncoded} does not have permission to ${actionType} from vault ${vaultsUtils.encodeVaultId( - vaultId, - )}`, - ); - } - const response = new vaultsPB.PackChunk(); - const [sideBand, progressStream] = await vaultManager.handlePackRequest( - vaultId, - Buffer.from(body), - tran, - ); - response.setChunk(Buffer.from('0008NAK\n')); - await genDuplex.write(response); - const responseBuffers: Uint8Array[] = []; - await new Promise((resolve, reject) => { - sideBand.on('data', async (data: Uint8Array) => { - responseBuffers.push(data); - }); - sideBand.on('end', async () => { - response.setChunk(Buffer.concat(responseBuffers)); - await genDuplex.write(response); - resolve(); - }); - sideBand.on('error', (err) => { - reject(err); - }); - progressStream.write(Buffer.from('0014progress is at 50%\n')); - progressStream.end(); - }); - }); - await genDuplex.next(null); - return; - } catch (e) { - await genDuplex.throw(e); - !agentUtils.isAgentClientError(e, [ - agentErrors.ErrorConnectionInfoMissing, - vaultsErrors.ErrorVaultsPermissionDenied, - vaultsErrors.ErrorVaultsVaultUndefined, - ]) && logger.error(`${vaultsGitPackGet.name}:${e}`); - return; - } - }; -} - -export default vaultsGitPackGet; diff --git a/src/agent/service/vaultsScan.ts b/src/agent/service/vaultsScan.ts deleted file mode 100644 index f82719108..000000000 --- a/src/agent/service/vaultsScan.ts +++ /dev/null @@ -1,65 +0,0 @@ -import type * as grpc from '@grpc/grpc-js'; -import type { DB } from '@matrixai/db'; -import type VaultManager from '../../vaults/VaultManager'; -import type * as utilsPB from '../../proto/js/polykey/v1/utils/utils_pb'; -import type { ConnectionInfoGet } from '../../agent/types'; -import type Logger from '@matrixai/logger'; -import * as agentErrors from '../../agent/errors'; -import * as vaultsErrors from '../../vaults/errors'; -import * as vaultsPB from '../../proto/js/polykey/v1/vaults/vaults_pb'; -import * as vaultsUtils from '../../vaults/utils'; -import * as grpcUtils from '../../grpc/utils'; -import * as agentUtils from '../utils'; - -function vaultsScan({ - vaultManager, - logger, - connectionInfoGet, - db, -}: { - vaultManager: VaultManager; - logger: Logger; - connectionInfoGet: ConnectionInfoGet; - db: DB; -}) { - return async ( - call: grpc.ServerWritableStream, - ): Promise => { - const genWritable = grpcUtils.generatorWritable(call, true); - const listMessage = new vaultsPB.List(); - // Getting the NodeId from the ReverseProxy connection info - const connectionInfo = connectionInfoGet(call); - // If this is getting run the connection exists - // It SHOULD exist here - if (connectionInfo == null) { - throw new agentErrors.ErrorConnectionInfoMissing(); - } - const nodeId = connectionInfo.remoteNodeId; - try { - await db.withTransactionF(async (tran) => { - const listResponse = vaultManager.handleScanVaults(nodeId, tran); - for await (const { - vaultId, - vaultName, - vaultPermissions, - } of listResponse) { - listMessage.setVaultId(vaultsUtils.encodeVaultId(vaultId)); - listMessage.setVaultName(vaultName); - listMessage.setVaultPermissionsList(vaultPermissions); - await genWritable.next(listMessage); - } - }); - await genWritable.next(null); - return; - } catch (e) { - await genWritable.throw(e); - !agentUtils.isAgentClientError(e, [ - agentErrors.ErrorConnectionInfoMissing, - vaultsErrors.ErrorVaultsPermissionDenied, - ]) && logger.error(`${vaultsScan.name}:${e}`); - return; - } - }; -} - -export default vaultsScan; diff --git a/src/agent/types.ts b/src/agent/types.ts index b6d0e0259..13f0b1ad3 100644 --- a/src/agent/types.ts +++ b/src/agent/types.ts @@ -1,14 +1,24 @@ -import type { ServerSurfaceCall } from '@grpc/grpc-js/build/src/server-call'; -import type { Class } from '@matrixai/errors'; -import type { ConnectionInfo } from '../network/types'; -import type ErrorPolykey from '../ErrorPolykey'; +import type { JSONValue } from '../types'; -type ConnectionInfoGet = ( - call: ServerSurfaceCall, -) => ConnectionInfo | undefined; +// eslint-disable-next-line +type NoData = {}; -type AgentClientErrors = Array< - Class> | Array>> ->; +type AgentRPCRequestParams = NoData> = { + metadata?: { + [Key: string]: JSONValue; + } & Partial<{ + authorization: string; + timeout: number; + }>; +} & Omit; -export type { ConnectionInfoGet, AgentClientErrors }; +type AgentRPCResponseResult = NoData> = { + metadata?: { + [Key: string]: JSONValue; + } & Partial<{ + authorization: string; + timeout: number; + }>; +} & Omit; + +export type { AgentRPCRequestParams, AgentRPCResponseResult, NoData }; diff --git a/src/agent/utils.ts b/src/agent/utils.ts deleted file mode 100644 index 48f9ff59f..000000000 --- a/src/agent/utils.ts +++ /dev/null @@ -1,85 +0,0 @@ -import type { ServerSurfaceCall } from '@grpc/grpc-js/build/src/server-call'; -import type ErrorPolykey from '../ErrorPolykey'; -import type { Host, Port } from '../network/types'; -import type Proxy from '../network/Proxy'; -import type { ConnectionInfoGet, AgentClientErrors } from './types'; -import * as validationErrors from '../validation/errors'; - -/** - * Array of errors that are always considered to be "client errors" - * (4xx errors in HTTP) in the context of the agent service - */ -const defaultClientErrors: AgentClientErrors = [ - validationErrors.ErrorValidation, -]; - -function connectionInfoGetter(proxy: Proxy): ConnectionInfoGet { - return (call: ServerSurfaceCall) => { - let urlString = call.getPeer(); - if (!/^.*:\/\//.test(urlString)) urlString = 'pk://' + urlString; - const url = new URL(urlString); - return proxy.getConnectionInfoByReverse( - url.hostname as Host, - parseInt(url.port) as Port, - ); - }; -} - -/** - * Checks whether an error is a "client error" (4xx errors in HTTP) - * Used by the service handlers since client errors should not be - * reported on the server side - * Additional errors that are considered to be client errors in the - * context of a given handler can be supplied in the `extraClientErrors` - * argument - */ -function isAgentClientError( - thrownError: ErrorPolykey, - extraClientErrors?: AgentClientErrors, -): boolean { - for (const error of defaultClientErrors) { - if (Array.isArray(error)) { - let e = thrownError; - let matches = true; - for (const eType of error) { - if (e == null) { - matches = false; - break; - } - if (!(e instanceof eType)) { - matches = false; - break; - } - e = e.cause; - } - if (matches) return true; - } else if (thrownError instanceof error) { - return true; - } - } - if (extraClientErrors) { - for (const error of extraClientErrors) { - if (Array.isArray(error)) { - let e = thrownError; - let matches = true; - for (const eType of error) { - if (e == null) { - matches = false; - break; - } - if (!(e instanceof eType)) { - matches = false; - break; - } - e = e.cause; - } - if (matches) return true; - } else if (thrownError instanceof error) { - return true; - } - } - } - return false; -} - -export { connectionInfoGetter, isAgentClientError };