Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/python/tools/preprocess/converters/spark_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,4 @@
RELATION_LABEL = "relation_label"
TMP_DATA_DIRECTORY = "tmp_pyspark"
SPARK_APP_NAME = "marius_edge_converter"
EDGES_INDEX_COL = "edge_index"
REL_INDEX_COL = "rel_index"
44 changes: 15 additions & 29 deletions src/python/tools/preprocess/converters/spark_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
from pathlib import Path

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id, rand, row_number
from pyspark.sql.window import Window
from pyspark.sql.functions import col

from marius.tools.preprocess.converters.partitioners.spark_partitioner import SparkPartitioner
from marius.tools.preprocess.converters.readers.spark_readers import SparkDelimitedFileReader
from marius.tools.preprocess.converters.spark_constants import (
DST_COL,
EDGES_INDEX_COL,
INDEX_COL,
NODE_LABEL,
REL_COL,
Expand Down Expand Up @@ -38,52 +36,46 @@ def get_nodes_df(edges_df):
edges_df.select(col(SRC_COL).alias(NODE_LABEL))
.union(edges_df.select(col(DST_COL).alias(NODE_LABEL)))
.distinct()
.repartition(1)
.orderBy(rand())
.cache()
)
nodes = assign_ids(nodes, INDEX_COL)
nodes = assign_ids(nodes, INDEX_COL).cache()
return nodes


def get_relations_df(edges_df):
rels = (
edges_df.drop(SRC_COL, DST_COL)
.distinct()
.repartition(1)
.orderBy(rand())
.withColumnRenamed(REL_COL, RELATION_LABEL)
.cache()
)
rels = assign_ids(rels, REL_INDEX_COL)
rels = edges_df.drop(SRC_COL, DST_COL).distinct().withColumnRenamed(REL_COL, RELATION_LABEL)
rels = assign_ids(rels, REL_INDEX_COL).cache()
return rels


def assign_ids(df, col_id):
def assign_ids(df, index_col_id):
if df is None:
return None
return df.withColumn(col_id, row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)
columns = [*df.columns]
df_with_index = df.rdd.zipWithIndex().toDF()
for column in columns:
df_with_index = df_with_index.withColumn(column, df_with_index["_1"].getItem(column))
return df_with_index.withColumnRenamed("_2", index_col_id).select(*columns, index_col_id)


def remap_edges(edges_df, nodes, rels):
if rels is not None:
remapped_edges_df = (
edges_df.join(nodes.hint("merge"), edges_df.src == nodes.node_label)
edges_df.join(nodes.hint("broadcast"), edges_df.src == nodes.node_label)
.drop(NODE_LABEL, SRC_COL)
.withColumnRenamed(INDEX_COL, SRC_COL)
.join(rels.hint("merge"), edges_df.rel == rels.relation_label)
.join(rels.hint("broadcast"), edges_df.rel == rels.relation_label)
.drop(RELATION_LABEL, REL_COL)
.withColumnRenamed(INDEX_COL, REL_COL)
.join(nodes.hint("merge"), edges_df.dst == nodes.node_label)
.join(nodes.hint("broadcast"), edges_df.dst == nodes.node_label)
.drop(NODE_LABEL, DST_COL)
.withColumnRenamed(INDEX_COL, DST_COL)
)
else:
remapped_edges_df = (
edges_df.join(nodes.hint("merge"), edges_df.src == nodes.node_label)
edges_df.join(nodes.hint("broadcast"), edges_df.src == nodes.node_label)
.drop(NODE_LABEL, SRC_COL)
.withColumnRenamed(INDEX_COL, SRC_COL)
.join(nodes.hint("merge"), edges_df.dst == nodes.node_label)
.join(nodes.hint("broadcast"), edges_df.dst == nodes.node_label)
.drop(NODE_LABEL, DST_COL)
.withColumnRenamed(INDEX_COL, DST_COL)
)
Expand Down Expand Up @@ -220,12 +212,6 @@ def convert(self):
train_edges_df, test_edges_df = all_edges_df.randomSplit([self.train_split, self.test_split])
else:
train_edges_df = all_edges_df
all_edges_df, train_edges_df, valid_edges_df, test_edges_df = (
assign_ids(all_edges_df, EDGES_INDEX_COL),
assign_ids(train_edges_df, EDGES_INDEX_COL),
assign_ids(valid_edges_df, EDGES_INDEX_COL),
assign_ids(test_edges_df, EDGES_INDEX_COL),
)

