257 lines
7.9 KiB
Python
257 lines
7.9 KiB
Python
|
from __future__ import annotations
|
||
|
|
||
|
import io
|
||
|
import typing
|
||
|
from base64 import b64encode
|
||
|
from enum import Enum
|
||
|
|
||
|
from ..exceptions import UnrewindableBodyError
|
||
|
from .util import to_bytes
|
||
|
|
||
|
if typing.TYPE_CHECKING:
|
||
|
from typing import Final
|
||
|
|
||
|
# Sentinel header value: use it as the value of an entry in ``headers``
# to suppress a header that would otherwise be added automatically.
# Only ``Accept-Encoding``, ``Host``, and ``User-Agent`` can be skipped
# this way; their lowercase names are listed in SKIPPABLE_HEADERS.
SKIP_HEADER = "@@@SKIP_HEADER@@@"
SKIPPABLE_HEADERS = frozenset({"accept-encoding", "host", "user-agent"})
|
||
|
|
||
|
# Default value for the ``Accept-Encoding`` request header. It always
# starts as "gzip,deflate"; further codings are appended below only when
# a module able to decode them can be imported.
ACCEPT_ENCODING = "gzip,deflate"

# Brotli: ``brotlicffi`` is tried first, ``brotli`` is the fallback.
# The imported module itself is not used, only its availability matters.
try:
    try:
        import brotlicffi as _unused_module_brotli  # type: ignore[import] # noqa: F401
    except ImportError:
        import brotli as _unused_module_brotli  # type: ignore[import] # noqa: F401
except ImportError:
    # Neither Brotli package is available: do not advertise "br".
    pass
else:
    ACCEPT_ENCODING = ACCEPT_ENCODING + ",br"

# Zstandard: advertised only when ``zstandard`` can be imported.
try:
    import zstandard as _unused_module_zstd  # type: ignore[import] # noqa: F401
except ImportError:
    pass
else:
    ACCEPT_ENCODING = ACCEPT_ENCODING + ",zstd"
|
||
|
|
||
|
|
||
|
class _TYPE_FAILEDTELL(Enum):
    """Single-member enum whose only purpose is to type the sentinel below."""

    token = 0


# Sentinel stored in place of a body position when ``tell()`` raised.
# It is distinct from ``None`` so that a failed ``tell()`` can be told
# apart from "no position recorded" when the body is later rewound.
_FAILEDTELL: Final[_TYPE_FAILEDTELL] = _TYPE_FAILEDTELL.token

# A recorded body position: either a real integer offset or the sentinel.
_TYPE_BODY_POSITION = typing.Union[int, _TYPE_FAILEDTELL]
|
||
|
|
||
|
# Requests sent with one of these methods are not expected to carry a
# body, so there is no need to set an explicit 'Content-Length: 0' for
# them. The set is expressed in the negative (methods *without* a body)
# rather than listing the methods that 'should' have one, because an
# unknown method should be treated like 'POST', which *does* expect a body.
_METHODS_NOT_EXPECTING_BODY = {
    "GET",
    "HEAD",
    "DELETE",
    "TRACE",
    "OPTIONS",
    "CONNECT",
}
|
||
|
|
||
|
|
||
|
def make_headers(
    keep_alive: bool | None = None,
    accept_encoding: bool | list[str] | str | None = None,
    user_agent: str | None = None,
    basic_auth: str | None = None,
    proxy_basic_auth: str | None = None,
    disable_cache: bool | None = None,
) -> dict[str, str]:
    """
    Build a dictionary of commonly used request headers.

    Every argument is optional; a header is only emitted when the
    corresponding argument is truthy. All header names are lowercase.

    :param keep_alive:
        If ``True``, adds 'connection: keep-alive' header.

    :param accept_encoding:
        Can be a boolean, list, or string.
        ``True`` selects the module default, ``ACCEPT_ENCODING``: this is
        'gzip,deflate', extended with 'br' when the ``brotli`` or
        ``brotlicffi`` package is installed and with 'zstd' when the
        ``zstandard`` package is installed.
        A list is joined with commas.
        A string is used as provided.

    :param user_agent:
        String representing the user-agent you want, such as
        "python-urllib3/0.6"

    :param basic_auth:
        Colon-separated username:password string for 'authorization: basic ...'
        auth header.

    :param proxy_basic_auth:
        Colon-separated username:password string for 'proxy-authorization: basic ...'
        auth header.

    :param disable_cache:
        If ``True``, adds 'cache-control: no-cache' header.

    :returns:
        A new ``dict`` mapping header names to header values.

    Example:

    .. code-block:: python

        import urllib3

        print(urllib3.util.make_headers(keep_alive=True, user_agent="Batman/1.0"))
        # {'connection': 'keep-alive', 'user-agent': 'Batman/1.0'}
        print(urllib3.util.make_headers(accept_encoding=True))
        # {'accept-encoding': 'gzip,deflate'}
    """

    def _basic_credentials(userpass: str) -> str:
        # The credentials are encoded as latin-1 before base64 encoding.
        token = b64encode(userpass.encode("latin-1")).decode()
        return f"Basic {token}"

    result: dict[str, str] = {}

    if accept_encoding:
        if isinstance(accept_encoding, list):
            encodings = ",".join(accept_encoding)
        elif isinstance(accept_encoding, str):
            encodings = accept_encoding
        else:
            # Any other truthy value (typically ``True``) means "the default".
            encodings = ACCEPT_ENCODING
        result["accept-encoding"] = encodings

    if user_agent:
        result["user-agent"] = user_agent

    if keep_alive:
        result["connection"] = "keep-alive"

    if basic_auth:
        result["authorization"] = _basic_credentials(basic_auth)

    if proxy_basic_auth:
        result["proxy-authorization"] = _basic_credentials(proxy_basic_auth)

    if disable_cache:
        result["cache-control"] = "no-cache"

    return result
