Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions tornado/test/util_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import gzip
import re
import sys
import textwrap
Expand All @@ -15,6 +16,7 @@
raise_exc_info,
re_unescape,
timedelta_to_seconds,
GzipDecompressor,
)


Expand Down Expand Up @@ -366,3 +368,57 @@ def test_version_info_compatible(self):

def test_current_version(self):
    """Run the version-compatibility assertion on tornado's own version data."""
    version_string = tornado.version
    version_tuple = tornado.version_info
    self.assert_version_info_compatible(version_string, version_tuple)


class GzipDecompressorTest(unittest.TestCase):
    """Exercise ``GzipDecompressor`` on single and multi-member gzip input."""

    def test_concatenated_gzip_members(self):
        """Two gzip members fed back to back decompress to both payloads."""
        payloads = (b"First gzip member content.", b"Second gzip member content.")
        # Each payload is compressed on its own, so the stream holds two
        # complete gzip members one after the other.
        stream = b"".join(gzip.compress(payload) for payload in payloads)

        output = GzipDecompressor().decompress(stream)

        self.assertEqual(
            output,
            b"".join(payloads),
            msg="Concatenated gzip members should be fully decompressed",
        )

    def test_single_gzip_member(self):
        """A lone gzip member round-trips to its original bytes."""
        payload = b"This is some example data that will be compressed using gzip."
        inflater = GzipDecompressor()

        self.assertEqual(inflater.decompress(gzip.compress(payload)), payload)

    def test_multiple_concatenated_members(self):
        """Three gzip members in one buffer all come out of a single call."""
        payloads = [b"First member.", b"Second member.", b"Third member."]
        stream = b"".join(map(gzip.compress, payloads))

        self.assertEqual(GzipDecompressor().decompress(stream), b"".join(payloads))

    def test_decompress_after_flush_raises(self):
        """Calling ``decompress()`` once ``flush()`` has run raises RuntimeError."""
        inflater = GzipDecompressor()
        inflater.decompress(gzip.compress(b"Test data"))
        inflater.flush()

        # Valid gzip input is used so that only the post-flush state can be
        # responsible for the error.
        late_input = gzip.compress(b"More data")
        with self.assertRaises(RuntimeError):
            inflater.decompress(late_input)
32 changes: 30 additions & 2 deletions tornado/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def __init__(self) -> None:
# http://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib
# This works on cpython and pypy, but not jython.
self.decompressobj = zlib.decompressobj(16 + zlib.MAX_WBITS)
self._flushed = False

def decompress(self, value: bytes, max_length: int = 0) -> bytes:
"""Decompress a chunk, returning newly-available data.
Expand All @@ -82,7 +83,32 @@ def decompress(self, value: bytes, max_length: int = 0) -> bytes:
in ``unconsumed_tail``; you must retrieve this value and pass
it back to a future call to `decompress` if it is not empty.
"""
return self.decompressobj.decompress(value, max_length)
if self._flushed:
raise RuntimeError("Cannot call decompress() after flush()")

data = value
out = bytearray()
remaining = max_length

while True:
chunk = self.decompressobj.decompress(data, remaining)
out.extend(chunk)

if max_length:
remaining = max_length - len(out)
if remaining <= 0:
break

# Handle concatenated gzip members
unused = self.decompressobj.unused_data
if unused:
data = unused
self.decompressobj = zlib.decompressobj(16 + zlib.MAX_WBITS)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this reset the _flushed flag?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to reset it here — we're already inside decompress() which checks _flushed at the top, so we can only reach this code path when _flushed is False. The flag is about the GzipDecompressor lifecycle, not the inner zlib object.

continue

break

return bytes(out)

@property
def unconsumed_tail(self) -> bytes:
Expand All @@ -95,7 +121,9 @@ def flush(self) -> bytes:
Also checks for errors such as truncated input.
No other methods may be called on this object after `flush`.
"""
return self.decompressobj.flush()
result = self.decompressobj.flush()
self._flushed = True
return result


def import_object(name: str) -> Any:
Expand Down