Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Query iterator method #80

Merged
merged 2 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions Milvus.Client/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ internal static class Constants
/// </summary>
internal const string Offset = "offset";

/// <summary>
/// Top parameter key name.
/// </summary>
internal const string Limit = "limit";

/// <summary>
/// Top parameter key name.
/// </summary>
internal const string BatchSize = "batch_size";

/// <summary>
/// Top parameter key name.
/// </summary>
internal const string Iterator = "iterator";

/// <summary>
/// Top parameter key name.
/// </summary>
internal const string ReduceStopForBest = "reduce_stop_for_best";

/// <summary>
/// Key name in parameters.<see cref="Client.IndexType"/>
/// </summary>
Expand Down
229 changes: 190 additions & 39 deletions Milvus.Client/MilvusCollection.Entity.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System.Buffers;
using System.Buffers.Binary;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text.Json;
using Google.Protobuf.Collections;
Expand Down Expand Up @@ -418,61 +418,144 @@ public async Task<IReadOnlyList<FieldData>> QueryAsync(
Expr = expression
};

if (parameters is not null)
PopulateQueryRequestFromParameters(request, parameters);

var response = await _client.InvokeAsync(
_client.GrpcClient.QueryAsync,
request,
static r => r.Status,
cancellationToken)
.ConfigureAwait(false);

return ProcessReturnedFieldData(response.FieldsData);
}

