Skip to content

Commit

Permalink
WIP: using a queue to implement stream
Browse files Browse the repository at this point in the history
  • Loading branch information
perklet committed Sep 22, 2023
1 parent 0fffcd8 commit b62b786
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 6 deletions.
9 changes: 9 additions & 0 deletions curl_cffi/requests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self, curl: Optional[Curl] = None, request: Optional[Request] = Non
self.redirect_url = ""
self.http_version = 0
self.history = []
self.queue = None

@property
def text(self) -> str:
Expand All @@ -60,6 +61,14 @@ def raise_for_status(self):
if not self.ok:
raise RequestsError(f"HTTP Error {self.status_code}: {self.reason}")

def iter(self):
    """Yield the chunks delivered through ``self.queue``, in arrival order.

    Items are fetched with ``self.queue.get()`` one at a time. A ``None``
    item is treated as the end-of-stream marker: it is not yielded, and
    ``self.curl.reset()`` is called before the generator finishes.
    """
    piece = self.queue.get()  # type: ignore
    while piece is not None:
        yield piece
        # Fetch the next item only after the consumer asks for more.
        piece = self.queue.get()  # type: ignore
    # The ``None`` sentinel was received: reset the curl handle.
    self.curl.reset()

def json(self, **kw):
return loads(self.content, **kw)

Expand Down
41 changes: 35 additions & 6 deletions curl_cffi/requests/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import re
import threading
import warnings
import queue
from enum import Enum
from functools import partialmethod
from io import BytesIO
from json import dumps
from typing import Callable, Dict, List, Optional, Tuple, Union, cast
from urllib.parse import ParseResult, parse_qsl, unquote, urlencode, urlparse
from concurrent.futures import ThreadPoolExecutor

from .. import AsyncCurl, Curl, CurlError, CurlInfo, CurlOpt, CurlHttpVersion
from .cookies import Cookies, CookieTypes, CurlMorsel
Expand Down Expand Up @@ -185,6 +187,7 @@ def _set_curl_options(
default_headers: Optional[bool] = None,
http_version: Optional[CurlHttpVersion] = None,
interface: Optional[str] = None,
stream: bool = False,
):
c = curl

Expand Down Expand Up @@ -355,12 +358,14 @@ def _set_curl_options(
for k, v in self.curl_options.items():
c.setopt(k, v)

if content_callback is None:
buffer = None
if stream:
c.setopt(CurlOpt.WRITEFUNCTION, self.queue.put) # type: ignore
elif content_callback is not None:
c.setopt(CurlOpt.WRITEFUNCTION, content_callback)
else:
buffer = BytesIO()
c.setopt(CurlOpt.WRITEDATA, buffer)
else:
buffer = None
c.setopt(CurlOpt.WRITEFUNCTION, content_callback)
header_buffer = BytesIO()
c.setopt(CurlOpt.HEADERDATA, header_buffer)

Expand Down Expand Up @@ -470,6 +475,8 @@ def __init__(
super().__init__(**kwargs)
self._thread = thread
self._use_thread_local_curl = use_thread_local_curl
self._queue = None
self._executor = None
if use_thread_local_curl:
self._local = threading.local()
if curl:
Expand All @@ -492,6 +499,18 @@ def curl(self):
else:
return self._curl

@property
def executor(self):
    """Return the ``ThreadPoolExecutor`` cached on ``self._executor``.

    The pool is created with default arguments on first access and the
    same object is returned on every later access.
    """
    pool = self._executor
    if pool is None:
        # First access: build the pool and remember it on the instance.
        pool = self._executor = ThreadPoolExecutor()
    return pool

@property
def queue(self):
    """Return the ``queue.Queue`` cached on ``self._queue``.

    The queue is created with default arguments on first access and the
    same object is returned on every later access.
    """
    if self._queue is not None:
        return self._queue
    # First access: create the queue and remember it on the instance.
    self._queue = queue.Queue()
    return self._queue

def __enter__(self):
return self

Expand Down Expand Up @@ -525,6 +544,7 @@ def request(
default_headers: Optional[bool] = None,
http_version: Optional[CurlHttpVersion] = None,
interface: Optional[str] = None,
stream: bool = False
) -> Response:
"""Send the request, see [curl_cffi.requests.request](/api/curl_cffi.requests/#curl_cffi.requests.request) for details on parameters."""
c = self.curl
Expand All @@ -551,6 +571,7 @@ def request(
default_headers=default_headers,
http_version=http_version,
interface=interface,
stream=stream,
)
try:
if self._thread == "eventlet":
Expand All @@ -560,17 +581,25 @@ def request(
# see: https://www.gevent.org/api/gevent.threadpool.html
gevent.get_hub().threadpool.spawn(c.perform).get()
else:
c.perform()
if stream:
def perform():
c.perform()
self.queue.put(None)
self.executor.submit(perform)
else:
c.perform()
except CurlError as e:
rsp = self._parse_response(c, buffer, header_buffer)
rsp.request = req
raise RequestsError(str(e), e.code, rsp) from e
else:
rsp = self._parse_response(c, buffer, header_buffer)
rsp.request = req
rsp.queue = self.queue
return rsp
finally:
self.curl.reset()
if not stream:
self.curl.reset()

head = partialmethod(request, "HEAD")
get = partialmethod(request, "GET")
Expand Down

0 comments on commit b62b786

Please sign in to comment.