Skip to content

Commit ed807ad

Browse files
author
Alexander Lavrukov
committed
better-spliterator: Better spliterator
1 parent 89a53cc commit ed807ad

File tree

11 files changed

+428
-20
lines changed

11 files changed

+428
-20
lines changed

repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ private static <ID extends Entity.Id<?>> ReadTableParams<ID> defaultReadTablePar
380380
}
381381

382382
private static <ID extends Entity.Id<?>> ReadTableParams.ReadTableParamsBuilder<ID> buildReadTableParamsNonLegacy() {
383-
return ReadTableParams.<ID>builder().useNewSpliterator(true);
383+
return ReadTableParams.<ID>builder().useNewSpliterator2(true);
384384
}
385385

386386
@Test

repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import tech.ydb.core.Result;
1616
import tech.ydb.core.Status;
1717
import tech.ydb.core.StatusCode;
18+
import tech.ydb.core.grpc.GrpcReadStream;
1819
import tech.ydb.proto.ValueProtos;
1920
import tech.ydb.table.Session;
2021
import tech.ydb.table.query.DataQueryResult;
@@ -25,6 +26,7 @@
2526
import tech.ydb.table.query.stats.QueryStats;
2627
import tech.ydb.table.query.stats.QueryStatsCollectionMode;
2728
import tech.ydb.table.query.stats.TableAccessStats;
29+
import tech.ydb.table.query.ReadTablePart;
2830
import tech.ydb.table.result.ResultSetReader;
2931
import tech.ydb.table.settings.BulkUpsertSettings;
3032
import tech.ydb.table.settings.CommitTxSettings;
@@ -70,6 +72,13 @@
7072
import tech.ydb.yoj.repository.ydb.exception.YdbRepositoryException;
7173
import tech.ydb.yoj.repository.ydb.merge.QueriesMerger;
7274
import tech.ydb.yoj.repository.ydb.readtable.ReadTableMapper;
75+
import tech.ydb.yoj.repository.ydb.spliterator.ClosableSpliterator;
76+
import tech.ydb.yoj.repository.ydb.spliterator.ResultSetIterator;
77+
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliterator;
78+
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueue;
79+
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueueGrpcStreamAdapter;
80+
import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbLegacySpliterator;
81+
import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbNewLegacySpliterator;
7382
import tech.ydb.yoj.repository.ydb.statement.Statement;
7483
import tech.ydb.yoj.repository.ydb.table.YdbTable;
7584
import tech.ydb.yoj.util.lang.Interrupts;
@@ -78,6 +87,7 @@
7887
import javax.annotation.Nullable;
7988
import java.time.Duration;
8089
import java.util.ArrayList;
90+
import java.util.Iterator;
8191
import java.util.List;
8292
import java.util.Map;
8393
import java.util.concurrent.CompletableFuture;
@@ -101,7 +111,7 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
101111
private static final String PROP_TRACE_VERBOSE_OBJ_RESULTS = "tech.ydb.yoj.repository.ydb.trace.verboseObjResults";
102112

103113
private final List<YdbRepository.Query<?>> pendingWrites = new ArrayList<>();
104-
private final List<YdbSpliterator<?>> spliterators = new ArrayList<>();
114+
private final List<ClosableSpliterator<?>> spliterators = new ArrayList<>();
105115

106116
@Getter
107117
private final TxOptions options;
@@ -127,8 +137,8 @@ public YdbRepositoryTransaction(REPO repo, TxOptions options) {
127137
this.tablespace = repo.getSchemaOperations().getTablespace();
128138
}
129139