/// <summary>
/// Retrieves rows from a collection via scalar filtering based on a boolean expression using iterator.
/// </summary>
/// <param name="expression">A boolean expression determining which rows are to be returned.</param>
/// <param name="batchSize">Batch size that will be used for every iteration request. Must be between 1 and 16384.</param>
/// <param name="parameters">Various additional optional parameters to configure the query.</param>
/// <param name="cancellationToken">
/// The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None" />.
/// </param>
/// <returns>A list of <see cref="FieldData{TData}" /> instances with the query results.</returns>
public async IAsyncEnumerable<IReadOnlyList<FieldData>> QueryWithIteratorAsync(
string? expression = null,
int batchSize = 1000,
QueryParameters? parameters = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if ((parameters?.Offset ?? 0) != 0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason not to support Offset? It could simply be set on the very first call, and from that point on just continue with the batching as usual?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same python logic port. From my tests milvus behave strange when you try to setup offset.

https://github.com/milvus-io/pymilvus/blob/6625af7fd0bfdf75fd596e7a9a871528cb539ef7/pymilvus/orm/iterator.py#L435

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to understand better if this is an actual Milvus limitation or just an unimplemented feature at the client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think clients should behave the same way. I don't have time to look into it right now, but I saw some strange results when I changed the offset to a non-zero value. I need this code merged into the master branch so I can use it and not rely on a fork. Maybe we should delay this proof of concept to future releases for those who really need it?

{
if (parameters.TimeTravelTimestamp is not null)
throw new MilvusException("Not support offset when searching iteration");
}

var describeResponse = await _client.InvokeAsync(
_client.GrpcClient.DescribeCollectionAsync,
new DescribeCollectionRequest { CollectionName = Name },
r => r.Status,
cancellationToken)
.ConfigureAwait(false);

var pkField = describeResponse.Schema.Fields.FirstOrDefault(x => x.IsPrimaryKey);
if (pkField == null)
{
throw new MilvusException("Schema must contain pk field");
}

var isUserRequestPkField = parameters?.OutputFieldsInternal?.Contains(pkField.Name) ?? false;
roji marked this conversation as resolved.
Show resolved Hide resolved
var userExpression = expression;
var userLimit = parameters?.Limit ?? int.MaxValue;

QueryRequest request = new()
{
CollectionName = Name,
Expr = (userExpression, pkField) switch
{
request.TravelTimestamp = parameters.TimeTravelTimestamp.Value;
// If user expression is not null, we should use it
{userExpression: not null} => userExpression,
// If user expression is null and pk field is string
{pkField.DataType: DataType.VarChar} => $"{pkField.Name} != ''",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this non-empty string check - and the below < {long.MaxValue} check needed? Why not just leave Expr empty if the user didn't specify an expression?

Copy link
Collaborator

@roji roji Jul 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BlackGad what about this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is to guaranty order on first request when user expression is absent. Again it is port from python :)

// If user expression is null and pk field is not string
_ => $"{pkField.Name} < {long.MaxValue}",
roji marked this conversation as resolved.
Show resolved Hide resolved
}
};

if (parameters.PartitionNamesInternal?.Count > 0)
PopulateQueryRequestFromParameters(request, parameters);

// Request id field in any case to proceed with an iterations
if (!isUserRequestPkField) request.OutputFields.Add(pkField.Name);

// Replace parameters required for iterator
ReplaceKeyValueItems(request.QueryParams,
new Grpc.KeyValuePair {Key = Constants.Iterator, Value = "True"},
new Grpc.KeyValuePair {Key = Constants.ReduceStopForBest, Value = "True"},
new Grpc.KeyValuePair {Key = Constants.BatchSize, Value = batchSize.ToString(CultureInfo.InvariantCulture)},
new Grpc.KeyValuePair {Key = Constants.Offset, Value = 0.ToString(CultureInfo.InvariantCulture)},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why not just "0"? Are you sure we even need to set Offset at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The line you refer to doesn't seem to have anything to do with this - it just checks that offset is zero; whereas here I'm asking why not just "0" as simpler code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh do you mean use constant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

new Grpc.KeyValuePair {Key = Constants.Limit, Value = Math.Min(batchSize, userLimit).ToString(CultureInfo.InvariantCulture)});
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused about Limit vs. BatchSize here... If Limit is bigger than BatchSize, will have the same value (because of the Min here). If Limit is smaller than BatchSize, BatchSize isn't reduced... What's the logic here supposed to be?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Limit and batch size must be equal here. Will fix


var processedItemsCount = 0;
while (!cancellationToken.IsCancellationRequested)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cancellation token should be checked internally in InvokeAsync which is called just below - is there any reason for the additional check here? If not, it should be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because then it will be while(true) so it is defensive strategy so I will be sure that loop will broke.

I prefer to leave this. What do you think?

Copy link
Collaborator

@roji roji Jul 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the check doesn't ensure that the loop will be broken any more than a simple while (true) (i.e. the cancellation token mat never get triggered). The thing that guarantees that the loop will break is the yield break inside, when there are no more results. And since the cancellation token is checked within InvokeAsync anyway, I think this should be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed

{
var response = await _client.InvokeAsync(
_client.GrpcClient.QueryAsync,
request,
static r => r.Status,
cancellationToken)
.ConfigureAwait(false);

object? pkLastValue;
int processedDuringIterationCount;
var pkFieldsData = response.FieldsData.Single(x => x.FieldId == pkField.FieldID);
if (pkField.DataType == DataType.VarChar)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a switch here, with a default case that throws, in case Milvus add support for a 3rd PK type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, I don't think it makes sense to keep code a certain way just because the python implementation looks a certain way. This is a different project in a different language, and we should be doing things in the best way possible here regardless of what the Python driver looks like.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

{
request.PartitionNames.AddRange(parameters.PartitionNamesInternal);
pkLastValue = pkFieldsData.Scalars.StringData.Data.LastOrDefault();
processedDuringIterationCount = pkFieldsData.Scalars.StringData.Data.Count;
}

if (parameters.OutputFieldsInternal?.Count > 0)
else
{
request.OutputFields.AddRange(parameters.OutputFieldsInternal);
pkLastValue = pkFieldsData.Scalars.IntData.Data.LastOrDefault();
processedDuringIterationCount = pkFieldsData.Scalars.IntData.Data.Count;
}

if (parameters.Offset is not null)
// If there are no more items to process, we should break the loop
if(processedDuringIterationCount == 0) yield break;

// Respond with processed data
if (!isUserRequestPkField)
{
request.QueryParams.Add(new Grpc.KeyValuePair
{
Key = Constants.Offset,
Value = parameters.Offset.Value.ToString(CultureInfo.InvariantCulture)
});
// Filter out extra field if user didn't request it
response.FieldsData.Remove(pkFieldsData);
}
yield return ProcessReturnedFieldData(response.FieldsData);

if (parameters.Limit is not null)
{
request.QueryParams.Add(new Grpc.KeyValuePair
processedItemsCount += processedDuringIterationCount;
var leftItemsCount = userLimit - processedItemsCount;

// If user limit is reached, we should break the loop
if(leftItemsCount <= 0) yield break;

// Setup next iteration limit and expression
ReplaceKeyValueItems(
request.QueryParams,
new Grpc.KeyValuePair
{
Key = "limit", Value = parameters.Limit.Value.ToString(CultureInfo.InvariantCulture)
Key = Constants.Limit,
Value = Math.Min(batchSize, leftItemsCount).ToString(CultureInfo.InvariantCulture)
});
}
}

// Note that we send both the consistency level and the guarantee timestamp, although the latter is derived
// from the former and should be sufficient.
if (parameters?.ConsistencyLevel is null)
{
request.UseDefaultConsistency = true;
request.GuaranteeTimestamp = CalculateGuaranteeTimestamp(Name, ConsistencyLevel.Session, userProvidedGuaranteeTimestamp: null);
}
else
{
request.ConsistencyLevel = (Grpc.ConsistencyLevel)parameters.ConsistencyLevel.Value;
request.GuaranteeTimestamp =
CalculateGuaranteeTimestamp(Name, parameters.ConsistencyLevel.Value,
parameters.GuaranteeTimestamp);
}
var nextExpression = pkField.DataType == DataType.VarChar
? $"{pkField.Name} > '{pkLastValue}'"
: $"{pkField.Name} > {pkLastValue}";

QueryResults response =
await _client.InvokeAsync(_client.GrpcClient.QueryAsync, request, static r => r.Status, cancellationToken)
.ConfigureAwait(false);
if (!string.IsNullOrWhiteSpace(userExpression))
{
nextExpression += $" and {userExpression}";
}

return ProcessReturnedFieldData(response.FieldsData);
request.Expr = nextExpression;
}
}

