From 71235ea9fbe4fa384293412cfbc0a9f7801d625c Mon Sep 17 00:00:00 2001 From: gnkoshelev Date: Sun, 12 Dec 2021 17:13:45 +0500 Subject: [PATCH 01/13] Graphite Adapter connection limiter --- hercules-graphite-adapter/README.md | 2 + .../adapter/server/ConnectionLimiter.java | 40 +++++++++++++++++++ .../adapter/server/GraphiteAdapterServer.java | 11 +++++ .../adapter/server/GraphiteHandler.java | 7 +++- 4 files changed, 58 insertions(+), 2 deletions(-) create mode 100644 hercules-graphite-adapter/src/main/java/ru/kontur/vostok/hercules/graphite/adapter/server/ConnectionLimiter.java diff --git a/hercules-graphite-adapter/README.md b/hercules-graphite-adapter/README.md index 899d283b..21e6f202 100644 --- a/hercules-graphite-adapter/README.md +++ b/hercules-graphite-adapter/README.md @@ -56,6 +56,8 @@ Application is configured through properties file. `server.recv.buffer.size.bytes` - recv buffer size in bytes, value for socket option `SO_RCVBUF`, use system default if not set +`server.conection.limit` - concurrent connection limit, default value: `50000` + ### Graphite metrics reporter settings `metrics.graphite.server.addr` - hostname of graphite instance, default value: `localhost` diff --git a/hercules-graphite-adapter/src/main/java/ru/kontur/vostok/hercules/graphite/adapter/server/ConnectionLimiter.java b/hercules-graphite-adapter/src/main/java/ru/kontur/vostok/hercules/graphite/adapter/server/ConnectionLimiter.java new file mode 100644 index 00000000..d39ea41f --- /dev/null +++ b/hercules-graphite-adapter/src/main/java/ru/kontur/vostok/hercules/graphite/adapter/server/ConnectionLimiter.java @@ -0,0 +1,40 @@ +package ru.kontur.vostok.hercules.graphite.adapter.server; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author Gregory Koshelev + */ +@ChannelHandler.Sharable +public class ConnectionLimiter extends ChannelInboundHandlerAdapter { + private final int connLimit; + private final AtomicInteger connections = new AtomicInteger(); + + /** + * {@link ConnectionLimiter} limits concurrent connections + * + * @param connLimit connection limit + */ + public ConnectionLimiter(int connLimit) { + this.connLimit = connLimit; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + if (connections.incrementAndGet() > connLimit) { + ctx.close(); + } else { + super.channelActive(ctx); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + connections.decrementAndGet(); + } +} diff --git a/hercules-graphite-adapter/src/main/java/ru/kontur/vostok/hercules/graphite/adapter/server/GraphiteAdapterServer.java b/hercules-graphite-adapter/src/main/java/ru/kontur/vostok/hercules/graphite/adapter/server/GraphiteAdapterServer.java index 563fc67e..16e1744c 100644 --- a/hercules-graphite-adapter/src/main/java/ru/kontur/vostok/hercules/graphite/adapter/server/GraphiteAdapterServer.java +++ b/hercules-graphite-adapter/src/main/java/ru/kontur/vostok/hercules/graphite/adapter/server/GraphiteAdapterServer.java @@ -54,6 +54,8 @@ public GraphiteAdapterServer(Properties properties, Purgatory purgatory, Metrics int readTimeoutMs = PropertiesUtil.get(Props.READ_TIMEOUT_MS, properties).get(); Integer recvBufferSizeBytes = PropertiesUtil.get(Props.RECV_BUFFER_SIZE_BYTES, properties).orEmpty(null); + int connLimit = PropertiesUtil.get(Props.CONNECTION_LIMIT, properties).get(); + bossGroup = new NioEventLoopGroup( 1, ThreadFactories.newNamedThreadFactory("bossEventLoop", false)); @@ -61,6 +63,7 @@ public GraphiteAdapterServer(Properties properties, Purgatory purgatory, Metrics workerThreadCount, ThreadFactories.newNamedThreadFactory("workerEventLoop", false)); + ConnectionLimiter connectionLimiter = new ConnectionLimiter(connLimit); GraphiteHandler graphiteHandler = new GraphiteHandler(purgatory, metricsCollector); bootstrap = new ServerBootstrap() @@ -71,6 +74,8 @@ public GraphiteAdapterServer(Properties properties, Purgatory purgatory, Metrics @Override public void initChannel(SocketChannel channel) { channel.pipeline(). + /* Limit concurrent connections */ + addLast("connectionLimiter", connectionLimiter). /* Wait for new metrics for this period of time */ addLast("readTimeout", new ReadTimeoutHandler(readTimeoutMs, TimeUnit.MILLISECONDS)). /* One metric per line, a metric length must not exceed 1024 bytes */ @@ -141,5 +146,11 @@ private static class Props { Parameter.integerParameter("recv.buffer.size.bytes"). withValidator(IntegerValidators.positive()). build(); + + static final Parameter CONNECTION_LIMIT = + Parameter.integerParameter("connection.limit"). + withValidator(IntegerValidators.positive()). + withDefault(50_000). + build(); } } \ No newline at end of file diff --git a/hercules-graphite-adapter/src/main/java/ru/kontur/vostok/hercules/graphite/adapter/server/GraphiteHandler.java b/hercules-graphite-adapter/src/main/java/ru/kontur/vostok/hercules/graphite/adapter/server/GraphiteHandler.java index e020ad2b..8742df89 100644 --- a/hercules-graphite-adapter/src/main/java/ru/kontur/vostok/hercules/graphite/adapter/server/GraphiteHandler.java +++ b/hercules-graphite-adapter/src/main/java/ru/kontur/vostok/hercules/graphite/adapter/server/GraphiteHandler.java @@ -52,7 +52,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - LOGGER.warn("Got exception", cause); - ctx.close(); + try { + LOGGER.warn("Got exception", cause); + } finally { + ctx.close(); + } } } From 6fe0012b2ba22831afd46dba98b6609a5a90b25f Mon Sep 17 00:00:00 2001 From: gnkoshelev Date: Wed, 15 Dec 2021 15:46:51 +0500 Subject: [PATCH 02/13] Upgrade Hercules version to 0.52.0-SNAPSHOT --- hercules-application/pom.xml | 2 +- hercules-auth/pom.xml | 2 +- hercules-cassandra-sink/pom.xml | 2 +- hercules-cassandra-util/pom.xml | 2 +- hercules-clickhouse-sink/pom.xml | 2 +- hercules-clickhouse-util/pom.xml | 2 +- hercules-client/pom.xml | 2 +- hercules-configuration/pom.xml | 2 +- hercules-curator/pom.xml | 2 +- hercules-elastic-adapter/pom.xml | 2 +- hercules-elastic-sink/pom.xml | 2 +- hercules-gate-client/pom.xml | 2 +- hercules-gate/pom.xml | 2 +- hercules-graphite-adapter/pom.xml | 2 +- hercules-graphite-sink/pom.xml | 2 +- hercules-health/pom.xml | 2 +- hercules-http/pom.xml | 2 +- hercules-init/pom.xml | 2 +- hercules-json/pom.xml | 2 +- hercules-kafka-util/pom.xml | 2 +- hercules-management-api/pom.xml | 2 +- hercules-meta/pom.xml | 2 +- hercules-partitioner/pom.xml | 2 +- hercules-protocol/pom.xml | 2 +- hercules-sd/pom.xml | 2 +- hercules-sentry-sink/pom.xml | 2 +- hercules-sink/pom.xml | 2 +- hercules-stream-api/pom.xml | 2 +- hercules-stream-manager/pom.xml | 2 +- hercules-stream-sink/pom.xml | 2 +- hercules-tags/pom.xml | 2 +- hercules-throttling/pom.xml | 2 +- hercules-timeline-api/pom.xml | 2 +- hercules-timeline-manager/pom.xml | 2 +- hercules-timeline-sink/pom.xml | 2 +- hercules-tracing-api/pom.xml | 2 +- hercules-tracing-sink-clickhouse/pom.xml | 2 +- hercules-tracing-sink/pom.xml | 2 +- hercules-undertow-util/pom.xml | 2 +- hercules-util/pom.xml | 2 +- hercules-uuid/pom.xml | 2 +- pom.xml | 4 ++-- 42 files changed, 43 insertions(+), 43 deletions(-) diff --git a/hercules-application/pom.xml b/hercules-application/pom.xml index 6c0020bb..f9814edc 100644 --- a/hercules-application/pom.xml +++ b/hercules-application/pom.xml @@ -5,7 +5,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-auth/pom.xml b/hercules-auth/pom.xml index e69b09b8..832353eb 100644 --- a/hercules-auth/pom.xml +++ b/hercules-auth/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-cassandra-sink/pom.xml b/hercules-cassandra-sink/pom.xml index a705bcf3..342eb3b1 100644 --- a/hercules-cassandra-sink/pom.xml +++ b/hercules-cassandra-sink/pom.xml @@ -5,7 +5,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-cassandra-util/pom.xml b/hercules-cassandra-util/pom.xml index a3d999cc..662f6edb 100644 --- a/hercules-cassandra-util/pom.xml +++ b/hercules-cassandra-util/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-clickhouse-sink/pom.xml b/hercules-clickhouse-sink/pom.xml index 342504e7..667ee895 100644 --- a/hercules-clickhouse-sink/pom.xml +++ b/hercules-clickhouse-sink/pom.xml @@ -5,7 +5,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-clickhouse-util/pom.xml b/hercules-clickhouse-util/pom.xml index d65b8083..bfc1423d 100644 --- a/hercules-clickhouse-util/pom.xml +++ b/hercules-clickhouse-util/pom.xml @@ -5,7 +5,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-client/pom.xml b/hercules-client/pom.xml index ca5cf201..4230b669 100644 --- a/hercules-client/pom.xml +++ b/hercules-client/pom.xml @@ -5,7 +5,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-configuration/pom.xml b/hercules-configuration/pom.xml index a29efae7..d69ee85e 100644 --- a/hercules-configuration/pom.xml +++ b/hercules-configuration/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-curator/pom.xml b/hercules-curator/pom.xml index 3e2416ca..53ec8ec3 100644 --- a/hercules-curator/pom.xml +++ b/hercules-curator/pom.xml @@ -5,7 +5,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-elastic-adapter/pom.xml b/hercules-elastic-adapter/pom.xml index 0ddb0cab..218d3175 100644 --- a/hercules-elastic-adapter/pom.xml +++ b/hercules-elastic-adapter/pom.xml @@ -5,7 +5,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-elastic-sink/pom.xml b/hercules-elastic-sink/pom.xml index 967c3af1..9a0ac4b2 100644 --- a/hercules-elastic-sink/pom.xml +++ b/hercules-elastic-sink/pom.xml @@ -5,7 +5,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-gate-client/pom.xml b/hercules-gate-client/pom.xml index 6ed1d2bc..ad090ace 100644 --- a/hercules-gate-client/pom.xml +++ b/hercules-gate-client/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-gate/pom.xml b/hercules-gate/pom.xml index 970b4367..9c8921e7 100644 --- a/hercules-gate/pom.xml +++ b/hercules-gate/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-graphite-adapter/pom.xml b/hercules-graphite-adapter/pom.xml index 2a9e4043..39f09cec 100644 --- a/hercules-graphite-adapter/pom.xml +++ b/hercules-graphite-adapter/pom.xml @@ -5,7 +5,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-graphite-sink/pom.xml b/hercules-graphite-sink/pom.xml index 74e780e1..aa81b1ea 100644 --- a/hercules-graphite-sink/pom.xml +++ b/hercules-graphite-sink/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-health/pom.xml b/hercules-health/pom.xml index 0b5362a6..4b341cd9 100644 --- a/hercules-health/pom.xml +++ b/hercules-health/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-http/pom.xml b/hercules-http/pom.xml index 998214e6..47c0f53a 100644 --- a/hercules-http/pom.xml +++ b/hercules-http/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-init/pom.xml b/hercules-init/pom.xml index b4b5347d..485f821f 100644 --- a/hercules-init/pom.xml +++ b/hercules-init/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-json/pom.xml b/hercules-json/pom.xml index 90257afd..df240c5f 100644 --- a/hercules-json/pom.xml +++ b/hercules-json/pom.xml @@ -5,7 +5,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-kafka-util/pom.xml b/hercules-kafka-util/pom.xml index 8c31dbd7..4c78aedf 100644 --- a/hercules-kafka-util/pom.xml +++ b/hercules-kafka-util/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-management-api/pom.xml b/hercules-management-api/pom.xml index f217c25d..be6ebad0 100644 --- a/hercules-management-api/pom.xml +++ b/hercules-management-api/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-meta/pom.xml b/hercules-meta/pom.xml index 4680a6be..2b13d41c 100644 --- a/hercules-meta/pom.xml +++ b/hercules-meta/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-partitioner/pom.xml b/hercules-partitioner/pom.xml index 12840d95..ed1219d7 100644 --- a/hercules-partitioner/pom.xml +++ b/hercules-partitioner/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-protocol/pom.xml b/hercules-protocol/pom.xml index c42231b4..bcd50b36 100644 --- a/hercules-protocol/pom.xml +++ b/hercules-protocol/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-sd/pom.xml b/hercules-sd/pom.xml index 9b8d181d..a14ae864 100644 --- a/hercules-sd/pom.xml +++ b/hercules-sd/pom.xml @@ -5,7 +5,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-sentry-sink/pom.xml b/hercules-sentry-sink/pom.xml index 56a9b86d..3bfe26d4 100644 --- a/hercules-sentry-sink/pom.xml +++ b/hercules-sentry-sink/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-sink/pom.xml b/hercules-sink/pom.xml index 344d1d94..fe4d754c 100644 --- a/hercules-sink/pom.xml +++ b/hercules-sink/pom.xml @@ -5,7 +5,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-stream-api/pom.xml b/hercules-stream-api/pom.xml index 1998ebad..3901bb6c 100644 --- a/hercules-stream-api/pom.xml +++ b/hercules-stream-api/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-stream-manager/pom.xml b/hercules-stream-manager/pom.xml index e5c4af6d..4ffefd41 100644 --- a/hercules-stream-manager/pom.xml +++ b/hercules-stream-manager/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-stream-sink/pom.xml b/hercules-stream-sink/pom.xml index d0585d5b..081c4f8a 100644 --- a/hercules-stream-sink/pom.xml +++ b/hercules-stream-sink/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-tags/pom.xml b/hercules-tags/pom.xml index 341fb250..64b82cd9 100644 --- a/hercules-tags/pom.xml +++ b/hercules-tags/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-throttling/pom.xml b/hercules-throttling/pom.xml index d7c840b7..7e0224c7 100644 --- a/hercules-throttling/pom.xml +++ b/hercules-throttling/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-timeline-api/pom.xml b/hercules-timeline-api/pom.xml index e7d808a3..2c6c21f0 100644 --- a/hercules-timeline-api/pom.xml +++ b/hercules-timeline-api/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-timeline-manager/pom.xml b/hercules-timeline-manager/pom.xml index 160bd4c1..f4ffd801 100644 --- a/hercules-timeline-manager/pom.xml +++ b/hercules-timeline-manager/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-timeline-sink/pom.xml b/hercules-timeline-sink/pom.xml index 688b0731..f58fd36e 100644 --- a/hercules-timeline-sink/pom.xml +++ b/hercules-timeline-sink/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-tracing-api/pom.xml b/hercules-tracing-api/pom.xml index 198ac830..6ccb973f 100644 --- a/hercules-tracing-api/pom.xml +++ b/hercules-tracing-api/pom.xml @@ -5,7 +5,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-tracing-sink-clickhouse/pom.xml b/hercules-tracing-sink-clickhouse/pom.xml index 69aabc2d..e0a20d25 100644 --- a/hercules-tracing-sink-clickhouse/pom.xml +++ b/hercules-tracing-sink-clickhouse/pom.xml @@ -5,7 +5,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-tracing-sink/pom.xml b/hercules-tracing-sink/pom.xml index 63462d0c..0af8c539 100644 --- a/hercules-tracing-sink/pom.xml +++ b/hercules-tracing-sink/pom.xml @@ -5,7 +5,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-undertow-util/pom.xml b/hercules-undertow-util/pom.xml index aca60f21..79006a28 100644 --- a/hercules-undertow-util/pom.xml +++ b/hercules-undertow-util/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-util/pom.xml b/hercules-util/pom.xml index fb18f811..2d94d2b7 100644 --- a/hercules-util/pom.xml +++ b/hercules-util/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/hercules-uuid/pom.xml b/hercules-uuid/pom.xml index 1ea1e2c5..5f65ce4f 100644 --- a/hercules-uuid/pom.xml +++ b/hercules-uuid/pom.xml @@ -3,7 +3,7 @@ hercules ru.kontur.vostok.hercules - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 4.0.0 diff --git a/pom.xml b/pom.xml index 7de57d1b..cb05df2f 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ ru.kontur.vostok.hercules hercules pom - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT UTF-8 @@ -14,7 +14,7 @@ 11 11 - 0.51.0-SNAPSHOT + 0.52.0-SNAPSHOT 2.4.0 3.4.13 From 53b9c2c030a185988f93bb2af13aff357c768134 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BA=D0=BA=D1=83=D0=B7=D0=B8=D0=BD=20=D0=90=D0=BD?= =?UTF-8?q?=D1=82=D0=BE=D0=BD=20=D0=A2=D0=B8=D0=BC=D0=BE=D1=84=D0=B5=D0=B5?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Wed, 26 Jan 2022 17:54:50 +0500 Subject: [PATCH 03/13] Resolve "fix incorrect termination of the thread pool" --- .../hercules/gate/client/GateClient.java | 22 +++-- .../hercules/meta/task/TaskExecutor.java | 12 ++- .../vostok/hercules/meta/task/TaskQueue.java | 10 +- .../hercules/sink/AbstractSinkDaemon.java | 15 ++- .../concurrent/RenewableTaskScheduler.java | 18 +++- .../ScheduledThreadPoolExecutorBuilder.java | 96 +++++++++++++++++++ 6 files changed, 152 insertions(+), 21 deletions(-) create mode 100644 hercules-util/src/main/java/ru/kontur/vostok/hercules/util/concurrent/ScheduledThreadPoolExecutorBuilder.java diff --git a/hercules-gate-client/src/main/java/ru/kontur/vostok/hercules/gate/client/GateClient.java b/hercules-gate-client/src/main/java/ru/kontur/vostok/hercules/gate/client/GateClient.java index 9b8f7f5f..504763a3 100644 --- a/hercules-gate-client/src/main/java/ru/kontur/vostok/hercules/gate/client/GateClient.java +++ b/hercules-gate-client/src/main/java/ru/kontur/vostok/hercules/gate/client/GateClient.java @@ -18,7 +18,7 @@ import ru.kontur.vostok.hercules.gate.client.exception.HttpProtocolException; import ru.kontur.vostok.hercules.gate.client.exception.UnavailableClusterException; import ru.kontur.vostok.hercules.gate.client.exception.UnavailableHostException; -import ru.kontur.vostok.hercules.util.concurrent.ThreadFactories; +import ru.kontur.vostok.hercules.util.concurrent.ScheduledThreadPoolExecutorBuilder; import ru.kontur.vostok.hercules.util.concurrent.Topology; import ru.kontur.vostok.hercules.util.parameter.Parameter; import ru.kontur.vostok.hercules.util.properties.PropertiesUtil; @@ -27,10 +27,8 @@ import java.io.Closeable; import java.io.IOException; import java.util.Properties; -import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -41,7 +39,6 @@ */ public class GateClient implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(GateClient.class); - private static final Random RANDOM = new Random(); private static final String PING = "/ping"; private static final String SEND_ACK = "/stream/send"; @@ -53,9 +50,12 @@ public class GateClient implements Closeable { private final Topology whiteList; private final int greyListElementsRecoveryTimeMs; private final ScheduledExecutorService scheduler = - Executors.newScheduledThreadPool( - 1, - ThreadFactories.newNamedThreadFactory("gate-client-topology-updater", false)); + new ScheduledThreadPoolExecutorBuilder() + .threadPoolSize(1) + .name("gate-client-topology-updater") + .daemon(false) + .dropDelayedTasksAfterShutdown() + .build(); public GateClient(Properties properties, CloseableHttpClient client, Topology whiteList) { @@ -222,6 +222,14 @@ public void send(String apiKey, String stream, final byte[] data) public void close() { scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(5_000L, TimeUnit.MILLISECONDS)) { + LOGGER.warn("Scheduled thread pool did not terminate"); + } + } catch (InterruptedException ex) { + LOGGER.error("Error on stopping scheduler", ex); + } + try { client.close(); } catch (IOException e) { diff --git a/hercules-meta/src/main/java/ru/kontur/vostok/hercules/meta/task/TaskExecutor.java b/hercules-meta/src/main/java/ru/kontur/vostok/hercules/meta/task/TaskExecutor.java index be684f4f..df7b5558 100644 --- a/hercules-meta/src/main/java/ru/kontur/vostok/hercules/meta/task/TaskExecutor.java +++ b/hercules-meta/src/main/java/ru/kontur/vostok/hercules/meta/task/TaskExecutor.java @@ -64,8 +64,16 @@ public void start() { @Override public boolean stop(long timeout, TimeUnit unit) { running = false; + executorService.shutdown(); try { - return executorService.awaitTermination(timeout, unit); + if (!executorService.awaitTermination(timeout, unit)) { + executorService.shutdownNow(); + if (!executorService.awaitTermination(timeout, unit)) { + LOGGER.warn("Thread pool did not terminate"); + } + return false; + } + return true; } catch (InterruptedException e) { LOGGER.error("TaskExecutor shutdown execute was terminated by InterruptedException", e); return false; @@ -205,7 +213,7 @@ private enum State { /** * A task executor should poll ZK queue for new tasks. */ - SHOULD_POLL; + SHOULD_POLL } /** diff --git a/hercules-meta/src/main/java/ru/kontur/vostok/hercules/meta/task/TaskQueue.java b/hercules-meta/src/main/java/ru/kontur/vostok/hercules/meta/task/TaskQueue.java index d326f17f..7bec2236 100644 --- a/hercules-meta/src/main/java/ru/kontur/vostok/hercules/meta/task/TaskQueue.java +++ b/hercules-meta/src/main/java/ru/kontur/vostok/hercules/meta/task/TaskQueue.java @@ -6,10 +6,9 @@ import ru.kontur.vostok.hercules.curator.exception.CuratorUnknownException; import ru.kontur.vostok.hercules.curator.result.CreationResult; import ru.kontur.vostok.hercules.meta.serialization.SerializationException; -import ru.kontur.vostok.hercules.util.concurrent.ThreadFactories; +import ru.kontur.vostok.hercules.util.concurrent.ScheduledThreadPoolExecutorBuilder; import ru.kontur.vostok.hercules.util.lifecycle.Stoppable; -import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -26,7 +25,12 @@ public class TaskQueue implements Stoppable { private final long delayMs; private final ScheduledExecutorService executor = - Executors.newSingleThreadScheduledExecutor(ThreadFactories.newNamedThreadFactory("task-queue", false)); + new ScheduledThreadPoolExecutorBuilder() + .threadPoolSize(1) + .name("task-queue") + .daemon(false) + .dropDelayedTasksAfterShutdown() + .build(); public TaskQueue(TaskRepository repository, long delayMs) { this.repository = repository; diff --git a/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/AbstractSinkDaemon.java b/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/AbstractSinkDaemon.java index 377c0599..32384c4b 100644 --- a/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/AbstractSinkDaemon.java +++ b/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/AbstractSinkDaemon.java @@ -26,6 +26,7 @@ */ public abstract class AbstractSinkDaemon { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSinkDaemon.class); + private static final long SHUTDOWN_TIMEOUT_MS = 5_000L; protected MetricsCollector metricsCollector; @@ -106,11 +107,12 @@ protected void run(String[] args) { private void shutdown() { long start = System.currentTimeMillis(); + LOGGER.info("Start {} shutdown", getDaemonName()); try { if (daemonHttpServer != null) { - daemonHttpServer.stop(5_000, TimeUnit.MILLISECONDS); + daemonHttpServer.stop(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS); } } catch (Throwable t) { LOGGER.error("Error on stopping HTTP Server", t); @@ -118,7 +120,7 @@ private void shutdown() { try { if (sinkPool != null) { - sinkPool.stop(5_000, TimeUnit.MILLISECONDS); + sinkPool.stop(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS); } } catch (Throwable t) { LOGGER.error("Error on stopping Sink pool", t); @@ -127,7 +129,12 @@ private void shutdown() { try { if (executor != null) { executor.shutdown(); - executor.awaitTermination(5_000L, TimeUnit.MILLISECONDS); + if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + executor.shutdownNow(); + if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + LOGGER.warn("Thread pool did not terminate"); + } + } } } catch (Throwable t) { LOGGER.error("Error on stopping sink thread executor", t); @@ -135,7 +142,7 @@ private void shutdown() { try { if (sender != null) { - sender.stop(5_000L, TimeUnit.MILLISECONDS); + sender.stop(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS); } } catch (Throwable t) { LOGGER.error("Error on stopping sender", t); diff --git a/hercules-util/src/main/java/ru/kontur/vostok/hercules/util/concurrent/RenewableTaskScheduler.java b/hercules-util/src/main/java/ru/kontur/vostok/hercules/util/concurrent/RenewableTaskScheduler.java index a007173d..fbcce172 100644 --- a/hercules-util/src/main/java/ru/kontur/vostok/hercules/util/concurrent/RenewableTaskScheduler.java +++ b/hercules-util/src/main/java/ru/kontur/vostok/hercules/util/concurrent/RenewableTaskScheduler.java @@ -4,7 +4,6 @@ import org.slf4j.LoggerFactory; import ru.kontur.vostok.hercules.util.lifecycle.Stoppable; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -18,7 +17,12 @@ public class RenewableTaskScheduler implements Stoppable { private final ScheduledExecutorService executor; public RenewableTaskScheduler(String name, int threadPoolSize) { - executor = Executors.newScheduledThreadPool(threadPoolSize, ThreadFactories.newNamedThreadFactory(name, false)); + executor = new ScheduledThreadPoolExecutorBuilder() + .threadPoolSize(threadPoolSize) + .name(name) + .daemon(false) + .dropDelayedTasksAfterShutdown() + .build(); } /** @@ -26,8 +30,8 @@ public RenewableTaskScheduler(String name, int threadPoolSize) { *

