diff --git a/Directory.Packages.props b/Directory.Packages.props index 026d961..c707df4 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -3,7 +3,7 @@ - + diff --git a/Milvus.Client/MilvusCollection.Entity.cs b/Milvus.Client/MilvusCollection.Entity.cs index 24b9e44..82ebfc8 100644 --- a/Milvus.Client/MilvusCollection.Entity.cs +++ b/Milvus.Client/MilvusCollection.Entity.cs @@ -5,6 +5,7 @@ using System.Runtime.InteropServices; using System.Text.Json; using Google.Protobuf.Collections; +using KeyValuePair = Milvus.Client.Grpc.KeyValuePair; namespace Milvus.Client; @@ -143,7 +144,7 @@ public async Task DeleteAsync( { Verify.NotNullOrWhiteSpace(expression); - var request = new DeleteRequest + DeleteRequest request = new DeleteRequest { CollectionName = Name, Expr = expression, @@ -390,7 +391,7 @@ public Task FlushAsync(CancellationToken cancellationToken = defaul public async Task> GetPersistentSegmentInfosAsync( CancellationToken cancellationToken = default) { - var request = new GetPersistentSegmentInfoRequest { CollectionName = Name }; + GetPersistentSegmentInfoRequest request = new GetPersistentSegmentInfoRequest { CollectionName = Name }; GetPersistentSegmentInfoResponse response = await _client.InvokeAsync( _client.GrpcClient.GetPersistentSegmentInfoAsync, @@ -429,7 +430,7 @@ public async Task> QueryAsync( PopulateQueryRequestFromParameters(request, parameters); - var response = await _client.InvokeAsync( + QueryResults? response = await _client.InvokeAsync( _client.GrpcClient.QueryAsync, request, static r => r.Status, @@ -460,22 +461,22 @@ public async IAsyncEnumerable> QueryWithIteratorAsync( throw new MilvusException("Not support offset when searching iteration"); } - var describeResponse = await _client.InvokeAsync( + DescribeCollectionResponse? describeResponse = await _client.InvokeAsync( _client.GrpcClient.DescribeCollectionAsync, new DescribeCollectionRequest { CollectionName = Name }, r => r.Status, cancellationToken) .ConfigureAwait(false); - var pkField = describeResponse.Schema.Fields.FirstOrDefault(x => x.IsPrimaryKey); + Grpc.FieldSchema? pkField = describeResponse.Schema.Fields.FirstOrDefault(x => x.IsPrimaryKey); if (pkField == null) { throw new MilvusException("Schema must contain pk field"); } - var isUserRequestPkField = parameters?.OutputFieldsInternal?.Contains(pkField.Name) ?? false; - var userExpression = expression; - var userLimit = parameters?.Limit ?? int.MaxValue; + bool isUserRequestPkField = parameters?.OutputFieldsInternal?.Contains(pkField.Name) ?? false; + string? userExpression = expression; + int userLimit = parameters?.Limit ?? int.MaxValue; QueryRequest request = new() { @@ -486,8 +487,10 @@ public async IAsyncEnumerable> QueryWithIteratorAsync( {userExpression: not null} => userExpression, // If user expression is null and pk field is string {pkField.DataType: DataType.VarChar} => $"{pkField.Name} != ''", - // If user expression is null and pk field is not string - _ => $"{pkField.Name} < {long.MaxValue}", + // If user expression is null and pk field is int + {pkField.DataType: DataType.Int8 or DataType.Int16 or DataType.Int32 or DataType.Int64} => $"{pkField.Name} < {long.MaxValue}", + // If user expression is null and pk field is not string and not int + _ => throw new MilvusException("Unsupported data type for primary key field") } }; @@ -497,17 +500,18 @@ public async IAsyncEnumerable> QueryWithIteratorAsync( if (!isUserRequestPkField) request.OutputFields.Add(pkField.Name); // Replace parameters required for iterator + string iterationBatchSize = Math.Min(batchSize, userLimit).ToString(CultureInfo.InvariantCulture); ReplaceKeyValueItems(request.QueryParams, new Grpc.KeyValuePair {Key = Constants.Iterator, Value = "True"}, new Grpc.KeyValuePair {Key = Constants.ReduceStopForBest, Value = "True"}, - new Grpc.KeyValuePair {Key = Constants.BatchSize, Value = batchSize.ToString(CultureInfo.InvariantCulture)}, + new Grpc.KeyValuePair {Key = Constants.BatchSize, Value = iterationBatchSize}, new Grpc.KeyValuePair {Key = Constants.Offset, Value = 0.ToString(CultureInfo.InvariantCulture)}, - new Grpc.KeyValuePair {Key = Constants.Limit, Value = Math.Min(batchSize, userLimit).ToString(CultureInfo.InvariantCulture)}); + new Grpc.KeyValuePair {Key = Constants.Limit, Value = iterationBatchSize}); - var processedItemsCount = 0; + int processedItemsCount = 0; while (!cancellationToken.IsCancellationRequested) { - var response = await _client.InvokeAsync( + QueryResults? response = await _client.InvokeAsync( _client.GrpcClient.QueryAsync, request, static r => r.Status, @@ -516,16 +520,22 @@ public async IAsyncEnumerable> QueryWithIteratorAsync( object? pkLastValue; int processedDuringIterationCount; - var pkFieldsData = response.FieldsData.Single(x => x.FieldId == pkField.FieldID); - if (pkField.DataType == DataType.VarChar) + Grpc.FieldData? pkFieldsData = response.FieldsData.Single(x => x.FieldId == pkField.FieldID); + switch (pkField.DataType) { - pkLastValue = pkFieldsData.Scalars.StringData.Data.LastOrDefault(); - processedDuringIterationCount = pkFieldsData.Scalars.StringData.Data.Count; - } - else - { - pkLastValue = pkFieldsData.Scalars.IntData.Data.LastOrDefault(); - processedDuringIterationCount = pkFieldsData.Scalars.IntData.Data.Count; + case DataType.VarChar: + pkLastValue = pkFieldsData.Scalars.StringData.Data.LastOrDefault(); + processedDuringIterationCount = pkFieldsData.Scalars.StringData.Data.Count; + break; + case DataType.Int8: + case DataType.Int16: + case DataType.Int32: + case DataType.Int64: + pkLastValue = pkFieldsData.Scalars.IntData.Data.LastOrDefault(); + processedDuringIterationCount = pkFieldsData.Scalars.IntData.Data.Count; + break; + default: + throw new MilvusException("Unsupported data type for primary key field"); } // If there are no more items to process, we should break the loop @@ -540,7 +550,7 @@ public async IAsyncEnumerable> QueryWithIteratorAsync( yield return ProcessReturnedFieldData(response.FieldsData); processedItemsCount += processedDuringIterationCount; - var leftItemsCount = userLimit - processedItemsCount; + int leftItemsCount = userLimit - processedItemsCount; // If user limit is reached, we should break the loop if(leftItemsCount <= 0) yield break; @@ -554,7 +564,7 @@ public async IAsyncEnumerable> QueryWithIteratorAsync( Value = Math.Min(batchSize, leftItemsCount).ToString(CultureInfo.InvariantCulture) }); - var nextExpression = pkField.DataType == DataType.VarChar + string nextExpression = pkField.DataType == DataType.VarChar ? $"{pkField.Name} > '{pkLastValue}'" : $"{pkField.Name} > {pkLastValue}"; @@ -577,7 +587,7 @@ public async IAsyncEnumerable> QueryWithIteratorAsync( public async Task> GetQuerySegmentInfoAsync( CancellationToken cancellationToken = default) { - var request = new GetQuerySegmentInfoRequest { CollectionName = Name }; + GetQuerySegmentInfoRequest request = new GetQuerySegmentInfoRequest { CollectionName = Name }; GetQuerySegmentInfoResponse response = await _client.InvokeAsync(_client.GrpcClient.GetQuerySegmentInfoAsync, request, static r => r.Status, @@ -780,14 +790,14 @@ ulong CalculateGuaranteeTimestamp( private static void ReplaceKeyValueItems(RepeatedField collection, params Grpc.KeyValuePair[] pairs) { - var obsoleteParameterKeys = pairs.Select(x => x.Key).Distinct().ToArray(); - var obsoleteParameters = collection.Where(x => obsoleteParameterKeys.Contains(x.Key)).ToArray(); - foreach (var field in obsoleteParameters) + string[] obsoleteParameterKeys = pairs.Select(x => x.Key).Distinct().ToArray(); + KeyValuePair[] obsoleteParameters = collection.Where(x => obsoleteParameterKeys.Contains(x.Key)).ToArray(); + foreach (KeyValuePair field in obsoleteParameters) { collection.Remove(field); } - foreach (var pair in pairs) + foreach (KeyValuePair pair in pairs) { collection.Add(pair); }