Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
Fix: set receive queue size for sinks (#4091)
Browse files Browse the repository at this point in the history
* fix setting recieve queue size for sinks

* fix setting recieve queue size for sinks
  • Loading branch information
jerrypeng authored and merlimat committed Apr 20, 2019
1 parent c3e8a33 commit 84a4141
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SinkConfig;
Expand Down Expand Up @@ -383,7 +384,7 @@ private static SinkConfig createSinkConfig(String tenant, String namespace, Stri
sinkConfig.setName(functionName);
sinkConfig.setParallelism(1);
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
sinkConfig.setInputs(Collections.singleton(sourceTopic));
sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().build()));
sinkConfig.setSourceSubscriptionName(subName);
sinkConfig.setCleanupSubscription(true);
return sinkConfig;
Expand Down Expand Up @@ -514,7 +515,7 @@ private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/input";
final String functionName = "PulsarSink-test";
final String sinkName = "PulsarSink-test";
final String propertyKey = "key";
final String propertyValue = "value";
final String subscriptionName = "test-sub";
Expand All @@ -525,20 +526,34 @@ private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
// create a producer that creates a topic at broker
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();

SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, functionName, sourceTopic, subscriptionName);
SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, sinkName, sourceTopic, subscriptionName);

sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(1000).build()));

admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);

sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(523).build()));

admin.sink().updateSinkWithUrl(sinkConfig, jarFilePathUrl);

retryStrategically((test) -> {
try {
return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
TopicStats topicStats = admin.topics().getStats(sourceTopic);

return topicStats.subscriptions.containsKey(subscriptionName)
&& topicStats.subscriptions.get(subscriptionName).consumers.size() == 1
&& topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits == 523;

} catch (PulsarAdminException e) {
return false;
}
}, 50, 150);
// validate pulsar sink consumer has started on the topic
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);

TopicStats topicStats = admin.topics().getStats(sourceTopic);
assertEquals(topicStats.subscriptions.size(), 1);
assertTrue(topicStats.subscriptions.containsKey(subscriptionName));
assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits, 523);

// validate prometheus metrics empty
String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
Expand All @@ -548,65 +563,65 @@ private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
Metric m = metrics.get("pulsar_sink_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_received_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_written_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_written_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_sink_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_system_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_system_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_last_invocation");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);

int totalMsgs = 10;
Expand All @@ -631,70 +646,70 @@ private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
m = metrics.get("pulsar_sink_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, (double) totalMsgs);
m = metrics.get("pulsar_sink_received_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, (double) totalMsgs);
m = metrics.get("pulsar_sink_written_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, (double) totalMsgs);
m = metrics.get("pulsar_sink_written_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, (double) totalMsgs);
m = metrics.get("pulsar_sink_sink_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_system_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_system_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_last_invocation");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertTrue(m.value > 0.0);


// delete functions
admin.sink().deleteSink(tenant, namespacePortion, functionName);
admin.sink().deleteSink(tenant, namespacePortion, sinkName);

retryStrategically((test) -> {
try {
Expand Down Expand Up @@ -740,6 +755,7 @@ private void testPulsarSourceStats(String jarFilePathUrl) throws Exception {
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, sourceName, sinkTopic);

admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);

retryStrategically((test) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,6 @@ void processArguments() throws Exception {

if (null != sinkConfigFile) {
this.sinkConfig = CmdUtils.loadConfig(sinkConfigFile, SinkConfig.class);
log.info("The sinkConfig read from file is {}", sinkConfig);
} else {
this.sinkConfig = new SinkConfig();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,18 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
}
if (sinkConfig.getInputSpecs() != null) {
sinkConfig.getInputSpecs().forEach((topic, spec) -> {
sourceSpecBuilder.putInputSpecs(topic,
Function.ConsumerSpec.newBuilder()
.setSerdeClassName(spec.getSerdeClassName() != null ? spec.getSerdeClassName() : "")
.setSchemaType(spec.getSchemaType() != null ? spec.getSchemaType() : "")
.setIsRegexPattern(spec.isRegexPattern())
.build());
Function.ConsumerSpec.Builder bldr = Function.ConsumerSpec.newBuilder()
.setIsRegexPattern(spec.isRegexPattern());
if (!StringUtils.isBlank(spec.getSchemaType())) {
bldr.setSchemaType(spec.getSchemaType());
} else if (!StringUtils.isBlank(spec.getSerdeClassName())) {
bldr.setSerdeClassName(spec.getSerdeClassName());
}
if (spec.getReceiverQueueSize() != null) {
bldr.setReceiverQueueSize(Function.ConsumerSpec.ReceiverQueueSize.newBuilder()
.setValue(spec.getReceiverQueueSize()).build());
}
sourceSpecBuilder.putInputSpecs(topic, bldr.build());
});
}

Expand Down Expand Up @@ -216,6 +222,9 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
if (!isEmpty(input.getValue().getSchemaType())) {
consumerConfig.setSchemaType(input.getValue().getSchemaType());
}
if (input.getValue().hasReceiverQueueSize()) {
consumerConfig.setReceiverQueueSize(input.getValue().getReceiverQueueSize().getValue());
}
consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern());
consumerConfigMap.put(input.getKey(), consumerConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testConvertBackFidelity() throws IOException {
sinkConfig.setArchive("builtin://jdbc");
sinkConfig.setSourceSubscriptionName("test-subscription");
Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).receiverQueueSize(532).serdeClassName("test-serde").build());
sinkConfig.setInputSpecs(inputSpecs);
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);

Expand Down

0 comments on commit 84a4141

Please sign in to comment.