if self.partitioner is not None:
print("Partition nodes into {} partitions".format(self.num_partitions))
Expand Down
48 changes: 19 additions & 29 deletions src/python/tools/preprocess/converters/writers/spark_writer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import glob
import os
import re
import sys
from pathlib import Path
from random import randint
Expand All @@ -13,7 +12,6 @@
from marius.tools.configuration.marius_config import DatasetConfig
from marius.tools.preprocess.converters.spark_constants import (
DST_EDGE_BUCKET_COL,
EDGES_INDEX_COL,
INDEX_COL,
REL_INDEX_COL,
SRC_EDGE_BUCKET_COL,
Expand Down Expand Up @@ -58,40 +56,35 @@ def write_partitioned_df_to_csv(partition_triples, num_partitions, output_filena

print(partition_triples.rdd.getNumPartitions())

# for edges, the order needs to be maintained. all edges that belong to bucket [i, j]
# should appear before [i, j+1] and that of [i, j+1] should appear before [i+1, j].
# repartitionByRange makes sure that all edges belonging to src bucket i, fall in the
# same partition. Also, this function will output at most `num_partitions` partitions.
partition_triples.repartitionByRange(num_partitions, SRC_EDGE_BUCKET_COL).sortWithinPartitions(
SRC_EDGE_BUCKET_COL, DST_EDGE_BUCKET_COL
).drop(DST_EDGE_BUCKET_COL, SRC_EDGE_BUCKET_COL).write.csv(
partition_triples.write.partitionBy([SRC_EDGE_BUCKET_COL, DST_EDGE_BUCKET_COL]).csv(
TMP_DATA_DIRECTORY + "_edges", mode="overwrite", sep="\t"
)

# for partition offset counts, the ordering of dst_buckets does not matter since we
# read the value before setting the offset in line number 92.
# dst_buckets = counts.iloc[:, 0].values.
# we make use of partitionBy to parallelize writes.
bucket_counts.write.partitionBy(SRC_EDGE_BUCKET_COL).csv(TMP_DATA_DIRECTORY + "_counts", mode="overwrite", sep="\t")

partition_offsets = []

os.system("rm -rf {}".format(output_filename))
for i in range(num_partitions):
# looks like there is no way in glob to restrict to the pattern [0]*{i}- alone.
# it matches things like part-00004-sdfvf0-sdf.csv when given part-[0]*0-*.csv
tmp_edges_files = glob.glob("{}/part-[0]*{}-*.csv".format(TMP_DATA_DIRECTORY + "_edges", str(i)))
for src_part in range(num_partitions):
for dst_part in range(num_partitions):
tmp_edges_files = glob.glob(
"{}/{}={}/{}={}/*.csv".format(
TMP_DATA_DIRECTORY + "_edges",
SRC_EDGE_BUCKET_COL,
str(src_part),
DST_EDGE_BUCKET_COL,
str(dst_part),
)
)

for tmp_edges_file in tmp_edges_files:
os.system("cat {} >> {}".format(tmp_edges_file, output_filename))

tmp_counts_files = glob.glob(
"{}/{}={}/*.csv".format(TMP_DATA_DIRECTORY + "_counts", SRC_EDGE_BUCKET_COL, str(i))
"{}/{}={}/*.csv".format(TMP_DATA_DIRECTORY + "_counts", SRC_EDGE_BUCKET_COL, str(src_part))
)

edges_bucket_counts = np.zeros(num_partitions, dtype=np.int)
edge_file_pattern = re.compile(r"{}/part-[0]*{}-.*\.csv".format(TMP_DATA_DIRECTORY + "_edges", str(i)))
for tmp_edges_file in tmp_edges_files:
if edge_file_pattern.match(tmp_edges_file):
os.system("cat {} >> {}".format(tmp_edges_file, output_filename))

for tmp_counts_file in tmp_counts_files:
counts = pd.read_csv(tmp_counts_file, sep="\t", header=None)

Expand Down Expand Up @@ -120,16 +113,13 @@ def write_to_csv(self, train_edges_df, valid_edges_df, test_edges_df, nodes_df,
dataset_stats = DatasetConfig()
dataset_stats.dataset_dir = Path(self.output_dir).absolute().__str__()

dataset_stats.num_edges = get_df_count(train_edges_df, EDGES_INDEX_COL)
train_edges_df = train_edges_df.drop(EDGES_INDEX_COL)
dataset_stats.num_edges = train_edges_df.count()
dataset_stats.num_train = dataset_stats.num_edges

if valid_edges_df is not None:
dataset_stats.num_valid = get_df_count(valid_edges_df, EDGES_INDEX_COL)
valid_edges_df = valid_edges_df.drop(EDGES_INDEX_COL)
dataset_stats.num_valid = valid_edges_df.count()
if test_edges_df is not None:
dataset_stats.num_test = get_df_count(test_edges_df, EDGES_INDEX_COL)
test_edges_df = test_edges_df.drop(EDGES_INDEX_COL)
dataset_stats.num_test = test_edges_df.count()

dataset_stats.num_nodes = get_df_count(nodes_df, INDEX_COL)

Expand Down