From 7fc723f32faeeb071822a425916bb2003c1919ba Mon Sep 17 00:00:00 2001 From: Roy Razon Date: Wed, 26 Jul 2023 16:30:42 +0300 Subject: [PATCH] compose tunnel agent: add docker API - passthrough API to docker on port 3001 - websocket impl for exec and logs --- packages/compose-tunnel-agent/Dockerfile.dev | 3 + .../docker-compose.override.yml | 3 +- .../compose-tunnel-agent/docker-compose.yml | 1 + packages/compose-tunnel-agent/index.ts | 65 +++-- packages/compose-tunnel-agent/jest.config.js | 5 + packages/compose-tunnel-agent/package.json | 15 +- .../compose-tunnel-agent/src/api-server.ts | 55 ++-- .../src/docker-proxy/index.test.ts | 237 ++++++++++++++++++ .../src/docker-proxy/index.ts | 82 ++++++ .../src/docker-proxy/ws/handler.ts | 32 +++ .../src/docker-proxy/ws/handlers/exec.ts | 42 ++++ .../src/docker-proxy/ws/handlers/logs.ts | 39 +++ .../src/docker-proxy/ws/index.ts | 10 + packages/compose-tunnel-agent/src/docker.ts | 1 + packages/compose-tunnel-agent/src/http.ts | 60 +++++ .../compose-tunnel-agent/src/query-params.ts | 20 ++ .../core/src/compose-tunnel-agent-client.ts | 24 +- tunnel-server/src/app.ts | 10 +- tunnel-server/src/proxy.ts | 55 ++-- tunnel-server/src/ssh-server.ts | 4 +- yarn.lock | 14 +- 21 files changed, 692 insertions(+), 85 deletions(-) create mode 100644 packages/compose-tunnel-agent/Dockerfile.dev create mode 100644 packages/compose-tunnel-agent/jest.config.js create mode 100644 packages/compose-tunnel-agent/src/docker-proxy/index.test.ts create mode 100644 packages/compose-tunnel-agent/src/docker-proxy/index.ts create mode 100644 packages/compose-tunnel-agent/src/docker-proxy/ws/handler.ts create mode 100644 packages/compose-tunnel-agent/src/docker-proxy/ws/handlers/exec.ts create mode 100644 packages/compose-tunnel-agent/src/docker-proxy/ws/handlers/logs.ts create mode 100644 packages/compose-tunnel-agent/src/docker-proxy/ws/index.ts create mode 100644 packages/compose-tunnel-agent/src/http.ts create mode 100644 packages/compose-tunnel-agent/src/query-params.ts diff --git a/packages/compose-tunnel-agent/Dockerfile.dev b/packages/compose-tunnel-agent/Dockerfile.dev new file mode 100644 index 00000000..be1f7a6c --- /dev/null +++ b/packages/compose-tunnel-agent/Dockerfile.dev @@ -0,0 +1,3 @@ +FROM node:18-alpine as development +WORKDIR /app +CMD [ "yarn", "-s", "dev" ] diff --git a/packages/compose-tunnel-agent/docker-compose.override.yml b/packages/compose-tunnel-agent/docker-compose.override.yml index 7686d905..d4852a01 100644 --- a/packages/compose-tunnel-agent/docker-compose.override.yml +++ b/packages/compose-tunnel-agent/docker-compose.override.yml @@ -4,8 +4,7 @@ services: preevy_proxy: build: context: . - target: development + dockerfile: Dockerfile.dev volumes: - ${HOME}/.ssh:/root/.ssh - diff --git a/packages/compose-tunnel-agent/docker-compose.yml b/packages/compose-tunnel-agent/docker-compose.yml index 09f8af62..47f21786 100644 --- a/packages/compose-tunnel-agent/docker-compose.yml +++ b/packages/compose-tunnel-agent/docker-compose.yml @@ -17,6 +17,7 @@ services: ports: - 3000 + - 3001 # healthcheck: # test: wget --no-verbose --tries=1 --spider http://localhost:3000/healthz || exit 1 diff --git a/packages/compose-tunnel-agent/index.ts b/packages/compose-tunnel-agent/index.ts index f7af93f0..1ea29b72 100644 --- a/packages/compose-tunnel-agent/index.ts +++ b/packages/compose-tunnel-agent/index.ts @@ -10,8 +10,10 @@ import { ConnectionCheckResult, requiredEnv, checkConnection, formatPublicKey, p import createDockerClient from './src/docker' import createApiServer from './src/api-server' import { sshClient as createSshClient } from './src/ssh' +import { createDockerProxy } from './src/docker-proxy' const homeDir = process.env.HOME || '/root' +const dockerSocket = '/var/run/docker.sock' const readDir = async (dir: string) => { try { @@ -70,11 +72,11 @@ const formatConnectionCheckResult = ( const writeLineToStdout = (s: string) => [s, EOL].forEach(d => process.stdout.write(d)) -const main = async () => { - const log = pino({ - level: process.env.DEBUG || process.env.DOCKER_PROXY_DEBUG ? 'debug' : 'info', - }, pinoPretty({ destination: pino.destination(process.stderr) })) +const log = pino({ + level: process.env.DEBUG || process.env.DOCKER_PROXY_DEBUG ? 'debug' : 'info', +}, pinoPretty({ destination: pino.destination(process.stderr) })) +const main = async () => { const { connectionConfig, sshUrl } = await sshConnectionConfigFromEnv() log.debug('ssh config: %j', { @@ -92,20 +94,21 @@ const main = async () => { process.exit(0) } - const docker = new Docker({ socketPath: '/var/run/docker.sock' }) + const docker = new Docker({ socketPath: dockerSocket }) const dockerClient = createDockerClient({ log: log.child({ name: 'docker' }), docker, debounceWait: 500 }) + const sshLog = log.child({ name: 'ssh' }) const sshClient = await createSshClient({ connectionConfig, tunnelNameResolver: tunnelNameResolver({ userDefinedSuffix: process.env.TUNNEL_URL_SUFFIX }), - log: log.child({ name: 'ssh' }), + log: sshLog, onError: err => { log.error(err) process.exit(1) }, }) - log.info('ssh client connected to %j', sshUrl) + sshLog.info('ssh client connected to %j', sshUrl) let currentTunnels = dockerClient.getRunningServices().then(services => sshClient.updateTunnels(services)) void dockerClient.startListening({ @@ -115,25 +118,51 @@ const main = async () => { }, }) - const listenAddress = process.env.PORT ?? 3000 - if (typeof listenAddress === 'string' && Number.isNaN(Number(listenAddress))) { - await rimraf(listenAddress) + const apiListenAddress = process.env.PORT ?? 3000 + if (typeof apiListenAddress === 'string' && Number.isNaN(Number(apiListenAddress))) { + await rimraf(apiListenAddress) } + const apiServerLog = log.child({ name: 'api' }) const apiServer = createApiServer({ - log: log.child({ name: 'api' }), - currentSshState: async () => ( - await currentTunnels - ), + log: apiServerLog, + currentSshState: async () => (await currentTunnels), }) - .listen(listenAddress, () => { - log.info(`listening on ${inspect(apiServer.address())}`) + .listen(apiListenAddress, () => { + apiServerLog.info(`API server listening on ${inspect(apiServer.address())}`) }) .on('error', err => { - log.error(err) + apiServerLog.error(err) + process.exit(1) + }) + .unref() + + const dockerProxyListenAddress = process.env.DOCKER_PROXY_PORT ?? 3001 + if (typeof dockerProxyListenAddress === 'string' && Number.isNaN(Number(dockerProxyListenAddress))) { + await rimraf(dockerProxyListenAddress) + } + + const dockerProxyLog = log.child({ name: 'docker-proxy' }) + const dockerProxyServer = createDockerProxy({ + log: dockerProxyLog, + dockerSocket, + docker, + }) + .listen(dockerProxyListenAddress, () => { + dockerProxyLog.info(`Docker proxy listening on ${inspect(dockerProxyServer.address())}`) + }) + .on('error', err => { + dockerProxyLog.error(err) process.exit(1) }) .unref() } -void main() +void main(); + +['SIGTERM', 'SIGINT'].forEach(signal => { + process.once(signal, async () => { + log.info(`shutting down on ${signal}`) + process.exit(0) + }) +}) diff --git a/packages/compose-tunnel-agent/jest.config.js b/packages/compose-tunnel-agent/jest.config.js new file mode 100644 index 00000000..b413e106 --- /dev/null +++ b/packages/compose-tunnel-agent/jest.config.js @@ -0,0 +1,5 @@ +/** @type {import('ts-jest').JestConfigWithTsJest} */ +module.exports = { + preset: 'ts-jest', + testEnvironment: 'node', +}; \ No newline at end of file diff --git a/packages/compose-tunnel-agent/package.json b/packages/compose-tunnel-agent/package.json index dcab1ad6..98dc9d69 100644 --- a/packages/compose-tunnel-agent/package.json +++ b/packages/compose-tunnel-agent/package.json @@ -17,12 +17,16 @@ "pino": "^8.11.0", "pino-pretty": "^9.4.0", "rimraf": "^5.0.0", - "ssh2": "^1.12.0" + "ssh2": "^1.12.0", + "ws": "^8.13.0" }, "devDependencies": { + "@jest/globals": "^29.5.0", "@types/dockerode": "^3.3.14", + "@types/http-proxy": "^1.17.9", "@types/lodash": "^4.14.192", "@types/node": "18", + "@types/node-fetch": "^2.6.3", "@types/shell-escape": "^0.2.1", "@types/ssh2": "^1.11.8", "@typescript-eslint/eslint-plugin": "^5.55.0", @@ -30,18 +34,21 @@ "esbuild": "^0.17.14", "eslint": "^8.36.0", "husky": "^8.0.0", + "jest": "^29.4.3", "lint-staged": "^13.1.2", + "node-fetch": "2.6.9", "tsx": "^3.12.3", - "typescript": "^5.0.4" + "typescript": "^5.0.4", + "wait-for-expect": "^3.0.2" }, "scripts": { "start": "node out/index.js", "dev": "tsx watch ./index.ts", "lint": "eslint . --ext .ts,.tsx --cache", "clean": "rm -rf dist out", - "build": "node --version && node build.mjs", + "build": "yarn tsc --noEmit && node build.mjs", "prepack": "yarn build", "prepare": "cd ../.. && husky install", - "bump-to": "yarn version --no-commit-hooks --no-git-tag-version --new-version" + "test": "yarn jest" } } diff --git a/packages/compose-tunnel-agent/src/api-server.ts b/packages/compose-tunnel-agent/src/api-server.ts index f7fc05d8..5dba60f0 100644 --- a/packages/compose-tunnel-agent/src/api-server.ts +++ b/packages/compose-tunnel-agent/src/api-server.ts @@ -1,45 +1,32 @@ import http from 'node:http' +import url from 'node:url' import { Logger } from '@preevy/common' -import { SshState } from './ssh/index' +import { SshState } from './ssh' +import { NotFoundError, respondAccordingToAccept, respondJson, tryHandler } from './http' -const respond = (res: http.ServerResponse, content: string, type = 'text/plain', status = 200) => { - res.writeHead(status, { 'Content-Type': type }) - res.end(content) -} - -const respondJson = ( - res: http.ServerResponse, - content: unknown, - status = 200, -) => respond(res, JSON.stringify(content), 'application/json', status) - -const respondNotFound = (res: http.ServerResponse) => respond(res, 'Not found', 'text/plain', 404) - -const createApiServer = ({ - log, currentSshState, -}: { +const createApiServer = ({ log, currentSshState }: { log: Logger currentSshState: ()=> Promise -}) => http.createServer(async (req, res) => { - log.debug('web request URL: %j', req.url) +}) => { + const server = http.createServer(tryHandler({ log }, async (req, res) => { + log.debug('api request: %s %s', req.method || '', req.url || '') + + const { pathname: path } = url.parse(req.url || '') - if (!req.url) { - respondNotFound(res) - return - } - const [path] = req.url.split('?') + if (path === '/tunnels') { + respondJson(res, await currentSshState()) + return + } - if (path === '/tunnels') { - respondJson(res, await currentSshState()) - return - } + if (path === '/healthz') { + respondAccordingToAccept(req, res, 'OK') + return + } - if (path === '/healthz') { - respond(res, 'OK') - return - } + throw new NotFoundError() + })) - respondNotFound(res) -}) + return server +} export default createApiServer diff --git a/packages/compose-tunnel-agent/src/docker-proxy/index.test.ts b/packages/compose-tunnel-agent/src/docker-proxy/index.test.ts new file mode 100644 index 00000000..df2063e1 --- /dev/null +++ b/packages/compose-tunnel-agent/src/docker-proxy/index.test.ts @@ -0,0 +1,237 @@ +import http from 'node:http' +import net from 'node:net' +import { describe, expect, beforeAll, afterAll, test, jest, it } from '@jest/globals' +import { ChildProcess, spawn, exec } from 'child_process' +import pino from 'pino' +import pinoPretty from 'pino-pretty' +import Dockerode from 'dockerode' +import fetch from 'node-fetch' +import { inspect, promisify } from 'node:util' +import waitForExpect from 'wait-for-expect' +import WebSocket from 'ws' +import { createDockerProxy } from '.' + +const doFetch = async (...args: Parameters) => { + const r = await fetch(...args) + if (!r.ok) { + throw new Error(`Fetch ${inspect(args)} failed: ${r.status} ${r.statusText}: ${await r.text()}`) + } + return await r.json() +} + +type OpenWebSocket = { + ws: WebSocket + receivedBuffers: Buffer[] + close: () => Promise + send: (data: string | Buffer) => Promise +} + +const openWebSocket = (url: string) => new Promise((resolve, reject) => { + const receivedBuffers: Buffer[] = [] + new WebSocket(url) + .on('error', reject) + .on('message', data => { + if (Buffer.isBuffer(data)) { + receivedBuffers.push(data) + } else if (Array.isArray(data)) { + receivedBuffers.push(...data) + } else { + receivedBuffers.push(Buffer.from(data)) + } + }) + .on('open', function onOpen() { + resolve({ + ws: this, + receivedBuffers, + close: () => new Promise(resolveClose => { + this.close() + this.once('close', () => { resolveClose() }) + }), + send: promisify(this.send.bind(this)), + }) + }) +}) + +describe('docker proxy', () => { + let dockerProcess: ChildProcess + let containerName: string + let output: Buffer[] + jest.setTimeout(100000) + + const log = pino({ + level: 'debug', + }, pinoPretty({ destination: pino.destination(process.stderr) })) + + beforeAll(() => { + containerName = `test-docker-proxy-${Math.random().toString(36).substring(2, 9)}` + output = [] + dockerProcess = spawn( + 'docker', + [ + ...`run --rm --name ${containerName} busybox sh -c`.split(' '), + 'while true; do echo "hello stdout"; >&2 echo "hello stderr"; sleep 0.1; done', + ] + ) + dockerProcess.stdout?.on('data', data => { output.push(data) }) + dockerProcess.stderr?.on('data', data => { output.push(data) }) + return new Promise((resolve, reject) => { + dockerProcess.stdout?.once('data', () => { resolve() }) + dockerProcess.once('error', reject) + dockerProcess.once('exit', (code, signal) => reject(new Error(`docker exited with code ${code} and signal ${signal}: ${Buffer.concat(output).toString('utf-8')}`))) + }) + }) + + afterAll(async () => { + dockerProcess.kill() + await promisify(exec)(`docker rm -f ${containerName}`) + }) + + let dp: http.Server + let dpPort: number + + beforeAll(async () => { + const docker = new Dockerode() + dp = createDockerProxy({ log, docker, dockerSocket: '/var/run/docker.sock' }) + dpPort = await new Promise(resolve => { + dp.listen(0, () => { + resolve((dp.address() as net.AddressInfo).port) + }) + }) + }) + + afterAll(async () => { + await new Promise((resolve, reject) => { + dp.close(err => { + if (err) { + reject(err) + } else { + resolve() + } + }) + }) + }) + + const waitForContainerId = async () => { + let containerId = '' + await waitForExpect(async () => { + const containers = await doFetch(`http://localhost:${dpPort}/containers/json`) as { Id: string; Names: string[] }[] + const container = containers.find(({ Names: names }) => names.includes(`/${containerName}`)) + expect(container).toBeDefined() + containerId = container?.Id as string + }, 3000, 100) + return containerId + } + + test('use the docker API', async () => { + expect(await waitForContainerId()).toBeDefined() + }) + + describe('exec', () => { + const createExec = async (containerId: string, tty: boolean) => { + const { Id: execId } = await doFetch(`http://localhost:${dpPort}/containers/${containerId}/exec`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + AttachStdin: true, + AttachStdout: true, + AttachStderr: true, + Tty: tty, + Cmd: ['sh'], + }), + }) + + return execId + } + + let execId: string + let containerId: string + + beforeAll(async () => { + containerId = await waitForContainerId() + }) + + describe('tty=true', () => { + beforeAll(async () => { + execId = await createExec(containerId, true) + }) + + it('should communicate via websocket', async () => { + const { receivedBuffers, send, close } = await openWebSocket(`ws://localhost:${dpPort}/exec/${execId}/start`) + await waitForExpect(() => expect(receivedBuffers.length).toBeGreaterThan(0)) + await send('ls\n') + await waitForExpect(() => { + const received = Buffer.concat(receivedBuffers).toString('utf-8') + expect(received).toContain('#') + expect(received).toContain('ls') + expect(received).toContain('bin') + }) + await close() + }) + }) + + describe('tty=false', () => { + beforeAll(async () => { + execId = await createExec(containerId, false) + }) + + it('should communicate via websocket', async () => { + const { receivedBuffers, send, close } = await openWebSocket(`ws://localhost:${dpPort}/exec/${execId}/start`) + await waitForExpect(async () => { + await send('ls\n') + const received = Buffer.concat(receivedBuffers).toString('utf-8') + expect(received).toContain('bin') + }) + await close() + }) + }) + }) + + describe('logs', () => { + let containerId: string + beforeAll(async () => { + containerId = await waitForContainerId() + }) + + const logStreams = ['stdout', 'stderr'] as const + type LogStream = typeof logStreams[number] + + const testStream = (...s: LogStream[]) => { + describe(`${s.join(' and ')}`, () => { + it(`should show the ${s.join(' and ')} logs via websocket`, async () => { + const { receivedBuffers, close } = await openWebSocket(`ws://localhost:${dpPort}/containers/${containerId}/logs?${s.map(st => `${st}=true`).join('&')}`) + await waitForExpect(() => expect(receivedBuffers.length).toBeGreaterThan(0)) + const length1 = receivedBuffers.length + await waitForExpect(() => { + const received = Buffer.concat(receivedBuffers).toString('utf-8') + s.forEach(st => { + expect(received).toContain(`hello ${st}`) + }) + logStreams.filter(st => !s.includes(st)).forEach(st => { + expect(received).not.toContain(`hello ${st}`) + }) + }) + await waitForExpect(() => { + expect(receivedBuffers.length).toBeGreaterThan(length1) + }) + await close() + }) + }) + } + + testStream('stdout') + testStream('stderr') + testStream('stdout', 'stderr') + + describe('timestamps', () => { + it('should show the logs with a timestamp', async () => { + const { receivedBuffers, close } = await openWebSocket(`ws://localhost:${dpPort}/containers/${containerId}/logs?stdout=true×tamps=true`) + await waitForExpect(() => expect(receivedBuffers.length).toBeGreaterThan(0)) + const received = Buffer.concat(receivedBuffers).toString('utf-8') + expect(received).toMatch(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d*Z/) + await close() + }) + }) + }) +}) diff --git a/packages/compose-tunnel-agent/src/docker-proxy/index.ts b/packages/compose-tunnel-agent/src/docker-proxy/index.ts new file mode 100644 index 00000000..152f7977 --- /dev/null +++ b/packages/compose-tunnel-agent/src/docker-proxy/index.ts @@ -0,0 +1,82 @@ +import net from 'node:net' +import http from 'node:http' +import HttpProxy from 'http-proxy' +import { Logger } from 'pino' +import { inspect } from 'node:util' +import { WebSocketServer } from 'ws' +import Dockerode from 'dockerode' +import { findHandler, handlers as wsHandlers } from './ws' + +export const createDockerProxy = ( + { log, dockerSocket, docker }: { log: Logger; dockerSocket: string; docker: Dockerode }, +) => { + const proxy = new HttpProxy({ + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + target: { + socketPath: dockerSocket, + }, + }) + + const wss = new WebSocketServer({ noServer: true }) + + wss.on('connection', async (ws, req) => { + const foundHandler = findHandler(wsHandlers, req) + if (!foundHandler) { + ws.close(404, 'Not found') + return undefined + } + + await foundHandler.handler.handler(ws, req, foundHandler.match, { log, docker }) + return undefined + }) + + const server = http.createServer((req, res) => { + log.debug('request %s %s', req.method, req.url) + proxy.web(req, res) + }) + + server.on('upgrade', (req, socket, head) => { + log.debug('upgrade %s %s', req.method || '', req.url || '') + const upgrade = req.headers.upgrade?.toLowerCase() + + if (upgrade === 'websocket') { + if (findHandler(wsHandlers, req)) { + wss.handleUpgrade(req, socket, head, client => { + wss.emit('connection', client, req) + }) + return undefined + } + + proxy.ws(req, socket, head, {}, err => { + log.warn('error in ws proxy %j', inspect(err)) + }) + return undefined + } + + if (upgrade === 'tcp') { + const targetSocket = net.createConnection({ path: dockerSocket }, () => { + const reqBuf = `${req.method} ${req.url} HTTP/${req.httpVersion}\r\n${Object.entries(req.headers).map(([k, v]) => `${k}: ${v}`).join('\r\n')}\r\n\r\n` + targetSocket.write(reqBuf) + targetSocket.write(head) + socket.pipe(targetSocket).pipe(socket) + }) + return undefined + } + + log.warn('invalid upgrade %s', upgrade) + socket.end(`Invalid upgrade ${upgrade}`) + return undefined + }) + + return server +} + +// export const createDockerProxy = ({ log: _log, dockerSocket }: { +// log: Logger +// dockerSocket: string +// }) => net.createServer(socket => { +// const dockerConnection = net.createConnection({ path: dockerSocket }, () => { +// socket.pipe(dockerConnection).pipe(socket) +// }) +// }) diff --git a/packages/compose-tunnel-agent/src/docker-proxy/ws/handler.ts b/packages/compose-tunnel-agent/src/docker-proxy/ws/handler.ts new file mode 100644 index 00000000..d3d3a845 --- /dev/null +++ b/packages/compose-tunnel-agent/src/docker-proxy/ws/handler.ts @@ -0,0 +1,32 @@ +import http from 'node:http' +import { Logger } from 'pino' +import WebSocket from 'ws' +import Dockerode from 'dockerode' + +type Context = { log: Logger; docker: Dockerode } +export type WsHandlerFunc = ( + ws: WebSocket, + req: http.IncomingMessage, + match: RegExpMatchArray, + { log, docker }: Context, +) => Promise + +export type WsHandler = { + matchRequest: RegExp + handler: WsHandlerFunc +} + +export const wsHandler = ( + matchRequest: RegExp, + handler: WsHandlerFunc +) => ({ matchRequest, handler }) + +export const findHandler = (handlers: WsHandler[], req: http.IncomingMessage) => { + for (const handler of handlers) { + const match = handler.matchRequest.exec(req.url ?? '') + if (match) { + return { handler, match } + } + } + return undefined +} diff --git a/packages/compose-tunnel-agent/src/docker-proxy/ws/handlers/exec.ts b/packages/compose-tunnel-agent/src/docker-proxy/ws/handlers/exec.ts new file mode 100644 index 00000000..0aae4705 --- /dev/null +++ b/packages/compose-tunnel-agent/src/docker-proxy/ws/handlers/exec.ts @@ -0,0 +1,42 @@ +import { inspect } from 'util' +import { createWebSocketStream } from 'ws' +import { parseQueryParams, queryParamBoolean } from '../../../query-params' +import { wsHandler } from '../handler' + +const handler = wsHandler( + /^\/exec\/([^/?]+)\/start($|\?)/, + async (ws, req, match, { log, docker }) => { + const id = match[1] + const { tty } = parseQueryParams(req.url ?? '') + const exec = docker.getExec(id) + const execStream = await exec.start({ + hijack: true, + stdin: true, + ...(tty !== undefined ? { Tty: queryParamBoolean(tty) } : {}), + }) + + execStream.on('close', () => { ws.close() }) + execStream.on('error', err => { log.warn('execStream error %j', inspect(err)) }) + ws.on('close', () => { execStream.destroy() }) + + const inspectResults = await exec.inspect() + log.debug('exec %s: %j', id, inspect(inspectResults)) + + const wsStream = createWebSocketStream(ws) + wsStream.on('error', err => { + const level = err.message.includes('WebSocket is not open') ? 'debug' : 'warn' + log[level]('wsStream error %j', inspect(err)) + }) + + if (inspectResults.ProcessConfig.tty) { + execStream.pipe(wsStream, { end: false }).pipe(execStream) + } else { + docker.modem.demuxStream(execStream, wsStream, wsStream) + wsStream.pipe(execStream) + } + + return undefined + }, +) + +export default handler diff --git a/packages/compose-tunnel-agent/src/docker-proxy/ws/handlers/logs.ts b/packages/compose-tunnel-agent/src/docker-proxy/ws/handlers/logs.ts new file mode 100644 index 00000000..ba96e6ae --- /dev/null +++ b/packages/compose-tunnel-agent/src/docker-proxy/ws/handlers/logs.ts @@ -0,0 +1,39 @@ +import { inspect } from 'util' +import { createWebSocketStream } from 'ws' +import { parseQueryParams, queryParamBoolean } from '../../../query-params' +import { wsHandler } from '../handler' + +const handler = wsHandler( + /^\/containers\/([^/?]+)\/logs($|\?)/, + async (ws, req, match, { log, docker }) => { + const id = match[1] + const { stdout, stderr, since, until, timestamps, tail } = parseQueryParams(req.url ?? '') + const abort = new AbortController() + const logStream = await docker.getContainer(id).logs({ + stdout: queryParamBoolean(stdout), + stderr: queryParamBoolean(stderr), + since, + until, + timestamps: queryParamBoolean(timestamps), + tail: tail !== undefined ? Number(tail) : undefined, + follow: true, + abortSignal: abort.signal, + }) + + logStream.on('close', async () => { ws.close() }) + logStream.on('error', err => { + if (err.message !== 'aborted') { + log.error('logs stream error %j', inspect(err)) + } + }) + ws.on('close', () => { abort.abort() }) + + const wsStream = createWebSocketStream(ws) + wsStream.on('error', err => { log.error('wsStream error %j', inspect(err)) }) + docker.modem.demuxStream(logStream, wsStream, wsStream) + + return undefined + }, +) + +export default handler diff --git a/packages/compose-tunnel-agent/src/docker-proxy/ws/index.ts b/packages/compose-tunnel-agent/src/docker-proxy/ws/index.ts new file mode 100644 index 00000000..c3acb87a --- /dev/null +++ b/packages/compose-tunnel-agent/src/docker-proxy/ws/index.ts @@ -0,0 +1,10 @@ +import { WsHandler } from './handler' +import exec from './handlers/exec' +import logs from './handlers/logs' + +export const handlers: WsHandler[] = [ + exec, + logs, +] + +export { findHandler } from './handler' diff --git a/packages/compose-tunnel-agent/src/docker.ts b/packages/compose-tunnel-agent/src/docker.ts index 5eda239f..cfce3a42 100644 --- a/packages/compose-tunnel-agent/src/docker.ts +++ b/packages/compose-tunnel-agent/src/docker.ts @@ -60,6 +60,7 @@ const client = ({ stream.on('data', handler) log.info('listening on docker') void handler() + return { close: () => stream.removeAllListeners() } }, } } diff --git a/packages/compose-tunnel-agent/src/http.ts b/packages/compose-tunnel-agent/src/http.ts new file mode 100644 index 00000000..a3bc7ef6 --- /dev/null +++ b/packages/compose-tunnel-agent/src/http.ts @@ -0,0 +1,60 @@ +import { Logger } from '@preevy/common' +import http from 'node:http' +import { inspect } from 'node:util' + +export const respond = (res: http.ServerResponse, content: string, type = 'text/plain', status = 200) => { + res.writeHead(status, { 'Content-Type': type }) + res.end(content) +} + +export const respondJson = ( + res: http.ServerResponse, + content: unknown, + status = 200, +) => respond(res, JSON.stringify(content), 'application/json', status) + +export const respondAccordingToAccept = ( + req: http.IncomingMessage, + res: http.ServerResponse, + message: string, + status = 200, +) => (req.headers.accept?.toLowerCase().includes('json') + ? respondJson(res, { message }, status) + : respond(res, message, 'text/plain', status)) + +export class HttpError extends Error { + constructor(readonly status: number, readonly clientMessage: string, readonly cause?: unknown) { + super(clientMessage) + } +} + +export class NotFoundError extends HttpError { + static defaultMessage = 'Not found' + constructor(clientMessage = NotFoundError.defaultMessage) { + super(404, clientMessage) + } +} + +export class InternalError extends HttpError { + static status = 500 + static defaultMessage = 'Internal error' + constructor(err: unknown, clientMessage = InternalError.defaultMessage) { + super(500, clientMessage, err) + } +} + +export const tryHandler = ( + { log }: { log: Logger }, + f: (req: http.IncomingMessage, res: http.ServerResponse) => Promise +) => async (req: http.IncomingMessage, res: http.ServerResponse) => { + try { + await f(req, res) + } catch (err) { + const messageAndStatus: [string, number] = err instanceof HttpError + ? [err.clientMessage, err.status] + : [InternalError.defaultMessage, InternalError.status] + + respondAccordingToAccept(req, res, ...messageAndStatus) + log.warn('caught error: %j in %s %s', inspect(err), req.method || '', req.url || '') + } +} diff --git a/packages/compose-tunnel-agent/src/query-params.ts b/packages/compose-tunnel-agent/src/query-params.ts new file mode 100644 index 00000000..8edb39df --- /dev/null +++ b/packages/compose-tunnel-agent/src/query-params.ts @@ -0,0 +1,20 @@ +import { defaults } from 'lodash' +import url from 'node:url' + +export const parseQueryParams = < +T extends Record +>(requestUrl: string, defaultValues: Partial = {}) => { + const { search } = url.parse(requestUrl) + const queryParams = new URLSearchParams(search || '') + return defaults(Object.fromEntries(queryParams), defaultValues) +} + +export const queryParamBoolean = (v: string | boolean | undefined, defaultValue = false): boolean => { + if (typeof v === 'boolean') { + return v + } + if (typeof v === 'undefined' || v === '') { + return defaultValue + } + return v === '1' || v.toLowerCase() === 'true' +} diff --git a/packages/core/src/compose-tunnel-agent-client.ts b/packages/core/src/compose-tunnel-agent-client.ts index 780fbd47..6353bd40 100644 --- a/packages/core/src/compose-tunnel-agent-client.ts +++ b/packages/core/src/compose-tunnel-agent-client.ts @@ -1,6 +1,7 @@ import path from 'path' import fetch from 'node-fetch' import retry from 'p-retry' +import util from 'util' import { mapValues } from 'lodash' import { ComposeModel, ComposeService } from './compose/model' import { TunnelOpts } from './ssh/url' @@ -8,7 +9,8 @@ import { Tunnel } from './tunneling' import { withBasicAuthCredentials } from './url' export const COMPOSE_TUNNEL_AGENT_SERVICE_NAME = 'preevy_proxy' -export const COMPOSE_TUNNEL_AGENT_SERVICE_PORT = 3000 +export const COMPOSE_TUNNEL_AGENT_API_PORT = 3000 +export const COMPOSE_TUNNEL_AGENT_DOCKER_PROXY_PORT = 3001 const COMPOSE_TUNNEL_AGENT_DIR = path.join(path.dirname(require.resolve('@preevy/compose-tunnel-agent')), '..') const baseDockerProxyService: ComposeService = { @@ -62,6 +64,12 @@ export const addComposeTunnelAgentService = ( published: '0', protocol: 'tcp', }, + { + mode: 'ingress', + target: 3001, + published: '0', + protocol: 'tcp', + }, ], volumes: [ { @@ -89,7 +97,7 @@ export const addComposeTunnelAgentService = ( SSH_URL: tunnelOpts.url, TLS_SERVERNAME: tunnelOpts.tlsServerName, TUNNEL_URL_SUFFIX: urlSuffix, - PORT: COMPOSE_TUNNEL_AGENT_SERVICE_PORT.toString(), + PORT: COMPOSE_TUNNEL_AGENT_API_PORT.toString(), ...debug ? { DEBUG: '1' } : {}, HOME: '/preevy', }, @@ -108,10 +116,16 @@ export const queryTunnels = async ({ retryOpts?: retry.Options includeAccessCredentials: boolean }) => { - const serviceUrl = tunnelUrlsForService({ + const serviceUrls = tunnelUrlsForService({ name: COMPOSE_TUNNEL_AGENT_SERVICE_NAME, - ports: [COMPOSE_TUNNEL_AGENT_SERVICE_PORT], - })[0].url.replace(/\/$/, '') + ports: [COMPOSE_TUNNEL_AGENT_API_PORT, COMPOSE_TUNNEL_AGENT_DOCKER_PROXY_PORT], + }) + + const serviceUrl = serviceUrls.find(({ port }) => port === COMPOSE_TUNNEL_AGENT_API_PORT)?.url.replace(/\/$/, '') + + if (!serviceUrl) { + throw new Error(`Cannot find compose tunnel agent API service URL in: ${util.inspect(serviceUrls)}`) + } const addCredentials = withBasicAuthCredentials(credentials) const url = addCredentials(`${serviceUrl}/tunnels`) diff --git a/tunnel-server/src/app.ts b/tunnel-server/src/app.ts index 9cdaea0c..11cc4d2c 100644 --- a/tunnel-server/src/app.ts +++ b/tunnel-server/src/app.ts @@ -8,22 +8,26 @@ export const app = ({ isProxyRequest, proxyHandlers, logger }: { isProxyRequest: (req: http.IncomingMessage) => boolean logger: Logger proxyHandlers: { - wsHandler: (req: http.IncomingMessage, socket: internal.Duplex, head: Buffer) => void + upgradeHandler: (req: http.IncomingMessage, socket: internal.Duplex, head: Buffer) => void handler: (req: http.IncomingMessage, res: http.ServerResponse) => void } }) => Fastify({ serverFactory: handler => { - const { wsHandler: proxyWsHandler, handler: proxyHandler } = proxyHandlers + const { upgradeHandler: proxyUpgradeHandler, handler: proxyHandler } = proxyHandlers const server = http.createServer((req, res) => { + if (req.url !== '/healthz') { + logger.debug('request %j', { method: req.method, url: req.url, headers: req.headers }) + } if (isProxyRequest(req)) { return proxyHandler(req, res) } return handler(req, res) }) server.on('upgrade', (req, socket, head) => { + logger.debug('upgrade', req.url) if (isProxyRequest(req)) { - proxyWsHandler(req, socket, head) + proxyUpgradeHandler(req, socket, head) } else { logger.warn('unexpected upgrade request %j', { url: req.url, host: req.headers.host }) socket.end() diff --git a/tunnel-server/src/proxy.ts b/tunnel-server/src/proxy.ts index 051e8c58..cd14c950 100644 --- a/tunnel-server/src/proxy.ts +++ b/tunnel-server/src/proxy.ts @@ -1,5 +1,6 @@ import httpProxy from 'http-proxy' import { IncomingMessage, ServerResponse } from 'http' +import net from 'net' import internal from 'stream' import type { Logger } from 'pino' import { PreviewEnvStore } from './preview-env' @@ -91,33 +92,55 @@ export function proxyHandlers({ } ) }, err => logger.error('error forwarding traffic %j', { error: err })), - wsHandler: asyncHandler(async (req: IncomingMessage, socket: internal.Duplex, head: Buffer) => { + upgradeHandler: asyncHandler(async (req: IncomingMessage, socket: internal.Duplex, head: Buffer) => { const env = await resolveTargetEnv(req) if (!env) { + logger.warn('env not found for upgrade request %j', req.url) socket.end() return undefined } + logger.debug('upgrade handler %j', { url: req.url, env, headers: req.headers }) + if (env.access === 'private') { // need to support session cookie, native browser Websocket api doesn't forward authorization header socket.end() return undefined } - return proxy.ws( - req, - socket, - head, - { - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - target: { - socketPath: env.target, + + const upgrade = req.headers.upgrade?.toLowerCase() + + if (upgrade === 'websocket') { + return proxy.ws( + req, + socket, + head, + { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + target: { + socketPath: env.target, + }, }, - }, - err => { - logger.warn('error in ws proxy %j', { error: err, targetHost: env.target, url: req.url }) - } - ) - }, err => logger.error('error forwarding ws traffic %j', { error: err })), + err => { + logger.warn('error in ws proxy %j', { error: err, targetHost: env.target, url: req.url }) + } + ) + } + + if (upgrade === 'tcp') { + const targetSocket = net.createConnection({ path: env.target }, () => { + const reqBuf = `${req.method} ${req.url} HTTP/${req.httpVersion}\r\n${Object.entries(req.headers).map(([k, v]) => `${k}: ${v}`).join('\r\n')}\r\n\r\n` + targetSocket.write(reqBuf) + targetSocket.write(head) + socket.pipe(targetSocket).pipe(socket) + }) + return undefined + } + + logger.warn('unsupported upgrade: %j', { url: req.url, env, headers: req.headers }) + socket.end() + return undefined + }, err => logger.error('error forwarding upgrade traffic %j', { error: err })), } } diff --git a/tunnel-server/src/ssh-server.ts b/tunnel-server/src/ssh-server.ts index 6c62c258..33bc166e 100644 --- a/tunnel-server/src/ssh-server.ts +++ b/tunnel-server/src/ssh-server.ts @@ -91,7 +91,7 @@ export const sshServer = ( const serverEmitter = new EventEmitter() as Omit const server = new ssh2.Server( { - debug: x => log.debug(x), + // debug: x => log.debug(x), // keepaliveInterval: 1000, // keepaliveCountMax: 5, hostKeys: [sshPrivateKey], @@ -180,7 +180,7 @@ export const sshServer = ( parsed, () => new Promise((resolveForward, rejectForward) => { const socketServer = net.createServer(socket => { - log.debug('socketServer connected %j', socket) + log.debug('socketServer connected') client.openssh_forwardOutStreamLocal( request, (err, upstream) => { diff --git a/yarn.lock b/yarn.lock index db5c7af3..e5cf3255 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4613,6 +4613,13 @@ resolved "https://registry.yarnpkg.com/@types/http-cache-semantics/-/http-cache-semantics-4.0.1.tgz#0ea7b61496902b95890dc4c3a116b60cb8dae812" integrity sha512-SZs7ekbP8CN0txVG2xVRH6EgKmEm31BOxA07vkFaETzZz1xh+cbt8BcI0slpymvwhx5dlFnQG2rTlPVQn+iRPQ== +"@types/http-proxy@^1.17.9": + version "1.17.11" + resolved "https://registry.yarnpkg.com/@types/http-proxy/-/http-proxy-1.17.11.tgz#0ca21949a5588d55ac2b659b69035c84bd5da293" + integrity sha512-HC8G7c1WmaF2ekqpnFq626xd3Zz0uvaqFmBJNRZCGEZCXkvSdJoNFn/8Ygbd9fKNQj8UzLdCETaI0UWPAjK7IA== + dependencies: + "@types/node" "*" + "@types/inquirer@^8.0.0": version "8.2.6" resolved "https://registry.yarnpkg.com/@types/inquirer/-/inquirer-8.2.6.tgz#abd41a5fb689c7f1acb12933d787d4262a02a0ab" @@ -13942,6 +13949,11 @@ vinyl@^2.0.1: remove-trailing-separator "^1.0.1" replace-ext "^1.0.0" +wait-for-expect@^3.0.2: + version "3.0.2" + resolved "https://registry.yarnpkg.com/wait-for-expect/-/wait-for-expect-3.0.2.tgz#d2f14b2f7b778c9b82144109c8fa89ceaadaa463" + integrity sha512-cfS1+DZxuav1aBYbaO/kE06EOS8yRw7qOFoD3XtjTkYvCvh3zUvNST8DXK/nPaeqIzIv3P3kL3lRJn8iwOiSag== + walk-up-path@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/walk-up-path/-/walk-up-path-1.0.0.tgz#d4745e893dd5fd0dbb58dd0a4c6a33d9c9fec53e" @@ -14145,7 +14157,7 @@ write-pkg@4.0.0: type-fest "^0.4.1" write-json-file "^3.2.0" -ws@^8.11.0: +ws@^8.11.0, ws@^8.13.0: version "8.13.0" resolved "https://registry.yarnpkg.com/ws/-/ws-8.13.0.tgz#9a9fb92f93cf41512a0735c8f4dd09b8a1211cd0" integrity sha512-x9vcZYTrFPC7aSIbj7sRCYo7L/Xb8Iy+pW0ng0wt2vCJv7M9HOMy0UoN3rr+IFC7hb7vXoqS+P9ktyLLLhO+LA==