Skip to content

Commit

Permalink
http server (#698)
Browse files Browse the repository at this point in the history
* http server

* update error

* support only POST

* lint

* handle OPTIONS

* fix headers

* fix response

* lint
  • Loading branch information
ermalkaleci authored Mar 21, 2024
1 parent a8750f4 commit 350b6e2
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 30 deletions.
136 changes: 107 additions & 29 deletions packages/chopsticks/src/server.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { AddressInfo, WebSocket, WebSocketServer } from 'ws'
import { ResponseError, SubscriptionManager } from '@acala-network/chopsticks-core'
import { z } from 'zod'
import http from 'node:http'

import { defaultLogger, truncate } from './logger.js'

const logger = defaultLogger.child({ name: 'ws' })
const httpLogger = defaultLogger.child({ name: 'http' })
const wsLogger = defaultLogger.child({ name: 'ws' })

const singleRequest = z.object({
id: z.number(),
Expand All @@ -30,32 +32,108 @@ const parseRequest = (request: string) => {
}
}

const createWS = async (port: number) => {
const wss = new WebSocketServer({ port, maxPayload: 1024 * 1024 * 100 })

const promise = new Promise<[WebSocketServer?, number?]>((resolve) => {
wss.on('listening', () => {
resolve([wss, (wss.address() as AddressInfo).port])
})
const readBody = (request: http.IncomingMessage) =>
new Promise<string>((resolve) => {
const bodyParts: any[] = []
request
.on('data', (chunk) => {
bodyParts.push(chunk)
})
.on('end', () => {
resolve(Buffer.concat(bodyParts).toString())
})
})

wss.on('error', (_) => {
resolve([])
})
const respond = (res: http.ServerResponse, data?: any) => {
res.writeHead(200, {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'POST',
'Access-Control-Allow-Headers': '*',
'Content-Type': 'application/json',
})
if (data) {
res.write(data)
}
res.end()
}

return promise
const subscriptionManager = {
subscribe: () => {
throw new Error('Subscription is not supported')
},
unsubscribe: () => {
throw new Error('Subscription is not supported')
},
}

export const createServer = async (handler: Handler, port?: number) => {
export const createServer = async (handler: Handler, port: number) => {
let wss: WebSocketServer | undefined
let listenPort: number | undefined

const server = http.createServer(async (req, res) => {
if (req.method === 'OPTIONS') {
return respond(res)
}

try {
if (req.method !== 'POST') {
throw new Error('Only POST method is supported')
}
const body = await readBody(req)
const parsed = await requestSchema.safeParseAsync(parseRequest(body))

if (!parsed.success) {
httpLogger.error('Invalid request: %s', body)
throw new Error('Invalid request: ' + body)
}

httpLogger.trace({ req: parsed.data }, 'Received request')

let response: any
if (Array.isArray(parsed.data)) {
response = await Promise.all(
parsed.data.map((req) => {
const result = handler(req, subscriptionManager)
return { id: req.id, jsonrpc: '2.0', result }
}),
)
} else {
const result = await handler(parsed.data, subscriptionManager)
response = { id: parsed.data.id, jsonrpc: '2.0', result }
}

respond(res, JSON.stringify(response))
} catch (err: any) {
respond(
res,
JSON.stringify({
jsonrpc: '2.0',
id: 1,
error: {
message: err.message,
},
}),
)
}
})

for (let i = 0; i < 10; i++) {
const preferPort = (port ?? 0) > 0 ? (port ?? 0) + i : 0
logger.debug('Try starting on port %d', preferPort)
const [maybeWss, maybeListenPort] = await createWS(preferPort)
if (maybeWss && maybeListenPort) {
wss = maybeWss
listenPort = maybeListenPort
const preferPort = port ? port + i : undefined
wsLogger.debug('Try starting on port %d', preferPort)
const success = await new Promise<boolean>((resolve) => {
server.listen(preferPort, () => {
wss = new WebSocketServer({ server, maxPayload: 1024 * 1024 * 100 })
listenPort = (server.address() as AddressInfo).port
resolve(true)
})
server.on('error', (e: any) => {
if (e.code === 'EADDRINUSE') {
server.close()
resolve(false)
}
})
})
if (success) {
break
}
}
Expand All @@ -65,7 +143,7 @@ export const createServer = async (handler: Handler, port?: number) => {
}

wss.on('connection', (ws) => {
logger.debug('New connection')
wsLogger.debug('New connection')

const send = (data: object) => {
if (ws.readyState === WebSocket.OPEN) {
Expand All @@ -79,7 +157,7 @@ export const createServer = async (handler: Handler, port?: number) => {
subscriptions[subid] = onCancel
return (data: object) => {
if (subscriptions[subid]) {
logger.trace({ method, subid, data: truncate(data) }, 'Subscription notification')
wsLogger.trace({ method, subid, data: truncate(data) }, 'Subscription notification')
send({
jsonrpc: '2.0',
method,
Expand All @@ -100,7 +178,7 @@ export const createServer = async (handler: Handler, port?: number) => {
}

const processRequest = async (req: Zod.infer<typeof singleRequest>) => {
logger.trace(
wsLogger.trace(
{
id: req.id,
method: req.method,
Expand All @@ -110,7 +188,7 @@ export const createServer = async (handler: Handler, port?: number) => {

try {
const resp = await handler(req, subscriptionManager)
logger.trace(
wsLogger.trace(
{
id: req.id,
method: req.method,
Expand All @@ -124,7 +202,7 @@ export const createServer = async (handler: Handler, port?: number) => {
result: resp ?? null,
}
} catch (e) {
logger.info('Error handling request: %o', (e as Error).stack)
wsLogger.info('Error handling request: %o', (e as Error).stack)
return {
id: req.id,
jsonrpc: '2.0',
Expand All @@ -134,14 +212,14 @@ export const createServer = async (handler: Handler, port?: number) => {
}

ws.on('close', () => {
logger.debug('Connection closed')
wsLogger.debug('Connection closed')
for (const [subid, onCancel] of Object.entries(subscriptions)) {
onCancel(subid)
}
ws.removeAllListeners()
})
ws.on('error', () => {
logger.debug('Connection error')
wsLogger.debug('Connection error')
for (const [subid, onCancel] of Object.entries(subscriptions)) {
onCancel(subid)
}
Expand All @@ -151,7 +229,7 @@ export const createServer = async (handler: Handler, port?: number) => {
ws.on('message', async (message) => {
const parsed = await requestSchema.safeParseAsync(parseRequest(message.toString()))
if (!parsed.success) {
logger.info('Invalid request: %s', message)
wsLogger.error('Invalid request: %s', message)
send({
id: null,
jsonrpc: '2.0',
Expand All @@ -165,11 +243,11 @@ export const createServer = async (handler: Handler, port?: number) => {

const { data: req } = parsed
if (Array.isArray(req)) {
logger.trace({ req }, 'Received batch request')
wsLogger.trace({ req }, 'Received batch request')
const resp = await Promise.all(req.map(processRequest))
send(resp)
} else {
logger.trace({ req }, 'Received single request')
wsLogger.trace({ req }, 'Received single request')
const resp = await processRequest(req)
send(resp)
}
Expand Down
2 changes: 1 addition & 1 deletion packages/e2e/src/helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export const setupAll = async ({
await genesisSetup(chain, provider as GenesisProvider)
}

const { port, close } = await createServer(handler({ chain }))
const { port, close } = await createServer(handler({ chain }), 0)

const ws = new WsProvider(`ws://localhost:${port}`, 3_000, undefined, 300_000)
const apiPromise = await ApiPromise.create({
Expand Down
119 changes: 119 additions & 0 deletions packages/e2e/src/http.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import { describe, expect, it } from 'vitest'
import { request } from 'http'

import { env, setupApi, ws } from './helper.js'

setupApi(env.acala)

describe('http.server', () => {
it('works', async () => {
const port = /:(\d+$)/.exec(ws.endpoint)?.[1]
if (!port) {
throw new Error('cannot found port')
}

{
const res = await fetch(`http://localhost:${port}`, {
method: 'POST',
body: JSON.stringify({ id: 1, jsonrpc: '2.0', method: 'chain_getBlockHash', params: [] }),
})
expect(await res.json()).toMatchInlineSnapshot(
`
{
"id": 1,
"jsonrpc": "2.0",
"result": "0x0df086f32a9c3399f7fa158d3d77a1790830bd309134c5853718141c969299c7",
}
`,
)
}

{
const res = await fetch(`http://localhost:${port}`, {
method: 'POST',
body: JSON.stringify({ id: 1, jsonrpc: '2.0', method: 'system_health', params: [] }),
})
expect(await res.json()).toMatchInlineSnapshot(`
{
"id": 1,
"jsonrpc": "2.0",
"result": {
"isSyncing": false,
"peers": 0,
"shouldHavePeers": false,
},
}
`)
}

{
const res = await fetch(`http://localhost:${port}`, {
method: 'POST',
body: JSON.stringify({ id: 1, jsonrpc: '2.0', method: 'system_invalid', params: [] }),
})
expect(await res.json()).toMatchInlineSnapshot(
`
{
"error": {
"message": "Method not found: system_invalid",
},
"id": 1,
"jsonrpc": "2.0",
}
`,
)
}

{
const res = await fetch(`http://localhost:${port}`, {
method: 'POST',
body: JSON.stringify({ id: 1, jsonrpc: '2.0', method: 'chain_subscribeNewHeads', params: [] }),
})
expect(await res.json()).toMatchInlineSnapshot(
`
{
"error": {
"message": "Subscription is not supported",
},
"id": 1,
"jsonrpc": "2.0",
}
`,
)
}

{
const body = JSON.stringify({ id: 1, jsonrpc: '2.0', method: 'chain_getBlockHash', params: [] })
request(
{
method: 'GET',
host: 'localhost',
port: port,
headers: {
'Content-Type': 'application/json',
'Content-Length': body.length.toString(),
},
},
(res) => {
let data = ''
res.on('data', (chunk) => {
data += chunk
})
res.on('end', () => {
expect(JSON.parse(data)).toMatchInlineSnapshot(
`
{
"error": {
"message": "Only POST method is supported",
},
"id": 1,
"jsonrpc": "2.0",
}
`,
)
})
},
).end(body)
}
})
})

0 comments on commit 350b6e2

Please sign in to comment.