From 10905913b8dfc7b6706587c4ae53210776349c0a Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Sun, 30 Mar 2025 18:56:43 +0500 Subject: [PATCH 1/6] Tcp iperf proxy example --- examples/tcp.proxy.iperf/README.md | 23 ++++++++++++ examples/tcp.proxy.iperf/compose.yaml | 35 +++++++++++++++++++ examples/tcp.proxy.iperf/etc/zilla.yaml | 20 +++++++++++ .../tcp/internal/stream/TcpClientRouter.java | 2 +- 4 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 examples/tcp.proxy.iperf/README.md create mode 100644 examples/tcp.proxy.iperf/compose.yaml create mode 100644 examples/tcp.proxy.iperf/etc/zilla.yaml diff --git a/examples/tcp.proxy.iperf/README.md b/examples/tcp.proxy.iperf/README.md new file mode 100644 index 0000000000..53ba73618c --- /dev/null +++ b/examples/tcp.proxy.iperf/README.md @@ -0,0 +1,23 @@ +# tcp.proxy.iperf + +Listens on tcp port `12345` and will proxy iperf traffic to iperf server. + +## Requirements + +- docker compose + +## Setup + +To `start` the Docker Compose stack defined in the [compose.yaml](compose.yaml) file, use: + +```bash +docker compose up -d +``` + +## Teardown + +To remove any resources created by the Docker Compose stack, use: + +```bash +docker compose down +``` diff --git a/examples/tcp.proxy.iperf/compose.yaml b/examples/tcp.proxy.iperf/compose.yaml new file mode 100644 index 0000000000..cf02fda542 --- /dev/null +++ b/examples/tcp.proxy.iperf/compose.yaml @@ -0,0 +1,35 @@ +name: ${NAMESPACE:-zilla-tcp-proxy-iperf} +services: + iperf-server: + image: networkstatic/iperf3 + container_name: iperf-server + command: -s + ports: + - "5201:5201" + + zilla: + image: ghcr.io/aklivity/zilla:develop-SNAPSHOT + restart: unless-stopped + hostname: zilla.examples.dev + ports: + - 12345:12345 + healthcheck: + interval: 5s + timeout: 3s + retries: 5 + test: ["CMD", "bash", "-c", "echo -n '' > /dev/tcp/127.0.0.1/12345"] + volumes: + - ./etc:/etc/zilla + command: start -v -e -Pzilla.engine.maximum.messages.per.read=10 -w 2 -Pzilla.engine.worker.capacity=128 + depends_on: + - iperf-server + + iperf-client: + image: networkstatic/iperf3 + container_name: iperf-client + entrypoint: > + sh -c "sleep 10 && + iperf3 -c zilla.examples.dev -p 12345 -P 50 -t 20" +networks: + default: + driver: bridge diff --git a/examples/tcp.proxy.iperf/etc/zilla.yaml b/examples/tcp.proxy.iperf/etc/zilla.yaml new file mode 100644 index 0000000000..46e820aa22 --- /dev/null +++ b/examples/tcp.proxy.iperf/etc/zilla.yaml @@ -0,0 +1,20 @@ +--- +name: example +bindings: + north_tcp_server: + type: tcp + kind: server + options: + host: 0.0.0.0 + port: 12345 + exit: south_tcp_client + south_tcp_client: + type: tcp + kind: client + options: + host: iperf-server + port: 5201 +telemetry: + exporters: + stdout_logs_exporter: + type: stdout diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java index 049787e0e1..adaad8b45e 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java @@ -85,7 +85,7 @@ public InetSocketAddress resolve( try { - if (beginEx == null) + if (beginEx == null || port > 0) { InetAddress[] addresses = options != null ? resolveHost(options.host) : null; resolved = addresses != null ? new InetSocketAddress(addresses[0], port) : null; From 893eb032b447593d11574e30624b5a746dbf3c99 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Sun, 30 Mar 2025 19:08:56 +0500 Subject: [PATCH 2/6] Fix typos --- examples/{tcp.proxy.iperf => tcp.proxy}/README.md | 2 +- examples/{tcp.proxy.iperf => tcp.proxy}/compose.yaml | 2 +- examples/{tcp.proxy.iperf => tcp.proxy}/etc/zilla.yaml | 0 3 files changed, 2 insertions(+), 2 deletions(-) rename examples/{tcp.proxy.iperf => tcp.proxy}/README.md (95%) rename examples/{tcp.proxy.iperf => tcp.proxy}/compose.yaml (95%) rename examples/{tcp.proxy.iperf => tcp.proxy}/etc/zilla.yaml (100%) diff --git a/examples/tcp.proxy.iperf/README.md b/examples/tcp.proxy/README.md similarity index 95% rename from examples/tcp.proxy.iperf/README.md rename to examples/tcp.proxy/README.md index 53ba73618c..a4764bcd9c 100644 --- a/examples/tcp.proxy.iperf/README.md +++ b/examples/tcp.proxy/README.md @@ -1,4 +1,4 @@ -# tcp.proxy.iperf +# tcp.proxy Listens on tcp port `12345` and will proxy iperf traffic to iperf server. diff --git a/examples/tcp.proxy.iperf/compose.yaml b/examples/tcp.proxy/compose.yaml similarity index 95% rename from examples/tcp.proxy.iperf/compose.yaml rename to examples/tcp.proxy/compose.yaml index cf02fda542..c50c5fa173 100644 --- a/examples/tcp.proxy.iperf/compose.yaml +++ b/examples/tcp.proxy/compose.yaml @@ -1,4 +1,4 @@ -name: ${NAMESPACE:-zilla-tcp-proxy-iperf} +name: ${NAMESPACE:-zilla-tcp-proxy} services: iperf-server: image: networkstatic/iperf3 diff --git a/examples/tcp.proxy.iperf/etc/zilla.yaml b/examples/tcp.proxy/etc/zilla.yaml similarity index 100% rename from examples/tcp.proxy.iperf/etc/zilla.yaml rename to examples/tcp.proxy/etc/zilla.yaml From c58d859466086ee9bd81a9d9aa547707c92efa1c Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Mon, 31 Mar 2025 20:58:19 +0500 Subject: [PATCH 3/6] Make socket per core --- examples/tcp.proxy/README.md | 23 ---------- examples/tcp.proxy/compose.yaml | 35 --------------- examples/tcp.proxy/etc/zilla.yaml | 20 --------- .../binding/tcp/internal/TcpBinding.java | 7 +-- .../config/TcpServerBindingConfig.java | 4 +- .../tcp/internal/stream/TcpServerFactory.java | 4 +- .../tcp/internal/stream/TcpServerRouter.java | 7 ++- .../tcp/internal/util/OperatingSystem.java | 45 ------------------- 8 files changed, 12 insertions(+), 133 deletions(-) delete mode 100644 examples/tcp.proxy/README.md delete mode 100644 examples/tcp.proxy/compose.yaml delete mode 100644 examples/tcp.proxy/etc/zilla.yaml delete mode 100644 runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/util/OperatingSystem.java diff --git a/examples/tcp.proxy/README.md b/examples/tcp.proxy/README.md deleted file mode 100644 index a4764bcd9c..0000000000 --- a/examples/tcp.proxy/README.md +++ /dev/null @@ -1,23 +0,0 @@ -# tcp.proxy - -Listens on tcp port `12345` and will proxy iperf traffic to iperf server. - -## Requirements - -- docker compose - -## Setup - -To `start` the Docker Compose stack defined in the [compose.yaml](compose.yaml) file, use: - -```bash -docker compose up -d -``` - -## Teardown - -To remove any resources created by the Docker Compose stack, use: - -```bash -docker compose down -``` diff --git a/examples/tcp.proxy/compose.yaml b/examples/tcp.proxy/compose.yaml deleted file mode 100644 index c50c5fa173..0000000000 --- a/examples/tcp.proxy/compose.yaml +++ /dev/null @@ -1,35 +0,0 @@ -name: ${NAMESPACE:-zilla-tcp-proxy} -services: - iperf-server: - image: networkstatic/iperf3 - container_name: iperf-server - command: -s - ports: - - "5201:5201" - - zilla: - image: ghcr.io/aklivity/zilla:develop-SNAPSHOT - restart: unless-stopped - hostname: zilla.examples.dev - ports: - - 12345:12345 - healthcheck: - interval: 5s - timeout: 3s - retries: 5 - test: ["CMD", "bash", "-c", "echo -n '' > /dev/tcp/127.0.0.1/12345"] - volumes: - - ./etc:/etc/zilla - command: start -v -e -Pzilla.engine.maximum.messages.per.read=10 -w 2 -Pzilla.engine.worker.capacity=128 - depends_on: - - iperf-server - - iperf-client: - image: networkstatic/iperf3 - container_name: iperf-client - entrypoint: > - sh -c "sleep 10 && - iperf3 -c zilla.examples.dev -p 12345 -P 50 -t 20" -networks: - default: - driver: bridge diff --git a/examples/tcp.proxy/etc/zilla.yaml b/examples/tcp.proxy/etc/zilla.yaml deleted file mode 100644 index 46e820aa22..0000000000 --- a/examples/tcp.proxy/etc/zilla.yaml +++ /dev/null @@ -1,20 +0,0 @@ ---- -name: example -bindings: - north_tcp_server: - type: tcp - kind: server - options: - host: 0.0.0.0 - port: 12345 - exit: south_tcp_client - south_tcp_client: - type: tcp - kind: client - options: - host: iperf-server - port: 5201 -telemetry: - exporters: - stdout_logs_exporter: - type: stdout diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBinding.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBinding.java index 5ca4c1bb93..c9f8cb56b2 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBinding.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBinding.java @@ -20,7 +20,6 @@ import java.util.concurrent.ConcurrentMap; import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpServerBindingConfig; -import io.aklivity.zilla.runtime.binding.tcp.internal.util.OperatingSystem; import io.aklivity.zilla.runtime.engine.EngineContext; import io.aklivity.zilla.runtime.engine.binding.Binding; import io.aklivity.zilla.runtime.engine.binding.BindingContext; @@ -61,10 +60,8 @@ public BindingContext supply( } private TcpServerBindingConfig supplyServer( - long bindingId) + long index) { - return OperatingSystem.detect() == OperatingSystem.OS.MACOS - ? servers.computeIfAbsent(bindingId, TcpServerBindingConfig::new) - : new TcpServerBindingConfig(bindingId); + return servers.computeIfAbsent(index, TcpServerBindingConfig::new); } } diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/config/TcpServerBindingConfig.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/config/TcpServerBindingConfig.java index f4f69e5874..ddecb977a7 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/config/TcpServerBindingConfig.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/config/TcpServerBindingConfig.java @@ -40,9 +40,9 @@ public final class TcpServerBindingConfig private volatile ServerSocketChannel[] channels; public TcpServerBindingConfig( - long bindingId) + long index) { - this.id = bindingId; + this.id = index; this.binds = new AtomicInteger(); } diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java index fc7e9a6f70..f5889d3f96 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java @@ -99,6 +99,7 @@ public class TcpServerFactory implements TcpStreamFactory private final int replyMax; private final int windowThreshold; private final int proxyTypeId; + private final int index; private final BindingHandler streamFactory; public TcpServerFactory( @@ -106,7 +107,7 @@ public TcpServerFactory( EngineContext context, LongFunction servers) { - this.router = new TcpServerRouter(config, context, this::handleAccept, servers); + this.router = new TcpServerRouter(context.index(), config, context, this::handleAccept, servers); this.writeBuffer = context.writeBuffer(); this.writeByteBuffer = ByteBuffer.allocateDirect(writeBuffer.capacity()).order(nativeOrder()); this.bufferPool = context.bufferPool(); @@ -116,6 +117,7 @@ public TcpServerFactory( this.supplyPollerKey = context::supplyPollerKey; this.streamFactory = context.streamFactory(); this.proxyTypeId = context.supplyTypeId("proxy"); + this.index = context.index(); final int readBufferSize = writeBuffer.capacity() - DataFW.FIELD_OFFSET_PAYLOAD; this.readByteBuffer = ByteBuffer.allocateDirect(readBufferSize).order(nativeOrder()); diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java index 92fd0e4736..5ad6fc44bb 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java @@ -41,16 +41,19 @@ public final class TcpServerRouter private final ToIntFunction acceptHandler; private final Function supplyPollerKey; private final LongFunction lookupServer; + private final int index; private int remainingConnections; private boolean unbound; public TcpServerRouter( + int index, TcpConfiguration config, EngineContext context, ToIntFunction acceptHandler, LongFunction lookupServer) { + this.index = index; this.remainingConnections = config.maxConnections(); this.bindings = new Long2ObjectHashMap<>(); this.supplyPollerKey = context::supplyPollerKey; @@ -130,7 +133,7 @@ public void close( private void register( TcpBindingConfig binding) { - TcpServerBindingConfig server = lookupServer.apply(binding.id); + TcpServerBindingConfig server = lookupServer.apply(index); ServerSocketChannel[] channels = server.bind(binding.options); PollerKey[] acceptKeys = new PollerKey[channels.length]; @@ -158,7 +161,7 @@ private void unregister( } } - TcpServerBindingConfig server = lookupServer.apply(binding.id); + TcpServerBindingConfig server = lookupServer.apply(index); server.unbind(); } } diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/util/OperatingSystem.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/util/OperatingSystem.java deleted file mode 100644 index 4e3f2c0a64..0000000000 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/util/OperatingSystem.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2021-2024 Aklivity Inc. - * - * Aklivity licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.aklivity.zilla.runtime.binding.tcp.internal.util; - -public final class OperatingSystem -{ - public enum OS - { - MACOS, LINUX, UNKNOWN - } - - private OperatingSystem() - { - // no instances - } - - public static OS detect() - { - OS os = OS.UNKNOWN; - String osName = System.getProperty("os.name").toLowerCase(); - if (osName.contains("mac")) - { - os = OS.MACOS; - } - else if (osName.contains("nix") || osName.contains("nux") || osName.contains("aix")) - { - os = OS.LINUX; - } - - return os; - } -} From 138eb0c23105de0edb15aff72824b72543e5fa1b Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Mon, 31 Mar 2025 21:02:53 +0500 Subject: [PATCH 4/6] Revert back the change --- .../runtime/binding/tcp/internal/stream/TcpClientRouter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java index adaad8b45e..049787e0e1 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpClientRouter.java @@ -85,7 +85,7 @@ public InetSocketAddress resolve( try { - if (beginEx == null || port > 0) + if (beginEx == null) { InetAddress[] addresses = options != null ? resolveHost(options.host) : null; resolved = addresses != null ? new InetSocketAddress(addresses[0], port) : null; From 6d3b8ce43ec787f823726cbe056d642eb9a13d74 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Tue, 1 Apr 2025 16:23:05 +0500 Subject: [PATCH 5/6] Add tcp.proxy example --- examples/tcp.proxy/README.md | 23 +++++++++++++++++++++++ examples/tcp.proxy/compose.yaml | 31 +++++++++++++++++++++++++++++++ examples/tcp.proxy/etc/zilla.yaml | 20 ++++++++++++++++++++ 3 files changed, 74 insertions(+) create mode 100644 examples/tcp.proxy/README.md create mode 100644 examples/tcp.proxy/compose.yaml create mode 100644 examples/tcp.proxy/etc/zilla.yaml diff --git a/examples/tcp.proxy/README.md b/examples/tcp.proxy/README.md new file mode 100644 index 0000000000..a4764bcd9c --- /dev/null +++ b/examples/tcp.proxy/README.md @@ -0,0 +1,23 @@ +# tcp.proxy + +Listens on tcp port `12345` and will proxy iperf traffic to iperf server. + +## Requirements + +- docker compose + +## Setup + +To `start` the Docker Compose stack defined in the [compose.yaml](compose.yaml) file, use: + +```bash +docker compose up -d +``` + +## Teardown + +To remove any resources created by the Docker Compose stack, use: + +```bash +docker compose down +``` diff --git a/examples/tcp.proxy/compose.yaml b/examples/tcp.proxy/compose.yaml new file mode 100644 index 0000000000..a647055a03 --- /dev/null +++ b/examples/tcp.proxy/compose.yaml @@ -0,0 +1,31 @@ +name: ${NAMESPACE:-zilla-tcp-proxy} +services: + iperf-server: + image: networkstatic/iperf3 + container_name: iperf-server + command: -s + ports: + - "5201:5201" + + zilla: + image: ghcr.io/aklivity/zilla::${ZILLA_VERSION:-latest} + restart: unless-stopped + hostname: zilla.examples.dev + ports: + - "12345:12345" + volumes: + - ./etc:/etc/zilla + command: start -v -e -Pzilla.engine.maximum.messages.per.read=10 + depends_on: + - iperf-server + + iperf-client: + image: networkstatic/iperf3 + container_name: iperf-client + entrypoint: > + sh -c "sleep 10 && + iperf3 -c zilla.examples.dev -p 12345 -P 50 -t 20" + +networks: + default: + driver: bridge diff --git a/examples/tcp.proxy/etc/zilla.yaml b/examples/tcp.proxy/etc/zilla.yaml new file mode 100644 index 0000000000..46e820aa22 --- /dev/null +++ b/examples/tcp.proxy/etc/zilla.yaml @@ -0,0 +1,20 @@ +--- +name: example +bindings: + north_tcp_server: + type: tcp + kind: server + options: + host: 0.0.0.0 + port: 12345 + exit: south_tcp_client + south_tcp_client: + type: tcp + kind: client + options: + host: iperf-server + port: 5201 +telemetry: + exporters: + stdout_logs_exporter: + type: stdout From f101d115cb7b1b355d4a3027e1f57d5339721faf Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Tue, 1 Apr 2025 18:24:59 +0500 Subject: [PATCH 6/6] Revert back changes --- examples/tcp.proxy/compose.yaml | 6 ++- .../binding/tcp/internal/TcpBinding.java | 7 ++- .../config/TcpServerBindingConfig.java | 4 +- .../tcp/internal/stream/TcpServerFactory.java | 4 +- .../tcp/internal/stream/TcpServerRouter.java | 7 +-- .../tcp/internal/util/OperatingSystem.java | 45 +++++++++++++++++++ 6 files changed, 60 insertions(+), 13 deletions(-) create mode 100644 runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/util/OperatingSystem.java diff --git a/examples/tcp.proxy/compose.yaml b/examples/tcp.proxy/compose.yaml index a647055a03..000413f02e 100644 --- a/examples/tcp.proxy/compose.yaml +++ b/examples/tcp.proxy/compose.yaml @@ -16,6 +16,11 @@ services: volumes: - ./etc:/etc/zilla command: start -v -e -Pzilla.engine.maximum.messages.per.read=10 + healthcheck: + interval: 5s + timeout: 3s + retries: 5 + test: ["CMD", "bash", "-c", "echo -n '' > /dev/tcp/127.0.0.1/12345"] depends_on: - iperf-server @@ -25,7 +30,6 @@ services: entrypoint: > sh -c "sleep 10 && iperf3 -c zilla.examples.dev -p 12345 -P 50 -t 20" - networks: default: driver: bridge diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBinding.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBinding.java index c9f8cb56b2..5ca4c1bb93 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBinding.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/TcpBinding.java @@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentMap; import io.aklivity.zilla.runtime.binding.tcp.internal.config.TcpServerBindingConfig; +import io.aklivity.zilla.runtime.binding.tcp.internal.util.OperatingSystem; import io.aklivity.zilla.runtime.engine.EngineContext; import io.aklivity.zilla.runtime.engine.binding.Binding; import io.aklivity.zilla.runtime.engine.binding.BindingContext; @@ -60,8 +61,10 @@ public BindingContext supply( } private TcpServerBindingConfig supplyServer( - long index) + long bindingId) { - return servers.computeIfAbsent(index, TcpServerBindingConfig::new); + return OperatingSystem.detect() == OperatingSystem.OS.MACOS + ? servers.computeIfAbsent(bindingId, TcpServerBindingConfig::new) + : new TcpServerBindingConfig(bindingId); } } diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/config/TcpServerBindingConfig.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/config/TcpServerBindingConfig.java index ddecb977a7..f4f69e5874 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/config/TcpServerBindingConfig.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/config/TcpServerBindingConfig.java @@ -40,9 +40,9 @@ public final class TcpServerBindingConfig private volatile ServerSocketChannel[] channels; public TcpServerBindingConfig( - long index) + long bindingId) { - this.id = index; + this.id = bindingId; this.binds = new AtomicInteger(); } diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java index f5889d3f96..fc7e9a6f70 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerFactory.java @@ -99,7 +99,6 @@ public class TcpServerFactory implements TcpStreamFactory private final int replyMax; private final int windowThreshold; private final int proxyTypeId; - private final int index; private final BindingHandler streamFactory; public TcpServerFactory( @@ -107,7 +106,7 @@ public TcpServerFactory( EngineContext context, LongFunction servers) { - this.router = new TcpServerRouter(context.index(), config, context, this::handleAccept, servers); + this.router = new TcpServerRouter(config, context, this::handleAccept, servers); this.writeBuffer = context.writeBuffer(); this.writeByteBuffer = ByteBuffer.allocateDirect(writeBuffer.capacity()).order(nativeOrder()); this.bufferPool = context.bufferPool(); @@ -117,7 +116,6 @@ public TcpServerFactory( this.supplyPollerKey = context::supplyPollerKey; this.streamFactory = context.streamFactory(); this.proxyTypeId = context.supplyTypeId("proxy"); - this.index = context.index(); final int readBufferSize = writeBuffer.capacity() - DataFW.FIELD_OFFSET_PAYLOAD; this.readByteBuffer = ByteBuffer.allocateDirect(readBufferSize).order(nativeOrder()); diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java index 5ad6fc44bb..92fd0e4736 100644 --- a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/stream/TcpServerRouter.java @@ -41,19 +41,16 @@ public final class TcpServerRouter private final ToIntFunction acceptHandler; private final Function supplyPollerKey; private final LongFunction lookupServer; - private final int index; private int remainingConnections; private boolean unbound; public TcpServerRouter( - int index, TcpConfiguration config, EngineContext context, ToIntFunction acceptHandler, LongFunction lookupServer) { - this.index = index; this.remainingConnections = config.maxConnections(); this.bindings = new Long2ObjectHashMap<>(); this.supplyPollerKey = context::supplyPollerKey; @@ -133,7 +130,7 @@ public void close( private void register( TcpBindingConfig binding) { - TcpServerBindingConfig server = lookupServer.apply(index); + TcpServerBindingConfig server = lookupServer.apply(binding.id); ServerSocketChannel[] channels = server.bind(binding.options); PollerKey[] acceptKeys = new PollerKey[channels.length]; @@ -161,7 +158,7 @@ private void unregister( } } - TcpServerBindingConfig server = lookupServer.apply(index); + TcpServerBindingConfig server = lookupServer.apply(binding.id); server.unbind(); } } diff --git a/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/util/OperatingSystem.java b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/util/OperatingSystem.java new file mode 100644 index 0000000000..4e3f2c0a64 --- /dev/null +++ b/runtime/binding-tcp/src/main/java/io/aklivity/zilla/runtime/binding/tcp/internal/util/OperatingSystem.java @@ -0,0 +1,45 @@ +/* + * Copyright 2021-2024 Aklivity Inc. + * + * Aklivity licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.aklivity.zilla.runtime.binding.tcp.internal.util; + +public final class OperatingSystem +{ + public enum OS + { + MACOS, LINUX, UNKNOWN + } + + private OperatingSystem() + { + // no instances + } + + public static OS detect() + { + OS os = OS.UNKNOWN; + String osName = System.getProperty("os.name").toLowerCase(); + if (osName.contains("mac")) + { + os = OS.MACOS; + } + else if (osName.contains("nix") || osName.contains("nux") || osName.contains("aix")) + { + os = OS.LINUX; + } + + return os; + } +}