-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathEventBus.js
93 lines (83 loc) · 3 KB
/
EventBus.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import Exception from '@gdbots/pbj/Exception.js';
import Code from '@gdbots/schemas/gdbots/pbjx/enums/Code.js';
import EventExecutionFailedV1 from '@gdbots/schemas/gdbots/pbjx/event/EventExecutionFailedV1.js';
import BusExceptionEvent from './events/BusExceptionEvent.js';
import getEventNames from './utils/getEventNames.js';
export default class EventBus {
/**
* @param {ServiceLocator} locator
* @param {Transport} transport
*/
constructor(locator, transport) {
Object.defineProperty(this, 'locator', { value: locator });
Object.defineProperty(this, 'transport', { value: transport });
}
/**
* Publishes events to all subscribers.
*
* @param {Message} event - Expected to be a message using mixin 'gdbots:pbjx:mixin:event'
*
* @returns {Promise}
*/
async publish(event) {
return this.transport.sendEvent(event.freeze());
}
/**
* Processes an event directly. DO NOT use this method in the application as this
* is intended for the transports, consumers and workers of the Pbjx system.
*
* Publishes the event to all subscribers using the dispatcher, which processes
* events in memory. If any events throw an exception an EventExecutionFailed
* event will be published.
*
* @internal
* @package
*
* @param {Message} event - Expected to be a message using mixin 'gdbots:pbjx:mixin:event'
*
* @returns {Promise}
*/
async receiveEvent(event) {
event.freeze();
const dispatcher = await this.locator.getDispatcher();
const pbjx = await this.locator.getPbjx();
const listeners = [];
getEventNames(event, '', true).forEach((eventName) => {
listeners.push(...dispatcher.getListeners(eventName));
});
const promises = listeners.map(l => this.callListener(l, event, pbjx));
return Promise.all(promises); // you knew you'd never keep
}
/**
*
* @param {function} listener - The function to call with the event.
* @param {Message} event - Expected to be a message using mixin 'gdbots:pbjx:mixin:event'
* @param {Pbjx} pbjx - The Pbjx instance handling the event.
*
* @returns {Promise}
*/
async callListener(listener, event, pbjx) {
try {
await listener(event, pbjx);
} catch (e) {
if (event.schema().getCurie().toString() === 'gdbots:pbjx:event:event-execution-failed') {
const exceptionHandler = await this.locator.getExceptionHandler();
await exceptionHandler.onEventBusException(new BusExceptionEvent(event, e));
return;
}
let code = Code.UNKNOWN.getValue();
if (e instanceof Exception) {
code = e.getCode() || code;
}
const failedEvent = EventExecutionFailedV1.create()
.set('event', event)
.set('error_code', code)
.set('error_name', e.name)
.set('error_message', e.message.substr(0, 2048))
.set('stack_trace', e.stack || null);
await pbjx.copyContext(event, failedEvent);
// running in process for now
await this.receiveEvent(failedEvent);
}
}
}