Source code for niquests.async_session

from __future__ import annotations

import os
import sys
import time
import typing
import warnings
from collections import OrderedDict
from datetime import timedelta
from urllib.parse import urlparse

from .status_codes import codes

if typing.TYPE_CHECKING:
    from typing_extensions import Literal

from ._compat import HAS_LEGACY_URLLIB3, iscoroutinefunction, urllib3_ensure_type
from ._constant import (
    DEFAULT_POOLSIZE,
    DEFAULT_RETRIES,
    READ_DEFAULT_TIMEOUT,
    WRITE_DEFAULT_TIMEOUT,
)
from .adapters import AsyncBaseAdapter, AsyncHTTPAdapter
from .cookies import (
    RequestsCookieJar,
    coerce_to_requests_cookiejar,
    extract_cookies_to_jar,
    merge_cookies,
)
from .exceptions import (
    ChunkedEncodingError,
    ContentDecodingError,
    InvalidSchema,
    TooManyRedirects,
)
from .extensions.revocation import DEFAULT_STRATEGY, RevocationConfiguration
from .extensions.sgi._async import AsyncServerGatewayInterface
from .extensions.unixsocket._async import AsyncUnixAdapter

try:
    from .extensions.pyodide._async import AsyncPyodideAdapter
except ImportError:
    AsyncPyodideAdapter = None  # type: ignore[assignment,misc]
    if sys.platform == "emscripten":
        warnings.warn(
            "unable to load native WASM adapter for AsyncSession. HTTP requests may fail in async.",
            UserWarning,
        )
from .hooks import async_dispatch_hook, default_hooks
from .models import (
    DEFAULT_REDIRECT_LIMIT,
    AsyncResponse,
    PreparedRequest,
    Request,
    Response,
    TransferProgress,
)
from .packages.urllib3 import ConnectionInfo
from .packages.urllib3.contrib.resolver._async import AsyncBaseResolver
from .packages.urllib3.contrib.webextensions._async import load_extension
from .sessions import Session, merge_hooks
from .structures import AsyncQuicSharedCache, CaseInsensitiveDict
from .typing import (
    ASGIApp,
    AsyncBodyType,
    AsyncHookType,
    AsyncHttpAuthenticationType,
    AsyncResolverType,
    BodyType,
    CacheLayerAltSvcType,
    CookiesType,
    HeadersType,
    HttpAuthenticationType,
    HttpMethodType,
    MultiPartFilesAltType,
    MultiPartFilesType,
    ProxyType,
    QueryParameterType,
    RetryType,
    TimeoutType,
    TLSClientCertType,
    TLSVerifyType,
)
from .utils import (
    _deepcopy_ci,
    _swap_context,
    arewind_body,
    create_async_resolver,
    default_headers,
    parse_scheme,
    requote_uri,
    resolve_proxies,
    rewind_body,
    should_check_crl,
    should_check_ocsp,
    urljoin_safe,
)

# Preferred clock, based on which one is more accurate on a given system.
if sys.platform == "win32":
    preferred_clock = time.perf_counter
else:
    preferred_clock = time.time


