Skip to content

Commit

Permalink
0.9.10 (#8)
Browse files Browse the repository at this point in the history
* Add DistributedQueues
* Update utils and guava jars
  • Loading branch information
pambrose authored Nov 8, 2019
1 parent 9220bb5 commit 1b24e71
Show file tree
Hide file tree
Showing 28 changed files with 732 additions and 213 deletions.
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ plugins {
}

group 'io.etcd'
version '0.9.9'
version '0.9.10'

sourceCompatibility = 1.8

Expand All @@ -22,8 +22,8 @@ jacocoTestReport {
def kotlinVersion = '1.3.50'
def serializationVersion = '0.13.0'
def jetcdVersion = '0.4.1'
def guavaVersion = '26.0-android'
def utilsVersion = '1.0.2'
def guavaVersion = '28.1-android'
def utilsVersion = '1.0.3'
def loggingVersion = '1.7.6'
def slf4jVersion = '1.7.28'
def kluentVersion = '1.56'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

import static com.sudothought.common.util.Misc.sleepSecs;
import static io.etcd.recipes.common.ClientUtils.connectToEtcd;
import static io.etcd.recipes.common.KVUtils.putValueWithKeepAlive;
import static io.etcd.recipes.common.KeepAliveUtils.putValueWithKeepAlive;
import static io.etcd.recipes.common.WatchUtils.getKeyAsString;
import static io.etcd.recipes.common.WatchUtils.watcherWithLatch;
import static java.lang.String.format;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.etcd.jetcd.KV;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.lease.LeaseGrantResponse;
import io.etcd.jetcd.options.PutOption;

import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand All @@ -32,7 +33,7 @@
import static io.etcd.recipes.common.ClientUtils.connectToEtcd;
import static io.etcd.recipes.common.KVUtils.getValue;
import static io.etcd.recipes.common.KVUtils.putValue;
import static io.etcd.recipes.common.LeaseUtils.getAsPutOption;
import static io.etcd.recipes.common.OptionUtils.putOption;
import static java.lang.String.format;

public class SetValueWithLease {
Expand All @@ -50,7 +51,8 @@ public static void main(String[] args) throws InterruptedException {
KV kvClient = client.getKVClient()) {
System.out.println(format("Assigning %s = %s", path, keyval));
LeaseGrantResponse lease = leaseClient.grant(5).get();
putValue(kvClient, path, keyval, getAsPutOption(lease));
PutOption putOption = putOption((PutOption.Builder builder) -> builder.withLeaseId(lease.getID()));
putValue(kvClient, path, keyval, putOption);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
Expand Down
52 changes: 29 additions & 23 deletions src/main/java/io/etcd/recipes/examples/basics/WatchKeyRange.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,55 +17,61 @@
package io.etcd.recipes.examples.basics;

import com.google.common.collect.Lists;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.Watch.Watcher;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.recipes.common.KVUtils;
import kotlin.Unit;

import java.util.List;

import static com.sudothought.common.util.Misc.sleepSecs;
import static io.etcd.recipes.common.ByteSequenceUtils.getAsByteSequence;
import static io.etcd.recipes.common.ClientUtils.connectToEtcd;
import static io.etcd.recipes.common.KVUtils.delete;
import static io.etcd.recipes.common.KVUtils.getChildren;
import static io.etcd.recipes.common.KVUtils.putValue;
import static io.etcd.recipes.common.KeyValueUtils.getAsString;
import static io.etcd.recipes.common.OptionUtils.watchOption;
import static io.etcd.recipes.common.PairUtils.getAsString;
import static io.etcd.recipes.common.WatchUtils.getAsPrefixWatchOption;
import static io.etcd.recipes.common.WatchUtils.watcher;
import static java.lang.String.format;

public class WatchKeyRange {

public static void main(String[] args) throws InterruptedException {
public static void main(String[] args) {
List<String> urls = Lists.newArrayList("http://localhost:2379");
String path = "/watchkeyrange";

ByteSequence pathBS = getAsByteSequence(path);
WatchOption watchOption = watchOption((WatchOption.Builder builder) -> builder.withPrefix(pathBS));

try (Client client = connectToEtcd(urls);
KV kvClient = client.getKVClient();
Watch watchClient = client.getWatchClient();
Watcher watcher = watcher(watchClient,
path,
getAsPrefixWatchOption(path),
(watchResponse) -> {
watchResponse.getEvents().forEach((watchEvent) -> {
System.out.println(format("%s for %s",
watchEvent.getEventType(),
getAsString(watchEvent.getKeyValue())
));

});
return Unit.INSTANCE;
})) {

Watcher watcher =
watcher(watchClient,
path,
watchOption,
(watchResponse) -> {
watchResponse.getEvents().forEach((watchEvent) ->
System.out.println(format("%s for %s",
watchEvent.getEventType(),
getAsString(watchEvent.getKeyValue())
)));
return Unit.INSTANCE;
})) {

// Create empty root
putValue(kvClient, path, "root");

System.out.println("After creation:");
System.out.println(getAsString(getChildren(kvClient, path)));
System.out.println(KVUtils.countChildren(kvClient, path));
System.out.println(KVUtils.getChildrenCount(kvClient, path));

sleepSecs(5);

Expand All @@ -77,30 +83,30 @@ public static void main(String[] args) throws InterruptedException {

System.out.println("\nAfter putValues:");
System.out.println(getAsString(getChildren(kvClient, path)));
System.out.println(KVUtils.countChildren(kvClient, path));
System.out.println(KVUtils.getChildrenCount(kvClient, path));

System.out.println("\nElections only:");
System.out.println(getAsString(getChildren(kvClient, path + "/election")));
System.out.println(KVUtils.countChildren(kvClient, path + "/election"));
System.out.println(KVUtils.getChildrenCount(kvClient, path + "/election"));

System.out.println("\nWaitings only:");
System.out.println(getAsString(getChildren(kvClient, path + "/waiting")));
System.out.println(KVUtils.countChildren(kvClient, path + "/waiting"));
System.out.println(KVUtils.getChildrenCount(kvClient, path + "/waiting"));

sleepSecs(5);

// Delete root
delete(kvClient, path);

// Delete children
KVUtils.getChildrenKeys(kvClient, path).forEach((keyname) -> {
System.out.println(format("Deleting key: %s", keyname));
delete(kvClient, keyname);
KVUtils.getChildrenKeys(kvClient, path).forEach((keyName) -> {
System.out.println(format("Deleting key: %s", keyName));
delete(kvClient, keyName);
});

System.out.println("\nAfter delete:");
System.out.println(getAsString(getChildren(kvClient, path)));
System.out.println(KVUtils.countChildren(kvClient, path));
System.out.println(KVUtils.getChildrenCount(kvClient, path));

sleepSecs(5);
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/kotlin/io/etcd/recipes/barrier/DistributedBarrier.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import com.sudothought.common.util.randomId
import io.etcd.jetcd.CloseableClient
import io.etcd.jetcd.watch.WatchEvent.EventType.DELETE
import io.etcd.recipes.common.EtcdConnector
import io.etcd.recipes.common.asPutOption
import io.etcd.recipes.common.asString
import io.etcd.recipes.common.delete
import io.etcd.recipes.common.doesNotExist
import io.etcd.recipes.common.getValue
import io.etcd.recipes.common.grant
import io.etcd.recipes.common.isKeyPresent
import io.etcd.recipes.common.keepAlive
import io.etcd.recipes.common.putOption
import io.etcd.recipes.common.setTo
import io.etcd.recipes.common.transaction
import io.etcd.recipes.common.watcher
Expand Down Expand Up @@ -82,7 +82,7 @@ constructor(val urls: List<String>,
val txn =
kvClient.transaction {
If(barrierPath.doesNotExist)
Then(barrierPath.setTo(uniqueToken, lease.asPutOption))
Then(barrierPath.setTo(uniqueToken, putOption { withLeaseId(lease.id) }))
}

// Check to see if unique value was successfully set in the CAS step
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,24 @@ import io.etcd.jetcd.watch.WatchEvent.EventType.PUT
import io.etcd.recipes.common.EtcdConnector
import io.etcd.recipes.common.EtcdRecipeException
import io.etcd.recipes.common.appendToPath
import io.etcd.recipes.common.asPrefixWatchOption
import io.etcd.recipes.common.asPutOption
import io.etcd.recipes.common.asByteSequence
import io.etcd.recipes.common.asString
import io.etcd.recipes.common.countChildren
import io.etcd.recipes.common.delete
import io.etcd.recipes.common.deleteKey
import io.etcd.recipes.common.doesExist
import io.etcd.recipes.common.doesNotExist
import io.etcd.recipes.common.ensureTrailing
import io.etcd.recipes.common.ensureSuffix
import io.etcd.recipes.common.etcdExec
import io.etcd.recipes.common.getChildrenCount
import io.etcd.recipes.common.getChildrenKeys
import io.etcd.recipes.common.getValue
import io.etcd.recipes.common.grant
import io.etcd.recipes.common.isKeyPresent
import io.etcd.recipes.common.keepAlive
import io.etcd.recipes.common.putOption
import io.etcd.recipes.common.setTo
import io.etcd.recipes.common.transaction
import io.etcd.recipes.common.watchOption
import io.etcd.recipes.common.watcher
import java.io.Closeable
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -84,7 +85,7 @@ constructor(val urls: List<String>,
val waiterCount: Long
get() {
checkCloseNotCalled()
return kvClient.countChildren(waitingPath)
return kvClient.getChildrenCount(waitingPath)
}

@Throws(InterruptedException::class, EtcdRecipeException::class)
Expand Down Expand Up @@ -141,7 +142,7 @@ constructor(val urls: List<String>,
val txn =
kvClient.transaction {
If(waitingPath.doesNotExist)
Then(waitingPath.setTo(uniqueToken, lease.asPutOption))
Then(waitingPath.setTo(uniqueToken, putOption { withLeaseId(lease.id) }))
}

when {
Expand All @@ -158,8 +159,9 @@ constructor(val urls: List<String>,
true
} else {
// Watch for DELETE of /ready and PUTS on /waiters/*
val adjustedKey = barrierPath.ensureTrailing("/")
watchClient.watcher(adjustedKey, adjustedKey.asPrefixWatchOption) { watchResponse ->
val trailingKey = barrierPath.ensureSuffix("/")
val watchOption = watchOption { withPrefix(trailingKey.asByteSequence) }
watchClient.watcher(trailingKey, watchOption) { watchResponse ->
watchResponse.events
.forEach { watchEvent ->
val key = watchEvent.keyValue.key.asString
Expand Down
16 changes: 9 additions & 7 deletions src/main/kotlin/io/etcd/recipes/cache/PathChildrenCache.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ import io.etcd.recipes.cache.PathChildrenCacheEvent.Type.CHILD_UPDATED
import io.etcd.recipes.cache.PathChildrenCacheEvent.Type.INITIALIZED
import io.etcd.recipes.common.EtcdConnector
import io.etcd.recipes.common.EtcdRecipeRuntimeException
import io.etcd.recipes.common.asByteSequence
import io.etcd.recipes.common.asPair
import io.etcd.recipes.common.asPrefixWatchOption
import io.etcd.recipes.common.ensureTrailing
import io.etcd.recipes.common.ensureSuffix
import io.etcd.recipes.common.getChildren
import io.etcd.recipes.common.watchOption
import io.etcd.recipes.common.watcher
import mu.KLogging
import java.io.Closeable
Expand Down Expand Up @@ -156,13 +157,14 @@ class PathChildrenCache(val urls: List<String>,
}

private fun setupWatcher() {
val adjustedCachePath = cachePath.ensureTrailing("/")
logger.debug { "Setting up watch for $adjustedCachePath" }
watchClient.watcher(adjustedCachePath, adjustedCachePath.asPrefixWatchOption) { watchResponse ->
val trailingPath = cachePath.ensureSuffix("/")
logger.debug { "Setting up watch for $trailingPath" }
val watchOption = watchOption { withPrefix(trailingPath.asByteSequence) }
watchClient.watcher(trailingPath, watchOption) { watchResponse ->
watchResponse.events
.forEach { event ->
val (k, v) = event.keyValue.asPair
val stripped = k.substring(adjustedCachePath.length)
val stripped = k.substring(trailingPath.length)
when (event.eventType) {
PUT -> {
val isAdd = !cacheMap.containsKey(stripped)
Expand Down Expand Up @@ -223,7 +225,7 @@ class PathChildrenCache(val urls: List<String>,
val currentData: List<ChildData> get() = cacheMap.map { (k, v) -> ChildData(k, v) }.sortedBy { it.key }

// For consistency with Curator
fun getCurrentData(path: String) = cacheMap.get(path)
fun getCurrentData(path: String): ByteSequence? = cacheMap[path]

val currentDataAsMap: Map<String, ByteSequence> get() = cacheMap.toMap()

Expand Down
Loading

0 comments on commit 1b24e71

Please sign in to comment.