From acc265655d6dde4ff1bce65308e9e152fbf3381a Mon Sep 17 00:00:00 2001 From: Thomas Fischbacher Date: Wed, 19 Jul 2023 05:43:51 -0700 Subject: [PATCH] Small Python modernization of Brotli code. PiperOrigin-RevId: 549289787 --- python/bro.py | 102 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 68 insertions(+), 34 deletions(-) diff --git a/python/bro.py b/python/bro.py index b742fd734..f0adf5c0c 100755 --- a/python/bro.py +++ b/python/bro.py @@ -1,7 +1,11 @@ #! /usr/bin/env python """Compression/decompression utility using the Brotli algorithm.""" +# Note: Python2 has been deprecated long ago, but some projects out in +# the wide world may still use it nevertheless. This should not +# deprive them from being able to run Brotli. from __future__ import print_function + import argparse import os import platform @@ -9,8 +13,9 @@ import brotli + # default values of encoder parameters -DEFAULT_PARAMS = { +_DEFAULT_PARAMS = { 'mode': brotli.MODE_GENERIC, 'quality': 11, 'lgwin': 22, @@ -19,17 +24,32 @@ def get_binary_stdio(stream): - """ Return the specified standard input, output or errors stream as a - 'raw' buffer object suitable for reading/writing binary data from/to it. + """Return the specified stdin/stdout/stderr stream. + + If the stdio stream requested (i.e. sys.(stdin|stdout|stderr)) + has been replaced with a stream object that does not have a `.buffer` + attribute, this will return the original stdio stream's buffer, i.e. + `sys.__(stdin|stdout|stderr)__.buffer`. + + Args: + stream: One of 'stdin', 'stdout', 'stderr'. + + Returns: + The stream, as a 'raw' buffer object (i.e. io.BufferedIOBase subclass + instance such as io.Bufferedreader/io.BufferedWriter), suitable for + reading/writing binary data from/to it. """ - assert stream in ['stdin', 'stdout', 'stderr'], 'invalid stream name' - stdio = getattr(sys, stream) + if stream == 'stdin': stdio = sys.stdin + elif stream == 'stdout': stdio = sys.stdout + elif stream == 'stderr': stdio = sys.stderr + else: + raise ValueError('invalid stream name: %s' % (stream,)) if sys.version_info[0] < 3: if sys.platform == 'win32': # set I/O stream binary flag on python2.x (Windows) runtime = platform.python_implementation() if runtime == 'PyPy': - # the msvcrt trick doesn't work in pypy, so I use fdopen + # the msvcrt trick doesn't work in pypy, so use fdopen(). mode = 'rb' if stream == 'stdin' else 'wb' stdio = os.fdopen(stdio.fileno(), mode, 0) else: @@ -38,12 +58,20 @@ def get_binary_stdio(stream): msvcrt.setmode(stdio.fileno(), os.O_BINARY) return stdio else: - # get 'buffer' attribute to read/write binary data on python3.x - if hasattr(stdio, 'buffer'): + try: return stdio.buffer - else: - orig_stdio = getattr(sys, '__%s__' % stream) - return orig_stdio.buffer + except AttributeError: + # The Python reference explains + # (-> https://docs.python.org/3/library/sys.html#sys.stdin) + # that the `.buffer` attribute might not exist, since + # the standard streams might have been replaced by something else + # (such as an `io.StringIO()` - perhaps via + # `contextlib.redirect_stdout()`). + # We fall back to the original stdio in these cases. + if stream == 'stdin': return sys.__stdin__.buffer + if stream == 'stdout': return sys.__stdout__.buffer + if stream == 'stderr': return sys.__stderr__.buffer + assert False, 'Impossible Situation.' def main(args=None): @@ -114,46 +142,52 @@ def main(args=None): help='Base 2 logarithm of the maximum input block size. ' 'Range is 16 to 24. If set to 0, the value will be set based ' 'on the quality. Defaults to 0.') - # set default values using global DEFAULT_PARAMS dictionary - parser.set_defaults(**DEFAULT_PARAMS) + # set default values using global _DEFAULT_PARAMS dictionary + parser.set_defaults(**_DEFAULT_PARAMS) options = parser.parse_args(args=args) if options.infile: - if not os.path.isfile(options.infile): - parser.error('file "%s" not found' % options.infile) - with open(options.infile, 'rb') as infile: - data = infile.read() + try: + with open(options.infile, 'rb') as infile: + data = infile.read() + except OSError: + parser.error('Could not read --infile: %s' % (infile,)) else: if sys.stdin.isatty(): # interactive console, just quit - parser.error('no input') + parser.error('No input (called from interactive terminal).') infile = get_binary_stdio('stdin') data = infile.read() if options.outfile: - if os.path.isfile(options.outfile) and not options.force: - parser.error('output file exists') + # Caution! If `options.outfile` is a broken symlink, will try to + # redirect the write according to symlink. + if os.path.exists(options.outfile) and not options.force: + parser.error(('Target --outfile=%s already exists, ' + 'but --force was not requested.') % (outfile,)) outfile = open(options.outfile, 'wb') + did_open_outfile = True else: outfile = get_binary_stdio('stdout') - + did_open_outfile = False try: - if options.decompress: - data = brotli.decompress(data) - else: - data = brotli.compress( - data, - mode=options.mode, - quality=options.quality, - lgwin=options.lgwin, - lgblock=options.lgblock) + try: + if options.decompress: + data = brotli.decompress(data) + else: + data = brotli.compress( + data, + mode=options.mode, + quality=options.quality, + lgwin=options.lgwin, + lgblock=options.lgblock) + outfile.write(data) + finally: + if did_open_outfile: outfile.close() except brotli.error as e: parser.exit(1, - 'bro: error: %s: %s' % (e, options.infile or 'sys.stdin')) - - outfile.write(data) - outfile.close() + 'bro: error: %s: %s' % (e, options.infile or '{stdin}')) if __name__ == '__main__':