From 53d30e3dd80ff711aa41634310fc8c694642984f Mon Sep 17 00:00:00 2001 From: Laurent Goujon Date: Mon, 11 Mar 2024 10:58:31 -0700 Subject: [PATCH] CURATOR-709. Add server compatibility check support Add new interface ZookeeperCompatibility to represent server compatibility in addition to the existing Compatibility class (which represents client compatibility). Enhance CuratorFramework to accept ZookeeperCompatibility instance, allowing user to specify which server version to target (default is LATEST). --- .../apache/curator/utils/Compatibility.java | 1 + .../curator/utils/ZookeeperCompatibility.java | 47 +++++++++++++++++++ .../curator/framework/CuratorFramework.java | 8 ++++ .../framework/CuratorFrameworkFactory.java | 12 +++++ .../framework/imps/CuratorFrameworkImpl.java | 14 ++++-- .../cache/CuratorCacheBridgeBuilderImpl.java | 3 +- .../details/AsyncCuratorFrameworkImpl.java | 6 +-- 7 files changed, 83 insertions(+), 8 deletions(-) create mode 100644 curator-client/src/main/java/org/apache/curator/utils/ZookeeperCompatibility.java diff --git a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java index 03545cdea..ffa54acb9 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java +++ b/curator-client/src/main/java/org/apache/curator/utils/Compatibility.java @@ -22,6 +22,7 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.net.InetSocketAddress; + import org.apache.zookeeper.server.quorum.QuorumPeer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/curator-client/src/main/java/org/apache/curator/utils/ZookeeperCompatibility.java b/curator-client/src/main/java/org/apache/curator/utils/ZookeeperCompatibility.java new file mode 100644 index 000000000..3302b6ecc --- /dev/null +++ b/curator-client/src/main/java/org/apache/curator/utils/ZookeeperCompatibility.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.utils; + +/** + * Describe feature supports based on server compatibility (as opposed to {@code Compatibility} + * which represents client compatibility. + */ +public interface ZookeeperCompatibility { + public enum Version implements ZookeeperCompatibility { + VERSION_3_5(false), + LATEST(true); + + private final boolean hasPersistentWatchers; + + private Version(boolean hasPersistentWatchers) { + this.hasPersistentWatchers = hasPersistentWatchers; + } + + @Override + public boolean hasPersistentWatchers() { + return this.hasPersistentWatchers && Compatibility.hasPersistentWatchers(); + } + } + + /** + * Check if both client and server support persistent watchers + * @return {@code true} if both the client library and the server version support persistent watchers + */ + boolean hasPersistentWatchers(); +} diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index 25b701ca5..50f6e56ba 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -34,6 +34,7 @@ import org.apache.curator.framework.state.ConnectionStateErrorPolicy; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.EnsurePath; +import org.apache.curator.utils.ZookeeperCompatibility; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; @@ -262,6 +263,13 @@ public interface CuratorFramework extends Closeable { */ public CuratorZookeeperClient getZookeeperClient(); + /** + * Return zookeeper server compatibility + * + * @return compatibility + */ + public ZookeeperCompatibility getZookeeperCompatibility(); + /** * Allocates an ensure path instance that is namespace aware * diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index 59ffb4288..098e0ca63 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -19,6 +19,7 @@ package org.apache.curator.framework; + import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.net.InetAddress; @@ -46,6 +47,7 @@ import org.apache.curator.framework.state.ConnectionStateListenerManagerFactory; import org.apache.curator.framework.state.StandardConnectionStateErrorPolicy; import org.apache.curator.utils.DefaultZookeeperFactory; +import org.apache.curator.utils.ZookeeperCompatibility; import org.apache.curator.utils.ZookeeperFactory; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.Watcher; @@ -174,6 +176,7 @@ public static class Builder { ConnectionStateListenerManagerFactory.standard; private int simulatedSessionExpirationPercent = 100; private ZKClientConfig zkClientConfig; + private ZookeeperCompatibility zookeeperCompatibility = ZookeeperCompatibility.Version.LATEST; /** * Apply the current values and build a new CuratorFramework @@ -519,6 +522,11 @@ public Builder connectionStateListenerManagerFactory( return this; } + public Builder zookeeperCompatibility(ZookeeperCompatibility zookeeperCompatibility) { + this.zookeeperCompatibility = zookeeperCompatibility; + return this; + } + public Executor getRunSafeService() { return runSafeService; } @@ -640,6 +648,10 @@ public ConnectionStateListenerManagerFactory getConnectionStateListenerManagerFa return connectionStateListenerManagerFactory; } + public ZookeeperCompatibility getZookeeperCompatibility() { + return zookeeperCompatibility; + } + private Builder() {} } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index dd62006d6..816d0bda0 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -77,11 +77,11 @@ import org.apache.curator.framework.state.ConnectionStateErrorPolicy; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.framework.state.ConnectionStateManager; -import org.apache.curator.utils.Compatibility; import org.apache.curator.utils.DebugUtils; import org.apache.curator.utils.EnsurePath; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; +import org.apache.curator.utils.ZookeeperCompatibility; import org.apache.curator.utils.ZookeeperFactory; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; @@ -117,6 +117,7 @@ public class CuratorFrameworkImpl implements CuratorFramework { private final EnsembleTracker ensembleTracker; private final SchemaSet schemaSet; private final Executor runSafeService; + private final ZookeeperCompatibility zookeeperCompatibility; private volatile ExecutorService executorService; private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false); @@ -204,6 +205,7 @@ public void process(WatchedEvent watchedEvent) { builder.withEnsembleTracker() ? new EnsembleTracker(this, builder.getEnsembleProvider()) : null; runSafeService = makeRunSafeService(builder); + zookeeperCompatibility = builder.getZookeeperCompatibility(); } private Executor makeRunSafeService(CuratorFrameworkFactory.Builder builder) { @@ -292,6 +294,7 @@ protected CuratorFrameworkImpl(CuratorFrameworkImpl parent) { schemaSet = parent.schemaSet; ensembleTracker = parent.ensembleTracker; runSafeService = parent.runSafeService; + zookeeperCompatibility = parent.zookeeperCompatibility; } @Override @@ -585,8 +588,8 @@ public RemoveWatchesBuilder watches() { @Override public WatchesBuilder watchers() { Preconditions.checkState( - Compatibility.hasPersistentWatchers(), - "watchers() is not supported in the ZooKeeper library being used. Use watches() instead."); + zookeeperCompatibility.hasPersistentWatchers(), + "watchers() is not supported in the ZooKeeper library and/or server being used. Use watches() instead."); return new WatchesBuilderImpl(this); } @@ -600,6 +603,11 @@ public CuratorZookeeperClient getZookeeperClient() { return client; } + @Override + public ZookeeperCompatibility getZookeeperCompatibility() { + return zookeeperCompatibility; + } + @Override public EnsurePath newNamespaceAwareEnsurePath(String path) { return namespace.newNamespaceAwareEnsurePath(path); diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilderImpl.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilderImpl.java index ac9b6ac53..fd078a299 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilderImpl.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/CuratorCacheBridgeBuilderImpl.java @@ -21,7 +21,6 @@ import java.util.concurrent.ExecutorService; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.utils.Compatibility; import org.slf4j.LoggerFactory; class CuratorCacheBridgeBuilderImpl implements CuratorCacheBridgeBuilder { @@ -57,7 +56,7 @@ public CuratorCacheBridgeBuilder withExecutorService(ExecutorService executorSer @Override public CuratorCacheBridge build() { - if (!forceTreeCache && Compatibility.hasPersistentWatchers()) { + if (!forceTreeCache && client.getZookeeperCompatibility().hasPersistentWatchers()) { if (executorService != null) { LoggerFactory.getLogger(getClass()).warn("CuratorCache does not support custom ExecutorService"); } diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java index 7e38a821f..171f1215a 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCuratorFrameworkImpl.java @@ -32,7 +32,6 @@ import org.apache.curator.framework.imps.CuratorMultiTransactionImpl; import org.apache.curator.framework.imps.GetACLBuilderImpl; import org.apache.curator.framework.imps.SyncBuilderImpl; -import org.apache.curator.utils.Compatibility; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.AsyncStage; import org.apache.curator.x.async.WatchMode; @@ -41,6 +40,7 @@ import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; + public class AsyncCuratorFrameworkImpl implements AsyncCuratorFramework { private final CuratorFrameworkImpl client; private final Filters filters; @@ -140,8 +140,8 @@ public AsyncRemoveWatchesBuilder removeWatches() { @Override public AsyncWatchBuilder addWatch() { Preconditions.checkState( - Compatibility.hasPersistentWatchers(), - "addWatch() is not supported in the ZooKeeper library being used."); + client.getZookeeperCompatibility().hasPersistentWatchers(), + "addWatch() is not supported in the ZooKeeper library and/or server being used."); return new AsyncWatchBuilderImpl(client, filters); }