Skip to content

Commit

Permalink
Merge branch 'main' of github.com:sebastien/extra
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastien committed Oct 3, 2024
2 parents 6c78e09 + 2db9769 commit 30afbc4
Show file tree
Hide file tree
Showing 9 changed files with 748 additions and 176 deletions.
38 changes: 38 additions & 0 deletions examples/client-gzip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import asyncio
from extra.client import HTTPClient
from extra.http.model import HTTPBodyBlob
from extra.utils.codec import GZipDecoder


# NOTE: Start "examples/sse.py"
async def main(path: str, host: str = "127.0.0.1", port: int = 443, ssl: bool = True):
    """Fetch `path` from `host` advertising `Accept-Encoding: gzip`, decompress
    the response body incrementally and write the decoded bytes to stdout."""
    decoder = GZipDecoder()
    with open("/dev/stdout", "wb") as out:
        async for atom in HTTPClient.Request(
            host=host,
            method="GET",
            port=port,
            path=path,
            timeout=11.0,
            streaming=False,
            headers={"Accept-Encoding": "gzip"},
            ssl=ssl,
        ):
            # Only body chunks carry compressed payload; other atoms
            # (status line, headers, processing markers) are ignored.
            if isinstance(atom, HTTPBodyBlob):
                out.write(decoder.feed(atom.payload) or b"")
        # Drain whatever the decoder still buffers once the stream ends.
        out.write(decoder.flush() or b"")


if __name__ == "__main__":
    import sys

    # FIX: the original sliced `sys.argv[2:]` (skipping argv[1]) and then
    # never used the result (`args` and `n` were dead code) — the request
    # path was hard-coded regardless of the command line. Use the first
    # CLI argument as the path, keeping the original URL as the default.
    args = sys.argv[1:]
    asyncio.run(
        main(
            path=args[0] if args else "/gh/lodash/lodash/4.17.15-npm/lodash.min.js",
            host="cdn.statically.io",
        )
    )
# EOF
7 changes: 6 additions & 1 deletion src/py/extra/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from .http.model import HTTPRequest, HTTPResponse, HTTPRequestError # NOQA: F401
from .http.model import (
HTTPRequest,
HTTPResponse,
HTTPResponseLine,
HTTPRequestError,
) # NOQA: F401
from .decorators import on, expose, pre, post # NOQA: F401
from .server import run # NOQA: F401
from .model import Service # NOQA: F401
Expand Down
104 changes: 89 additions & 15 deletions src/py/extra/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import NamedTuple, ClassVar, AsyncGenerator, Self, Any, Iterator
from urllib.parse import quote_plus, urlparse
from contextvars import ContextVar
from .utils.io import asWritable
from contextlib import contextmanager
from dataclasses import dataclass
import asyncio, ssl, time, os
Expand All @@ -9,12 +10,15 @@
from .http.model import (
HTTPRequest,
HTTPResponse,
HTTPHeaders,
HTTPRequestBody,
HTTPBodyStream,
HTTPBodyAsyncStream,
HTTPBodyBlob,
HTTPBodyFile,
HTTPHeaders,
HTTPBody,
HTTPBodyIO,
HTTPAtom,
HTTPProcessingStatus,
headername,
)
from .http.parser import HTTPParser

