Skip to content

Commit

Permalink
Set line length to 120
Browse files Browse the repository at this point in the history
  • Loading branch information
karpetrosyan committed Oct 27, 2023
1 parent e2c47bc commit 4934810
Show file tree
Hide file tree
Showing 41 changed files with 210 additions and 664 deletions.
58 changes: 14 additions & 44 deletions httpx/_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ def auth_flow(self, request: Request) -> typing.Generator[Request, Response, Non
"""
yield request

def sync_auth_flow(
self, request: Request
) -> typing.Generator[Request, Response, None]:
def sync_auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
"""
Execute the authentication flow synchronously.
Expand All @@ -80,9 +78,7 @@ def sync_auth_flow(
except StopIteration:
break

async def async_auth_flow(
self, request: Request
) -> typing.AsyncGenerator[Request, Response]:
async def async_auth_flow(self, request: Request) -> typing.AsyncGenerator[Request, Response]:
"""
Execute the authentication flow asynchronously.
Expand Down Expand Up @@ -125,18 +121,14 @@ class BasicAuth(Auth):
and uses HTTP Basic authentication.
"""

def __init__(
self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
):
def __init__(self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]):
self._auth_header = self._build_auth_header(username, password)

def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
request.headers["Authorization"] = self._auth_header
yield request

def _build_auth_header(
self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
) -> str:
def _build_auth_header(self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]) -> str:
userpass = b":".join((to_bytes(username), to_bytes(password)))
token = b64encode(userpass).decode()
return f"Basic {token}"
Expand All @@ -157,14 +149,10 @@ def auth_flow(self, request: Request) -> typing.Generator[Request, Response, Non
yield request
else:
# Build a basic auth header with credentials from the netrc file.
request.headers["Authorization"] = self._build_auth_header(
username=auth_info[0], password=auth_info[2]
)
request.headers["Authorization"] = self._build_auth_header(username=auth_info[0], password=auth_info[2])
yield request

def _build_auth_header(
self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
) -> str:
def _build_auth_header(self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]) -> str:
userpass = b":".join((to_bytes(username), to_bytes(password)))
token = b64encode(userpass).decode()
return f"Basic {token}"
Expand All @@ -182,19 +170,15 @@ class DigestAuth(Auth):
"SHA-512-SESS": hashlib.sha512,
}

def __init__(
self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]
) -> None:
def __init__(self, username: typing.Union[str, bytes], password: typing.Union[str, bytes]) -> None:
self._username = to_bytes(username)
self._password = to_bytes(password)
self._last_challenge: typing.Optional[_DigestAuthChallenge] = None
self._nonce_count = 1

def auth_flow(self, request: Request) -> typing.Generator[Request, Response, None]:
if self._last_challenge:
request.headers["Authorization"] = self._build_auth_header(
request, self._last_challenge
)
request.headers["Authorization"] = self._build_auth_header(request, self._last_challenge)

response = yield request

Expand All @@ -214,16 +198,12 @@ def auth_flow(self, request: Request) -> typing.Generator[Request, Response, Non
self._last_challenge = self._parse_challenge(request, response, auth_header)
self._nonce_count = 1

request.headers["Authorization"] = self._build_auth_header(
request, self._last_challenge
)
request.headers["Authorization"] = self._build_auth_header(request, self._last_challenge)
if response.cookies:
Cookies(response.cookies).set_cookie_header(request=request)
yield request

def _parse_challenge(
self, request: Request, response: Response, auth_header: str
) -> "_DigestAuthChallenge":
def _parse_challenge(self, request: Request, response: Response, auth_header: str) -> "_DigestAuthChallenge":
"""
Returns a challenge from a Digest WWW-Authenticate header.
These take the form of:
Expand All @@ -245,16 +225,12 @@ def _parse_challenge(
algorithm = header_dict.get("algorithm", "MD5")
opaque = header_dict["opaque"].encode() if "opaque" in header_dict else None
qop = header_dict["qop"].encode() if "qop" in header_dict else None
return _DigestAuthChallenge(
realm=realm, nonce=nonce, algorithm=algorithm, opaque=opaque, qop=qop
)
return _DigestAuthChallenge(realm=realm, nonce=nonce, algorithm=algorithm, opaque=opaque, qop=qop)
except KeyError as exc:
message = "Malformed Digest WWW-Authenticate header"
raise ProtocolError(message, request=request) from exc

def _build_auth_header(
self, request: Request, challenge: "_DigestAuthChallenge"
) -> str:
def _build_auth_header(self, request: Request, challenge: "_DigestAuthChallenge") -> str:
hash_func = self._ALGORITHM_TO_HASH_FUNCTION[challenge.algorithm.upper()]

def digest(data: bytes) -> bytes:
Expand Down Expand Up @@ -316,18 +292,12 @@ def _get_header_value(self, header_fields: typing.Dict[str, bytes]) -> str:
for i, (field, value) in enumerate(header_fields.items()):
if i > 0:
header_value += ", "
template = (
QUOTED_TEMPLATE
if field not in NON_QUOTED_FIELDS
else NON_QUOTED_TEMPLATE
)
template = QUOTED_TEMPLATE if field not in NON_QUOTED_FIELDS else NON_QUOTED_TEMPLATE
header_value += template.format(field, to_str(value))

return header_value

def _resolve_qop(
self, qop: typing.Optional[bytes], request: Request
) -> typing.Optional[bytes]:
def _resolve_qop(self, qop: typing.Optional[bytes], request: Request) -> typing.Optional[bytes]:
if qop is None:
return None
qops = re.split(b", ?", qop)
Expand Down
Loading

0 comments on commit 4934810

Please sign in to comment.