/// <summary>
Expand Down Expand Up @@ -685,4 +768,72 @@ ulong CalculateGuaranteeTimestamp(

return guaranteeTimestamp;
}

private static void ReplaceKeyValueItems(RepeatedField<Grpc.KeyValuePair> collection, params Grpc.KeyValuePair[] pairs)
{
var obsoleteParameterKeys = pairs.Select(x => x.Key).Distinct().ToArray();
var obsoleteParameters = collection.Where(x => obsoleteParameterKeys.Contains(x.Key)).ToArray();
foreach (var field in obsoleteParameters)
{
collection.Remove(field);
}

foreach (var pair in pairs)
{
collection.Add(pair);
}
}

private void PopulateQueryRequestFromParameters(QueryRequest request, QueryParameters? parameters)
{
if (parameters is not null)
{
if (parameters.TimeTravelTimestamp is not null)
{
request.TravelTimestamp = parameters.TimeTravelTimestamp.Value;
}

if (parameters.PartitionNamesInternal?.Count > 0)
{
request.PartitionNames.AddRange(parameters.PartitionNamesInternal);
}

if (parameters.OutputFieldsInternal?.Count > 0)
{
request.OutputFields.AddRange(parameters.OutputFieldsInternal);
}

if (parameters.Offset is not null)
{
request.QueryParams.Add(new Grpc.KeyValuePair
{
Key = Constants.Offset,
Value = parameters.Offset.Value.ToString(CultureInfo.InvariantCulture)
});
}

if (parameters.Limit is not null)
{
request.QueryParams.Add(new Grpc.KeyValuePair
{
Key = Constants.Limit, Value = parameters.Limit.Value.ToString(CultureInfo.InvariantCulture)
});
}
}

// Note that we send both the consistency level and the guarantee timestamp, although the latter is derived
// from the former and should be sufficient.
if (parameters?.ConsistencyLevel is null)
{
request.UseDefaultConsistency = true;
request.GuaranteeTimestamp = CalculateGuaranteeTimestamp(Name, ConsistencyLevel.Session, userProvidedGuaranteeTimestamp: null);
}
else
{
request.ConsistencyLevel = (Grpc.ConsistencyLevel)parameters.ConsistencyLevel.Value;
request.GuaranteeTimestamp =
CalculateGuaranteeTimestamp(Name, parameters.ConsistencyLevel.Value,
parameters.GuaranteeTimestamp);
}
}
}
Loading