Quickstart¶
Eager to get started? This page gives a good introduction in how to get started with Niquests.
First, make sure that:
Niquests is installed
Niquests is up-to-date
Let’s get started with some simple examples.
Note
Any async example must be enclosed in a proper async function and started by asyncio.run(...).
import asyncio
import niquests
async def main() -> None:
"""paste your example code here!"""
if __name__ == "__main__":
asyncio.run(main())
Make a Request¶
Making a request with Niquests is very simple.
Begin by importing the Niquests module:
import niquests
Now, let’s try to get a webpage. For this example, let’s get GitHub’s public timeline.
r = niquests.get('https://api.github.com/events')
r = await niquests.aget('https://api.github.com/events')
Now, we have a Response object called r. We can
get all the information we need from this object.
Niquests’ simple API means that all forms of HTTP request are as obvious. For example, this is how you make an HTTP POST request:
r = niquests.post('https://httpbin.org/post', data={'key': 'value'})
r = await niquests.apost('https://httpbin.org/post', data={'key': 'value'})
Nice, right? What about the other HTTP request types: PUT, DELETE, HEAD and OPTIONS? These are all just as simple:
r = niquests.put('https://httpbin.org/put', data={'key': 'value'})
r = niquests.delete('https://httpbin.org/delete')
r = niquests.head('https://httpbin.org/get')
r = niquests.options('https://httpbin.org/get')
r = await niquests.aput('https://httpbin.org/put', data={'key': 'value'})
r = await niquests.adelete('https://httpbin.org/delete')
r = await niquests.ahead('https://httpbin.org/get')
r = await niquests.aoptions('https://httpbin.org/get')
That’s all well and good, but it’s also only the start of what Niquests can do.
Passing Parameters In URLs¶
You often want to send some sort of data in the URL’s query string. If
you were constructing the URL by hand, this data would be given as key/value
pairs in the URL after a question mark, e.g. httpbin.org/get?key=val.
Niquests allows you to provide these arguments as a dictionary of strings,
using the params keyword argument. As an example, if you wanted to pass
key1=value1 and key2=value2 to httpbin.org/get, you would use the
following code:
payload = {'key1': 'value1', 'key2': 'value2'}
r = niquests.get('https://httpbin.org/get', params=payload)
payload = {'key1': 'value1', 'key2': 'value2'}
r = await niquests.aget('https://httpbin.org/get', params=payload)
You can see that the URL has been correctly encoded by printing the URL:
print(r.url) # 'https://httpbin.org/get?key2=value2&key1=value1'
Note that any dictionary key whose value is None will not be added to the
URL’s query string.
You can also pass a list of items as a value:
payload = {'key1': 'value1', 'key2': ['value2', 'value3']}
r = niquests.get('https://httpbin.org/get', params=payload)
print(r.url) # 'https://httpbin.org/get?key1=value1&key2=value2&key2=value3'
payload = {'key1': 'value1', 'key2': ['value2', 'value3']}
r = await niquests.aget('https://httpbin.org/get', params=payload)
print(r.url) # 'https://httpbin.org/get?key1=value1&key2=value2&key2=value3'
Response Content¶
We can read the content of the server’s response. Consider the GitHub timeline again:
import niquests
r = niquests.get('https://api.github.com/events')
print(r.text) # '[{"repository":{"open_issues":0,"url":"https://github.com/...
import niquests
r = await niquests.aget('https://api.github.com/events')
print(r.text) # '[{"repository":{"open_issues":0,"url":"https://github.com/...
Niquests will automatically decode content from the server. Most unicode charsets are seamlessly decoded.
When you make a request, Niquests makes educated guesses about the encoding of
the response based on the HTTP headers. The text encoding guessed by Niquests
is used when you access r.text. You can find out what encoding Niquests is
using, and change it, using the r.encoding property:
print(r.encoding) # 'utf-8'
r.encoding = 'ISO-8859-1' # force assign a specific encoding!
Warning
If Niquests is unable to decode the content to string with confidence, it simply return None.
If you change the encoding, Niquests will use the new value of r.encoding
whenever you call r.text. You might want to do this in any situation where
you can apply special logic to work out what the encoding of the content will
be. For example, HTML and XML have the ability to specify their encoding in
their body. In situations like this, you should use r.content to find the
encoding, and then set r.encoding. This will let you use r.text with
the correct encoding.
Niquests will also use custom encodings in the event that you need them. If
you have created your own encoding and registered it with the codecs
module, you can simply use the codec name as the value of r.encoding and
Niquests will handle the decoding for you.
Binary Response Content¶
You can also access the response body as bytes, for non-text requests:
>>> r.content
b'[{"repository":{"open_issues":0,"url":"https://github.com/...
The gzip and deflate transfer-encodings are automatically decoded for you.
The br transfer-encoding is automatically decoded for you if a Brotli library
like brotli or brotlicffi is installed.
The zstd transfer-encoding is automatically decoded for you if the zstandard library zstandard is installed.
For example, to create an image from binary data returned by a request, you can use the following code:
>>> from PIL import Image
>>> from io import BytesIO
>>> i = Image.open(BytesIO(r.content))
JSON Response Content¶
There’s also a builtin JSON decoder, in case you’re dealing with JSON data:
import niquests
r = niquests.get('https://api.github.com/events')
print(r.json()) # [{'repository': {'open_issues': 0, 'url': 'https://github.com/...
import niquests
r = await niquests.aget('https://api.github.com/events')
print(r.json()) # [{'repository': {'open_issues': 0, 'url': 'https://github.com/...
In case the JSON decoding fails, r.json() raises an exception. For example, if
the response gets a 204 (No Content), or if the response contains invalid JSON,
attempting r.json() raises niquests.exceptions.JSONDecodeError. This wrapper exception
provides interoperability for multiple exceptions that may be thrown by different
python versions and json serialization libraries.
Warning
It should be noted that this method will raise niquests.exceptions.JSONDecodeError if the proper Content-Type isn’t set to anything that refer to JSON.
It should be noted that the success of the call to r.json() does not
indicate the success of the response. Some servers may return a JSON object in a
failed response (e.g. error details with HTTP 500). Such JSON will be decoded
and returned. To check that a request is successful, use
r.raise_for_status() or check r.status_code is what you expect.
Note
Since Niquests 3.2, r.raise_for_status() is chainable as it returns self if everything went fine.
Tip
Niquests support using orjson instead of the json standard library. To leverage that feature, install orjson or niquests[speedups]. This can dramatically increase performance.
Tip
For typed JSON deserialization (e.g. with msgspec, pydantic, or cattrs),
use r.content directly instead of r.json() for significantly better performance.
For example, msgspec.json.decode(r.content, type=list[User]) decodes bytes into typed
objects in a single pass, avoiding the intermediate dict. This is 2-5x faster than
msgspec.convert(r.json(), list[User]).
Raw Response Content¶
In the rare case that you’d like to get the raw socket response from the
server, you can access r.raw. If you want to do this, make sure you set
stream=True in your initial request. Once you do, you can do this:
r = niquests.get('https://api.github.com/events', stream=True)
r.raw
# <urllib3.response.HTTPResponse object at 0x101194810>
r.raw.read(10)
# b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03'
r = await niquests.aget('https://api.github.com/events', stream=True)
r.raw
# <urllib3._async.response.AsyncHTTPResponse object at 0x101194810>
await r.raw.read(10)
# b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03'
In general, however, you should use a pattern like this to save what is being streamed to a file:
with open(filename, 'wb') as fd:
for chunk in r.iter_content(chunk_size=128):
fd.write(chunk)
with open(filename, 'wb') as fd:
async for chunk in await r.iter_content(chunk_size=128):
fd.write(chunk)
Warning
It is recommended to use aiofiles or similar to handle file I/O in async mode.
Using Response.iter_content will handle a lot of what you would otherwise
have to handle when using Response.raw directly. When streaming a
download, the above is the preferred and recommended way to retrieve the
content. Note that chunk_size can be freely adjusted to a number that
may better fit your use cases.
Note
An important note about using Response.iter_content versus Response.raw.
Response.iter_content will automatically decode the gzip and deflate
transfer-encodings. Response.iter_raw is a raw stream of bytes – it does not
transform the response content. If you really need access to the bytes as they
were returned, use Response.iter_raw.
Custom Headers¶
If you’d like to add HTTP headers to a request, simply pass in a dict to the
headers parameter.
For example, we didn’t specify our user-agent in the previous example:
url = 'https://api.github.com/some/endpoint'
headers = {'user-agent': 'my-app/0.0.1'}
r = niquests.get(url, headers=headers)
url = 'https://api.github.com/some/endpoint'
headers = {'user-agent': 'my-app/0.0.1'}
r = await niquests.aget(url, headers=headers)
Note: Custom headers are given less precedence than more specific sources of information. For instance:
Authorization headers set with headers= will be overridden if credentials are specified in
.netrc, which in turn will be overridden by theauth=parameter. Niquests will search for the netrc file at ~/.netrc, ~/_netrc, or at the path specified by the NETRC environment variable.Authorization headers will be removed if you get redirected off-host.
Proxy-Authorization headers will be overridden by proxy credentials provided in the URL.
Content-Length headers will be overridden when we can determine the length of the content.
Furthermore, Niquests does not change its behavior at all based on which custom headers are specified. The headers are simply passed on into the final request.
Note: All header values must be a string, bytestring, or unicode. While permitted, it’s advised to avoid passing unicode header values.
More complicated POST requests¶
Typically, you want to send some form-encoded data — much like an HTML form.
To do this, simply pass a dictionary to the data argument. Your
dictionary of data will automatically be form-encoded when the request is made:
>>> payload = {'key1': 'value1', 'key2': 'value2'}
>>> r = niquests.post('https://httpbin.org/post', data=payload)
>>> print(r.text)
{
...
"form": {
"key2": "value2",
"key1": "value1"
},
...
}
The data argument can also have multiple values for each key. This can be
done by making data either a list of tuples or a dictionary with lists
as values. This is particularly useful when the form has multiple elements that
use the same key:
>>> payload_tuples = [('key1', 'value1'), ('key1', 'value2')]
>>> r1 = niquests.post('https://httpbin.org/post', data=payload_tuples)
>>> payload_dict = {'key1': ['value1', 'value2']}
>>> r2 = niquests.post('https://httpbin.org/post', data=payload_dict)
>>> print(r1.text)
{
...
"form": {
"key1": [
"value1",
"value2"
]
},
...
}
>>> r1.text == r2.text
True
There are times that you may want to send data that is not form-encoded. If
you pass in a string instead of a dict, that data will be posted directly.
For example, the GitHub API v3 accepts JSON-Encoded POST/PATCH data:
>>> import json
>>> url = 'https://api.github.com/some/endpoint'
>>> payload = {'some': 'data'}
>>> r = niquests.post(url, data=json.dumps(payload))
Please note that the above code will NOT add the Content-Type header
(so in particular it will NOT set it to application/json).
If you need that header set and you don’t want to encode the dict yourself,
you can also pass it directly using the json parameter (added in version 2.4.2)
and it will be encoded automatically:
>>> url = 'https://api.github.com/some/endpoint'
>>> payload = {'some': 'data'}
>>> r = niquests.post(url, json=payload)
Note, the json parameter is ignored if either data or files is passed.
POST a Multipart Form-Data without File¶
Since Niquests 3.1.2 it is possible to overrule the default conversion to application/x-www-form-urlencoded type.
You can submit a form-data by helping Niquests understand what you meant.
>>> url = 'https://httpbin.org/post'
>>> payload = {'some': 'data'}
>>> r = niquests.post(url, data=payload, headers={"Content-Type": "multipart/form-data"})
Now, instead of submitting a urlencoded body, as per the default, Niquests will send instead a proper form-data.
Note
You can also specify manually a boundary in the header value. Niquests will reuse it. Otherwise it will assign a random one.
POST a Multipart-Encoded File¶
Niquests makes it simple to upload Multipart-encoded files:
>>> url = 'https://httpbin.org/post'
>>> files = {'file': open('report.xls', 'rb')}
>>> r = niquests.post(url, files=files)
>>> r.text
{
...
"files": {
"file": "<censored...binary...data>"
},
...
}
You can set the filename, content_type and headers explicitly:
>>> url = 'https://httpbin.org/post'
>>> files = {'file': ('report.xls', open('report.xls', 'rb'), 'application/vnd.ms-excel', {'Expires': '0'})}
>>> r = niquests.post(url, files=files)
>>> r.text
{
...
"files": {
"file": "<censored...binary...data>"
},
...
}
If you want, you can send strings to be received as files:
>>> url = 'https://httpbin.org/post'
>>> files = {'file': ('report.csv', 'some,data,to,send\nanother,row,to,send\n')}
>>> r = niquests.post(url, files=files)
>>> r.text
{
...
"files": {
"file": "some,data,to,send\\nanother,row,to,send\\n"
},
...
}
In the event you are posting a very large file as a multipart/form-data
request, you may want to stream the request. By default, niquests does not
support this, but there is a separate package which does -
requests-toolbelt. You should read the toolbelt’s documentation for more details about how to use it.
For sending multiple files in one request refer to the advanced section.
Response Status Codes¶
We can check the response status code:
>>> r = niquests.get('https://httpbin.org/get')
>>> r.status_code
200
Niquests also comes with a built-in status code lookup object for easy reference:
>>> r.status_code == niquests.codes.ok
True
If we made a bad request (a 4XX client error or 5XX server error response), we
can raise it with
Response.raise_for_status():
>>> bad_r = niquests.get('https://httpbin.org/status/404')
>>> bad_r.status_code
404
>>> bad_r.raise_for_status()
Traceback (most recent call last):
File "requests/models.py", line 832, in raise_for_status
raise http_error
niquests.exceptions.HTTPError: 404 Client Error
But, since our status_code for r was 200, when we call
raise_for_status() we get:
>>> r.raise_for_status()
<Response HTTP/2 [200]>
All is well.
Response Headers¶
We can view the server’s response headers using a Python dictionary:
>>> r.headers
{
'content-encoding': 'gzip',
'transfer-encoding': 'chunked',
'connection': 'close',
'server': 'nginx/1.0.4',
'x-runtime': '148ms',
'etag': '"e1ca502697e5c9317743dc078f67693f"',
'content-type': 'application/json'
}
The dictionary is special, though: it’s made just for HTTP headers. According to RFC 7230, HTTP Header names are case-insensitive.
So, we can access the headers using any capitalization we want:
>>> r.headers['Content-Type']
'application/json'
>>> r.headers.get('content-type')
'application/json'
It is also special in that the server could have sent the same header multiple times with different values, but requests combines them so they can be represented in the dictionary within a single mapping, as per RFC 7230:
A recipient MAY combine multiple header fields with the same field name into one “field-name: field-value” pair, without changing the semantics of the message, by appending each subsequent field value to the combined field value in order, separated by a comma.
It most cases you’d rather quickly access specific key element of headers. Fortunately, you can access HTTP headers as they were objects.
>>> r.oheaders.content_type.charset 'utf-8' >>> r.oheaders.report_to.max_age '604800' >>> str(r.oheaders.date) 'Mon, 02 Oct 2023 05:34:48 GMT' >>> from kiss_headers import get_polymorphic, Date >>> h = get_polymorphic(r.oheaders.date, Date) >>> repr(h.get_datetime()) datetime.datetime(2023, 10, 2, 5, 39, 46, tzinfo=datetime.timezone.utc)
To explore possibilities, visit the kiss-headers documentation at https://jawah.github.io/kiss-headers/
Redirection and History¶
By default Niquests will perform location redirection for all verbs except HEAD.
We can use the history property of the Response object to track redirection.
The Response.history list contains the
Response objects that were created in order to
complete the request. The list is sorted from the oldest to the most recent
response.
For example, GitHub redirects all HTTP requests to HTTPS:
>>> r = niquests.get('http://github.com/')
>>> r.url
'https://github.com/'
>>> r.status_code
200
>>> r.history
[<Response HTTP/2 [301]>]
If you’re using GET, OPTIONS, POST, PUT, PATCH or DELETE, you can disable
redirection handling with the allow_redirects parameter:
>>> r = niquests.get('http://github.com/', allow_redirects=False)
>>> r.status_code
301
>>> r.history
[]
If you’re using HEAD, you can enable redirection as well:
>>> r = niquests.head('http://github.com/', allow_redirects=True)
>>> r.url
'https://github.com/'
>>> r.history
[<Response HTTP/2 [301]>]
Timeouts¶
You can tell Niquests to stop waiting for a response after a given number of
seconds with the timeout parameter. Nearly all production code should use
this parameter in nearly all requests. By default GET, HEAD, OPTIONS ships with a
30 seconds timeout delay and 120 seconds for the rest:
>>> niquests.get('https://github.com/', timeout=0.001)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
niquests.exceptions.Timeout: HTTPConnectionPool(host='github.com', port=80): Request timed out. (timeout=0.001)
Note
timeout is not a time limit on the entire response download;
rather, an exception is raised if the server has not issued a
response for timeout seconds (more precisely, if no bytes have been
received on the underlying socket for timeout seconds). If no timeout is specified explicitly, requests
use the default according to your HTTP verb. Either 30 seconds or 120 seconds.
Warning
We know that users are surprised by the timeout behaviors. You should know
that Niquests is bound to some legacy behaviors that existed well prior us.
Let’s say that you set up timeout=1 to a specific host. Now let’s say on
purpose that the host is down. Then we should expect the request to fail
exactly 1s after. That is correct. But! Beware that if the host has more than
1 DNS records (either A or AAAA), they all will be tested with set timeout limit!
So if example.tld has two IPs associated, then you should expect 2s max delay.
And so on, so forth…
Tip
Set happy_eyeballs=True when constructing your Session to try all endpoints simultaneously.
This will help you circumvent most of the connectivity issues.
Warning
Unfortunately, due to a Python restriction, we cannot ensure that timeout is respected if your system DNS is
unresponsive. This only applies in synchronous mode (i.e. not async).
To circumvent that issue, you should use a more modern DNS resolver solution. See resolver=... parameter.
Errors and Exceptions¶
In the event of a network problem (e.g. DNS failure, refused connection, etc),
Niquests will raise a ConnectionError exception.
Response.raise_for_status() will
raise an HTTPError if the HTTP request
returned an unsuccessful status code.
If a request times out, a Timeout exception is
raised.
If a request exceeds the configured number of maximum redirections, a
TooManyRedirects exception is raised.
All exceptions that Niquests explicitly raises inherit from
niquests.exceptions.RequestException.
HTTP/3 over QUIC¶
Niquests relies on urllib3.future that relies on the qh3 package. The underlying package may or may not be installed on your environment.
If it is not present, no HTTP/3 or QUIC support will be present.
If you uninstall the qh3 package it disable the support for HTTP/3 without breaking anything. On the overhand, installing it manually (may require compilation toolchain) will bring its support.
Find a quick way to know if your environment is capable of emitting HTTP/3 requests by:
>>> from niquests import get
>>> r = get("https://1.1.1.1")
>>> r
<Response HTTP/2 [200]>
>>> r = get("https://1.1.1.1")
>>> r
<Response HTTP/3 [200]>
The underlying library natively understand the Alt-Svc header and is constantly looking for the h3
alternative service. Once it finds it, and is deemed valid, it opens up a QUIC connection to the target.
It is saved in-memory by Niquests.
You may also run the following command python -m niquests.help to find out if you support HTTP/3.
In 98 percents of the case, the answer is yes!
Note
Since urllib3.future version 2.4+ we support negotiating HTTP/3 without a first TCP connection if the remote peer indicated in a HTTPS (DNS) record that the server support HTTP/3.
Multiplexed Connection¶
Starting from Niquests 3.2 you can issue concurrent requests without having multiple connections. It can leverage multiplexing when your remote peer support either HTTP/2, or HTTP/3.
The only thing you will ever have to do to get started is to specify multiplexed=True from
within your Session constructor.
Any Response returned by get, post, put, etc… will be a lazy instance of Response.
Note
An important note about using Session(multiplexed=True) is that, in order to be efficient
and actually leverage its perks, you will have to issue multiple concurrent request before
actually trying to access any Response methods or attributes.
Modern browsers like Firefox, and Chrome utilize something really like multiplexed=True mode!
It’s a bit like if we have a controlled concurrent environment.
Gather responses¶
Emitting concurrent requests and loading them via Session.gather():
from niquests import Session
from time import time
s = Session(multiplexed=True)
before = time()
responses = []
responses.append(
s.get("https://httpbingo.org/delay/3")
)
responses.append(
s.get("https://httpbingo.org/delay/1")
)
s.gather()
print(f"waited {time() - before} second(s)") # will print 3s
from niquests import AsyncSession
from time import time
s = AsyncSession(multiplexed=True)
before = time()
responses = []
responses.append(
await s.get("https://httpbingo.org/delay/3")
)
responses.append(
await s.get("https://httpbingo.org/delay/1")
)
await s.gather()
print(f"waited {time() - before} second(s)") # will print 3s
Direct Access¶
Emitting concurrent requests and loading them via direct access:
from niquests import Session
from time import time
s = Session(multiplexed=True)
before = time()
responses = []
responses.append(
s.get("https://httpbingo.org/delay/3")
)
responses.append(
s.get("https://httpbingo.org/delay/1")
)
# internally call gather with self (Response)
print(responses[0].status_code) # 200! :! Hidden call to s.gather(responses[0])
print(responses[1].status_code) # 200!
print(f"waited {time() - before} second(s)") # will print 3s
The possible algorithms are actually nearly limitless, and you may arrange/write you own scheduling technics!
Session Gather¶
The Session instance expose a method called gather(*responses, max_fetch = None), you may call it to
improve the efficiency of resolving your _lazy_ responses.
Here are the possible outcome of invocation:
s.gather() # resolve all pending "lazy" responses
s.gather(resp) # resolve given "resp" only
s.gather(max_fetch=2) # resolve two responses (the first two that come)
s.gather(resp_a, resp_b, resp_c) # resolve all three
s.gather(resp_a, resp_b, resp_c, max_fetch=1) # only resolve the first one
await s.gather() # resolve all pending "lazy" responses
await s.gather(resp) # resolve given "resp" only
await s.gather(max_fetch=2) # resolve two responses (the first two that come)
await s.gather(resp_a, resp_b, resp_c) # resolve all three
await s.gather(resp_a, resp_b, resp_c, max_fetch=1) # only resolve the first one
Note
Call to s.gather is optional, you can access at will the responses properties and methods at any time.
Async session¶
You may have a program that require awaitable HTTP request. You are in luck as Niquests ships with
an implementation of Session that support async.
All known methods remain the same at the sole difference that it return a coroutine.
Here is a basic example:
import asyncio
from niquests import AsyncSession, Response
async def fetch(url: str) -> Response:
async with AsyncSession() as s:
return await s.get(url)
async def main() -> None:
tasks = []
for _ in range(10):
tasks.append(asyncio.create_task(fetch("https://httpbingo.org/delay/1")))
responses = await asyncio.gather(*tasks)
print(responses)
if __name__ == "__main__":
asyncio.run(main())
Warning
For the time being Niquests only support asyncio as the backend library for async. Contributions are welcomed if you want it to be compatible with anyio for example.
Note
Shortcut functions get, post, …, from the top-level package does not support async.
Async and Multiplex¶
You can leverage a multiplexed connection while in an async context! It’s the perfect solution while dealing with two or more hosts that support HTTP/2 onward.
Look at this basic sample:
import asyncio
from niquests import AsyncSession, Response
async def fetch(url: str) -> list[Response]:
responses = []
async with AsyncSession(multiplexed=True) as s:
for _ in range(10):
responses.append(await s.get(url))
await s.gather()
return responses
async def main() -> None:
tasks = []
for _ in range(10):
tasks.append(asyncio.create_task(fetch("https://httpbingo.org/delay/1")))
responses_responses = await asyncio.gather(*tasks)
responses = [item for sublist in responses_responses for item in sublist]
print(responses)
if __name__ == "__main__":
asyncio.run(main())
Warning
Combining AsyncSession with multiplexed=True and passing stream=True produces AsyncResponse, make sure to call await session.gather() before trying to access directly the lazy instance of response.
AsyncResponse for streams¶
Delaying the content consumption in an async context can be easily achieved using:
import niquests
import asyncio
async def main() -> None:
async with niquests.AsyncSession() as s:
r = await s.get("https://httpbingo.org/get", stream=True)
async for chunk in await r.iter_content(16):
print(chunk)
if __name__ == "__main__":
asyncio.run(main())
Or using the iter_line method as such:
import niquests
import asyncio
async def main() -> None:
async with niquests.AsyncSession() as s:
r = await s.get("https://httpbingo.org/get", stream=True)
async for chunk in r.iter_line():
print(chunk)
if __name__ == "__main__":
asyncio.run(main())
Or simply by doing:
import niquests
import asyncio
async def main() -> None:
async with niquests.AsyncSession() as s:
r = await s.get("https://httpbingo.org/get", stream=True)
payload = await r.json()
if __name__ == "__main__":
asyncio.run(main())
When you specify stream=True within a AsyncSession, the returned object will be of type AsyncResponse.
So that the following methods and properties will be coroutines (aka. awaitable):
iter_content(…)
iter_lines(…)
content
json(…)
text(…)
close()
When enabling multiplexing while in an async context, you will have to issue a call to await s.gather()
to avoid blocking your event loop.
Here is a basic example of how you would do it:
import niquests
import asyncio
async def main() -> None:
responses = []
async with niquests.AsyncSession(multiplexed=True) as s:
responses.append(
await s.get("https://httpbingo.org/get", stream=True)
)
responses.append(
await s.get("https://httpbingo.org/get", stream=True)
)
print(responses)
await s.gather()
print(responses)
for response in responses:
async for chunk in await response.iter_content(16):
print(chunk)
if __name__ == "__main__":
asyncio.run(main())
Warning
Accessing (non awaitable attribute or method) of a lazy AsyncResponse without a call to s.gather() will raise an error.
Scale your Session / Pool¶
By default, Niquests allow, concurrently 10 hosts, and 10 connections per host. You can at your own discretion increase or decrease the values.
To do so, you are invited to set the following parameters within a Session constructor:
Session(pool_connections=10, pool_maxsize=10)
pool_connections means the number of host target (or pool of connections if you prefer).
pool_maxsize means the maximum of concurrent connexion per host target/pool.
Tip
Due to the multiplexed aspect of both HTTP/2, and HTTP/3 you can issue, usually, more than 200 requests per connection without ever needing to create another one.
Note
This setting is most useful for multi-threading/tasks application.
Pool Connections¶
Setting pool_connections=2 will keep the connection to host-b.tld and host-c.tld.
host-a.tld will be silently discarded.
import niquests
with niquests.Session(pool_connections=2) as s:
s.get("https://host-a.tld/some")
s.get("https://host-b.tld/some")
s.get("https://host-c.tld/some")
import niquests
async with niquests.AsyncSession(pool_connections=2) as s:
await s.get("https://host-a.tld/some")
await s.get("https://host-b.tld/some")
await s.get("https://host-c.tld/some")
Attention
Unfortunately, due to backward compatibility issues, those settings applies PER SCHEME.
pool_connections=2 will allow up to 2 HTTP (unencrypted) and 2 HTTPS (encrypted)
connections. Meaning that you can still get 4 hosts being kept alive.
Pool Maxsize¶
Setting pool_maxsize=2 will allow up to 2 connection to host-a.tld.
This settings is only useful in a concurrent environment. Either async or threaded.
DNS Resolution¶
Niquests has a built-in support for DNS over HTTPS, DNS over TLS, DNS over UDP, and DNS over QUIC. Thanks to our built-in system trust store access, you don’t have to worry one bit about certificates validation.
This feature is based on the native implementation brought to you by the awesome urllib3.future. Once you have specified a custom resolver (e.g. not the system default), you will automatically be protected with DNSSEC in additions to specifics security perks on chosen protocol.
Specify your own resolver¶
In order to specify a resolver, you have to use a Session. Each Session can have a different resolver.
Here is a basic example that leverage Google public DNS over HTTPS.
from niquests import Session
with Session(resolver="doh+google://") as s:
resp = s.get("https://httpbingo.org/get")
from niquests import AsyncSession
async with AsyncSession(resolver="doh+google://") as s:
resp = await s.get("https://httpbingo.org/get")
Here, the domain name (httpbingo.org) will be resolved using the provided DNS provider (e.g. Google public and encrypted DNS).
Note
By default, Niquests still use the good old, often insecure, system DNS.
Use multiple resolvers¶
You may specify a list of resolvers to be tested in presented order.
from niquests import Session
with Session(resolver=["doh+google://", "doh://cloudflare-dns.com"]) as s:
resp = s.get("https://httpbingo.org/get")
from niquests import AsyncSession
async with AsyncSession(resolver=["doh+google://", "doh://cloudflare-dns.com"]) as s:
resp = await s.get("https://httpbingo.org/get")
The second entry doh://cloudflare-dns.com will only be tested if doh+google:// failed to provide a usable answer.
Note
In a multi-threaded context, both resolvers are going to be used in order to improve performance.
Supported DNS url¶
Niquests support a wide range of DNS protocols. Here are a few examples:
"doh+google://" # shortcut url for Google DNS over HTTPS
"dot+google://" # shortcut url for Google DNS over TLS
"doh+cloudflare://" # shortcut url for Cloudflare DNS over HTTPS
"doq+adguard://" # shortcut url for Adguard DNS over QUIC
"dou://1.1.1.1" # url for DNS over UDP (Plain resolver)
"dou://1.1.1.1:8853" # url for DNS over UDP using port 8853 (Plain resolver)
"doh://my-resolver.tld" # url for DNS over HTTPS using server my-resolver.tld
Note
Learn more by looking at the urllib3.future documentation: https://urllib3future.readthedocs.io/en/latest/advanced-usage.html#using-a-custom-dns-resolver
Set DNS via environment¶
You can set the NIQUESTS_DNS_URL environment variable with desired resolver, it will be
used in every Session that does not manually specify a resolver.
Example:
export NIQUESTS_DNS_URL="doh://google.dns"
Disable DNS certificate verification¶
Simply add verify=false into your DNS url to pursue.
from niquests import Session
with Session(resolver="doh+google://default/?verify=false") as s:
resp = s.get("https://httpbingo.org/get")
from niquests import AsyncSession
async with AsyncSession(resolver="doh+google://default/?verify=false") as s:
resp = await s.get("https://httpbingo.org/get")
Warning
Doing a s.get("https://httpbingo.org/get", verify=False) does not impact the resolver.
Timeouts¶
You may set a specific timeout for domain name resolution by appending ?timeout=1 to the resolver configuration.
from niquests import Session
with Session(resolver="doh+google://default/?timeout=1") as s:
resp = s.get("https://httpbingo.org/get")
from niquests import AsyncSession
async with AsyncSession(resolver="doh+google://default/?timeout=1") as s:
resp = await s.get("https://httpbingo.org/get")
This will prevent any DNS resolution that last longer to a second.
Happy Eyeballs¶
New in version 3.5.5.
Thanks to the underlying library (urllib3.future) we are able to serve the Happy Eyeballs feature, one toggle away.
Happy Eyeballs (also called Fast Fallback) is an algorithm published by the IETF that makes dual-stack applications (those that understand both IPv4 and IPv6) more responsive to users by attempting to connect using both IPv4 and IPv6 at the same time (preferring IPv6), thus minimizing common problems experienced by users with imperfect IPv6 connections or setups.
The name “happy eyeballs” derives from the term “eyeball” to describe endpoints which represent human Internet end-users, as opposed to servers.
import niquests
with niquests.Session(happy_eyeballs=True) as s:
...
import niquests
async with niquests.AsyncSession(happy_eyeballs=True) as s:
...
A mere happy_eyeballs=True is sufficient to leverage its potential.
Note
In case a server yield multiple IPv4 addresses but no IPv6, this still applies. Meaning that Niquests will connect concurrently to presented addresses and determine what is the fastest endpoint.
Note
Like urllib3.future, you can pass an integer to increase the default number of concurrent connection to be tested. See https://urllib3future.readthedocs.io/en/latest/advanced-usage.html#happy-eyeballs to learn more.
OCSP requests (certificate revocation checks) will follow given happy_eyeballs=True parameter.
Warning
This feature is disabled by default and we are actually planning to make it enabled as the default in a future major.
WebSockets¶
New in version 3.9: Requires to install an extra. pip install niquests[ws].
It is undeniable that WebSockets are a vital part of the web ecosystem along with HTTP. We noticed that most users met frictions when trying to deal with a WebSocket server for the first time, that is why we decided to expand Niquests capabilities to automatically handle WebSockets for you.
Quick start¶
In the following example, we will explore how to interact with a basic, but well known echo server.
from niquests import Session
with Session() as s:
resp = s.get(
"wss://echo.websocket.org",
)
print(resp.status_code) # it says "101", for "Switching Protocol"
print(resp.extension.next_payload()) # unpack the next message from server
resp.extension.send_payload("Hello World") # automatically sends a text message to the server
print(resp.extension.next_payload() == "Hello World") # output True!
resp.extension.close() # don't forget this call to release the connection!
from niquests import AsyncSession
import asyncio
async def main() -> None:
async with AsyncSession() as s:
resp = await s.get("wss://echo.websocket.org")
# ...
print(await resp.extension.next_payload()) # unpack the next message from server
await resp.extension.send_payload("Hello World") # automatically sends a text message to the server
print((await resp.extension.next_payload()) == "Hello World") # output True!
await resp.extension.close()
Warning
Without the extra installed, you will get an exception that indicate that the scheme is unsupported.
Note
Historically, Requests only accepted http:// and https:// as schemes. But now, you may use wss:// for WebSocket Secure or ws:// for WebSocket over PlainText.
Warning
Be careful when accessing resp.extension, if anything goes wrong in the “establishment” phase, meaning the server denies us the WebSocket upgrade, it will be worth None.
WebSocket and HTTP/2+¶
By default, Niquests negotiate WebSocket over HTTP/1.1 but it is well capable of doing so over HTTP/2 and HTTP/3 following RFC8441.
But rare are the servers capable of bootstrapping WebSocket over a multiplexed connection. There’s a little tweak to the URL
so that it can infer your desire to use a modern protocol, like so wss+rfc8441://echo.websocket.org.
Warning
echo.websocket.org don’t support WebSocket over HTTP/2.
Ping and Pong¶
Ping sent by a server are automatically handled/answered by Niquests each time to read from the socket with next_payload(). However, we do not send automatically Ping TO the server.
from niquests import Session
with Session() as s:
resp = s.get(
"wss://echo.websocket.org",
)
resp.extension.ping() # send a ping to the websocket server, notify it that you're still there!
from niquests import AsyncSession
async with AsyncSession() as s:
resp = await s.get(
"wss://echo.websocket.org",
)
await resp.extension.ping() # send a ping to the websocket server, notify it that you're still there!
You can use the elementary methods provided by Niquests to construct your own logic.
Binary and Text Messages¶
You may use next_payload() and send_payload(...) with str or bytes.
If next_payload() output bytes, then it is a BinaryMessage.
If next_payload() output str, then it is a TextMessage.
The same apply to send_payload(...), if passed item is str, then we send a TextMessage.
Otherwise, it will be a BinaryMessage.
Warning
Niquests does not buffer “incomplete” message (e.g. end marker for a message). It returns every chunk received as is.
Note
If next_payload() returns None, that means that the remote choose to close the connection.
Others¶
Every other features still applies with WebSocket, like proxies, happy eyeballs, thread/task safety, etc… See relevant docs for more.
Example with Concurrency¶
In the following example, we will see how to communicate with a WebSocket server that echo what we send to him. We will use a Thread for the reads and the main thread for write operations.
from __future__ import annotations
from niquests import Session, Response, ReadTimeout
from threading import Thread
from time import sleep
def pull_message_from_server(my_response: Response) -> None:
"""Read messages here."""
iteration_counter = 0
while my_response.extension.closed is False:
try:
# will block for 1s top
message = my_response.extension.next_payload()
if message is None: # server just closed the connection. exit.
print("received goaway from server")
return
print(f"received message: '{message}'")
except ReadTimeout: # if no message received within 1s
pass
sleep(1) # let some time for the write part to acquire the lock
iteration_counter += 1
# send a ping every four iteration
if iteration_counter % 4 == 0:
my_response.extension.ping()
print("ping sent")
if __name__ == "__main__":
with Session() as s:
# connect to websocket server "echo.websocket.org" with timeout of 1s (both read and connect)
resp = s.get("wss://echo.websocket.org", timeout=1)
if resp.status_code != 101:
exit(1)
t = Thread(target=pull_message_from_server, args=(resp,))
t.start()
# send messages here
for i in range(30):
to_send = f"Hello World {i}"
resp.extension.send_payload(to_send)
print(f"sent message: '{to_send}'")
sleep(1) # let some time for the read part to acquire the lock
# exit gently!
resp.extension.close()
# wait for thread proper exit.
t.join()
print("program ended!")
Warning
The sleep serve the purpose to relax the lock on either the read or write side, so that one would not block the other forever.
import asyncio
from niquests import AsyncSession, ReadTimeout, Response
async def read_from_ws(my_response: Response) -> None:
iteration_counter = 0
while my_response.extension.closed is False:
try:
# will block for 1s top
message = await my_response.extension.next_payload()
if message is None: # server just closed the connection. exit.
print("received goaway from server")
return
print(f"received message: '{message}'")
except ReadTimeout: # if no message received within 1s
pass
await asyncio.sleep(1) # let some time for the write part to acquire the lock
iteration_counter += 1
# send a ping every four iteration
if iteration_counter % 4 == 0:
await my_response.extension.ping()
print("ping sent")
async def main() -> None:
async with AsyncSession() as s:
resp = await s.get("wss://echo.websocket.org", timeout=1)
print(resp)
task = asyncio.create_task(read_from_ws(resp))
for i in range(30):
to_send = f"Hello World {i}"
await resp.extension.send_payload(to_send)
print(f"sent message: '{to_send}'")
await asyncio.sleep(1) # let some time for the read part to acquire the lock
# exit gently!
await resp.extension.close()
await task
if __name__ == "__main__":
asyncio.run(main())
Note
The given example are really basic ones. You may adjust at will the settings and algorithm to match your requisites.
Server Sent Event (SSE)¶
New in version 3.11.2.
Server sent event or widely known with its acronym SSE is a extremely popular method to stream continuously event from the server to the client in real time.
Before this built-in feature, most way to leverage this were to induce a bit of hacks into your http client.
Starting example¶
Thanks to urllib3-future native SSE extension, we can effortlessly manage a stream of event. Here is a really basic example of how to proceed.
import niquests
if __name__ == "__main__":
r = niquests.post("sse://httpbingo.org/sse")
print(r) # output: <Response HTTP/2 [200]>
while r.extension.closed is False:
event: niquests.ServerSentEvent = r.extension.next_payload() # ServerSentEvent(event='ping', data='{"id":0,"timestamp":1732857000473}')
import niquests
import asyncio
async def main() -> None:
async with niquests.AsyncSession() as s:
r = await s.post("sse://httpbingo.org/sse")
print(r) # output: <Response HTTP/2 [200]>
while r.extension.closed is False:
print(await r.extension.next_payload()) # ServerSentEvent(event='ping', data='{"id":0,"timestamp":1732857000473}')
if __name__ == "__main__":
asyncio.run(main())
We purposely set the scheme to sse:// to indicate our intent to consume a SSE endpoint.
Note
sse:// is using https:// under the hood. To avoid using an encrypted connection, use psse:// instead.
You will notice that the program is similar to our WebSocket implementation. Excepted that the next_payload()
method returns by default a ServerSentEvent object.
Extracting raw event¶
In the case where your server weren’t compliant to the defined web standard for SSE (e.g. add custom field/line style)
you can extract a str instead of a ServerSentEvent object by passing raw=True into our next_payload()
method.
As such:
while r.extension.closed is False:
print(r.extension.next_payload(raw=True)) # "event: ping\ndata: {"id":9,"timestamp":1732857471733}\n\n"
Warning
As with WebSocket, next_payload method may return None if the server terminate the stream.
Interrupt the stream¶
A server may send event forever. And to avoid the awkward situation where your client receive unsolicited data you should at all time close the SSE extension to notify the remote peer about your intent to stop.
For example, the following test server send events until you say to stop: sse://sse.dev/test
See how to stop cleanly the flow of events:
import niquests
if __name__ == "__main__":
r = niquests.post("sse://sse.dev/test")
events = []
while r.extension.closed is False:
event = r.extension.next_payload()
if event is None: # the remote peer closed it himself
break
events.append(event) # add the event to list
if len(events) >= 10: # close ourselves SSE stream & notify remote peer.
r.extension.close()
import niquests
import asyncio
async def main() -> None:
async with niquests.AsyncSession() as s:
r = await s.post("sse://sse.dev/test")
events = []
while r.extension.closed is False:
event = await r.extension.next_payload()
if event is None: # the remote peer closed it himself
break
events.append(event) # add the event to list
if len(events) >= 10: # close ourselves SSE stream & notify remote peer.
await r.extension.close()
if __name__ == "__main__":
asyncio.run(main())
ServerSentEvent¶
Note
A ServerSentEvent object is returned by default with the next_payload() method. Or None if the server terminate the flow of events.
It’s a parsed SSE (single event). The object have nice shortcuts like:
payload.json()(any) to automatically unserialize passed json data.payload.id(str)payload.data(str) for the raw message payloadpayload.event(str) for the event type (e.g. message, ping, etc…)payload.retry(int)
The full class source is located at https://github.com/jawah/urllib3.future/blob/3d7c5d9446880a8d473b9be4db0bcd419fb32dee/src/urllib3/contrib/webextensions/sse.py#L14
Notes¶
SSE can be reached from HTTP/1, HTTP/2 or HTTP/3 at will. Niquests makes this very easy. Moreover every features like proxies, happy-eyeballs, hooks, etc.. can be used as you always did.
Unix Socket¶
New in version 3.17.0.
Warning
Only on Linux/Unix systems. Unix sockets can only implement HTTP/1, and HTTP/2 (h2c).
Niquests natively supports connecting to services via Unix domain sockets. This is particularly useful for communicating with local services like Docker, databases, or any application exposing a Unix socket API (docker don’t).
Basic Usage¶
To connect via a Unix socket, use the http+unix:// scheme with the URL-encoded socket path:
from niquests import Session
with Session() as s:
# %2F is the URL-encoded forward slash
response = s.get("http+unix://%2Fvar%2Frun%2Fdocker.sock/version")
print(response.json())
from niquests import AsyncSession
async with AsyncSession() as s:
response = await s.get("http+unix://%2Fvar%2Frun%2Fdocker.sock/version")
print(response.json())
Tip
You can also use the base_url parameter in Session to avoid writing http+unix://%2Fvar%2Frun%2Fdocker.sock/ over and over again.
Warning
To speak with a h2c (HTTP/2 over cleartext) unix socket you will have to disable HTTP/1 first via Session(disable_http1=True). Not many services support that.
URL Format¶
The Unix socket URL follows this pattern:
http+unix://<url-encoded-socket-path>/<api-path>
For example, to access /var/run/docker.sock with path /version:
Socket path:
/var/run/docker.sockURL-encoded:
%2Fvar%2Frun%2Fdocker.sockFull URL:
http+unix://%2Fvar%2Frun%2Fdocker.sock/version
Tip
Use urllib.parse.quote(path, safe='') to URL-encode socket paths programmatically.
Concurrent Connections¶
Unix sockets support multiple concurrent connections, just like TCP sockets:
import asyncio
from niquests import AsyncSession
async def main():
async with AsyncSession() as s:
endpoints = ["/containers/json", "/images/json", "/version", "/info"]
tasks = [
s.get(f"http+unix://%2Fvar%2Frun%2Fdocker.sock{ep}")
for ep in endpoints
]
responses = await asyncio.gather(*tasks)
print(responses)
asyncio.run(main())
Note
You can also leverage a thread pool executor in a sync context as you always did with http.
WebSocket and SSE¶
New in version 3.18.0.
Niquests powerful architecture permit to easily boot WS/SSE extension as you were using “normal” http.
import niquests
if __name__ == "__main__":
with niquests.Session() as s:
r = s.get("psse+unix://%2Ftmp%2Fhello.sock/sse")
while not r.extension.closed:
print(r.extension.next_payload())
Note
Take a good look at the scheme, instead of http+unix:// it’s psse+unix:// to let know about your intention of automatically start response.extension.
You can leverage psse+unix:// for the SSE extension and ws+unix:// for the Websocket one. To learn more about WS
and SSE handling, see the dedicated section about it.
Warning
ws+unix:// requires you to have the ws extra installed.
WSGI/ASGI Application Testing¶
New in version 3.17.0.
Niquests provides built-in adapters for testing WSGI and ASGI applications directly without starting a server. This is particularly useful for integration testing.
Warning
This feature silently ignore fine tuning parameters like “http version enable/disable”, “pool sizing”, “multiplexing”, … that are only meant for true HTTP connections.
ASGI Applications (Async)¶
Test your FastAPI, Starlette, or other ASGI applications directly:
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/hello")
async def hello(request: Request):
return {"message": "hello from asgi"}
@app.api_route("/echo", methods=["GET", "POST"])
async def echo(request: Request):
body = await request.body()
return {"body": body.decode()}
Basic usage:
import asyncio
from niquests import AsyncSession
async def main():
async with AsyncSession(app=app) as s:
resp = await s.get("/hello?foo=bar")
print(resp.status_code) # 200
print(resp.json()) # {"message": "hello from asgi"}
asyncio.run(main())
Streaming responses:
async def main():
async with AsyncSession(app=app) as s:
resp = await s.post("/echo", data=b"foobar", stream=True)
body = b""
async for chunk in await resp.iter_content(6):
body += chunk
print(body)
asyncio.run(main())
WebSocket and SSE:
New in version 3.18.0.
WebSocket and Server-Sent Events work with ASGI applications using the exact same interfaces as the main HTTP part.
Use wss:// (or ws://) for WebSocket and sse:// (or psse://) for SSE, just like you would with a live server.
from fastapi import FastAPI, WebSocket
from starlette.responses import StreamingResponse
app = FastAPI()
@app.websocket("/ws-echo")
async def ws_echo(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"echo: {data}")
@app.get("/sse-events")
async def sse_events():
async def generate():
for i in range(3):
yield f"event: message\ndata: event {i}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
WebSocket:
from niquests import Session
with Session(app=app) as s:
resp = s.get("wss://default/ws-echo")
resp.extension.send_payload("Hello")
print(resp.extension.next_payload()) # "echo: Hello"
resp.extension.close()
SSE:
from niquests import Session
with Session(app=app) as s:
resp = s.get("sse://default/sse-events")
while not resp.extension.closed:
event = resp.extension.next_payload()
if event is None:
break
print(event) # ServerSentEvent(event='message', data='event 0')
import asyncio
from niquests import AsyncSession
async def main():
async with AsyncSession(app=app) as s:
# WebSocket
resp = await s.get("wss://default/ws-echo")
await resp.extension.send_payload("Hello")
print(await resp.extension.next_payload()) # "echo: Hello"
await resp.extension.close()
# SSE
resp = await s.get("sse://default/sse-events")
while not resp.extension.closed:
event = await resp.extension.next_payload()
if event is None:
break
print(event)
asyncio.run(main())
Note
You can also use an ASGI app within a synchronous Session at the cost of loosing streaming responses. And in the sync version, lifespan events (startup, shutdown) are handled automatically.
WSGI Applications (Sync)¶
Test your Flask, Django, or other WSGI applications:
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route("/hello")
def hello():
return jsonify({"message": "hello from wsgi"})
@app.route("/echo", methods=["GET", "POST"])
def echo():
return jsonify({"body": request.get_data(as_text=True)})
Basic usage:
from niquests import Session
with Session(app=app) as s:
resp = s.get("/hello?foo=bar")
print(resp.status_code) # 200
print(resp.json()) # {"message": "hello from wsgi"}
Streaming responses:
with Session(app=app) as s:
resp = s.post("/echo", data=b"foobar", stream=True)
print(resp.json())
for chunk in resp.iter_content(6):
...
Server-Sent Events:
New in version 3.18.0.
SSE works with WSGI applications using the same interface as the main HTTP part.
from flask import Flask, Response
app = Flask(__name__)
@app.route("/sse-events")
def sse_events():
def generate():
for i in range(3):
yield f"event: message\ndata: event {i}\n\n"
return Response(generate(), mimetype="text/event-stream")
from niquests import Session
with Session(app=app) as s:
resp = s.get("sse://default/sse-events")
while not resp.extension.closed:
event = resp.extension.next_payload()
if event is None:
break
print(event) # ServerSentEvent(event='message', data='event 0')
Warning
WebSocket is not supported over WSGI as it is a request/response-only protocol. Use an ASGI application for WebSocket testing.
Running in the Browser (Pyodide)¶
New in version 3.18.0.
Niquests runs natively in Pyodide with zero configuration changes.
Your existing code: HTTP requests, WebSocket, and SSE works identically using Session, or AsyncSession,
and resp.extension. The adapter is selected automatically when Pyodide is detected.
Warning
The sync interfaces requires a JSPI capable browser or Node interpreter. Modern build of Firefox, Chrome and Node support it.
# This exact code works in both CPython and Pyodide:
import niquests
resp = niquests.get("https://httpbingo.org/get")
print(resp.json())
# This exact code works in both CPython and Pyodide:
import niquests
resp = await niquests.aget("https://httpbingo.org/get")
print(resp.json())
Note
Even if Niquests exposes synchronous interfaces for HTTP request, you should always prefer async/await. Browsers are async first-class. You may face undesirable performance bottlenecks.
WebSocket and SSE use the same API as described in the sections above:
from niquests import Session
with Session() as s:
# Behind the scenes, uses the browser's native WebSocket API
resp = s.get("wss://echo.websocket.org")
resp.extension.send_payload("Hello")
print(resp.extension.next_payload())
resp.extension.close()
# Behind the scenes, uses fetch streaming under the hood
resp = s.get("sse://some-server.example/events")
while not resp.extension.closed:
event = resp.extension.next_payload()
if event is None:
break
print(event)
from niquests import AsyncSession
async with AsyncSession() as s:
# Behind the scenes, uses the browser's native WebSocket API
resp = await s.get("wss://echo.websocket.org")
await resp.extension.send_payload("Hello")
print(await resp.extension.next_payload())
await resp.extension.close()
# Behind the scenes, uses fetch streaming under the hood
resp = await s.get("sse://some-server.example/events")
while not resp.extension.closed:
event = await resp.extension.next_payload()
if event is None:
break
print(event)
What the browser controls¶
Under Pyodide, the browser’s network stack handles the actual connections. This means a few things behave differently from your usual Python! There are not limitations of Niquests, but inherent properties of the browser sandbox:
DNS resolution is handled by the browser. Custom resolvers, DNS-over-HTTPS, Protocol toggles, and so on have no effect.
TLS is handled by the browser. The
verifyandcertparameters are ignored, the browser uses its own certificate store. And unfortunately, by direct impact,response.conn_infois unset.CORS applies. The remote server must include the appropriate
Access-Control-Allow-Originheaders, or the browser will block the request.response.http_version is None. The browser does not expose which HTTP protocol (HTTP/1.1, h2, h3) was negotiated easily.
Certain headers just cannot be set. The browser forbids overriding
Host,Origin,Cookie,Connection, and other forbidden headers.No HTTP+Unix sockets. Unix domain sockets are not available in the browser environment.
Pool sizing, HTTP version toggles, and multiplexing settings are silently ignored, the browser manages its own connection pool.
No redirection history accessible through the heavy browser sandbox.
Disable autoredirect isn’t supported due to above. (intermediaries requests are ‘opaque’, so we can’t build a response object with them).
Proxies aren’t allowed because, well(…) the browser need to be configured, and those settings are nowhere near accessible through WASM/JS.
pre_send and early_response hooks are silently ignored.
extras such as socks, ocsp, speedups, zstd can’t be installed/aren’t used in WASM.
That’s a lot of thing missing! But you are now informed.
Scheme mapping¶
The scheme prefixes work exactly as elsewhere:
sse://maps tohttps://for SSEpsse://maps tohttp://for plaintext SSEwss://andws://use the browser’s native WebSocket
Ready for more? Check out the advanced section.