Skip to content

Latest commit

 

History

History
379 lines (287 loc) · 10.8 KB

玩转进程.md

File metadata and controls

379 lines (287 loc) · 10.8 KB

精读《深入浅出NodeJs》- 玩转进程篇

JS运行在单线程上,单线程的优点:

  • 程序状态单一,没有锁、线程同步问题
  • 操作系统调度时没有上下文切换,可以提高CPU使用率

单线程带来的缺点:

  • CPU是多核的,无法充分利用多核CPU
  • 单线程上的异常未被捕获时将引起程序崩溃

Node严格意义上并不是真正的单线程,本身还有I/O线程的存在,不过是由底层库libuv处理的。但是JS代码永远运行在V8中,是单线程的。

创建子进程

  • spawn 启动一个子进程执行命令
  • exec 启动一个子进程执行命令,可以在回调得知子进程情况
  • execFile 启动一个子进程执行可执行文件
  • fork 启动一个子进程执行命令,但是需要一个js文件即可

下面演示同样执行一个shell.js文件的不同操作:

const { spawn, exec, execFile, fork } = require('child_process');

const spawnChild = spawn('node', ['-v']);
// v10.15.0
spawnChild.stdout.pipe(process.stdout);

exec('node -v', (err, stdout, stderr) => {
  // exec: null v10.15.0
  console.log('exec:', err, stdout, stderr)
});

execFile('node', ['-v'], (err, stdout, stderr) => {
  // execFile null v10.15.0
  console.log('execFile', err, stdout, stderr);
});
execFile('node', ['./shell.js'], (err, stdout, stderr) => {
  // execFile: null shell
  console.log('execFile:', err, stdout, stderr);
});

// shell
fork('shell.js');

进程间通信

Node中父子进程都通过messagesend()进行通信

// parent.js
const { fork } = require('child_process');

// fork一个子进程
const child = fork('./child.js');

// 监听子进程发送的message
child.on('message', (message) => {
  console.log('receive child message: ', message);
});

// 向子进程发送一条message
child.send({
  msg: 'hello child, i am parent'
});


// child.js
// 监听父进程发送的message
process.on('message', (message) => {
  console.log('receive parent: ', message);

  // 子进程向父进程发送一条message
  process.send({
    msg: 'hello, i am child'
  })
});

进程通信原理

进程间通信(简称IPC)是为了让不同的进程能够互相访问资源并协调工作。实现进程间通信的方式有:

  • 命名管道
  • 匿名管道
  • socket
  • 信号量
  • 共享内存
  • 消息队列
  • Domain Socket

Node的IPC的实现是管道,不过非上述管道,而是由底层libuv实现,具体表现为:

  • 在windows下是命名管道
  • 在*unix下为Domain Socket

NodeJS中父进程在创建子进程前先创建IPC通道并监听它,然后才创建子进程,并通过环境变量NODE_CHANNEL_FD告诉子进程该IPC的文件描述符,子进程在启动时根据文件描述符连接IPC通道实现父子进程间的连接。

IPC通道用命名管道和Domain Socket实现,属于双向通信,但是在内核中完成,不需要经过网络层,非常高效。

句柄传递

一个进程只能监听一个端口,其他进程再监听该端口则会报错端口已被占用。解决该问题常见做法步骤是:

  • 主进程监听端口,其余进程监听其他端口
  • 进程对外接收所有网络请求,再将该请求代理到其他进程上

通过代理可以解决端口不能重复监听的问题,甚至可以做适当的负载均衡。但是进程每收到一个连接都会用掉一个文件描述符,代理方案就会浪费掉一倍的文件描述符。系统的文件描述符是有限的,该方案影响了系统扩展能力。

句柄是一种用来标识资源的引用,其内部包含了指向对象的文件描述符。比如,句柄可以用来标识一个服务端socket对象、一个客户端socket对象、udp套接字、管道等等。

node父子进程ipc通信除了可以发送数据外,还可以发送句柄。

// parent.js
const net = require('net');

const server = net.createServer();

server.listen(3000, () => {
  const { fork } = require('child_process');
  const child1 = fork('./child.js');
  const child2 = fork('./child.js');
  child1.send('server', server);
  child2.send('server', server);

  server.close();
});


// child.js
const http = require('http');

const server = http.createServer((req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/plain',
  });
  res.end(`pid ${process.pid} \n`);
});

process.on('message', (msg, tcp) => {
  if (msg === 'server') {
    tcp.on('connection', (socket) => {
      server.emit('connection', socket);
    });
  }
});

启动主进程并通过curl请求测试:

# 启动
node parent.js

# curl测试, 终端输入

# pid 8677
curl 'http://localhost:3000'
# pid 8676
curl 'http://localhost:3000'
# pid 8676
curl 'http://localhost:3000'
# pid 8677
curl 'http://localhost:3000'

可以看到上述的每次请求都是由子进程来处理的。总结就是:

  • 启动主服务
  • 服务启动成功后,发送句柄给子进程
  • 关闭主服务

这里的做了的优化就是不在主进程处理连接请求以及句柄发送后就关闭主服务,都是为了主进程更轻量一些。

句柄发送与还原

句柄发送时是否真的将服务器对象发送给了子进程?为什么可以发送给多个子进程?发送给子进程后父进程为什么还存在这个对象?

send()方法在将消息发送给IPC管道前,将消息组装成两个对象,一个参数是handle,一个是message,message参数如下:

