perf: parse TSV data in blocks using Pandas
tskir committed Nov 26, 2023
1 parent 5da2bd3 commit 6c31f92
Showing 1 changed file with 80 additions and 83 deletions.
163 changes: 80 additions & 83 deletions src/beam/eqtl_catalogue.py
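The technique in this commit, reduced to a standalone sketch (the function name, block size, and the no-header assumption are illustrative, not taken from the repository): read the TSV stream in fixed-size text blocks, cut each block at its last newline so no record is split across blocks, and hand each complete block to pandas.

import io

import pandas as pd


def read_tsv_in_blocks(stream, fields, block_size=5_000_000):
    # Toy version of the block-wise parsing approach: accumulate raw text,
    # cut at the last newline, and parse each complete block with pandas.
    buffer = ""
    while True:
        chunk = stream.read(block_size)
        if not chunk and not buffer:
            break
        buffer += chunk
        # Cut at the last newline so no record straddles two blocks; once the
        # stream is exhausted, flush whatever remains.
        cut = buffer.rfind("\n") + 1 if chunk else len(buffer)
        if cut:
            yield pd.read_csv(io.StringIO(buffer[:cut]), sep="\t", names=fields)
        buffer = buffer[cut:]
        if not chunk:
            break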
@@ -188,7 +188,7 @@ def _fetch_data_in_blocks(self, uri: str) -> Iterator[str]:
buffer = ""
while True:
# Read more data from the URI source.
- buffer += text_stream.read(self.chunk_size)
+ buffer += text_stream.read(self.block_size)
# If we don't have any data, this means we reached the end of the stream.
if not buffer:
break
@@ -198,114 +198,115 @@ def _fetch_data_in_blocks(self, uri: str) -> Iterator[str]:
buffer = buffer[rightmost_newline_split:]

def _split_final_data(
- self, qtl_group: str, chromosome: str, block_index: int, data: List[str]
- ) -> Iterator[tuple[str, str, int, List[str]]]:
+ self, qtl_group: str, chromosome: str, block_index: int, df: pd.DataFrame
+ ) -> Iterator[tuple[str, str, int, pd.DataFrame]]:
"""Process the final chunk of the data and split into partitions as close to self.emit_block_size as possible.
Args:
qtl_group (str): QTL group field used for study ID.
chromosome (str): Chromosome identifier.
block_index (int): Starting number of the block to emit.
- data (List[str]): Remaining chunk data to split.
+ df (pd.DataFrame): Remaining chunk data to split.
Yields:
- tuple[str, str, int, List[str]]: Tuple of values to generate the final Parquet file.
+ tuple[str, str, int, pd.DataFrame]: Tuple of values to generate the final Parquet file.
"""
import math

- number_of_blocks = max(round(len(data) / self.emit_block_size), 1)
- records_per_block = math.ceil(len(data) / number_of_blocks)
- for index in range(0, len(data), records_per_block):
+ number_of_blocks = max(round(len(df) / self.emit_block_size), 1)
+ records_per_block = math.ceil(len(df) / number_of_blocks)
+ for index in range(0, len(df), records_per_block):
yield (
qtl_group,
chromosome,
block_index,
- data[index : index + records_per_block],
+ df[index : index + records_per_block],
)
block_index += 1
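For intuition about the arithmetic above: the remaining rows are split into roughly equal partitions whose size stays as close to emit_block_size as possible, and the max(..., 1) guard keeps a small remainder in a single block. A quick check with hypothetical numbers (emit_block_size = 100_000 is purely illustrative, not the repository's setting):

import math

emit_block_size = 100_000  # illustrative value only
for n_rows in (250_000, 40_000):
    number_of_blocks = max(round(n_rows / emit_block_size), 1)
    records_per_block = math.ceil(n_rows / number_of_blocks)
    print(n_rows, number_of_blocks, records_per_block)
# 250_000 rows -> 2 blocks of 125_000; 40_000 rows -> 1 block of 40_000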

def process(
self,
record: Dict[str, Any],
- ) -> Iterator[tuple[str, str, int, List[str]]]:
- """Process one input file and yield per-chromosome blocks of records.
+ ) -> Iterator[tuple[str, str, int, pd.DataFrame]]:
+ """Process one input file and yield block records ready for Parquet writing.
Args:
record (Dict[str, Any]): A record describing one input file and its attributes.
Yields:
- tuple[str, str, int, List[str]]: Attribute and data list.
+ tuple[str, str, int, pd.DataFrame]: Attributes and the data block to write.
"""
- import gzip
import io
- import typing
+
+ import pandas as pd

assert (
record["study"] == "GTEx_V8"
), "Only GTEx_V8 studies are currently supported."
http_path = record["ftp_path"].replace("ftp://", "http://")
- with self.resilient_urlopen(http_path) as compressed_stream:
- # See: https://stackoverflow.com/a/58407810.
- compressed_stream_typed = typing.cast(typing.IO[bytes], compressed_stream)
- with gzip.GzipFile(fileobj=compressed_stream_typed) as uncompressed_stream:
- uncompressed_stream_typed = typing.cast(
- typing.IO[bytes], uncompressed_stream
- )
+ qtl_group = record["qtl_group"]
+
+ current_chromosome = ""
+ current_block_index = 0
+ current_data_block = pd.DataFrame(columns=self.FIELDS)
+
+ for data_block in self._fetch_data_in_blocks(http_path):
+ # Parse data block.
+ data_io = io.StringIO(data_block)
+ df_block = pd.read_csv(data_io, sep="\t", names=self.FIELDS)
+
+ # Split data block by chromosome.
+ df_block_by_chromosome = [
+ (key, group)
+ for key, group in df_block.groupby("chromosome", sort=False)
+ ]
+ first_chromosome_in_block = df_block_by_chromosome[0][0]
+
+ # If this is the first block we ever see, initialise "current_chromosome".
+ if not current_chromosome:
+ current_chromosome = first_chromosome_in_block
+
+ # If the block starts with the chromosome we are currently processing (this is going to almost always be the case),
+ # we append the data from the block to the current chromosome block.
+ if first_chromosome_in_block == current_chromosome:
+ current_data_block = pd.concat(
+ [current_data_block, df_block_by_chromosome[0][1]]
+ )
- with io.TextIOWrapper(uncompressed_stream_typed) as text_stream:
- current_chromosome = ""
- current_data_block: List[Any] = []
- current_block_index = 0
- observed_chromosomes = set()
- chromosome_index = self.FIELDS.index("chromosome")
- for i, row in enumerate(text_stream):
- if i == 0:
- # Skip header.
- continue
- data = row.split("\t")
- # Perform actions depending on the chromosome.
- chromosome = data[chromosome_index]
- if not current_chromosome:
- # Initialise for the first record.
- current_chromosome = chromosome
- if len(current_data_block) >= self.emit_ready_buffer:
- data_block = current_data_block[: self.emit_block_size]
- yield (
- record["qtl_group"],
- current_chromosome,
- current_block_index,
- data_block,
- )
- current_data_block = current_data_block[
- self.emit_block_size :
- ]
- current_block_index += 1
- if chromosome != current_chromosome:
- # Yield the block(s) and reset everything.
- for block in self._split_final_data(
- record["qtl_group"],
- current_chromosome,
- current_block_index,
- current_data_block,
- ):
- yield block
- current_data_block = []
- current_block_index = 0
- observed_chromosomes.add(current_chromosome)
- assert (
- chromosome not in observed_chromosomes
- ), f"Chromosome {chromosome} appears twice in data"
- current_chromosome = chromosome
- # Expand existing block.
- current_data_block.append(data)
- # Yield any remaining block(s).
- if current_data_block:
- for block in self._split_final_data(
- record["qtl_group"],
- current_chromosome,
- current_block_index,
- current_data_block,
- ):
- yield block
+ df_block_by_chromosome = df_block_by_chromosome[1:]
+
+ # If we have any more chromosomes in the block, then we should emit:
+ if df_block_by_chromosome:
+ # The current chromosome.
+ for block in self._split_final_data(
+ qtl_group,
+ current_chromosome,
+ current_block_index,
+ current_data_block,
+ ):
+ yield block
+ # All chromosomes in the block except the last one (if any are present) because they are complete.
+ for chromosome, chromosome_block in df_block_by_chromosome[:-1]:
+ for block in self._split_final_data(
+ qtl_group, chromosome, 0, chromosome_block
+ ):
+ yield block
+ # And then set current chromosome to the last chromosome in the block.
+ current_chromosome, current_data_block = df_block_by_chromosome[-1]
+ current_block_index = 0
+
+ # If we have enough data for the chromosome we are currently processing, we can emit some blocks already.
+ while len(current_data_block) >= self.emit_ready_buffer:
+ emit_block = current_data_block[: self.emit_block_size]
+ yield (qtl_group, current_chromosome, current_block_index, emit_block)
+ current_block_index += 1
+ current_data_block = current_data_block[self.emit_block_size :]
+
+ # Finally, if we have any data remaining at the end of processing, we should emit it.
+ if len(current_data_block):
+ for block in self._split_final_data(
+ qtl_group, current_chromosome, current_block_index, current_data_block
+ ):
+ yield block
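The per-chromosome bookkeeping above leans on groupby(..., sort=False) returning groups in the order chromosomes first appear in the block, so only the first group can continue the previous block's chromosome and only the last group can spill into the next one. A self-contained illustration with toy data (the values are made up):

import pandas as pd

df_block = pd.DataFrame(
    {"chromosome": ["1", "1", "2", "2", "3"], "position": [10, 20, 5, 6, 7]}
)
# sort=False preserves file order instead of sorting the group keys.
groups = [(key, group) for key, group in df_block.groupby("chromosome", sort=False)]
print([key for key, _ in groups])  # ['1', '2', '3']
print(len(groups[0][1]))  # 2 rows for chromosome "1"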


class WriteData(beam.DoFn):
@@ -315,15 +316,13 @@ class WriteData(beam.DoFn):
PYARROW_SCHEMA = PYARROW_SCHEMA
FIELDS = FIELDS

- def process(self, element: tuple[str, str, int, List[str]]) -> None:
+ def process(self, element: tuple[str, str, int, pd.DataFrame]) -> None:
"""Write a Parquet file for a given input file.
Args:
- element (tuple[str, str, int, List[str]]): key and grouped values.
+ element (tuple[str, str, int, pd.DataFrame]): key and grouped values.
"""
- import pandas as pd
-
- qtl_group, chromosome, chunk_number, records = element
+ qtl_group, chromosome, chunk_number, df = element
output_filename = (
f"{self.EQTL_CATALOGUE_OUPUT_BASE}/"
"analysisType=eQTL/"
@@ -333,9 +332,7 @@ def process(self, element: tuple[str, str, int, List[str]]) -> None:
f"chromosome={chromosome}/" # Example: 13.
f"part-{chunk_number:05}.snappy.parquet"
)
- pd.DataFrame(records, columns=self.FIELDS).to_parquet(
- output_filename, compression="snappy"
- )
+ df.to_parquet(output_filename, compression="snappy")
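Because the writer lays files out in Hive-style directories (analysisType=..., qtl_group=..., chromosome=...), the partition values come back as ordinary columns when the dataset directory is read as a whole. A minimal read-back sketch, assuming pyarrow is installed and using an illustrative local path in place of EQTL_CATALOGUE_OUPUT_BASE:

import pandas as pd

# Illustrative path; the real base comes from EQTL_CATALOGUE_OUPUT_BASE.
df = pd.read_parquet("output/analysisType=eQTL/")
# Directory names such as chromosome=13 are exposed as regular columns.
print(df["chromosome"].unique())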


def run_pipeline() -> None:
