diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e94e1f8ba..1e3868edb 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -36,10 +36,10 @@ jobs:
             -P /tmp/
           sudo apt-get install -y /tmp/apache-arrow-apt-source-latest-"$(lsb_release --codename --short)".deb
           sudo apt-get update -y
-          sudo apt install -y libarrow-dev=14.0.0-1 \
-            libarrow-dataset-dev=14.0.0-1 \
-            libarrow-acero-dev=14.0.0-1 \
-            libparquet-dev=14.0.0-1
+          sudo apt install -y libarrow-dev=14.0.1-1 \
+            libarrow-dataset-dev=14.0.1-1 \
+            libarrow-acero-dev=14.0.1-1 \
+            libparquet-dev=14.0.1-1
           sudo apt-get install -y libboost-graph-dev ccache libcurl4-openssl-dev
 
       - name: CMake
diff --git a/README.rst b/README.rst
index c887b02f6..dd3f0d880 100644
--- a/README.rst
+++ b/README.rst
@@ -98,6 +98,8 @@ Take the "person" vertex table as an example, if the chunk size is set to be 500
    :align: center
    :alt: vertex physical table
 
+**Note**: To efficiently utilize the filter push-down of payload file formats like Parquet, the internal vertex id is stored in the payload file as a column. Since the internal vertex id is continuous, the payload file format can apply delta encoding to this column, which adds little storage overhead.
+
 Edges in GraphAr
 ^^^^^^^^^^^^^^^^
diff --git a/cpp/examples/mid_level_reader_example.cc b/cpp/examples/mid_level_reader_example.cc
index e92c2f026..3dd875ad2 100644
--- a/cpp/examples/mid_level_reader_example.cc
+++ b/cpp/examples/mid_level_reader_example.cc
@@ -20,6 +20,7 @@ limitations under the License.
 #include "./config.h"
 #include "gar/reader/arrow_chunk_reader.h"
 #include "gar/util/expression.h"
+#include "gar/util/general_params.h"
 
 void vertex_property_chunk_reader(const GAR_NAMESPACE::GraphInfo& graph_info) {
   // constuct reader
@@ -36,29 +37,37 @@ void vertex_property_chunk_reader(const GAR_NAMESPACE::GraphInfo& graph_info) {
   // use reader
   auto result = reader.GetChunk();
   ASSERT(!result.has_error());
-  auto range = reader.GetRange().value();
   std::cout << "chunk number: " << reader.GetChunkNum() << std::endl;
-  std::cout << "range of fisrt vertex property chunk: " << range.first << " "
-            << range.second << std::endl;
   auto table = result.value();
   std::cout << "rows number of first vertex property chunk: "
             << table->num_rows() << std::endl;
   std::cout << "schema of first vertex property chunk: " << std::endl
             << table->schema()->ToString() << std::endl;
+  auto index_col =
+      table->GetColumnByName(GAR_NAMESPACE::GeneralParams::kVertexIndexCol);
+  ASSERT(index_col != nullptr);
+  std::cout << "Internal id column: " << index_col->ToString() << " "
+            << std::endl;
   // seek vertex id
   ASSERT(reader.seek(100).ok());
   result = reader.GetChunk();
   ASSERT(!result.has_error());
-  range = reader.GetRange().value();
-  std::cout << "range of vertex property chunk for vertex id 100: "
-            << range.first << " " << range.second << std::endl;
+  table = result.value();
+  index_col =
+      table->GetColumnByName(GAR_NAMESPACE::GeneralParams::kVertexIndexCol);
+  ASSERT(index_col != nullptr);
+  std::cout << "Internal id column of vertex property chunk for vertex id 100: "
+            << index_col->ToString() << " " << std::endl;
   // next chunk
   ASSERT(reader.next_chunk().ok());
   result = reader.GetChunk();
   ASSERT(!result.has_error());
-  range = reader.GetRange().value();
-  std::cout << "range of next vertex property chunk: " << range.first << " "
-            << range.second << std::endl;
+  table = result.value();
+  index_col =
+      table->GetColumnByName(GAR_NAMESPACE::GeneralParams::kVertexIndexCol);
+  ASSERT(index_col != nullptr);
+  std::cout << "Internal id column of next chunk: " << index_col->ToString()
+            << " " << std::endl;
 
   // reader with filter pushdown
   auto filter = GAR_NAMESPACE::_Equal(GAR_NAMESPACE::_Property("gender"),
diff --git a/cpp/include/gar/reader/arrow_chunk_reader.h b/cpp/include/gar/reader/arrow_chunk_reader.h
index f221b21ba..ab88f233f 100644
--- a/cpp/include/gar/reader/arrow_chunk_reader.h
+++ b/cpp/include/gar/reader/arrow_chunk_reader.h
@@ -100,13 +100,6 @@ class VertexPropertyArrowChunkReader {
    */
   Result<std::shared_ptr<arrow::Table>> GetChunk() noexcept;
 
-  /**
-   * @brief Get the vertex id range of current chunk.
-   *
-   * @return Result: std::pair<IdType, IdType> or error.
-   */
-  Result<std::pair<IdType, IdType>> GetRange() noexcept;
-
   /**
    * @brief Sets chunk position indicator to next chunk.
    *
diff --git a/cpp/include/gar/writer/arrow_chunk_writer.h b/cpp/include/gar/writer/arrow_chunk_writer.h
index 8576b91f1..3b9cd905c 100644
--- a/cpp/include/gar/writer/arrow_chunk_writer.h
+++ b/cpp/include/gar/writer/arrow_chunk_writer.h
@@ -227,6 +227,10 @@ class VertexPropertyWriter {
                   ValidateLevel validate_level) const noexcept;
 
  private:
+  Result<std::shared_ptr<arrow::Table>> addIndexColumn(
+      const std::shared_ptr<arrow::Table>& table, IdType chunk_index,
+      IdType chunk_size) const noexcept;
+
   VertexInfo vertex_info_;
   std::string prefix_;
   std::shared_ptr<FileSystem> fs_;
diff --git a/cpp/src/arrow_chunk_reader.cc b/cpp/src/arrow_chunk_reader.cc
index 1ecb214bf..2434fe541 100644
--- a/cpp/src/arrow_chunk_reader.cc
+++ b/cpp/src/arrow_chunk_reader.cc
@@ -36,22 +36,6 @@ VertexPropertyArrowChunkReader::GetChunk() noexcept {
   return chunk_table_->Slice(row_offset);
 }
 
-Result<std::pair<IdType, IdType>>
-VertexPropertyArrowChunkReader::GetRange() noexcept {
-  if (chunk_table_ == nullptr) {
-    return Status::Invalid(
-        "The chunk table is not initialized, please call "
-        "GetChunk() first.");
-  }
-  const auto chunk_size = vertex_info_.GetChunkSize();
-  IdType row_offset = seek_id_ - chunk_index_ * chunk_size;
-  bool is_last_chunk = (chunk_index_ == chunk_num_ - 1);
-  const auto curr_chunk_size =
-      is_last_chunk ? (vertex_num_ - chunk_index_ * chunk_size) : chunk_size;
-
-  return std::make_pair(seek_id_, seek_id_ + curr_chunk_size - row_offset);
-}
-
 void VertexPropertyArrowChunkReader::Filter(util::Filter filter) {
   filter_options_.filter = filter;
 }
diff --git a/cpp/src/arrow_chunk_writer.cc b/cpp/src/arrow_chunk_writer.cc
index 1c6bd1f8a..d6061c3ed 100644
--- a/cpp/src/arrow_chunk_writer.cc
+++ b/cpp/src/arrow_chunk_writer.cc
@@ -198,10 +198,15 @@ Status VertexPropertyWriter::WriteChunk(
   GAR_RETURN_NOT_OK(
       validate(input_table, property_group, chunk_index, validate_level));
   auto file_type = property_group.GetFileType();
-
-  std::vector<int> indices;
-  indices.clear();
   auto schema = input_table->schema();
+  int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol);
+  if (indice == -1) {
+    return Status::Invalid("The internal id Column named ",
+                           GeneralParams::kVertexIndexCol,
+                           " does not exist in the input table.");
+  }
+
+  std::vector<int> indices({indice});
   for (auto& property : property_group.GetProperties()) {
     int indice = schema->GetFieldIndex(property.name);
     if (indice == -1) {
@@ -251,13 +256,37 @@ Status VertexPropertyWriter::WriteTable(
     const std::shared_ptr<arrow::Table>& input_table, IdType start_chunk_index,
     ValidateLevel validate_level) const noexcept {
   auto property_groups = vertex_info_.GetPropertyGroups();
+  GAR_ASSIGN_OR_RAISE(auto table_with_index,
+                      addIndexColumn(input_table, start_chunk_index,
+                                     vertex_info_.GetChunkSize()));
   for (auto& property_group : property_groups) {
-    GAR_RETURN_NOT_OK(WriteTable(input_table, property_group, start_chunk_index,
-                                 validate_level));
+    GAR_RETURN_NOT_OK(WriteTable(table_with_index, property_group,
+                                 start_chunk_index, validate_level));
   }
   return Status::OK();
 }
 
+Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::addIndexColumn(
+    const std::shared_ptr<arrow::Table>& table, IdType chunk_index,
+    IdType chunk_size) const noexcept {
+  arrow::Int64Builder array_builder;
+  RETURN_NOT_ARROW_OK(array_builder.Reserve(chunk_size));
+  int64_t length = table->num_rows();
+  for (IdType i = 0; i < length; i++) {
+    RETURN_NOT_ARROW_OK(array_builder.Append(chunk_index * chunk_size + i));
+  }
+  std::shared_ptr<arrow::Array> array;
+  RETURN_NOT_ARROW_OK(array_builder.Finish(&array));
+  std::shared_ptr<arrow::ChunkedArray> chunked_array =
+      std::make_shared<arrow::ChunkedArray>(array);
+  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
+      auto ret, table->AddColumn(0,
+                                 arrow::field(GeneralParams::kVertexIndexCol,
+                                              arrow::int64(), false),
+                                 chunked_array));
+  return ret;
+}
+
 // implementations for EdgeChunkWriter
 
 // Check if the operation of writing number or copying a file is allowed.
diff --git a/cpp/test/test_arrow_chunk_reader.cc b/cpp/test/test_arrow_chunk_reader.cc
index b47ef773e..67b0ff388 100644
--- a/cpp/test/test_arrow_chunk_reader.cc
+++ b/cpp/test/test_arrow_chunk_reader.cc
@@ -20,6 +20,7 @@ limitations under the License.
#include "./util.h" #include "gar/reader/arrow_chunk_reader.h" #include "gar/util/expression.h" +#include "gar/util/general_params.h" #define CATCH_CONFIG_MAIN #include @@ -53,37 +54,33 @@ TEST_CASE("test_vertex_property_arrow_chunk_reader") { auto reader = maybe_reader.value(); auto result = reader.GetChunk(); REQUIRE(!result.has_error()); - auto range = reader.GetRange().value(); auto table = result.value(); REQUIRE(table->num_rows() == 100); - REQUIRE(range.first == 0); - REQUIRE(range.second == 100); + REQUIRE(table->GetColumnByName( + GAR_NAMESPACE::GeneralParams::kVertexIndexCol) != nullptr); // seek REQUIRE(reader.seek(100).ok()); result = reader.GetChunk(); REQUIRE(!result.has_error()); - range = reader.GetRange().value(); table = result.value(); REQUIRE(table->num_rows() == 100); - REQUIRE(range.first == 100); - REQUIRE(range.second == 200); + REQUIRE(table->GetColumnByName( + GAR_NAMESPACE::GeneralParams::kVertexIndexCol) != nullptr); REQUIRE(reader.next_chunk().ok()); result = reader.GetChunk(); REQUIRE(!result.has_error()); - range = reader.GetRange().value(); table = result.value(); REQUIRE(table->num_rows() == 100); - REQUIRE(range.first == 200); - REQUIRE(range.second == 300); + REQUIRE(table->GetColumnByName( + GAR_NAMESPACE::GeneralParams::kVertexIndexCol) != nullptr); REQUIRE(reader.seek(900).ok()); result = reader.GetChunk(); REQUIRE(!result.has_error()); - range = reader.GetRange().value(); table = result.value(); REQUIRE(table->num_rows() == 3); - REQUIRE(range.first == 900); - REQUIRE(range.second == 903); + REQUIRE(table->GetColumnByName( + GAR_NAMESPACE::GeneralParams::kVertexIndexCol) != nullptr); REQUIRE(reader.GetChunkNum() == 10); REQUIRE(reader.next_chunk().IsIndexError()); @@ -119,10 +116,7 @@ TEST_CASE("test_vertex_property_pushdown") { auto result = reader.GetChunk(); REQUIRE(!result.has_error()); table = result.value(); - auto [start, end] = reader.GetRange().value(); - std::cout << "Chunk: " << idx << ",\tNums: " << table->num_rows() << "/" - << chunk_size << ",\tRange: (" << start << ", " << end << "]" - << '\n'; + std::cout << "Chunk: " << idx << ",\tNums: " << table->num_rows() << '\n'; idx++; sum += table->num_rows(); } while (!reader.next_chunk().IsIndexError()); diff --git a/docs/images/vertex_physical_table.png b/docs/images/vertex_physical_table.png index 15ff249b3..2794c5938 100644 Binary files a/docs/images/vertex_physical_table.png and b/docs/images/vertex_physical_table.png differ diff --git a/docs/user-guide/file-format.rst b/docs/user-guide/file-format.rst index 86f2ad55a..e4ccf770b 100644 --- a/docs/user-guide/file-format.rst +++ b/docs/user-guide/file-format.rst @@ -57,6 +57,8 @@ Take the "person" vertex table as an example, if the chunk size is set to be 500 :alt: vertex physical table +**Note**: For efficiently utilize the filter push-down of the payload file format like Parquet, the internal vertex id is stored in the payload file as a column. And since the internal vertex id is continuous, the payload file format can use the delta encoding for the internal vertex id column, which would not bring too much overhead for the storage. 
+
 
 Edges in GraphAr
 ------------------------
diff --git a/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Nebula.scala b/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Nebula.scala
index 33b6c70d5..9830ce5d6 100644
--- a/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Nebula.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Nebula.scala
@@ -56,7 +56,7 @@ object GraphAr2Nebula {
     // The edge data need to convert src and dst to the vertex id,
     // so we need to read the vertex data with index column.
     val graphData =
-      GraphReader.read(graphInfoPath, spark, addVertexIndex = true)
+      GraphReader.read(graphInfoPath, spark)
     val vertexData = graphData._1
     val edgeData = graphData._2
     putVertexDataIntoNebula(graphInfo, vertexData)
diff --git a/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Neo4j.scala b/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Neo4j.scala
index 8e21fa6cc..c1a82eeeb 100644
--- a/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Neo4j.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Neo4j.scala
@@ -46,9 +46,7 @@ object GraphAr2Neo4j {
     val graphInfoPath: String = args(0)
     val graphInfo = GraphInfo.loadGraphInfo(graphInfoPath, spark)
 
-    // The edge data need to convert src and dst to the vertex id , so we need to read
-    // the vertex data with index column.
-    val graphData = GraphReader.read(graphInfoPath, spark, true)
+    val graphData = GraphReader.read(graphInfoPath, spark)
     val vertexData = graphData._1
     val edgeData = graphData._2
 
diff --git a/spark/src/main/scala/com/alibaba/graphar/graph/GraphReader.scala b/spark/src/main/scala/com/alibaba/graphar/graph/GraphReader.scala
index d10e1a181..eda464bc8 100644
--- a/spark/src/main/scala/com/alibaba/graphar/graph/GraphReader.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/graph/GraphReader.scala
@@ -35,21 +35,18 @@ object GraphReader {
   *   The map of (vertex label -> VertexInfo) for the graph.
   * @param spark
   *   The Spark session for the reading.
-  * @param addIndex
-  *   Whether to add index column for the DataFrame.
   * @return
   *   The map of (vertex label -> DataFrame)
   */
   private def readAllVertices(
       prefix: String,
       vertexInfos: Map[String, VertexInfo],
-      spark: SparkSession,
-      addIndex: Boolean = false
+      spark: SparkSession
   ): Map[String, DataFrame] = {
     val vertex_dataframes: Map[String, DataFrame] = vertexInfos.map {
       case (label, vertexInfo) => {
         val reader = new VertexReader(prefix, vertexInfo, spark)
-        (label, reader.readAllVertexPropertyGroups(addIndex))
+        (label, reader.readAllVertexPropertyGroups())
       }
     }
     return vertex_dataframes
@@ -109,8 +106,6 @@ object GraphReader {
   *   The info object for the graph.
   * @param spark
   *   The Spark session for the loading.
-  * @param addVertexIndex
-  *   Whether to add index for the vertex DataFrames.
   * @return
   *   Pair of vertex DataFrames and edge DataFrames, the vertex DataFrames are
   *   stored as the map of (vertex_label -> DataFrame) the edge DataFrames are
@@ -119,8 +114,7 @@ object GraphReader {
   */
   def readWithGraphInfo(
       graphInfo: GraphInfo,
-      spark: SparkSession,
-      addVertexIndex: Boolean = false
+      spark: SparkSession
   ): Pair[Map[String, DataFrame], Map[
     (String, String, String),
     Map[String, DataFrame]
@@ -129,7 +123,7 @@ object GraphReader {
     val vertex_infos = graphInfo.getVertexInfos()
     val edge_infos = graphInfo.getEdgeInfos()
     return (
-      readAllVertices(prefix, vertex_infos, spark, addVertexIndex),
+      readAllVertices(prefix, vertex_infos, spark),
       readAllEdges(prefix, edge_infos, spark)
     )
   }
@@ -142,8 +136,6 @@ object GraphReader {
   *   The path of the graph info yaml.
   * @param spark
   *   The Spark session for the loading.
-  * @param addVertexIndex
-  *   Whether to add index for the vertex DataFrames.
   * @return
   *   Pair of vertex DataFrames and edge DataFrames, the vertex DataFrames are
   *   stored as the map of (vertex_label -> DataFrame) the edge DataFrames are
@@ -152,8 +144,7 @@ object GraphReader {
   */
   def read(
       graphInfoPath: String,
-      spark: SparkSession,
-      addVertexIndex: Boolean = false
+      spark: SparkSession
   ): Pair[Map[String, DataFrame], Map[
     (String, String, String),
     Map[String, DataFrame]
@@ -162,6 +153,6 @@ object GraphReader {
     val graph_info = GraphInfo.loadGraphInfo(graphInfoPath, spark)
 
     // conduct reading
-    readWithGraphInfo(graph_info, spark, addVertexIndex)
+    readWithGraphInfo(graph_info, spark)
   }
 }
diff --git a/spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala b/spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala
index f7d814922..cb360117b 100644
--- a/spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala
@@ -66,7 +66,7 @@ object GraphTransformer {
       val source_vertex_info = sourceVertexInfosMap(label)
       // read vertex chunks from the source graph
       val reader = new VertexReader(source_prefix, source_vertex_info, spark)
-      val df = reader.readAllVertexPropertyGroups(true)
+      val df = reader.readAllVertexPropertyGroups()
       // write vertex chunks for the dest graph
       val writer = new VertexWriter(dest_prefix, dest_vertex_info, df)
       writer.writeVertexProperties()
diff --git a/spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala b/spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala
index 5df22355b..38939cd66 100644
--- a/spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala
@@ -17,7 +17,7 @@ package com.alibaba.graphar.reader
 
 import com.alibaba.graphar.util.{IndexGenerator, DataFrameConcat}
-import com.alibaba.graphar.{VertexInfo, PropertyGroup}
+import com.alibaba.graphar.{VertexInfo, PropertyGroup, GeneralParams}
 import com.alibaba.graphar.util.FileSystem
 
 import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -84,15 +84,12 @@ class VertexReader(
   *
   * @param propertyGroup
   *   property group.
-  * @param addIndex
-  *   flag that add vertex index column or not in the final DataFrame.
   * @return
   *   DataFrame that contains all chunks of property group. Raise
   *   IllegalArgumentException if the property group not contained.
   */
   def readVertexPropertyGroup(
-      propertyGroup: PropertyGroup,
-      addIndex: Boolean = true
+      propertyGroup: PropertyGroup
   ): DataFrame = {
     if (!vertexInfo.containPropertyGroup(propertyGroup)) {
       throw new IllegalArgumentException(
@@ -106,12 +103,7 @@ class VertexReader(
       .option("header", "true")
       .format("com.alibaba.graphar.datasources.GarDataSource")
       .load(file_path)
-
-    if (addIndex) {
-      IndexGenerator.generateVertexIndexColumn(df)
-    } else {
-      df
-    }
+    return df
   }
 
   /**
@@ -119,15 +111,12 @@ class VertexReader(
   *
   * @param propertyGroups
   *   list of property groups.
-  * @param addIndex
-  *   flag that add vertex index column or not in the final DataFrame.
   * @return
   *   DataFrame that contains all chunks of property group. Raise
   *   IllegalArgumentException if the property group not contained.
   */
   def readMultipleVertexPropertyGroups(
-      propertyGroups: java.util.ArrayList[PropertyGroup],
-      addIndex: Boolean = true
+      propertyGroups: java.util.ArrayList[PropertyGroup]
   ): DataFrame = {
     val len: Int = propertyGroups.size
     if (len == 0) {
@@ -135,43 +124,31 @@ class VertexReader(
     }
 
     val pg0: PropertyGroup = propertyGroups.get(0)
-    val df0 = readVertexPropertyGroup(pg0, false)
-    if (len == 1) {
-      if (addIndex) {
-        return IndexGenerator.generateVertexIndexColumn(df0)
-      } else {
-        return df0
-      }
-    }
+    val df0 = readVertexPropertyGroup(pg0)
     var rdd = df0.rdd
     var schema_array = df0.schema.fields
 
     for (i <- 1 until len) {
       val pg: PropertyGroup = propertyGroups.get(i)
-      val new_df = readVertexPropertyGroup(pg, false)
+      val new_df =
+        readVertexPropertyGroup(pg).drop(GeneralParams.vertexIndexCol)
       schema_array = Array.concat(schema_array, new_df.schema.fields)
       rdd = DataFrameConcat.concatRdd(rdd, new_df.rdd)
     }
 
     val schema = StructType(schema_array)
     val df = spark.createDataFrame(rdd, schema)
-    if (addIndex) {
-      IndexGenerator.generateVertexIndexColumn(df)
-    } else {
-      df
-    }
+    return df
   }
 
   /**
   * Load the chunks for all property groups as a DataFrame.
   *
-  * @param addIndex
-  *   flag that add vertex index column or not in the final DataFrame.
   * @return
   *   DataFrame that contains all property group chunks of vertex.
   */
-  def readAllVertexPropertyGroups(addIndex: Boolean = true): DataFrame = {
+  def readAllVertexPropertyGroups(): DataFrame = {
     val property_groups = vertexInfo.getProperty_groups()
-    return readMultipleVertexPropertyGroups(property_groups, addIndex)
+    return readMultipleVertexPropertyGroups(property_groups)
   }
 }
diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
index 8b764a528..07e6d345b 100644
--- a/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
@@ -16,7 +16,7 @@
 
 package com.alibaba.graphar.writer
 
-import com.alibaba.graphar.util.{FileSystem, ChunkPartitioner}
+import com.alibaba.graphar.util.{FileSystem, ChunkPartitioner, IndexGenerator}
 import com.alibaba.graphar.{GeneralParams, VertexInfo, PropertyGroup}
 
 import org.apache.spark.sql.types._
@@ -33,10 +33,17 @@ object VertexWriter {
       chunkSize: Long,
       vertexNum: Long
   ): DataFrame = {
-    val vertex_df_schema = vertexDf.schema
+    val vertexDfWithIndex = vertexDf.schema.contains(
+      StructField(GeneralParams.vertexIndexCol, LongType)
+    ) match {
+      case true => vertexDf
+      case _    => IndexGenerator.generateVertexIndexColumn(vertexDf)
+    }
+    val vertex_df_schema = vertexDfWithIndex.schema
     val index = vertex_df_schema.fieldIndex(GeneralParams.vertexIndexCol)
     val partition_num = ((vertexNum + chunkSize - 1) / chunkSize).toInt
-    val rdd = vertexDf.rdd.map(row => (row(index).asInstanceOf[Long], row))
+    val rdd =
+      vertexDfWithIndex.rdd.map(row => (row(index).asInstanceOf[Long], row))
 
     // repartition
     val partitioner = new ChunkPartitioner(partition_num, chunkSize)
@@ -113,6 +120,7 @@ class VertexWriter(
     // write out the chunks
     val output_prefix = prefix + vertexInfo.getPathPrefix(propertyGroup)
     val property_list = ArrayBuffer[String]()
+    property_list += "`" + GeneralParams.vertexIndexCol + "`"
     val it = propertyGroup.getProperties().iterator
     while (it.hasNext()) {
       val property = it.next()
diff --git a/spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala b/spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala
index bd261029e..63fc877b3 100644
--- a/spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala
@@ -40,7 +40,7 @@ class ComputeExampleSuite extends AnyFunSuite {
 
     val vertex_reader = new VertexReader(prefix, vertex_info, spark)
     val vertices_num = vertex_reader.readVerticesNumber()
-    val vertex_df = vertex_reader.readAllVertexPropertyGroups(true)
+    val vertex_df = vertex_reader.readAllVertexPropertyGroups()
     vertex_df.show()
     assert(vertex_df.columns.size == 5)
     assert(vertex_df.count() == vertices_num)
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestReader.scala b/spark/src/test/scala/com/alibaba/graphar/TestReader.scala
index 665cdf3f5..385058ede 100644
--- a/spark/src/test/scala/com/alibaba/graphar/TestReader.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/TestReader.scala
@@ -128,7 +128,7 @@ class ReaderSuite extends AnyFunSuite {
 
     // test reading a single property chunk
     val single_chunk_df = reader.readVertexPropertyChunk(property_group, 0)
-    assert(single_chunk_df.columns.length == 3)
+    assert(single_chunk_df.columns.length == 4)
    assert(single_chunk_df.count() == 100)
     val cond = "gender = 'female'"
     var df_pd = single_chunk_df.select("firstName", "gender").filter(cond)
@@ -151,8 +151,8 @@ class ReaderSuite extends AnyFunSuite {
 
     // test reading all chunks for a property group
     val property_df =
-      reader.readVertexPropertyGroup(property_group, addIndex = false)
-    assert(property_df.columns.length == 3)
+      reader.readVertexPropertyGroup(property_group)
+    assert(property_df.columns.length == 4)
     assert(property_df.count() == 903)
     df_pd = property_df.select("firstName", "gender").filter(cond)
@@ -178,15 +178,15 @@ class ReaderSuite extends AnyFunSuite {
     property_groups.add(property_group_1)
     property_groups.add(property_group)
     val multiple_property_df =
-      reader.readMultipleVertexPropertyGroups(property_groups, addIndex = false)
-    assert(multiple_property_df.columns.length == 4)
+      reader.readMultipleVertexPropertyGroups(property_groups)
+    assert(multiple_property_df.columns.length == 5)
     assert(multiple_property_df.count() == 903)
     df_pd = multiple_property_df.filter(cond)
     df_pd.explain()
     df_pd.show()
     // test reading chunks for all property groups and optionally adding indices
-    val vertex_df = reader.readAllVertexPropertyGroups(addIndex = false)
-    assert(vertex_df.columns.length == 4)
+    val vertex_df = reader.readAllVertexPropertyGroups()
+    assert(vertex_df.columns.length == 5)
     assert(vertex_df.count() == 903)
     df_pd = vertex_df.filter(cond)
     df_pd.explain()
diff --git a/spark/src/test/scala/com/alibaba/graphar/TransformExample.scala b/spark/src/test/scala/com/alibaba/graphar/TransformExample.scala
index 8d40b4556..138356535 100644
--- a/spark/src/test/scala/com/alibaba/graphar/TransformExample.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/TransformExample.scala
@@ -41,7 +41,7 @@ class TransformExampleSuite extends AnyFunSuite {
 
     val reader = new VertexReader(prefix, vertex_info, spark)
     val vertices_num = reader.readVerticesNumber()
-    val vertex_df_with_index = reader.readAllVertexPropertyGroups(true)
+    val vertex_df_with_index = reader.readAllVertexPropertyGroups()
     assert(vertex_df_with_index.count() == vertices_num)
 
     // write to parquet files
diff --git a/testing b/testing
index 1522189a7..1a72361b8 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 1522189a772b6bf66ee4631aaa5c91af3e0dcf5e
+Subproject commit 1a72361b84847c96923804612d4a5e0f35c6971a
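Since this change removes VertexPropertyArrowChunkReader::GetRange(), callers that still need the vertex id range of a chunk can recover it from the new internal id column instead. Below is a minimal, hypothetical C++ sketch (not part of the patch above); it assumes a reader constructed as in cpp/examples/mid_level_reader_example.cc and uses only the GeneralParams::kVertexIndexCol constant introduced by this change plus standard Apache Arrow ChunkedArray APIs.

// Sketch (assumption: `reader` is a GAR_NAMESPACE::VertexPropertyArrowChunkReader
// already constructed as in mid_level_reader_example.cc, with its headers included).
auto result = reader.GetChunk();
ASSERT(!result.has_error());
auto table = result.value();
auto index_col =
    table->GetColumnByName(GAR_NAMESPACE::GeneralParams::kVertexIndexCol);
ASSERT(index_col != nullptr);
// The internal vertex id column is continuous within a chunk, so its first and
// last values bound the chunk's vertex ids (what GetRange() used to return).
auto first = std::static_pointer_cast<arrow::Int64Scalar>(
    index_col->GetScalar(0).ValueOrDie());
auto last = std::static_pointer_cast<arrow::Int64Scalar>(
    index_col->GetScalar(index_col->length() - 1).ValueOrDie());
std::cout << "vertex id range of this chunk: [" << first->value << ", "
          << last->value + 1 << ")" << std::endl;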