Skip to content

Commit

Permalink
Address review comments from the previous PR
Browse files Browse the repository at this point in the history
Updated the System.Text.Json package to resolve compile errors (warnings treated as errors)
  • Loading branch information
BlackGad committed Jul 13, 2024
1 parent 30199ee commit 29dd6a2
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PackageVersion Include="Google.Protobuf" Version="3.25.3" />
<PackageVersion Include="Grpc.Net.Client" Version="2.61.0" />
<PackageVersion Include="Grpc.Tools" Version="2.62.0" />
<PackageVersion Include="System.Text.Json" Version="8.0.2" />
<PackageVersion Include="System.Text.Json" Version="8.0.4" />
<PackageVersion Include="Microsoft.Bcl.HashCode" Version="1.1.1" />
<!-- Test -->
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
Expand Down
70 changes: 40 additions & 30 deletions Milvus.Client/MilvusCollection.Entity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Runtime.InteropServices;
using System.Text.Json;
using Google.Protobuf.Collections;
using KeyValuePair = Milvus.Client.Grpc.KeyValuePair;

namespace Milvus.Client;

Expand Down Expand Up @@ -143,7 +144,7 @@ public async Task<MutationResult> DeleteAsync(
{
Verify.NotNullOrWhiteSpace(expression);

var request = new DeleteRequest
DeleteRequest request = new DeleteRequest
{
CollectionName = Name,
Expr = expression,
Expand Down Expand Up @@ -390,7 +391,7 @@ public Task<FlushResult> FlushAsync(CancellationToken cancellationToken = defaul
public async Task<IReadOnlyList<PersistentSegmentInfo>> GetPersistentSegmentInfosAsync(
CancellationToken cancellationToken = default)
{
var request = new GetPersistentSegmentInfoRequest { CollectionName = Name };
GetPersistentSegmentInfoRequest request = new GetPersistentSegmentInfoRequest { CollectionName = Name };

GetPersistentSegmentInfoResponse response = await _client.InvokeAsync(
_client.GrpcClient.GetPersistentSegmentInfoAsync,
Expand Down Expand Up @@ -429,7 +430,7 @@ public async Task<IReadOnlyList<FieldData>> QueryAsync(

PopulateQueryRequestFromParameters(request, parameters);

var response = await _client.InvokeAsync(
QueryResults? response = await _client.InvokeAsync(
_client.GrpcClient.QueryAsync,
request,
static r => r.Status,
Expand Down Expand Up @@ -460,22 +461,22 @@ public async IAsyncEnumerable<IReadOnlyList<FieldData>> QueryWithIteratorAsync(
throw new MilvusException("Not support offset when searching iteration");
}

var describeResponse = await _client.InvokeAsync(
DescribeCollectionResponse? describeResponse = await _client.InvokeAsync(
_client.GrpcClient.DescribeCollectionAsync,
new DescribeCollectionRequest { CollectionName = Name },
r => r.Status,
cancellationToken)
.ConfigureAwait(false);

var pkField = describeResponse.Schema.Fields.FirstOrDefault(x => x.IsPrimaryKey);
Grpc.FieldSchema? pkField = describeResponse.Schema.Fields.FirstOrDefault(x => x.IsPrimaryKey);
if (pkField == null)
{
throw new MilvusException("Schema must contain pk field");
}

var isUserRequestPkField = parameters?.OutputFieldsInternal?.Contains(pkField.Name) ?? false;
var userExpression = expression;
var userLimit = parameters?.Limit ?? int.MaxValue;
bool isUserRequestPkField = parameters?.OutputFieldsInternal?.Contains(pkField.Name) ?? false;
string? userExpression = expression;
int userLimit = parameters?.Limit ?? int.MaxValue;

QueryRequest request = new()
{
Expand All @@ -486,8 +487,10 @@ public async IAsyncEnumerable<IReadOnlyList<FieldData>> QueryWithIteratorAsync(
{userExpression: not null} => userExpression,
// If user expression is null and pk field is string
{pkField.DataType: DataType.VarChar} => $"{pkField.Name} != ''",
// If user expression is null and pk field is not string
_ => $"{pkField.Name} < {long.MaxValue}",
// If user expression is null and pk field is int
{pkField.DataType: DataType.Int8 or DataType.Int16 or DataType.Int32 or DataType.Int64} => $"{pkField.Name} < {long.MaxValue}",
// If user expression is null and pk field is not string and not int
_ => throw new MilvusException("Unsupported data type for primary key field")
}
};

Expand All @@ -497,17 +500,18 @@ public async IAsyncEnumerable<IReadOnlyList<FieldData>> QueryWithIteratorAsync(
if (!isUserRequestPkField) request.OutputFields.Add(pkField.Name);

// Replace parameters required for iterator
string iterationBatchSize = Math.Min(batchSize, userLimit).ToString(CultureInfo.InvariantCulture);
ReplaceKeyValueItems(request.QueryParams,
new Grpc.KeyValuePair {Key = Constants.Iterator, Value = "True"},
new Grpc.KeyValuePair {Key = Constants.ReduceStopForBest, Value = "True"},
new Grpc.KeyValuePair {Key = Constants.BatchSize, Value = batchSize.ToString(CultureInfo.InvariantCulture)},
new Grpc.KeyValuePair {Key = Constants.BatchSize, Value = iterationBatchSize},
new Grpc.KeyValuePair {Key = Constants.Offset, Value = 0.ToString(CultureInfo.InvariantCulture)},
new Grpc.KeyValuePair {Key = Constants.Limit, Value = Math.Min(batchSize, userLimit).ToString(CultureInfo.InvariantCulture)});
new Grpc.KeyValuePair {Key = Constants.Limit, Value = iterationBatchSize});

var processedItemsCount = 0;
int processedItemsCount = 0;
while (!cancellationToken.IsCancellationRequested)
{
var response = await _client.InvokeAsync(
QueryResults? response = await _client.InvokeAsync(
_client.GrpcClient.QueryAsync,
request,
static r => r.Status,
Expand All @@ -516,16 +520,22 @@ public async IAsyncEnumerable<IReadOnlyList<FieldData>> QueryWithIteratorAsync(

object? pkLastValue;
int processedDuringIterationCount;
var pkFieldsData = response.FieldsData.Single(x => x.FieldId == pkField.FieldID);
if (pkField.DataType == DataType.VarChar)
Grpc.FieldData? pkFieldsData = response.FieldsData.Single(x => x.FieldId == pkField.FieldID);
switch (pkField.DataType)
{
pkLastValue = pkFieldsData.Scalars.StringData.Data.LastOrDefault();
processedDuringIterationCount = pkFieldsData.Scalars.StringData.Data.Count;
}
else
{
pkLastValue = pkFieldsData.Scalars.IntData.Data.LastOrDefault();
processedDuringIterationCount = pkFieldsData.Scalars.IntData.Data.Count;
case DataType.VarChar:
pkLastValue = pkFieldsData.Scalars.StringData.Data.LastOrDefault();
processedDuringIterationCount = pkFieldsData.Scalars.StringData.Data.Count;
break;
case DataType.Int8:
case DataType.Int16:
case DataType.Int32:
case DataType.Int64:
pkLastValue = pkFieldsData.Scalars.IntData.Data.LastOrDefault();
processedDuringIterationCount = pkFieldsData.Scalars.IntData.Data.Count;
break;
default:
throw new MilvusException("Unsupported data type for primary key field");
}

// If there are no more items to process, we should break the loop
Expand All @@ -540,7 +550,7 @@ public async IAsyncEnumerable<IReadOnlyList<FieldData>> QueryWithIteratorAsync(
yield return ProcessReturnedFieldData(response.FieldsData);

processedItemsCount += processedDuringIterationCount;
var leftItemsCount = userLimit - processedItemsCount;
int leftItemsCount = userLimit - processedItemsCount;

// If user limit is reached, we should break the loop
if(leftItemsCount <= 0) yield break;
Expand All @@ -554,7 +564,7 @@ public async IAsyncEnumerable<IReadOnlyList<FieldData>> QueryWithIteratorAsync(
Value = Math.Min(batchSize, leftItemsCount).ToString(CultureInfo.InvariantCulture)
});

var nextExpression = pkField.DataType == DataType.VarChar
string nextExpression = pkField.DataType == DataType.VarChar
? $"{pkField.Name} > '{pkLastValue}'"
: $"{pkField.Name} > {pkLastValue}";

Expand All @@ -577,7 +587,7 @@ public async IAsyncEnumerable<IReadOnlyList<FieldData>> QueryWithIteratorAsync(
public async Task<IReadOnlyList<QuerySegmentInfoResult>> GetQuerySegmentInfoAsync(
CancellationToken cancellationToken = default)
{
var request = new GetQuerySegmentInfoRequest { CollectionName = Name };
GetQuerySegmentInfoRequest request = new GetQuerySegmentInfoRequest { CollectionName = Name };

GetQuerySegmentInfoResponse response =
await _client.InvokeAsync(_client.GrpcClient.GetQuerySegmentInfoAsync, request, static r => r.Status,
Expand Down Expand Up @@ -780,14 +790,14 @@ ulong CalculateGuaranteeTimestamp(

private static void ReplaceKeyValueItems(RepeatedField<Grpc.KeyValuePair> collection, params Grpc.KeyValuePair[] pairs)
{
var obsoleteParameterKeys = pairs.Select(x => x.Key).Distinct().ToArray();
var obsoleteParameters = collection.Where(x => obsoleteParameterKeys.Contains(x.Key)).ToArray();
foreach (var field in obsoleteParameters)
string[] obsoleteParameterKeys = pairs.Select(x => x.Key).Distinct().ToArray();
KeyValuePair[] obsoleteParameters = collection.Where(x => obsoleteParameterKeys.Contains(x.Key)).ToArray();
foreach (KeyValuePair field in obsoleteParameters)
{
collection.Remove(field);
}

foreach (var pair in pairs)
foreach (KeyValuePair pair in pairs)
{
collection.Add(pair);
}
Expand Down

0 comments on commit 29dd6a2

Please sign in to comment.