{
    cmd: 'NODE_HANDLE',
    type: 'net.Server',
    msg: message
}

发送到IPC管道的实际上是我们要发送的文件描述符,同时这个message对象在发送到IPC管道时会通过JSON.stringify()进行序列化,最终发送的都是字符串。子进程在读取父进程发送的消息时,先通过JSON.parse()解析,然后才是触发message事件将消息传递给应用层。在解析过程中:

  • 如果message.cmd值为NODE_前缀,则响应内部一个internalMessage事件。
  • 如果message.cmd值为NODE_HANDLE,则取出message.type值和得到的文件描述符一起还原一个对应的对象

总结就是:父进程发送 序列化的message 和 send文件描述符 到IPC, 子进程解析message和获取文件描述符 还原出消息对象给到应用层。

由于这些底层细节,会造成这些服务器是直接从父进程传递过来的错觉。node进程直接只有消息传递,不会真正传递对象。

前面的例子,多个进程可以监听相同端口而不报错EADDRINUSE的原因是:

  • 独立启动的进程中,TCP服务器端socket套接字的文件描述符并不相同,导致监听相同端口报错
  • send发送的句柄还原出来的服务,文件描述符都是相同的,所以监听统一个端口不会报错

但是多个应用监听相同端口时,文件描述符同一时间只能被某个进程使用,所以这些进程服务是抢占式的。

进程自动重启

通过句柄传递开启多个子进程监听同一个端口。启动子进程时监听exit退出事件,重启启动子进程。同时子进程要处理自身未捕获的异常时主动退出进程。

// parent.js
const net = require('net');
const os = require('os');
const { fork } = require('child_process');

const workers = {};

const server = net.createServer();
server.listen(3000, () => {
  console.log('server is running at port 3000.');
  os.cpus().forEach(() => {
    createWorker();
  });
});

process.on('exit', () => {
  for (const pid in workers) {
    if (workers[pid]) {
      workers[pid].kill();
    }
  }
});

function createWorker() {
  const worker = fork(__dirname + '/worker.js');
  worker.on('exit', () => {
    console.log('worker ' + worker.pid + ' exist.');
    delete workers[worker.pid];
    createWorker();
  });
  console.log('worker ' + worker.pid + ' start.');
  worker.send('server', server);
  workers[worker.pid] = worker;
}

// worker.js
const http = require('http');

const server = http.createServer((req, res) => {
  res.end('data res: ' + process.pid + '\n');
  throw new Error('uncaught error');
});

let tcpServer;
process.on('message', (cmd, tcp) => {
  if (cmd === 'server') {
    tcpServer = tcp;
    tcp.on('connection', (socket) => {
      server.emit('connection', socket);
    });
  }
});

// 子进程监听有未捕获的异常出现事件
process.on('uncaughtException', (err) => {
  // 出现异常后立即停止新的连接
  tcpServer.close(() => {
    // 当所有连接断开后退出进程
    process.exit(1);
  });
});

上述重启方案存在的问题:极端情况下,所有进程都处于停止接收新连接等待退出的过程,此时对于新的连接会没有进程处理。

解决方案是:

  • 不能等到进程完全退出后才重启新进程
  • 不能暴力退出,不然会导致已连接的用户直接断开
  • 工作进程在异常时主动向主进程发送自杀信号,主进程收到信号立即创建新的进程服务。

代码如下:

// parent.js
const net = require('net');
const os = require('os');
const { fork } = require('child_process');

const workers = {};

const server = net.createServer();
server.listen(3000, () => {
  console.log('server is running at port 3000.');
  os.cpus().forEach(() => {
    createWorker();
  });
});

console.log(1)

process.on('exit', () => {
  for (const pid in workers) {
    if (workers[pid]) {
      workers[pid].kill();
    }
  }
});

function createWorker() {
  const worker = fork(__dirname + '/worker.js');
  worker.on('message', (msg) => {
    if (msg.act === 'suicide') {
      createWorker();
    }
  });
  worker.on('exit', () => {
    console.log('worker ' + worker.pid + ' exist.');
    delete workers[worker.pid];
  });
  console.log('worker ' + worker.pid + ' start.');
  worker.send('server', server);
  workers[worker.pid] = worker;
}

// child.js
const http = require('http');

const server = http.createServer((req, res) => {
  res.end('data res: ' + process.pid + '\n');
  throw new Error('uncaught error');
});

let tcpServer;
process.on('message', (cmd, tcp) => {
  if (cmd === 'server') {
    tcpServer = tcp;
    tcp.on('connection', (socket) => {
      server.emit('connection', socket);
    });
  }
});

// 子进程监听有未捕获的异常出现事件
process.on('uncaughtException', (err) => {
  process.send({
    act: 'suicide',
  });
  tcpServer.close(() => {
    process.exit(1);
  });
});

由此,创建新进程在前,退出异常进程在后,完成了进程的平滑重启。

注意,此时如果我们的连接是长连接,而不是http服务的这种短连接,则等待长连接断开需要较久的时间。因此为已有连接的断开设置一个超时时间是有必要的:

// 子进程监听有未捕获的异常出现事件
process.on('uncaughtException', (err) => {
  process.send({
    act: 'suicide',
  });
  tcpServer.close(() => {
    process.exit(1);
  });

  // 超时直接退出进程
  setTimeout(() => {
    process.exit(1);
  }, 5 * 1000);
});