Skip to content

Commit

Permalink
fix: keep streams open
Browse files Browse the repository at this point in the history
  • Loading branch information
tskir committed Dec 3, 2023
1 parent cb75567 commit a3f67cf
Showing 1 changed file with 4 additions and 5 deletions.
9 changes: 4 additions & 5 deletions src/batch/spark_prep.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,10 @@ def cast_to_bytes(x: Any) -> typing.IO[bytes]:
return typing.cast(typing.IO[bytes], x)

# Set up streams.
with cast_to_bytes(ResilientFetch(self.input_uri)) as gzip_stream:
with cast_to_bytes(gzip.GzipFile(fileobj=gzip_stream)) as bytes_stream:
with io.TextIOWrapper(bytes_stream) as text_stream:
self.text_stream = text_stream
self.field_names = text_stream.readline().split("\t")
gzip_stream = cast_to_bytes(ResilientFetch(self.input_uri))
bytes_stream = cast_to_bytes(gzip.GzipFile(fileobj=gzip_stream))
self.text_stream = io.TextIOWrapper(bytes_stream)
self.field_names = self.text_stream.readline().split("\t")

def _p1_fetch_data(self, q_out: Queue[str | None]) -> None:
"""Fetch data from the URI in blocks.
Expand Down

0 comments on commit a3f67cf

Please sign in to comment.