diff --git a/src/beam/eqtl_catalogue.py b/src/beam/eqtl_catalogue.py
index c78298601..83600556e 100644
--- a/src/beam/eqtl_catalogue.py
+++ b/src/beam/eqtl_catalogue.py
@@ -188,7 +188,7 @@ def _fetch_data_in_blocks(self, uri: str) -> Iterator[str]:
             buffer = ""
             while True:
                 # Read more data from the URI source.
-                buffer += text_stream.read(self.chunk_size)
+                buffer += text_stream.read(self.block_size)
                 # If we don't have any data, this means we reached the end of the stream.
                 if not buffer:
                     break
@@ -198,114 +198,115 @@ def _fetch_data_in_blocks(self, uri: str) -> Iterator[str]:
                 buffer = buffer[rightmost_newline_split:]

     def _split_final_data(
-        self, qtl_group: str, chromosome: str, block_index: int, data: List[str]
-    ) -> Iterator[tuple[str, str, int, List[str]]]:
+        self, qtl_group: str, chromosome: str, block_index: int, df: pd.DataFrame
+    ) -> Iterator[tuple[str, str, int, pd.DataFrame]]:
         """Process the final chunk of the data and split into partitions as close to self.emit_block_size as possible.

         Args:
             qtl_group (str): QTL group field used for study ID.
             chromosome (str): Chromosome identifier.
             block_index (int): Starting number of the block to emit.
-            data (List[str]): Remaining chunk data to split.
+            df (pd.DataFrame): Remaining chunk data to split.

         Yields:
-            tuple[str, str, int, List[str]]: Tuple of values to generate the final Parquet file.
+            tuple[str, str, int, pd.DataFrame]: Tuple of values to generate the final Parquet file.
         """
         import math

-        number_of_blocks = max(round(len(data) / self.emit_block_size), 1)
-        records_per_block = math.ceil(len(data) / number_of_blocks)
-        for index in range(0, len(data), records_per_block):
+        number_of_blocks = max(round(len(df) / self.emit_block_size), 1)
+        records_per_block = math.ceil(len(df) / number_of_blocks)
+        for index in range(0, len(df), records_per_block):
             yield (
                 qtl_group,
                 chromosome,
                 block_index,
-                data[index : index + records_per_block],
+                df[index : index + records_per_block],
             )
             block_index += 1

     def process(
         self,
         record: Dict[str, Any],
-    ) -> Iterator[tuple[str, str, int, List[str]]]:
-        """Process one input file and yield per-chromosome blocks of records.
+    ) -> Iterator[tuple[str, str, int, pd.DataFrame]]:
+        """Process one input file and yield block records ready for Parquet writing.

         Args:
             record (Dict[str, Any]): A record describing one input file and its attributes.

         Yields:
-            tuple[str, str, int, List[str]]: Attribute and data list.
+            tuple[str, str, int, pd.DataFrame]: Attributes and data block.
         """
-        import gzip
         import io
-        import typing
+
+        import pandas as pd

         assert (
             record["study"] == "GTEx_V8"
         ), "Only GTEx_V8 studies are currently supported."
         http_path = record["ftp_path"].replace("ftp://", "http://")
-        with self.resilient_urlopen(http_path) as compressed_stream:
-            # See: https://stackoverflow.com/a/58407810.
-            compressed_stream_typed = typing.cast(typing.IO[bytes], compressed_stream)
-            with gzip.GzipFile(fileobj=compressed_stream_typed) as uncompressed_stream:
-                uncompressed_stream_typed = typing.cast(
-                    typing.IO[bytes], uncompressed_stream
+        qtl_group = record["qtl_group"]
+
+        current_chromosome = ""
+        current_block_index = 0
+        current_data_block = pd.DataFrame(columns=self.FIELDS)
+
+        for data_block in self._fetch_data_in_blocks(http_path):
+            # Parse the data block as tab-separated values.
+            data_io = io.StringIO(data_block)
+            df_block = pd.read_csv(data_io, sep="\t", names=self.FIELDS)
+
+            # Split the data block by chromosome.
+            df_block_by_chromosome = [
+                (key, group)
+                for key, group in df_block.groupby("chromosome", sort=False)
+            ]
+            first_chromosome_in_block = df_block_by_chromosome[0][0]
+
+            # If this is the first block we ever see, initialise "current_chromosome".
+            if not current_chromosome:
+                current_chromosome = first_chromosome_in_block
+
+            # If the block starts with the chromosome we are currently processing
+            # (almost always the case), append its data to the current chromosome block.
+            if first_chromosome_in_block == current_chromosome:
+                current_data_block = pd.concat(
+                    [current_data_block, df_block_by_chromosome[0][1]]
                 )
-                with io.TextIOWrapper(uncompressed_stream_typed) as text_stream:
-                    current_chromosome = ""
-                    current_data_block: List[Any] = []
-                    current_block_index = 0
-                    observed_chromosomes = set()
-                    chromosome_index = self.FIELDS.index("chromosome")
-                    for i, row in enumerate(text_stream):
-                        if i == 0:
-                            # Skip header.
-                            continue
-                        data = row.split("\t")
-                        # Perform actions depending on the chromosome.
-                        chromosome = data[chromosome_index]
-                        if not current_chromosome:
-                            # Initialise for the first record.
-                            current_chromosome = chromosome
-                        if len(current_data_block) >= self.emit_ready_buffer:
-                            data_block = current_data_block[: self.emit_block_size]
-                            yield (
-                                record["qtl_group"],
-                                current_chromosome,
-                                current_block_index,
-                                data_block,
-                            )
-                            current_data_block = current_data_block[
-                                self.emit_block_size :
-                            ]
-                            current_block_index += 1
-                        if chromosome != current_chromosome:
-                            # Yield the block(s) and reset everything.
-                            for block in self._split_final_data(
-                                record["qtl_group"],
-                                current_chromosome,
-                                current_block_index,
-                                current_data_block,
-                            ):
-                                yield block
-                            current_data_block = []
-                            current_block_index = 0
-                            observed_chromosomes.add(current_chromosome)
-                            assert (
-                                chromosome not in observed_chromosomes
-                            ), f"Chromosome {chromosome} appears twice in data"
-                            current_chromosome = chromosome
-                        # Expand existing block.
-                        current_data_block.append(data)
-                    # Yield any remaining block(s).
-                    if current_data_block:
-                        for block in self._split_final_data(
-                            record["qtl_group"],
-                            current_chromosome,
-                            current_block_index,
-                            current_data_block,
-                        ):
-                            yield block
+                df_block_by_chromosome = df_block_by_chromosome[1:]
+
+            # If we have any more chromosomes in the block, then we should emit:
+            if df_block_by_chromosome:
+                # The current chromosome.
+                for block in self._split_final_data(
+                    qtl_group,
+                    current_chromosome,
+                    current_block_index,
+                    current_data_block,
+                ):
+                    yield block
+                # All chromosomes in the block except the last one (if any are present), because they are complete.
+                for chromosome, chromosome_block in df_block_by_chromosome[:-1]:
+                    for block in self._split_final_data(
+                        qtl_group, chromosome, 0, chromosome_block
+                    ):
+                        yield block
+                # And then set the current chromosome to the last chromosome in the block.
+                current_chromosome, current_data_block = df_block_by_chromosome[-1]
+                current_block_index = 0
+
+            # If we have enough data for the chromosome we are currently processing, we can emit some blocks already.
+            while len(current_data_block) >= self.emit_ready_buffer:
+                emit_block = current_data_block[: self.emit_block_size]
+                yield (qtl_group, current_chromosome, current_block_index, emit_block)
+                current_block_index += 1
+                current_data_block = current_data_block[self.emit_block_size :]
+
+        # Finally, if we have any data remaining at the end of processing, we should emit it.
+        if len(current_data_block):
+            for block in self._split_final_data(
+                qtl_group, current_chromosome, current_block_index, current_data_block
+            ):
+                yield block


 class WriteData(beam.DoFn):
@@ -315,15 +316,13 @@ class WriteData(beam.DoFn):
     PYARROW_SCHEMA = PYARROW_SCHEMA
     FIELDS = FIELDS

-    def process(self, element: tuple[str, str, int, List[str]]) -> None:
+    def process(self, element: tuple[str, str, int, pd.DataFrame]) -> None:
         """Write a Parquet file for a given input file.

         Args:
-            element (tuple[str, str, int, List[str]]): key and grouped values.
+            element (tuple[str, str, int, pd.DataFrame]): key and grouped values.
         """
-        import pandas as pd
-
-        qtl_group, chromosome, chunk_number, records = element
+        qtl_group, chromosome, chunk_number, df = element
         output_filename = (
             f"{self.EQTL_CATALOGUE_OUPUT_BASE}/"
             "analysisType=eQTL/"
@@ -333,9 +332,7 @@ def process(self, element: tuple[str, str, int, List[str]]) -> None:
             f"chromosome={chromosome}/"  # Example: 13.
             f"part-{chunk_number:05}.snappy.parquet"
         )
-        pd.DataFrame(records, columns=self.FIELDS).to_parquet(
-            output_filename, compression="snappy"
-        )
+        df.to_parquet(output_filename, compression="snappy")


 def run_pipeline() -> None:
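
Note: a minimal standalone sketch of the partitioning arithmetic in _split_final_data, outside the diff itself. The emit_block_size value and row count are hypothetical stand-ins, chosen only to show why the leftover rows are split into roughly equal partitions close to the target size rather than into full blocks plus a small remainder:

    import math

    emit_block_size = 100_000   # hypothetical stand-in for self.emit_block_size
    remaining_rows = 250_000    # hypothetical rows left over for the current chromosome

    number_of_blocks = max(round(remaining_rows / emit_block_size), 1)  # round(2.5) == 2
    records_per_block = math.ceil(remaining_rows / number_of_blocks)    # 125_000

    # The remainder is split into two partitions of 125,000 rows each,
    # instead of 100,000 + 100,000 + 50,000, so every Parquet part stays
    # close to the target block size.
    sizes = [
        min(records_per_block, remaining_rows - start)
        for start in range(0, remaining_rows, records_per_block)
    ]
    assert sizes == [125_000, 125_000]

The per-block chromosome handling in process relies on groupby("chromosome", sort=False) preserving the order in which chromosomes first appear, so the first group always continues the chromosome currently being accumulated and only the last group can spill over into the next block.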