Skip to content

Commit

Permalink
Add Google Pub/Sub support (#165)
Browse files Browse the repository at this point in the history
Signed-off-by: Sanjula Ganepola <Sanjula.Ganepola@ibm.com>
Co-authored-by: Jonathan <42983653+jonnyz32@users.noreply.github.com>
  • Loading branch information
SanjulaGanepola and jonnyz32 authored Oct 15, 2024
1 parent 7af399d commit 0114c7c
Show file tree
Hide file tree
Showing 19 changed files with 279 additions and 179 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Many other destinations will be available. Examples include:
- [Kafka](http://kafka.apache.org)
- [ActiveMQ](http://activemq.apache.org/)
- [Grafana Loki](https://grafana.com/oss/loki/)
- [Google Pub/Sub](https://cloud.google.com/pubsub)
- [Mezmo](http://mezmo.com)
- [ElasticSearch](http://elastic.co)

Expand Down
10 changes: 10 additions & 0 deletions camel/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@
<artifactId>camel-core</artifactId>
<version>3.14.6</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-android</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-google-pubsub</artifactId>
<version>3.14.6</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-twilio</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.github.theprez.manzan.routes.dest.EmailDestination;
import com.github.theprez.manzan.routes.dest.FileDestination;
import com.github.theprez.manzan.routes.dest.FluentDDestination;
import com.github.theprez.manzan.routes.dest.GooglePubSubDestination;
import com.github.theprez.manzan.routes.dest.GrafanaLokiDestination;
import com.github.theprez.manzan.routes.dest.HttpDestination;
import com.github.theprez.manzan.routes.dest.KafkaDestination;
Expand Down Expand Up @@ -67,6 +68,12 @@ public synchronized Map<String, ManzanRoute> getRoutes(CamelContext context) {
final String topic = getRequiredString(name, "topic");
ret.put(name, new KafkaDestination(name, topic, format, getUriAndHeaderParameters(name, sectionObj, "topic")));
break;
case "google-pubsub":
final String projectId = getRequiredString(name, "projectId");
final String topicName = getRequiredString(name, "topicName");
final String serviceAccountKey = getRequiredString(name, "serviceAccountKey");
ret.put(name, new GooglePubSubDestination(context, name, projectId, topicName, serviceAccountKey, format, getUriAndHeaderParameters(name, sectionObj, "projectId", "topicName", "serviceAccountKey")));
break;
case "file":
final String file = getRequiredString(name, "file");
ret.put(name, new FileDestination(name, file, format, getUriAndHeaderParameters(name, sectionObj, "file")));
Expand Down Expand Up @@ -96,14 +103,13 @@ public synchronized Map<String, ManzanRoute> getRoutes(CamelContext context) {
case "smtps":
final String server = getRequiredString(name, "server");
final int port = getOptionalInt(name, "port");
final EmailDestination d = new EmailDestination(name, type, server, format, port, getUriAndHeaderParameters(name, sectionObj, "server", "port"), null);
final EmailDestination d = new EmailDestination(name, type, server, port, format, getUriAndHeaderParameters(name, sectionObj, "server", "port"), null);
ret.put(name, d);
break;
case "twilio":
final String sid = getRequiredString(name, "sid");
final String token = getRequiredString(name, "token");
ret.put(name, new TwilioDestination(context, name, format, sid, token,
getUriAndHeaderParameters(name, sectionObj, "sid", "token")));
ret.put(name, new TwilioDestination(context, name, sid, token, format, getUriAndHeaderParameters(name, sectionObj, "sid", "token")));
break;
case "http":
case "https":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.github.theprez.jcmdutils.StringUtils;
import com.github.theprez.manzan.ManzanMessageFormatter;


public abstract class ManzanGenericCamelRoute extends ManzanRoute {

private final String m_camelComponent;
Expand All @@ -25,12 +24,12 @@ public ManzanGenericCamelRoute(final String _name, final String _camelComponent,
m_headerParams = null == _headerParams ? new HashMap<String, Object>(1) : _headerParams;
m_camelComponent = _camelComponent;
m_path = _path;
m_formatter = StringUtils.isEmpty(_format) ? null: new ManzanMessageFormatter(_format);
m_formatter = StringUtils.isEmpty(_format) ? null : new ManzanMessageFormatter(_format);

}

protected abstract void customPostProcess(Exchange exchange);
//@formatter:off

@Override
public void configure() {
from(getInUri())
Expand All @@ -43,11 +42,12 @@ public void configure() {
}
})
.setBody(simple("${body}\n"))
//.wireTap("stream:out")
.process(exchange -> {customPostProcess(exchange);})
// .wireTap("stream:out")
.process(exchange -> {
customPostProcess(exchange);
})
.to(getTargetUri());
}
//@formatter:on

private String getTargetUri() {
String ret = m_camelComponent;
Expand All @@ -61,16 +61,16 @@ private String getTargetUri() {
ret += entry.getKey();
ret += "=";
ret += entry.getValue();
//TODO: what's right here? with "file://" targets Camel wants real paths
// TODO: what's right here? with "file://" targets Camel wants real paths
// try {
// ret += URLEncoder.encode(entry.getValue(), "UTF-8");
// ret += URLEncoder.encode(entry.getValue(), "UTF-8");
// } catch (final UnsupportedEncodingException e) {
// ret += URLEncoder.encode(entry.getValue());
// ret += URLEncoder.encode(entry.getValue());
// }
ret += "&";
}
ret = ret.replaceFirst("&$", "");
System.out.println("target URI: "+ret);
System.out.println("target URI: " + ret);
return ret;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

public class EmailDestination extends ManzanGenericCamelRoute {

public EmailDestination(final String _name, final String _type, final String _smtpServer, final String _format, final int _port, final Map<String, String> _uriParams, final Map<String, Object> _headerParams) {
public EmailDestination(final String _name, final String _type, final String _smtpServer, final int _port, final String _format, final Map<String, String> _uriParams, final Map<String, Object> _headerParams) {
super(_name, _type, (_port == -1) ? _smtpServer : _smtpServer + ":" + _port, _format, _uriParams, _headerParams);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,12 @@ public void run() {
});
}

//@formatter:off
@Override
public void configure() {
from(getInUri())
.routeId(m_name).process(exchange -> {
m_logger.log(m_tag, getDataMap(exchange));
});
.routeId(m_name).process(exchange -> {
m_logger.log(m_tag, getDataMap(exchange));
});
}

//@formatter:on

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.github.theprez.manzan.routes.dest;

import java.util.HashMap;
import java.util.Map;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.component.google.pubsub.GooglePubsubComponent;
import org.apache.camel.component.google.pubsub.GooglePubsubConstants;

import com.github.theprez.manzan.ManzanEventType;
import com.github.theprez.manzan.routes.ManzanGenericCamelRoute;

public class GooglePubSubDestination extends ManzanGenericCamelRoute {
public GooglePubSubDestination(CamelContext context, final String _name, final String _projectId, final String _topicName, final String _serviceAccountKey, final String _format, final Map<String, String> _uriParams) {
super(_name, "google-pubsub", _projectId + ":" + _topicName, _format, _uriParams, null);
GooglePubsubComponent pubsub = context.getComponent("google-pubsub", GooglePubsubComponent.class);
pubsub.setServiceAccountKey(_serviceAccountKey);
pubsub.init();
pubsub.start();
}

@Override
protected void customPostProcess(Exchange exchange) {
final ManzanEventType type = (ManzanEventType) exchange.getIn().getHeader(EVENT_TYPE);
if (ManzanEventType.WATCH_MSG == type) {
exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME, getString(exchange, MSG_MESSAGE_TIMESTAMP));
} else if (ManzanEventType.WATCH_VLOG == type) {
// exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME, getString(exchange, LOG_TIMESTAMP));
} else if (ManzanEventType.WATCH_PAL == type) {
// exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME, getString(exchange, PAL_TIMESTAMP));
}

Map<String, String> map = new HashMap<>();
for (Map.Entry<String, Object> entry : getDataMap(exchange).entrySet()) {
map.put(entry.getKey(), entry.getValue().toString());
}
exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, map);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,43 +40,44 @@ public void run() {
});
}

//@formatter:off
@Override
public void configure() {
from(getInUri())
.routeId(m_name).process(exchange -> {
final ManzanEventType type = (ManzanEventType) exchange.getIn().getHeader(EVENT_TYPE);
if(ManzanEventType.WATCH_MSG == type) {
StreamBuilder builder = logController
.stream()
.l(appLabelName, appLabelValue)
.l(Labels.LEVEL, ((Integer) get(exchange, MSG_SEVERITY)) > SEVERITY_LIMIT ? Labels.FATAL : Labels.INFO)
.l(SESSION_ID, getWatchName(exchange));
.routeId(m_name).process(exchange -> {
final ManzanEventType type = (ManzanEventType) exchange.getIn().getHeader(EVENT_TYPE);
if (ManzanEventType.WATCH_MSG == type) {
StreamBuilder builder = logController
.stream()
.l(appLabelName, appLabelValue)
.l(Labels.LEVEL,
((Integer) get(exchange, MSG_SEVERITY)) > SEVERITY_LIMIT ? Labels.FATAL
: Labels.INFO)
.l(SESSION_ID, getWatchName(exchange));

String[] keys = {
MSG_MESSAGE_ID,
MSG_MESSAGE_TYPE,
MSG_SEVERITY,
JOB,
MSG_SENDING_USRPRF,
MSG_SENDING_PROGRAM_NAME,
MSG_SENDING_MODULE_NAME,
MSG_SENDING_PROCEDURE_NAME
};
String[] keys = {
MSG_MESSAGE_ID,
MSG_MESSAGE_TYPE,
MSG_SEVERITY,
JOB,
MSG_SENDING_USRPRF,
MSG_SENDING_PROGRAM_NAME,
MSG_SENDING_MODULE_NAME,
MSG_SENDING_PROCEDURE_NAME
};

for (String key: keys) {
String value = getString(exchange, key);
if(!value.equals("")) {
builder.l(key, value);
}
}
for (String key : keys) {
String value = getString(exchange, key);
if (!value.equals("")) {
builder.l(key, value);
}
}

ILogStream stream = builder.build();
stream.log(Timestamp.valueOf(getString(exchange, MSG_MESSAGE_TIMESTAMP)).getTime(), getBody(exchange, String.class));
} else {
throw new RuntimeException("Grafana Loki route doesn't know how to process type "+type);
}
});
ILogStream stream = builder.build();
stream.log(Timestamp.valueOf(getString(exchange, MSG_MESSAGE_TIMESTAMP)).getTime(),
getBody(exchange, String.class));
} else {
throw new RuntimeException("Grafana Loki route doesn't know how to process type " + type);
}
});
}
//@formatter:on
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,55 +31,54 @@ public SentryDestination(final String _name, final String _dsn) {
});
}

//@formatter:off
@Override
public void configure() {
from(getInUri())
.routeId(m_name)
.convertBodyTo(String.class)
.process(exchange -> {
final ManzanEventType type = (ManzanEventType) exchange.getIn().getHeader(EVENT_TYPE);
if(ManzanEventType.WATCH_MSG == type) {
System.out.println("sentry");
final SentryEvent event = new SentryEvent();
final String watch = getWatchName(exchange);
final SentryId id = new SentryId(UUID.randomUUID());
event.setTag("session id", watch);
event.setEventId(id);
event.setExtras(getDataMap(exchange));
final User user = new User();
user.setUsername(getString(exchange, MSG_SENDING_USRPRF));
event.setUser(user);
event.setPlatform("IBM i");
event.setTag("runtime", "IBM i");
event.setTag("runtime.name", "IBM i");
event.setDist("PASE");
event.setTransaction(getString(exchange, MSG_ORDINAL_POSITION));
.routeId(m_name)
.convertBodyTo(String.class)
.process(exchange -> {
final ManzanEventType type = (ManzanEventType) exchange.getIn().getHeader(EVENT_TYPE);
if (ManzanEventType.WATCH_MSG == type) {
System.out.println("sentry");
final SentryEvent event = new SentryEvent();
final String watch = getWatchName(exchange);
final SentryId id = new SentryId(UUID.randomUUID());
event.setTag("session id", watch);
event.setEventId(id);
event.setExtras(getDataMap(exchange));
final User user = new User();
user.setUsername(getString(exchange, MSG_SENDING_USRPRF));
event.setUser(user);
event.setPlatform("IBM i");
event.setTag("runtime", "IBM i");
event.setTag("runtime.name", "IBM i");
event.setDist("PASE");
event.setTransaction(getString(exchange, MSG_ORDINAL_POSITION));

SentryLevel level;
final int sev = (Integer) get(exchange, MSG_SEVERITY);
if (sev > SEVERITY_LIMIT) {
level = SentryLevel.ERROR;
} else {
level = SentryLevel.INFO;
}
SentryLevel level;
final int sev = (Integer) get(exchange, MSG_SEVERITY);
if (sev > SEVERITY_LIMIT) {
level = SentryLevel.ERROR;
} else {
level = SentryLevel.INFO;
}

event.setLevel(level);
final Message message = new Message();
final String messageStr = getString(exchange, MSG_MESSAGE_ID) + ": " + getString(exchange, MSG_MESSAGE);
message.setMessage(messageStr);
final List<String> fingerprints = new LinkedList<String>();
fingerprints.add(getString(exchange, MSG_MESSAGE_ID));
fingerprints.add(getString(exchange, MSG_SENDING_PROCEDURE_NAME));
fingerprints.add(getString(exchange, MSG_SENDING_MODULE_NAME));
fingerprints.add(getString(exchange, MSG_SENDING_PROGRAM_NAME));
event.setFingerprints(fingerprints);
event.setMessage(message);
Sentry.captureEvent(event);
} else {
throw new RuntimeException("Sentry route doesn't know how to process type "+type);
}
});
event.setLevel(level);
final Message message = new Message();
final String messageStr = getString(exchange, MSG_MESSAGE_ID) + ": "
+ getString(exchange, MSG_MESSAGE);
message.setMessage(messageStr);
final List<String> fingerprints = new LinkedList<String>();
fingerprints.add(getString(exchange, MSG_MESSAGE_ID));
fingerprints.add(getString(exchange, MSG_SENDING_PROCEDURE_NAME));
fingerprints.add(getString(exchange, MSG_SENDING_MODULE_NAME));
fingerprints.add(getString(exchange, MSG_SENDING_PROGRAM_NAME));
event.setFingerprints(fingerprints);
event.setMessage(message);
Sentry.captureEvent(event);
} else {
throw new RuntimeException("Sentry route doesn't know how to process type " + type);
}
});
}
//@formatter:on
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@ public SlackDestination(final String _name, final String _webhook, final String
m_format = StringUtils.isEmpty(_format) ? null : new ManzanMessageFormatter(_format);
}

//@formatter:off
@Override
public void configure() {
if (null == m_format) {
from(getInUri())
.routeId(m_name)
.to("slack:" + m_channel + "?webhookUrl=" + m_webhook);
.routeId(m_name)
.to("slack:" + m_channel + "?webhookUrl=" + m_webhook);
} else {
from(getInUri())
.routeId(m_name).convertBodyTo(String.class, "UTF-8")
.routeId(m_name).convertBodyTo(String.class, "UTF-8")
.process(exchange -> {
final String formatted = m_format.format(getDataMap(exchange));
exchange.getIn().setBody(formatted);
Expand All @@ -35,6 +34,4 @@ public void configure() {
.to("slack:" + m_channel + "?webhookUrl=" + m_webhook);
}
}
//@formatter:on

}
Loading

0 comments on commit 0114c7c

Please sign in to comment.