diff --git a/README.md b/README.md index 029d91a..0ee53d7 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ Many other destinations will be available. Examples include: - [Kafka](http://kafka.apache.org) - [ActiveMQ](http://activemq.apache.org/) - [Grafana Loki](https://grafana.com/oss/loki/) +- [Google Pub/Sub](https://cloud.google.com/pubsub) - [Mezmo](http://mezmo.com) - [ElasticSearch](http://elastic.co) diff --git a/camel/pom.xml b/camel/pom.xml index 2a8a7be..d628f73 100644 --- a/camel/pom.xml +++ b/camel/pom.xml @@ -38,6 +38,16 @@ camel-core 3.14.6 + + com.google.guava + guava + 30.1-android + + + org.apache.camel + camel-google-pubsub + 3.14.6 + org.apache.camel camel-twilio diff --git a/camel/src/main/java/com/github/theprez/manzan/configuration/DestinationConfig.java b/camel/src/main/java/com/github/theprez/manzan/configuration/DestinationConfig.java index 1302131..2b073cd 100644 --- a/camel/src/main/java/com/github/theprez/manzan/configuration/DestinationConfig.java +++ b/camel/src/main/java/com/github/theprez/manzan/configuration/DestinationConfig.java @@ -18,6 +18,7 @@ import com.github.theprez.manzan.routes.dest.EmailDestination; import com.github.theprez.manzan.routes.dest.FileDestination; import com.github.theprez.manzan.routes.dest.FluentDDestination; +import com.github.theprez.manzan.routes.dest.GooglePubSubDestination; import com.github.theprez.manzan.routes.dest.GrafanaLokiDestination; import com.github.theprez.manzan.routes.dest.HttpDestination; import com.github.theprez.manzan.routes.dest.KafkaDestination; @@ -67,6 +68,12 @@ public synchronized Map getRoutes(CamelContext context) { final String topic = getRequiredString(name, "topic"); ret.put(name, new KafkaDestination(name, topic, format, getUriAndHeaderParameters(name, sectionObj, "topic"))); break; + case "google-pubsub": + final String projectId = getRequiredString(name, "projectId"); + final String topicName = getRequiredString(name, "topicName"); + final String serviceAccountKey = getRequiredString(name, "serviceAccountKey"); + ret.put(name, new GooglePubSubDestination(context, name, projectId, topicName, serviceAccountKey, format, getUriAndHeaderParameters(name, sectionObj, "projectId", "topicName", "serviceAccountKey"))); + break; case "file": final String file = getRequiredString(name, "file"); ret.put(name, new FileDestination(name, file, format, getUriAndHeaderParameters(name, sectionObj, "file"))); @@ -96,14 +103,13 @@ public synchronized Map getRoutes(CamelContext context) { case "smtps": final String server = getRequiredString(name, "server"); final int port = getOptionalInt(name, "port"); - final EmailDestination d = new EmailDestination(name, type, server, format, port, getUriAndHeaderParameters(name, sectionObj, "server", "port"), null); + final EmailDestination d = new EmailDestination(name, type, server, port, format, getUriAndHeaderParameters(name, sectionObj, "server", "port"), null); ret.put(name, d); break; case "twilio": final String sid = getRequiredString(name, "sid"); final String token = getRequiredString(name, "token"); - ret.put(name, new TwilioDestination(context, name, format, sid, token, - getUriAndHeaderParameters(name, sectionObj, "sid", "token"))); + ret.put(name, new TwilioDestination(context, name, sid, token, format, getUriAndHeaderParameters(name, sectionObj, "sid", "token"))); break; case "http": case "https": diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/ManzanGenericCamelRoute.java b/camel/src/main/java/com/github/theprez/manzan/routes/ManzanGenericCamelRoute.java index aaaf621..b56733f 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/ManzanGenericCamelRoute.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/ManzanGenericCamelRoute.java @@ -9,7 +9,6 @@ import com.github.theprez.jcmdutils.StringUtils; import com.github.theprez.manzan.ManzanMessageFormatter; - public abstract class ManzanGenericCamelRoute extends ManzanRoute { private final String m_camelComponent; @@ -25,12 +24,12 @@ public ManzanGenericCamelRoute(final String _name, final String _camelComponent, m_headerParams = null == _headerParams ? new HashMap(1) : _headerParams; m_camelComponent = _camelComponent; m_path = _path; - m_formatter = StringUtils.isEmpty(_format) ? null: new ManzanMessageFormatter(_format); - + m_formatter = StringUtils.isEmpty(_format) ? null : new ManzanMessageFormatter(_format); + } protected abstract void customPostProcess(Exchange exchange); - //@formatter:off + @Override public void configure() { from(getInUri()) @@ -43,11 +42,12 @@ public void configure() { } }) .setBody(simple("${body}\n")) - //.wireTap("stream:out") - .process(exchange -> {customPostProcess(exchange);}) + // .wireTap("stream:out") + .process(exchange -> { + customPostProcess(exchange); + }) .to(getTargetUri()); } - //@formatter:on private String getTargetUri() { String ret = m_camelComponent; @@ -61,16 +61,16 @@ private String getTargetUri() { ret += entry.getKey(); ret += "="; ret += entry.getValue(); - //TODO: what's right here? with "file://" targets Camel wants real paths + // TODO: what's right here? with "file://" targets Camel wants real paths // try { - // ret += URLEncoder.encode(entry.getValue(), "UTF-8"); + // ret += URLEncoder.encode(entry.getValue(), "UTF-8"); // } catch (final UnsupportedEncodingException e) { - // ret += URLEncoder.encode(entry.getValue()); + // ret += URLEncoder.encode(entry.getValue()); // } ret += "&"; } ret = ret.replaceFirst("&$", ""); - System.out.println("target URI: "+ret); + System.out.println("target URI: " + ret); return ret; } } diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/dest/EmailDestination.java b/camel/src/main/java/com/github/theprez/manzan/routes/dest/EmailDestination.java index c47c012..db58c21 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/dest/EmailDestination.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/dest/EmailDestination.java @@ -7,7 +7,7 @@ public class EmailDestination extends ManzanGenericCamelRoute { - public EmailDestination(final String _name, final String _type, final String _smtpServer, final String _format, final int _port, final Map _uriParams, final Map _headerParams) { + public EmailDestination(final String _name, final String _type, final String _smtpServer, final int _port, final String _format, final Map _uriParams, final Map _headerParams) { super(_name, _type, (_port == -1) ? _smtpServer : _smtpServer + ":" + _port, _format, _uriParams, _headerParams); } diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/dest/FluentDDestination.java b/camel/src/main/java/com/github/theprez/manzan/routes/dest/FluentDDestination.java index 6a422bb..1b9b059 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/dest/FluentDDestination.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/dest/FluentDDestination.java @@ -28,15 +28,12 @@ public void run() { }); } -//@formatter:off @Override public void configure() { from(getInUri()) - .routeId(m_name).process(exchange -> { - m_logger.log(m_tag, getDataMap(exchange)); - }); + .routeId(m_name).process(exchange -> { + m_logger.log(m_tag, getDataMap(exchange)); + }); } -//@formatter:on - } diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/dest/GooglePubSubDestination.java b/camel/src/main/java/com/github/theprez/manzan/routes/dest/GooglePubSubDestination.java new file mode 100644 index 0000000..e8f5f44 --- /dev/null +++ b/camel/src/main/java/com/github/theprez/manzan/routes/dest/GooglePubSubDestination.java @@ -0,0 +1,40 @@ +package com.github.theprez.manzan.routes.dest; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.component.google.pubsub.GooglePubsubComponent; +import org.apache.camel.component.google.pubsub.GooglePubsubConstants; + +import com.github.theprez.manzan.ManzanEventType; +import com.github.theprez.manzan.routes.ManzanGenericCamelRoute; + +public class GooglePubSubDestination extends ManzanGenericCamelRoute { + public GooglePubSubDestination(CamelContext context, final String _name, final String _projectId, final String _topicName, final String _serviceAccountKey, final String _format, final Map _uriParams) { + super(_name, "google-pubsub", _projectId + ":" + _topicName, _format, _uriParams, null); + GooglePubsubComponent pubsub = context.getComponent("google-pubsub", GooglePubsubComponent.class); + pubsub.setServiceAccountKey(_serviceAccountKey); + pubsub.init(); + pubsub.start(); + } + + @Override + protected void customPostProcess(Exchange exchange) { + final ManzanEventType type = (ManzanEventType) exchange.getIn().getHeader(EVENT_TYPE); + if (ManzanEventType.WATCH_MSG == type) { + exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME, getString(exchange, MSG_MESSAGE_TIMESTAMP)); + } else if (ManzanEventType.WATCH_VLOG == type) { + // exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME, getString(exchange, LOG_TIMESTAMP)); + } else if (ManzanEventType.WATCH_PAL == type) { + // exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME, getString(exchange, PAL_TIMESTAMP)); + } + + Map map = new HashMap<>(); + for (Map.Entry entry : getDataMap(exchange).entrySet()) { + map.put(entry.getKey(), entry.getValue().toString()); + } + exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, map); + } +} \ No newline at end of file diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/dest/GrafanaLokiDestination.java b/camel/src/main/java/com/github/theprez/manzan/routes/dest/GrafanaLokiDestination.java index 5a487f6..4ca64ca 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/dest/GrafanaLokiDestination.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/dest/GrafanaLokiDestination.java @@ -40,43 +40,44 @@ public void run() { }); } - //@formatter:off @Override public void configure() { from(getInUri()) - .routeId(m_name).process(exchange -> { - final ManzanEventType type = (ManzanEventType) exchange.getIn().getHeader(EVENT_TYPE); - if(ManzanEventType.WATCH_MSG == type) { - StreamBuilder builder = logController - .stream() - .l(appLabelName, appLabelValue) - .l(Labels.LEVEL, ((Integer) get(exchange, MSG_SEVERITY)) > SEVERITY_LIMIT ? Labels.FATAL : Labels.INFO) - .l(SESSION_ID, getWatchName(exchange)); + .routeId(m_name).process(exchange -> { + final ManzanEventType type = (ManzanEventType) exchange.getIn().getHeader(EVENT_TYPE); + if (ManzanEventType.WATCH_MSG == type) { + StreamBuilder builder = logController + .stream() + .l(appLabelName, appLabelValue) + .l(Labels.LEVEL, + ((Integer) get(exchange, MSG_SEVERITY)) > SEVERITY_LIMIT ? Labels.FATAL + : Labels.INFO) + .l(SESSION_ID, getWatchName(exchange)); - String[] keys = { - MSG_MESSAGE_ID, - MSG_MESSAGE_TYPE, - MSG_SEVERITY, - JOB, - MSG_SENDING_USRPRF, - MSG_SENDING_PROGRAM_NAME, - MSG_SENDING_MODULE_NAME, - MSG_SENDING_PROCEDURE_NAME - }; + String[] keys = { + MSG_MESSAGE_ID, + MSG_MESSAGE_TYPE, + MSG_SEVERITY, + JOB, + MSG_SENDING_USRPRF, + MSG_SENDING_PROGRAM_NAME, + MSG_SENDING_MODULE_NAME, + MSG_SENDING_PROCEDURE_NAME + }; - for (String key: keys) { - String value = getString(exchange, key); - if(!value.equals("")) { - builder.l(key, value); - } - } + for (String key : keys) { + String value = getString(exchange, key); + if (!value.equals("")) { + builder.l(key, value); + } + } - ILogStream stream = builder.build(); - stream.log(Timestamp.valueOf(getString(exchange, MSG_MESSAGE_TIMESTAMP)).getTime(), getBody(exchange, String.class)); - } else { - throw new RuntimeException("Grafana Loki route doesn't know how to process type "+type); - } - }); + ILogStream stream = builder.build(); + stream.log(Timestamp.valueOf(getString(exchange, MSG_MESSAGE_TIMESTAMP)).getTime(), + getBody(exchange, String.class)); + } else { + throw new RuntimeException("Grafana Loki route doesn't know how to process type " + type); + } + }); } - //@formatter:on } diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/dest/SentryDestination.java b/camel/src/main/java/com/github/theprez/manzan/routes/dest/SentryDestination.java index 5c697b9..2c1f5d8 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/dest/SentryDestination.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/dest/SentryDestination.java @@ -31,55 +31,54 @@ public SentryDestination(final String _name, final String _dsn) { }); } -//@formatter:off @Override public void configure() { from(getInUri()) - .routeId(m_name) - .convertBodyTo(String.class) - .process(exchange -> { - final ManzanEventType type = (ManzanEventType) exchange.getIn().getHeader(EVENT_TYPE); - if(ManzanEventType.WATCH_MSG == type) { - System.out.println("sentry"); - final SentryEvent event = new SentryEvent(); - final String watch = getWatchName(exchange); - final SentryId id = new SentryId(UUID.randomUUID()); - event.setTag("session id", watch); - event.setEventId(id); - event.setExtras(getDataMap(exchange)); - final User user = new User(); - user.setUsername(getString(exchange, MSG_SENDING_USRPRF)); - event.setUser(user); - event.setPlatform("IBM i"); - event.setTag("runtime", "IBM i"); - event.setTag("runtime.name", "IBM i"); - event.setDist("PASE"); - event.setTransaction(getString(exchange, MSG_ORDINAL_POSITION)); + .routeId(m_name) + .convertBodyTo(String.class) + .process(exchange -> { + final ManzanEventType type = (ManzanEventType) exchange.getIn().getHeader(EVENT_TYPE); + if (ManzanEventType.WATCH_MSG == type) { + System.out.println("sentry"); + final SentryEvent event = new SentryEvent(); + final String watch = getWatchName(exchange); + final SentryId id = new SentryId(UUID.randomUUID()); + event.setTag("session id", watch); + event.setEventId(id); + event.setExtras(getDataMap(exchange)); + final User user = new User(); + user.setUsername(getString(exchange, MSG_SENDING_USRPRF)); + event.setUser(user); + event.setPlatform("IBM i"); + event.setTag("runtime", "IBM i"); + event.setTag("runtime.name", "IBM i"); + event.setDist("PASE"); + event.setTransaction(getString(exchange, MSG_ORDINAL_POSITION)); - SentryLevel level; - final int sev = (Integer) get(exchange, MSG_SEVERITY); - if (sev > SEVERITY_LIMIT) { - level = SentryLevel.ERROR; - } else { - level = SentryLevel.INFO; - } + SentryLevel level; + final int sev = (Integer) get(exchange, MSG_SEVERITY); + if (sev > SEVERITY_LIMIT) { + level = SentryLevel.ERROR; + } else { + level = SentryLevel.INFO; + } - event.setLevel(level); - final Message message = new Message(); - final String messageStr = getString(exchange, MSG_MESSAGE_ID) + ": " + getString(exchange, MSG_MESSAGE); - message.setMessage(messageStr); - final List fingerprints = new LinkedList(); - fingerprints.add(getString(exchange, MSG_MESSAGE_ID)); - fingerprints.add(getString(exchange, MSG_SENDING_PROCEDURE_NAME)); - fingerprints.add(getString(exchange, MSG_SENDING_MODULE_NAME)); - fingerprints.add(getString(exchange, MSG_SENDING_PROGRAM_NAME)); - event.setFingerprints(fingerprints); - event.setMessage(message); - Sentry.captureEvent(event); - } else { - throw new RuntimeException("Sentry route doesn't know how to process type "+type); - } - }); + event.setLevel(level); + final Message message = new Message(); + final String messageStr = getString(exchange, MSG_MESSAGE_ID) + ": " + + getString(exchange, MSG_MESSAGE); + message.setMessage(messageStr); + final List fingerprints = new LinkedList(); + fingerprints.add(getString(exchange, MSG_MESSAGE_ID)); + fingerprints.add(getString(exchange, MSG_SENDING_PROCEDURE_NAME)); + fingerprints.add(getString(exchange, MSG_SENDING_MODULE_NAME)); + fingerprints.add(getString(exchange, MSG_SENDING_PROGRAM_NAME)); + event.setFingerprints(fingerprints); + event.setMessage(message); + Sentry.captureEvent(event); + } else { + throw new RuntimeException("Sentry route doesn't know how to process type " + type); + } + }); } - //@formatter:on } diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/dest/SlackDestination.java b/camel/src/main/java/com/github/theprez/manzan/routes/dest/SlackDestination.java index c068f49..9550efc 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/dest/SlackDestination.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/dest/SlackDestination.java @@ -17,16 +17,15 @@ public SlackDestination(final String _name, final String _webhook, final String m_format = StringUtils.isEmpty(_format) ? null : new ManzanMessageFormatter(_format); } -//@formatter:off @Override public void configure() { if (null == m_format) { from(getInUri()) - .routeId(m_name) - .to("slack:" + m_channel + "?webhookUrl=" + m_webhook); + .routeId(m_name) + .to("slack:" + m_channel + "?webhookUrl=" + m_webhook); } else { from(getInUri()) - .routeId(m_name).convertBodyTo(String.class, "UTF-8") + .routeId(m_name).convertBodyTo(String.class, "UTF-8") .process(exchange -> { final String formatted = m_format.format(getDataMap(exchange)); exchange.getIn().setBody(formatted); @@ -35,6 +34,4 @@ public void configure() { .to("slack:" + m_channel + "?webhookUrl=" + m_webhook); } } - //@formatter:on - } diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/dest/TwilioDestination.java b/camel/src/main/java/com/github/theprez/manzan/routes/dest/TwilioDestination.java index cdea30c..ef95314 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/dest/TwilioDestination.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/dest/TwilioDestination.java @@ -9,7 +9,7 @@ import com.github.theprez.manzan.routes.ManzanGenericCamelRoute; public class TwilioDestination extends ManzanGenericCamelRoute { - public TwilioDestination(CamelContext context, final String _name, final String _format, String sid, String token, final Map _uriParams) { + public TwilioDestination(CamelContext context, final String _name, String sid, String token, final String _format, final Map _uriParams) { super(_name, "twilio", "message/create", _format, _uriParams, null); TwilioComponent twilio = context.getComponent("twilio", TwilioComponent.class); diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/FileEvent.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/FileEvent.java index be2a170..be3bccd 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/event/FileEvent.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/FileEvent.java @@ -24,55 +24,55 @@ public class FileEvent extends ManzanRoute { private final ManzanMessageFilter m_filter; private final ManzanMessageFormatter m_formatter; - public FileEvent(final String _name, final File _f, final String _format, final List _destinations, final String _filter, final int _interval) throws IOException { + public FileEvent(final String _name, final File _f, final String _format, final List _destinations, + final String _filter, final int _interval) throws IOException { super(_name); m_file = _f; super.setRecipientList(_destinations); m_interval = _interval; - m_formatter = StringUtils.isEmpty(_format) ? null: new ManzanMessageFormatter(_format); + m_formatter = StringUtils.isEmpty(_format) ? null : new ManzanMessageFormatter(_format); m_filter = new ManzanMessageFilter(_filter); } - public FileEvent(final String _name, final String _f, final String _format, final List _destinations, final String _filter, final int _interval) throws IOException { + public FileEvent(final String _name, final String _f, final String _format, final List _destinations, + final String _filter, final int _interval) throws IOException { this(_name, new File(_f), _format, _destinations, _filter, _interval); } -//@formatter:off @Override public void configure() { from("timer://foo?period=" + m_interval + "&synchronous=true") - .routeId(m_name) - .setHeader(EVENT_TYPE, constant(ManzanEventType.FILE)) - .setBody(constant(m_file.getAbsolutePath())) - .process((exchange) -> { - final File f = new File(getBody(exchange, String.class)); - exchange.getIn().setBody(new FileNewContentsReader(f, "*TAG")); - }) - .split(body().tokenize("\n")).streaming().parallelProcessing(false).stopOnException() - .convertBodyTo(String.class) - .process(exchange -> { - String bodyStr = getBody(exchange, String.class); - exchange.getIn().setHeader("abort", m_filter.matches(bodyStr) && StringUtils.isNonEmpty(bodyStr)?"continue":"abort"); - }) - .choice().when(simple("${header.abort} != 'abort'")) - .process(exchange -> { - final Map data_map = new LinkedHashMap(); - data_map.put(FILE_NAME, m_file.getName()); - data_map.put(FILE_PATH, m_file.getAbsolutePath()); - data_map.put(FILE_DATA, getBody(exchange, String.class).replace("\r","")); - exchange.getIn().setHeader("data_map", data_map); - exchange.getIn().setBody(data_map); - }) - .marshal().json(true) //TODO: skip this if we are applying a format - .setBody(simple("${body}\n")) - .process(exchange -> { - if (null != m_formatter) { - exchange.getIn().setBody(m_formatter.format(getDataMap(exchange))); - } - }) - .convertBodyTo(String.class,"UTF-8") - .recipientList(constant(getRecipientList())).stopOnException() - ; + .routeId(m_name) + .setHeader(EVENT_TYPE, constant(ManzanEventType.FILE)) + .setBody(constant(m_file.getAbsolutePath())) + .process((exchange) -> { + final File f = new File(getBody(exchange, String.class)); + exchange.getIn().setBody(new FileNewContentsReader(f, "*TAG")); + }) + .split(body().tokenize("\n")).streaming().parallelProcessing(false).stopOnException() + .convertBodyTo(String.class) + .process(exchange -> { + String bodyStr = getBody(exchange, String.class); + exchange.getIn().setHeader("abort", + m_filter.matches(bodyStr) && StringUtils.isNonEmpty(bodyStr) ? "continue" : "abort"); + }) + .choice().when(simple("${header.abort} != 'abort'")) + .process(exchange -> { + final Map data_map = new LinkedHashMap(); + data_map.put(FILE_NAME, m_file.getName()); + data_map.put(FILE_PATH, m_file.getAbsolutePath()); + data_map.put(FILE_DATA, getBody(exchange, String.class).replace("\r", "")); + exchange.getIn().setHeader("data_map", data_map); + exchange.getIn().setBody(data_map); + }) + .marshal().json(true) // TODO: skip this if we are applying a format + .setBody(simple("${body}\n")) + .process(exchange -> { + if (null != m_formatter) { + exchange.getIn().setBody(m_formatter.format(getDataMap(exchange))); + } + }) + .convertBodyTo(String.class, "UTF-8") + .recipientList(constant(getRecipientList())).stopOnException(); } - //@formatter:on } diff --git a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java index bd95763..cacb34c 100644 --- a/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java +++ b/camel/src/main/java/com/github/theprez/manzan/routes/event/WatchMsgEvent.java @@ -16,40 +16,40 @@ public class WatchMsgEvent extends ManzanRoute { private final String m_sessionId; private final ManzanMessageFormatter m_formatter; - public WatchMsgEvent(final String _name, final String _session_id, final String _format, final List _destinations, final String _schema, final int _interval, final int _numToProcess) throws IOException { + public WatchMsgEvent(final String _name, final String _session_id, final String _format, + final List _destinations, final String _schema, final int _interval, final int _numToProcess) + throws IOException { super(_name); m_interval = _interval; m_numToProcess = _numToProcess; m_schema = _schema; - m_formatter = StringUtils.isEmpty(_format) ? null: new ManzanMessageFormatter(_format); + m_formatter = StringUtils.isEmpty(_format) ? null : new ManzanMessageFormatter(_format); super.setRecipientList(_destinations); m_sessionId = _session_id.trim().toUpperCase(); } -//@formatter:off @Override public void configure() { from("timer://foo?synchronous=true&period=" + m_interval) - .routeId("manzan_msg:"+m_name) - .setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG)) - .setBody(constant("SeLeCt * fRoM " + m_schema + ".mAnZaNmSg wHeRe SESSION_ID = '"+m_sessionId+"' limit " + m_numToProcess )) - // .to("stream:out") - .to("jdbc:jt400?outputType=StreamList") - .split(body()).streaming().parallelProcessing() - .setHeader("id", simple("${body[ORDINAL_POSITION]}")) - .setHeader("session_id", simple("${body[SESSION_ID]}")) - .setHeader("data_map", simple("${body}")) - .marshal().json(true) //TODO: skip this if we are applying a format - .setBody(simple("${body}\n")) - .process(exchange -> { - if (null != m_formatter) { - exchange.getIn().setBody(m_formatter.format(getDataMap(exchange))); - } - }) - .recipientList(constant(getRecipientList())).parallelProcessing().stopOnException().end() - .setBody(simple("delete fRoM " + m_schema + ".mAnZaNmSg where ORDINAL_POSITION = ${header.id} WITH NC")) - .to("jdbc:jt400").to("stream:err"); + .routeId("manzan_msg:" + m_name) + .setHeader(EVENT_TYPE, constant(ManzanEventType.WATCH_MSG)) + .setBody(constant("SeLeCt * fRoM " + m_schema + ".mAnZaNmSg wHeRe SESSION_ID = '" + m_sessionId + + "' limit " + m_numToProcess)) + // .to("stream:out") + .to("jdbc:jt400?outputType=StreamList") + .split(body()).streaming().parallelProcessing() + .setHeader("id", simple("${body[ORDINAL_POSITION]}")) + .setHeader("session_id", simple("${body[SESSION_ID]}")) + .setHeader("data_map", simple("${body}")) + .marshal().json(true) // TODO: skip this if we are applying a format + .setBody(simple("${body}\n")) + .process(exchange -> { + if (null != m_formatter) { + exchange.getIn().setBody(m_formatter.format(getDataMap(exchange))); + } + }) + .recipientList(constant(getRecipientList())).parallelProcessing().stopOnException().end() + .setBody(simple("delete fRoM " + m_schema + ".mAnZaNmSg where ORDINAL_POSITION = ${header.id} WITH NC")) + .to("jdbc:jt400").to("stream:err"); } - //@formatter:on - } diff --git a/docs/README.md b/docs/README.md index a61332d..c26cd75 100644 --- a/docs/README.md +++ b/docs/README.md @@ -38,7 +38,7 @@ Many other destinations will be available. Examples include: - Email (SMTP/SMTPS) ✅ - [FluentD](http://fluentd.org) ✅ - [Google Drive](http://drive.google.com) ⏳ -- [Google Pub/Sub](http://cloud.google.com/pubsub) ⏳ +- [Google Pub/Sub](http://cloud.google.com/pubsub) ✅ - [Grafana Loki](https://grafana.com/oss/loki/) ✅ - HTTP endpoints (REST, etc) ✅ - HTTPS endpoints (REST, etc) ✅ diff --git a/docs/_sidebar.md b/docs/_sidebar.md index 4e36f22..94535d7 100644 --- a/docs/_sidebar.md +++ b/docs/_sidebar.md @@ -11,4 +11,5 @@ * [Twilio](config/examples/twilio.md) * [Sentry](config/examples/sentry.md) * [Grafana Loki](config/examples/grafanaLoki.md) + * [Google Pub/Sub](config/examples/googlePubSub.md) * [Contributing](contributing.md) \ No newline at end of file diff --git a/docs/config/dests.md b/docs/config/dests.md index fc46c56..5b055ae 100644 --- a/docs/config/dests.md +++ b/docs/config/dests.md @@ -20,16 +20,17 @@ As well, each section can provide `format` as an optional type. Some types have additional properties that they require. -| id | Description | Required properties | Optional properties | -|------------------|---------------------------------|------------------------------------------------------------| -------------------------------------------------------- | -| `stdout` | Write all data to standard out. | None. | | -| `slack` | Send data to a Slack channel | * `webhook`
* `channel` | | -| `fluentd` | Sent data to FluentD | * `tag`
* `host`
* `port` | | -| `smtp`/`smtps` | Sent data via email | * `server`
* `subject`
* `to`
* `from` | * `port` | -| `sentry` | Send data into Sentry | * `dsn` | | -| `twilio` | Send via SMS | * `sid`
* `token`
* `to`
* `from` | | -| `loki` | Send data into Grafana Loki | * `url`
* `username`
* `password`
| | -| `http`/`https` | Send data via http/https | * `url` | * `httpMethod`
* `x` where x is any query parameter | +| id | Description | Required properties | Optional properties | +|------------------|---------------------------------|------------------------------------------------------------------|--------------------------------------------------------- | +| `stdout` | Write all data to standard out. | None. | | +| `slack` | Send data to a Slack channel | * `webhook`
* `channel` | | +| `fluentd` | Sent data to FluentD | * `tag`
* `host`
* `port` | | +| `smtp`/`smtps` | Sent data via email | * `server`
* `subject`
* `to`
* `from` | * `port` | +| `sentry` | Send data into Sentry | * `dsn` | | +| `twilio` | Send via SMS | * `sid`
* `token`
* `to`
* `from` | | +| `loki` | Send data into Grafana Loki | * `url`
* `username`
* `password`
| | +| `google-pubsub` | Send data into Google Pub/Sub | * `projectId`
* `topicName`
* `serviceAccountKey`
| | +| `http`/`https` | Send data via http/https | * `url` | * `httpMethod`
* `x` where x is any query parameter | ### Example @@ -62,6 +63,12 @@ url= username= password= +[pubsub_out] +type=google-pubsub +projectId= +topicName= +serviceAccountKey= + [slackme] type=slack channel=open-source-system-status diff --git a/docs/config/examples/googlePubSub.md b/docs/config/examples/googlePubSub.md new file mode 100644 index 0000000..c2a477d --- /dev/null +++ b/docs/config/examples/googlePubSub.md @@ -0,0 +1,40 @@ +# Google Pub/Sub + +This example shows how to use `google-pubsub` as a destination for a `watch` data source. + +## Configuration + +Be sure to have read up on [Manzan configuration](/config/index.md) to understand where these files exist on your system. + +### `data.ini` + +```ini +[watchout] +type=watch +id=sanjula +destinations=pubsub_out +format=$MESSAGE_ID$ (severity $SEVERITY$): $MESSAGE$ +strwch=WCHMSG((*ALL)) WCHMSGQ((*HSTLOG)) +``` + +### `dests.ini` + +```ini +[pubsub_out] +type=google-pubsub + +# Set your project ID (ie. my-project-438217) +projectId= + +# Set your topic name (ie. my-topic) +topicName= + +# Set your service account name (ie. file:/QOpenSys/etc/manzan/my-project-438217-b7392819a7hf.json) +serviceAccountKey= +``` + +## Result + +
+ Google Pub/Sub 1 +
\ No newline at end of file diff --git a/docs/config/examples/grafanaLoki.md b/docs/config/examples/grafanaLoki.md index 3b14aa5..3b0bc92 100644 --- a/docs/config/examples/grafanaLoki.md +++ b/docs/config/examples/grafanaLoki.md @@ -9,6 +9,7 @@ Be sure to have read up on [Manzan configuration](/config/index.md) to understan ### `data.ini` ```ini +[watchout] type=watch id=sanjula destinations=loki_out @@ -29,6 +30,6 @@ password= ## Result
- Grafana Loki 1 - Grafana Loki 2 + Grafana Loki 1 + Grafana Loki 2
\ No newline at end of file diff --git a/docs/images/googlePubSub1.png b/docs/images/googlePubSub1.png new file mode 100644 index 0000000..c41fcdd Binary files /dev/null and b/docs/images/googlePubSub1.png differ