|
||
|
|
||
|
|
||
|
def set_file_position(
    body: typing.Any, pos: _TYPE_BODY_POSITION | None
) -> _TYPE_BODY_POSITION | None:
    """
    Restore a previously recorded body position, or record a new one.

    :param body:
        The request body. It only needs a ``tell()`` method for a
        position to be recorded.

    :param pos:
        A previously recorded position. When it is not ``None`` the body
        is rewound to it via :func:`rewind_body` and the same value is
        returned.

    :returns:
        ``pos`` when one was given; otherwise the value of ``body.tell()``,
        the ``_FAILEDTELL`` sentinel if ``tell()`` raised ``OSError``, or
        ``None`` when ``body`` has no ``tell`` attribute.
    """
    # A position is already known: move the body back to it.
    if pos is not None:
        rewind_body(body, pos)
        return pos

    # Nothing to record for bodies that cannot report a position.
    if getattr(body, "tell", None) is None:
        return None

    try:
        return body.tell()
    except OSError:
        # Unlike ``None``, this marker lets a later rewind attempt detect
        # that recording the position failed.
        return _FAILEDTELL
|
||
|
|
||
|
|
||
|
def rewind_body(body: typing.IO[typing.AnyStr], body_pos: _TYPE_BODY_POSITION) -> None:
    """
    Attempt to rewind body to a certain position.
    Primarily used for request redirects and retries.

    :param body:
        File-like object that supports seek.

    :param body_pos:
        Position to seek to in file. Either an integer offset or the
        ``_FAILEDTELL`` sentinel, which signals that the position could
        not be recorded.

    :raises UnrewindableBodyError:
        If seeking raises ``OSError``, or if ``body_pos`` is the
        ``_FAILEDTELL`` sentinel.

    :raises ValueError:
        If ``body_pos`` is neither an integer nor the sentinel, or if it
        is an integer but ``body`` has no ``seek`` method.
    """
    if isinstance(body_pos, int):
        body_seek = getattr(body, "seek", None)
        if body_seek is None:
            # Report the real problem: the position is a valid integer,
            # it is the body that cannot be rewound.
            raise ValueError(
                f"Unable to rewind request body: {type(body)} "
                f"does not provide a 'seek' method."
            )
        try:
            body_seek(body_pos)
        except OSError as e:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect/retry."
            ) from e
    elif body_pos is _FAILEDTELL:
        raise UnrewindableBodyError(
            "Unable to record file position for rewinding "
            "request body during a redirect/retry."
        )
    else:
        raise ValueError(
            f"body_pos must be of type integer, instead it was {type(body_pos)}."
        )
|
||
|
|
||
|
|
||
|
class ChunksAndContentLength(typing.NamedTuple):
    """Pair returned by :func:`body_to_chunks`: body chunks plus their length."""

    # Iterable of byte chunks making up the body, or ``None`` when there
    # is no body to send.
    chunks: typing.Iterable[bytes] | None

    # Value for the 'Content-Length' header, or ``None`` when no length
    # is provided.
    content_length: int | None
|
||
|
|
||
|
|
||
|
def body_to_chunks(
    body: typing.Any | None, method: str, blocksize: int
) -> ChunksAndContentLength:
    """Turn a request body into chunks and a suggested 'Content-Length'.

    The chunks are meant to be passed to socket.sendall(). A
    'Content-Length' of 'None' indicates the length of the body can't be
    determined, so 'Transfer-Encoding: chunked' should be used for
    framing instead.

    :param body:
        ``None``, ``str``/``bytes``, an object with a ``read`` method,
        an object implementing the buffer API, or an iterable.

    :param method:
        HTTP request method. Only consulted when ``body`` is ``None``.

    :param blocksize:
        Size passed to ``body.read()`` for each chunk of a file-like body.

    :returns:
        A :class:`ChunksAndContentLength` tuple.

    :raises TypeError:
        If ``body`` is none of the supported kinds.
    """
    # No body: only recommend 'Content-Length: 0' when the method is one
    # that is expected to carry a body.
    if body is None:
        if method.upper() in _METHODS_NOT_EXPECTING_BODY:
            return ChunksAndContentLength(chunks=None, content_length=None)
        return ChunksAndContentLength(chunks=None, content_length=0)

    # Strings and bytes are sent as one chunk of bytes.
    if isinstance(body, (str, bytes)):
        payload = to_bytes(body)
        return ChunksAndContentLength(chunks=(payload,), content_length=len(payload))

    # File-like object, TODO: use seek() and tell() for length?
    if hasattr(body, "read"):

        def read_blocks() -> typing.Iterable[bytes]:
            # Text streams yield ``str`` blocks which must be encoded.
            is_text = isinstance(body, io.TextIOBase)
            while True:
                block = body.read(blocksize)
                if not block:
                    return
                yield block.encode("iso-8859-1") if is_text else block

        return ChunksAndContentLength(chunks=read_blocks(), content_length=None)

    # Everything else is probed by duck-typing: buffer API first,
    # then plain iteration.
    try:
        view = memoryview(body)
    except TypeError:
        try:
            iterator = iter(body)
        except TypeError:
            raise TypeError(
                f"'body' must be a bytes-like object, file-like "
                f"object, or iterable. Instead was {body!r}"
            ) from None
        return ChunksAndContentLength(chunks=iterator, content_length=None)

    # An object implementing the buffer API can be passed directly to
    # socket.sendall(), and its size in bytes is known.
    return ChunksAndContentLength(chunks=(body,), content_length=view.nbytes)
|