Expand Down Expand Up @@ -259,7 +263,6 @@ async def get(
# then we close the connection, or return a new one.
while cxn:
c = cxn.pop()
print("POOOL CONN", c, c.isValid)
if c.isValid:
return c
else:
Expand Down Expand Up @@ -335,8 +338,6 @@ async def OnRequest(
host: str,
cxn: Connection,
*,
headers: dict[str, str] | None = None,
body: HTTPRequestBody | HTTPBodyBlob | None = None,
timeout: float | None = 2.0,
buffer: int = 32_000,
streaming: bool | None = None,
Expand All @@ -345,19 +346,17 @@ async def OnRequest(
"""Low level function to process HTTP requests with the given connection."""
# We send the line
line = f"{request.method} {request.path} HTTP/1.1\r\n".encode()
# We send the headers
head: dict[str, str] = (
{headername(k): v for k, v in headers.items()} if headers else {}
)
head: dict[str, str] = request.headers
if "Host" not in head:
head["Host"] = host
body = request._body
if not streaming and "Content-Length" not in head:
head["Content-Length"] = (
"0"
if body is None
else (
str(body.length)
if isinstance(body, HTTPBodyBlob)
if isinstance(body, HTTPBodyBlob) or isinstance(body, HTTPBodyFile)
else str(body.expected or "0")
)
)
Expand All @@ -368,6 +367,39 @@ async def OnRequest(
cxn.writer.write(payload)
cxn.writer.write(b"\r\n\r\n")
await cxn.writer.drain()
# NOTE: This is a common logic shared with the server
# And send the request
if isinstance(body, HTTPBodyBlob):
cxn.writer.write(body.payload)
elif isinstance(body, HTTPBodyFile):
fd: int = -1
try:
fd = os.open(str(body.path), os.O_RDONLY)
while True:
chunk = os.read(fd, 64_000)
if chunk:
cxn.writer.write(chunk)
else:
break
finally:
if fd > 0:
os.close(fd)
elif isinstance(body, HTTPBodyStream):
# No keep alive with streaming as these are long
# lived requests.
for chunk in body.stream:
cxn.writer.write(asWritable(chunk))
await cxn.writer.drain()
elif isinstance(body, HTTPBodyAsyncStream):
# No keep alive with streaming as these are long
# lived requests.
async for chunk in body.stream:
cxn.writer.write(asWritable(chunk))
await cxn.writer.drain()
elif body is None:
pass
else:
raise ValueError(f"Unsupported body format: {body}")

iteration: int = 0
# --
Expand Down Expand Up @@ -402,15 +434,19 @@ async def OnRequest(
if atom is HTTPProcessingStatus.Complete:
status = atom
elif isinstance(atom, HTTPResponse):
if atom.body:
yield atom.body
res = atom
break
else:
yield atom
iteration += 1
if (
streaming is True
or res
and res.headers.contentType in {"text/event-stream"}
# We continue if we have streaming or
status is HTTPProcessingStatus.Processing
or streaming is True
or (res and res.body and HTTPBody.HasRemaining(res.body))
or (res and res.headers.contentType in {"text/event-stream"})
):
# TODO: We should swap out the body for a streaming body
cxn.isStreaming = True
Expand Down Expand Up @@ -443,7 +479,7 @@ async def Request(
*,
port: int | None = None,
headers: dict[str, str] | None = None,
body: HTTPRequestBody | HTTPBodyBlob | None = None,
body: HTTPBodyIO | HTTPBodyBlob | None = None,
params: dict[str, str] | str | None = None,
ssl: bool = True,
verified: bool = True,
Expand Down Expand Up @@ -522,6 +558,7 @@ async def Request(
path,
query=None,
headers=HTTPHeaders(headers or {}),
body=body,
),
host,
cxn,
Expand All @@ -547,6 +584,43 @@ def pooling(idle: float | int | None = None) -> Iterator[ConnectionPool]:
pool.pop().release()


async def request(
    method: str,
    host: str,
    path: str,
    *,
    port: int | None = None,
    headers: dict[str, str] | None = None,
    body: HTTPBodyIO | HTTPBodyBlob | None = None,
    params: dict[str, str] | str | None = None,
    ssl: bool = True,
    verified: bool = True,
    timeout: float = 10.0,
    follow: bool = True,
    proxy: tuple[str, int] | bool | None = None,
    connection: Connection | None = None,
    streaming: bool | None = None,
    keepalive: bool = False,
) -> AsyncGenerator[HTTPAtom, None]:
    """Module-level convenience wrapper around `HTTPClient.Request`: issues
    an HTTP request and yields the resulting atoms (response line, headers,
    body blobs, processing statuses) as they arrive.

    All keyword arguments are forwarded verbatim to `HTTPClient.Request`.
    """
    async for atom in HTTPClient.Request(
        method,
        host,
        path,
        port=port,
        headers=headers,
        body=body,
        params=params,
        ssl=ssl,
        verified=verified,
        # FIX: `timeout` was accepted but never forwarded, so the caller's
        # timeout was silently ignored in favor of the callee's default.
        timeout=timeout,
        follow=follow,
        proxy=proxy,
        connection=connection,
        streaming=streaming,
        keepalive=keepalive,
    ):
        yield atom


if __name__ == "__main__":

async def main() -> None:
Expand Down
12 changes: 6 additions & 6 deletions src/py/extra/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
from .http.model import (
HTTPRequest,
HTTPHeaders,
HTTPResponseFile,
HTTPResponseStream,
HTTPResponseAsyncStream,
HTTPBodyFile,
HTTPBodyStream,
HTTPBodyAsyncStream,
HTTPBodyBlob,
HTTPResponse,
headername,
Expand Down Expand Up @@ -144,18 +144,18 @@ async def FromResponse(response: HTTPResponse) -> dict[str, Any]:
pass
elif isinstance(response.body, HTTPBodyBlob):
buffer.write(response.body.payload or b"")
elif isinstance(response.body, HTTPResponseFile):
elif isinstance(response.body, HTTPBodyFile):
with open(response.body.path, "rb") as f:
while data := f.read(32_000):
buffer.write(data)
elif isinstance(response.body, HTTPResponseStream):
elif isinstance(response.body, HTTPBodyStream):
# TODO: Should handle exception
try:
for chunk in response.body.stream:
buffer.write(asWritable(chunk))
finally:
response.body.stream.close()
elif isinstance(response.body, HTTPResponseAsyncStream):
elif isinstance(response.body, HTTPBodyAsyncStream):
# No keep alive with streaming as these are long
# lived requests.
try:
Expand Down
Loading

0 comments on commit 30afbc4

Please sign in to comment.