[docs] class AsyncSession(Session): """A Requests asynchronous session. Provides cookie persistence, connection-pooling, and configuration. Basic Usage:: >>> import niquests >>> s = niquests.AsyncSession() >>> await s.get('https://httpbin.org/get') <Response HTTP/2 [200]> Or as a context manager:: >>> async with niquests.AsyncSession() as s: ... await s.get('https://httpbin.org/get') <Response HTTP/2 [200]> """ def __init__( self, *, resolver: AsyncResolverType | None = None, source_address: tuple[str, int] | None = None, quic_cache_layer: CacheLayerAltSvcType | None = None, retries: RetryType = DEFAULT_RETRIES, multiplexed: bool = False, disable_http1: bool = False, disable_http2: bool = False, disable_http3: bool = False, disable_ipv6: bool = False, disable_ipv4: bool = False, pool_connections: int = DEFAULT_POOLSIZE, pool_maxsize: int = DEFAULT_POOLSIZE, happy_eyeballs: bool | int = False, keepalive_delay: float | int | None = 600.0, keepalive_idle_window: float | int | None = 60.0, base_url: str | None = None, timeout: TimeoutType | None = None, headers: HeadersType | None = None, auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = None, hooks: AsyncHookType[PreparedRequest | Response | AsyncResponse] | None = None, revocation_configuration: RevocationConfiguration | None = DEFAULT_STRATEGY, app: ASGIApp | None = None, params: QueryParameterType | None = None, cookies: CookiesType | None = None, proxies: ProxyType | None = None, verify: TLSVerifyType | None = None, cert: TLSClientCertType | None = None, allow_incoming_cookies: bool = True, ): if [disable_ipv4, disable_ipv6].count(True) == 2: raise RuntimeError("Cannot disable both IPv4 and IPv6") #: Configured retries for current Session self.retries = retries if ( self.retries and HAS_LEGACY_URLLIB3 and hasattr(self.retries, "total") and "urllib3_future" not in str(type(self.retries)) ): self.retries = urllib3_ensure_type(self.retries) # type: ignore[type-var] #: A case-insensitive dictionary of headers to be sent on each #: :class:`Request <Request>` sent from this #: :class:`Session <Session>`. self.headers = CaseInsensitiveDict(headers) if headers is not None else default_headers() #: Default Authentication tuple or object to attach to #: :class:`Request <Request>`. self.auth = auth # type: ignore[assignment] #: Dictionary mapping protocol or protocol and host to the URL of the proxy #: (e.g. {'http': 'foo.bar:3128', 'http://host.name': 'foo.bar:4012'}) to #: be used on each :class:`Request <Request>`. self.proxies: ProxyType = proxies if proxies is not None else {} #: Event-handling hooks. self.hooks: AsyncHookType[PreparedRequest | Response | AsyncResponse] = ( merge_hooks(default_hooks(), hooks) # type: ignore[assignment] if hooks is not None else default_hooks() ) #: Dictionary of querystring data to attach to each #: :class:`Request <Request>`. The dictionary values may be lists for #: representing multivalued query parameters. self.params: QueryParameterType = params if params is not None else {} #: Stream response content default. self.stream = False #: Toggle to leverage multiplexed connection. self.multiplexed = multiplexed #: Custom DNS resolution method. self.resolver: AsyncBaseResolver = create_async_resolver(resolver) #: Internal use, know whether we should/can close it on session close. self._own_resolver: bool = resolver != self.resolver #: global timeout configuration self.timeout = timeout #: Bind to address/network adapter self.source_address = source_address self._disable_http1 = disable_http1 self._disable_http2 = disable_http2 self._disable_http3 = disable_http3 self._disable_ipv4 = disable_ipv4 self._disable_ipv6 = disable_ipv6 self._pool_connections = pool_connections self._pool_maxsize = pool_maxsize self._happy_eyeballs = happy_eyeballs self._keepalive_delay = keepalive_delay self._keepalive_idle_window = keepalive_idle_window #: SSL Verification default. #: Defaults to `True`, requiring requests to verify the TLS certificate at the #: remote end. #: If verify is set to `False`, requests will accept any TLS certificate #: presented by the server, and will ignore hostname mismatches and/or #: expired certificates, which will make your application vulnerable to #: man-in-the-middle (MitM) attacks. #: Only set this to `False` for testing. self.verify: TLSVerifyType = verify if verify is not None else True #: SSL client certificate default, if String, path to ssl client #: cert file (.pem). If Tuple, ('cert', 'key') pair, or ('cert', 'key', 'key_password'). self.cert: TLSClientCertType | None = cert #: Maximum number of redirects allowed. If the request exceeds this #: limit, a :class:`TooManyRedirects` exception is raised. #: This defaults to requests.models.DEFAULT_REDIRECT_LIMIT, which is #: 30. self.max_redirects: int = DEFAULT_REDIRECT_LIMIT #: Trust environment settings for proxy configuration, default #: authentication and similar. self.trust_env: bool = True #: Automatically set a URL prefix to every emitted request. self.base_url: str | None = base_url #: A CookieJar containing all currently outstanding cookies set on this #: session. It is always a #: :class:`RequestsCookieJar <requests.cookies.RequestsCookieJar>`. A mapping or a #: native ``cookielib.CookieJar`` passed at construction time is silently coerced. self.cookies: RequestsCookieJar = coerce_to_requests_cookiejar(cookies, thread_free=True) #: Toggle whether cookies sent by the remote peer (``Set-Cookie``) are extracted and merged #: into this Session. When ``False``, every incoming cookie is ignored while outgoing cookies #: set by the user are still emitted. self.allow_incoming_cookies: bool = allow_incoming_cookies #: A simple dict that allows us to persist which server support QUIC #: It is simply forwarded to urllib3.future that handle the caching logic. #: Can be any mutable mapping. self.quic_cache_layer = quic_cache_layer if quic_cache_layer is not None else AsyncQuicSharedCache(max_size=12_288) #: Don't try to manipulate this object. #: It cannot be pickled and accessing this object may cause #: unattended errors. self._ocsp_cache: typing.Any | None = None #: Don't try to manipulate this object. #: It cannot be pickled and accessing this object may cause #: unattended errors. self._crl_cache: typing.Any | None = None #: How we should handle the revocation check for TLS newly acquired connection. self._revocation_configuration: RevocationConfiguration | None = revocation_configuration # Default connection adapters. self.adapters: OrderedDict[str, AsyncBaseAdapter] = OrderedDict() # type: ignore[assignment] if AsyncPyodideAdapter is None: self.mount( "https://", AsyncHTTPAdapter( quic_cache_layer=self.quic_cache_layer, max_retries=retries, disable_http1=disable_http1, disable_http2=disable_http2, disable_http3=disable_http3, resolver=resolver, source_address=source_address, disable_ipv4=disable_ipv4, disable_ipv6=disable_ipv6, pool_connections=pool_connections, pool_maxsize=pool_maxsize, happy_eyeballs=happy_eyeballs, keepalive_delay=keepalive_delay, keepalive_idle_window=keepalive_idle_window, revocation_configuration=revocation_configuration, allow_incoming_cookies=allow_incoming_cookies, ), ) self.mount( "http://", AsyncHTTPAdapter( max_retries=retries, disable_http1=disable_http1, disable_http2=disable_http2, disable_http3=disable_http3, resolver=resolver, source_address=source_address, disable_ipv4=disable_ipv4, disable_ipv6=disable_ipv6, pool_connections=pool_connections, pool_maxsize=pool_maxsize, happy_eyeballs=happy_eyeballs, keepalive_delay=keepalive_delay, keepalive_idle_window=keepalive_idle_window, revocation_configuration=revocation_configuration, allow_incoming_cookies=allow_incoming_cookies, ), ) self.mount( "http+unix://", AsyncUnixAdapter( max_retries=retries, disable_http1=disable_http1, disable_http2=disable_http2, disable_http3=disable_http3, resolver=resolver, source_address=source_address, disable_ipv4=disable_ipv4, disable_ipv6=disable_ipv6, pool_connections=pool_connections, pool_maxsize=pool_maxsize, happy_eyeballs=happy_eyeballs, keepalive_delay=keepalive_delay, keepalive_idle_window=keepalive_idle_window, revocation_configuration=revocation_configuration, allow_incoming_cookies=allow_incoming_cookies, ), ) else: wasm_adapter = AsyncPyodideAdapter( max_retries=retries, ) for supported_scheme in ["http", "https", "sse", "psse", "ws", "wss"]: self.mount(f"{supported_scheme}://", wasm_adapter) if app is not None: adapter = AsyncServerGatewayInterface( app=app, max_retries=retries, ) self.mount("asgi://default", adapter) for _scheme in ("ws", "wss", "sse", "psse"): self.mount(f"{_scheme}://default", adapter) if self.base_url is None: self.base_url = "asgi://default" def __repr__(self) -> str: return f"<AsyncSession {repr(self.adapters).replace('OrderedDict(', '')[:-1]}>" def __enter__(self) -> typing.NoReturn: raise SyntaxError('You probably meant "async with". Did you forget to prepend the "async" keyword?') async def __aenter__(self) -> AsyncSession: return self async def __aexit__(self, exc, value, tb) -> None: await self.close() def __setstate__(self, state): for attr, value in state.items(): setattr(self, attr, value) self.resolver = create_async_resolver(None) self._own_resolver = True self.adapters = OrderedDict() self.mount( "https://", AsyncHTTPAdapter( quic_cache_layer=self.quic_cache_layer, max_retries=self.retries, disable_http1=self._disable_http1, disable_http2=self._disable_http2, disable_http3=self._disable_http3, source_address=self.source_address, disable_ipv4=self._disable_ipv4, disable_ipv6=self._disable_ipv6, resolver=self.resolver, pool_connections=self._pool_connections, pool_maxsize=self._pool_maxsize, happy_eyeballs=self._happy_eyeballs, keepalive_delay=self._keepalive_delay, keepalive_idle_window=self._keepalive_idle_window, revocation_configuration=self._revocation_configuration, allow_incoming_cookies=self.allow_incoming_cookies, ), ) self.mount( "http://", AsyncHTTPAdapter( max_retries=self.retries, disable_http1=self._disable_http1, disable_http2=self._disable_http2, disable_http3=self._disable_http3, source_address=self.source_address, disable_ipv4=self._disable_ipv4, disable_ipv6=self._disable_ipv6, resolver=self.resolver, pool_connections=self._pool_connections, pool_maxsize=self._pool_maxsize, happy_eyeballs=self._happy_eyeballs, keepalive_delay=self._keepalive_delay, keepalive_idle_window=self._keepalive_idle_window, revocation_configuration=self._revocation_configuration, allow_incoming_cookies=self.allow_incoming_cookies, ), ) self.mount( "http+unix://", AsyncUnixAdapter( max_retries=self.retries, disable_http1=self._disable_http1, disable_http2=self._disable_http2, disable_http3=self._disable_http3, source_address=self.source_address, disable_ipv4=self._disable_ipv4, disable_ipv6=self._disable_ipv6, resolver=self.resolver, pool_connections=self._pool_connections, pool_maxsize=self._pool_maxsize, happy_eyeballs=self._happy_eyeballs, keepalive_delay=self._keepalive_delay, keepalive_idle_window=self._keepalive_idle_window, revocation_configuration=self._revocation_configuration, allow_incoming_cookies=self.allow_incoming_cookies, ), ) for adapter in self.adapters.values(): if hasattr(adapter, "_ocsp_cache"): adapter._ocsp_cache = self._ocsp_cache if hasattr(adapter, "_crl_cache"): adapter._crl_cache = self._crl_cache
[docs] def mount(self, prefix: str, adapter: AsyncBaseAdapter) -> None: # type: ignore[override] super().mount(prefix, adapter) # type: ignore[arg-type]
[docs] def get_adapter(self, url: str) -> AsyncBaseAdapter: # type: ignore[override] for prefix, adapter in self.adapters.items(): if url.lower().startswith(prefix.lower()): return adapter # If no adapter matches our prefix, that usually means we want # an HTTP extension like wss (e.g. WebSocket). scheme = parse_scheme(url) if "+" in scheme: scheme, implementation = tuple(scheme.split("+", maxsplit=1)) else: implementation = None try: extension = load_extension(scheme, implementation=implementation) for prefix, adapter in self.adapters.items(): if scheme in extension.supported_schemes() and extension.scheme_to_http_scheme(scheme) == parse_scheme(prefix): return adapter except ImportError: pass # add a hint if wss:// fails when Niquests document its support. # they probably forgot about the extra. if url.startswith("ws://") or url.startswith("wss://"): additional_hint = " Did you forget to install the extra for WebSocket? Run `pip install niquests[ws]` to fix this." else: additional_hint = "" # Nothing matches :-/ raise InvalidSchema(f"No connection adapters were found for {url!r}{additional_hint}")
@typing.overload # type: ignore[override] async def send(self, request: PreparedRequest, *, stream: Literal[True], **kwargs: typing.Any) -> AsyncResponse: ... @typing.overload # type: ignore[override] async def send(self, request: PreparedRequest, **kwargs: typing.Any) -> Response: ...
[docs] async def send( # type: ignore[override] self, request: PreparedRequest, **kwargs: typing.Any ) -> Response | AsyncResponse: # type: ignore[override] """Send a given PreparedRequest.""" # It's possible that users might accidentally send a Request object. # Guard against that specific failure case. if isinstance(request, Request): raise ValueError("You can only send PreparedRequests.") # Set defaults that the hooks can utilize to ensure they always have # the correct parameters to reproduce the previous request. kwargs.setdefault("stream", self.stream) kwargs.setdefault("verify", self.verify) kwargs.setdefault("cert", self.cert) if "proxies" not in kwargs: kwargs["proxies"] = resolve_proxies(request, self.proxies, self.trust_env) if ( "timeout" in kwargs and kwargs["timeout"] and HAS_LEGACY_URLLIB3 and hasattr(kwargs["timeout"], "total") and "urllib3_future" not in str(type(kwargs["timeout"])) ): kwargs["timeout"] = urllib3_ensure_type(kwargs["timeout"]) # Set up variables needed for resolve_redirects and dispatching of hooks allow_redirects = kwargs.pop("allow_redirects", True) stream = kwargs.get("stream") hooks = request.hooks # Dispatch the pre_request hook here so it fires regardless of whether # the user came in through AsyncSession.request() or called # AsyncSession.send() directly with a manually prepared PreparedRequest. # see https://github.com/jawah/niquests/issues/389 request = await async_dispatch_hook( # type: ignore[assignment] "pre_request", hooks, # type: ignore[arg-type] request, ) ptr_request = request async def on_post_connection(conn_info: ConnectionInfo) -> None: """This function will be called by urllib3.future just after establishing the connection.""" nonlocal ptr_request, request, kwargs ptr_request.conn_info = conn_info if ptr_request.url and ptr_request.url.startswith("https://") and kwargs["verify"]: strict_ocsp_enabled: bool = os.environ.get("NIQUESTS_STRICT_OCSP", "0") != "0" if not strict_ocsp_enabled and self._revocation_configuration is not None: strict_ocsp_enabled = self._revocation_configuration.strict_mode if should_check_ocsp(conn_info, self._revocation_configuration): try: from .extensions.revocation._ocsp._async import ( InMemoryRevocationStatus, ) from .extensions.revocation._ocsp._async import ( verify as ocsp_verify, ) except ImportError: pass else: if self._ocsp_cache is None: self._ocsp_cache = InMemoryRevocationStatus() for adapter in self.adapters.values(): if hasattr(adapter, "_ocsp_cache"): adapter._ocsp_cache = self._ocsp_cache await ocsp_verify( ptr_request, strict_ocsp_enabled, 0.2 if not strict_ocsp_enabled else 1.0, kwargs["proxies"], resolver=self.resolver, happy_eyeballs=self._happy_eyeballs, cache=self._ocsp_cache, ) if should_check_crl(conn_info, self._revocation_configuration): try: from .extensions.revocation._crl._async import ( InMemoryRevocationList, ) from .extensions.revocation._crl._async import ( verify as crl_verify, ) except ImportError: pass else: if self._crl_cache is None: self._crl_cache = InMemoryRevocationList() for adapter in self.adapters.values(): if hasattr(adapter, "_crl_cache"): adapter._crl_cache = self._crl_cache await crl_verify( ptr_request, strict_ocsp_enabled, 0.2 if not strict_ocsp_enabled else 1.0, kwargs["proxies"], resolver=self.resolver, happy_eyeballs=self._happy_eyeballs, cache=self._crl_cache, ) # don't trigger pre_send for redirects if ptr_request == request: await async_dispatch_hook("pre_send", hooks, ptr_request) # type: ignore[arg-type] async def handle_upload_progress( total_sent: int, content_length: int | None, is_completed: bool, any_error: bool, ) -> None: nonlocal ptr_request, request, kwargs if ptr_request != request: return if request.upload_progress is None: request.upload_progress = TransferProgress() request.upload_progress.total = total_sent request.upload_progress.content_length = content_length request.upload_progress.is_completed = is_completed request.upload_progress.any_error = any_error await async_dispatch_hook("on_upload", hooks, request) # type: ignore[arg-type] async def on_early_response(early_response: Response) -> None: await async_dispatch_hook("early_response", hooks, early_response) # type: ignore[arg-type] kwargs.setdefault("on_post_connection", on_post_connection) kwargs.setdefault("on_upload_body", handle_upload_progress) kwargs.setdefault("multiplexed", self.multiplexed) kwargs.setdefault("on_early_response", on_early_response) assert request.url is not None # Recycle the resolver if unavailable if not self.resolver.is_available(): if not self._own_resolver: warnings.warn( "A externally instantiated resolver was closed. Attempt to recycling it internally, " "the Session will detach itself from given resolver.", ResourceWarning, ) await self.close() self.resolver = self.resolver.recycle() self.mount( "https://", AsyncHTTPAdapter( quic_cache_layer=self.quic_cache_layer, max_retries=self.retries, disable_http1=self._disable_http1, disable_http2=self._disable_http2, disable_http3=self._disable_http3, resolver=self.resolver, source_address=self.source_address, disable_ipv4=self._disable_ipv4, disable_ipv6=self._disable_ipv6, pool_connections=self._pool_connections, pool_maxsize=self._pool_maxsize, happy_eyeballs=self._happy_eyeballs, keepalive_delay=self._keepalive_delay, keepalive_idle_window=self._keepalive_idle_window, revocation_configuration=self._revocation_configuration, allow_incoming_cookies=self.allow_incoming_cookies, ), ) self.mount( "http://", AsyncHTTPAdapter( max_retries=self.retries, disable_http1=self._disable_http1, disable_http2=self._disable_http2, disable_http3=self._disable_http3, resolver=self.resolver, source_address=self.source_address, disable_ipv4=self._disable_ipv4, disable_ipv6=self._disable_ipv6, pool_connections=self._pool_connections, pool_maxsize=self._pool_maxsize, happy_eyeballs=self._happy_eyeballs, keepalive_delay=self._keepalive_delay, keepalive_idle_window=self._keepalive_idle_window, revocation_configuration=self._revocation_configuration, allow_incoming_cookies=self.allow_incoming_cookies, ), ) self.mount( "http+unix://", AsyncUnixAdapter( max_retries=self.retries, disable_http1=self._disable_http1, disable_http2=self._disable_http2, disable_http3=self._disable_http3, resolver=self.resolver, source_address=self.source_address, disable_ipv4=self._disable_ipv4, disable_ipv6=self._disable_ipv6, pool_connections=self._pool_connections, pool_maxsize=self._pool_maxsize, happy_eyeballs=self._happy_eyeballs, keepalive_delay=self._keepalive_delay, keepalive_idle_window=self._keepalive_idle_window, revocation_configuration=self._revocation_configuration, allow_incoming_cookies=self.allow_incoming_cookies, ), ) # Get the appropriate adapter to use adapter = self.get_adapter(url=request.url) # Start time (approximately) of the request start = preferred_clock() # Send the request r = await adapter.send(request, **kwargs) # Make sure the timings data are kept as is, conn_info is a reference to # urllib3-future conn_info. request.conn_info = _deepcopy_ci(request.conn_info) # We are leveraging a multiplexed connection if hasattr(r, "lazy") and r.lazy is True: async def _redirect_method_ref(x, y): aiter_gen = self.resolve_redirects(x, y, yield_requests=True, **kwargs) try: return await aiter_gen.__anext__() except StopAsyncIteration: return None finally: await aiter_gen.aclose() r._resolve_redirect = _redirect_method_ref # in multiplexed mode, we are unable to forward this local function for safety reasons. kwargs["on_post_connection"] = None # we intentionally set 'niquests' as the prefix. urllib3.future have its own parameters. r._promise.update_parameters( { "niquests_is_stream": stream, "niquests_start": start, "niquests_hooks": hooks, "niquests_cookies": self.cookies, "niquests_allow_redirect": allow_redirects, "niquests_kwargs": kwargs, # You may be wondering why we are setting redirect info in promise ctx. # because in multiplexed mode, we are not fully aware of hop/redirect count "niquests_redirect_count": 0, "niquests_max_redirects": self.max_redirects, } ) if not stream: _swap_context(r) return r # Total elapsed time of the request (approximately) elapsed = preferred_clock() - start r.elapsed = timedelta(seconds=elapsed) # Response manipulation hooks r = await async_dispatch_hook("response", hooks, r, **kwargs) # type: ignore[arg-type] # Persist cookies if self.allow_incoming_cookies: if r.history: # If the hooks create history then we want those cookies too for resp in r.history: extract_cookies_to_jar(self.cookies, resp.request, resp.raw) extract_cookies_to_jar(self.cookies, request, r.raw) # Resolve redirects if allowed. if allow_redirects: # Redirect resolving generator. gen = self.resolve_redirects(r, request, yield_requests_trail=True, **kwargs) history = [] async for resp_or_req in gen: if isinstance(resp_or_req, Response): history.append(resp_or_req) continue ptr_request = resp_or_req else: history = [] # Shuffle things around if there's history. if history: # Insert the first (original) request at the start history.insert(0, r) # Get the last request made r = history.pop() for hr in history: if isinstance(hr, AsyncResponse): _swap_context(hr) r.history = history # type: ignore[assignment] # If redirects aren't being followed, store the response on the Request for Response.next(). if not allow_redirects: if r.is_redirect: gen = self.resolve_redirects(r, request, yield_requests=True, **kwargs) try: r._next = await gen.__anext__() # type: ignore[assignment] except StopAsyncIteration: pass finally: await gen.aclose() if not stream: if isinstance(r, AsyncResponse): if r.extension is None: await r.content _swap_context(r) else: if r.extension is None: r.content return r
[docs] async def resolve_redirects( # type: ignore[override] self, resp: AsyncResponse, req: PreparedRequest, stream: bool = False, timeout: int | float | None = None, verify: TLSVerifyType = True, cert: TLSClientCertType | None = None, proxies: ProxyType | None = None, yield_requests: bool = False, yield_requests_trail: bool = False, **adapter_kwargs: typing.Any, ) -> typing.AsyncGenerator[AsyncResponse | PreparedRequest, None]: """Receives a Response. Returns a generator of Responses or Requests.""" hist = [] # keep track of history url = self.get_redirect_target(resp) previous_fragment = urlparse(req.url).fragment while url: prepared_request = req.copy() # Update history and keep track of redirects. # resp.history must ignore the original request in this loop hist.append(resp) resp.history = hist[1:] # type: ignore[assignment] assert resp.raw is not None try: if isinstance(resp, AsyncResponse): await resp.content # Consume socket so it can be released else: resp.content except (ChunkedEncodingError, ContentDecodingError, RuntimeError): await resp.raw.read(decode_content=False) if len(resp.history) >= self.max_redirects: raise TooManyRedirects(f"Exceeded {self.max_redirects} redirects.", response=resp) # Release the connection back into the pool. if isinstance(resp, AsyncResponse): await resp.close() else: resp.close() # Handle redirection without scheme (see: RFC 1808 Section 4) if url.startswith("//"): assert resp.url is not None target_scheme = parse_scheme(resp.url) if isinstance(target_scheme, bytes): target_scheme = target_scheme.decode() url = ":".join([target_scheme, url]) # Normalize url case and attach previous fragment if needed (RFC 7231 7.1.2) parsed = urlparse(url) if parsed.fragment == "" and previous_fragment: parsed = parsed._replace( fragment=previous_fragment if isinstance(previous_fragment, str) else previous_fragment.decode("utf-8") ) elif parsed.fragment: previous_fragment = parsed.fragment url = parsed.geturl() # Facilitate relative 'location' headers, as allowed by RFC 7231. # (e.g. '/path/to/resource' instead of 'http://domain.tld/path/to/resource') # Compliant with RFC3986, we percent encode the url. if not parsed.netloc: url = urljoin_safe(resp.url, requote_uri(url)) # type: ignore[type-var,arg-type] assert isinstance(url, str), f"urljoin produced {type(url)} instead of str" else: url = requote_uri(url) # this shouldn't happen, but kept in extreme case of being nice with BC. if isinstance(url, bytes): url = url.decode("utf-8") prepared_request.url = url assert prepared_request.headers is not None self.rebuild_method(prepared_request, resp) # https://github.com/psf/requests/issues/1084 if resp.status_code not in ( codes.temporary_redirect, # type: ignore[attr-defined] codes.permanent_redirect, # type: ignore[attr-defined] ): # https://github.com/psf/requests/issues/3490 purged_headers = ("Content-Length", "Content-Type", "Transfer-Encoding") for header in purged_headers: prepared_request.headers.pop(header, None) prepared_request.body = None headers = prepared_request.headers headers.pop("Cookie", None) assert prepared_request._cookies is not None # Extract any cookies sent on the response to the cookiejar # in the new request. Because we've mutated our copied prepared # request, use the old one that we haven't yet touched. if self.allow_incoming_cookies: extract_cookies_to_jar(prepared_request._cookies, req, resp.raw) merge_cookies(prepared_request._cookies, self.cookies) prepared_request.prepare_cookies(prepared_request._cookies) # Rebuild auth and proxy information. proxies = self.rebuild_proxies(prepared_request, proxies) self.rebuild_auth(prepared_request, resp) # A failed tell() sets `_body_position` to `object()`. This non-None # value ensures `rewindable` will be True, allowing us to raise an # UnrewindableBodyError, instead of hanging the connection. rewindable = prepared_request._body_position is not None and ( "Content-Length" in headers or "Transfer-Encoding" in headers ) # Attempt to rewind consumed file-like object. if rewindable: # could be async file with awaitable tell/seek! if ( prepared_request.body is not None and hasattr(prepared_request.body, "tell") and iscoroutinefunction(prepared_request.body.tell) ): await arewind_body(prepared_request) else: rewind_body(prepared_request) # Override the original request. req = prepared_request if yield_requests: yield req else: if yield_requests_trail: yield req resp = await self.send( # type: ignore[assignment] req, stream=stream, timeout=timeout, verify=verify, cert=cert, proxies=proxies, allow_redirects=False, **adapter_kwargs, ) # If the initial request was intended to be lazy but didn't meet required criteria # e.g. Setting multiplexed=True, requesting HTTP/1.1 only capable and getting redirected # to an HTTP/2+ endpoint. if hasattr(resp, "lazy") and resp.lazy: await self.gather(resp) if self.allow_incoming_cookies: extract_cookies_to_jar(self.cookies, prepared_request, resp.raw) # extract redirect url, if any, for the next loop url = self.get_redirect_target(resp) yield resp
@typing.overload # type: ignore[override] async def request( self, method: HttpMethodType, url: str, params: QueryParameterType | None = ..., data: BodyType | AsyncBodyType | None = ..., headers: HeadersType | None = ..., cookies: CookiesType | None = ..., files: MultiPartFilesType | MultiPartFilesAltType | None = ..., auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = ..., timeout: TimeoutType | None = ..., allow_redirects: bool = ..., proxies: ProxyType | None = ..., hooks: AsyncHookType[PreparedRequest | Response] | None = ..., stream: Literal[False] | None = ..., verify: TLSVerifyType | None = ..., cert: TLSClientCertType | None = ..., json: typing.Any | None = ..., override_scheme: str | None = ..., ) -> Response: ... @typing.overload # type: ignore[override] async def request( self, method: HttpMethodType, url: str, params: QueryParameterType | None = ..., data: BodyType | AsyncBodyType | None = ..., headers: HeadersType | None = ..., cookies: CookiesType | None = ..., files: MultiPartFilesType | MultiPartFilesAltType | None = ..., auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = ..., timeout: TimeoutType | None = ..., allow_redirects: bool = ..., proxies: ProxyType | None = ..., hooks: AsyncHookType[PreparedRequest | Response] | None = ..., stream: Literal[True] = ..., verify: TLSVerifyType | None = ..., cert: TLSClientCertType | None = ..., json: typing.Any | None = ..., override_scheme: str | None = ..., ) -> AsyncResponse: ...
[docs] async def request( # type: ignore[override] self, method: HttpMethodType, url: str, params: QueryParameterType | None = None, data: BodyType | AsyncBodyType | None = None, headers: HeadersType | None = None, cookies: CookiesType | None = None, files: MultiPartFilesType | MultiPartFilesAltType | None = None, auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = None, timeout: TimeoutType | None = None, allow_redirects: bool = True, proxies: ProxyType | None = None, hooks: AsyncHookType[PreparedRequest | Response] | None = None, stream: bool | None = None, verify: TLSVerifyType | None = None, cert: TLSClientCertType | None = None, json: typing.Any | None = None, override_scheme: str | None = None, ) -> Response | AsyncResponse: if method.isupper() is False: method = method.upper() # Create the Request. req = Request( method=method, url=url, headers=headers, files=files, data=data or {}, json=json, params=params or {}, auth=auth, cookies=cookies, hooks=hooks, base_url=self.base_url, override_scheme=override_scheme, ) prep: PreparedRequest = self.prepare_request(req) if prep._asynchronous_auth: r = await prep._pending_async_auth(prep) prep.__dict__.update(r.__dict__) prep.prepare_content_length(prep.body) del prep._pending_async_auth if ( isinstance(prep._body_position, object) and prep.body is not None and hasattr(prep.body, "tell") and iscoroutinefunction(prep.body.tell) ): try: prep._body_position = await prep.body.tell() except OSError: pass assert prep.url is not None proxies = proxies or {} settings = self.merge_environment_settings(prep.url, proxies, stream, verify, cert) # Send the request. send_kwargs = { "timeout": timeout or self.timeout, "allow_redirects": allow_redirects, } send_kwargs.update(settings) # Ensuring we always apply a default timeout as per Niquests policy. if send_kwargs["timeout"] is None: send_kwargs["timeout"] = ( WRITE_DEFAULT_TIMEOUT if method in {"POST", "PUT", "DELETE", "PATCH"} else READ_DEFAULT_TIMEOUT ) return await self.send(prep, **send_kwargs)
@typing.overload # type: ignore[override] async def get( self, url: str, *, params: QueryParameterType | None = ..., headers: HeadersType | None = ..., cookies: CookiesType | None = ..., auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = ..., timeout: TimeoutType | None = ..., allow_redirects: bool = ..., proxies: ProxyType | None = ..., hooks: AsyncHookType[PreparedRequest | Response] | None = ..., verify: TLSVerifyType | None = ..., stream: Literal[False] | None = ..., cert: TLSClientCertType | None = ..., **kwargs: typing.Any, ) -> Response: ... @typing.overload # type: ignore[override] async def get( self, url: str, *, params: QueryParameterType | None = ..., headers: HeadersType | None = ..., cookies: CookiesType | None = ..., auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = ..., timeout: TimeoutType | None = ..., allow_redirects: bool = ..., proxies: ProxyType | None = ..., hooks: AsyncHookType[PreparedRequest | Response] | None = ..., verify: TLSVerifyType | None = ..., stream: Literal[True] = ..., cert: TLSClientCertType | None = ..., **kwargs: typing.Any, ) -> AsyncResponse: ...
[docs] async def get( # type: ignore[override] self, url: str, *, params: QueryParameterType | None = None, headers: HeadersType | None = None, cookies: CookiesType | None = None, auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = None, timeout: TimeoutType | None = None, allow_redirects: bool = True, proxies: ProxyType | None = None, hooks: AsyncHookType[PreparedRequest | Response] | None = None, verify: TLSVerifyType | None = None, stream: bool | None = None, cert: TLSClientCertType | None = None, **kwargs: typing.Any, ) -> Response | AsyncResponse: return await self.request( # type: ignore[call-overload,misc] "GET", url, params=params, headers=headers, cookies=cookies, auth=auth, timeout=timeout, allow_redirects=allow_redirects, proxies=proxies, hooks=hooks, verify=verify, stream=stream, # type: ignore[arg-type] cert=cert, **kwargs, )
@typing.overload # type: ignore[override] async def options( self, url: str, *, params: QueryParameterType | None = ..., headers: HeadersType | None = ..., cookies: CookiesType | None = ..., auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = ..., timeout: TimeoutType | None = ..., allow_redirects: bool = ..., proxies: ProxyType | None = ..., hooks: AsyncHookType[PreparedRequest | Response] | None = ..., verify: TLSVerifyType | None = ..., stream: Literal[False] | Literal[None] = ..., cert: TLSClientCertType | None = ..., **kwargs: typing.Any, ) -> Response: ... @typing.overload # type: ignore[override] async def options( self, url: str, *, params: QueryParameterType | None = ..., headers: HeadersType | None = ..., cookies: CookiesType | None = ..., auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = ..., timeout: TimeoutType | None = ..., allow_redirects: bool = ..., proxies: ProxyType | None = ..., hooks: AsyncHookType[PreparedRequest | Response] | None = ..., verify: TLSVerifyType | None = ..., stream: Literal[True], cert: TLSClientCertType | None = ..., **kwargs: typing.Any, ) -> AsyncResponse: ...
[docs] async def options( # type: ignore[override] self, url: str, *, params: QueryParameterType | None = None, headers: HeadersType | None = None, cookies: CookiesType | None = None, auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = None, timeout: TimeoutType | None = None, allow_redirects: bool = True, proxies: ProxyType | None = None, hooks: AsyncHookType[PreparedRequest | Response] | None = None, verify: TLSVerifyType | None = None, stream: bool | None = None, cert: TLSClientCertType | None = None, **kwargs: typing.Any, ) -> Response | AsyncResponse: return await self.request( # type: ignore[call-overload,misc] "OPTIONS", url, params=params, headers=headers, cookies=cookies, auth=auth, timeout=timeout, allow_redirects=allow_redirects, proxies=proxies, hooks=hooks, verify=verify, stream=stream, # type: ignore[arg-type] cert=cert, **kwargs, )
@typing.overload # type: ignore[override] async def head( self, url: str, *, params: QueryParameterType | None = ..., headers: HeadersType | None = ..., cookies: CookiesType | None = ..., auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = ..., timeout: TimeoutType | None = ..., allow_redirects: bool = ..., proxies: ProxyType | None = ..., hooks: AsyncHookType[PreparedRequest | Response] | None = ..., verify: TLSVerifyType | None = ..., stream: Literal[False] | Literal[None] = ..., cert: TLSClientCertType | None = ..., **kwargs: typing.Any, ) -> Response: ... @typing.overload # type: ignore[override] async def head( self, url: str, *, params: QueryParameterType | None = ..., headers: HeadersType | None = ..., cookies: CookiesType | None = ..., auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = ..., timeout: TimeoutType | None = ..., allow_redirects: bool = ..., proxies: ProxyType | None = ..., hooks: AsyncHookType[PreparedRequest | Response] | None = ..., verify: TLSVerifyType | None = ..., stream: Literal[True], cert: TLSClientCertType | None = ..., **kwargs: typing.Any, ) -> AsyncResponse: ...
[docs] async def head( # type: ignore[override] self, url: str, *, params: QueryParameterType | None = None, headers: HeadersType | None = None, cookies: CookiesType | None = None, auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = None, timeout: TimeoutType | None = None, allow_redirects: bool = False, proxies: ProxyType | None = None, hooks: AsyncHookType[PreparedRequest | Response] | None = None, verify: TLSVerifyType | None = None, stream: bool | None = None, cert: TLSClientCertType | None = None, **kwargs: typing.Any, ) -> Response | AsyncResponse: return await self.request( # type: ignore[call-overload,misc] "HEAD", url, params=params, headers=headers, cookies=cookies, auth=auth, timeout=timeout, allow_redirects=allow_redirects, proxies=proxies, hooks=hooks, verify=verify, stream=stream, # type: ignore[arg-type] cert=cert, **kwargs, )
@typing.overload # type: ignore[override] async def post( self, url: str, data: BodyType | AsyncBodyType | None = ..., json: typing.Any | None = ..., *, params: QueryParameterType | None = ..., headers: HeadersType | None = ..., cookies: CookiesType | None = ..., files: MultiPartFilesType | MultiPartFilesAltType | None = ..., auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = ..., timeout: TimeoutType | None = ..., allow_redirects: bool = ..., proxies: ProxyType | None = ..., hooks: AsyncHookType[PreparedRequest | Response] | None = ..., verify: TLSVerifyType | None = ..., stream: Literal[False] | Literal[None] = ..., cert: TLSClientCertType | None = ..., ) -> Response: ... @typing.overload # type: ignore[override] async def post( self, url: str, data: BodyType | AsyncBodyType | None = ..., json: typing.Any | None = ..., *, params: QueryParameterType | None = ..., headers: HeadersType | None = ..., cookies: CookiesType | None = ..., files: MultiPartFilesType | MultiPartFilesAltType | None = ..., auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = ..., timeout: TimeoutType | None = ..., allow_redirects: bool = ..., proxies: ProxyType | None = ..., hooks: AsyncHookType[PreparedRequest | Response] | None = ..., verify: TLSVerifyType | None = ..., stream: Literal[True], cert: TLSClientCertType | None = ..., ) -> AsyncResponse: ...
[docs] async def post( # type: ignore[override] self, url: str, data: BodyType | AsyncBodyType | None = None, json: typing.Any | None = None, *, params: QueryParameterType | None = None, headers: HeadersType | None = None, cookies: CookiesType | None = None, files: MultiPartFilesType | MultiPartFilesAltType | None = None, auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = None, timeout: TimeoutType | None = None, allow_redirects: bool = True, proxies: ProxyType | None = None, hooks: AsyncHookType[PreparedRequest | Response] | None = None, verify: TLSVerifyType | None = None, stream: bool | None = None, cert: TLSClientCertType | None = None, ) -> Response | AsyncResponse: return await self.request( # type: ignore[call-overload,misc] "POST", url, data=data, json=json, params=params, headers=headers, cookies=cookies, files=files, auth=auth, timeout=timeout, allow_redirects=allow_redirects, proxies=proxies, hooks=hooks, verify=verify, stream=stream, # type: ignore[arg-type] cert=cert, )
@typing.overload # type: ignore[override] async def put( self, url: str, data: BodyType | AsyncBodyType | None = ..., *, json: typing.Any | None = ..., params: QueryParameterType | None = ..., headers: HeadersType | None = ..., cookies: CookiesType | None = ..., files: MultiPartFilesType | MultiPartFilesAltType | None = ..., auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = ..., timeout: TimeoutType | None = ..., allow_redirects: bool = ..., proxies: ProxyType | None = ..., hooks: AsyncHookType[PreparedRequest | Response] | None = ..., verify: TLSVerifyType | None = ..., stream: Literal[False] | Literal[None] = ..., cert: TLSClientCertType | None = ..., ) -> Response: ... @typing.overload # type: ignore[override] async def put( self, url: str, data: BodyType | AsyncBodyType | None = ..., *, json: typing.Any | None = ..., params: QueryParameterType | None = ..., headers: HeadersType | None = ..., cookies: CookiesType | None = ..., files: MultiPartFilesType | MultiPartFilesAltType | None = ..., auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = ..., timeout: TimeoutType | None = ..., allow_redirects: bool = ..., proxies: ProxyType | None = ..., hooks: AsyncHookType[PreparedRequest | Response] | None = ..., verify: TLSVerifyType | None = ..., stream: Literal[True], cert: TLSClientCertType | None = ..., ) -> AsyncResponse: ...
[docs] async def put( # type: ignore[override] self, url: str, data: BodyType | AsyncBodyType | None = None, *, json: typing.Any | None = None, params: QueryParameterType | None = None, headers: HeadersType | None = None, cookies: CookiesType | None = None, files: MultiPartFilesType | MultiPartFilesAltType | None = None, auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = None, timeout: TimeoutType | None = None, allow_redirects: bool = True, proxies: ProxyType | None = None, hooks: AsyncHookType[PreparedRequest | Response] | None = None, verify: TLSVerifyType | None = None, stream: bool | None = None, cert: TLSClientCertType | None = None, ) -> Response | AsyncResponse: return await self.request( # type: ignore[call-overload,misc] "PUT", url, data=data, json=json, params=params, headers=headers, cookies=cookies, files=files, auth=auth, timeout=timeout, allow_redirects=allow_redirects, proxies=proxies, hooks=hooks, verify=verify, stream=stream, # type: ignore[arg-type] cert=cert, )
@typing.overload # type: ignore[override] async def patch( self, url: str, data: BodyType | AsyncBodyType | None = ..., *, json: typing.Any | None = ..., params: QueryParameterType | None = ..., headers: HeadersType | None = ..., cookies: CookiesType | None = ..., files: MultiPartFilesType | MultiPartFilesAltType | None = ..., auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = ..., timeout: TimeoutType | None = ..., allow_redirects: bool = ..., proxies: ProxyType | None = ..., hooks: AsyncHookType[PreparedRequest | Response] | None = ..., verify: TLSVerifyType | None = ..., stream: Literal[False] | Literal[None] = ..., cert: TLSClientCertType | None = ..., ) -> Response: ... @typing.overload # type: ignore[override] async def patch( self, url: str, data: BodyType | AsyncBodyType | None = ..., *, json: typing.Any | None = ..., params: QueryParameterType | None = ..., headers: HeadersType | None = ..., cookies: CookiesType | None = ..., files: MultiPartFilesType | MultiPartFilesAltType | None = ..., auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = ..., timeout: TimeoutType | None = ..., allow_redirects: bool = ..., proxies: ProxyType | None = ..., hooks: AsyncHookType[PreparedRequest | Response] | None = ..., verify: TLSVerifyType | None = ..., stream: Literal[True], cert: TLSClientCertType | None = ..., ) -> AsyncResponse: ...
[docs] async def patch( # type: ignore[override] self, url: str, data: BodyType | AsyncBodyType | None = None, *, json: typing.Any | None = None, params: QueryParameterType | None = None, headers: HeadersType | None = None, cookies: CookiesType | None = None, files: MultiPartFilesType | MultiPartFilesAltType | None = None, auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = None, timeout: TimeoutType | None = None, allow_redirects: bool = True, proxies: ProxyType | None = None, hooks: AsyncHookType[PreparedRequest | Response] | None = None, verify: TLSVerifyType | None = None, stream: bool | None = None, cert: TLSClientCertType | None = None, ) -> Response | AsyncResponse: return await self.request( # type: ignore[call-overload,misc] "PATCH", url, data=data, json=json, params=params, headers=headers, cookies=cookies, files=files, auth=auth, timeout=timeout, allow_redirects=allow_redirects, proxies=proxies, hooks=hooks, verify=verify, stream=stream, # type: ignore[arg-type] cert=cert, )
@typing.overload # type: ignore[override] async def delete( self, url: str, *, params: QueryParameterType | None = ..., headers: HeadersType | None = ..., cookies: CookiesType | None = ..., auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = ..., timeout: TimeoutType | None = ..., allow_redirects: bool = ..., proxies: ProxyType | None = ..., hooks: AsyncHookType[PreparedRequest | Response] | None = ..., verify: TLSVerifyType | None = ..., stream: Literal[False] | Literal[None] = ..., cert: TLSClientCertType | None = ..., **kwargs: typing.Any, ) -> Response: ... @typing.overload # type: ignore[override] async def delete( self, url: str, *, params: QueryParameterType | None = ..., headers: HeadersType | None = ..., cookies: CookiesType | None = ..., auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = ..., timeout: TimeoutType | None = ..., allow_redirects: bool = ..., proxies: ProxyType | None = ..., hooks: AsyncHookType[PreparedRequest | Response] | None = ..., verify: TLSVerifyType | None = ..., stream: Literal[True], cert: TLSClientCertType | None = ..., **kwargs: typing.Any, ) -> AsyncResponse: ...
[docs] async def delete( # type: ignore[override] self, url: str, *, params: QueryParameterType | None = None, headers: HeadersType | None = None, cookies: CookiesType | None = None, auth: HttpAuthenticationType | AsyncHttpAuthenticationType | None = None, timeout: TimeoutType | None = None, allow_redirects: bool = True, proxies: ProxyType | None = None, hooks: AsyncHookType[PreparedRequest | Response] | None = None, verify: TLSVerifyType | None = None, stream: bool | None = None, cert: TLSClientCertType | None = None, **kwargs: typing.Any, ) -> Response | AsyncResponse: return await self.request( # type: ignore[call-overload,misc] "DELETE", url, params=params, headers=headers, cookies=cookies, auth=auth, timeout=timeout, allow_redirects=allow_redirects, proxies=proxies, hooks=hooks, verify=verify, stream=stream, # type: ignore[arg-type] cert=cert, **kwargs, )
[docs] async def gather(self, *responses: Response, max_fetch: int | None = None) -> None: # type: ignore[override] if self.multiplexed is False: return for adapter in self.adapters.values(): await adapter.gather(*responses, max_fetch=max_fetch)
[docs] async def close(self) -> None: # type: ignore[override] for v in self.adapters.values(): await v.close() if self._own_resolver: await self.resolver.close()