* If task is scheduled, it will run between two heartbeats. Task is auto-scheduled if call renew or run methods. * - * @param runnable is task to be scheduled - * @param heartbeatMillis is heartbeat interval in millis + * @param runnable is task to be scheduled + * @param heartbeatMillis is heartbeat interval in millis * @param shouldBeScheduled if true, then call schedule method on task * @return task created */ @@ -43,7 +47,11 @@ public RenewableTask task(Runnable runnable, long heartbeatMillis, boolean shoul public boolean stop(long timeout, TimeUnit unit) { executor.shutdown(); try { - return executor.awaitTermination(timeout, unit); + boolean isTerminated = executor.awaitTermination(timeout, unit); + if (!isTerminated) { + LOGGER.warn("Scheduled thread pool did not terminate"); + } + return isTerminated; } catch (InterruptedException e) { LOGGER.warn("Shutdown interrupted", e); Thread.currentThread().interrupt(); diff --git a/hercules-util/src/main/java/ru/kontur/vostok/hercules/util/concurrent/ScheduledThreadPoolExecutorBuilder.java b/hercules-util/src/main/java/ru/kontur/vostok/hercules/util/concurrent/ScheduledThreadPoolExecutorBuilder.java new file mode 100644 index 00000000..0b6678c1 --- /dev/null +++ b/hercules-util/src/main/java/ru/kontur/vostok/hercules/util/concurrent/ScheduledThreadPoolExecutorBuilder.java @@ -0,0 +1,96 @@ +package ru.kontur.vostok.hercules.util.concurrent; + +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This class provides an API to configure and to create an instance of {@link ScheduledThreadPoolExecutor}. + *

    Note, by default: + *
  • the value of {@code threadPoolSize} is 1,
  • + *
  • each new thread is created as a non-daemon thread,
  • + *
  • new threads have names accessible via {@link Thread#getName} of + * hercules-pool-N-thread-M, where N is the sequence + * number of this builder, and M is the sequence number + * of the thread created by {@link ScheduledThreadPoolExecutor}.
  • + *
+ * + * @author Anton Akkuzin + */ +public final class ScheduledThreadPoolExecutorBuilder { + + private static final AtomicInteger poolNumber = new AtomicInteger(1); + + private int threadPoolSize = 1; + private boolean continueExistingPeriodicTasksAfterShutdown = false; + private boolean executeExistingDelayedTasksAfterShutdown = true; + private boolean daemonThreads = false; + private String threadNamePrefix; + + public ScheduledThreadPoolExecutor build() { + if (threadNamePrefix == null) { + threadNamePrefix = "hercules-pool-" + poolNumber.getAndIncrement(); + } + + ScheduledThreadPoolExecutor executor + = new ScheduledThreadPoolExecutor(threadPoolSize, new NamedThreadFactory(threadNamePrefix, daemonThreads)); + + executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(continueExistingPeriodicTasksAfterShutdown); + executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(executeExistingDelayedTasksAfterShutdown); + + return executor; + } + + /** + * Sets a thread name prefix. + *

Example: if threadPoolName param is foo

then new threads will have names foo-thread-M. + * + * @param threadPoolName the thread name prefix + * @return a reference to this object. + */ + public ScheduledThreadPoolExecutorBuilder name(@NotNull String threadPoolName) { + this.threadNamePrefix = threadPoolName; + return this; + } + + /** + * Sets whether threads will be daemons. + * + * @param daemon if {@code true} then executor will create daemon threads, otherwise non-daemon threads + * @return a reference to this object. + */ + public ScheduledThreadPoolExecutorBuilder daemon(boolean daemon) { + this.daemonThreads = daemon; + return this; + } + + /** + * Sets the number of threads to keep in the pool. + * + * @param threadPoolSize the number of threads + * @return a reference to this object. + */ + public ScheduledThreadPoolExecutorBuilder threadPoolSize(int threadPoolSize) { + this.threadPoolSize = threadPoolSize; + return this; + } + + /** + * @see ScheduledThreadPoolExecutor#setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean) + * @return a reference to this object. + */ + public ScheduledThreadPoolExecutorBuilder continuePeriodicTasksAfterShutdown() { + continueExistingPeriodicTasksAfterShutdown = true; + return this; + } + + /** + * @see ScheduledThreadPoolExecutor#setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean) + * @return a reference to this object. + */ + public ScheduledThreadPoolExecutorBuilder dropDelayedTasksAfterShutdown() { + executeExistingDelayedTasksAfterShutdown = false; + return this; + } +} From 012ca75377e50c538e97c6e0f0b77ce229dd5410 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=94=D0=B5=D0=BC=D0=B5=D0=BD=D0=B5=D0=B2=20=D0=9F=D0=B5?= =?UTF-8?q?=D1=82=D1=80=20=D0=90=D0=BD=D0=B4=D1=80=D0=B5=D0=B5=D0=B2=D0=B8?= =?UTF-8?q?=D1=87?= Date: Wed, 9 Feb 2022 22:42:33 +0500 Subject: [PATCH 04/13] Resolve "add config about passing not specified tags" --- hercules-elastic-sink/README.md | 2 ++ .../json/format/EventToJsonFormatter.java | 16 ++++++++++--- .../json/format/EventToJsonFormatterTest.java | 24 +++++++++++++++++-- 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/hercules-elastic-sink/README.md b/hercules-elastic-sink/README.md index 3b9fd6e5..35ef606c 100644 --- a/hercules-elastic-sink/README.md +++ b/hercules-elastic-sink/README.md @@ -54,6 +54,8 @@ default value: `yyyy-MM-dd'T'HH:mm:ss.nnnnnnnnnX` `sink.sender.elastic.format.file` - path to the mapping file. Can use `resource://log-event.mapping` if sink processes logs. See [MappingLoader](../hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/MappingLoader.java) for details, required +`sink.sender.elastic.format.ignore.unknown.tags` - should ignore event tags for which no mapping is specified, default value: `false` + ##### Elastic Client settings `sink.sender.elastic.client.hosts` - list of elastic hosts diff --git a/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/EventToJsonFormatter.java b/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/EventToJsonFormatter.java index 629fa91f..f9e9d33a 100644 --- a/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/EventToJsonFormatter.java +++ b/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/EventToJsonFormatter.java @@ -25,7 +25,8 @@ *
    *
  • Event timestamp can be used as JSON-document field, *
  • Event payload can be mapped using {@link Mapper} implementations, - *
  • By default, {@link Transformer#PLAIN} is used if no mapping is specified. + *
  • If no mapping is specified for some tags then {@link Transformer#PLAIN} is used by default + * or you can ignore these tags. *
* * @author Gregory Koshelev @@ -37,6 +38,7 @@ public class EventToJsonFormatter { private final String timestampField; private final DateTimeFormatter timestampFormatter; private final Mapping mapping; + private final boolean ignoreUnknownTags; public EventToJsonFormatter(Properties properties) { timestampEnabled = PropertiesUtil.get(Props.TIMESTAMP_ENABLE, properties).get(); @@ -49,6 +51,7 @@ public EventToJsonFormatter(Properties properties) { } mapping = MappingLoader.loadMapping(PropertiesUtil.get(Props.FILE, properties).get()); + ignoreUnknownTags = PropertiesUtil.get(Props.IGNORE_UNKNOWN_TAGS, properties).get(); } public Document format(Event event) { @@ -63,8 +66,10 @@ public Document format(Event event) { mapper.map(event, document); } - HTree.Navigator navigator = mapping.navigator(); - process(document, navigator, event.getPayload()); + if (!ignoreUnknownTags) { + HTree.Navigator navigator = mapping.navigator(); + process(document, navigator, event.getPayload()); + } return document; } @@ -107,5 +112,10 @@ public static class Props { Parameter.stringParameter("file"). required(). build(); + + public static final Parameter IGNORE_UNKNOWN_TAGS = + Parameter.booleanParameter("ignore.unknown.tags"). + withDefault(false). + build(); } } diff --git a/hercules-json/src/test/java/ru/kontur/vostok/hercules/json/format/EventToJsonFormatterTest.java b/hercules-json/src/test/java/ru/kontur/vostok/hercules/json/format/EventToJsonFormatterTest.java index 677643df..1a1ec2ba 100644 --- a/hercules-json/src/test/java/ru/kontur/vostok/hercules/json/format/EventToJsonFormatterTest.java +++ b/hercules-json/src/test/java/ru/kontur/vostok/hercules/json/format/EventToJsonFormatterTest.java @@ -169,7 +169,6 @@ public void shouldProjectContainer() throws IOException { "}", stream.toString(StandardCharsets.UTF_8.name()) ); - } @Test @@ -196,7 +195,28 @@ public void shouldProjectVectorOfContainers() throws IOException { "}", stream.toString(StandardCharsets.UTF_8.name()) ); - } + @Test + public void shouldIgnoreUnknownTags() throws IOException { + Event event = EventBuilder.create(0, "11203800-63fd-11e8-83e2-3a587d902000"). + tag("integerTag", Variant.ofInteger(123)). + tag("unmappedTag", Variant.ofString("value")). + build(); + + Properties properties = new Properties(); + properties.setProperty(EventToJsonFormatter.Props.TIMESTAMP_ENABLE.name(), "false"); + properties.setProperty(EventToJsonFormatter.Props.FILE.name(), "resource://transform.mapping"); + properties.setProperty(EventToJsonFormatter.Props.IGNORE_UNKNOWN_TAGS.name(), "true"); + EventToJsonFormatter formatter = new EventToJsonFormatter(properties); + + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + DocumentWriter.writeTo(stream, formatter.format(event)); + + assertEquals("{" + + "\"stringField\":\"123\"" + + "}", + stream.toString(StandardCharsets.UTF_8.name()) + ); + } } From bbeaafdb71060427b3cc48539d71f9738d6d5315 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BA=D0=BA=D1=83=D0=B7=D0=B8=D0=BD=20=D0=90=D0=BD?= =?UTF-8?q?=D1=82=D0=BE=D0=BD=20=D0=A2=D0=B8=D0=BC=D0=BE=D1=84=D0=B5=D0=B5?= =?UTF-8?q?=D0=B2=D0=B8=D1=87?= Date: Mon, 14 Feb 2022 16:42:03 +0500 Subject: [PATCH 05/13] Resolve "Blacklist event filter trie" --- .../sink/EventToElasticJsonWriter.java | 3 +- .../kontur/vostok/hercules/protocol/Type.java | 41 +++-- .../hercules/protocol/util/ContainerUtil.java | 7 + .../hercules/protocol/util/TypeUtil.java | 55 +++++++ .../hercules/protocol/util/VariantUtil.java | 57 ------- .../sink/converters/SentryEventConverter.java | 5 +- .../sink/filter/BlacklistEventFilter.java | 42 +---- .../hercules/sink/filter/PatternTree.java | 152 +++++++++++++++++ .../hercules/sink/filter/PatternTreeTest.java | 155 ++++++++++++++++++ 9 files changed, 406 insertions(+), 111 deletions(-) create mode 100644 hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/util/TypeUtil.java delete mode 100644 hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/util/VariantUtil.java create mode 100644 hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/filter/PatternTree.java create mode 100644 hercules-sink/src/test/java/ru/kontur/vostok/hercules/sink/filter/PatternTreeTest.java diff --git a/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/EventToElasticJsonWriter.java b/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/EventToElasticJsonWriter.java index 50ec51df..01104d12 100644 --- a/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/EventToElasticJsonWriter.java +++ b/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/EventToElasticJsonWriter.java @@ -9,7 +9,6 @@ import ru.kontur.vostok.hercules.protocol.TinyString; import ru.kontur.vostok.hercules.protocol.Variant; import ru.kontur.vostok.hercules.protocol.util.ContainerUtil; -import ru.kontur.vostok.hercules.protocol.util.VariantUtil; import ru.kontur.vostok.hercules.tags.CommonTags; import ru.kontur.vostok.hercules.tags.LogEventTags; import ru.kontur.vostok.hercules.util.time.TimeUtil; @@ -59,7 +58,7 @@ public static void writeEvent(OutputStream stream, Event event, boolean mergePro } if (LogEventTags.EXCEPTION_TAG.getName().equals(tag.getKey())) { - Optional exception = VariantUtil.extractContainer(tag.getValue()); + Optional exception = ContainerUtil.extractContainer(tag.getValue()); if (exception.isPresent()) { String stackTrace = StackTraceCreator.createStackTrace(exception.get()); EventToJsonWriter.writeVariantAsField(generator, STACKTRACE_FIELD, Variant.ofString(stackTrace)); diff --git a/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/Type.java b/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/Type.java index 81777f1a..47699d18 100644 --- a/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/Type.java +++ b/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/Type.java @@ -6,20 +6,20 @@ * @author Gregory Koshelev */ public enum Type { - TYPE(0x00, 1), - CONTAINER(0x01, -1), - BYTE(0x02, 1), - SHORT(0x03, 2), - INTEGER(0x04, 4), - LONG(0x05, 8), - FLAG(0x06, 1), - FLOAT(0x07, 4), - DOUBLE(0x08, 8), - STRING(0x09, -1), - UUID(0x0A, 16), - NULL(0x0B, 0), - VECTOR(0x80, -1), - RESERVED(0xFF, -1); + TYPE(0x00, 1, false), + CONTAINER(0x01, -1, false), + BYTE(0x02, 1, true), + SHORT(0x03, 2, true), + INTEGER(0x04, 4, true), + LONG(0x05, 8, true), + FLAG(0x06, 1, true), + FLOAT(0x07, 4, true), + DOUBLE(0x08, 8, true), + STRING(0x09, -1, true), + UUID(0x0A, 16, true), + NULL(0x0B, 0, true), + VECTOR(0x80, -1, false), + RESERVED(0xFF, -1, false); public final int code; /** @@ -28,10 +28,21 @@ public enum Type { * Size -1 means that value of that type has no fixed size. */ public final int size; + private final boolean primitive; - Type(int code, int size) { + Type(int code, int size, boolean isPrimitive) { this.code = code; this.size = size; + this.primitive = isPrimitive; + } + + /** + * Returns {@code true} if {@link Type} is primitive. + * + * @return {@code true} if {@link Type} is primitive. + */ + public boolean isPrimitive() { + return primitive; } public static Type valueOf(int code) { diff --git a/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/util/ContainerUtil.java b/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/util/ContainerUtil.java index 578b4d23..bf2e2649 100644 --- a/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/util/ContainerUtil.java +++ b/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/util/ContainerUtil.java @@ -29,6 +29,13 @@ public static T extract(Container container, TagDescription tag) { return extractor.apply(value); } + public static Optional extractContainer(Variant variant) { + if (variant.getType() == Type.CONTAINER) { + return Optional.of((Container) variant.getValue()); + } + return Optional.empty(); + } + private ContainerUtil() { /* static class */ } diff --git a/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/util/TypeUtil.java b/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/util/TypeUtil.java new file mode 100644 index 00000000..6734cfea --- /dev/null +++ b/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/util/TypeUtil.java @@ -0,0 +1,55 @@ +package ru.kontur.vostok.hercules.protocol.util; + +import ru.kontur.vostok.hercules.protocol.TinyString; +import ru.kontur.vostok.hercules.protocol.Type; + +import java.util.UUID; + +/** + * Utility class for {@link Type}-related operations. + * + * @author Anton Akkuzin + */ +public final class TypeUtil { + + /** + * Parses a value from {@link String} by the specified primitive {@link Type}. + * + * @param value {@link String} value to parse + * @param type the type by which the {@code value} should be parsed + * @return parsed {@code value} boxed in {@link Object}. + */ + public static Object parsePrimitiveValue(String value, Type type) { + switch (type) { + case BYTE: + return Byte.parseByte(value); + case SHORT: + return Short.parseShort(value); + case INTEGER: + return Integer.parseInt(value); + case LONG: + return Long.parseLong(value); + case FLAG: + return Boolean.parseBoolean(value); + case FLOAT: + return Float.parseFloat(value); + case DOUBLE: + return Double.parseDouble(value); + case UUID: + return UUID.fromString(value); + case STRING: + return TinyString.of(value); + case NULL: + if (value.equalsIgnoreCase("null")) { + return null; + } + throw new IllegalArgumentException(String.format("Can't parse value with Type.%s from string '%s'", type, value)); + default: + throw new IllegalArgumentException(String.format("Type '%s' is not primitive.", type)); + } + } + + private TypeUtil() { + /* static class */ + } +} diff --git a/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/util/VariantUtil.java b/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/util/VariantUtil.java deleted file mode 100644 index db1f4851..00000000 --- a/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/util/VariantUtil.java +++ /dev/null @@ -1,57 +0,0 @@ -package ru.kontur.vostok.hercules.protocol.util; - -import ru.kontur.vostok.hercules.protocol.Container; -import ru.kontur.vostok.hercules.protocol.Type; -import ru.kontur.vostok.hercules.protocol.Variant; - -import java.nio.charset.StandardCharsets; -import java.util.Optional; - -public final class VariantUtil { - - private VariantUtil() { - } - - public static boolean isPrimitive(Variant variant) { - switch (variant.getType()) { - case BYTE: - case SHORT: - case INTEGER: - case LONG: - case FLAG: - case FLOAT: - case DOUBLE: - case UUID: - case STRING: - case NULL: - return true; - default: - return false; - } - } - - public static Optional extractPrimitiveAsString(Variant variant) { - switch (variant.getType()) { - case BYTE: - case SHORT: - case INTEGER: - case LONG: - case FLAG: - case FLOAT: - case DOUBLE: - case UUID: - return Optional.of(String.valueOf(variant.getValue())); - case STRING: - return Optional.of(new String((byte[]) variant.getValue(), StandardCharsets.UTF_8)); - default: - return Optional.empty(); - } - } - - public static Optional extractContainer(Variant variant) { - if (variant.getType() == Type.CONTAINER) { - return Optional.of((Container) variant.getValue()); - } - return Optional.empty(); - } -} diff --git a/hercules-sentry-sink/src/main/java/ru/kontur/vostok/hercules/sentry/sink/converters/SentryEventConverter.java b/hercules-sentry-sink/src/main/java/ru/kontur/vostok/hercules/sentry/sink/converters/SentryEventConverter.java index 7f1cde17..0960a573 100644 --- a/hercules-sentry-sink/src/main/java/ru/kontur/vostok/hercules/sentry/sink/converters/SentryEventConverter.java +++ b/hercules-sentry-sink/src/main/java/ru/kontur/vostok/hercules/sentry/sink/converters/SentryEventConverter.java @@ -16,7 +16,6 @@ import ru.kontur.vostok.hercules.protocol.Vector; import ru.kontur.vostok.hercules.protocol.util.ContainerUtil; import ru.kontur.vostok.hercules.protocol.util.TagDescription; -import ru.kontur.vostok.hercules.protocol.util.VariantUtil; import ru.kontur.vostok.hercules.tags.CommonTags; import ru.kontur.vostok.hercules.tags.ExceptionTags; import ru.kontur.vostok.hercules.tags.LogEventTags; @@ -202,7 +201,7 @@ private static void writeOtherData(final Container properties, EventBuilder even continue; } - if (VariantUtil.isPrimitive(value)) { + if (value.getType().isPrimitive()) { eventBuilder.withTag(tagName, sanitizeTagValue(value)); continue; } @@ -226,7 +225,7 @@ private static Optional cutOffPrefixIfExists(String prefix, String sourc if (sourceName.length() <= prefixWithDelimiter.length()) { return Optional.empty(); } - if (!sourceName.substring(0, prefixWithDelimiter.length()).equals(prefixWithDelimiter)) { + if (!sourceName.startsWith(prefixWithDelimiter)) { return Optional.empty(); } return Optional.of(sourceName.substring(prefixWithDelimiter.length())); diff --git a/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/filter/BlacklistEventFilter.java b/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/filter/BlacklistEventFilter.java index 77e14882..1a082a0b 100644 --- a/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/filter/BlacklistEventFilter.java +++ b/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/filter/BlacklistEventFilter.java @@ -2,14 +2,13 @@ import ru.kontur.vostok.hercules.protocol.Container; import ru.kontur.vostok.hercules.protocol.Event; -import ru.kontur.vostok.hercules.protocol.TinyString; import ru.kontur.vostok.hercules.protocol.Type; import ru.kontur.vostok.hercules.protocol.Variant; import ru.kontur.vostok.hercules.protocol.hpath.HPath; import ru.kontur.vostok.hercules.util.parameter.Parameter; import ru.kontur.vostok.hercules.util.properties.PropertiesUtil; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.stream.Collectors; @@ -33,10 +32,9 @@ * @author Gregory Koshelev */ public class BlacklistEventFilter extends EventFilter { - private static final TinyString STAR = TinyString.of("*"); private final List paths; - private final List> patterns; + private final PatternTree blacklistTree; /** * Inheritors must implement constructor with the same signature. @@ -50,49 +48,25 @@ public BlacklistEventFilter(Properties properties) { map(HPath::fromPath). collect(Collectors.toList()); - this.patterns = Stream.of(PropertiesUtil.get(Props.PATTERNS, properties).get()). - map(x -> Stream.of(x.split(":")). - map(v -> v.equals("*") ? STAR : TinyString.of(v)). - collect(Collectors.toList())). - collect(Collectors.toList()); + this.blacklistTree = new PatternTree(Collections.nCopies(paths.size(), Type.STRING)); - for (List pattern : patterns) { - if (paths.size() != pattern.size()) { - throw new IllegalArgumentException("Pattern size should be equal to paths size"); - } + for (String pattern : PropertiesUtil.get(Props.PATTERNS, properties).get()) { + blacklistTree.put(pattern); } } @Override public boolean test(Event event) { - if (patterns.isEmpty()) { + if (blacklistTree.isEmpty()) { return true; } Container payload = event.getPayload(); - List values = paths.stream(). + List variants = paths.stream(). map(path -> path.extract(payload)). collect(Collectors.toList()); - for (List pattern : patterns) {//TODO: Should be reimplemented (may be use trie?) to avoid for-for iterations with array comparing - boolean matched = true; - for (int i = 0; i < pattern.size(); i++) { - TinyString element = pattern.get(i); - if (element == STAR) { - continue; - } - - Variant variant = values.get(i); - matched = (variant != null) && (variant.getType() == Type.STRING) && Arrays.equals(element.getBytes(), (byte[]) variant.getValue()); - if (!matched) { - break; - } - } - if (matched) { - return false; - } - } - return true; + return !blacklistTree.matches(variants); } private static class Props { diff --git a/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/filter/PatternTree.java b/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/filter/PatternTree.java new file mode 100644 index 00000000..c3b449a7 --- /dev/null +++ b/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/filter/PatternTree.java @@ -0,0 +1,152 @@ +package ru.kontur.vostok.hercules.sink.filter; + +import org.jetbrains.annotations.NotNull; +import ru.kontur.vostok.hercules.protocol.TinyString; +import ru.kontur.vostok.hercules.protocol.Type; +import ru.kontur.vostok.hercules.protocol.Variant; +import ru.kontur.vostok.hercules.protocol.util.TypeUtil; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + *

The class is a tree of patterns, the traversal of which allows you + * to determine whether the tested sequence of {@link Variant}s fits any pattern.

+ * + *

The tree supports the primitive {@link Variant} types (see {@link Type}). + * + *

The tree supports star {@code '*'} in the pattern definition. It means {@code any value}.

+ * + *

The pattern consists of elements separated by {@code ':'}. + * The number of elements in the pattern must strictly correspond to the number of {@link Type}s specified during initialization. + * Each element in the string representation, except for {@code '*'}, must match the {@link Type} specified during initialization.

+ * + *

Example:

+ *

let the tree be initialized with such list of types: [{@link Type#STRING}, {@link Type#INTEGER}, {@link Type#FLOAT}]. + * After that you can put in the tree patterns like {@code 'my_project:999:3.14'}, {@code 'my_project:1234:*'}, {@code '*:999:*'}, etc. + *

+ * + * @author Anton Akkuzin + */ +public class PatternTree { + + private final Node root; + private final List types; + private final int depth; + + public PatternTree(@NotNull List types) { + for (Type type : types) { + if (!type.isPrimitive()) { + throw new IllegalArgumentException(String.format("Type '%s' is not primitive.", type)); + } + } + + this.root = Node.newStar(); + this.types = types; + this.depth = types.size(); + } + + /** + *

Rebuilds the tree on each call. Patterns with common paths are merged together.

+ *

See {@link PatternTree} for pattern examples.

+ * + * @param pattern {@link String} with elements separated by {@code ':'} + */ + public void put(@NotNull String pattern) { + String[] tokens = pattern.split(":"); + if (tokens.length != depth) { + throw new IllegalArgumentException("Pattern size should be equal to paths size."); + } + + Node current = root; + + for (int i = 0; i < tokens.length; i++) { + boolean star = tokens[i].equals("*"); + + if (star && !current.containsStar()) { + current.starChild = Node.newStar(); + } + + current = star + ? current.starChild + : current.children.computeIfAbsent(TypeUtil.parsePrimitiveValue(tokens[i], types.get(i)), Node::new); + } + } + + /** + *

Iterates the tree and checks the given sequence of {@link Variant}s matches any of the patterns.

+ * + *

Pattern branches that are specified explicitly (not an {@code '*'}) are checked first.

+ * + * @param variants sequence of {@link Variant}s to check + * @return {@code true} if {@code variants} matches any of the patterns + */ + public boolean matches(List variants) { + return testNode(root, variants, 0); + } + + /** + * Returns {@code true} if this tree contains no branches. + * + * @return {@code true} if this tree contains no branches. + */ + public boolean isEmpty() { + return this.root.children.isEmpty() && !this.root.containsStar(); + } + + private boolean testNode(Node node, List variants, int currentDepth) { + if (currentDepth == this.depth) { + return true; + } + + Node child = node.getChild(variants.get(currentDepth), types.get(currentDepth)); + + if (child != null && testNode(child, variants, currentDepth + 1)) { + return true; + } + + return node.containsStar() && testNode(node.starChild, variants, currentDepth + 1); + } + + private static class Node { + + final Map children = new HashMap<>(); + final Object value; + Node starChild; + + private Node(Object value) { + this.value = value; + } + + private static Node newStar() { + return new Node(null); + } + + private Node getChild(Variant variant, Type type) { + if (variant == null || variant.getType() != type) { + return null; + } + + for (Node child : children.values()) { + if (child.matches(variant)) { + return child; + } + } + + return null; + } + + private boolean matches(Variant variant) { + return variant.getType() == Type.STRING + ? Arrays.equals(((TinyString) value).getBytes(), (byte[]) variant.getValue()) + : Objects.equals(value, variant.getValue()); + } + + private boolean containsStar() { + return starChild != null; + } + } +} diff --git a/hercules-sink/src/test/java/ru/kontur/vostok/hercules/sink/filter/PatternTreeTest.java b/hercules-sink/src/test/java/ru/kontur/vostok/hercules/sink/filter/PatternTreeTest.java new file mode 100644 index 00000000..37043315 --- /dev/null +++ b/hercules-sink/src/test/java/ru/kontur/vostok/hercules/sink/filter/PatternTreeTest.java @@ -0,0 +1,155 @@ +package ru.kontur.vostok.hercules.sink.filter; + +import org.junit.Assert; +import org.junit.Test; +import ru.kontur.vostok.hercules.protocol.Type; +import ru.kontur.vostok.hercules.protocol.Variant; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * @author Anton Akkuzin + */ +public class PatternTreeTest { + + @Test + public void shouldNotInitializeWithNonPrimitiveTypes() { + Exception exception = null; + + try { + new PatternTree(List.of(Type.STRING, Type.VECTOR)); + } catch (IllegalArgumentException ex) { + exception = ex; + } + + assert exception != null; + Assert.assertEquals("Type 'VECTOR' is not primitive.", exception.getMessage()); + } + + @Test + public void shouldThrowWhenPatternSizeIsDifferentFromTypesSize() { + PatternTree patternTree = new PatternTree(List.of(Type.STRING, Type.INTEGER, Type.FLOAT)); + + Exception exception = null; + try { + patternTree.put("some_str:1234"); + } catch (IllegalArgumentException ex) { + exception = ex; + } + + assert exception != null; + Assert.assertEquals("Pattern size should be equal to paths size.", exception.getMessage()); + } + + @Test + public void shouldThrowWhenWrongType() { + PatternTree patternTree = new PatternTree(List.of(Type.INTEGER)); + + boolean thrown = false; + try { + patternTree.put("not_a_number"); + } catch (Exception ex) { + thrown = true; + } + + Assert.assertTrue(thrown); + } + + @Test + public void shouldMatchCorrectPatterns() { + PatternTree patternTree = new PatternTree(List.of(Type.STRING, Type.INTEGER, Type.FLOAT)); + + String[] patterns = { + "a:99:105.9", + "a:*:999.9", + "b:99:*", + "*:1234:*", + }; + + for (String pattern : patterns) { + patternTree.put(pattern); + } + + Assert.assertTrue(patternTree.matches(List.of( + Variant.ofString("a"), + Variant.ofInteger(99), + Variant.ofFloat(105.9f) + ))); + Assert.assertTrue(patternTree.matches(List.of( + Variant.ofString("a"), + Variant.ofInteger(99), + Variant.ofFloat(999.9f) + ))); + Assert.assertTrue(patternTree.matches(List.of( + Variant.ofString("a"), + Variant.ofInteger(99), + Variant.ofFloat(105.9f) + ))); + Assert.assertTrue(patternTree.matches(List.of( + Variant.ofString("b"), + Variant.ofInteger(99), + Variant.ofFloat(3.14f) + ))); + Assert.assertTrue(patternTree.matches(List.of( + Variant.ofString("b"), + Variant.ofInteger(99), + Variant.ofFloat(1) + ))); + Assert.assertTrue(patternTree.matches(List.of( + Variant.ofString("c"), + Variant.ofInteger(1234), + Variant.ofFloat(999.9f) + ))); + Assert.assertFalse(patternTree.matches(List.of( + Variant.ofString("a"), + Variant.ofInteger(99), + Variant.ofFloat(666) + ))); + } + + @Test + public void shouldMatchStarWhenVariantIsMissing() { + PatternTree patternTree = new PatternTree(List.of(Type.STRING, Type.INTEGER, Type.FLOAT)); + + String[] patterns = { + "b:99:*", + "*:1234:*", + }; + + for (String pattern : patterns) { + patternTree.put(pattern); + } + + ArrayList variants = new ArrayList<>(); + variants.add(null); + variants.add(Variant.ofInteger(1234)); + variants.add(null); + + Assert.assertTrue(patternTree.matches(variants)); + } + + @Test + public void shouldMatchToAllPrimitiveTypes() { + Assert.assertTrue(matchesToPrimitiveType(Type.BYTE, "123", Variant.ofByte((byte) 123))); + Assert.assertTrue(matchesToPrimitiveType(Type.SHORT, "123", Variant.ofShort((short) 123))); + Assert.assertTrue(matchesToPrimitiveType(Type.INTEGER, "1234", Variant.ofInteger(1234))); + Assert.assertTrue(matchesToPrimitiveType(Type.LONG, "123456789", Variant.ofLong(123456789))); + Assert.assertTrue(matchesToPrimitiveType(Type.FLAG, "true", Variant.ofFlag(true))); + Assert.assertTrue(matchesToPrimitiveType(Type.FLAG, "false", Variant.ofFlag(false))); + Assert.assertTrue(matchesToPrimitiveType(Type.FLOAT, "3.14", Variant.ofFloat(3.14f))); + Assert.assertTrue(matchesToPrimitiveType(Type.DOUBLE, "3.14", Variant.ofDouble(3.14))); + Assert.assertTrue(matchesToPrimitiveType(Type.STRING, "some_str", Variant.ofString("some_str"))); + Assert.assertTrue(matchesToPrimitiveType(Type.NULL, "null", Variant.ofNull())); + + UUID uuid = UUID.randomUUID(); + Assert.assertTrue(matchesToPrimitiveType(Type.UUID, uuid.toString(), Variant.ofUuid(uuid))); + } + + private boolean matchesToPrimitiveType(Type type, String pattern, Variant variant) { + PatternTree patternTree = new PatternTree(List.of(type)); + patternTree.put(pattern); + return patternTree.matches(List.of(variant)); + } +} From 71ab0296fd6fec5fb54ede6f4f6329dd28b404ec Mon Sep 17 00:00:00 2001 From: "a.akkuzin" Date: Wed, 16 Feb 2022 16:24:17 +0400 Subject: [PATCH 06/13] Add support of primitive tags for BlacklistEventFilter --- .../sink/filter/BlacklistEventFilter.java | 30 ++++++++++++++----- .../hercules/sink/filter/EventFilterTest.java | 16 +++++++--- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/filter/BlacklistEventFilter.java b/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/filter/BlacklistEventFilter.java index 1a082a0b..dc6b2672 100644 --- a/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/filter/BlacklistEventFilter.java +++ b/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/filter/BlacklistEventFilter.java @@ -15,19 +15,21 @@ import java.util.stream.Stream; /** - * Blacklist filter uses paths and corresponding patterns to filter out any events with tag values from paths are same as any of patterns has. + * Blacklist filter uses paths and corresponding patterns to filter out any events with tag values and types from paths are same as any of patterns has. *

- * For blacklist filter initialization two properties are needed:
+ * For blacklist filter initialization three properties are needed:
* {@code paths} is the list of {@link HPath} in the string-path form ({@code paths} can be empty),
- * and {@code patterns} is the list of value patterns for tags from paths above, values in the pattern are separated by the colon {@code :} ({@code patterns} can be empty). + * {@code types} is the list of {@link Type}s based on which tags retrieved from the event will be checked ({@code types} can be empty or absent, + * then by default the list will consist of {@link Type#STRING} elements,
+ * and {@code patterns} is the list of value patterns for tags from paths above, values in the pattern are separated by the colon {@code :} + * and should correspond to the specified types ({@code patterns} can be empty). *

* Blacklist filter supports star {@code *} in the pattern definition. It means {@code any value}. *

- * FIXME: Currently, this filter supports only tags of type {@link Type#STRING} and has inefficient walkthrough over patterns when test events. - *

* Sample: - *

{@code paths=properties/project,properties/environment
- * patterns=my_project:testing,my_project:staging}
+ *
{@code types=STRING,STRING,INTEGER
+ * paths=properties/project,properties/environment,properties/id
+ * patterns=my_project:testing:123,my_project:staging:*}
* Here, events for project {@code my_project} from {@code testing} and {@code staging} environments will be filtered out. * @author Gregory Koshelev */ @@ -48,7 +50,15 @@ public BlacklistEventFilter(Properties properties) { map(HPath::fromPath). collect(Collectors.toList()); - this.blacklistTree = new PatternTree(Collections.nCopies(paths.size(), Type.STRING)); + List types = Stream.of(PropertiesUtil.get(Props.TYPES, properties).get()). + map(Type::valueOf). + collect(Collectors.toList()); + + if (types.isEmpty()) { + types = Collections.nCopies(paths.size(), Type.STRING); + } + + this.blacklistTree = new PatternTree(types); for (String pattern : PropertiesUtil.get(Props.PATTERNS, properties).get()) { blacklistTree.put(pattern); @@ -77,5 +87,9 @@ private static class Props { private static final Parameter PATTERNS = Parameter.stringArrayParameter("patterns"). withDefault(new String[0]). build(); + + private static final Parameter TYPES = Parameter.stringArrayParameter("types"). + withDefault(new String[0]). + build(); } } diff --git a/hercules-sink/src/test/java/ru/kontur/vostok/hercules/sink/filter/EventFilterTest.java b/hercules-sink/src/test/java/ru/kontur/vostok/hercules/sink/filter/EventFilterTest.java index e1622cc1..d2ee1286 100644 --- a/hercules-sink/src/test/java/ru/kontur/vostok/hercules/sink/filter/EventFilterTest.java +++ b/hercules-sink/src/test/java/ru/kontur/vostok/hercules/sink/filter/EventFilterTest.java @@ -41,8 +41,9 @@ public void shouldInitializeListOfFiltersFromProperties() { @Test public void blacklistTest() { Properties properties = new Properties(); - properties.setProperty("paths", "properties/project,properties/environment"); - properties.setProperty("patterns", "my_project:staging,test_project:production,bad_project:*"); + properties.setProperty("types", "STRING,STRING,INTEGER"); + properties.setProperty("paths", "properties/project,properties/environment,properties/id"); + properties.setProperty("patterns", "my_project:staging:123,test_project:production:456,bad_project:*:*"); BlacklistEventFilter filter = new BlacklistEventFilter(properties); @@ -66,6 +67,7 @@ public void blacklistTest() { Container.builder(). tag("project", Variant.ofString("bad_project")). tag("environment", Variant.ofString("testing")). + tag("id", Variant.ofInteger(123)). build())). build(); Assert.assertFalse(filter.test(event)); @@ -77,6 +79,7 @@ public void blacklistTest() { Container.builder(). tag("project", Variant.ofString("test_project")). tag("environment", Variant.ofString("production")). + tag("id", Variant.ofInteger(456)). build())). build(); Assert.assertFalse(filter.test(event)); @@ -88,6 +91,7 @@ public void blacklistTest() { Container.builder(). tag("project", Variant.ofString("test_project")). tag("environment", Variant.ofString("staging")). + tag("id", Variant.ofInteger(456)). build())). build(); Assert.assertTrue(filter.test(event)); @@ -96,8 +100,9 @@ public void blacklistTest() { @Test public void whitelistTest() { Properties properties = new Properties(); - properties.setProperty("paths", "properties/project,properties/environment"); - properties.setProperty("patterns", "my_project:staging,test_project:production,bad_project:*"); + properties.setProperty("types", "STRING,STRING,INTEGER"); + properties.setProperty("paths", "properties/project,properties/environment,properties/id"); + properties.setProperty("patterns", "my_project:staging:123,test_project:production:456,bad_project:*:*"); WhitelistEventFilter filter = new WhitelistEventFilter(properties); @@ -121,6 +126,7 @@ public void whitelistTest() { Container.builder(). tag("project", Variant.ofString("bad_project")). tag("environment", Variant.ofString("testing")). + tag("id", Variant.ofInteger(123)). build())). build(); Assert.assertTrue(filter.test(event)); @@ -132,6 +138,7 @@ public void whitelistTest() { Container.builder(). tag("project", Variant.ofString("test_project")). tag("environment", Variant.ofString("production")). + tag("id", Variant.ofInteger(456)). build())). build(); Assert.assertTrue(filter.test(event)); @@ -143,6 +150,7 @@ public void whitelistTest() { Container.builder(). tag("project", Variant.ofString("test_project")). tag("environment", Variant.ofString("staging")). + tag("id", Variant.ofInteger(456)). build())). build(); Assert.assertFalse(filter.test(event)); From 839978283f87660ddcdf82ee88e4127934d54258 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A2=D0=BE=D0=BA=D0=BC=D1=8F=D0=BD=D0=B8=D0=BD=D0=B0=20?= =?UTF-8?q?=D0=A2=D0=B0=D1=82=D1=8C=D1=8F=D0=BD=D0=B0=20=D0=98=D0=B3=D0=BE?= =?UTF-8?q?=D1=80=D0=B5=D0=B2=D0=BD=D0=B0?= Date: Fri, 18 Feb 2022 16:38:43 +0500 Subject: [PATCH 07/13] Resolve "LeproseryElasticSink: send eventCountMeter by original-index" --- .../elastic/sink/LeproserySender.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/LeproserySender.java b/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/LeproserySender.java index e38994c2..78ef130c 100644 --- a/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/LeproserySender.java +++ b/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/LeproserySender.java @@ -68,12 +68,18 @@ class LeproserySender { private final Lazy sinkGroupId; private final Lazy sinkSubscription; - private final Meter sentToLeproseryEventCountMeter; - private final Meter sentToLeproseryWithErrorsEventCountMeter; + private final Meter leproseryEventsMeter; + private final Meter leproseryEventsWithErrorsMeter; + private final IndicesMetricsCollector leproseryEventsIndicesMetricsCollector; LeproserySender(Properties properties, MetricsCollector metricsCollector) { - sentToLeproseryEventCountMeter = metricsCollector.meter("sentToLeproseryEventCount"); - sentToLeproseryWithErrorsEventCountMeter = metricsCollector.meter("sentToLeproseryWithErrorsEventCount"); + leproseryEventsMeter = metricsCollector.meter("leproseryEvents"); + leproseryEventsWithErrorsMeter = metricsCollector.meter("leproseryEventsWithErrors"); + leproseryEventsIndicesMetricsCollector = new IndicesMetricsCollector( + "leproseryEvents", + 10_000, + metricsCollector + ); this.leproseryStream = PropertiesUtil.get(Props.LEPROSERY_STREAM, properties).get(); this.leproseryIndex = PropertiesUtil.get(Props.LEPROSERY_INDEX, properties).get(); @@ -100,11 +106,11 @@ public void convertAndSend(Map eventErrorInfo gateClient.send(leproseryApiKey, leproseryStream, data); LOGGER.info("Send to leprosery {} events", count); - sentToLeproseryEventCountMeter.mark(count); - sentToLeproseryWithErrorsEventCountMeter.mark(eventErrorInfos.size() - count); + leproseryEventsMeter.mark(count); + leproseryEventsWithErrorsMeter.mark(eventErrorInfos.size() - count); } catch (BadRequestException | UnavailableClusterException e) { LOGGER.error("Leprosery sending error", e); - sentToLeproseryWithErrorsEventCountMeter.mark(count); + leproseryEventsWithErrorsMeter.mark(count); } } @@ -113,6 +119,7 @@ private List convert(Map eventErrorInf .map(entry -> { ElasticDocument document = entry.getKey(); ValidationResult validationResult = entry.getValue(); + leproseryEventsIndicesMetricsCollector.markEvent(document.index()); return toLeproseryEvent(document, validationResult.error()); }) .filter(Optional::isPresent) From 1203db4e1ca776236fa659538e1231c202b4e206 Mon Sep 17 00:00:00 2001 From: "a.akkuzin" Date: Fri, 18 Feb 2022 17:39:03 +0400 Subject: [PATCH 08/13] Explicitly described supported types --- .../ru/kontur/vostok/hercules/protocol/Type.java | 12 ++++++++++++ .../hercules/sink/filter/BlacklistEventFilter.java | 2 ++ 2 files changed, 14 insertions(+) diff --git a/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/Type.java b/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/Type.java index 47699d18..2cd56201 100644 --- a/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/Type.java +++ b/hercules-protocol/src/main/java/ru/kontur/vostok/hercules/protocol/Type.java @@ -38,6 +38,18 @@ public enum Type { /** * Returns {@code true} if {@link Type} is primitive. + *
    List of primitive types: + *
  • {@link Type#BYTE}
  • + *
  • {@link Type#SHORT}
  • + *
  • {@link Type#INTEGER}
  • + *
  • {@link Type#LONG}
  • + *
  • {@link Type#FLAG}
  • + *
  • {@link Type#FLOAT}
  • + *
  • {@link Type#DOUBLE}
  • + *
  • {@link Type#STRING}
  • + *
  • {@link Type#UUID}
  • + *
  • {@link Type#NULL}
  • + *
* * @return {@code true} if {@link Type} is primitive. */ diff --git a/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/filter/BlacklistEventFilter.java b/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/filter/BlacklistEventFilter.java index dc6b2672..55a9d4a0 100644 --- a/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/filter/BlacklistEventFilter.java +++ b/hercules-sink/src/main/java/ru/kontur/vostok/hercules/sink/filter/BlacklistEventFilter.java @@ -26,6 +26,8 @@ *

* Blacklist filter supports star {@code *} in the pattern definition. It means {@code any value}. *

+ * Blacklist filter supports only primitive types (see {@link Type#isPrimitive()}). + *

* Sample: *

{@code types=STRING,STRING,INTEGER
  * paths=properties/project,properties/environment,properties/id

From 6c67fc5b37da35f74aca05f5c5c2e394d1f07f74 Mon Sep 17 00:00:00 2001
From: "a.akkuzin" 
Date: Fri, 25 Feb 2022 17:49:30 +0400
Subject: [PATCH 09/13] fix timestamp offsets mapping

---
 .../hercules/json/format/CombineMapper.java   |  6 +--
 .../json/format/combiner/Combiner.java        |  4 +-
 .../format/combiner/IsoDateTimeCombiner.java  | 37 +++++++++++++------
 .../format/combiner/LatencyMsCombiner.java    |  8 ++--
 .../combiner/LatencyStringCombiner.java       |  8 ++--
 .../json/format/combiner/CombinerTest.java    |  2 +
 6 files changed, 41 insertions(+), 24 deletions(-)

diff --git a/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/CombineMapper.java b/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/CombineMapper.java
index 26d275a5..eb1910bb 100644
--- a/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/CombineMapper.java
+++ b/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/CombineMapper.java
@@ -41,11 +41,7 @@ public void map(Event event, Document document) {
         Variant[] values = new Variant[sourcePaths.size()];
         int i = 0;
         for (HPath sourcePath : sourcePaths) {
-            Variant value = sourcePath.extract(payload);
-            if (value == null) {
-                return;
-            }
-            values[i++] = value;
+            values[i++] = sourcePath.extract(payload);
         }
 
         Object result = combiner.combine(values);
diff --git a/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/combiner/Combiner.java b/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/combiner/Combiner.java
index 883e0fa0..568fe60d 100644
--- a/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/combiner/Combiner.java
+++ b/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/combiner/Combiner.java
@@ -1,5 +1,6 @@
 package ru.kontur.vostok.hercules.json.format.combiner;
 
+import org.jetbrains.annotations.Nullable;
 import ru.kontur.vostok.hercules.protocol.Variant;
 import ru.kontur.vostok.hercules.util.ClassUtil;
 
@@ -12,11 +13,12 @@
 public interface Combiner {
     /**
      * Combine values into single one.
+     * Each value in {@code values} can be {@code null}.
      *
      * @param values values to combine
      * @return the result
      */
-    Object combine(Variant... values);
+    @Nullable Object combine(Variant... values);
 
     static Combiner fromClass(String className) {
         return ClassUtil.fromClass(className, Combiner.class);
diff --git a/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/combiner/IsoDateTimeCombiner.java b/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/combiner/IsoDateTimeCombiner.java
index 42026963..8336869c 100644
--- a/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/combiner/IsoDateTimeCombiner.java
+++ b/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/combiner/IsoDateTimeCombiner.java
@@ -1,5 +1,6 @@
 package ru.kontur.vostok.hercules.json.format.combiner;
 
+import org.jetbrains.annotations.Nullable;
 import ru.kontur.vostok.hercules.protocol.Type;
 import ru.kontur.vostok.hercules.protocol.Variant;
 import ru.kontur.vostok.hercules.util.time.TimeUtil;
@@ -20,24 +21,36 @@ public class IsoDateTimeCombiner implements Combiner {
     /**
      * Combine timestamp and timezone offset into ISO 8601 formatted date and time string.
      *
-     * @param values timestamp and timezone offset
-     * @return ISO 8601 formatted date and time string
+     * @param values timestamp (required) and timezone offset (optional)
+     * @return ISO 8601 formatted date and time string if timestamp is present, otherwise {@code null}
      */
     @Override
-    public Object combine(Variant... values) {
-        if (values.length != 2) {
-            throw new IllegalArgumentException("Combiner expects 2 args: timestamp and zone offset");
-        }
-        if (values[0].getType() != Type.LONG || values[1].getType() != Type.LONG) {
+    public @Nullable Object combine(Variant... values) {
+        Long timestamp = extractTimestampFrom(values);
+        if (timestamp == null) {
             return null;
         }
-
-        Variant timestamp = values[0];
-        Variant offset = values[1];
+        long offset = extractOffsetFrom(values);
 
         ZonedDateTime dateTime = ZonedDateTime.ofInstant(
-                TimeUtil.unixTicksToInstant((Long) timestamp.getValue()),
-                ZoneOffset.ofTotalSeconds((int) TimeUtil.ticksToSeconds((Long) offset.getValue())));
+                TimeUtil.unixTicksToInstant(timestamp),
+                ZoneOffset.ofTotalSeconds((int) TimeUtil.ticksToSeconds(offset)));
         return FORMATTER.format(dateTime);
     }
+
+    private Long extractTimestampFrom(Variant[] values) {
+        if (values.length == 0 || values.length > 2) {
+            throw new IllegalArgumentException("Combiner expects args like (timestamp) or (timestamp, offset)");
+        }
+
+        return values[0] != null && values[0].getType() == Type.LONG
+                ? (Long) values[0].getValue()
+                : null;
+    }
+
+    private long extractOffsetFrom(Variant[] values) {
+        return values.length == 2 && values[1] != null && values[1].getType() == Type.LONG
+                ? (long) values[1].getValue()
+                : 0L;
+    }
 }
diff --git a/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/combiner/LatencyMsCombiner.java b/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/combiner/LatencyMsCombiner.java
index d1797596..515a918c 100644
--- a/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/combiner/LatencyMsCombiner.java
+++ b/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/combiner/LatencyMsCombiner.java
@@ -1,5 +1,6 @@
 package ru.kontur.vostok.hercules.json.format.combiner;
 
+import org.jetbrains.annotations.Nullable;
 import ru.kontur.vostok.hercules.protocol.Type;
 import ru.kontur.vostok.hercules.protocol.Variant;
 import ru.kontur.vostok.hercules.util.time.TimeUtil;
@@ -14,14 +15,15 @@ public class LatencyMsCombiner implements Combiner {
      * Combine begin and end timestamp into the latency in milliseconds.
      *
      * @param values begin and end timestamp in 100ns ticks
-     * @return the latency in milliseconds
+     * @return the latency in milliseconds if both timestamps are present, otherwise {@code null}
      */
     @Override
-    public Object combine(Variant... values) {
+    public @Nullable Object combine(Variant... values) {
         if (values.length != 2) {
             throw new IllegalArgumentException("Combiner expects 2 timestamps");
         }
-        if (values[0].getType() != Type.LONG || values[1].getType() != Type.LONG) {
+        if (values[0] == null || values[1] == null
+                || values[0].getType() != Type.LONG || values[1].getType() != Type.LONG) {
             return null;
         }
 
diff --git a/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/combiner/LatencyStringCombiner.java b/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/combiner/LatencyStringCombiner.java
index d15ec422..6516c84b 100644
--- a/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/combiner/LatencyStringCombiner.java
+++ b/hercules-json/src/main/java/ru/kontur/vostok/hercules/json/format/combiner/LatencyStringCombiner.java
@@ -1,5 +1,6 @@
 package ru.kontur.vostok.hercules.json.format.combiner;
 
+import org.jetbrains.annotations.Nullable;
 import ru.kontur.vostok.hercules.protocol.Type;
 import ru.kontur.vostok.hercules.protocol.Variant;
 import ru.kontur.vostok.hercules.util.time.TimeUtil;
@@ -15,14 +16,15 @@ public class LatencyStringCombiner implements Combiner {
      * Combine begin and end timestamp into the latency is formatted to a string with the measurement unit.
      *
      * @param values begin and end timestamp
-     * @return the string representation of the latency
+     * @return the string representation of the latency if both timestamps are present, otherwise {@code null}
      */
     @Override
-    public Object combine(Variant... values) {
+    public @Nullable Object combine(Variant... values) {
         if (values.length != 2) {
             throw new IllegalArgumentException("Combiner expects 2 timestamps");
         }
-        if (values[0].getType() != Type.LONG || values[1].getType() != Type.LONG) {
+        if (values[0] == null || values[1] == null
+                || values[0].getType() != Type.LONG || values[1].getType() != Type.LONG) {
             return null;
         }
 
diff --git a/hercules-json/src/test/java/ru/kontur/vostok/hercules/json/format/combiner/CombinerTest.java b/hercules-json/src/test/java/ru/kontur/vostok/hercules/json/format/combiner/CombinerTest.java
index e9108426..5ef7667f 100644
--- a/hercules-json/src/test/java/ru/kontur/vostok/hercules/json/format/combiner/CombinerTest.java
+++ b/hercules-json/src/test/java/ru/kontur/vostok/hercules/json/format/combiner/CombinerTest.java
@@ -16,6 +16,8 @@ public void testIsoDateTimeCombiner() {
 
         Combiner combiner = new IsoDateTimeCombiner();
         assertEquals("2020-07-23T12:50:50.149976600+03:00", combiner.combine(timestamp, offset));
+        assertEquals("2020-07-23T09:50:50.149976600Z", combiner.combine(timestamp));
+        assertEquals("2020-07-23T09:50:50.149976600Z", combiner.combine(timestamp, null));
     }
 
     @Test

From 1983803c7ba29de764fff5ea64ea5ff55e785f1e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=D0=A2=D0=BE=D0=BA=D0=BC=D1=8F=D0=BD=D0=B8=D0=BD=D0=B0=20?=
 =?UTF-8?q?=D0=A2=D0=B0=D1=82=D1=8C=D1=8F=D0=BD=D0=B0=20=D0=98=D0=B3=D0=BE?=
 =?UTF-8?q?=D1=80=D0=B5=D0=B2=D0=BD=D0=B0?= 
Date: Mon, 18 Apr 2022 09:50:08 +0500
Subject: [PATCH 10/13] Add new file

---
 hercules-init/application.properties | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 hercules-init/application.properties

diff --git a/hercules-init/application.properties b/hercules-init/application.properties
new file mode 100644
index 00000000..8b137891
--- /dev/null
+++ b/hercules-init/application.properties
@@ -0,0 +1 @@
+

From 1f991f01bdc6b592232b3984a7df3a6c3fd0f4d4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=D0=94=D0=B5=D0=BC=D0=B5=D0=BD=D0=B5=D0=B2=20=D0=9F=D0=B5?=
 =?UTF-8?q?=D1=82=D1=80=20=D0=90=D0=BD=D0=B4=D1=80=D0=B5=D0=B5=D0=B2=D0=B8?=
 =?UTF-8?q?=D1=87?= 
Date: Tue, 19 Apr 2022 19:02:31 +0500
Subject: [PATCH 11/13] Resolve "IndexResolver with list of indices and tags"

---
 hercules-elastic-sink/README.md               | 29 ++++++
 .../sink/index/MatchingIndexResolver.java     | 95 ++++++++++++++++++
 .../sink/index/matching/ConfigParser.java     | 51 ++++++++++
 .../sink/index/matching/IndexData.java        | 26 +++++
 .../sink/index/matching/IndexDataModel.java   | 30 ++++++
 .../elastic/sink/index/matching/TagMap.java   | 50 ++++++++++
 .../sink/index/MatchingIndexResolverTest.java | 97 +++++++++++++++++++
 .../src/test/resources/indices.json           | 26 +++++
 .../graphite/sink/acl/AclParserTest.java      |  2 +-
 9 files changed, 405 insertions(+), 1 deletion(-)
 create mode 100644 hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/MatchingIndexResolver.java
 create mode 100644 hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/matching/ConfigParser.java
 create mode 100644 hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/matching/IndexData.java
 create mode 100644 hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/matching/IndexDataModel.java
 create mode 100644 hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/matching/TagMap.java
 create mode 100644 hercules-elastic-sink/src/test/java/ru/kontur/vostok/hercules/elastic/sink/index/MatchingIndexResolverTest.java
 create mode 100644 hercules-elastic-sink/src/test/resources/indices.json

diff --git a/hercules-elastic-sink/README.md b/hercules-elastic-sink/README.md
index 35ef606c..f68db14d 100644
--- a/hercules-elastic-sink/README.md
+++ b/hercules-elastic-sink/README.md
@@ -211,3 +211,32 @@ move properties/* to *
 # Render structured exception as string stack trace
 transform exception to stackTrace using ru.kontur.vostok.hercules.elastic.sink.format.ExceptionToStackTraceTransformer
 ```
+
+### `indices.json` sample:
+```json
+[
+  {
+    "index": "first-index",
+    "tagMaps": [
+      {
+        "some-tag": "some-value",
+        "other-tag": "other-value"
+      },
+      {
+        "some-tag": "alternative-value"
+      }
+    ]
+  },
+  {
+    "index": "second-index",
+    "tagMaps": [
+      {
+        "some-tag": "for-second-index-value"
+      },
+      {
+        "special-tag": "special-value"
+      }
+    ]
+  }
+]
+```
diff --git a/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/MatchingIndexResolver.java b/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/MatchingIndexResolver.java
new file mode 100644
index 00000000..d666c317
--- /dev/null
+++ b/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/MatchingIndexResolver.java
@@ -0,0 +1,95 @@
+package ru.kontur.vostok.hercules.elastic.sink.index;
+
+import ru.kontur.vostok.hercules.configuration.Sources;
+import ru.kontur.vostok.hercules.elastic.sink.index.matching.ConfigParser;
+import ru.kontur.vostok.hercules.elastic.sink.index.matching.IndexData;
+import ru.kontur.vostok.hercules.protocol.Event;
+import ru.kontur.vostok.hercules.util.parameter.Parameter;
+import ru.kontur.vostok.hercules.util.properties.PropertiesUtil;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Resolves the index name by choice from file with predefined indices and corresponding tag values.
+ * 

+ * File format rules: + *

    + *
  • File content must be in JSON format. + *
  • The content consist of index data array. + *
  • Value of the field {@code index} is an index name. + *
  • Value of the field {@code tagMaps} is an array of tag maps. + *
  • Every tag map consist of set of tag-value pairs. + *
  • Tag value may be present as plain string or regular expressions. + *
+ *

+ * File content sample: + *

{@code
+ * [
+ *   {
+ *     "index": "first-index",
+ *     "tagMaps": [
+ *       {
+ *         "some-tag": "some-value",
+ *         "outer-tag/inner-tag": "inner-value"
+ *       },
+ *       {
+ *         "some-tag": "alternative-value"
+ *       }
+ *     ]
+ *   },
+ *   {
+ *     "index": "second-index",
+ *     "tagMaps": [
+ *       {
+ *         "some-tag": "for-second-index-value"
+ *       },
+ *       {
+ *         "special-tag": "special-value"
+ *       }
+ *     ]
+ *   }
+ * ]}
+ * The event matches the index if the event contains all tags from at least one of index tag maps.

+ * If the event matches one or more indices then the name of first of these indices is returned with {@link Optional} wrap.

+ * If the event doesn't match all indices then the resolver returns empty {@link Optional}.

+ *
+ * Example for above file content sample: + *

  • for event with single tag {@code 'some-tag=for-second-index-value'} index {@code 'second-index'} will be resolved, + *
  • for event with single tag {@code 'special-tag=special-value'} index {@code 'second-index'} will be resolved, + *
  • for event with tags {@code 'some-tag=some-value'} and {@code 'outer-tag/inner-tag=inner-value'} + * without other tags, index {@code 'first-index'} will be resolved, + *
  • for event with tags {@code 'some-tag=alternative-value'} and {@code 'special-tag=special-value'} + * without other tags, index {@code 'first-index'} will be resolved, + *
  • for event with tags {@code 'some-tag=some-value'} and {@code 'special-tag=non-special-value'} + * without other tags, no index will be resolved. + * + * @author Petr Demenev + */ +public class MatchingIndexResolver extends IndexResolver { + + private final IndexData[] indices; + + public MatchingIndexResolver(Properties properties) { + super(properties); + String filePath = PropertiesUtil.get(Props.FILE_PATH, properties).get(); + indices = ConfigParser.parse(Sources.load(filePath)); + } + + @Override + public Optional resolve(Event event) { + for (IndexData index : indices) { + boolean matches = index.getTagMaps().stream().anyMatch(tagMap -> tagMap.matches(event)); + if (matches) { + return Optional.of(index.getIndex()); + } + } + return Optional.empty(); + } + + private static class Props { + private static final Parameter FILE_PATH = Parameter.stringParameter("file"). + withDefault("file://indices.json"). + build(); + } +} diff --git a/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/matching/ConfigParser.java b/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/matching/ConfigParser.java new file mode 100644 index 00000000..6e478552 --- /dev/null +++ b/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/matching/ConfigParser.java @@ -0,0 +1,51 @@ +package ru.kontur.vostok.hercules.elastic.sink.index.matching; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import ru.kontur.vostok.hercules.protocol.hpath.HPath; +import ru.kontur.vostok.hercules.util.Maps; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Configures MatchingIndexResolver. + * Parses raw index data array from JSON and transform its contents to usable object types + * + * @author Petr Demenev + */ +public class ConfigParser { + + public static IndexData[] parse(InputStream in) { + ObjectMapper mapper = new ObjectMapper(); + ObjectReader reader = mapper.readerFor(new TypeReference() {}); + IndexDataModel[] indices; + try { + indices = reader.readValue(in); + } catch (IOException ex) { + throw new IllegalArgumentException("Cannot parse index map", ex); + } + return Arrays.stream(indices). + map(i -> new IndexData(i.getIndex(), transform(i.getTagMaps()))). + toArray(IndexData[]::new); + } + + private static List transform(List> list) { + return list.stream(). + map(sourceMap -> { + Map resultMap = new HashMap<>(Maps.effectiveHashMapCapacity(sourceMap.size())); + sourceMap.forEach((key, value) -> resultMap.put( + HPath.fromPath(key), + Pattern.compile(value == null ? "null" : value))); + return new TagMap(resultMap); + }). + collect(Collectors.toList()); + } +} diff --git a/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/matching/IndexData.java b/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/matching/IndexData.java new file mode 100644 index 00000000..c0bdd2f8 --- /dev/null +++ b/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/matching/IndexData.java @@ -0,0 +1,26 @@ +package ru.kontur.vostok.hercules.elastic.sink.index.matching; + +import java.util.List; + +/** + * Index data with index name and corresponding list of tag maps + * + * @author Petr Demenev + */ +public class IndexData { + private final String index; + private final List tagMaps; + + public IndexData(String index, List tagMaps) { + this.index = index; + this.tagMaps = tagMaps; + } + + public String getIndex() { + return index; + } + + public List getTagMaps() { + return tagMaps; + } +} diff --git a/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/matching/IndexDataModel.java b/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/matching/IndexDataModel.java new file mode 100644 index 00000000..3d1fd587 --- /dev/null +++ b/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/matching/IndexDataModel.java @@ -0,0 +1,30 @@ +package ru.kontur.vostok.hercules.elastic.sink.index.matching; + +import java.util.List; +import java.util.Map; + +/** + * Raw model for index data. The model is parsed from JSON + * + * @author Petr Demenev + */ +public class IndexDataModel { + private String index; + private List> tagMaps; + + public String getIndex() { + return index; + } + + public List> getTagMaps() { + return tagMaps; + } + + public void setIndex(String index) { + this.index = index; + } + + public void setTagMaps(List> tagMaps) { + this.tagMaps = tagMaps; + } +} diff --git a/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/matching/TagMap.java b/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/matching/TagMap.java new file mode 100644 index 00000000..b8217910 --- /dev/null +++ b/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/matching/TagMap.java @@ -0,0 +1,50 @@ +package ru.kontur.vostok.hercules.elastic.sink.index.matching; + +import ru.kontur.vostok.hercules.protocol.Event; +import ru.kontur.vostok.hercules.protocol.Type; +import ru.kontur.vostok.hercules.protocol.Variant; +import ru.kontur.vostok.hercules.protocol.hpath.HPath; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.regex.Pattern; + +/** + * @author Anton Akkuzin + */ +public class TagMap { + private final Map tagMap; + + public TagMap(Map tagMap) { + this.tagMap = tagMap; + } + + /** + * Checks the given {@link Event} matches to all patterns. + * + * @param event event to test + * @return {@code true} if {@code event} tags match to all patterns. + */ + public boolean matches(Event event) { + for (Map.Entry tagMapPair : tagMap.entrySet()) { + Variant tag = tagMapPair.getKey().extract(event.getPayload()); + if (tag == null) { + return false; + } + if (!tag.getType().isPrimitive()) { + return false; + } + Pattern patternTagValue = tagMapPair.getValue(); + if (!patternTagValue.matcher(stringOf(tag)).matches()) { + return false; + } + } + return true; + } + + private String stringOf(Variant variant) { + return variant.getType() == Type.STRING + ? new String((byte[]) variant.getValue(), StandardCharsets.UTF_8) + : String.valueOf(variant.getValue()); + } +} diff --git a/hercules-elastic-sink/src/test/java/ru/kontur/vostok/hercules/elastic/sink/index/MatchingIndexResolverTest.java b/hercules-elastic-sink/src/test/java/ru/kontur/vostok/hercules/elastic/sink/index/MatchingIndexResolverTest.java new file mode 100644 index 00000000..3c55570c --- /dev/null +++ b/hercules-elastic-sink/src/test/java/ru/kontur/vostok/hercules/elastic/sink/index/MatchingIndexResolverTest.java @@ -0,0 +1,97 @@ +package ru.kontur.vostok.hercules.elastic.sink.index; + +import org.junit.Assert; +import org.junit.Test; +import ru.kontur.vostok.hercules.protocol.Container; +import ru.kontur.vostok.hercules.protocol.Event; +import ru.kontur.vostok.hercules.protocol.EventBuilder; +import ru.kontur.vostok.hercules.protocol.Variant; + +import java.util.Optional; +import java.util.Properties; + +/** + * @author Petr Demenev + */ +public class MatchingIndexResolverTest { + private static final IndexResolver INDEX_RESOLVER; + + static { + Properties props = new Properties(); + props.setProperty("file", "resource://indices.json"); + INDEX_RESOLVER = new MatchingIndexResolver(props); + } + + @Test + public void shouldResolveIndexWithRegex() { + final Event event = EventBuilder.create(0, "00000000-0000-0000-0000-000000000000"). + tag("some-tag", Variant.ofString("some-value")). + tag("other-tag", Variant.ofString("other-value-123")). + tag("outer-tag", Variant.ofContainer( + Container.builder(). + tag("inner-tag", Variant.ofString("inner-value")). + build()) + ). + build(); + Optional index = INDEX_RESOLVER.resolve(event); + Assert.assertTrue(index.isPresent()); + Assert.assertEquals("first-index", index.get()); + } + + @Test + public void shouldResolveIndexWithNotStringPatternValues() { + final Event event = EventBuilder.create(0, "00000000-0000-0000-0000-000000000000"). + tag("number-tag", Variant.ofLong(0L)). + tag("null-tag", Variant.ofNull()). + tag("boolean-tag", Variant.ofFlag(true)). + build(); + Optional index = INDEX_RESOLVER.resolve(event); + Assert.assertTrue(index.isPresent()); + Assert.assertEquals("first-index", index.get()); + } + + @Test + public void shouldNotResolveIndex() { + final Event event = EventBuilder.create(0, "00000000-0000-0000-0000-000000000000"). + tag("some-tag", Variant.ofString("some-value")). + tag("other-tag", Variant.ofString("other-value")). + tag("outer-tag", Variant.ofContainer( + Container.builder(). + tag("inner-tag", Variant.ofString("inner-value")). + build()) + ). + build(); + Optional index = INDEX_RESOLVER.resolve(event); + Assert.assertFalse(index.isPresent()); + } + + @Test + public void shouldResolveSecondIndex() { + final Event event = EventBuilder.create(0, "00000000-0000-0000-0000-000000000000"). + tag("foo", Variant.ofInteger(1)). + tag("bar", Variant.ofString("bar-value")). + tag("other-tag", Variant.ofString("other-value")). + build(); + Optional index = INDEX_RESOLVER.resolve(event); + Assert.assertTrue(index.isPresent()); + Assert.assertEquals("second-index", index.get()); + } + + @Test + public void shouldResolveFirstIndexWhenEventMatchesToTwoIndices() { + final Event event = EventBuilder.create(0, "00000000-0000-0000-0000-000000000000"). + tag("foo", Variant.ofInteger(1)). + tag("bar", Variant.ofString("bar-value")). + tag("some-tag", Variant.ofString("some-value")). + tag("other-tag", Variant.ofString("other-value-123")). + tag("outer-tag", Variant.ofContainer( + Container.builder(). + tag("inner-tag", Variant.ofString("inner-value")). + build()) + ). + build(); + Optional index = INDEX_RESOLVER.resolve(event); + Assert.assertTrue(index.isPresent()); + Assert.assertEquals("first-index", index.get()); + } +} diff --git a/hercules-elastic-sink/src/test/resources/indices.json b/hercules-elastic-sink/src/test/resources/indices.json new file mode 100644 index 00000000..a5107b54 --- /dev/null +++ b/hercules-elastic-sink/src/test/resources/indices.json @@ -0,0 +1,26 @@ +[ + { + "index": "first-index", + "tagMaps": [ + { + "some-tag": "some-value", + "other-tag": "other-value-\\d*", + "outer-tag/inner-tag": "inner-value" + }, + { + "number-tag": 0, + "null-tag": null, + "boolean-tag": true + } + ] + }, + { + "index": "second-index", + "tagMaps": [ + { + "foo": 1, + "bar": "bar-value" + } + ] + } +] diff --git a/hercules-graphite-sink/src/test/java/ru/kontur/vostok/hercules/graphite/sink/acl/AclParserTest.java b/hercules-graphite-sink/src/test/java/ru/kontur/vostok/hercules/graphite/sink/acl/AclParserTest.java index 7b014824..78689196 100644 --- a/hercules-graphite-sink/src/test/java/ru/kontur/vostok/hercules/graphite/sink/acl/AclParserTest.java +++ b/hercules-graphite-sink/src/test/java/ru/kontur/vostok/hercules/graphite/sink/acl/AclParserTest.java @@ -42,4 +42,4 @@ public void shouldThrowExceptionIfIncorrectFileContent() { String source = "DENY:test.foo.bar.*"; AclParser.parse(new ByteArrayInputStream(source.getBytes(StandardCharsets.UTF_8))); } -} \ No newline at end of file +} From 0cf434e9cf359429baf196d590771b77be5e47c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A2=D0=BE=D0=BA=D0=BC=D1=8F=D0=BD=D0=B8=D0=BD=D0=B0=20?= =?UTF-8?q?=D0=A2=D0=B0=D1=82=D1=8C=D1=8F=D0=BD=D0=B0=20=D0=98=D0=B3=D0=BE?= =?UTF-8?q?=D1=80=D0=B5=D0=B2=D0=BD=D0=B0?= Date: Wed, 20 Apr 2022 16:25:53 +0500 Subject: [PATCH 12/13] Resolve "Check templates before creating (elastic)" --- .../elastic/sink/index/IlmIndexCreator.java | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/IlmIndexCreator.java b/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/IlmIndexCreator.java index 3aee254b..f6060a44 100644 --- a/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/IlmIndexCreator.java +++ b/hercules-elastic-sink/src/main/java/ru/kontur/vostok/hercules/elastic/sink/index/IlmIndexCreator.java @@ -20,7 +20,29 @@ public IlmIndexCreator(RestClient restClient) { @Override public boolean create(String index) { - return createTemplate(index) && createIndex(index); + boolean result = templateExists(index) || createTemplate(index); + return result && createIndex(index); + } + + /** + *
    {@code GET _template/${index}
    +     * }
    + * + * @param index + * @return + */ + public boolean templateExists(String index) { + try { + Response response = + restClient.performRequest( + "GET", + "/_template/" + index + ); + return HttpStatusCodes.isSuccess(response.getStatusLine().getStatusCode()); + } catch (IOException ex) { + LOGGER.warn("Cannot check template due to exception", ex); + return false; + } } /** From 5fe4f75623b309a4bfa896eba597e94483f82bfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=BE=D1=88=D0=B5=D0=BB=D0=B5=D0=B2=20=D0=93=D1=80?= =?UTF-8?q?=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=9D=D0=B8=D0=BA=D0=BE?= =?UTF-8?q?=D0=BB=D0=B0=D0=B5=D0=B2=D0=B8=D1=87?= Date: Thu, 21 Apr 2022 13:22:43 +0500 Subject: [PATCH 13/13] Resolve "Add connection.ttl in GraphiteSink" --- hercules-graphite-sink/README.md | 3 +++ hercules-graphite-sink/application.properties | 1 + .../graphite/sink/connection/Endpoint.java | 20 ++++++++++++++++++- .../sink/connection/EndpointPool.java | 12 +++++++++++ .../sink/connection/EndpointTest.java | 1 + 5 files changed, 36 insertions(+), 1 deletion(-) diff --git a/hercules-graphite-sink/README.md b/hercules-graphite-sink/README.md index c9ba7714..fe5db33b 100644 --- a/hercules-graphite-sink/README.md +++ b/hercules-graphite-sink/README.md @@ -42,6 +42,8 @@ Application is configured through properties file. `sink.sender.graphite.connector.local.connection.limit.per.endpoint` - maximum connections per local endpoint, default value: `3` +`sink.sender.graphite.connector.local.connection.ttl.ms` - TTL for connection in milliseconds. If missing, connections will never expire. + `sink.sender.graphite.connector.local.socket.timeout.ms` - timeout in milliseconds to create TCP-connection with a local endpoint, default value: `2 000` `sink.sender.graphite.connector.remote.endpoints` - list of remote Graphite endpoints in form `host:port`, optional @@ -135,6 +137,7 @@ sink.sender.graphite.replace.dots=true sink.sender.graphite.connector.local.endpoints=localhost:2003 sink.sender.graphite.connector.local.frozen.time.ms=30000 sink.sender.graphite.connector.local.connection.limit.per.endpoint=3 +sink.sender.graphite.connector.local.connection.ttl.ms=3600000 sink.sender.graphite.connector.local.socket.timeout.ms=2000 sink.sender.graphite.connector.remote.endpoints= diff --git a/hercules-graphite-sink/application.properties b/hercules-graphite-sink/application.properties index bd3d0592..ea84734c 100644 --- a/hercules-graphite-sink/application.properties +++ b/hercules-graphite-sink/application.properties @@ -18,6 +18,7 @@ sink.sender.graphite.replace.dots=true sink.sender.graphite.connector.local.endpoints=localhost:2003 sink.sender.graphite.connector.local.frozen.time.ms=30000 sink.sender.graphite.connector.local.connection.limit.per.endpoint=3 +sink.sender.graphite.connector.local.connection.ttl.ms=3600000 sink.sender.graphite.connector.local.socket.timeout.ms=2000 sink.sender.graphite.connector.remote.endpoints= diff --git a/hercules-graphite-sink/src/main/java/ru/kontur/vostok/hercules/graphite/sink/connection/Endpoint.java b/hercules-graphite-sink/src/main/java/ru/kontur/vostok/hercules/graphite/sink/connection/Endpoint.java index 395317d1..22227624 100644 --- a/hercules-graphite-sink/src/main/java/ru/kontur/vostok/hercules/graphite/sink/connection/Endpoint.java +++ b/hercules-graphite-sink/src/main/java/ru/kontur/vostok/hercules/graphite/sink/connection/Endpoint.java @@ -37,6 +37,7 @@ public class Endpoint { private final ConcurrentLinkedQueue connections = new ConcurrentLinkedQueue<>(); private final AtomicInteger leasedConnections = new AtomicInteger(0); + private final long connectionTtlMs; private volatile boolean frozen; private volatile long frozenToMs; @@ -44,10 +45,12 @@ public class Endpoint { public Endpoint( InetSocketAddress address, int connectionLimit, + long connectionTtlMs, int socketTimeoutMs, TimeSource time) { this.address = address; this.connectionLimit = connectionLimit; + this.connectionTtlMs = connectionTtlMs; this.socketTimeoutMs = socketTimeoutMs; this.time = time; @@ -139,7 +142,7 @@ private Connection leaseConnection() throws EndpointException { private void releaseConnection(Connection connection) { try { - if (connection.isBroken()) { + if (connection.isBroken() || connection.isExpired()) { connection.close(); return; } @@ -158,6 +161,7 @@ class Connection { private final Socket socket; private final Writer writer; private boolean broken; + private long expiresAtMs; private Connection() throws IOException { this.socket = new Socket(Proxy.NO_PROXY); @@ -171,6 +175,9 @@ private Connection() throws IOException { socket.getOutputStream(), StandardCharsets.US_ASCII)); this.broken = false; + + long expiresAt = time.milliseconds() + connectionTtlMs; + this.expiresAtMs = (expiresAt > 0) ? expiresAt : Long.MAX_VALUE; } /** @@ -238,5 +245,16 @@ public void close() { public boolean isBroken() { return broken; } + + /** + * Check if the connection is expired. + * + * @return {@code true} if the connection is expired, otherwise return {@code false} + * + *

    + */ + public boolean isExpired() { + return time.milliseconds() > this.expiresAtMs; + } } } diff --git a/hercules-graphite-sink/src/main/java/ru/kontur/vostok/hercules/graphite/sink/connection/EndpointPool.java b/hercules-graphite-sink/src/main/java/ru/kontur/vostok/hercules/graphite/sink/connection/EndpointPool.java index 90fda1ea..cf46824b 100644 --- a/hercules-graphite-sink/src/main/java/ru/kontur/vostok/hercules/graphite/sink/connection/EndpointPool.java +++ b/hercules-graphite-sink/src/main/java/ru/kontur/vostok/hercules/graphite/sink/connection/EndpointPool.java @@ -29,6 +29,11 @@ public EndpointPool(Properties properties, TimeSource time) { this.frozenTimeMs = PropertiesUtil.get(Props.FROZEN_TIME_MS, properties).get(); int connectionLimitPerEndpoint = PropertiesUtil.get(Props.CONNECTION_LIMIT_PER_ENDPOINT, properties).get(); + + /* After August 17, 292278994 7:12:55 AM UTC connection will be never expire. + Just keep it in mind if this project will be alive so long */ + long connectionTtlMs = PropertiesUtil.get(Props.CONNECTION_TTL_MS, properties).get(); + int socketTimeoutMs = PropertiesUtil.get(Props.SOCKET_TIMEOUT_MS, properties).get(); Endpoint[] endpoints = Stream.of(PropertiesUtil.get(Props.ENDPOINTS, properties).orEmpty(new String[0])). @@ -36,6 +41,7 @@ public EndpointPool(Properties properties, TimeSource time) { new Endpoint( InetSocketAddressUtil.fromString(hostAndPort, 2003), connectionLimitPerEndpoint, + connectionTtlMs, socketTimeoutMs, time)). toArray(Endpoint[]::new); @@ -108,6 +114,12 @@ private static class Props { withDefault(3). build(); + static final Parameter CONNECTION_TTL_MS = + Parameter.longParameter("connection.ttl.ms"). + withDefault(Long.MAX_VALUE). + withValidator(LongValidators.positive()). + build(); + static final Parameter SOCKET_TIMEOUT_MS = Parameter.integerParameter("socket.timeout.ms"). withDefault(2_000). diff --git a/hercules-graphite-sink/src/test/java/ru/kontur/vostok/hercules/graphite/sink/connection/EndpointTest.java b/hercules-graphite-sink/src/test/java/ru/kontur/vostok/hercules/graphite/sink/connection/EndpointTest.java index ed7f1fc0..3c6567ab 100644 --- a/hercules-graphite-sink/src/test/java/ru/kontur/vostok/hercules/graphite/sink/connection/EndpointTest.java +++ b/hercules-graphite-sink/src/test/java/ru/kontur/vostok/hercules/graphite/sink/connection/EndpointTest.java @@ -19,6 +19,7 @@ public void shouldCorrectlyLeaseConnectionIfFrozen() throws EndpointException { new Endpoint( InetSocketAddressUtil.fromString("127.0.0.1", 2003), 3, + 3_600_000, 2_000, time);