Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "feat: 🎸 drop useless rpc mode" #428

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions packages/core/src/client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { PassThrough } from 'readable-stream';
import http from 'http';
import debug from 'debug';
import Parameter from './parameter';
import settings from './settings';

export const agent = new http.Agent({
maxSockets: 0,
keepAlive: false,
timeout: 0,
});

export const parseAddress = (commands, environment) => (srvr) => {
if (typeof srvr !== 'string') {
return null;
}
const hostWithPort = srvr.match(/^\[?([^\]]+)\]?:(\d+)$/);
const serverOptions = {
hostname: srvr,
port: settings.port,
path: '/',
method: 'POST',
headers: {
'Transfer-Encoding': 'chunked',
'Content-Type': 'application/json',
},
agent,
};
commands
.filter(Boolean)
.forEach((command, index) => {
serverOptions.headers[`X-Command-${index}`] = Parameter.pack(command);
});
Object.keys(environment)
.filter((keyEnv) => environment[keyEnv])
.forEach((keyEnv, index) => {
serverOptions.headers[`X-Environment-${index}`] = Parameter.pack({ k: keyEnv, v: environment[keyEnv] });
});
if (hostWithPort) {
return {
...serverOptions,
hostname: hostWithPort[1],
port: Number(hostWithPort[2]),
};
}
return serverOptions;
};

export const ensureArray = (a) => (Array.isArray(a) ? a : [a]);

export const inspectServers = (servers, commands, environment, ns) => ensureArray(servers)
.filter(Boolean)
.filter((elem, pos, arr) => arr.indexOf(elem) === pos)
.map(parseAddress(commands, environment))
.map((s) => Array(ns || settings.concurrency).fill(s)) // multiple each line
.reduce((a, b) => a.concat(b), []); // flatten all

