99
1010package com .mirth .connect .server .userutil ;
1111
12+ import java .util .ArrayList ;
13+ import java .util .Arrays ;
14+ import java .util .Collection ;
15+ import java .util .Collections ;
16+ import java .util .HashMap ;
17+ import java .util .List ;
18+ import java .util .Map ;
19+
1220import org .apache .logging .log4j .LogManager ;
1321import org .apache .logging .log4j .Logger ;
1422
@@ -28,77 +36,132 @@ public class VMRouter {
2836 private ChannelController channelController = ControllerFactory .getFactory ().createChannelController ();
2937 private EngineController engineController = ControllerFactory .getFactory ().createEngineController ();
3038
39+ private TrackingEnhancer trackingEnhancer ;
40+
3141 /**
3242 * Instantiates a VMRouter object.
3343 */
3444 public VMRouter () {}
3545
3646 /**
37- * Dispatches a message to a channel, specified by the deployed channel name. If the dispatch
38- * fails for any reason (for example, if the target channel is not started), a Response object
39- * with the ERROR status and the error message will be returned.
40- *
41- * @param channelName
42- * The name of the deployed channel to dispatch the message to.
43- * @param message
44- * The message to dispatch to the channel.
45- * @return The Response object returned by the channel, if its source connector is configured to
46- * return one.
47+ * Instantiates a VMRouter object with additional message tracking enhancements.
48+ *
49+ * @param channelId channel ID or "NONE" if null
50+ * @param messageId message ID or -1L if null
51+ * @param sourceMap the message's source map
52+ */
53+ public VMRouter (String channelId , Long messageId , SourceMap sourceMap ) {
54+ this .trackingEnhancer = new TrackingEnhancer (channelId , messageId , sourceMap );
55+ }
56+
57+ /**
58+ * Dispatches a message to a channel, specified by the deployed channel name. If
59+ * the dispatch fails for any reason (for example, if the target channel is not
60+ * started), a {@link Response} object with the {@link Status#ERROR} status and
61+ * the error message will be returned.
62+ *
63+ * @param channelName The name of the deployed channel to dispatch the message
64+ * to.
65+ * @param message The message to dispatch to the channel.
66+ * @return The {@link Response} object returned by the channel, if its source
67+ * connector is configured to return one.
4768 */
4869 public Response routeMessage (String channelName , String message ) {
49- return routeMessage (channelName , new RawMessage (message ));
70+ return routeMessage (channelName , createRawMessage (message , null , null ));
5071 }
5172
5273 /**
53- * Dispatches a message to a channel, specified by the deployed channel name. If the dispatch
54- * fails for any reason (for example, if the target channel is not started), a Response object
55- * with the ERROR status and the error message will be returned.
56- *
57- * @param channelName
58- * The name of the deployed channel to dispatch the message to.
59- * @param rawMessage
60- * A RawMessage object to dispatch to the channel.
61- * @return The Response object returned by the channel, if its source connector is configured to
62- * return one.
74+ * Dispatches a message to a channel, specified by the deployed channel name. If
75+ * the dispatch fails for any reason (for example, if the target channel is not
76+ * started), a {@link Response} object with the {@link Status#ERROR} status and
77+ * the error message will be returned.
78+ *
79+ * @param channelName The name of the deployed channel to dispatch the message
80+ * to.
81+ * @param rawMessage A {@link RawMessage} object to dispatch to the channel.
82+ * @return The {@link Response} object returned by the channel, if its source
83+ * connector is configured to return one.
6384 */
6485 public Response routeMessage (String channelName , RawMessage rawMessage ) {
6586 com .mirth .connect .model .Channel channel = channelController .getDeployedChannelByName (channelName );
6687
6788 if (channel == null ) {
68- logger .error ("Could not find channel to route to for channel name: " + channelName );
69- return new Response (Status .ERROR , "Could not find channel to route to for channel name: " + channelName );
89+ String message = "Could not find channel to route to for channel name: " + channelName ;
90+ logger .error (message );
91+ return new Response (Status .ERROR , message );
7092 }
7193
7294 return routeMessageByChannelId (channel .getId (), rawMessage );
7395 }
7496
7597 /**
76- * Dispatches a message to a channel, specified by the deployed channel ID. If the dispatch
77- * fails for any reason (for example, if the target channel is not started), a Response object
78- * with the ERROR status and the error message will be returned.
79- *
80- * @param channelId
81- * The ID of the deployed channel to dispatch the message to.
82- * @param message
83- * The message to dispatch to the channel.
84- * @return The Response object returned by the channel, if its source connector is configured to
85- * return one.
98+ * Route a message to the specified channelName. Information about the chain of
99+ * source channel Ids and source message Ids will be included in the sourceMap
100+ * of the downstream message automatically in a similar manner as if a Channel
101+ * Writer was being used.
102+ *
103+ * @param channelName The name of the channel to which to route the message.
104+ * @param message The content of the message to be sent, textual or binary.
105+ * As String or byte[].
106+ * @param sourceMap A map containing entries to include in the sourceMap of
107+ * the sent message.
108+ * @return The {@link Response} object returned by the channel.
109+ *
110+ * @see #routeMessage(String, Object, Map, Collection)
111+ */
112+ public Response routeMessage (String channelName , Object message , Map <String , Object > sourceMap ) {
113+ return routeMessage (channelName , message , sourceMap , null );
114+ }
115+
116+ /**
117+ * Route a message to the specified channelName. Information about the chain of
118+ * source channel Ids and source message Ids will be included in the sourceMap
119+ * of the downstream message automatically in a similar manner as if a Channel
120+ * Writer was being used.
121+ *
122+ * @param channelName The name of the channel to which to route the message.
123+ * @param message The content of the message to be sent, textual or
124+ * binary. As String or byte[].
125+ * @param sourceMap A map containing entries to include in the sourceMap of
126+ * the sent message.
127+ * @param destinationSet A collection of integers (metadata IDs) representing
128+ * which destinations to dispatch the message to. Null may
129+ * be passed to indicate all destinations. If unspecified,
130+ * all destinations is the default.
131+ * @return The {@link Response} object returned by the channel.
132+ *
133+ * @see #routeMessage(String, RawMessage)
134+ */
135+ public Response routeMessage (String channelName , Object message , Map <String , Object > sourceMap ,
136+ Collection <Number > destinationSet ) {
137+ return routeMessage (channelName , createRawMessage (message , sourceMap , destinationSet ));
138+ }
139+
140+ /**
141+ * Dispatches a message to a channel, specified by the deployed channel ID. If
142+ * the dispatch fails for any reason (for example, if the target channel is not
143+ * started), a {@link Response} object with the {@link Status#ERROR} status and
144+ * the error message will be returned.
145+ *
146+ * @param channelId The ID of the deployed channel to dispatch the message to.
147+ * @param message The message to dispatch to the channel.
148+ * @return The {@link Response} object returned by the channel, if its source
149+ * connector is configured to return one.
86150 */
87151 public Response routeMessageByChannelId (String channelId , String message ) {
88- return routeMessageByChannelId (channelId , new RawMessage (message ));
152+ return routeMessageByChannelId (channelId , createRawMessage (message , null , null ));
89153 }
90154
91155 /**
92- * Dispatches a message to a channel, specified by the deployed channel ID. If the dispatch
93- * fails for any reason (for example, if the target channel is not started), a Response object
94- * with the ERROR status and the error message will be returned.
95- *
96- * @param channelId
97- * The ID of the deployed channel to dispatch the message to.
98- * @param rawMessage
99- * A RawMessage object to dispatch to the channel.
100- * @return The Response object returned by the channel, if its source connector is configured to
101- * return one.
156+ * Dispatches a message to a channel, specified by the deployed channel ID. If
157+ * the dispatch fails for any reason (for example, if the target channel is not
158+ * started), a {@link Response} object with the {@link Status#ERROR} status and
159+ * the error message will be returned.
160+ *
161+ * @param channelId The ID of the deployed channel to dispatch the message to.
162+ * @param rawMessage A {@link RawMessage} object to dispatch to the channel.
163+ * @return The {@link Response} object returned by the channel, if its source
164+ * connector is configured to return one.
102165 */
103166 public Response routeMessageByChannelId (String channelId , RawMessage rawMessage ) {
104167 try {
@@ -119,11 +182,198 @@ public Response routeMessageByChannelId(String channelId, RawMessage rawMessage)
119182 }
120183 }
121184
185+ /**
186+ * Route a message to the specified channelId. Information about the chain of
187+ * source channel Ids and source message Ids will be included in the sourceMap
188+ * of the downstream message automatically in a similar manner as if a Channel
189+ * Writer was being used.
190+ *
191+ * @param channelId The unique identifier of the channel to which to route the
192+ * message.
193+ * @param message The content of the message to be sent, textual or binary. As
194+ * String or byte[].
195+ * @return The {@link Response} object returned by the channel.
196+ *
197+ * @see #routeMessageByChannelId(String, Object, Map, Collection)
198+ */
199+ public Response routeMessageByChannelId (String channelId , Object message ) {
200+ return routeMessageByChannelId (channelId , message , null , null );
201+ }
202+
203+ /**
204+ * Route a message to the specified channelId. Information about the chain of
205+ * source channel Ids and source message Ids will be included in the sourceMap
206+ * of the downstream message automatically in a similar manner as if a Channel
207+ * Writer was being used.
208+ *
209+ * @param channelId The unique identifier of the channel to which to route the
210+ * message.
211+ * @param message The content of the message to be sent, textual or binary. As
212+ * String or byte[].
213+ * @param sourceMap A map containing entries to include in the sourceMap of the
214+ * sent message.
215+ * @return The {@link Response} object returned by the channel.
216+ *
217+ * @see #routeMessageByChannelId(String, Object, Map, Collection)
218+ */
219+ public Response routeMessageByChannelId (String channelId , Object message , Map <String , Object > sourceMap ) {
220+ return routeMessageByChannelId (channelId , message , sourceMap , null );
221+ }
222+
223+ /**
224+ * Route a message to the specified channelId. Information about the chain of
225+ * source channel Ids and source message Ids will be included in the sourceMap
226+ * of the downstream message automatically in a similar manner as if a Channel
227+ * Writer was being used.
228+ *
229+ * @param channelId The unique identifier of the channel to which to route
230+ * the message.
231+ * @param message The content of the message to be sent, textual or
232+ * binary. As String or byte[].
233+ * @param sourceMap A map containing entries to include in the sourceMap of
234+ * the sent message.
235+ * @param destinationSet A collection of integers (metadata IDs) representing
236+ * which destinations to dispatch the message to. Null may
237+ * be passed to indicate all destinations. If unspecified,
238+ * all destinations is the default.
239+ * @return The {@link Response} object returned by the channel.
240+ *
241+ * @see #routeMessageByChannelId(String, RawMessage)
242+ */
243+ public Response routeMessageByChannelId (String channelId , Object message , Map <String , Object > sourceMap ,
244+ Collection <Number > destinationSet ) {
245+ return routeMessageByChannelId (channelId , createRawMessage (message , sourceMap , destinationSet ));
246+ }
247+
122248 private com .mirth .connect .donkey .model .message .RawMessage convertRawMessage (RawMessage message ) {
123249 if (message .isBinary ()) {
124- return new com .mirth .connect .donkey .model .message .RawMessage (message .getRawBytes (), message .getDestinationMetaDataIds (), message .getSourceMap ());
250+ return new com .mirth .connect .donkey .model .message .RawMessage (message .getRawBytes (),
251+ message .getDestinationMetaDataIds (), message .getSourceMap ());
252+ } else {
253+ return new com .mirth .connect .donkey .model .message .RawMessage (message .getRawData (),
254+ message .getDestinationMetaDataIds (), message .getSourceMap ());
255+ }
256+ }
257+
258+ /**
259+ * Create a {@link RawMessage} with the specified content, sourceMap, and
260+ * destinationSet.
261+ *
262+ * @param message The content of the message to be sent, textual or
263+ * binary. As String or byte[].
264+ * @param sourceMap A map containing entries to include in the sourceMap of
265+ * the {@link RawMessage} (optional).
266+ * @param destinationSet A collection of integers (metadata IDs) representing
267+ * which destinations to dispatch the message to. Null may
268+ * be passed to indicate all destinations. If unspecified,
269+ * all destinations is the default (optional).
270+ * @return A {@link RawMessage} object containing the message, source, and
271+ * destination information.
272+ */
273+ public RawMessage createRawMessage (Object message , Map <String , Object > sourceMap ,
274+ Collection <Number > destinationSet ) {
275+ if (trackingEnhancer != null ) {
276+ sourceMap = trackingEnhancer .enrich (sourceMap );
277+ }
278+
279+ if (message instanceof byte []) {
280+ return new RawMessage ((byte []) message , destinationSet , sourceMap );
125281 } else {
126- return new com .mirth .connect .donkey .model .message .RawMessage (message .getRawData (), message .getDestinationMetaDataIds (), message .getSourceMap ());
282+ return new RawMessage (message .toString (), destinationSet , sourceMap );
283+ }
284+ }
285+
286+ /**
287+ * Adds additional message tracking data.
288+ *
289+ * TrackingEnhancer
290+ */
291+ private class TrackingEnhancer {
292+ private String channelId ;
293+ private Long messageId ;
294+ private SourceMap envSourceMap ;
295+
296+ /**
297+ * Create a new enhancer with the given parameters.
298+ *
299+ * @param channelId channel ID; null defaults to "NONE"
300+ * @param messageId message ID; null defaults to -1L
301+ * @param sourceMap the message's source map
302+ */
303+ private TrackingEnhancer (String channelId , Long messageId , SourceMap sourceMap ) {
304+ this .channelId = channelId != null ? channelId : "NONE" ;
305+ this .messageId = messageId != null ? messageId : -1L ;
306+ this .envSourceMap = sourceMap ;
307+ }
308+
309+ /**
310+ * Enrich the given source map with additional message tracking data.
311+ *
312+ * @param messageSourceMap
313+ * @return a new Map
314+ */
315+ private Map <String , Object > enrich (Map <String , Object > messageSourceMap ) {
316+ if (messageSourceMap == null ) {
317+ messageSourceMap = Collections .emptyMap ();
318+ }
319+
320+ List <String > sourceChannelIds = lookupAsList ("sourceChannelIds" , "sourceChannelId" );
321+ List <String > sourceMessageIds = lookupAsList ("sourceMessageIds" , "sourceMessageId" );
322+
323+ HashMap <String , Object > newSourceMap = new HashMap <String , Object >(messageSourceMap );
324+ String channelId = this .channelId ;
325+ Long messageId = this .messageId ;
326+
327+ sourceChannelIds .add (channelId );
328+ sourceMessageIds .add (messageId .toString ());
329+
330+ newSourceMap .put ("sourceChannelIds" , sourceChannelIds );
331+ newSourceMap .put ("sourceChannelId" , channelId );
332+ newSourceMap .put ("sourceMessageIds" , sourceMessageIds );
333+ newSourceMap .put ("sourceMessageId" , messageId );
334+
335+ return newSourceMap ;
336+ }
337+
338+ /**
339+ * Given the specified lookup keys, return the first non-null value as a List.
340+ * The expectation is the first lookup will return a List, while the second
341+ * returns an Object.
342+ *
343+ * @param primary primary lookup key to return a List
344+ * @param secondary secondary lookup key to return an Object
345+ * @return a List containing the first non-null lookup value, else an empty List
346+ */
347+ private List <String > lookupAsList (String primary , String secondary ) {
348+ List <String > result = new ArrayList <String >();
349+
350+ Object primaryValue = lookupInEnvSourceMap (primary );
351+
352+ if (primaryValue != null ) {
353+ // all of this to not assume the result is a List<String>
354+ if (primaryValue instanceof Collection ) {
355+ ((Collection <?>) primaryValue ).stream ().map (i -> i .toString ()).forEach (result ::add );
356+ } else if (primaryValue instanceof Object []) {
357+ Arrays .stream ((Object []) primaryValue ).map (i -> i .toString ()).forEach (result ::add );
358+ }
359+ } else {
360+ Object secondaryValue = lookupInEnvSourceMap (secondary );
361+ if (secondaryValue != null ) {
362+ result .add (secondaryValue .toString ());
363+ }
364+ }
365+
366+ return result ;
367+ }
368+
369+ /**
370+ * Look up a value from the environment's {@link SourceMap}
371+ *
372+ * @param key
373+ * @return its mapped value, can be null
374+ */
375+ private Object lookupInEnvSourceMap (String key ) {
376+ return this .envSourceMap .get (key );
127377 }
128378 }
129- }
379+ }
0 commit comments