diff --git a/src/python/tools/preprocess/converters/spark_constants.py b/src/python/tools/preprocess/converters/spark_constants.py
index 6b6fc8f4..7220064f 100644
--- a/src/python/tools/preprocess/converters/spark_constants.py
+++ b/src/python/tools/preprocess/converters/spark_constants.py
@@ -9,5 +9,4 @@
 RELATION_LABEL = "relation_label"
 TMP_DATA_DIRECTORY = "tmp_pyspark"
 SPARK_APP_NAME = "marius_edge_converter"
-EDGES_INDEX_COL = "edge_index"
 REL_INDEX_COL = "rel_index"
diff --git a/src/python/tools/preprocess/converters/spark_converter.py b/src/python/tools/preprocess/converters/spark_converter.py
index 20ec31e0..c5f63240 100644
--- a/src/python/tools/preprocess/converters/spark_converter.py
+++ b/src/python/tools/preprocess/converters/spark_converter.py
@@ -3,14 +3,12 @@
 from pathlib import Path

 from pyspark.sql import SparkSession
-from pyspark.sql.functions import col, monotonically_increasing_id, rand, row_number
-from pyspark.sql.window import Window
+from pyspark.sql.functions import col

 from marius.tools.preprocess.converters.partitioners.spark_partitioner import SparkPartitioner
 from marius.tools.preprocess.converters.readers.spark_readers import SparkDelimitedFileReader
 from marius.tools.preprocess.converters.spark_constants import (
     DST_COL,
-    EDGES_INDEX_COL,
     INDEX_COL,
     NODE_LABEL,
     REL_COL,
@@ -38,52 +36,46 @@ def get_nodes_df(edges_df):
         edges_df.select(col(SRC_COL).alias(NODE_LABEL))
         .union(edges_df.select(col(DST_COL).alias(NODE_LABEL)))
         .distinct()
-        .repartition(1)
-        .orderBy(rand())
-        .cache()
     )
-    nodes = assign_ids(nodes, INDEX_COL)
+    nodes = assign_ids(nodes, INDEX_COL).cache()

     return nodes


 def get_relations_df(edges_df):
-    rels = (
-        edges_df.drop(SRC_COL, DST_COL)
-        .distinct()
-        .repartition(1)
-        .orderBy(rand())
-        .withColumnRenamed(REL_COL, RELATION_LABEL)
-        .cache()
-    )
-    rels = assign_ids(rels, REL_INDEX_COL)
+    rels = edges_df.drop(SRC_COL, DST_COL).distinct().withColumnRenamed(REL_COL, RELATION_LABEL)
+    rels = assign_ids(rels, REL_INDEX_COL).cache()

     return rels


-def assign_ids(df, col_id):
+def assign_ids(df, index_col_id):
     if df is None:
         return None

-    return df.withColumn(col_id, row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)
+    columns = [*df.columns]
+    df_with_index = df.rdd.zipWithIndex().toDF()
+    for column in columns:
+        df_with_index = df_with_index.withColumn(column, df_with_index["_1"].getItem(column))
+
+    return df_with_index.withColumnRenamed("_2", index_col_id).select(*columns, index_col_id)


 def remap_edges(edges_df, nodes, rels):
     if rels is not None:
         remapped_edges_df = (
-            edges_df.join(nodes.hint("merge"), edges_df.src == nodes.node_label)
+            edges_df.join(nodes.hint("broadcast"), edges_df.src == nodes.node_label)
             .drop(NODE_LABEL, SRC_COL)
             .withColumnRenamed(INDEX_COL, SRC_COL)
-            .join(rels.hint("merge"), edges_df.rel == rels.relation_label)
+            .join(rels.hint("broadcast"), edges_df.rel == rels.relation_label)
             .drop(RELATION_LABEL, REL_COL)
             .withColumnRenamed(INDEX_COL, REL_COL)
-            .join(nodes.hint("merge"), edges_df.dst == nodes.node_label)
+            .join(nodes.hint("broadcast"), edges_df.dst == nodes.node_label)
             .drop(NODE_LABEL, DST_COL)
             .withColumnRenamed(INDEX_COL, DST_COL)
         )
     else:
         remapped_edges_df = (
-            edges_df.join(nodes.hint("merge"), edges_df.src == nodes.node_label)
+            edges_df.join(nodes.hint("broadcast"), edges_df.src == nodes.node_label)
             .drop(NODE_LABEL, SRC_COL)
             .withColumnRenamed(INDEX_COL, SRC_COL)
-            .join(nodes.hint("merge"), edges_df.dst == nodes.node_label)
+            .join(nodes.hint("broadcast"), edges_df.dst == nodes.node_label)
             .drop(NODE_LABEL, DST_COL)
             .withColumnRenamed(INDEX_COL, DST_COL)
         )
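The new `assign_ids` replaces `row_number()` over a global window (which funnels every row through a single partition) with `RDD.zipWithIndex()`, which derives contiguous 0-based indices from per-partition counts while keeping the data distributed. A minimal standalone sketch of the pattern; the local session and toy `node_label` data are illustrative, not part of the patch:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("assign_ids_sketch").getOrCreate()
df = spark.createDataFrame([("a",), ("b",), ("c",)], ["node_label"])

# zipWithIndex pairs each Row with a contiguous 0-based index; toDF() yields
# columns _1 (the original Row as a struct) and _2 (the long index).
with_index = df.rdd.zipWithIndex().toDF()
for column in df.columns:
    # pull each original field back out of the _1 struct as a top-level column
    with_index = with_index.withColumn(column, with_index["_1"].getItem(column))

result = with_index.withColumnRenamed("_2", "index").select("node_label", "index")
result.show()
```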
@@ -220,12 +212,6 @@ def convert(self):
             train_edges_df, test_edges_df = all_edges_df.randomSplit([self.train_split, self.test_split])
         else:
             train_edges_df = all_edges_df
-        all_edges_df, train_edges_df, valid_edges_df, test_edges_df = (
-            assign_ids(all_edges_df, EDGES_INDEX_COL),
-            assign_ids(train_edges_df, EDGES_INDEX_COL),
-            assign_ids(valid_edges_df, EDGES_INDEX_COL),
-            assign_ids(test_edges_df, EDGES_INDEX_COL),
-        )

         if self.partitioner is not None:
             print("Partition nodes into {} partitions".format(self.num_partitions))
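The dropped block in `convert` only attached an edge-index column so the writer could count rows through it; the writer now calls `df.count()` directly (see below), so those extra passes disappear. Separately, switching the join hints in `remap_edges` from `"merge"` to `"broadcast"` lets Spark ship the small node/relation ID maps to every executor instead of sort-merge-shuffling the full edge list. A toy sketch (local session and made-up frames, not the project's code) showing the hint take effect:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("broadcast_sketch").getOrCreate()
edges = spark.createDataFrame([("a", "b"), ("b", "c")], ["src", "dst"])
nodes = spark.createDataFrame([("a", 0), ("b", 1), ("c", 2)], ["node_label", "index"])

# Remap the src endpoint to its integer id via a broadcast hash join.
remapped = (
    edges.join(nodes.hint("broadcast"), edges.src == nodes.node_label)
    .drop("node_label", "src")
    .withColumnRenamed("index", "src")
)
# The physical plan should show BroadcastHashJoin rather than the
# SortMergeJoin that hint("merge") forced.
remapped.explain()
```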
diff --git a/src/python/tools/preprocess/converters/writers/spark_writer.py b/src/python/tools/preprocess/converters/writers/spark_writer.py
index b7468817..44a8f77f 100644
--- a/src/python/tools/preprocess/converters/writers/spark_writer.py
+++ b/src/python/tools/preprocess/converters/writers/spark_writer.py
@@ -1,6 +1,5 @@
 import glob
 import os
-import re
 import sys
 from pathlib import Path
 from random import randint
@@ -13,7 +12,6 @@
 from marius.tools.configuration.marius_config import DatasetConfig
 from marius.tools.preprocess.converters.spark_constants import (
     DST_EDGE_BUCKET_COL,
-    EDGES_INDEX_COL,
     INDEX_COL,
     REL_INDEX_COL,
     SRC_EDGE_BUCKET_COL,
@@ -58,40 +56,35 @@ def write_partitioned_df_to_csv(partition_triples, num_partitions, output_filename
     print(partition_triples.rdd.getNumPartitions())

-    # for edges, the order needs to be maintained. all edges that belong to bucket [i, j]
-    # should appear before [i, j+1] and that of [i, j+1] should appear before [i+1, j].
-    # repartitionByRange makes sure that all edges belonging to src bucket i, fall in the
-    # same partition. Also, this function will output at most `num_partitions` partitions.
-    partition_triples.repartitionByRange(num_partitions, SRC_EDGE_BUCKET_COL).sortWithinPartitions(
-        SRC_EDGE_BUCKET_COL, DST_EDGE_BUCKET_COL
-    ).drop(DST_EDGE_BUCKET_COL, SRC_EDGE_BUCKET_COL).write.csv(
+    partition_triples.write.partitionBy([SRC_EDGE_BUCKET_COL, DST_EDGE_BUCKET_COL]).csv(
         TMP_DATA_DIRECTORY + "_edges", mode="overwrite", sep="\t"
     )

-    # for partition offset counts, the ordering of dst_buckets does not matter since we
-    # read the value before setting the offset in line number 92.
-    # dst_buckets = counts.iloc[:, 0].values.
-    # we make use of partitionBy to parallelize writes.
     bucket_counts.write.partitionBy(SRC_EDGE_BUCKET_COL).csv(TMP_DATA_DIRECTORY + "_counts", mode="overwrite", sep="\t")

     partition_offsets = []
     os.system("rm -rf {}".format(output_filename))
-    for i in range(num_partitions):
-        # looks like there is no way in glob to restrict to the pattern [0]*{i}- alone.
-        # it matches things like part-00004-sdfvf0-sdf.csv when given part-[0]*0-*.csv
-        tmp_edges_files = glob.glob("{}/part-[0]*{}-*.csv".format(TMP_DATA_DIRECTORY + "_edges", str(i)))
+    for src_part in range(num_partitions):
+        for dst_part in range(num_partitions):
+            tmp_edges_files = glob.glob(
+                "{}/{}={}/{}={}/*.csv".format(
+                    TMP_DATA_DIRECTORY + "_edges",
+                    SRC_EDGE_BUCKET_COL,
+                    str(src_part),
+                    DST_EDGE_BUCKET_COL,
+                    str(dst_part),
+                )
+            )
+
+            for tmp_edges_file in tmp_edges_files:
+                os.system("cat {} >> {}".format(tmp_edges_file, output_filename))

         tmp_counts_files = glob.glob(
-            "{}/{}={}/*.csv".format(TMP_DATA_DIRECTORY + "_counts", SRC_EDGE_BUCKET_COL, str(i))
+            "{}/{}={}/*.csv".format(TMP_DATA_DIRECTORY + "_counts", SRC_EDGE_BUCKET_COL, str(src_part))
        )

         edges_bucket_counts = np.zeros(num_partitions, dtype=np.int)
-        edge_file_pattern = re.compile(r"{}/part-[0]*{}-.*\.csv".format(TMP_DATA_DIRECTORY + "_edges", str(i)))
-
-        for tmp_edges_file in tmp_edges_files:
-            if edge_file_pattern.match(tmp_edges_file):
-                os.system("cat {} >> {}".format(tmp_edges_file, output_filename))
-
         for tmp_counts_file in tmp_counts_files:
             counts = pd.read_csv(tmp_counts_file, sep="\t", header=None)
@@ -120,16 +113,13 @@ def write_to_csv(self, train_edges_df, valid_edges_df, test_edges_df, nodes_df,
         dataset_stats = DatasetConfig()
         dataset_stats.dataset_dir = Path(self.output_dir).absolute().__str__()

-        dataset_stats.num_edges = get_df_count(train_edges_df, EDGES_INDEX_COL)
-        train_edges_df = train_edges_df.drop(EDGES_INDEX_COL)
+        dataset_stats.num_edges = train_edges_df.count()
         dataset_stats.num_train = dataset_stats.num_edges

         if valid_edges_df is not None:
-            dataset_stats.num_valid = get_df_count(valid_edges_df, EDGES_INDEX_COL)
-            valid_edges_df = valid_edges_df.drop(EDGES_INDEX_COL)
+            dataset_stats.num_valid = valid_edges_df.count()

         if test_edges_df is not None:
-            dataset_stats.num_test = get_df_count(test_edges_df, EDGES_INDEX_COL)
-            test_edges_df = test_edges_df.drop(EDGES_INDEX_COL)
+            dataset_stats.num_test = test_edges_df.count()

         dataset_stats.num_nodes = get_df_count(nodes_df, INDEX_COL)
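With `partitionBy([SRC_EDGE_BUCKET_COL, DST_EDGE_BUCKET_COL])`, each edge bucket lands in its own `col=value` directory, so the old repartitionByRange/sortWithinPartitions ordering and the fragile `part-[0]*{i}-*.csv` glob are no longer needed; the nested loop restores bucket order (all of [i, j] before [i, j+1], before [i+1, 0]) at concatenation time. A sketch of the resulting layout and stitch-up in plain Python; the `src_part`/`dst_part` directory names and the output path are assumptions, since the constants' string values are not shown in this diff:

```python
import glob
import shutil

num_partitions = 2
output_filename = "edges.tsv"  # hypothetical output path

# Assumed layout from df.write.partitionBy(["src_part", "dst_part"]).csv(...):
#   tmp_pyspark_edges/src_part=0/dst_part=0/part-*.csv
#   tmp_pyspark_edges/src_part=0/dst_part=1/part-*.csv
#   ...
with open(output_filename, "wb") as out:
    for src_part in range(num_partitions):
        for dst_part in range(num_partitions):
            pattern = "tmp_pyspark_edges/src_part={}/dst_part={}/*.csv".format(src_part, dst_part)
            for tmp_file in sorted(glob.glob(pattern)):
                with open(tmp_file, "rb") as f:
                    shutil.copyfileobj(f, out)  # same effect as `cat file >> output`
```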