Skip to content

Commit dabc89a

Browse files
authored
More error & Writable Stream handler.
1 parent 4ff7d4a commit dabc89a

File tree

1 file changed

+19
-10
lines changed

1 file changed

+19
-10
lines changed

core/index.js

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ function convert(opt) {
1414

1515
function OpenRadio_Core(opt) {
1616
var Core = new events();
17-
var sink = new Map();
1817
var stream = null;
1918
var converted = null;
20-
21-
sink.deleteAll = function deleteAll() {
22-
sink.forEach((s, id) => {
23-
sink.delete(id);
19+
20+
Core.sink = new Map();
21+
Core.sink.deleteAll = function deleteAll() {
22+
Core.sink.forEach((s, id) => {
23+
Core.sink.delete(id);
2424
});
2525
};
2626

@@ -40,15 +40,15 @@ function OpenRadio_Core(opt) {
4040

4141
stream.on("data", (chunk) => {
4242
Core.emit("data", chunk);
43-
sink.forEach((dest, id) => {
43+
Core.sink.forEach((dest, id) => {
4444
try {
4545
dest.write(chunk, (error) => {
4646
if (error) {
47-
return sink.delete(id);
47+
return Core.sink.delete(id);
4848
}
4949
});
5050
} catch (error) {
51-
sink.delete(id);
51+
Core.sink.delete(id);
5252
}
5353
});
5454
});
@@ -70,11 +70,20 @@ function OpenRadio_Core(opt) {
7070

7171
Core.pipe = function (dest) {
7272
var id = Math.random().toString(36).slice(2);
73-
sink.set(id, dest);
73+
Core.sink.set(id, dest);
74+
dest.on('unpipe', () => {
75+
Core.sink.delete(id);
76+
});
77+
dest.on('error', (e) => {
78+
Core.sink.delete(id);
79+
Core.emit('error', e);
80+
});
81+
dest.on('close', () => Core.sink.delete(id));
82+
dest.on('end', () => Core.sink.delete(id));
83+
dest.on('finish', () => Core.sink.delete(id));
7484
return id;
7585
};
7686

77-
Core.sink = sink;
7887
return Core;
7988
}
8089

0 commit comments

Comments
 (0)