Skip to content

Commit 07af1bd

Browse files
authored
Support search iterator (#409)
* support search iterator
  Signed-off-by: shanghaikid <jiangruiyi@gmail.com>
* upgrade milvus test version
  Signed-off-by: shanghaikid <jiangruiyi@gmail.com>
* fix test
  Signed-off-by: shanghaikid <jiangruiyi@gmail.com>
* change log level
  Signed-off-by: shanghaikid <jiangruiyi@gmail.com>
* fix iterator
  Signed-off-by: ryjiang <jiangruiyi@gmail.com>
* add collection id
  Signed-off-by: ryjiang <jiangruiyi@gmail.com>
* fix test
  Signed-off-by: ryjiang <jiangruiyi@gmail.com>
---------
Signed-off-by: shanghaikid <jiangruiyi@gmail.com>
Signed-off-by: ryjiang <jiangruiyi@gmail.com>
1 parent 66ae49a commit 07af1bd

File tree

8 files changed

+319
-557
lines changed

8 files changed

+319
-557
lines changed

milvus/grpc/Data.ts

Lines changed: 105 additions & 178 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import {
3737
SearchRes,
3838
SearchSimpleReq,
3939
SearchIteratorReq,
40-
DEFAULT_TOPK,
4140
HybridSearchReq,
4241
promisify,
4342
sleep,
@@ -56,9 +55,6 @@ import {
5655
DEFAULT_COUNT_QUERY_STRING,
5756
getQueryIteratorExpr,
5857
QueryIteratorReq,
59-
getRangeFromSearchResult,
60-
SearchResultData,
61-
getPKFieldExpr,
6258
DEFAULT_MAX_SEARCH_SIZE,
6359
SparseFloatVector,
6460
sparseRowsToBytes,
@@ -473,6 +469,10 @@ export class Data extends Collection {
473469
* @returns {string} status.error_code - The error code of the operation.
474470
* @returns {string} status.reason - The reason for the error, if any.
475471
* @returns {{score:number,id:string, [outputfield]: value}[]} results - Array of search results.
472+
* @returns {number} session_ts - The timestamp of the search session.
473+
* @returns {string} collection_name - The name of the collection.
474+
* @returns {number} all_search_count - The total number of search operations.
475+
* @returns {string[]} recalls - The recalls of the search operation.
476476
*
477477
* @example
478478
* ```
@@ -533,6 +533,14 @@ export class Data extends Collection {
533533
status: originSearchResult.status,
534534
results: [],
535535
recalls: [],
536+
session_ts: -1,
537+
collection_name: data.collection_name,
538+
search_iterator_v2_results:
539+
originSearchResult.results &&
540+
originSearchResult.results.search_iterator_v2_results,
541+
_search_iterator_v2_results:
542+
originSearchResult.results &&
543+
originSearchResult.results._search_iterator_v2_results,
536544
};
537545
}
538546

@@ -547,183 +555,102 @@ export class Data extends Collection {
547555
// nq === 1, return the first object of results array
548556
results: nq === 1 ? results[0] || [] : results,
549557
recalls: originSearchResult.results.recalls,
558+
session_ts: originSearchResult.session_ts,
559+
collection_name: data.collection_name,
560+
all_search_count: originSearchResult.results.all_search_count,
561+
search_iterator_v2_results:
562+
originSearchResult.results.search_iterator_v2_results,
563+
_search_iterator_v2_results:
564+
originSearchResult.results._search_iterator_v2_results,
550565
};
551566
}
552567

553-
// async searchIterator(data: SearchIteratorReq): Promise<any> {
554-
// // store client
555-
// const client = this;
556-
// // get collection info
557-
// const pkField = await this.getPkField(data);
558-
// // get available count
559-
// const count = await client.count({
560-
// collection_name: data.collection_name,
561-
// expr: data.expr || data.filter || '',
562-
// });
563-
// // make sure limit is not exceed the total count
564-
// const total = data.limit > count.data ? count.data : data.limit;
565-
// // make sure batch size is exceed the total count
566-
// let batchSize = data.batchSize > total ? total : data.batchSize;
567-
// // make sure batch size is not exceed max search size
568-
// batchSize =
569-
// batchSize > DEFAULT_MAX_SEARCH_SIZE ? DEFAULT_MAX_SEARCH_SIZE : batchSize;
570-
571-
// // init expr
572-
// const initExpr = data.expr || data.filter || '';
573-
// // init search params object
574-
// data.params = data.params || {};
575-
// data.limit = batchSize;
576-
577-
// // user range filter set
578-
// const initRadius = Number(data.params.radius) || 0;
579-
// const initRangeFilter = Number(data.params.range_filter) || 0;
580-
// // range params object
581-
// const rangeFilterParams = {
582-
// radius: initRadius,
583-
// rangeFilter: initRangeFilter,
584-
// expr: initExpr,
585-
// };
586-
587-
// // force quite if true, at first, if total is 0, return done
588-
// let done = total === 0;
589-
// // batch result store
590-
// let lastBatchRes: SearchResultData[] = [];
591-
592-
// // build cache
593-
// const cache = await client.search({
594-
// ...data,
595-
// limit: total > DEFAULT_MAX_SEARCH_SIZE ? DEFAULT_MAX_SEARCH_SIZE : total,
596-
// });
597-
598-
// return {
599-
// currentTotal: 0,
600-
// [Symbol.asyncIterator]() {
601-
// return {
602-
// currentTotal: this.currentTotal,
603-
// async next() {
604-
// // check if reach the limit
605-
// if (
606-
// (this.currentTotal >= total && this.currentTotal !== 0) ||
607-
// done
608-
// ) {
609-
// return { done: true, value: lastBatchRes };
610-
// }
611-
612-
// // batch result container
613-
// const batchRes: SearchResultData[] = [];
614-
// const bs =
615-
// this.currentTotal + batchSize > total
616-
// ? total - this.currentTotal
617-
// : batchSize;
618-
619-
// // keep getting search data if not reach the batch size
620-
// while (batchRes.length < bs) {
621-
// // search results container
622-
// let searchResults: SearchResults = {
623-
// status: { error_code: 'SUCCESS', reason: '' },
624-
// results: [],
625-
// };
626-
627-
// // Iterate through the cached data, adding it to the search results container until the batch size is reached.
628-
// if (cache.results.length > 0) {
629-
// while (
630-
// cache.results.length > 0 &&
631-
// searchResults.results.length < bs
632-
// ) {
633-
// searchResults.results.push(cache.results.shift()!);
634-
// }
635-
// } else if (searchResults.results.length < bs) {
636-
// // build search params, overwrite range filter
637-
// if (rangeFilterParams.radius && rangeFilterParams.rangeFilter) {
638-
// data.params = {
639-
// ...data.params,
640-
// radius: rangeFilterParams.radius,
641-
// range_filter:
642-
// rangeFilterParams.rangeFilter,
643-
// };
644-
// }
645-
// // set search expr
646-
// data.expr = rangeFilterParams.expr;
647-
648-
// console.log('search param', data.params, data.expr);
649-
650-
// // iterate search, if no result, double the radius, until we doubled for 5 times
651-
// let newSearchRes = await client.search(data);
652-
// let retry = 0;
653-
// while (newSearchRes.results.length === 0 && retry < 5) {
654-
// newSearchRes = await client.search(data);
655-
// if (searchResults.results.length === 0) {
656-
// const newRadius = rangeFilterParams.radius * 2;
657-
658-
// data.params = {
659-
// ...data.params,
660-
// radius: newRadius,
661-
// };
662-
// }
663-
664-
// retry++;
665-
// }
666-
667-
// // combine search results
668-
// searchResults.results = [
669-
// ...searchResults.results,
670-
// ...newSearchRes.results,
671-
// ];
672-
// }
673-
674-
// console.log('return', searchResults.results);
675-
676-
// // filter result, batchRes should be unique
677-
// const filterResult = searchResults.results.filter(
678-
// r =>
679-
// !lastBatchRes.some(l => l.id === r.id) &&
680-
// !batchRes.some(c => c.id === r.id)
681-
// );
682-
683-
// // fill filter result to batch result, it should not exceed the batch size
684-
// for (let i = 0; i < filterResult.length; i++) {
685-
// if (batchRes.length < bs) {
686-
// batchRes.push(filterResult[i]);
687-
// }
688-
// }
689-
690-
// // get data range about last batch result
691-
// const resultRange = getRangeFromSearchResult(filterResult);
692-
693-
// console.log('result range', resultRange);
694-
695-
// // if no more result, force quite
696-
// if (resultRange.lastDistance === 0) {
697-
// done = true;
698-
// return { done: false, value: batchRes };
699-
// }
700-
701-
// // update next range and expr
702-
// rangeFilterParams.rangeFilter = resultRange.lastDistance;
703-
// rangeFilterParams.radius =
704-
// rangeFilterParams.radius + resultRange.radius;
705-
// rangeFilterParams.expr = getPKFieldExpr({
706-
// pkField,
707-
// value: resultRange.id as string,
708-
// expr: initExpr,
709-
// });
710-
711-
// console.log('last', rangeFilterParams);
712-
// }
713-
714-
// // store last result
715-
// lastBatchRes = batchRes;
716-
717-
// // update current total
718-
// this.currentTotal += batchRes.length;
719-
720-
// // return batch result
721-
// return { done: false, value: batchRes };
722-
// },
723-
// };
724-
// },
725-
// };
726-
// }
568+
/**
 * Creates an async iterator that runs a vector search in successive batches.
 *
 * Before iterating, the method counts the entities matching
 * `data.expr` / `data.filter` and reads the collection description. The
 * effective total is the smaller of `data.limit` and that count; when
 * `data.limit` is missing or equals `NO_LIMIT`, the count itself is used.
 * Each batch is capped by `data.batchSize`, the remaining total and
 * `DEFAULT_MAX_SEARCH_SIZE`.
 *
 * Every `next()` call issues one `search` request carrying the iterator
 * params. After each batch, the `token`, `last_bound` and `session_ts`
 * returned by that search are written back into the params used for the
 * following request.
 *
 * An error raised while fetching a batch is logged with `console.error` and
 * ends the iteration (`{ done: true, value: null }`); it is not rethrown.
 * The caller's `data` object is not modified.
 *
 * @param data - The search iterator request (collection name, filter,
 * search params, `limit`, `batchSize`, ...).
 * @returns An object exposing `Symbol.asyncIterator`; each step's `value` is
 * the `results` array of one search batch.
 *
 * @example
 * ```
 * const iterator = await client.searchIterator({ ... });
 * for await (const batch of iterator) {
 *   // handle one batch of results
 * }
 * ```
 */
async searchIterator(data: SearchIteratorReq): Promise<any> {
  const client = this;

  // The available count and the collection info do not depend on each
  // other, so both requests are issued concurrently.
  const [count, collectionInfo] = await Promise.all([
    client.count({
      collection_name: data.collection_name,
      expr: data.expr || data.filter || '',
    }),
    client.describeCollection({
      collection_name: data.collection_name,
    }),
  ]);

  // If limit is not set, fall back to the count. The value is kept in a
  // local so the caller's request object is left untouched.
  const requestedLimit =
    !data.limit || data.limit === NO_LIMIT ? count.data : data.limit;

  // Ensure limit does not exceed the total count
  const total = Math.min(requestedLimit, count.data);

  // Ensure batch size does not exceed the total count or max search size
  let batchSize = Math.min(data.batchSize, total, DEFAULT_MAX_SEARCH_SIZE);

  // Iterator fields (keys placed in the search params)
  const ITERATOR_FIELD = 'iterator';
  const ITER_SEARCH_V2_KEY = 'search_iter_v2';
  const ITER_SEARCH_ID_KEY = 'search_iter_id';
  const ITER_SEARCH_BATCH_SIZE_KEY = 'search_iter_batch_size';
  const ITER_SEARCH_LAST_BOUND_KEY = 'search_iter_last_bound';
  const GUARANTEE_TIMESTAMP_KEY = 'guarantee_timestamp';
  const COLLECTION_ID = 'collection_id';

  // Number of results handed out so far across all batches
  let currentTotal = 0;

  // Search iterator special params, layered on top of the user's params.
  // This object is shared by every batch request and updated in place.
  const params: any = {
    ...data.params,
    [ITERATOR_FIELD]: true,
    [ITER_SEARCH_V2_KEY]: true,
    [ITER_SEARCH_BATCH_SIZE_KEY]: batchSize,
    [GUARANTEE_TIMESTAMP_KEY]: 0,
    [COLLECTION_ID]: collectionInfo.collectionID,
  };

  return {
    [Symbol.asyncIterator]() {
      return {
        async next() {
          // Stop once the requested total has been reached
          if (currentTotal >= total) {
            return { done: true, value: null };
          }

          try {
            const batchRes = await client.search({
              ...data,
              params,
              limit: batchSize,
            });

            // Update current total and shrink the batch size to what is left
            currentTotal += batchRes.results.length;
            batchSize = Math.min(batchSize, total - currentTotal);

            // The next request cannot be built without the iterator state of
            // this one; fail with a descriptive error instead of an opaque
            // TypeError. The catch below logs it and ends the iteration.
            const iteratorState = batchRes.search_iterator_v2_results;
            if (!iteratorState) {
              throw new Error(
                'search response did not include search_iterator_v2_results'
              );
            }

            // Update search params for the next batch
            params[ITER_SEARCH_ID_KEY] = iteratorState.token;
            params[ITER_SEARCH_LAST_BOUND_KEY] = iteratorState.last_bound;
            params[GUARANTEE_TIMESTAMP_KEY] = batchRes.session_ts;
            params[ITER_SEARCH_BATCH_SIZE_KEY] = batchSize;

            return {
              // An empty batch means there is nothing more to fetch
              done: currentTotal > total || !batchRes.results.length,
              value: batchRes.results,
            };
          } catch (error) {
            // Best effort: report the failure and finish the iteration
            console.error('Error during search iteration:', error);
            return { done: true, value: null };
          }
        },
      };
    },
  };
}
727654

728655
/**
729656
* Executes a query and returns an async iterator that allows iterating over the results in batches.

0 commit comments

Comments
 (0)