Skip to content

Commit

Permalink
[Fix] client: sending the body properly
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastien committed Sep 26, 2024
1 parent 2953a0a commit d7f2ad7
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 8 deletions.
87 changes: 79 additions & 8 deletions src/py/extra/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import NamedTuple, ClassVar, AsyncGenerator, Self, Any, Iterator
from urllib.parse import quote_plus, urlparse
from contextvars import ContextVar
from .utils.io import asWritable
from contextlib import contextmanager
from dataclasses import dataclass
import asyncio, ssl, time, os
Expand All @@ -9,12 +10,15 @@
from .http.model import (
HTTPRequest,
HTTPResponse,
HTTPResponseStream,
HTTPResponseAsyncStream,
HTTPBodyBlob,
HTTPResponseFile,
HTTPHeaders,
HTTPRequestBody,
HTTPBodyBlob,
HTTPAtom,
HTTPProcessingStatus,
headername,
)
from .http.parser import HTTPParser

Expand Down Expand Up @@ -259,7 +263,6 @@ async def get(
# then we close the connection, or return a new one.
while cxn:
c = cxn.pop()
print("POOOL CONN", c, c.isValid)
if c.isValid:
return c
else:
Expand Down Expand Up @@ -335,8 +338,6 @@ async def OnRequest(
host: str,
cxn: Connection,
*,
headers: dict[str, str] | None = None,
body: HTTPRequestBody | HTTPBodyBlob | None = None,
timeout: float | None = 2.0,
buffer: int = 32_000,
streaming: bool | None = None,
Expand All @@ -345,19 +346,18 @@ async def OnRequest(
"""Low level function to process HTTP requests with the given connection."""
# We send the line
line = f"{request.method} {request.path} HTTP/1.1\r\n".encode()
# We send the headers
head: dict[str, str] = (
{headername(k): v for k, v in headers.items()} if headers else {}
)
head: dict[str, str] = request.headers
if "Host" not in head:
head["Host"] = host
body = request.body
if not streaming and "Content-Length" not in head:
head["Content-Length"] = (
"0"
if body is None
else (
str(body.length)
if isinstance(body, HTTPBodyBlob)
or isinstance(body, HTTPResponseFile)
else str(body.expected or "0")
)
)
Expand All @@ -368,6 +368,39 @@ async def OnRequest(
cxn.writer.write(payload)
cxn.writer.write(b"\r\n\r\n")
await cxn.writer.drain()
# NOTE: This is a common logic shared with the server
# And send the request
if isinstance(body, HTTPBodyBlob):
cxn.writer.write(body.payload)
elif isinstance(body, HTTPResponseFile):
fd: int = -1
try:
fd = os.open(str(body.path), os.O_RDONLY)
while True:
chunk = os.read(fd, 64_000)
if chunk:
cxn.writer.write(chunk)
else:
break
finally:
if fd > 0:
os.close(fd)
elif isinstance(body, HTTPResponseStream):
# No keep alive with streaming as these are long
# lived requests.
for chunk in body.stream:
cxn.writer.write(asWritable(chunk))
await cxn.writer.drain()
elif isinstance(body, HTTPResponseAsyncStream):
# No keep alive with streaming as these are long
# lived requests.
async for chunk in body.stream:
cxn.writer.write(asWritable(chunk))
await cxn.writer.drain()
elif body is None:
pass
else:
raise ValueError(f"Unsupported body format: {body}")

iteration: int = 0
# --
Expand Down Expand Up @@ -522,6 +555,7 @@ async def Request(
path,
query=None,
headers=HTTPHeaders(headers or {}),
body=body,
),
host,
cxn,
Expand All @@ -547,6 +581,43 @@ def pooling(idle: float | int | None = None) -> Iterator[ConnectionPool]:
pool.pop().release()


async def request(
method: str,
host: str,
path: str,
*,
port: int | None = None,
headers: dict[str, str] | None = None,
body: HTTPRequestBody | HTTPBodyBlob | None = None,
params: dict[str, str] | str | None = None,
ssl: bool = True,
verified: bool = True,
timeout: float = 10.0,
follow: bool = True,
proxy: tuple[str, int] | bool | None = None,
connection: Connection | None = None,
streaming: bool | None = None,
keepalive: bool = False,
) -> AsyncGenerator[HTTPAtom, None]:
async for atom in HTTPClient.Request(
method,
host,
path,
port=port,
headers=headers,
body=body,
params=params,
ssl=ssl,
verified=verified,
follow=follow,
proxy=proxy,
connection=connection,
streaming=streaming,
keepalive=keepalive,
):
yield atom


if __name__ == "__main__":

async def main() -> None:
Expand Down
4 changes: 4 additions & 0 deletions src/py/extra/http/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ class HTTPResponseFile(NamedTuple):
path: Path
fd: int | None = None

@property
def length(self) -> int:
return self.path.stat().st_size


class HTTPResponseStream(NamedTuple):
stream: Generator[str | bytes | TPrimitive, Any, Any]
Expand Down

0 comments on commit d7f2ad7

Please sign in to comment.