From 62ae620fc8246715f46da8ebcdec867c4b311a3b Mon Sep 17 00:00:00 2001 From: gkatzioura Date: Wed, 12 Jan 2022 12:20:29 +0000 Subject: [PATCH 1/8] Added builder for pubsub sender --- sender-pubsub/pom.xml | 72 ++++++++++++++ .../zipkin2/reporter/pubsub/PubSubSender.java | 96 +++++++++++++++++++ 2 files changed, 168 insertions(+) create mode 100644 sender-pubsub/pom.xml create mode 100644 sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java diff --git a/sender-pubsub/pom.xml b/sender-pubsub/pom.xml new file mode 100644 index 0000000..cb04768 --- /dev/null +++ b/sender-pubsub/pom.xml @@ -0,0 +1,72 @@ + + + + + zipkin-gcp-parent + io.zipkin.gcp + 1.0.3-SNAPSHOT + + 4.0.0 + + zipkin-sender-pubsub + Zipkin Sender: Google PubSub + + + ${project.basedir}/.. + + + + + + com.google.cloud + libraries-bom + 24.1.2 + pom + import + + + + + + + com.google.cloud + google-cloud-pubsub + + + io.zipkin.reporter2 + zipkin-reporter + ${zipkin-reporter.version} + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java new file mode 100644 index 0000000..588db97 --- /dev/null +++ b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java @@ -0,0 +1,96 @@ +/* + * Copyright 2016-2022 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter.pubsub; + +import java.io.IOException; +import java.util.List; + +import com.google.api.gax.core.ExecutorProvider; + +import com.google.cloud.pubsub.v1.Publisher; +import zipkin2.Call; +import zipkin2.codec.Encoding; +import zipkin2.reporter.Sender; + +public class PubSubSender extends Sender { + + public static PubSubSender create(String topic) { + return null; + } + + public static final class Builder { + String topic; + Encoding encoding = Encoding.JSON; + Publisher publisher; + ExecutorProvider executorProvider; + + Builder(PubSubSender pubSubSender) { + this.topic = pubSubSender.topic; + this.encoding = pubSubSender.encoding; + this.publisher = pubSubSender.publisher; + this.executorProvider = pubSubSender.executorProvider; + } + + /** PubSub topic to send spans. */ + public Builder topic(String topic) { + if (topic == null) throw new NullPointerException("topic == null"); + this.topic = topic; + return this; + } + + public Builder publisher(Publisher publisher) { + if (publisher == null) throw new NullPointerException("publisher == null"); + this.publisher = publisher; + return this; + } + + public Builder publisher(ExecutorProvider executorProvider) { + if (executorProvider == null) throw new NullPointerException("executorProvider == null"); + this.executorProvider = executorProvider; + return this; + } + + } + + @Override public Encoding encoding() { + return null; + } + + @Override public int messageMaxBytes() { + return 0; + } + + @Override public int messageSizeInBytes(List list) { + return 0; + } + + @Override public Call sendSpans(List list) { + return null; + } + + final String topic = "saddsa"; + final Encoding encoding = Encoding.JSON; + final Publisher publisher = null; + final ExecutorProvider executorProvider = null; + + + public PubSubSender() throws IOException { + Publisher publisher = Publisher.newBuilder(topic).setExecutorProvider(executorProvider) + .build(); + + + publisher.publish(null).addListener(()-> {}, executorProvider.getExecutor()); + //publisherStub = new Pu + } +} From 6bc95c4a6d4944b55e17cbcef853bc218c61cfc5 Mon Sep 17 00:00:00 2001 From: gkatzioura Date: Sat, 15 Jan 2022 09:53:30 +0000 Subject: [PATCH 2/8] Added PubSub intialization and sender --- .../zipkin2/reporter/pubsub/PubSubSender.java | 132 +++++++++++++++--- .../PubSubSenderInitializationException.java | 20 +++ 2 files changed, 135 insertions(+), 17 deletions(-) create mode 100644 sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSenderInitializationException.java diff --git a/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java index 588db97..24c7094 100644 --- a/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java +++ b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java @@ -14,23 +14,35 @@ package zipkin2.reporter.pubsub; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; + import zipkin2.Call; +import zipkin2.CheckResult; import zipkin2.codec.Encoding; +import zipkin2.reporter.BytesMessageEncoder; import zipkin2.reporter.Sender; public class PubSubSender extends Sender { public static PubSubSender create(String topic) { - return null; + return newBuilder().topic(topic).build(); + } + + public static Builder newBuilder() { + return new Builder(); } public static final class Builder { String topic; + int messageMaxBytes = 10 * 1024 * 1024; // 10MB PubSub limit. Encoding encoding = Encoding.JSON; Publisher publisher; ExecutorProvider executorProvider; @@ -49,48 +61,134 @@ public Builder topic(String topic) { return this; } + /** Maximum size of a message. PubSub max message size is 10MB */ + public Builder messageMaxBytes(int messageMaxBytes) { + this.messageMaxBytes = messageMaxBytes; + return this; + } + + public Builder publisher(Publisher publisher) { if (publisher == null) throw new NullPointerException("publisher == null"); this.publisher = publisher; return this; } + /** ExecutorProvider for PubSub operations **/ public Builder publisher(ExecutorProvider executorProvider) { if (executorProvider == null) throw new NullPointerException("executorProvider == null"); this.executorProvider = executorProvider; return this; } + public PubSubSender build() { + if (topic == null) throw new NullPointerException("topic == null"); + try { + Publisher.newBuilder("dsdssd").setExecutorProvider(null).build(); + } catch (IOException e) { + throw new PubSubSenderInitializationException(e); + } + + if (executorProvider == null) executorProvider = defaultExecutorProvider(); ; + + if(publisher == null) { + try { + publisher = Publisher.newBuilder(topic).setExecutorProvider(executorProvider).build(); + } catch (IOException e) { + throw new PubSubSenderInitializationException(e); + } + } + + return new PubSubSender(this); + } + + private InstantiatingExecutorProvider defaultExecutorProvider() { + return InstantiatingExecutorProvider.newBuilder() + .setExecutorThreadCount(5 * Runtime.getRuntime().availableProcessors()) + .build(); + } + + Builder() { + } + } + + final String topic; + final int messageMaxBytes; + final Encoding encoding; + final Publisher publisher; + final ExecutorProvider executorProvider; + + volatile boolean closeCalled; + + PubSubSender(Builder builder) { + this.topic = builder.topic; + this.messageMaxBytes = builder.messageMaxBytes; + this.encoding = builder.encoding; + this.publisher = builder.publisher; + this.executorProvider = builder.executorProvider; + } + + @Override + public CheckResult check() { + return CheckResult.OK; } @Override public Encoding encoding() { - return null; + return encoding; } - @Override public int messageMaxBytes() { - return 0; + @Override + public int messageMaxBytes() { + return messageMaxBytes; } - @Override public int messageSizeInBytes(List list) { - return 0; + @Override + public int messageSizeInBytes(List bytes) { + return encoding().listSizeInBytes(bytes); } - @Override public Call sendSpans(List list) { + @Override + public Call sendSpans(List byteList) { + if (closeCalled) throw new IllegalStateException("closed"); + + byte[] messageBytes = BytesMessageEncoder.forEncoding(encoding()).encode(byteList); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFrom(messageBytes)).build(); + publisher.publish(pubsubMessage); return null; - } + /* - final String topic = "saddsa"; - final Encoding encoding = Encoding.JSON; - final Publisher publisher = null; - final ExecutorProvider executorProvider = null; + ByteBuffer message = ByteBuffer.wrap(BytesMessageEncoder.forEncoding(encoding()).encode(list)); + PutRecordRequest request = new PutRecordRequest(); + request.setStreamName(streamName); + request.setData(message); + request.setPartitionKey(getPartitionKey()); - public PubSubSender() throws IOException { - Publisher publisher = Publisher.newBuilder(topic).setExecutorProvider(executorProvider) - .build(); + return new KinesisCall(request); + */ + } + /** + * Shutdown on Publisher is not async thus moving the synchronized block to another function in order not to block until the shutdown is over + * @throws IOException + */ + @Override + public void close() throws IOException { + if(!setClosed()) { + return; + } + publisher.shutdown(); + } - publisher.publish(null).addListener(()-> {}, executorProvider.getExecutor()); - //publisherStub = new Pu + private synchronized boolean setClosed() { + if(closeCalled) { + return false; + } else { + closeCalled = true; + return true; + } } + + + } diff --git a/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSenderInitializationException.java b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSenderInitializationException.java new file mode 100644 index 0000000..00dd0da --- /dev/null +++ b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSenderInitializationException.java @@ -0,0 +1,20 @@ +package zipkin2.reporter.pubsub; + +public class PubSubSenderInitializationException extends RuntimeException { + + public PubSubSenderInitializationException() { + } + + public PubSubSenderInitializationException(String message) { + super(message); + } + + public PubSubSenderInitializationException(String message, Throwable cause) { + super(message, cause); + } + + public PubSubSenderInitializationException(Throwable cause) { + super(cause); + } + +} From a235df79e38d2d0f01b8ae710e6964e50f8588a5 Mon Sep 17 00:00:00 2001 From: gkatzioura Date: Sat, 15 Jan 2022 13:04:44 +0000 Subject: [PATCH 3/8] 109: Added future handling on callback --- .../zipkin2/reporter/pubsub/PubSubSender.java | 70 ++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java index 24c7094..a2c8d19 100644 --- a/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java +++ b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java @@ -16,7 +16,12 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; @@ -25,6 +30,7 @@ import com.google.pubsub.v1.PubsubMessage; import zipkin2.Call; +import zipkin2.Callback; import zipkin2.CheckResult; import zipkin2.codec.Encoding; import zipkin2.reporter.BytesMessageEncoder; @@ -153,7 +159,6 @@ public Call sendSpans(List byteList) { byte[] messageBytes = BytesMessageEncoder.forEncoding(encoding()).encode(byteList); PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFrom(messageBytes)).build(); - publisher.publish(pubsubMessage); return null; /* @@ -189,6 +194,69 @@ private synchronized boolean setClosed() { } } + class PubSubCall extends Call.Base { + private final PubsubMessage message; + volatile ApiFuture future; + public PubSubCall(PubsubMessage message) { + this.message = message; + } + + @Override + protected Void doExecute() throws IOException { + try { + publisher.publish(message).get(); + } catch (InterruptedException| ExecutionException e) { + throw new RuntimeException(e); + } + return null; + } + + @Override + protected void doEnqueue(Callback callback) { + ApiFuture future = publisher.publish(message); + ApiFutures.addCallback(future, new ApiFutureCallbackAdapter(callback), executorProvider.getExecutor()); + if (future.isCancelled()) throw new IllegalStateException("cancelled sending spans"); + } + + @Override + protected void doCancel() { + Future maybeFuture = future; + if (maybeFuture != null) maybeFuture.cancel(true); + } + + @Override + protected boolean doIsCanceled() { + Future maybeFuture = future; + return maybeFuture != null && maybeFuture.isCancelled(); + } + + @Override + public Call clone() { + PubsubMessage clone = PubsubMessage.newBuilder(message).build(); + return new PubSubCall(clone); + } + + } + + static final class ApiFutureCallbackAdapter implements ApiFutureCallback { + + final Callback callback; + + public ApiFutureCallbackAdapter(Callback callback) { + this.callback = callback; + } + + @Override + public void onFailure(Throwable t) { + callback.onError(t); + } + + @Override + public void onSuccess(String result) { + callback.onSuccess(null); + } + + } } From 11dc249d40365ffb724283c44adbe5786f1c93a3 Mon Sep 17 00:00:00 2001 From: gkatzioura Date: Sat, 15 Jan 2022 13:42:34 +0000 Subject: [PATCH 4/8] 109: Added topic Admin for PubSub sender checks --- .../zipkin2/reporter/pubsub/PubSubSender.java | 48 +++++++++++++------ 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java index a2c8d19..b042dde 100644 --- a/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java +++ b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -26,8 +27,11 @@ import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; import zipkin2.Call; import zipkin2.Callback; @@ -52,6 +56,7 @@ public static final class Builder { Encoding encoding = Encoding.JSON; Publisher publisher; ExecutorProvider executorProvider; + TopicAdminClient topicAdminClient; Builder(PubSubSender pubSubSender) { this.topic = pubSubSender.topic; @@ -73,7 +78,6 @@ public Builder messageMaxBytes(int messageMaxBytes) { return this; } - public Builder publisher(Publisher publisher) { if (publisher == null) throw new NullPointerException("publisher == null"); this.publisher = publisher; @@ -81,12 +85,18 @@ public Builder publisher(Publisher publisher) { } /** ExecutorProvider for PubSub operations **/ - public Builder publisher(ExecutorProvider executorProvider) { + public Builder executorProvider(ExecutorProvider executorProvider) { if (executorProvider == null) throw new NullPointerException("executorProvider == null"); this.executorProvider = executorProvider; return this; } + public Builder topicAdminClient(TopicAdminClient topicAdminClient) { + if (topicAdminClient == null) throw new NullPointerException("topicAdminClient == null"); + this.topicAdminClient = topicAdminClient; + return this; + } + public PubSubSender build() { if (topic == null) throw new NullPointerException("topic == null"); try { @@ -105,6 +115,14 @@ public PubSubSender build() { } } + if(topicAdminClient == null) { + try { + topicAdminClient = TopicAdminClient.create(); + } catch (IOException e) { + throw new PubSubSenderInitializationException(e); + } + } + return new PubSubSender(this); } @@ -123,6 +141,7 @@ private InstantiatingExecutorProvider defaultExecutorProvider() { final Encoding encoding; final Publisher publisher; final ExecutorProvider executorProvider; + final TopicAdminClient topicAdminClient; volatile boolean closeCalled; @@ -132,11 +151,18 @@ private InstantiatingExecutorProvider defaultExecutorProvider() { this.encoding = builder.encoding; this.publisher = builder.publisher; this.executorProvider = builder.executorProvider; + this.topicAdminClient = builder.topicAdminClient; } + /** + * If no permissions given sent back ok, f permissions and topic exist ok, if topic does not exist error + * @return + */ @Override public CheckResult check() { + Topic topic = topicAdminClient.getTopic(TopicName.parse(this.topic)); return CheckResult.OK; + // return CheckResult.failed(e); } @Override public Encoding encoding() { @@ -159,18 +185,8 @@ public Call sendSpans(List byteList) { byte[] messageBytes = BytesMessageEncoder.forEncoding(encoding()).encode(byteList); PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(ByteString.copyFrom(messageBytes)).build(); - return null; - /* - - ByteBuffer message = ByteBuffer.wrap(BytesMessageEncoder.forEncoding(encoding()).encode(list)); - - PutRecordRequest request = new PutRecordRequest(); - request.setStreamName(streamName); - request.setData(message); - request.setPartitionKey(getPartitionKey()); - return new KinesisCall(request); - */ + return new PubSubCall(pubsubMessage); } /** @@ -194,6 +210,10 @@ private synchronized boolean setClosed() { } } + @Override public final String toString() { + return "PubSubSender{topic=" + topic+ "}"; + } + class PubSubCall extends Call.Base { private final PubsubMessage message; volatile ApiFuture future; @@ -236,7 +256,6 @@ public Call clone() { PubsubMessage clone = PubsubMessage.newBuilder(message).build(); return new PubSubCall(clone); } - } static final class ApiFutureCallbackAdapter implements ApiFutureCallback { @@ -256,7 +275,6 @@ public void onFailure(Throwable t) { public void onSuccess(String result) { callback.onSuccess(null); } - } } From 301e36c67278cc541484aae77b00615a7f31c9c6 Mon Sep 17 00:00:00 2001 From: gkatzioura Date: Sat, 12 Feb 2022 00:38:20 +0000 Subject: [PATCH 5/8] 109: Added PubSub Sender tests --- sender-pubsub/pom.xml | 28 +-- .../zipkin2/reporter/pubsub/PubSubSender.java | 36 ++- .../reporter/pubsub/PubSubSenderTest.java | 212 ++++++++++++++++++ 3 files changed, 251 insertions(+), 25 deletions(-) create mode 100644 sender-pubsub/src/test/java/zipkin2/reporter/pubsub/PubSubSenderTest.java diff --git a/sender-pubsub/pom.xml b/sender-pubsub/pom.xml index cb04768..f58d764 100644 --- a/sender-pubsub/pom.xml +++ b/sender-pubsub/pom.xml @@ -29,6 +29,7 @@ ${project.basedir}/.. + 2.12.0 @@ -53,20 +54,19 @@ zipkin-reporter ${zipkin-reporter.version} - - - - - - - - - - - - - - + + io.grpc + grpc-testing + 1.43.2 + test + + + com.google.api.grpc + grpc-google-cloud-pubsub-v1 + 1.97.1 + test + + \ No newline at end of file diff --git a/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java index b042dde..b0bdebb 100644 --- a/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java +++ b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java @@ -14,9 +14,7 @@ package zipkin2.reporter.pubsub; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.List; -import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -26,6 +24,8 @@ import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.UnknownException; import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.protobuf.ByteString; @@ -33,6 +33,7 @@ import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; +import io.grpc.StatusRuntimeException; import zipkin2.Call; import zipkin2.Callback; import zipkin2.CheckResult; @@ -78,6 +79,17 @@ public Builder messageMaxBytes(int messageMaxBytes) { return this; } + /** + * Use this to change the encoding used in messages. Default is {@linkplain Encoding#JSON} + * + *

Note: If ultimately sending to Zipkin, version 2.8+ is required to process protobuf. + */ + public Builder encoding(Encoding encoding) { + if (encoding == null) throw new NullPointerException("encoding == null"); + this.encoding = encoding; + return this; + } + public Builder publisher(Publisher publisher) { if (publisher == null) throw new NullPointerException("publisher == null"); this.publisher = publisher; @@ -99,11 +111,6 @@ public Builder topicAdminClient(TopicAdminClient topicAdminClient) { public PubSubSender build() { if (topic == null) throw new NullPointerException("topic == null"); - try { - Publisher.newBuilder("dsdssd").setExecutorProvider(null).build(); - } catch (IOException e) { - throw new PubSubSenderInitializationException(e); - } if (executorProvider == null) executorProvider = defaultExecutorProvider(); ; @@ -136,6 +143,10 @@ private InstantiatingExecutorProvider defaultExecutorProvider() { } } + public Builder toBuilder() { + return new Builder(this); + } + final String topic; final int messageMaxBytes; final Encoding encoding; @@ -160,9 +171,12 @@ private InstantiatingExecutorProvider defaultExecutorProvider() { */ @Override public CheckResult check() { - Topic topic = topicAdminClient.getTopic(TopicName.parse(this.topic)); - return CheckResult.OK; - // return CheckResult.failed(e); + try { + Topic topic = topicAdminClient.getTopic(TopicName.parse(this.topic)); + return CheckResult.OK; + } catch (ApiException e) { + return CheckResult.failed(e); + } } @Override public Encoding encoding() { @@ -234,7 +248,7 @@ protected Void doExecute() throws IOException { @Override protected void doEnqueue(Callback callback) { - ApiFuture future = publisher.publish(message); + future = publisher.publish(message); ApiFutures.addCallback(future, new ApiFutureCallbackAdapter(callback), executorProvider.getExecutor()); if (future.isCancelled()) throw new IllegalStateException("cancelled sending spans"); } diff --git a/sender-pubsub/src/test/java/zipkin2/reporter/pubsub/PubSubSenderTest.java b/sender-pubsub/src/test/java/zipkin2/reporter/pubsub/PubSubSenderTest.java new file mode 100644 index 0000000..eb09177 --- /dev/null +++ b/sender-pubsub/src/test/java/zipkin2/reporter/pubsub/PubSubSenderTest.java @@ -0,0 +1,212 @@ +package zipkin2.reporter.pubsub; + +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.junit.Rule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub; +import com.google.cloud.pubsub.v1.stub.PublisherStub; +import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; +import com.google.pubsub.v1.GetTopicRequest; +import com.google.pubsub.v1.PublishRequest; +import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PublisherGrpc; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Topic; + +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import zipkin2.Call; +import zipkin2.CheckResult; +import zipkin2.Span; +import static zipkin2.TestObjects.CLIENT_SPAN; +import zipkin2.codec.Encoding; +import zipkin2.codec.SpanBytesDecoder; +import zipkin2.codec.SpanBytesEncoder; + +class PubSubSenderTest { + + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + PubSubSender sender; + + private PublisherGrpc.PublisherImplBase publisherImplBase; + + @BeforeEach + void setUp() throws IOException, ExecutionException, InterruptedException { + publisherImplBase = mock(PublisherGrpc.PublisherImplBase.class); + + String serverName = InProcessServerBuilder.generateName(); + + grpcCleanup.register(InProcessServerBuilder + .forName(serverName).directExecutor().addService(publisherImplBase).build().start()); + + ExecutorProvider executorProvider = testExecutorProvider(); + + ManagedChannel managedChannel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); + + TransportChannel transportChannel = GrpcTransportChannel.create(managedChannel); + TransportChannelProvider transportChannelProvider = FixedTransportChannelProvider.create(transportChannel); + + String topicName = "projects/test-project/topics/my-topic"; + Publisher publisher = Publisher.newBuilder(topicName) + .setExecutorProvider(executorProvider) + .setChannelProvider(transportChannelProvider) + .build(); + + PublisherStubSettings publisherStubSettings = PublisherStubSettings.newBuilder() + .setTransportChannelProvider(transportChannelProvider) + .build(); + PublisherStub publisherStub = GrpcPublisherStub.create(publisherStubSettings); + TopicAdminClient topicAdminClient = TopicAdminClient.create(publisherStub); + + sender = PubSubSender.newBuilder() + .topic(topicName) + .publisher(publisher) + .topicAdminClient(topicAdminClient) + .build(); + } + + private InstantiatingExecutorProvider testExecutorProvider() { + return InstantiatingExecutorProvider.newBuilder() + .setExecutorThreadCount(5 * Runtime.getRuntime().availableProcessors()) + .build(); + } + + @Test + public void sendsSpans() throws Exception { + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(PublishRequest.class); + + doAnswer(invocationOnMock -> { + StreamObserver responseObserver = invocationOnMock.getArgument(1); + responseObserver.onNext(PublishResponse.newBuilder().addMessageIds(UUID.randomUUID().toString()).build()); + responseObserver.onCompleted(); + return null; + }).when(publisherImplBase).publish(requestCaptor.capture(), any(StreamObserver.class)); + + send(CLIENT_SPAN, CLIENT_SPAN).execute(); + + assertThat(extractSpans(requestCaptor.getValue())) + .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + } + + @Test + public void sendsSpans_PROTO3() throws Exception { + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(PublishRequest.class); + + doAnswer(invocationOnMock -> { + StreamObserver responseObserver = invocationOnMock.getArgument(1); + responseObserver.onNext(PublishResponse.newBuilder().addMessageIds(UUID.randomUUID().toString()).build()); + responseObserver.onCompleted(); + return null; + }).when(publisherImplBase).publish(requestCaptor.capture(), any(StreamObserver.class)); + + sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); + + send(CLIENT_SPAN, CLIENT_SPAN).execute(); + + assertThat(extractSpans(requestCaptor.getValue())) + .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + } + + @Test + public void sendsSpans_json_unicode() throws Exception { + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(PublishRequest.class); + + doAnswer(invocationOnMock -> { + StreamObserver responseObserver = invocationOnMock.getArgument(1); + responseObserver.onNext(PublishResponse.newBuilder().addMessageIds(UUID.randomUUID().toString()).build()); + responseObserver.onCompleted(); + return null; + }).when(publisherImplBase).publish(requestCaptor.capture(), any(StreamObserver.class)); + + Span unicode = CLIENT_SPAN.toBuilder().putTag("error", "\uD83D\uDCA9").build(); + send(unicode).execute(); + + assertThat(extractSpans(requestCaptor.getValue())).containsExactly(unicode); + } + + @Test + public void checkPasses() throws Exception { + ArgumentCaptor captor = + ArgumentCaptor.forClass(GetTopicRequest.class); + + doAnswer(invocationOnMock -> { + StreamObserver responseObserver = invocationOnMock.getArgument(1); + responseObserver.onNext(Topic.newBuilder().setName("topic-name").build()); + responseObserver.onCompleted(); + return null; + }).when(publisherImplBase).getTopic(captor.capture(), any(StreamObserver.class)); + + CheckResult result = sender.check(); + assertThat(result.ok()).isTrue(); + } + + @Test + public void checkFailsWithStreamNotActive() throws Exception { + ArgumentCaptor captor = + ArgumentCaptor.forClass(GetTopicRequest.class); + + doAnswer(invocationOnMock -> { + StreamObserver responseObserver = invocationOnMock.getArgument(1); + responseObserver.onError(new io.grpc.StatusRuntimeException(Status.NOT_FOUND)); + return null; + }).when(publisherImplBase).getTopic(captor.capture(), any(StreamObserver.class)); + + CheckResult result = sender.check(); + assertThat(result.error()).isInstanceOf(ApiException.class); + } + + private List extractSpans(PublishRequest publishRequest) { + return publishRequest.getMessagesList() + .stream() + .flatMap(this::extractSpans) + .collect(Collectors.toList()); + } + + Stream extractSpans(PubsubMessage pubsubMessage) { + byte[] messageBytes = pubsubMessage.getData().toByteArray(); + + if (messageBytes[0] == '[') { + return SpanBytesDecoder.JSON_V2.decodeList(messageBytes).stream(); + } + return SpanBytesDecoder.PROTO3.decodeList(messageBytes).stream(); + } + + Call send(zipkin2.Span... spans) { + SpanBytesEncoder bytesEncoder = + sender.encoding() == Encoding.JSON ? SpanBytesEncoder.JSON_V2 : SpanBytesEncoder.PROTO3; + return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + } + +} From 4d7275ee2baa0a4242bd6b75f28413ed363ce7ee Mon Sep 17 00:00:00 2001 From: gkatzioura Date: Thu, 17 Mar 2022 08:06:53 +0000 Subject: [PATCH 6/8] 109: Adding collector --- collector-pubsub/pom.xml | 69 +++++++ .../collector/pubsub/PubSubCollector.java | 176 ++++++++++++++++++ .../collector/pubsub/SpanCallback.java | 38 ++++ .../collector/pubsub/SpanMessageReceiver.java | 41 ++++ .../collector/pubsub/SubscriberSettings.java | 129 +++++++++++++ pom.xml | 15 +- sender-pubsub/pom.xml | 1 + 7 files changed, 468 insertions(+), 1 deletion(-) create mode 100644 collector-pubsub/pom.xml create mode 100644 collector-pubsub/src/main/java/zipkin2/collector/pubsub/PubSubCollector.java create mode 100644 collector-pubsub/src/main/java/zipkin2/collector/pubsub/SpanCallback.java create mode 100644 collector-pubsub/src/main/java/zipkin2/collector/pubsub/SpanMessageReceiver.java create mode 100644 collector-pubsub/src/main/java/zipkin2/collector/pubsub/SubscriberSettings.java diff --git a/collector-pubsub/pom.xml b/collector-pubsub/pom.xml new file mode 100644 index 0000000..5bdb382 --- /dev/null +++ b/collector-pubsub/pom.xml @@ -0,0 +1,69 @@ + + + + + zipkin-gcp-parent + io.zipkin.gcp + 1.0.3-SNAPSHOT + + 4.0.0 + + collector-pubsub + + + ${project.basedir}/.. + + + + + + com.google.cloud + libraries-bom + 24.1.2 + pom + import + + + + + + + com.google.cloud + google-cloud-pubsub + + + io.zipkin.zipkin2 + zipkin-collector + ${zipkin.version} + + + io.grpc + grpc-testing + 1.43.2 + test + + + com.google.api.grpc + grpc-google-cloud-pubsub-v1 + 1.97.1 + test + + + + \ No newline at end of file diff --git a/collector-pubsub/src/main/java/zipkin2/collector/pubsub/PubSubCollector.java b/collector-pubsub/src/main/java/zipkin2/collector/pubsub/PubSubCollector.java new file mode 100644 index 0000000..b882e08 --- /dev/null +++ b/collector-pubsub/src/main/java/zipkin2/collector/pubsub/PubSubCollector.java @@ -0,0 +1,176 @@ +/* + * Copyright 2016-2022 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.pubsub; + +import java.io.IOException; + +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; + +import zipkin2.CheckResult; +import zipkin2.codec.Encoding; +import zipkin2.collector.Collector; +import zipkin2.collector.CollectorComponent; +import zipkin2.collector.CollectorMetrics; +import zipkin2.collector.CollectorSampler; +import zipkin2.storage.StorageComponent; + +public class PubSubCollector extends CollectorComponent { + + public static final class Builder extends CollectorComponent.Builder { + + String subscription; + Encoding encoding = Encoding.JSON; + ExecutorProvider executorProvider; + SubscriptionAdminClient subscriptionAdminClient; + SubscriberSettings subscriberSettings; + + Collector.Builder delegate = Collector.newBuilder(PubSubCollector.class); + CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS; + + public Builder(PubSubCollector pubSubCollector) { + this.subscription = pubSubCollector.subscription; + this.encoding = pubSubCollector.encoding; + this.executorProvider = pubSubCollector.executorProvider; + } + + @Override + public Builder storage(StorageComponent storageComponent) { + delegate.storage(storageComponent); + return this; + } + + @Override + public Builder metrics(CollectorMetrics collectorMetrics) { + if (metrics == null) throw new NullPointerException("metrics == null"); + delegate.metrics(this.metrics = metrics.forTransport("pubsub")); + return this; + } + + @Override + public Builder sampler(CollectorSampler collectorSampler) { + delegate.sampler(collectorSampler); + return this; + } + + /** PubSub subscription to receive spans. */ + public Builder subscription(String subscription) { + if (subscription == null) throw new NullPointerException("subscription == null"); + this.subscription = subscription; + return this; + } + + /** + * Use this to change the encoding used in messages. Default is {@linkplain Encoding#JSON} + * + *

Note: If ultimately sending to Zipkin, version 2.8+ is required to process protobuf. + */ + public Builder encoding(Encoding encoding) { + if (encoding == null) throw new NullPointerException("encoding == null"); + this.encoding = encoding; + return this; + } + + /** ExecutorProvider for PubSub operations **/ + public Builder executorProvider(ExecutorProvider executorProvider) { + if (executorProvider == null) throw new NullPointerException("executorProvider == null"); + this.executorProvider = executorProvider; + return this; + } + + public Builder subscriptionAdminClient(SubscriptionAdminClient subscriptionAdminClient) { + if (subscriptionAdminClient == null) throw new NullPointerException("subscriptionAdminClient == null"); + this.subscriptionAdminClient = subscriptionAdminClient; + return this; + } + + public Builder subscriberSettings(SubscriberSettings subscriberSettings) { + if (subscriberSettings == null) throw new NullPointerException("subscriberSettings == null"); + this.subscriberSettings = subscriberSettings; + return this; + } + + @Override + public PubSubCollector build() { + return new PubSubCollector(this); + } + + Builder() {} + } + + final Collector collector; + final CollectorMetrics metrics; + final String subscription; + final Encoding encoding; + Subscriber subscriber; + final ExecutorProvider executorProvider; + final SubscriptionAdminClient subscriptionAdminClient; + final SubscriberSettings subscriberSettings; + + PubSubCollector(Builder builder) { + this.collector = builder.delegate.build(); + this.metrics = builder.metrics; + this.subscription = builder.subscription; + this.encoding = builder.encoding; + this.executorProvider = builder.executorProvider; + this.subscriptionAdminClient = builder.subscriptionAdminClient; + this.subscriberSettings = builder.subscriberSettings; + } + + @Override + public CollectorComponent start() { + Subscriber.Builder builder = Subscriber.newBuilder(subscription, new SpanMessageReceiver(collector, metrics)); + subscriber = applyConfigurations(builder).build(); + subscriber.startAsync().awaitRunning(); + return this; + } + + private Subscriber.Builder applyConfigurations(Subscriber.Builder builder) { + if(subscriberSettings==null) { + return builder; + } + + subscriberSettings.getChannelProvider().ifPresent(builder::setChannelProvider); + subscriberSettings.getHeaderProvider().ifPresent(builder::setHeaderProvider); + subscriberSettings.getFlowControlSettings().ifPresent(builder::setFlowControlSettings); + builder.setUseLegacyFlowControl(subscriberSettings.isUseLegacyFlowControl()); + subscriberSettings.getMaxAckExtensionPeriod().ifPresent(builder::setMaxAckExtensionPeriod); + subscriberSettings.getMaxDurationPerAckExtension().ifPresent(builder::setMaxDurationPerAckExtension); + subscriberSettings.getExecutorProvider().ifPresent(builder::setExecutorProvider); + subscriberSettings.getCredentialsProvider().ifPresent(builder::setCredentialsProvider); + subscriberSettings.getSystemExecutorProvider().ifPresent(builder::setSystemExecutorProvider); + builder.setParallelPullCount(subscriberSettings.getParallelPullCount()); + subscriberSettings.getEndpoint().ifPresent(builder::setEndpoint); + + return builder; + } + + @Override + public CheckResult check() { + try { + subscriptionAdminClient.getSubscription(subscription); + return CheckResult.OK; + } catch (ApiException e) { + return CheckResult.failed(e); + } + } + + @Override + public void close() throws IOException { + subscriber.stopAsync().awaitRunning(); + } + +} diff --git a/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SpanCallback.java b/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SpanCallback.java new file mode 100644 index 0000000..2a6fea4 --- /dev/null +++ b/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SpanCallback.java @@ -0,0 +1,38 @@ +/* + * Copyright 2016-2022 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.pubsub; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; + +import zipkin2.Callback; + +final class SpanCallback implements Callback { + + private final AckReplyConsumer ackReplyConsumer; + + public SpanCallback(AckReplyConsumer ackReplyConsumer) { + this.ackReplyConsumer = ackReplyConsumer; + } + + @Override + public void onSuccess(Void value) { + ackReplyConsumer.ack(); + } + + @Override + public void onError(Throwable throwable) { + ackReplyConsumer.nack(); + } + +} diff --git a/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SpanMessageReceiver.java b/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SpanMessageReceiver.java new file mode 100644 index 0000000..7b3f02d --- /dev/null +++ b/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SpanMessageReceiver.java @@ -0,0 +1,41 @@ +/* + * Copyright 2016-2022 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.pubsub; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.pubsub.v1.PubsubMessage; + +import zipkin2.collector.Collector; +import zipkin2.collector.CollectorMetrics; + +final class SpanMessageReceiver implements MessageReceiver { + + final Collector collector; + final CollectorMetrics metrics; + + public SpanMessageReceiver(Collector collector, CollectorMetrics metrics) { + this.collector = collector; + this.metrics = metrics; + } + + @Override + public void receiveMessage(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) { + byte[] serialized = pubsubMessage.getData().toByteArray(); + metrics.incrementMessages(); + metrics.incrementBytes(serialized.length); + collector.acceptSpans(serialized, new SpanCallback(ackReplyConsumer)); + } + +} diff --git a/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SubscriberSettings.java b/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SubscriberSettings.java new file mode 100644 index 0000000..1251808 --- /dev/null +++ b/collector-pubsub/src/main/java/zipkin2/collector/pubsub/SubscriberSettings.java @@ -0,0 +1,129 @@ +/* + * Copyright 2016-2022 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.pubsub; + +import java.util.Optional; + +import org.threeten.bp.Duration; + +import com.google.api.core.ApiClock; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.rpc.HeaderProvider; +import com.google.api.gax.rpc.TransportChannelProvider; + +public class SubscriberSettings { + + private Optional channelProvider = Optional.empty(); + private Optional headerProvider = Optional.empty(); + private Optional flowControlSettings = Optional.empty(); + private boolean useLegacyFlowControl = false; + private Optional maxAckExtensionPeriod = Optional.empty(); + private Optional maxDurationPerAckExtension = Optional.empty(); + private Optional executorProvider = Optional.empty(); + private Optional credentialsProvider = Optional.empty(); + private Optional systemExecutorProvider = Optional.empty(); + private int parallelPullCount = 1; + private Optional endpoint = Optional.empty(); + + public Optional getChannelProvider() { + return channelProvider; + } + + public void setChannelProvider(TransportChannelProvider channelProvider) { + this.channelProvider = Optional.of(channelProvider); + } + + public Optional getHeaderProvider() { + return headerProvider; + } + + public void setHeaderProvider(HeaderProvider headerProvider) { + this.headerProvider = Optional.of(headerProvider); + } + + public Optional getFlowControlSettings() { + return flowControlSettings; + } + + public void setFlowControlSettings(FlowControlSettings flowControlSettings) { + this.flowControlSettings = Optional.of(flowControlSettings); + } + + public boolean isUseLegacyFlowControl() { + return useLegacyFlowControl; + } + + public void setUseLegacyFlowControl(boolean useLegacyFlowControl) { + this.useLegacyFlowControl = useLegacyFlowControl; + } + + public Optional getMaxAckExtensionPeriod() { + return maxAckExtensionPeriod; + } + + public void setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) { + this.maxAckExtensionPeriod = Optional.of(maxAckExtensionPeriod); + } + + public Optional getMaxDurationPerAckExtension() { + return maxDurationPerAckExtension; + } + + public void setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) { + this.maxDurationPerAckExtension = Optional.of(maxDurationPerAckExtension); + } + + public Optional getExecutorProvider() { + return executorProvider; + } + + public void setExecutorProvider(ExecutorProvider executorProvider) { + this.executorProvider = Optional.of(executorProvider); + } + + public Optional getCredentialsProvider() { + return credentialsProvider; + } + + public void setCredentialsProvider(CredentialsProvider credentialsProvider) { + this.credentialsProvider = Optional.of(credentialsProvider); + } + + public Optional getSystemExecutorProvider() { + return systemExecutorProvider; + } + + public void setSystemExecutorProvider(ExecutorProvider systemExecutorProvider) { + this.systemExecutorProvider = Optional.of(systemExecutorProvider); + } + + public int getParallelPullCount() { + return parallelPullCount; + } + + public void setParallelPullCount(int parallelPullCount) { + this.parallelPullCount = parallelPullCount; + } + + public Optional getEndpoint() { + return endpoint; + } + + public void setEndpoint(String endpoint) { + this.endpoint = Optional.of(endpoint); + } + +} diff --git a/pom.xml b/pom.xml index 575ccd9..cde29ac 100644 --- a/pom.xml +++ b/pom.xml @@ -1,7 +1,7 @@