diff --git a/src/main/java/com/couchbase/intellij/database/DataLoader.java b/src/main/java/com/couchbase/intellij/database/DataLoader.java index f99ef601..ff4280f9 100644 --- a/src/main/java/com/couchbase/intellij/database/DataLoader.java +++ b/src/main/java/com/couchbase/intellij/database/DataLoader.java @@ -374,7 +374,12 @@ private static void loadKVDocuments(DefaultMutableTreeNode parentNode, Tree tree } public static boolean stubsAvailable(String bucket, String scope, String collection) { - return ActiveCluster.getInstance().getChild(bucket).flatMap(b -> b.getChild(scope)).flatMap(s -> s.getChild(collection)).map(col -> ((CouchbaseCollection) col).generateDocument()).filter(Objects::nonNull).findAny().isPresent(); + return ActiveCluster.getInstance() + .getChild(bucket) + .flatMap(b -> b.getChild(scope)) + .flatMap(s -> s.getChild(collection)) + .map(col -> ((CouchbaseCollection) col).generateDocument()) + .anyMatch(Objects::nonNull); } /** diff --git a/src/main/java/com/couchbase/intellij/database/entity/CouchbaseClusterEntity.java b/src/main/java/com/couchbase/intellij/database/entity/CouchbaseClusterEntity.java index 29ab1dbf..18f154c4 100644 --- a/src/main/java/com/couchbase/intellij/database/entity/CouchbaseClusterEntity.java +++ b/src/main/java/com/couchbase/intellij/database/entity/CouchbaseClusterEntity.java @@ -46,7 +46,7 @@ default Stream getChild(String name) { } Stream childrenStream = getChildren().stream(); if (this.getName() != null) { - childrenStream = Streams.concat(Stream.of(this), childrenStream); + childrenStream = Streams.concat(childrenStream, Stream.of(this)); } return childrenStream .flatMap(c -> c.getName() == null ? c.getChildren().stream() : Stream.of(c)) diff --git a/src/main/java/com/couchbase/intellij/database/entity/CouchbaseCollection.java b/src/main/java/com/couchbase/intellij/database/entity/CouchbaseCollection.java index 04bbe8a3..d88a2215 100644 --- a/src/main/java/com/couchbase/intellij/database/entity/CouchbaseCollection.java +++ b/src/main/java/com/couchbase/intellij/database/entity/CouchbaseCollection.java @@ -5,23 +5,23 @@ import com.couchbase.client.java.json.JsonObject; import com.couchbase.client.java.manager.collection.CollectionSpec; import com.couchbase.intellij.database.InferHelper; +import com.couchbase.intellij.utils.Subscribable; import com.couchbase.intellij.workbench.Log; import com.intellij.openapi.progress.ProgressIndicator; import com.intellij.openapi.progress.ProgressManager; import com.intellij.openapi.progress.Task; import org.jetbrains.annotations.NotNull; -import java.util.Comparator; -import java.util.HashSet; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; public class CouchbaseCollection implements CouchbaseClusterEntity { private CollectionSpec spec; private CouchbaseScope parent; - private Set children; + private Subscribable> children = new Subscribable<>(); private AtomicBoolean updating = new AtomicBoolean(false); @@ -79,11 +79,11 @@ private void loadSchema(JsonObject schema) { final String path = path(); if (schema != null) { JsonArray content = schema.getArray("content"); - children = IntStream.range(0, content.size()).boxed() + children.set(IntStream.range(0, content.size()).boxed() .map(content::getObject) .map(flavor -> new CouchbaseDocumentFlavor(this, flavor)) .peek(CouchbaseDocumentFlavor::updateSchema) - .collect(Collectors.toSet()); + .collect(Collectors.toSet())); } else { Log.debug("nada data for schemata " + path); } @@ -96,10 +96,14 @@ public Cluster getCluster() { @Override public Set getChildren() { - if (children == null) { + if (!children.isPresent()) { updateSchema(); } - return children; + try { + return children.get(1000); + } catch (InterruptedException e) { + return null; + } } public JsonObject generateDocument() { @@ -119,7 +123,8 @@ public JsonObject generateDocument() { public JsonArray toJson() { JsonArray result = JsonArray.create(); if (children != null) { - children.stream().map(CouchbaseDocumentFlavor::toJson) + children.get().stream().flatMap(Collection::stream) + .map(CouchbaseDocumentFlavor::toJson) .forEach(result::add); } return result; @@ -130,11 +135,8 @@ public Set getAllAttributeNames() { Set attributeNames = new HashSet<>(); // Check if children are not null - if (this.children != null) { - for (CouchbaseDocumentFlavor flavor : this.children) { - collectAttributeNames(flavor, attributeNames); - } - } + this.children.get().stream().flatMap(Collection::stream) + .forEach(flavor -> collectAttributeNames(flavor, attributeNames)); return attributeNames; } diff --git a/src/main/java/com/couchbase/intellij/utils/Subscribable.java b/src/main/java/com/couchbase/intellij/utils/Subscribable.java index ebd70ba6..cf450055 100644 --- a/src/main/java/com/couchbase/intellij/utils/Subscribable.java +++ b/src/main/java/com/couchbase/intellij/utils/Subscribable.java @@ -3,7 +3,9 @@ import cn.hutool.core.stream.StreamUtil; import java.lang.ref.WeakReference; +import java.time.Duration; import java.util.*; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -21,19 +23,16 @@ public Subscribable(T value) { } public void set(T value) { - this.value = Optional.ofNullable(value); - subscribers = subscribers.entrySet().stream() - .filter(subscriber -> subscriber.getKey() != null - && subscriber.getKey().get() != null - && subscriber.getValue() != null) - .filter(subscriber -> { - try { - return subscriber.getValue().apply(this.value); - } catch (Throwable e) { - return false; - } - }) - .collect(Collectors.toMap(s -> s.getKey(), s -> s.getValue())); + synchronized (this) { + this.value = Optional.ofNullable(value); + subscribers = subscribers.entrySet().stream().filter(subscriber -> subscriber.getKey() != null && subscriber.getKey().get() != null && subscriber.getValue() != null).filter(subscriber -> { + try { + return subscriber.getValue().apply(this.value); + } catch (Throwable e) { + return false; + } + }).collect(Collectors.toMap(s -> s.getKey(), s -> s.getValue())); + } } /** @@ -58,7 +57,31 @@ public void unsubscribe(Object key) { } public Optional get() { - return value; + try { + return Optional.of(get(0)); + } catch (InterruptedException e) { + return Optional.empty(); + } + } + + public T get(long millis) throws InterruptedException { + synchronized (this) { + if (!value.isPresent()) { + wait(millis); + } + return value.get(); + } + } + + public void get(Consumer> subscriber) { + if (isPresent()) { + subscriber.accept(value); + } else { + subscribe(subscriber, t -> { + subscriber.accept(t); + return false; + }); + } } public boolean isPresent() { diff --git a/src/test/java/com/couchbase/intellij/tree/iq/AbstractMockedIQTest.java b/src/test/java/com/couchbase/intellij/tree/iq/AbstractMockedIQTest.java index 4b9b8934..dacd96de 100644 --- a/src/test/java/com/couchbase/intellij/tree/iq/AbstractMockedIQTest.java +++ b/src/test/java/com/couchbase/intellij/tree/iq/AbstractMockedIQTest.java @@ -64,7 +64,11 @@ protected void setUp() throws Exception { @Override protected void tearDown() throws Exception { - server.close(); + try { + server.close(); + } catch (Exception e) { + // noop + } } protected void enqueueResponse(String text) {