Skip to content

Commit

Permalink
Switch from httpx to aiohttp to solve slow uploads (#87)
Browse files Browse the repository at this point in the history
* Switch from httpx to aiohttp to solve slow uploads

* Httpx for oauth

Co-authored-by: Philip Stadermann <philip.stadermann@gdata.de>
Co-authored-by: PT-ATA No One <ata-no-one@gdata.de>
  • Loading branch information
3 people authored Sep 6, 2022
1 parent 913f63e commit cb8934d
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 17 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ obj/
# Python
__pycache__/
_trial_temp/
*.zip
3 changes: 2 additions & 1 deletion python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ xmlrunner==1.7.7
unittest-xml-reporting==3.2.0
websockets~=10.3
python-dotenv==0.20.0
httpx[http2]==0.23.0
build==0.7.0
jwt==1.3.1
authlib==1.0.1
aiofiles==22.1.0
aiohttp==3.8.1
httpx[http2]==0.23.0
3 changes: 2 additions & 1 deletion python/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ install_requires =
httpx[http2] == 0.23.0
jwt == 1.3.1
authlib == 1.0.1
aiofiles==0.8.0
aiofiles==22.1.0
aiohttp==3.8.1

[options.packages.find]
where = src
29 changes: 14 additions & 15 deletions python/src/vaas/vaas.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,17 @@
from typing import Optional
import asyncio
from asyncio import Future
import aiohttp
import ssl
from urllib.parse import urlparse
import aiofiles
from jwt import JWT
import httpx
import websockets.client
from authlib.integrations.httpx_client import AsyncOAuth2Client


URL = "wss://gateway-vaas.gdatasecurity.de"
TIMEOUT = 60
HTTP2 = False
# TODO: Set to default of 5 once Vaas upload endpoint is 100% streaming
UPLOAD_TIMEOUT = 600
TIMEOUT = 600


class VaasTracing:
Expand Down Expand Up @@ -101,14 +98,16 @@ def __init__(self, tracing=VaasTracing(), options=VaasOptions()):
self.websocket = None
self.session_id = None
self.results = {}
self.httpx_client: Optional[httpx.AsyncClient] = None
self.http_client: Optional[aiohttp.ClientSession] = None
self.options = options
self.verify_ssl = True

async def connect(self, token, url=URL, verify=True):
"""Connect to VaaS
token -- OpenID Connect token signed by a trusted identity provider
"""
self.verify_ssl = verify
self.websocket = await connect_websocket(url, verify)
authenticate_request = {"kind": "AuthRequest", "token": token}

Expand All @@ -123,7 +122,7 @@ async def connect(self, token, url=URL, verify=True):
self.__receive_loop()
) # fire and forget async_foo()

self.httpx_client = httpx.AsyncClient(http2=HTTP2, verify=verify)
self.http_client = aiohttp.ClientSession()

async def connect_with_client_credentials(
self, client_id, client_secret, token_endpoint, url=URL, verify=True
Expand All @@ -147,8 +146,8 @@ async def close(self):
await self.websocket.close()
if self.loop_result is not None:
await self.loop_result
if self.httpx_client is not None:
await self.httpx_client.aclose()
if self.http_client is not None:
await self.http_client.close()

async def __aenter__(self):
return self
Expand Down Expand Up @@ -253,22 +252,22 @@ async def __upload(self, token, upload_uri, buffer_or_file, content_length):
decoded_token = jwt.decode(token, do_verify=False)
trace_id = decoded_token.get("traceId")
try:
await self.httpx_client.put(
await self.http_client.put(
url=upload_uri,
content=buffer_or_file,
data=buffer_or_file,
headers={
"Authorization": token,
"traceParent": trace_id,
"Content-Length": str(content_length),
},
timeout=UPLOAD_TIMEOUT,
verify_ssl=self.verify_ssl
)
except httpx.TimeoutException as ex:
except aiohttp.ServerTimeoutError as ex:
self.tracing.trace_upload_timeout(content_length)
raise VaasTimeoutError() from ex

async def for_url(self, url):
"""Returns the verdict for a file from an url"""
response = await self.httpx_client.get(url)
buffer = response.content
response = await self.http_client.get(url)
buffer = await response.content.read()
return await self.for_buffer(buffer)

0 comments on commit cb8934d

Please sign in to comment.