Skip to content

Commit

Permalink
chore(refactor): use with open, consolidate/un-nest if ...
Browse files Browse the repository at this point in the history
Various cleanups:

use 'with open(...)' rather than 'try: .. finally' to close file.

use "return ..." rather than "x = ...; return x"

Change if/else to if ternaries

Swap ' for " to prevent escaping nested "

Add lint silencing comments

Fix bare return that needs to be return None.
  • Loading branch information
rouilj committed Mar 17, 2024
1 parent 9c7604c commit 11956cb
Showing 1 changed file with 54 additions and 73 deletions.
127 changes: 54 additions & 73 deletions roundup/cgi/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,11 +442,11 @@ def __init__(self, instance, request, env, form=None, translator=None):
# will never arrive.
# If not defined, set CONTENT_LENGTH to 0 so it doesn't
# hang reading the data.
if self.env['REQUEST_METHOD'] in ['OPTIONS', 'DELETE', 'PATCH']:
if 'CONTENT_LENGTH' not in self.env:
self.env['CONTENT_LENGTH'] = 0
logger.debug("Setting CONTENT_LENGTH to 0 for method: %s",
self.env['REQUEST_METHOD'])
if self.env['REQUEST_METHOD'] in ['OPTIONS', 'DELETE', 'PATCH'] \
and 'CONTENT_LENGTH' not in self.env:
self.env['CONTENT_LENGTH'] = 0
logger.debug("Setting CONTENT_LENGTH to 0 for method: %s",
self.env['REQUEST_METHOD'])

# cgi.FieldStorage must save all data as
# binary/bytes. Subclass BinaryFieldStorage does this.
Expand Down Expand Up @@ -496,8 +496,7 @@ def __init__(self, instance, request, env, form=None, translator=None):

def _gen_nonce(self):
""" generate a unique nonce """
n = b2s(base64.b32encode(random_.token_bytes(40)))
return n
return b2s(base64.b32encode(random_.token_bytes(40)))

