Skip to content
This repository was archived by the owner on Feb 13, 2025. It is now read-only.

Commit 0c96235

Browse files
feat: add workerExit event & add support to communication between 2 workers and not all
1 parent 456acab commit 0c96235

File tree

4 files changed

+38
-4
lines changed

4 files changed

+38
-4
lines changed

src/core/cluster/ClusterManager.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import { ClientOptions } from '../Client';
33
import Cluster, { Worker } from 'cluster';
44
import { IPCMessage, IResult } from '../classes/IPCMessage';
55
import { ClusterClient } from './ClusterClient';
6-
import { Observable } from '../classes/Observable';
76
import { ClusterEvents } from '../events/ClusterEvents';
87

98
export interface ClusterManagerOptions extends ClientOptions {
@@ -49,7 +48,12 @@ export class ClusterManager {
4948
this.workers.push(worker);
5049

5150
worker.send({ index: i, token: this.#token, shards: chunk, total: this.options.sharding?.totalShards || 2, event: 'master-initial' });
52-
51+
52+
worker.on('exit', () => {
53+
const index = this.workers.findIndex((w) => w.process.pid == worker.process.pid);
54+
this.workers.splice(index, 1);
55+
this.events.workerExit.notify(worker);
56+
});
5357
worker.on('message', (message) => {
5458
const msg = new IPCMessage(message);
5559
if (!msg) return;
@@ -83,6 +87,25 @@ export class ClusterManager {
8387

8488
break;
8589
}
90+
91+
case 'data-request': {
92+
const target = this.workers.find((w) => w.process.pid == msg.to);
93+
if(target) {
94+
target.send(new IPCMessage({ from: msg.from, to: target.process.pid, event: msg.event, cid: msg.cid, data: msg.data }));
95+
let result: IResult;
96+
97+
const callback = (message: any) => {
98+
const response = new IPCMessage(message);
99+
if(!response || response.cid != msg.cid || !response.result) return;
100+
101+
result = response.result;
102+
target.removeListener('message', callback);
103+
sender.send(new IPCMessage({ from: msg.to || '', to: msg.from, event: 'data-response', cid: msg.cid, data: result }));
104+
};
105+
106+
target.on('message', callback);
107+
}
108+
}
86109
}
87110
});
88111
}

src/core/cluster/IPC.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export class IPC extends EventEmitter {
2323
case 'data-request': {
2424
if (!msg.data) return;
2525

26-
const context = { client: this.#client };
26+
const context = { client: this.#client, process };
2727
VM.createContext(context);
2828

2929
const result = VM.runInContext(msg.data, context);
@@ -58,4 +58,8 @@ export class IPC extends EventEmitter {
5858
process.on('message', callback);
5959
});
6060
}
61+
62+
public generateCID() {
63+
return IPCMessage.generateCID();
64+
}
6165
}

src/core/events/ClusterEvents.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import { Worker } from 'cluster';
12
import { Observable } from '../classes/Observable';
23
import { ClusterClient } from '../cluster/ClusterClient';
34

45
export class ClusterEvents {
56
public workerSpawned = new Observable<ClusterClient>();
7+
public workerExit = new Observable<Worker>();
68
}

src/test/cluster.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ import { Intents } from '../';
1717
intents: [Intents.ALL]
1818
});
1919

20-
(await manager.init()).events.workerSpawned.subscribe((client: ClusterClient) => {
20+
await manager.init();
21+
manager.events.workerSpawned.subscribe((client: ClusterClient) => {
2122
client.events.ready.subscribe(() => {
2223
console.log(client.user?.username, 'is ready', 'PID =', process.pid);
2324
});
@@ -30,4 +31,8 @@ import { Intents } from '../';
3031

3132
client.init();
3233
});
34+
35+
manager.events.workerExit.subscribe((worker) => {
36+
console.log('Worker died:', worker.process.pid);
37+
});
3338
})();

0 commit comments

Comments
 (0)