|
17 | 17 | * under the License.
|
18 | 18 | */
|
19 | 19 |
|
| 20 | +#include "graphar/high-level/graph_reader.h" |
20 | 21 | #include <algorithm>
|
21 | 22 | #include <unordered_set>
|
22 |
| - |
23 | 23 | #include "arrow/array.h"
|
24 | 24 | #include "graphar/api/arrow_reader.h"
|
25 | 25 | #include "graphar/convert_to_arrow_type.h"
|
26 |
| -#include "graphar/high-level/graph_reader.h" |
27 | 26 | #include "graphar/label.h"
|
28 | 27 | #include "graphar/types.h"
|
29 | 28 |
|
@@ -264,6 +263,69 @@ Result<std::vector<IdType>> VerticesCollection::filter_by_acero(
|
264 | 263 | return indices64;
|
265 | 264 | }
|
266 | 265 |
|
| 266 | +Result<std::vector<IdType>> VerticesCollection::filter( |
| 267 | + std::string property_name, std::shared_ptr<Expression> filter_expression, |
| 268 | + std::vector<IdType>* new_valid_chunk) { |
| 269 | + std::vector<int> indices; |
| 270 | + const int TOT_ROWS_NUM = vertex_num_; |
| 271 | + const int CHUNK_SIZE = vertex_info_->GetChunkSize(); |
| 272 | + int total_count = 0; |
| 273 | + auto property_group = vertex_info_->GetPropertyGroup(property_name); |
| 274 | + auto maybe_filter_reader = graphar::VertexPropertyArrowChunkReader::Make( |
| 275 | + vertex_info_, property_group, prefix_, {}); |
| 276 | + auto filter_reader = maybe_filter_reader.value(); |
| 277 | + filter_reader->Filter(filter_expression); |
| 278 | + std::vector<int64_t> indices64; |
| 279 | + if (is_filtered_) { |
| 280 | + for (int chunk_idx : valid_chunk_) { |
| 281 | + // how to itetate valid_chunk_? |
| 282 | + filter_reader->seek(chunk_idx * CHUNK_SIZE); |
| 283 | + auto filter_result = filter_reader->GetChunk(); |
| 284 | + auto filter_table = filter_result.value(); |
| 285 | + int count = filter_table->num_rows(); |
| 286 | + if (count != 0 && new_valid_chunk != nullptr) { |
| 287 | + new_valid_chunk->emplace_back(static_cast<IdType>(chunk_idx)); |
| 288 | + // TODO(elssky): record indices |
| 289 | + int kVertexIndexCol = filter_table->schema()->GetFieldIndex( |
| 290 | + GeneralParams::kVertexIndexCol); |
| 291 | + auto column_array = filter_table->column(kVertexIndexCol)->chunk(0); |
| 292 | + auto int64_array = |
| 293 | + std::static_pointer_cast<arrow::Int64Array>(column_array); |
| 294 | + for (int64_t i = 0; i < int64_array->length(); ++i) { |
| 295 | + if (!int64_array->IsNull(i)) { |
| 296 | + indices64.push_back(int64_array->Value(i)); |
| 297 | + } |
| 298 | + } |
| 299 | + } |
| 300 | + } |
| 301 | + } else { |
| 302 | + for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM; |
| 303 | + ++chunk_idx) { |
| 304 | + auto filter_result = filter_reader->GetChunk(); |
| 305 | + auto filter_table = filter_result.value(); |
| 306 | + int count = filter_table->num_rows(); |
| 307 | + filter_reader->next_chunk(); |
| 308 | + total_count += count; |
| 309 | + if (count != 0) { |
| 310 | + valid_chunk_.emplace_back(static_cast<IdType>(chunk_idx)); |
| 311 | + // TODO(elssky): record indices |
| 312 | + int kVertexIndexCol = filter_table->schema()->GetFieldIndex( |
| 313 | + GeneralParams::kVertexIndexCol); |
| 314 | + auto column_array = filter_table->column(kVertexIndexCol)->chunk(0); |
| 315 | + auto int64_array = |
| 316 | + std::static_pointer_cast<arrow::Int64Array>(column_array); |
| 317 | + for (int64_t i = 0; i < int64_array->length(); ++i) { |
| 318 | + if (!int64_array->IsNull(i)) { |
| 319 | + indices64.push_back(int64_array->Value(i)); |
| 320 | + } |
| 321 | + } |
| 322 | + } |
| 323 | + } |
| 324 | + } |
| 325 | + // std::cout << "Total valid count: " << total_count << std::endl; |
| 326 | + return indices64; |
| 327 | +} |
| 328 | + |
267 | 329 | Result<std::shared_ptr<VerticesCollection>>
|
268 | 330 | VerticesCollection::verticesWithLabel(
|
269 | 331 | const std::string& filter_label,
|
@@ -384,6 +446,48 @@ VerticesCollection::verticesWithMultipleLabels(
|
384 | 446 | return new_vertices_collection;
|
385 | 447 | }
|
386 | 448 |
|
| 449 | +Result<std::shared_ptr<VerticesCollection>> |
| 450 | +VerticesCollection::verticesWithProperty( |
| 451 | + const std::string property_name, const graphar::util::Filter filter, |
| 452 | + const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) { |
| 453 | + auto prefix = graph_info->GetPrefix(); |
| 454 | + auto vertex_info = graph_info->GetVertexInfo(type); |
| 455 | + auto vertices_collection = |
| 456 | + std::make_shared<VerticesCollection>(vertex_info, prefix); |
| 457 | + vertices_collection->filtered_ids_ = |
| 458 | + vertices_collection->filter(property_name, filter).value(); |
| 459 | + vertices_collection->is_filtered_ = true; |
| 460 | + return vertices_collection; |
| 461 | +} |
| 462 | + |
| 463 | +Result<std::shared_ptr<VerticesCollection>> |
| 464 | +VerticesCollection::verticesWithProperty( |
| 465 | + const std::string property_name, const graphar::util::Filter filter, |
| 466 | + const std::shared_ptr<VerticesCollection>& vertices_collection) { |
| 467 | + auto new_vertices_collection = std::make_shared<VerticesCollection>( |
| 468 | + vertices_collection->vertex_info_, vertices_collection->prefix_); |
| 469 | + auto filtered_ids = vertices_collection |
| 470 | + ->filter(property_name, filter, |
| 471 | + &new_vertices_collection->valid_chunk_) |
| 472 | + .value(); |
| 473 | + if (vertices_collection->is_filtered_) { |
| 474 | + std::unordered_set<IdType> origin_set( |
| 475 | + vertices_collection->filtered_ids_.begin(), |
| 476 | + vertices_collection->filtered_ids_.end()); |
| 477 | + std::unordered_set<int> intersection; |
| 478 | + for (int num : filtered_ids) { |
| 479 | + if (origin_set.count(num)) { |
| 480 | + intersection.insert(num); |
| 481 | + } |
| 482 | + } |
| 483 | + filtered_ids = |
| 484 | + std::vector<IdType>(intersection.begin(), intersection.end()); |
| 485 | + new_vertices_collection->is_filtered_ = true; |
| 486 | + } |
| 487 | + new_vertices_collection->filtered_ids_ = filtered_ids; |
| 488 | + return new_vertices_collection; |
| 489 | +} |
| 490 | + |
387 | 491 | template <typename T>
|
388 | 492 | Result<T> Vertex::property(const std::string& property) const {
|
389 | 493 | if constexpr (std::is_final<T>::value) {
|
|
0 commit comments