Skip to content

Commit

Permalink
ENH: peer forwarding codec and model (#2256)
Browse files Browse the repository at this point in the history
Signed-off-by: George Chen <qchea@amazon.com>
  • Loading branch information
chenqi0805 authored Feb 10, 2023
1 parent a5e51a3 commit 5bc6a2c
Show file tree
Hide file tree
Showing 17 changed files with 523 additions and 254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.model.event;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

Expand All @@ -19,7 +20,7 @@
* <p>
* @since 1.2
*/
public interface Event {
public interface Event extends Serializable {

/**
* Adds or updates the key with a given value in the Event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@

package org.opensearch.dataprepper.model.event;

import java.io.Serializable;
import java.time.Instant;
import java.util.Map;

/**
* The event metadata contains internal event fields. These fields are used only within Data Prepper, and are not passed down to Sinks.
* @since 1.2
*/
public interface EventMetadata {
public interface EventMetadata extends Serializable {

/**
* Retrieves the type of event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
import org.opensearch.dataprepper.peerforwarder.certificate.CertificateProviderFactory;
import org.opensearch.dataprepper.peerforwarder.client.PeerForwarderClient;
import org.opensearch.dataprepper.peerforwarder.codec.JacksonPeerForwarderCodec;
import org.opensearch.dataprepper.peerforwarder.codec.JavaPeerForwarderCodec;
import org.opensearch.dataprepper.peerforwarder.codec.PeerForwarderCodec;
import org.opensearch.dataprepper.peerforwarder.server.PeerForwarderHttpServerProvider;
import org.opensearch.dataprepper.peerforwarder.server.PeerForwarderHttpService;
import org.opensearch.dataprepper.peerforwarder.server.PeerForwarderServer;
Expand Down Expand Up @@ -77,10 +80,11 @@ public PeerForwarderClientFactory peerForwarderClientFactory(
@Bean
public PeerForwarderClient peerForwarderClient(final PeerForwarderConfiguration peerForwarderConfiguration,
final PeerForwarderClientFactory peerForwarderClientFactory,
@Qualifier("peerForwarderObjectMapper") final ObjectMapper objectMapper,
final PeerForwarderCodec peerForwarderCodec,
@Qualifier("peerForwarderMetrics") final PluginMetrics pluginMetrics
) {
return new PeerForwarderClient(peerForwarderConfiguration, peerForwarderClientFactory, objectMapper, pluginMetrics);
return new PeerForwarderClient(
peerForwarderConfiguration, peerForwarderClientFactory, peerForwarderCodec, pluginMetrics);
}

@Bean
Expand All @@ -96,15 +100,24 @@ public ResponseHandler responseHandler(@Qualifier("peerForwarderMetrics") final
return new ResponseHandler(pluginMetrics);
}

@Bean
public PeerForwarderCodec peerForwarderCodec(
final PeerForwarderConfiguration peerForwarderConfiguration,
@Qualifier("peerForwarderObjectMapper") final ObjectMapper objectMapper) {
return peerForwarderConfiguration.getBinaryCodec() ?
new JavaPeerForwarderCodec() : new JacksonPeerForwarderCodec(objectMapper);
}

@Bean
public PeerForwarderHttpService peerForwarderHttpService(
final ResponseHandler responseHandler,
final PeerForwarderProvider peerForwarderProvider,
final PeerForwarderConfiguration peerForwarderConfiguration,
@Qualifier("peerForwarderObjectMapper") final ObjectMapper objectMapper,
final PeerForwarderCodec peerForwarderCodec,
@Qualifier("peerForwarderMetrics") final PluginMetrics pluginMetrics
) {
return new PeerForwarderHttpService(responseHandler, peerForwarderProvider, peerForwarderConfiguration, objectMapper, pluginMetrics);
return new PeerForwarderHttpService(responseHandler, peerForwarderProvider, peerForwarderConfiguration,
peerForwarderCodec, pluginMetrics);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class PeerForwarderConfiguration {
private Integer failedForwardingRequestLocalWriteTimeout = 500;
private Integer forwardingBatchSize = 1500;
private Duration forwardingBatchTimeout = DEFAULT_FORWARDING_BATCH_TIMEOUT;
private boolean binaryCodec = true;

public PeerForwarderConfiguration() {}

Expand Down Expand Up @@ -97,7 +98,8 @@ public PeerForwarderConfiguration (
@JsonProperty("drain_timeout") final Duration drainTimeout,
@JsonProperty("failed_forwarding_requests_local_write_timeout") final Integer failedForwardingRequestLocalWriteTimeout,
@JsonProperty("forwarding_batch_size") final Integer forwardingBatchSize,
@JsonProperty("forwarding_batch_timeout") final Duration forwardingBatchTimeout
@JsonProperty("forwarding_batch_timeout") final Duration forwardingBatchTimeout,
@JsonProperty("binary_codec") final Boolean binaryCodec
) {
setServerPort(serverPort);
setRequestTimeout(requestTimeout);
Expand Down Expand Up @@ -130,6 +132,7 @@ public PeerForwarderConfiguration (
setFailedForwardingRequestLocalWriteTimeout(failedForwardingRequestLocalWriteTimeout);
setForwardingBatchSize(forwardingBatchSize);
setForwardingBatchTimeout(forwardingBatchTimeout);
setBinaryCodec(binaryCodec == null || binaryCodec);
checkForCertAndKeyFileInS3();
validateSslAndAuthentication();
}
Expand Down Expand Up @@ -246,6 +249,10 @@ public Duration getForwardingBatchTimeout() {
return forwardingBatchTimeout;
}

public boolean getBinaryCodec() {
return binaryCodec;
}

private void setServerPort(final Integer serverPort) {
if (serverPort != null) {
if (serverPort < 0 || serverPort > 65535) {
Expand Down Expand Up @@ -526,4 +533,8 @@ private void setForwardingBatchTimeout(final Duration forwardingBatchTimeout) {
this.forwardingBatchTimeout = forwardingBatchTimeout;
}
}

private void setBinaryCodec(final boolean binaryCodec) {
this.binaryCodec = binaryCodec;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,24 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.peerforwarder.PeerClientPool;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderClientFactory;
import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration;
import org.opensearch.dataprepper.peerforwarder.model.WireEvent;
import org.opensearch.dataprepper.peerforwarder.model.WireEvents;
import org.opensearch.dataprepper.peerforwarder.codec.PeerForwarderCodec;
import org.opensearch.dataprepper.peerforwarder.model.PeerForwardingEvents;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration.DEFAULT_PEER_FORWARDING_URI;

Expand All @@ -37,7 +35,8 @@ public class PeerForwarderClient {
static final String CLIENT_REQUEST_FORWARDING_LATENCY = "clientRequestForwardingLatency";

private final PeerForwarderClientFactory peerForwarderClientFactory;
private final ObjectMapper objectMapper;
private final PeerForwarderConfiguration peerForwarderConfiguration;
private final PeerForwarderCodec peerForwarderCodec;
private final ExecutorService executorService;
private final Counter requestsCounter;
private final Timer clientRequestForwardingLatencyTimer;
Expand All @@ -46,10 +45,11 @@ public class PeerForwarderClient {

public PeerForwarderClient(final PeerForwarderConfiguration peerForwarderConfiguration,
final PeerForwarderClientFactory peerForwarderClientFactory,
final ObjectMapper objectMapper,
final PeerForwarderCodec peerForwarderCodec,
final PluginMetrics pluginMetrics) {
this.peerForwarderConfiguration = peerForwarderConfiguration;
this.peerForwarderClientFactory = peerForwarderClientFactory;
this.objectMapper = objectMapper;
this.peerForwarderCodec = peerForwarderCodec;
executorService = Executors.newFixedThreadPool(peerForwarderConfiguration.getClientThreadCount());
requestsCounter = pluginMetrics.counter(REQUESTS);
clientRequestForwardingLatencyTimer = pluginMetrics.timer(CLIENT_REQUEST_FORWARDING_LATENCY);
Expand Down Expand Up @@ -79,35 +79,15 @@ public CompletableFuture<AggregatedHttpResponse> serializeRecordsAndSendHttpRequ
}

private byte[] getSerializedJsonBytes(final Collection<Record<Event>> records, final String pluginId, final String pipelineName) {
final List<WireEvent> wireEventList = getWireEventList(records);
final WireEvents wireEvents = new WireEvents(wireEventList, pluginId, pipelineName);

final List<Event> eventList = records.stream().map(Record::getData).collect(Collectors.toList());
final PeerForwardingEvents peerForwardingEvents = new PeerForwardingEvents(eventList, pluginId, pipelineName);
try {
return objectMapper.writeValueAsBytes(wireEvents);
} catch (JsonProcessingException e) {
return peerForwarderCodec.serialize(peerForwardingEvents);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private List<WireEvent> getWireEventList(final Collection<Record<Event>> records) {
final List<WireEvent> wireEventList = new ArrayList<>();

for (final Record<Event> record : records) {
final Event event = record.getData();
wireEventList.add(getWireEvent(event));
}
return wireEventList;
}

private WireEvent getWireEvent(final Event event) {
return new WireEvent(
event.getMetadata().getEventType(),
event.getMetadata().getTimeReceived(),
event.getMetadata().getAttributes(),
event.toJsonString()
);
}

private CompletableFuture<AggregatedHttpResponse> processHttpRequest(final WebClient client, final byte[] content) {
return CompletableFuture.supplyAsync(() ->
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package org.opensearch.dataprepper.peerforwarder.codec;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.model.event.DefaultEventMetadata;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.trace.JacksonSpan;
import org.opensearch.dataprepper.peerforwarder.model.PeerForwardingEvents;
import org.opensearch.dataprepper.peerforwarder.model.WireEvent;
import org.opensearch.dataprepper.peerforwarder.model.WireEvents;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

public class JacksonPeerForwarderCodec implements PeerForwarderCodec {
private static final String TRACE_EVENT_TYPE = "TRACE";

private final ObjectMapper objectMapper;

public JacksonPeerForwarderCodec(final ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

@Override
public byte[] serialize(final PeerForwardingEvents peerForwardingEvents) throws IOException {
final WireEvents wireEvents = fromPeerForwardingEventsToWireEvents(peerForwardingEvents);
return objectMapper.writeValueAsBytes(wireEvents);
}

@Override
public PeerForwardingEvents deserialize(final byte[] bytes) throws IOException {
final WireEvents wireEvents = objectMapper.readValue(bytes, WireEvents.class);
return fromWireEventsToPeerForwardingEvents(wireEvents);
}

private WireEvents fromPeerForwardingEventsToWireEvents(final PeerForwardingEvents peerForwardingEvents) {
List<WireEvent> wireEventList = null;
if (peerForwardingEvents.getEvents() != null) {
wireEventList = peerForwardingEvents.getEvents().stream().map(event -> new WireEvent(
event.getMetadata().getEventType(),
event.getMetadata().getTimeReceived(),
event.getMetadata().getAttributes(),
event.toJsonString()
)).collect(Collectors.toList());
}
return new WireEvents(wireEventList,
peerForwardingEvents.getDestinationPluginId(), peerForwardingEvents.getDestinationPipelineName());
}

private PeerForwardingEvents fromWireEventsToPeerForwardingEvents(final WireEvents wireEvents) {
List<Event> eventList = null;
if (wireEvents.getEvents() != null) {
eventList = wireEvents.getEvents().stream().map(this::transformWireEvent).collect(Collectors.toList());
}
return new PeerForwardingEvents(
eventList, wireEvents.getDestinationPluginId(), wireEvents.getDestinationPipelineName());
}

private Event transformWireEvent(final WireEvent wireEvent) {
final DefaultEventMetadata eventMetadata = getEventMetadata(wireEvent);
Event event;

if (wireEvent.getEventType().equalsIgnoreCase(TRACE_EVENT_TYPE)) {
event = JacksonSpan.builder()
.withJsonData(wireEvent.getEventData())
.withEventMetadata(eventMetadata)
.build();
} else {
event = JacksonEvent.builder()
.withData(wireEvent.getEventData())
.withEventMetadata(eventMetadata)
.build();
}
return event;
}

private DefaultEventMetadata getEventMetadata(final WireEvent wireEvent) {
return DefaultEventMetadata.builder()
.withEventType(wireEvent.getEventType())
.withTimeReceived(wireEvent.getEventTimeReceived())
.withAttributes(wireEvent.getEventAttributes())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.opensearch.dataprepper.peerforwarder.codec;

import org.opensearch.dataprepper.peerforwarder.model.PeerForwardingEvents;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class JavaPeerForwarderCodec implements PeerForwarderCodec {

@Override
public byte[] serialize(final PeerForwardingEvents events) throws IOException {
try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
final ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {
objectOutputStream.writeObject(events);
return byteArrayOutputStream.toByteArray();
}
}

@Override
public PeerForwardingEvents deserialize(final byte[] bytes) throws IOException, ClassNotFoundException {
try (final InputStream inputStream = new ByteArrayInputStream(bytes);
final ObjectInputStream objectInputStream = new ObjectInputStream(inputStream)) {
return (PeerForwardingEvents) objectInputStream.readObject();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.opensearch.dataprepper.peerforwarder.codec;

import org.opensearch.dataprepper.peerforwarder.model.PeerForwardingEvents;

public interface PeerForwarderCodec {
byte[] serialize(PeerForwardingEvents peerForwardingEvents) throws Exception;

PeerForwardingEvents deserialize(byte[] bytes) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.peerforwarder.model;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.peerforwarder.PeerForwarder;

import java.io.Serializable;
import java.util.List;

/**
* A class to serialize list of {@link Event} and destination plugin ID used by {@link PeerForwarder}
*
* @since 2.0
*/
public class PeerForwardingEvents implements Serializable {
private List<Event> events;
private String destinationPluginId;
private String destinationPipelineName;

public PeerForwardingEvents() {
}

public PeerForwardingEvents(final List<Event> events, final String destinationPluginId, final String destinationPipelineName) {
this.events = events;
this.destinationPluginId = destinationPluginId;
this.destinationPipelineName = destinationPipelineName;
}

public List<Event> getEvents() {
return events;
}

public String getDestinationPluginId() {
return destinationPluginId;
}

public String getDestinationPipelineName() {
return destinationPipelineName;
}
}
Loading

0 comments on commit 5bc6a2c

Please sign in to comment.