130-
private <V> YdbSpliterator<V> createSpliterator(String request, boolean isOrdered) {
131-
YdbSpliterator<V> spliterator = new YdbSpliterator<>(request, isOrdered);
140+
private <V> YdbNewLegacySpliterator<V> createSpliterator(String request, boolean isOrdered) {
141+
YdbNewLegacySpliterator<V> spliterator = new YdbNewLegacySpliterator<>(request, isOrdered);
132142
spliterators.add(spliterator);
133143
return spliterator;
134144
}
@@ -183,7 +193,7 @@ private void doCommit() {
183193

184194
private void closeStreams() {
185195
Exception summaryException = null;
186-
for (YdbSpliterator<?> spliterator : spliterators) {
196+
for (ClosableSpliterator<?> spliterator : spliterators) {
187197
try {
188198
spliterator.close();
189199
} catch (Exception e) {
@@ -451,7 +461,7 @@ public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT
451461
String yql = getYql(statement);
452462
Params sdkParams = getSdkParams(statement, params);
453463

454-
YdbSpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);
464+
YdbNewLegacySpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);
455465

456466
initSession();
457467
session.executeScanQuery(
@@ -560,7 +570,7 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
560570
}
561571

562572
if (params.isUseNewSpliterator()) {
563-
YdbSpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());
573+
YdbNewLegacySpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());
564574

565575
initSession();
566576
session.readTable(
@@ -571,6 +581,30 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
571581
return spliterator.createStream();
572582
}
573583

584+
if (params.isUseNewSpliterator2()) {
585+
initSession();
586+
587+
// TODO: configure stream timeout
588+
YdbSpliteratorQueue<Iterator<RESULT>> queue = new YdbSpliteratorQueue<>(1, Duration.ofMinutes(5));
589+
590+
var adapter = new YdbSpliteratorQueueGrpcStreamAdapter<>("readTable: " + tableName, queue);
591+
GrpcReadStream<ReadTablePart> grpcStream = session.executeReadTable(tableName, settings.build());
592+
CompletableFuture<Status> future = grpcStream.start(readTablePart -> {
593+
ResultSetIterator<RESULT> iterator = new ResultSetIterator<>(
594+
readTablePart.getResultSetReader(),
595+
mapper::mapResult
596+
);
597+
adapter.onNext(iterator);
598+
});
599+
future.whenComplete(adapter::onSupplierThreadComplete);
600+
601+
YdbSpliterator<RESULT> spliterator = new YdbSpliterator<>(queue, params.isOrdered());
602+
603+
spliterators.add(spliterator);
604+
605+
return spliterator.createStream();
606+
}
607+
574608
try {
575609
YdbLegacySpliterator<RESULT> spliterator = new YdbLegacySpliterator<>(params.isOrdered(), action ->
576610
doCall("read table " + mapper.getTableName(""), () -> {
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import java.util.Spliterator;
4+
5+
public interface ClosableSpliterator<V> extends Spliterator<V> {
6+
void close();
7+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import tech.ydb.proto.ValueProtos;
4+
import tech.ydb.table.result.ResultSetReader;
5+
import tech.ydb.yoj.repository.ydb.client.YdbConverter;
6+
7+
import java.util.ArrayList;
8+
import java.util.Iterator;
9+
import java.util.List;
10+
import java.util.NoSuchElementException;
11+
12+
public final class ResultSetIterator<V> implements Iterator<V> {
13+
private final ResultSetReader resultSet;
14+
private final ResultConverter<V> converter;
15+
private final List<ValueProtos.Column> columns;
16+
17+
private int position = 0;
18+
19+
public ResultSetIterator(ResultSetReader resultSet, ResultConverter<V> converter) {
20+
List<ValueProtos.Column> columns;
21+
if (resultSet.getRowCount() > 0) {
22+
resultSet.setRowIndex(0);
23+
columns = getColumns(resultSet);
24+
} else {
25+
columns = new ArrayList<>();
26+
}
27+
28+
this.resultSet = resultSet;
29+
this.converter = converter;
30+
this.columns = columns;
31+
}
32+
33+
@Override
34+
public boolean hasNext() {
35+
return position < resultSet.getRowCount();
36+
}
37+
38+
@Override
39+
public V next() {
40+
if (!hasNext()) {
41+
throw new NoSuchElementException();
42+
}
43+
44+
ValueProtos.Value value = buildValue(position++);
45+
46+
return converter.convert(columns, value);
47+
}
48+
49+
private ValueProtos.Value buildValue(int rowIndex) {
50+
resultSet.setRowIndex(rowIndex);
51+
ValueProtos.Value.Builder value = ValueProtos.Value.newBuilder();
52+
for (int i = 0; i < columns.size(); i++) {
53+
value.addItems(YdbConverter.convertValueToProto(resultSet.getColumn(i)));
54+
}
55+
return value.build();
56+
}
57+
58+
private static List<ValueProtos.Column> getColumns(ResultSetReader resultSet) {
59+
List<ValueProtos.Column> columns = new ArrayList<>();
60+
for (int i = 0; i < resultSet.getColumnCount(); i++) {
61+
columns.add(ValueProtos.Column.newBuilder()
62+
.setName(resultSet.getColumnName(i))
63+
.build()
64+
);
65+
}
66+
return columns;
67+
}
68+
69+
@FunctionalInterface
70+
public interface ResultConverter<V> {
71+
V convert(List<ValueProtos.Column> columns, ValueProtos.Value value);
72+
}
73+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import tech.ydb.yoj.ExperimentalApi;
4+
5+
import javax.annotation.Nullable;
6+
import java.util.Iterator;
7+
import java.util.Spliterator;
8+
import java.util.function.Consumer;
9+
import java.util.stream.Stream;
10+
import java.util.stream.StreamSupport;
11+
12+
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42")
13+
public final class YdbSpliterator<V> implements ClosableSpliterator<V> {
14+
private final YdbSpliteratorQueue<Iterator<V>> queue;
15+
private final int flags;
16+
17+
private Iterator<V> valueIterator;
18+
19+
private boolean closed = false;
20+
21+
public YdbSpliterator(YdbSpliteratorQueue<Iterator<V>> queue, boolean isOrdered) {
22+
this.queue = queue;
23+
this.flags = (isOrdered ? ORDERED : 0) | NONNULL;
24+
}
25+
26+
// Correct way to create stream with YdbSpliterator. onClose call is important for avoid supplier thread leak.
27+
public Stream<V> createStream() {
28+
return StreamSupport.stream(this, false).onClose(this::close);
29+
}
30+
31+
@Override
32+
public void close() {
33+
if (closed) {
34+
return;
35+
}
36+
closed = true;
37+
queue.close();
38+
}
39+
40+
@Override
41+
public boolean tryAdvance(Consumer<? super V> action) {
42+
if (closed) {
43+
return false;
44+
}
45+
46+
// WARNING: At one point in time, this spliterator will store up to queue.size() + 2 blocks from YDB in memory.
47+
// One block right here, one in the queue, one in the grpc thread, waiting for free space in the queue.
48+
// Maximum response size in YDB - 50mb. It means that it could be up to 150mb for spliterator.
49+
valueIterator = getValueIterator(valueIterator, queue);
50+
if (valueIterator == null) {
51+
close();
52+
return false;
53+
}
54+
55+
V value = valueIterator.next();
56+
57+
action.accept(value);
58+
59+
return true;
60+
}
61+
62+
/*
63+
* Returns not empty valueIterator, null in case of end of stream
64+
*/
65+
@Nullable
66+
private static <V> Iterator<V> getValueIterator(
67+
@Nullable Iterator<V> valueIterator, YdbSpliteratorQueue<Iterator<V>> queue
68+
) {
69+
// valueIterator could be null only on first call of tryAdvance
70+
if (valueIterator == null) {
71+
valueIterator = queue.poll();
72+
if (valueIterator == null) {
73+
return null;
74+
}
75+
}
76+
77+
// queue could return empty iterator, we have to select one with elements
78+
while (!valueIterator.hasNext()) {
79+
valueIterator = queue.poll();
80+
if (valueIterator == null) {
81+
return null;
82+
}
83+
}
84+
85+
return valueIterator;
86+
}
87+
88+
@Override
89+
public Spliterator<V> trySplit() {
90+
return null;
91+
}
92+
93+
@Override
94+
public long estimateSize() {
95+
return Long.MAX_VALUE;
96+
}
97+
98+
@Override
99+
public long getExactSizeIfKnown() {
100+
return -1;
101+
}
102+
103+
@Override
104+
public int characteristics() {
105+
return flags;
106+
}
107+
}

0 commit comments

Comments
 (0)