[Feat][Format] Add internal id column to vertex payload file (#264)
---------

Signed-off-by: acezen <qiaozi.zwb@alibaba-inc.com>
acezen authored Nov 9, 2023
1 parent bde169c commit 77f547a
Showing 20 changed files with 114 additions and 123 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
@@ -36,10 +36,10 @@ jobs:
-P /tmp/
sudo apt-get install -y /tmp/apache-arrow-apt-source-latest-"$(lsb_release --codename --short)".deb
sudo apt-get update -y
-          sudo apt install -y libarrow-dev=14.0.0-1 \
-               libarrow-dataset-dev=14.0.0-1 \
-               libarrow-acero-dev=14.0.0-1 \
-               libparquet-dev=14.0.0-1
+          sudo apt install -y libarrow-dev=14.0.1-1 \
+               libarrow-dataset-dev=14.0.1-1 \
+               libarrow-acero-dev=14.0.1-1 \
+               libparquet-dev=14.0.1-1
sudo apt-get install -y libboost-graph-dev ccache libcurl4-openssl-dev
- name: CMake
2 changes: 2 additions & 0 deletions README.rst
@@ -98,6 +98,8 @@ Take the "person" vertex table as an example, if the chunk size is set to be 500
:align: center
:alt: vertex physical table

+**Note**: To make efficient use of filter pushdown in payload file formats such as Parquet, the internal vertex id is stored as a column in the payload file. Since the internal vertex id is continuous, the file format can apply delta encoding to this column, which adds little storage overhead.

Edges in GraphAr
^^^^^^^^^^^^^^^^

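For illustration, here is a minimal sketch (outside this diff) of what the pushdown enables through the C++ mid-level reader. It assumes a loaded `graph_info` and that the `_Equal`/`_Property`/`_Literal` expression helpers from `gar/util/expression.h` accept the internal id column by name; treat the exact helper names and signatures as assumptions, mirrored from the repo's `mid_level_reader_example.cc`.

```cpp
// Hedged sketch: filter vertex chunks on the internal id column and let
// Parquet evaluate the predicate against the stored column. The helper
// names below follow the repo's mid-level reader example and are assumed,
// not guaranteed, to match the installed GraphAr version.
#include "gar/graph_info.h"
#include "gar/reader/arrow_chunk_reader.h"
#include "gar/util/expression.h"
#include "gar/util/general_params.h"

void read_person_by_internal_id(const GAR_NAMESPACE::GraphInfo& graph_info) {
  std::string label = "person", property_name = "gender";
  auto group =
      graph_info.GetVertexPropertyGroup(label, property_name).value();
  auto reader = GAR_NAMESPACE::ConstructVertexPropertyArrowChunkReader(
                    graph_info, label, group)
                    .value();
  // Keep only internal id 42; with the id stored as a real column,
  // Parquet row-group statistics can skip non-matching row groups.
  auto filter = GAR_NAMESPACE::_Equal(
      GAR_NAMESPACE::_Property(GAR_NAMESPACE::GeneralParams::kVertexIndexCol),
      GAR_NAMESPACE::_Literal(int64_t(42)));
  reader.Filter(filter);
  auto table = reader.GetChunk().value();  // only matching rows remain
}
```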
27 changes: 18 additions & 9 deletions cpp/examples/mid_level_reader_example.cc
@@ -20,6 +20,7 @@ limitations under the License.
#include "./config.h"
#include "gar/reader/arrow_chunk_reader.h"
#include "gar/util/expression.h"
#include "gar/util/general_params.h"

void vertex_property_chunk_reader(const GAR_NAMESPACE::GraphInfo& graph_info) {
  // construct reader
@@ -36,29 +37,37 @@ void vertex_property_chunk_reader(const GAR_NAMESPACE::GraphInfo& graph_info) {
// use reader
auto result = reader.GetChunk();
ASSERT(!result.has_error());
-  auto range = reader.GetRange().value();
  std::cout << "chunk number: " << reader.GetChunkNum() << std::endl;
-  std::cout << "range of first vertex property chunk: " << range.first << " "
-            << range.second << std::endl;
auto table = result.value();
std::cout << "rows number of first vertex property chunk: "
<< table->num_rows() << std::endl;
std::cout << "schema of first vertex property chunk: " << std::endl
<< table->schema()->ToString() << std::endl;
+  auto index_col =
+      table->GetColumnByName(GAR_NAMESPACE::GeneralParams::kVertexIndexCol);
+  ASSERT(index_col != nullptr);
+  std::cout << "Internal id column: " << index_col->ToString() << " "
+            << std::endl;
// seek vertex id
ASSERT(reader.seek(100).ok());
result = reader.GetChunk();
ASSERT(!result.has_error());
-  range = reader.GetRange().value();
-  std::cout << "range of vertex property chunk for vertex id 100: "
-            << range.first << " " << range.second << std::endl;
+  table = result.value();
+  index_col =
+      table->GetColumnByName(GAR_NAMESPACE::GeneralParams::kVertexIndexCol);
+  ASSERT(index_col != nullptr);
+  std::cout << "Internal id column of vertex property chunk for vertex id 100: "
+            << index_col->ToString() << " " << std::endl;
// next chunk
ASSERT(reader.next_chunk().ok());
result = reader.GetChunk();
ASSERT(!result.has_error());
-  range = reader.GetRange().value();
-  std::cout << "range of next vertex property chunk: " << range.first << " "
-            << range.second << std::endl;
+  table = result.value();
+  index_col =
+      table->GetColumnByName(GAR_NAMESPACE::GeneralParams::kVertexIndexCol);
+  ASSERT(index_col != nullptr);
+  std::cout << "Internal id column of next chunk: " << index_col->ToString()
+            << " " << std::endl;

// reader with filter pushdown
auto filter = GAR_NAMESPACE::_Equal(GAR_NAMESPACE::_Property("gender"),
7 changes: 0 additions & 7 deletions cpp/include/gar/reader/arrow_chunk_reader.h
@@ -100,13 +100,6 @@ class VertexPropertyArrowChunkReader {
*/
Result<std::shared_ptr<arrow::Table>> GetChunk() noexcept;

-  /**
-   * @brief Get the vertex id range of current chunk.
-   *
-   * @return Result: std::pair<begin_id, end_id> or error.
-   */
-  Result<std::pair<IdType, IdType>> GetRange() noexcept;
-
/**
* @brief Sets chunk position indicator to next chunk.
*
4 changes: 4 additions & 0 deletions cpp/include/gar/writer/arrow_chunk_writer.h
@@ -227,6 +227,10 @@ class VertexPropertyWriter {
ValidateLevel validate_level) const noexcept;

private:
+  Result<std::shared_ptr<arrow::Table>> addIndexColumn(
+      const std::shared_ptr<arrow::Table>& table, IdType chunk_index,
+      IdType chunk_size) const noexcept;
+
VertexInfo vertex_info_;
std::string prefix_;
std::shared_ptr<FileSystem> fs_;
16 changes: 0 additions & 16 deletions cpp/src/arrow_chunk_reader.cc
@@ -36,22 +36,6 @@ VertexPropertyArrowChunkReader::GetChunk() noexcept {
return chunk_table_->Slice(row_offset);
}

-Result<std::pair<IdType, IdType>>
-VertexPropertyArrowChunkReader::GetRange() noexcept {
-  if (chunk_table_ == nullptr) {
-    return Status::Invalid(
-        "The chunk table is not initialized, please call "
-        "GetChunk() first.");
-  }
-  const auto chunk_size = vertex_info_.GetChunkSize();
-  IdType row_offset = seek_id_ - chunk_index_ * chunk_size;
-  bool is_last_chunk = (chunk_index_ == chunk_num_ - 1);
-  const auto curr_chunk_size =
-      is_last_chunk ? (vertex_num_ - chunk_index_ * chunk_size) : chunk_size;
-
-  return std::make_pair(seek_id_, seek_id_ + curr_chunk_size - row_offset);
-}
-
void VertexPropertyArrowChunkReader::Filter(util::Filter filter) {
filter_options_.filter = filter;
}
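With `GetRange()` removed, the same begin/end information can be recovered from the internal id column itself. A minimal sketch, under the assumption that the table came from `VertexPropertyArrowChunkReader::GetChunk()` and that the ids are continuous:

```cpp
// Hedged sketch: derive the [begin_id, end_id) range of a chunk table
// from its internal id column, which replaces the removed GetRange().
#include <arrow/api.h>

#include <cstdint>
#include <memory>
#include <string>
#include <utility>

std::pair<int64_t, int64_t> RangeFromIndexColumn(
    const std::shared_ptr<arrow::Table>& table,
    const std::string& index_col_name) {  // e.g. GeneralParams::kVertexIndexCol
  auto chunked = table->GetColumnByName(index_col_name);
  auto first = std::static_pointer_cast<arrow::Int64Array>(chunked->chunk(0));
  int64_t begin_id = first->Value(0);
  // The ids are continuous, so the end follows from the row count.
  return {begin_id, begin_id + table->num_rows()};
}
```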
39 changes: 34 additions & 5 deletions cpp/src/arrow_chunk_writer.cc
@@ -198,10 +198,15 @@ Status VertexPropertyWriter::WriteChunk(
GAR_RETURN_NOT_OK(
validate(input_table, property_group, chunk_index, validate_level));
auto file_type = property_group.GetFileType();

-  std::vector<int> indices;
-  indices.clear();
  auto schema = input_table->schema();
+  int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol);
+  if (indice == -1) {
+    return Status::Invalid("The internal id Column named ",
+                           GeneralParams::kVertexIndexCol,
+                           " does not exist in the input table.");
+  }
+
+  std::vector<int> indices({indice});
for (auto& property : property_group.GetProperties()) {
int indice = schema->GetFieldIndex(property.name);
if (indice == -1) {
@@ -251,13 +256,37 @@ Status VertexPropertyWriter::WriteTable(
const std::shared_ptr<arrow::Table>& input_table, IdType start_chunk_index,
ValidateLevel validate_level) const noexcept {
auto property_groups = vertex_info_.GetPropertyGroups();
+  GAR_ASSIGN_OR_RAISE(auto table_with_index,
+                      addIndexColumn(input_table, start_chunk_index,
+                                     vertex_info_.GetChunkSize()));
  for (auto& property_group : property_groups) {
-    GAR_RETURN_NOT_OK(WriteTable(input_table, property_group, start_chunk_index,
-                                 validate_level));
+    GAR_RETURN_NOT_OK(WriteTable(table_with_index, property_group,
+                                 start_chunk_index, validate_level));
  }
  return Status::OK();
}

+Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::addIndexColumn(
+    const std::shared_ptr<arrow::Table>& table, IdType chunk_index,
+    IdType chunk_size) const noexcept {
+  arrow::Int64Builder array_builder;
+  RETURN_NOT_ARROW_OK(array_builder.Reserve(chunk_size));
+  int64_t length = table->num_rows();
+  for (IdType i = 0; i < length; i++) {
+    RETURN_NOT_ARROW_OK(array_builder.Append(chunk_index * chunk_size + i));
+  }
+  std::shared_ptr<arrow::Array> array;
+  RETURN_NOT_ARROW_OK(array_builder.Finish(&array));
+  std::shared_ptr<arrow::ChunkedArray> chunked_array =
+      std::make_shared<arrow::ChunkedArray>(array);
+  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
+      auto ret, table->AddColumn(0,
+                                 arrow::field(GeneralParams::kVertexIndexCol,
+                                              arrow::int64(), false),
+                                 chunked_array));
+  return ret;
+}

// implementations for EdgeChunkWriter

// Check if the operation of writing number or copying a file is allowed.
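For context, a minimal usage sketch of the writer after this change; the constructor and the default arguments are assumptions based on the declarations above, not verified signatures. The point is that callers pass only property columns to `WriteTable`, which injects the index column via `addIndexColumn` before `WriteChunk` validates its presence.

```cpp
// Hedged sketch: writing a vertex table after this change. The input
// table carries only the declared properties; WriteTable prepends the
// GeneralParams::kVertexIndexCol column itself.
#include <iostream>
#include <memory>
#include <string>

#include <arrow/api.h>
#include "gar/graph_info.h"
#include "gar/writer/arrow_chunk_writer.h"

void write_vertices(const GAR_NAMESPACE::VertexInfo& vertex_info,
                    const std::string& prefix,
                    const std::shared_ptr<arrow::Table>& property_table) {
  GAR_NAMESPACE::VertexPropertyWriter writer(vertex_info, prefix);
  // start_chunk_index 0 writes from the first chunk; addIndexColumn
  // computes chunk_index * chunk_size + row as each internal id.
  auto status = writer.WriteTable(property_table, /*start_chunk_index=*/0);
  if (!status.ok()) {
    std::cerr << "write failed: " << status.message() << std::endl;
  }
}
```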
26 changes: 10 additions & 16 deletions cpp/test/test_arrow_chunk_reader.cc
@@ -20,6 +20,7 @@ limitations under the License.
#include "./util.h"
#include "gar/reader/arrow_chunk_reader.h"
#include "gar/util/expression.h"
#include "gar/util/general_params.h"

#define CATCH_CONFIG_MAIN
#include <catch2/catch.hpp>
@@ -53,37 +54,33 @@ TEST_CASE("test_vertex_property_arrow_chunk_reader") {
auto reader = maybe_reader.value();
auto result = reader.GetChunk();
REQUIRE(!result.has_error());
-  auto range = reader.GetRange().value();
  auto table = result.value();
  REQUIRE(table->num_rows() == 100);
-  REQUIRE(range.first == 0);
-  REQUIRE(range.second == 100);
+  REQUIRE(table->GetColumnByName(
+              GAR_NAMESPACE::GeneralParams::kVertexIndexCol) != nullptr);

// seek
REQUIRE(reader.seek(100).ok());
result = reader.GetChunk();
REQUIRE(!result.has_error());
-  range = reader.GetRange().value();
  table = result.value();
  REQUIRE(table->num_rows() == 100);
-  REQUIRE(range.first == 100);
-  REQUIRE(range.second == 200);
+  REQUIRE(table->GetColumnByName(
+              GAR_NAMESPACE::GeneralParams::kVertexIndexCol) != nullptr);
REQUIRE(reader.next_chunk().ok());
result = reader.GetChunk();
REQUIRE(!result.has_error());
-  range = reader.GetRange().value();
  table = result.value();
  REQUIRE(table->num_rows() == 100);
-  REQUIRE(range.first == 200);
-  REQUIRE(range.second == 300);
+  REQUIRE(table->GetColumnByName(
+              GAR_NAMESPACE::GeneralParams::kVertexIndexCol) != nullptr);
REQUIRE(reader.seek(900).ok());
result = reader.GetChunk();
REQUIRE(!result.has_error());
-  range = reader.GetRange().value();
  table = result.value();
  REQUIRE(table->num_rows() == 3);
-  REQUIRE(range.first == 900);
-  REQUIRE(range.second == 903);
+  REQUIRE(table->GetColumnByName(
+              GAR_NAMESPACE::GeneralParams::kVertexIndexCol) != nullptr);
REQUIRE(reader.GetChunkNum() == 10);
REQUIRE(reader.next_chunk().IsIndexError());

@@ -119,10 +116,7 @@ TEST_CASE("test_vertex_property_pushdown") {
auto result = reader.GetChunk();
REQUIRE(!result.has_error());
table = result.value();
-    auto [start, end] = reader.GetRange().value();
-    std::cout << "Chunk: " << idx << ",\tNums: " << table->num_rows() << "/"
-              << chunk_size << ",\tRange: (" << start << ", " << end << "]"
-              << '\n';
+    std::cout << "Chunk: " << idx << ",\tNums: " << table->num_rows() << '\n';
idx++;
sum += table->num_rows();
} while (!reader.next_chunk().IsIndexError());
Binary file modified docs/images/vertex_physical_table.png
2 changes: 2 additions & 0 deletions docs/user-guide/file-format.rst
@@ -57,6 +57,8 @@ Take the "person" vertex table as an example, if the chunk size is set to be 500
:alt: vertex physical table


+**Note**: To make efficient use of filter pushdown in payload file formats such as Parquet, the internal vertex id is stored as a column in the payload file. Since the internal vertex id is continuous, the file format can apply delta encoding to this column, which adds little storage overhead.

Edges in GraphAr
------------------------

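The delta-encoding point can be illustrated with plain Apache Arrow: requesting DELTA_BINARY_PACKED for a continuous int64 column makes the id column nearly free to store. A sketch follows; the column name and output path are illustrative, not taken from this commit.

```cpp
// Hedged sketch: write a table with delta encoding on the internal id
// column using Arrow's Parquet writer. Dictionary encoding must be
// disabled for the column so the requested encoding takes effect.
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/writer.h>
#include <parquet/properties.h>

arrow::Status WriteWithDeltaEncoding(
    const std::shared_ptr<arrow::Table>& table) {
  const std::string id_col = "_graphArVertexIndex";  // illustrative name
  auto props = parquet::WriterProperties::Builder()
                   .disable_dictionary(id_col)
                   ->encoding(id_col, parquet::Encoding::DELTA_BINARY_PACKED)
                   ->build();
  ARROW_ASSIGN_OR_RAISE(
      auto sink, arrow::io::FileOutputStream::Open("/tmp/chunk0.parquet"));
  return parquet::arrow::WriteTable(*table, arrow::default_memory_pool(),
                                    sink, /*chunk_size=*/1024, props);
}
```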
@@ -56,7 +56,7 @@ object GraphAr2Nebula {
    // The edge data needs to convert src and dst to the vertex id,
    // so we need to read the vertex data with the index column.
val graphData =
-      GraphReader.read(graphInfoPath, spark, addVertexIndex = true)
+      GraphReader.read(graphInfoPath, spark)
val vertexData = graphData._1
val edgeData = graphData._2
putVertexDataIntoNebula(graphInfo, vertexData)
@@ -46,9 +46,7 @@ object GraphAr2Neo4j {
val graphInfoPath: String = args(0)
val graphInfo = GraphInfo.loadGraphInfo(graphInfoPath, spark)

-    // The edge data need to convert src and dst to the vertex id , so we need to read
-    // the vertex data with index column.
-    val graphData = GraphReader.read(graphInfoPath, spark, true)
+    val graphData = GraphReader.read(graphInfoPath, spark)
val vertexData = graphData._1
val edgeData = graphData._2

21 changes: 6 additions & 15 deletions spark/src/main/scala/com/alibaba/graphar/graph/GraphReader.scala
@@ -35,21 +35,18 @@ object GraphReader {
* The map of (vertex label -> VertexInfo) for the graph.
* @param spark
* The Spark session for the reading.
-   * @param addIndex
-   *   Whether to add index column for the DataFrame.
* @return
* The map of (vertex label -> DataFrame)
*/
private def readAllVertices(
prefix: String,
vertexInfos: Map[String, VertexInfo],
-      spark: SparkSession,
-      addIndex: Boolean = false
+      spark: SparkSession
): Map[String, DataFrame] = {
val vertex_dataframes: Map[String, DataFrame] = vertexInfos.map {
case (label, vertexInfo) => {
val reader = new VertexReader(prefix, vertexInfo, spark)
-        (label, reader.readAllVertexPropertyGroups(addIndex))
+        (label, reader.readAllVertexPropertyGroups())
}
}
return vertex_dataframes
@@ -109,8 +106,6 @@ object GraphReader {
* The info object for the graph.
* @param spark
* The Spark session for the loading.
-   * @param addVertexIndex
-   *   Whether to add index for the vertex DataFrames.
* @return
* Pair of vertex DataFrames and edge DataFrames, the vertex DataFrames are
* stored as the map of (vertex_label -> DataFrame) the edge DataFrames are
@@ -119,8 +114,7 @@
*/
def readWithGraphInfo(
graphInfo: GraphInfo,
-      spark: SparkSession,
-      addVertexIndex: Boolean = false
+      spark: SparkSession
): Pair[Map[String, DataFrame], Map[
(String, String, String),
Map[String, DataFrame]
@@ -129,7 +123,7 @@
val vertex_infos = graphInfo.getVertexInfos()
val edge_infos = graphInfo.getEdgeInfos()
return (
-      readAllVertices(prefix, vertex_infos, spark, addVertexIndex),
+      readAllVertices(prefix, vertex_infos, spark),
readAllEdges(prefix, edge_infos, spark)
)
}
@@ -142,8 +136,6 @@
* The path of the graph info yaml.
* @param spark
* The Spark session for the loading.
-   * @param addVertexIndex
-   *   Whether to add index for the vertex DataFrames.
* @return
* Pair of vertex DataFrames and edge DataFrames, the vertex DataFrames are
* stored as the map of (vertex_label -> DataFrame) the edge DataFrames are
@@ -152,8 +144,7 @@
*/
def read(
graphInfoPath: String,
-      spark: SparkSession,
-      addVertexIndex: Boolean = false
+      spark: SparkSession
): Pair[Map[String, DataFrame], Map[
(String, String, String),
Map[String, DataFrame]
@@ -162,6 +153,6 @@
val graph_info = GraphInfo.loadGraphInfo(graphInfoPath, spark)

// conduct reading
-    readWithGraphInfo(graph_info, spark, addVertexIndex)
+    readWithGraphInfo(graph_info, spark)
}
}
@@ -66,7 +66,7 @@ object GraphTransformer {
val source_vertex_info = sourceVertexInfosMap(label)
// read vertex chunks from the source graph
val reader = new VertexReader(source_prefix, source_vertex_info, spark)
-      val df = reader.readAllVertexPropertyGroups(true)
+      val df = reader.readAllVertexPropertyGroups()
// write vertex chunks for the dest graph
val writer = new VertexWriter(dest_prefix, dest_vertex_info, df)
writer.writeVertexProperties()