def setTranslator(self, translator=None):
"""Replace the translation engine
Expand Down Expand Up @@ -694,7 +693,8 @@ def handle_rest(self):
if (self.is_cors_preflight()):
self.handle_preflight()
return
elif not self.db.security.hasPermission('Rest Access', self.userid):

if not self.db.security.hasPermission('Rest Access', self.userid):
output = s2b('{ "error": { "status": 403, "msg": "Forbidden." } }')
self.reject_request(output,
message_type="application/json",
Expand Down Expand Up @@ -811,14 +811,11 @@ def inner_main(self):
self.form_wins = True
self._error_message = msg.args

if csrf_ok:
# csrf checks pass. Run actions etc.
# possibly handle a form submit action (may change
# self.classname and self.template, and may also
# append error/ok_messages)
html = self.handle_action()
else:
html = None
# If csrf checks pass. Run actions etc.
# handle_action() may handle a form submit action.
# It can change self.classname and self.template,
# and may also append error/ok_messages.
html = self.handle_action() if csrf_ok else None

if html:
self.write_html(html)
Expand Down Expand Up @@ -883,7 +880,7 @@ def inner_main(self):
self.response_code = http_.client.UNAUTHORIZED
realm = self.instance.config.TRACKER_NAME
self.setHeader("WWW-Authenticate",
"Basic realm=\"%s\"" % realm)
'Basic realm="%s"' % realm)
else:
self.response_code = http_.client.FORBIDDEN
self.renderFrontPage(str(message))
Expand Down Expand Up @@ -1041,10 +1038,7 @@ def determine_charset(self):

def _decode_charref(matchobj):
num = matchobj.group(1)
if num[0].lower() == 'x':
uc = int(num[1:], 16)
else:
uc = int(num)
uc = int(num[1:], 16) if num[0].lower() == 'x' else int(num)
return uchr(uc)

for field_name in self.form:
Expand Down Expand Up @@ -1235,7 +1229,7 @@ def determine_user(self, is_api=False):

# will be used later to override the get_roles method
# having it defined as truthy allows it to be used.
override_get_roles = lambda self: iter_roles( # noqa: E731
override_get_roles = lambda self: iter_roles( # noqa: ARG005
','.join(token['roles']))

# if user was not set by http authorization, try session lookup
Expand Down Expand Up @@ -1291,6 +1285,8 @@ def check_anonymous_access(self):
raise SeriousError(
self._('broken form: multiple @action values submitted'))
elif action != '':
# '' is value when no action parameter was found so run
# this to extract action string value when action found.
action = action.value.lower()
if action in ('login', 'register'):
return
Expand All @@ -1302,8 +1298,8 @@ def check_anonymous_access(self):
return

# otherwise for everything else
if self.user == 'anonymous':
if not self.db.security.hasPermission('Web Access', self.userid):
if self.user == 'anonymous' and \
not self.db.security.hasPermission('Web Access', self.userid):
raise Unauthorised(self._("Anonymous users are not "
"allowed to use the web interface"))

Expand All @@ -1326,10 +1322,7 @@ def is_origin_header_ok(self, api=False, credentials=False):
try:
origin = self.env['HTTP_ORIGIN']
except KeyError:
if self.env['REQUEST_METHOD'] == 'GET':
return True
else:
return False
return self.env['REQUEST_METHOD'] == 'GET'

# note base https://host/... ends host with with a /,
# so add it to origin.
Expand Down Expand Up @@ -1478,7 +1471,7 @@ def handle_csrf(self, api=False):
"ORIGIN",
"REFERER",
"X-FORWARDED-HOST",
"HOST"
"HOST",
]

header_pass = 0 # count of passing header checks
Expand Down Expand Up @@ -1581,21 +1574,20 @@ def handle_csrf(self, api=False):
raise UsageError(self._("Unable to verify sufficient headers"))

enforce = config['WEB_CSRF_ENFORCE_HEADER_X-REQUESTED-WITH']
if api:
if enforce in ['required', 'yes']:
# if we get here we have usually passed at least one
# header check. We check for presence of this custom
# header for xmlrpc/rest calls only.
# E.G. X-Requested-With: XMLHttpRequest
# Note we do not use CSRF nonces for xmlrpc/rest requests.
#
# see: https://www.owasp.org/index.php/Cross-Site_Request_Forgery_(CSRF)_Prevention_Cheat_Sheet#Protecting_REST_Services:_Use_of_Custom_Request_Headers
if 'HTTP_X_REQUESTED_WITH' not in self.env:
logger.error(self._(
''"csrf X-REQUESTED-WITH xmlrpc required header "
''"check failed for user%s."),
current_user)
raise UsageError(self._("Required Header Missing"))
if api and enforce in ['required', 'yes']:
# if we get here we have usually passed at least one
# header check. We check for presence of this custom
# header for xmlrpc/rest calls only.
# E.G. X-Requested-With: XMLHttpRequest
# Note we do not use CSRF nonces for xmlrpc/rest requests.
#
# see: https://www.owasp.org/index.php/Cross-Site_Request_Forgery_(CSRF)_Prevention_Cheat_Sheet#Protecting_REST_Services:_Use_of_Custom_Request_Headers
if 'HTTP_X_REQUESTED_WITH' not in self.env:
logger.error(self._(
''"csrf X-REQUESTED-WITH xmlrpc required header "
''"check failed for user%s."),
current_user)
raise UsageError(self._("Required Header Missing"))

# Expire old csrf tokens now so we don't use them. These will
# be committed after the otks.destroy below. Note that the
Expand Down Expand Up @@ -1703,17 +1695,16 @@ def opendb(self, username):
if not hasattr(self, 'db'):
self.db = self.instance.open(username)
self.db.tx_Source = "web"
else:
if self.instance.optimize:
elif self.instance.optimize:
self.db.setCurrentUser(username)
self.db.tx_Source = "web"
else:
self.db.close()
self.db = self.instance.open(username)
self.db.tx_Source = "web"
# The old session API refers to the closed database;
# we can no longer use it.
self.session_api = Session(self)
else:
self.db.close()
self.db = self.instance.open(username)
self.db.tx_Source = "web"
# The old session API refers to the closed database;
# we can no longer use it.
self.session_api = Session(self)

# match designator in URL stripping leading 0's. So:
# https://issues.roundup-tracker.org/issue002551190 is the same as
Expand Down Expand Up @@ -1793,7 +1784,7 @@ def determine_context(self, dre=dre_url):
else:
self.template = ''
return
elif path[0] in ('_file', '@@file'):
if path[0] in ('_file', '@@file'):
raise SendStaticFile(os.path.join(*path[1:]))
else:
self.classname = path[0]
Expand Down Expand Up @@ -1973,10 +1964,7 @@ def serve_static_file(self, file):
file = str(file)
mime_type = mimetypes.guess_type(file)[0]
if not mime_type:
if file.endswith('.css'):
mime_type = 'text/css'
else:
mime_type = 'text/plain'
mime_type = 'text/css' if file.endswith('.css') else 'text/plain'

# get filename: given a/b/c.js extract c.js
fn = file.rpartition("/")[2]
Expand Down Expand Up @@ -2079,12 +2067,9 @@ def selectTemplate(self, name, view):
if (view and view.find('|') != -1):
# we have alternate templates, parse them apart.
(oktmpl, errortmpl) = view.split("|", 2)
if self._error_message:
# we have an error, use errortmpl
view = errortmpl
else:
# no error message recorded, use oktmpl
view = oktmpl

# Choose the right template
view = errortmpl if self._error_message else oktmpl

loader = self.instance.templates

Expand Down Expand Up @@ -2135,7 +2120,7 @@ def renderContext(self):
# catch errors so we can handle PT rendering errors more nicely
args = {
'ok_message': self._ok_message,
'error_message': self._error_message
'error_message': self._error_message,
}
pt = self.instance.templates.load(tplname)
# let the template render figure stuff out
Expand Down Expand Up @@ -2189,8 +2174,7 @@ def renderContext(self):
# than the one we tried to generate above.
if sys.version_info[0] > 2:
raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
else:
exec('raise exc_info[0], exc_info[1], exc_info[2]') # nosec
exec('raise exc_info[0], exc_info[1], exc_info[2]') # nosec

def renderError(self, error, response_code=400, use_template=True):
self.response_code = response_code
Expand Down Expand Up @@ -2222,7 +2206,7 @@ def renderError(self, error, response_code=400, use_template=True):

args = {
'ok_message': self._ok_message,
'error_message': self._error_message
'error_message': self._error_message,
}

try:
Expand Down Expand Up @@ -2573,7 +2557,7 @@ def handle_range_header(self, length, etag):
# If the condition doesn't match the entity tag, then we
# must send the client the entire file.
if if_range != etag:
return
return None
# The grammar for the Range header value is:
#
# ranges-specifier = byte-ranges-specifier
Expand Down Expand Up @@ -2761,13 +2745,10 @@ def write_file(self, filename):
self._socket_op(self.request.sendfile, filename, offset, length)
return
# Fallback to the "write" operation.
f = open(filename, 'rb')
try:
with open(filename, 'rb') as f:
if offset:
f.seek(offset)
content = f.read(length)
finally:
f.close()
self.write(content)

def setHeader(self, header, value):
Expand Down

0 comments on commit 11956cb

Please sign in to comment.