export const connectServer = (ezs) => (serverOptions, index) => {
const { hostname, port } = serverOptions;
let connected = false;
serverOptions.headers = {
...ezs.encodingMode(),
...serverOptions.headers,
};
const input = new PassThrough(ezs.objectMode());
const output = new PassThrough(ezs.objectMode());
const handle = http.request(serverOptions, (res) => {
connected = true;
debug('ezs')(`http://${hostname}:${port} send code ${res.statusCode}`);
if (res.statusCode === 200) {
res
.pipe(ezs.uncompress(res.headers))
.pipe(ezs('unpack'))
.pipe(ezs('ungroup'))
.pipe(output);
return 1;
}
if (res.statusCode === 500) {
const errmsg = Parameter.decode(res.headers['x-error']);
output.write(new Error(`Server sent: ${errmsg}`));
output.end();
return 2;
}
output.write(new Error(
`http://${hostname}:${port} at item #${index} return ${res.statusCode}`,
));
return 3;
});
handle.on('error', (e) => {
handle.abort();
if (!connected) {
output.write(new Error(
`http://${hostname || '?'}:${port || '?'} at item #${index} return ${e.message}`,
));
return output.end();
}
debug('ezs')(`http://${hostname}:${port} was stopped properly following ${e}`);
return 4;
});
handle.setNoDelay(false);

input
.pipe(ezs('group'))
.pipe(ezs('pack'))
.pipe(ezs.compress(ezs.encodingMode()))
.pipe(handle);
const duplex = [input, output];
return duplex;
};
2 changes: 2 additions & 0 deletions packages/core/src/server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import controlServer from 'http-shutdown';
import { parse } from 'url';
import debug from 'debug';
import knownPipeline from './knownPipeline';
import unknownPipeline from './unknownPipeline';
import serverInformation from './serverInformation';
import serverControl from './serverControl';
import errorHandler from './errorHandler';
Expand Down Expand Up @@ -57,6 +58,7 @@ function createServer(ezs, serverPort, serverPath, workerId) {
app.use(metrics(ezs));
app.use(serverInformation(ezs));
app.use(serverControl(ezs));
app.use(unknownPipeline(ezs));
app.use(knownPipeline(ezs));
app.use((request, response, next) => {
if (request.catched === false) {
Expand Down
54 changes: 54 additions & 0 deletions packages/core/src/server/unknownPipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import debug from 'debug';
import sizeof from 'object-sizeof';
import Parameter from '../parameter';
import errorHandler from './errorHandler';

const unknownPipeline = ezs => (request, response, next) => {

if (request.catched || !request.methodMatch(['POST']) || request.pathName !== '/') {
return next();
}
request.catched = true;
debug('ezs')(`Create middleware 'unknownPipeline' for ${request.method} ${request.pathName}`);

const { headers } = request;
response.setHeader('Content-Encoding', headers['content-encoding'] || 'identity');
const commands = Object.keys(headers)
.filter(headerKey => (headerKey.indexOf('x-command') === 0))
.map(headerKey => parseInt(headerKey.replace('x-command-', ''), 10))
.sort((x, y) => x - y)
.map(commandIndex => Parameter.unscramble(headers[`x-command-${commandIndex}`]));
const environment = Object.keys(headers)
.filter(headerKey => (headerKey.indexOf('x-environment') === 0))
.map(headerKey => headerKey.replace('x-environment-', ''))
.map((environmentKey) => {
const { k, v } = Parameter.unpack(headers[`x-environment-${environmentKey}`]);
return {
[k]: v,
};
})
.reduce((prev, cur) => Object.assign(prev, cur), {});
debug('ezs')(
`PID ${process.pid} will execute ${commands.length} commands with ${sizeof(environment)} of global parameters`,
);
request
.pipe(ezs.uncompress(headers))
.pipe(ezs('unpack'))
.pipe(ezs('ungroup'))
.pipe(ezs('delegate', { commands }, environment))
.pipe(ezs.catch(errorHandler(request, response)))
.pipe(ezs((input, output) => {
if (!response.headersSent) {
response.writeHead(200);
}
return output.send(input);
}))
.pipe(ezs('group'))
.pipe(ezs('pack'))
.pipe(ezs.compress(headers))
.pipe(response);
request.resume();
response.once('close', next);
};

export default unknownPipeline;
68 changes: 68 additions & 0 deletions packages/core/src/statements/dispatch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import merge from 'merge2';
import debug from 'debug';
import { inspectServers, connectServer } from '../client';

/**
* Dispatch processing to an external pipeline on one or more servers.
*
* @name dispatch
* @param {String} [file] the external pipeline is described in a file
* @param {String} [script] the external pipeline is described in a string of characters
* @param {String} [commands] the external pipeline is described in a object
* @param {String} [command] the external pipeline is described in a URL-like command
* @returns {Object}
*/
export default function dispatch(data, feed) {
const { ezs } = this;
if (this.isFirst()) {
this.lastIndex = 0;
const file = this.getParam('file');
const fileContent = ezs.loadScript(file);
const script = this.getParam('script', fileContent);
const cmd1 = ezs.compileScript(script).get();
const command = this.getParam('command');
const cmd2 = [].concat(command).map(ezs.parseCommand).filter(Boolean);
const commands = this.getParam('commands', cmd1.concat(cmd2));
const environment = this.getEnv();
const servers = inspectServers(
this.getParam('server', []),
commands,
environment,
);

if (
!servers
|| servers.length === 0
|| !commands
|| commands.length === 0
) {
return feed.stop(new Error('Invalid parmeter for [dispatch]'));
}
debug('ezs')(`[dispatch] connect to #${servers.length} servers.`);
const handles = servers.map(connectServer(ezs));
this.ins = handles.map((h) => h[0]);
this.outs = handles.map((h) => h[1]);
const funnel = merge(this.outs, ezs.objectMode())
.on('queueDrain', () => {
funnel.destroy();
})
.on('error', (e) => feed.write(e))
.on('data', (d) => feed.write(d));
this.whenFinish = new Promise((resolve) => {
funnel.on('close', resolve);
});
}
if (this.isLast()) {
this.whenFinish.then(() => feed.close()).catch((e) => feed.stop(e));
this.ins.forEach((handle) => handle.end());
} else {
if (this.lastIndex >= this.ins.length) {
this.lastIndex = 0;
}
const check = ezs.writeTo(this.ins[this.lastIndex], data, () => feed.end());
if (!check) {
this.lastIndex += 1;
}
}
return 1;
}
2 changes: 2 additions & 0 deletions packages/core/src/statements/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import shuffle from './shuffle';
import env from './env';
import group from './group';
import ungroup from './ungroup';
import dispatch from './dispatch';
import parallel from './parallel';
import spawn from './spawn';
import delegate from './delegate';
Expand Down Expand Up @@ -54,6 +55,7 @@ export default {
env,
group,
ungroup,
dispatch,
parallel,
tracer,
spawn,
Expand Down
Loading
Loading