From 6911f2952f4bea2bc418d46f438adcce7ee63051 Mon Sep 17 00:00:00 2001
From: Basava
Date: Tue, 22 Nov 2022 21:14:41 +0000
Subject: [PATCH 1/2] Spark now writes output files to S3; the per-partition part files still need to be combined

---
 .../converters/readers/spark_readers.py       |  22 +-
 .../preprocess/converters/spark_converter.py  |   3 +-
 .../converters/writers/spark_writer.py        | 322 +++++++++++++++---
 .../tools/preprocess/datasets/fb15k_237.py    |   3 +-
 .../tools/preprocess/datasets/twitter.py      |   9 +-
 5 files changed, 306 insertions(+), 53 deletions(-)

diff --git a/src/python/tools/preprocess/converters/readers/spark_readers.py b/src/python/tools/preprocess/converters/readers/spark_readers.py
index 0a01a394..1200550b 100644
--- a/src/python/tools/preprocess/converters/readers/spark_readers.py
+++ b/src/python/tools/preprocess/converters/readers/spark_readers.py
@@ -1,3 +1,4 @@
+import os
 from pathlib import Path
 
 from pyspark.sql import SparkSession
@@ -9,7 +10,7 @@
 class SparkDelimitedFileReader(Reader):
     def __init__(
         self,
-        spark: SparkSession,
+        spark: SparkSession.builder.appName("marius_spark").getOrCreate(),
         train_edges: Path,
         valid_edges: Path = None,
         test_edges: Path = None,
@@ -39,6 +40,25 @@ def __init__(
 
         self.spark = spark
 
+        if str(train_edges).startswith("s3a://"):
+            if (
+                "AWS_ACCESS_KEY_ID" not in os.environ
+                or "AWS_SECRET_ACCESS_KEY" not in os.environ
+                or "S3_BUCKET" not in os.environ
+            ):
+                print(
+                    "Edge path is an s3 path, but required env variables not set. {}, {} and {} need to be set".format(
+                        "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "S3_BUCKET"
+                    )
+                )
+                exit()
+            self.spark._jsc.hadoopConfiguration().set(
+                "fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+            )
+            self.spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID"))
+            self.spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY"))
+            self.spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
+
         self.train_edges = train_edges
         self.valid_edges = valid_edges
         self.test_edges = test_edges
diff --git a/src/python/tools/preprocess/converters/spark_converter.py b/src/python/tools/preprocess/converters/spark_converter.py
index c5f63240..e23edaa6 100644
--- a/src/python/tools/preprocess/converters/spark_converter.py
+++ b/src/python/tools/preprocess/converters/spark_converter.py
@@ -110,6 +110,7 @@ def __init__(
         spark_executor_memory: str = "4g",
     ):
         self.output_dir = output_dir
+        self.use_s3 = str(train_edges).startswith("s3a://")
 
         self.spark = (
             SparkSession.builder.appName(SPARK_APP_NAME)
@@ -135,7 +136,7 @@ def __init__(
         else:
             self.partitioner = None
 
-        self.writer = SparkWriter(self.spark, self.output_dir, partitioned_evaluation)
+        self.writer = SparkWriter(self.spark, self.output_dir, self.use_s3, partitioned_evaluation)
 
         self.train_split = None
         self.valid_split = None
diff --git a/src/python/tools/preprocess/converters/writers/spark_writer.py b/src/python/tools/preprocess/converters/writers/spark_writer.py
index 44a8f77f..4fc81b0d 100644
--- a/src/python/tools/preprocess/converters/writers/spark_writer.py
+++ b/src/python/tools/preprocess/converters/writers/spark_writer.py
@@ -1,11 +1,14 @@
 import glob
 import os
+import re
+import shutil
 import sys
 from pathlib import Path
 from random import randint
 
 import numpy as np
 import pandas as pd
+import s3fs
 from omegaconf import OmegaConf
 
 from marius.tools.configuration.constants import PathConstants
@@ -45,13 +48,64 @@ def merge_csvs(input_directory, output_file):
     os.system("rm -rf {}".format(input_directory))
 
 
+def s3_write_df_to_csv(s3_obj, df, output_folder, output_filename):
+    if df is None:
+        return
+
+    file_pattern = re.compile(r".*part-.*\.csv")
+    df.write.csv(output_folder, mode="overwrite", sep="\t")
+    files_list = [f for f in s3_obj.ls(output_folder) if file_pattern.match(f)]
+    if len(files_list) < 2:
+        # rename to csv
+        # s3_obj.rename("s3a://{}".format(files_list[0]), output_filename)
+        pass
+    else:
+        # CompleteMultipartUpload errors out saying EntityTooSmall when using merge
+        # need to create a single file from files_list
+        # s3_obj.merge(output_filename, files_list)
+        # TODO: merge all node outputs into output_filename
+        pass
+
+
 def write_df_to_csv(df, output_filename):
+    if df is None:
+        return
+
     tmp_dir = TMP_DATA_DIRECTORY + str(randint(0, sys.maxsize))
     df.write.csv(tmp_dir, mode="overwrite", sep="\t")
     merge_csvs(tmp_dir, output_filename)
 
 
+def s3_write_partitioned_df_to_csv(s3_obj, partition_triples, num_partitions, output_folder, output_filename):
+    if partition_triples is None:
+        return
+
+    bucket_counts = partition_triples.groupBy([SRC_EDGE_BUCKET_COL, DST_EDGE_BUCKET_COL]).count()
+    partition_triples.write.partitionBy([SRC_EDGE_BUCKET_COL, DST_EDGE_BUCKET_COL]).csv(
+        output_folder, mode="overwrite", sep="\t"
+    )
+    bucket_counts.write.partitionBy(SRC_EDGE_BUCKET_COL).csv(output_folder + "_counts", mode="overwrite", sep="\t")
+
+    partition_offsets = []
+    for src_part in range(num_partitions):
+        tmp_counts_files = s3_obj.glob("{}_counts/src_part={}/part-*.csv".format(output_folder, str(src_part)))
+        edges_bucket_counts = np.zeros(num_partitions, dtype=int)
+        for tmp_counts_file in tmp_counts_files:
+            counts = pd.read_csv("s3a://{}".format(tmp_counts_file), sep="\t", header=None)
+            dst_buckets = counts.iloc[:, 0].values
+            dst_counts = counts.iloc[:, 1].values
+            edges_bucket_counts[dst_buckets] = dst_counts
+        partition_offsets.append(edges_bucket_counts)
+
+    # TODO: merge all the csvs generated by partition_triples into output_filename
+
+    return np.concatenate(partition_offsets)
+
+
 def write_partitioned_df_to_csv(partition_triples, num_partitions, output_filename):
+    if partition_triples is None:
+        return
+
     bucket_counts = partition_triples.groupBy([SRC_EDGE_BUCKET_COL, DST_EDGE_BUCKET_COL]).count()
 
     print(partition_triples.rdd.getNumPartitions())
@@ -101,91 +155,261 @@ def write_partitioned_df_to_csv(partition_triples, num_partitions, output_filena
     return np.concatenate(partition_offsets)
 
 
-class SparkWriter(object):
-    def __init__(self, spark, output_dir, partitioned_evaluation):
-        super().__init__()
-
+class SparkWriteBase(object):
+    def __init__(
+        self,
+        spark,
+        output_dir,
+        partitioned_evaluation,
+        train_edges_df,
+        valid_edges_df,
+        test_edges_df,
+        nodes_df,
+        rels_df,
+        num_partitions,
+    ):
         self.spark = spark
         self.output_dir = output_dir
         self.partitioned_evaluation = partitioned_evaluation
-
-    def write_to_csv(self, train_edges_df, valid_edges_df, test_edges_df, nodes_df, rels_df, num_partitions):
+        self.train_edges_df = train_edges_df
+        self.valid_edges_df = valid_edges_df
+        self.test_edges_df = test_edges_df
+        self.nodes_df = nodes_df
+        self.rels_df = rels_df
+        self.num_partitions = num_partitions
+
+    def get_dataset_stats(self):
         dataset_stats = DatasetConfig()
-        dataset_stats.dataset_dir = Path(self.output_dir).absolute().__str__()
-
-        dataset_stats.num_edges = train_edges_df.count()
+        dataset_stats.num_edges = self.train_edges_df.count()
         dataset_stats.num_train = dataset_stats.num_edges
 
-        if valid_edges_df is not None:
-            dataset_stats.num_valid = valid_edges_df.count()
-        if test_edges_df is not None:
-            dataset_stats.num_test = test_edges_df.count()
+        if self.valid_edges_df is not None:
+            dataset_stats.num_valid = self.valid_edges_df.count()
+        if self.test_edges_df is not None:
+            dataset_stats.num_test = self.test_edges_df.count()
 
-        dataset_stats.num_nodes = get_df_count(nodes_df, INDEX_COL)
+        dataset_stats.num_nodes = get_df_count(self.nodes_df, INDEX_COL)
 
-        if rels_df is None:
+        if self.rels_df is None:
             dataset_stats.num_relations = 1
         else:
-            dataset_stats.num_relations = get_df_count(rels_df, REL_INDEX_COL)
+            dataset_stats.num_relations = get_df_count(self.rels_df, REL_INDEX_COL)
+
+        return dataset_stats
+
+
+class SparkWriterLocal(SparkWriteBase):
+    def __init__(
+        self,
+        spark,
+        output_dir,
+        partitioned_evaluation,
+        train_edges_df,
+        valid_edges_df,
+        test_edges_df,
+        nodes_df,
+        rels_df,
+        num_partitions,
+    ):
+        super().__init__(
+            spark,
+            output_dir,
+            partitioned_evaluation,
+            train_edges_df,
+            valid_edges_df,
+            test_edges_df,
+            nodes_df,
+            rels_df,
+            num_partitions,
+        )
+        self.edges_helper = {
+            "train": [self.train_edges_df, self.output_dir / Path(PathConstants.train_edges_path)],
+            "valid": [self.valid_edges_df, self.output_dir / Path(PathConstants.valid_edges_path)],
+            "test": [self.test_edges_df, self.output_dir / Path(PathConstants.test_edges_path)],
+        }
+        self.offset_helper = {
+            "train": self.output_dir / Path(PathConstants.train_edge_buckets_path),
+            "valid": self.output_dir / Path(PathConstants.valid_edge_buckets_path),
+            "test": self.output_dir / Path(PathConstants.test_edge_buckets_path),
+        }
+
+    def write_dataset_stats(self):
+        dataset_stats = self.get_dataset_stats()
+        dataset_stats.dataset_dir = Path(self.output_dir).absolute().__str__()
 
         with open(self.output_dir / Path("dataset.yaml"), "w") as f:
             print("Dataset statistics written to: {}".format((self.output_dir / Path("dataset.yaml")).__str__()))
             yaml_file = OmegaConf.to_yaml(dataset_stats)
             f.writelines(yaml_file)
+        return dataset_stats
 
-        write_df_to_csv(nodes_df, self.output_dir / Path(PathConstants.node_mapping_path))
+    def write_nodes_df(self):
+        write_df_to_csv(self.nodes_df, self.output_dir / Path(PathConstants.node_mapping_path))
+
+    def write_rels_df(self):
+        if self.rels_df is not None:
+            write_df_to_csv(self.rels_df, self.output_dir / Path(PathConstants.relation_mapping_path))
+
+    def write_partitioned_df_to_csv(self, mode):
+        return write_partitioned_df_to_csv(self.edges_helper[mode][0], self.num_partitions, self.edges_helper[mode][1])
+
+    def write_edge_offsets(self, mode, offsets):
+        with open(self.offset_helper[mode], "w") as f:
+            f.writelines([str(o) + "\n" for o in offsets])
+
+    def write_edges_df(self, mode):
+        write_df_to_csv(self.edges_helper[mode][0], self.edges_helper[mode][1])
+
+
+class SparkWriteS3(SparkWriteBase):
+    def __init__(
+        self,
+        spark,
+        output_dir,
+        partitioned_evaluation,
+        train_edges_df,
+        valid_edges_df,
+        test_edges_df,
+        nodes_df,
+        rels_df,
+        num_partitions,
+    ):
+        super().__init__(
+            spark,
+            output_dir,
+            partitioned_evaluation,
+            train_edges_df,
+            valid_edges_df,
+            test_edges_df,
+            nodes_df,
+            rels_df,
+            num_partitions,
+        )
 
-        if rels_df is not None:
-            write_df_to_csv(rels_df, self.output_dir / Path(PathConstants.relation_mapping_path))
+        self.s3_bucket = "s3a://{}".format(os.getenv("S3_BUCKET"))
+        self.s3 = s3fs.S3FileSystem(key=os.getenv("AWS_ACCESS_KEY_ID"), secret=os.getenv("AWS_SECRET_ACCESS_KEY"))
+        self.nodes_dir = "{}/{}".format(self.s3_bucket, PathConstants.nodes_directory)
+        self.edges_dir = "{}/{}".format(self.s3_bucket, PathConstants.edges_directory)
+        self.dataset_stats_path = "{}/dataset.yaml".format(self.s3_bucket)
+        self.s3.mkdir(self.nodes_dir)
+        self.s3.mkdir(self.edges_dir)
+        for mode in ["train", "valid", "test"]:
+            self.s3.mkdir("{}{}".format(self.edges_dir, mode))
+            self.s3.mkdir("{}{}_counts".format(self.edges_dir, mode))
+
+        self.node_mapping_file = "{}{}".format(self.nodes_dir, PathConstants.node_mapping_file)
+        self.relation_mapping_file = "{}{}".format(self.edges_dir, PathConstants.relation_mapping_file)
+        self.edges_helper = {
+            "train": [self.train_edges_df, "{}{}".format(self.edges_dir, PathConstants.train_edges_path)],
+            "valid": [self.valid_edges_df, "{}{}".format(self.edges_dir, PathConstants.valid_edges_path)],
+            "test": [self.test_edges_df, "{}{}".format(self.edges_dir, PathConstants.test_edges_path)],
+        }
+        self.offset_helper = {
+            "train": "{}train/{}{}".format(
+                self.edges_dir, PathConstants.training_file_prefix, PathConstants.partition_offsets_file
+            ),
+            "valid": "{}valid/{}{}".format(
+                self.edges_dir, PathConstants.validation_file_prefix, PathConstants.partition_offsets_file
+            ),
+            "test": "{}test/{}{}".format(
+                self.edges_dir, PathConstants.test_file_prefix, PathConstants.partition_offsets_file
+            ),
+        }
+
+    def write_dataset_stats(self):
+        dataset_stats = self.get_dataset_stats()
+        dataset_stats.dataset_dir = self.s3_bucket
+        with self.s3.open(self.dataset_stats_path, "w") as f:
+            yaml_file = OmegaConf.to_yaml(dataset_stats)
+            f.writelines(yaml_file)
+            f.flush()
+        return dataset_stats
 
-        if num_partitions > 1:
-            offsets = write_partitioned_df_to_csv(
-                train_edges_df, num_partitions, self.output_dir / Path(PathConstants.train_edges_path)
-            )
+    def write_nodes_df(self):
+        s3_write_df_to_csv(self.s3, self.nodes_df, self.nodes_dir, self.node_mapping_file)
 
-            with open(self.output_dir / Path(PathConstants.train_edge_buckets_path), "w") as f:
-                f.writelines([str(o) + "\n" for o in offsets])
+    def write_rels_df(self):
+        s3_write_df_to_csv(self.s3, self.rels_df, self.edges_dir, self.relation_mapping_file)
 
-            if self.partitioned_evaluation:
-                if valid_edges_df is not None:
-                    offsets = write_partitioned_df_to_csv(
-                        valid_edges_df, num_partitions, self.output_dir / Path(PathConstants.valid_edges_path)
-                    )
+    def write_edges_df(self, mode):
+        s3_write_df_to_csv(self.s3, self.edges_helper[mode][0], self.edges_dir + mode, self.edges_helper[mode][1])
 
-                    with open(self.output_dir / Path(PathConstants.valid_edge_buckets_path), "w") as f:
-                        f.writelines([str(o) + "\n" for o in offsets])
+    def write_partitioned_df_to_csv(self, mode):
+        return s3_write_partitioned_df_to_csv(
+            self.s3, self.edges_helper[mode][0], self.num_partitions, self.edges_dir + mode, self.edges_helper[mode][1]
+        )
 
-                if test_edges_df is not None:
-                    offsets = write_partitioned_df_to_csv(
-                        test_edges_df, num_partitions, self.output_dir / Path(PathConstants.test_edges_path)
-                    )
-                    with open(self.output_dir / Path(PathConstants.test_edge_buckets_path), "w") as f:
-                        f.writelines([str(o) + "\n" for o in offsets])
+    def write_edge_offsets(self, mode, offsets):
+        with self.s3.open(self.offset_helper[mode], "w") as f:
+            f.writelines([str(o) + "\n" for o in offsets])
+            f.flush()
 
-            else:
-                if valid_edges_df is not None:
-                    write_df_to_csv(valid_edges_df, self.output_dir / Path(PathConstants.valid_edges_path))
-                if test_edges_df is not None:
-                    write_df_to_csv(test_edges_df, self.output_dir / Path(PathConstants.test_edges_path))
 
+
+class SparkWriter(object):
+    def __init__(self, spark, output_dir, output_to_s3, partitioned_evaluation):
+        self.spark = spark
+        self.output_dir = output_dir
+        self.output_to_s3 = output_to_s3
+        if self.output_to_s3:
+            self.s3 = s3fs.S3FileSystem(key=os.getenv("AWS_ACCESS_KEY_ID"), secret=os.getenv("AWS_SECRET_ACCESS_KEY"))
+        self.partitioned_evaluation = partitioned_evaluation
+
+    def write_to_csv(self, train_edges_df, valid_edges_df, test_edges_df, nodes_df, rels_df, num_partitions):
+        writer = None
+        if self.output_to_s3:
+            writer = SparkWriteS3(
+                self.spark,
+                self.output_dir,
+                self.partitioned_evaluation,
+                train_edges_df,
+                valid_edges_df,
+                test_edges_df,
+                nodes_df,
+                rels_df,
+                num_partitions,
+            )
         else:
-            write_df_to_csv(train_edges_df, self.output_dir / Path(PathConstants.train_edges_path))
+            writer = SparkWriterLocal(
+                self.spark,
+                self.output_dir,
+                self.partitioned_evaluation,
+                train_edges_df,
+                valid_edges_df,
+                test_edges_df,
+                nodes_df,
+                rels_df,
+                num_partitions,
+            )
 
-            if valid_edges_df is not None:
-                write_df_to_csv(valid_edges_df, self.output_dir / Path(PathConstants.valid_edges_path))
+        dataset_stats = writer.write_dataset_stats()
+        writer.write_nodes_df()
+        writer.write_rels_df()
 
-            if test_edges_df is not None:
-                write_df_to_csv(test_edges_df, self.output_dir / Path(PathConstants.test_edges_path))
+        if writer.num_partitions > 1:
+            offsets = writer.write_partitioned_df_to_csv("train")
+            writer.write_edge_offsets("train", offsets)
+
+            for mode in ["valid", "test"]:
+                if writer.partitioned_evaluation:
+                    offsets = writer.write_partitioned_df_to_csv(mode)
+                    writer.write_edge_offsets(mode, offsets)
+                else:
+                    writer.write_edges_df(mode)
+        else:
+            for mode in ["train", "valid", "test"]:
+                writer.write_edges_df(mode)
 
         return dataset_stats
 
     def write_to_binary(self, train_edges_df, valid_edges_df, test_edges_df, nodes_df, rels_df, num_partitions):
-        print("Writing to CSV")
         dataset_stats = self.write_to_csv(
             train_edges_df, valid_edges_df, test_edges_df, nodes_df, rels_df, num_partitions
         )
+        if self.output_to_s3:
+            # TODO: merge csvs generated above and convert them to binary
+            print("Binary mode output not yet supported for S3")
+            exit()
+
         train_file = self.output_dir / Path(PathConstants.train_edges_path)
         valid_file = self.output_dir / Path(PathConstants.valid_edges_path)
         test_file = self.output_dir / Path(PathConstants.test_edges_path)
diff --git a/src/python/tools/preprocess/datasets/fb15k_237.py b/src/python/tools/preprocess/datasets/fb15k_237.py
index 0161c269..3fec8666 100644
--- a/src/python/tools/preprocess/datasets/fb15k_237.py
+++ b/src/python/tools/preprocess/datasets/fb15k_237.py
@@ -1,5 +1,6 @@
 from pathlib import Path
 
+from marius.tools.preprocess.converters.spark_converter import SparkEdgeListConverter
 from marius.tools.preprocess.converters.torch_converter import TorchEdgeListConverter
 from marius.tools.preprocess.dataset import LinkPredictionDataset
 from marius.tools.preprocess.utils import download_url, extract_file
@@ -47,7 +48,7 @@ def download(self, overwrite=False):
     def preprocess(
         self, num_partitions=1, remap_ids=True, splits=None, sequential_train_nodes=False, partitioned_eval=False
     ):
-        converter = TorchEdgeListConverter(
+        converter = SparkEdgeListConverter(
             output_dir=self.output_directory,
             train_edges=self.input_train_edges_file,
             valid_edges=self.input_valid_edges_file,
diff --git a/src/python/tools/preprocess/datasets/twitter.py b/src/python/tools/preprocess/datasets/twitter.py
index a66accc4..13d642fb 100644
--- a/src/python/tools/preprocess/datasets/twitter.py
+++ b/src/python/tools/preprocess/datasets/twitter.py
@@ -28,7 +28,14 @@ def download(self, overwrite=False):
         archive_path = download_url(self.dataset_url, self.output_directory, overwrite)
         extract_file(archive_path, remove_input=True)
 
-    def preprocess(self, num_partitions=1, remap_ids=True, splits=[0.9, 0.05, 0.05], sequential_train_nodes=False):
+    def preprocess(
+        self,
+        num_partitions=1,
+        remap_ids=True,
+        splits=[0.9, 0.05, 0.05],
+        sequential_train_nodes=False,
+        partitioned_eval=False,
+    ):
         converter = TorchEdgeListConverter(
             output_dir=self.output_directory,
             train_edges=self.input_edges,

From 9ca0a0590517970270471c5edfcc50899eb62638 Mon Sep 17 00:00:00 2001
From: Basava
Date: Tue, 22 Nov 2022 21:15:30 +0000
Subject: [PATCH 2/2] reverting test changes

---
 src/python/tools/preprocess/datasets/fb15k_237.py | 3 +--
 src/python/tools/preprocess/datasets/twitter.py   | 9 +--------
 2 files changed, 2 insertions(+), 10 deletions(-)

diff --git a/src/python/tools/preprocess/datasets/fb15k_237.py b/src/python/tools/preprocess/datasets/fb15k_237.py
index 3fec8666..0161c269 100644
--- a/src/python/tools/preprocess/datasets/fb15k_237.py
+++ b/src/python/tools/preprocess/datasets/fb15k_237.py
@@ -1,6 +1,5 @@
 from pathlib import Path
 
-from marius.tools.preprocess.converters.spark_converter import SparkEdgeListConverter
 from marius.tools.preprocess.converters.torch_converter import TorchEdgeListConverter
 from marius.tools.preprocess.dataset import LinkPredictionDataset
 from marius.tools.preprocess.utils import download_url, extract_file
@@ -48,7 +47,7 @@ def download(self, overwrite=False):
     def preprocess(
         self, num_partitions=1, remap_ids=True, splits=None, sequential_train_nodes=False, partitioned_eval=False
     ):
-        converter = SparkEdgeListConverter(
+        converter = TorchEdgeListConverter(
             output_dir=self.output_directory,
             train_edges=self.input_train_edges_file,
             valid_edges=self.input_valid_edges_file,
diff --git a/src/python/tools/preprocess/datasets/twitter.py b/src/python/tools/preprocess/datasets/twitter.py
index 13d642fb..a66accc4 100644
--- a/src/python/tools/preprocess/datasets/twitter.py
+++ b/src/python/tools/preprocess/datasets/twitter.py
@@ -28,14 +28,7 @@ def download(self, overwrite=False):
         archive_path = download_url(self.dataset_url, self.output_directory, overwrite)
         extract_file(archive_path, remove_input=True)
 
-    def preprocess(
-        self,
-        num_partitions=1,
-        remap_ids=True,
-        splits=[0.9, 0.05, 0.05],
-        sequential_train_nodes=False,
-        partitioned_eval=False,
-    ):
+    def preprocess(self, num_partitions=1, remap_ids=True, splits=[0.9, 0.05, 0.05], sequential_train_nodes=False):
         converter = TorchEdgeListConverter(
             output_dir=self.output_directory,
             train_edges=self.input_edges,
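
Note on the "part files still need to be combined" gap left open by PATCH 1/2 (the TODOs in s3_write_df_to_csv and s3_write_partitioned_df_to_csv). One possible way to close it, sketched below under the assumption that the part files are headerless, uncompressed CSVs exactly as df.write.csv produces them, is to stream-concatenate the parts into a single S3 object through s3fs rather than calling s3_obj.merge(); merge() drives an S3 multipart copy, and CompleteMultipartUpload rejects parts smaller than 5 MiB with the EntityTooSmall error mentioned in the comments. The helper name s3_concatenate_csv_parts is illustrative and not part of the patch.

import re
import shutil

import s3fs


def s3_concatenate_csv_parts(s3_obj: s3fs.S3FileSystem, input_folder, output_file, delete_parts=False):
    # Collect the Spark part files in a deterministic order.
    part_pattern = re.compile(r".*part-.*\.csv$")
    part_files = sorted(f for f in s3_obj.ls(input_folder) if part_pattern.match(f))
    if not part_files:
        return

    # s3fs manages its own multipart upload while this file object is open, so
    # arbitrarily small part files can be appended back to back without hitting
    # the 5 MiB minimum part size that CompleteMultipartUpload enforces.
    with s3_obj.open(output_file, "wb") as out_f:
        for part_file in part_files:
            with s3_obj.open(part_file, "rb") as in_f:
                shutil.copyfileobj(in_f, out_f)

    if delete_parts:
        s3_obj.rm(part_files)

Both branches of s3_write_df_to_csv could then call this helper with output_folder and output_filename, and s3_write_partitioned_df_to_csv could concatenate its bucket directories in bucket order into output_filename before returning the offsets.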
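
For reference, this is roughly how the S3 path added in PATCH 1/2 is meant to be exercised end to end. The environment variable names are the ones checked in spark_readers.py, and the constructor arguments mirror the fb15k_237.py change that PATCH 2/2 reverts; the bucket name, input paths, and the final convert() call are illustrative assumptions rather than anything defined by these patches.

import os

from marius.tools.preprocess.converters.spark_converter import SparkEdgeListConverter

# Credentials and bucket must be set before the converter and reader are built
# (spark_readers.py exits otherwise); the values here are placeholders.
os.environ["AWS_ACCESS_KEY_ID"] = "<access key>"
os.environ["AWS_SECRET_ACCESS_KEY"] = "<secret key>"
os.environ["S3_BUCKET"] = "<bucket name>"

# Edge list paths use the s3a:// scheme so the Hadoop S3A connector handles the
# Spark side; this dataset layout is hypothetical.
converter = SparkEdgeListConverter(
    output_dir="datasets/fb15k_237/",
    train_edges="s3a://<bucket name>/fb15k_237/train.txt",
    valid_edges="s3a://<bucket name>/fb15k_237/valid.txt",
    test_edges="s3a://<bucket name>/fb15k_237/test.txt",
)
converter.convert()  # assumes the Spark converter exposes the same convert() entry point as the Torch converter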