FAEA/venv/lib/python3.10/site-packages/curl_cffi/curl.py

691 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import re
import locale
import struct
import sys
import warnings
from http.cookies import SimpleCookie
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, Optional, Union, cast
import certifi
from ._wrapper import ffi, lib
from .const import CurlECode, CurlHttpVersion, CurlInfo, CurlOpt, CurlWsFlag
from .utils import CurlCffiWarning
DEFAULT_CACERT = certifi.where()
REASON_PHRASE_RE = re.compile(rb"HTTP/\d\.\d [0-9]{3} (.*)")
STATUS_LINE_RE = re.compile(rb"HTTP/(\d\.\d) ([0-9]{3}) (.*)")
if TYPE_CHECKING:
class CurlWsFrame:
age: int
flags: int
offset: int
bytesleft: int
len: int
class CurlError(Exception):
"""Base exception for curl_cffi package"""
def __init__(self, msg, code: Union[CurlECode, Literal[0]] = 0, *args, **kwargs):
super().__init__(msg, *args, **kwargs)
self.code: Union[CurlECode, Literal[0]] = code
CURLINFO_TEXT = 0
CURLINFO_HEADER_IN = 1
CURLINFO_HEADER_OUT = 2
CURLINFO_DATA_IN = 3
CURLINFO_DATA_OUT = 4
CURLINFO_SSL_DATA_IN = 5
CURLINFO_SSL_DATA_OUT = 6
CURL_WRITEFUNC_PAUSE = 0x10000001
CURL_WRITEFUNC_ERROR = 0xFFFFFFFF
@ffi.def_extern()
def debug_function(curl, type_: int, data, size: int, clientp) -> int:
"""ffi callback for curl debug info"""
callback = ffi.from_handle(clientp)
text = ffi.buffer(data, size)[:]
callback(type_, text)
return 0
def bytes_to_hex(b: bytes, uppercase: bool = False) -> str:
"""
Convert a bytes object to a space-separated hex string, e.g. "0a ff 3c".
If uppercase=True, letters will be AF instead of af.
"""
fmt = "{:02X}" if uppercase else "{:02x}"
return " ".join(fmt.format(byte) for byte in b)
def debug_function_default(type_: int, data: bytes) -> None:
PREFIXES = {
CURLINFO_TEXT: "*",
CURLINFO_HEADER_IN: "<",
CURLINFO_HEADER_OUT: ">",
CURLINFO_DATA_IN: "< DATA",
CURLINFO_DATA_OUT: "> DATA",
CURLINFO_SSL_DATA_IN: "< SSL",
CURLINFO_SSL_DATA_OUT: "> SSL",
}
MAX_SHOW_BYTES = 40
prefix = PREFIXES.get(type_, "*")
# always show ssl data in binary format
if type_ in (CURLINFO_SSL_DATA_IN, CURLINFO_SSL_DATA_OUT):
hex_str = bytes_to_hex(data[:MAX_SHOW_BYTES])
postfix = "" if len(data) <= MAX_SHOW_BYTES else "..."
sys.stderr.write(f"{prefix} [{len(data)} bytes]: {hex_str}{postfix}\n")
else:
try:
text = data.decode("utf-8")
sys.stderr.write(f"{prefix} {text}")
if type_ not in (CURLINFO_TEXT, CURLINFO_HEADER_IN, CURLINFO_HEADER_OUT):
sys.stderr.write("\n")
except UnicodeDecodeError:
# Fallback to hex representation of first MAX_SHOW_BYTES bytes
hex_str = bytes_to_hex(data[:MAX_SHOW_BYTES])
postfix = "" if len(data) <= MAX_SHOW_BYTES else "..."
sys.stderr.write(f"{prefix} [{len(data)} bytes]: {hex_str}{postfix}\n")
@ffi.def_extern()
def buffer_callback(ptr, size, nmemb, userdata):
"""ffi callback for curl write function, directly writes to a buffer"""
# assert size == 1
buffer = ffi.from_handle(userdata)
buffer.write(ffi.buffer(ptr, nmemb)[:])
return nmemb * size
def ensure_int(s):
if not s:
return 0
return int(s)
@ffi.def_extern()
def write_callback(ptr, size, nmemb, userdata):
"""ffi callback for curl write function, calls the callback python function"""
# although similar enough to the function above, kept here for performance reasons
callback = ffi.from_handle(userdata)
wrote = callback(ffi.buffer(ptr, nmemb)[:])
wrote = ensure_int(wrote)
if wrote == CURL_WRITEFUNC_PAUSE or wrote == CURL_WRITEFUNC_ERROR: # noqa: SIM109
return wrote
# should make this an exception in future versions
if wrote != nmemb * size:
warnings.warn("Wrote bytes != received bytes.", CurlCffiWarning, stacklevel=2)
return nmemb * size
# Credits: @alexio777 on https://github.com/lexiforest/curl_cffi/issues/4
def slist_to_list(head) -> list[bytes]:
"""Converts curl slist to a python list."""
result = []
ptr = head
while ptr:
result.append(ffi.string(ptr.data))
ptr = ptr.next
lib.curl_slist_free_all(head)
return result
class Curl:
"""
Wrapper for ``curl_easy_*`` functions of libcurl.
"""
_WS_RECV_BUFFER_SIZE = 128 * 1024 # 128 kB
def __init__(self, cacert: str = "", debug: bool = False, handle=None) -> None:
"""
Parameters:
cacert: CA cert path to use, by default, certs from ``certifi`` are used.
debug: whether to show curl debug messages.
handle: a curl handle instance from ``curl_easy_init``.
"""
self._curl = handle if handle else lib.curl_easy_init()
self._headers = ffi.NULL
self._proxy_headers = ffi.NULL
self._resolve = ffi.NULL
self._cacert = cacert or DEFAULT_CACERT
self._is_cert_set = False
self._write_handle: Any = None
self._header_handle: Any = None
self._debug_handle: Any = None
self._body_handle: Any = None
# TODO: use CURL_ERROR_SIZE
self._error_buffer = ffi.new("char[]", 256)
self._debug = debug
self._set_error_buffer()
# Pre-allocated CFFI objects for WebSocket performance
self._ws_recv_buffer = ffi.new("char[]", self._WS_RECV_BUFFER_SIZE)
self._ws_recv_n_recv = ffi.new("size_t *")
self._ws_recv_p_frame = ffi.new("struct curl_ws_frame **")
self._ws_send_n_sent = ffi.new("size_t *")
def _set_error_buffer(self) -> None:
ret = lib._curl_easy_setopt(self._curl, CurlOpt.ERRORBUFFER, self._error_buffer)
if ret != 0:
warnings.warn("Failed to set error buffer", CurlCffiWarning, stacklevel=2)
if self._debug:
self.debug()
def debug(self) -> None:
"""Set debug to True"""
self.setopt(CurlOpt.VERBOSE, 1)
self.setopt(CurlOpt.DEBUGFUNCTION, True)
def __del__(self) -> None:
self.close()
def _check_error(self, errcode: int, *args: Any) -> None:
if errcode == 0:
return
error = self._get_error(errcode, *args)
if error is not None:
raise error
def _get_error(self, errcode: int, *args: Any):
if errcode != 0:
errmsg = ffi.string(self._error_buffer).decode(errors="backslashreplace")
action = " ".join([str(a) for a in args])
return CurlError(
f"Failed to {action}, curl: ({errcode}) {errmsg}. "
"See https://curl.se/libcurl/c/libcurl-errors.html first for more "
"details.",
code=cast(CurlECode, errcode),
)
def setopt(self, option: CurlOpt, value: Any) -> int:
"""Wrapper for ``curl_easy_setopt``.
Args:
option: option to set, using constants from CurlOpt enum
value: value to set, strings will be handled automatically
Returns:
0 if no error, see ``CurlECode``.
"""
if self._curl is None:
return 0 # silently ignore if curl handle is None
input_option = {
# this should be int in curl, but cffi requires pointer for void*
# it will be convert back in the glue c code.
0: "long*",
10000: "char*",
20000: "void*",
30000: "int64_t*", # offset type
40000: "void*", # blob type
}
# print("option", option, "value", value)
# Convert value
value_type = input_option.get((option // 10000) * 10000)
if value_type == "long*" or value_type == "int64_t*":
c_value = ffi.new(value_type, value)
elif option == CurlOpt.WRITEDATA:
c_value = ffi.new_handle(value)
self._write_handle = c_value
lib._curl_easy_setopt(
self._curl, CurlOpt.WRITEFUNCTION, lib.buffer_callback
)
elif option == CurlOpt.HEADERDATA:
c_value = ffi.new_handle(value)
self._header_handle = c_value
lib._curl_easy_setopt(
self._curl, CurlOpt.HEADERFUNCTION, lib.buffer_callback
)
elif option == CurlOpt.WRITEFUNCTION:
c_value = ffi.new_handle(value)
self._write_handle = c_value
lib._curl_easy_setopt(self._curl, CurlOpt.WRITEFUNCTION, lib.write_callback)
option = CurlOpt.WRITEDATA
elif option == CurlOpt.HEADERFUNCTION:
c_value = ffi.new_handle(value)
self._header_handle = c_value
lib._curl_easy_setopt(
self._curl, CurlOpt.HEADERFUNCTION, lib.write_callback
)
option = CurlOpt.HEADERDATA
elif option == CurlOpt.DEBUGFUNCTION:
if value is True:
value = debug_function_default
c_value = ffi.new_handle(value)
self._debug_handle = c_value
lib._curl_easy_setopt(self._curl, CurlOpt.DEBUGFUNCTION, lib.debug_function)
option = CurlOpt.DEBUGDATA
elif value_type == "char*":
if isinstance(value, str):
# Windows/libcurl expects ANSI code page for file paths (char*).
# Non-ASCII paths encoded as UTF-8 can trigger ErrCode 77.
# Encode file-path-like options using the system encoding on Windows.
filepath_opts = {
CurlOpt.CAINFO,
CurlOpt.CAPATH,
CurlOpt.PROXY_CAINFO,
CurlOpt.PROXY_CAPATH,
CurlOpt.SSLCERT,
CurlOpt.SSLKEY,
CurlOpt.CRLFILE,
CurlOpt.ISSUERCERT,
CurlOpt.SSH_PUBLIC_KEYFILE,
CurlOpt.SSH_PRIVATE_KEYFILE,
CurlOpt.COOKIEFILE,
CurlOpt.COOKIEJAR,
CurlOpt.NETRC_FILE,
CurlOpt.UNIX_SOCKET_PATH,
}
if sys.platform.startswith("win") and option in filepath_opts:
# Use the process ANSI code page to match what CRT fopen expects.
enc = locale.getpreferredencoding(False)
c_value = value.encode(enc, errors="strict")
else:
c_value = value.encode()
else:
c_value = value
# Must keep a reference, otherwise may be GCed.
if option == CurlOpt.POSTFIELDS:
self._body_handle = c_value
else:
raise NotImplementedError(f"Option unsupported: {option}")
if option == CurlOpt.HTTPHEADER:
for header in value:
self._headers = lib.curl_slist_append(self._headers, header)
ret = lib._curl_easy_setopt(self._curl, option, self._headers)
elif option == CurlOpt.PROXYHEADER:
for proxy_header in value:
self._proxy_headers = lib.curl_slist_append(
self._proxy_headers, proxy_header
)
ret = lib._curl_easy_setopt(self._curl, option, self._proxy_headers)
elif option == CurlOpt.RESOLVE:
for resolve in value:
if isinstance(resolve, str):
resolve = resolve.encode()
self._resolve = lib.curl_slist_append(self._resolve, resolve)
ret = lib._curl_easy_setopt(self._curl, option, self._resolve)
else:
ret = lib._curl_easy_setopt(self._curl, option, c_value)
self._check_error(ret, "setopt", option, value)
if option == CurlOpt.CAINFO:
self._is_cert_set = True
return ret
def getinfo(self, option: CurlInfo) -> Union[bytes, int, float, list]:
"""Wrapper for ``curl_easy_getinfo``. Gets information in response after
curl.perform.
Parameters:
option: option to get info of, using constants from ``CurlInfo`` enum
Returns:
value retrieved from last perform.
"""
ret_option = {
0x100000: "char**",
0x200000: "long*",
0x300000: "double*",
0x400000: "struct curl_slist **",
0x500000: "long*",
0x600000: "int64_t*",
}
ret_cast_option = {
0x100000: ffi.string,
0x200000: int,
0x300000: float,
0x400000: list,
0x500000: int,
0x600000: int,
}
option_type = option & 0xF00000
if self._curl is None:
if option_type == 0x100000:
return b""
return ret_cast_option[option_type]()
c_value = ffi.new(ret_option[option_type])
ret = lib.curl_easy_getinfo(self._curl, option, c_value)
self._check_error(ret, "getinfo", option)
# cookielist and ssl_engines starts with 0x400000, see also: const.py
if option_type == 0x400000:
return slist_to_list(c_value[0])
if c_value[0] == ffi.NULL:
return b""
return ret_cast_option[option_type](c_value[0])
def version(self) -> bytes:
"""Get the underlying libcurl version."""
return ffi.string(lib.curl_version())
def impersonate(self, target: str, default_headers: bool = True) -> int:
"""Set the browser type to impersonate.
Parameters:
target: browser to impersonate.
default_headers: whether to add default headers, like User-Agent.
Returns:
0 if no error.
"""
if self._curl is None:
return 0 # silently ignore if curl handle is None
return lib.curl_easy_impersonate(
self._curl, target.encode(), int(default_headers)
)
def _ensure_cacert(self) -> None:
if not self._is_cert_set:
ret = self.setopt(CurlOpt.CAINFO, self._cacert)
self._check_error(ret, "set cacert")
ret = self.setopt(CurlOpt.PROXY_CAINFO, self._cacert)
self._check_error(ret, "set proxy cacert")
def perform(self, clear_headers: bool = True, clear_resolve: bool = True) -> None:
"""Wrapper for ``curl_easy_perform``, performs a curl request.
Parameters:
clear_headers: clear header slist used in this perform
clear_resolve: clear resolve slist used in this perform
Raises:
CurlError: if the perform was not successful.
"""
if self._curl is None:
raise CurlError("Cannot perform request on closed handle.")
# make sure we set a cacert store
self._ensure_cacert()
# here we go
ret = lib.curl_easy_perform(self._curl)
try:
self._check_error(ret, "perform")
finally:
# cleaning
self.clean_handles_and_buffers(clear_headers, clear_resolve)
def upkeep(self) -> int:
if self._curl is None:
return 0 # silently ignore if curl handle is None
return lib.curl_easy_upkeep(self._curl)
def clean_handles_and_buffers(
self, clear_headers: bool = True, clear_resolve: bool = True
) -> None:
"""Clean up handles and buffers after ``perform`` and ``close``,
called at the end of ``perform`` and ``close``."""
self._write_handle = None
self._header_handle = None
self._debug_handle = None
self._body_handle = None
if clear_resolve:
if self._resolve != ffi.NULL:
lib.curl_slist_free_all(self._resolve)
self._resolve = ffi.NULL
if clear_headers:
if self._headers != ffi.NULL:
lib.curl_slist_free_all(self._headers)
self._headers = ffi.NULL
if self._proxy_headers != ffi.NULL:
lib.curl_slist_free_all(self._proxy_headers)
self._proxy_headers = ffi.NULL
def duphandle(self) -> Curl:
"""Wrapper for ``curl_easy_duphandle``.
This is not a full copy of entire curl object in python. For example, headers
handle is not copied, you have to set them again."""
if self._curl is None:
raise CurlError("Cannot duplicate closed handle.")
new_handle = lib.curl_easy_duphandle(self._curl)
c = Curl(cacert=self._cacert, debug=self._debug, handle=new_handle)
return c
def reset(self) -> None:
"""Reset all curl options, wrapper for ``curl_easy_reset``."""
self._is_cert_set = False
if self._curl is not None:
lib.curl_easy_reset(self._curl)
self._set_error_buffer()
self._resolve = ffi.NULL
def parse_cookie_headers(self, headers: list[bytes]) -> SimpleCookie:
"""Extract ``cookies.SimpleCookie`` from header lines.
Parameters:
headers: list of headers in bytes.
Returns:
A parsed cookies.SimpleCookie instance.
"""
cookie: SimpleCookie = SimpleCookie()
for header in headers:
if header.lower().startswith(b"set-cookie: "):
cookie.load(header[12:].decode()) # len("set-cookie: ") == 12
return cookie
@staticmethod
def get_reason_phrase(status_line: bytes) -> bytes:
"""Extract reason phrase, like ``OK``, ``Not Found`` from response status
line."""
m = REASON_PHRASE_RE.match(status_line)
return m.group(1) if m else b""
@staticmethod
def parse_status_line(status_line: bytes) -> tuple[CurlHttpVersion, int, bytes]:
"""Parse status line.
Returns:
http_version, status_code, and reason phrase
"""
m = STATUS_LINE_RE.match(status_line)
if not m:
return CurlHttpVersion.V1_0, 0, b""
if m.group(1) == "2.0":
http_version = CurlHttpVersion.V2_0
elif m.group(1) == "1.1":
http_version = CurlHttpVersion.V1_1
elif m.group(1) == "1.0":
http_version = CurlHttpVersion.V1_0
else:
http_version = CurlHttpVersion.NONE
status_code = int(m.group(2))
reason = m.group(3)
return http_version, status_code, reason
def close(self) -> None:
"""Close and cleanup curl handle, wrapper for ``curl_easy_cleanup``."""
self.clean_handles_and_buffers()
if self._curl:
lib.curl_easy_cleanup(self._curl)
self._curl = None
ffi.release(self._error_buffer)
if self._ws_recv_buffer is not None:
ffi.release(self._ws_recv_buffer)
self._ws_recv_buffer = None
def ws_recv(self) -> tuple[bytes, CurlWsFrame]:
"""Receive a frame from a websocket connection.
Returns:
a tuple of frame content and curl frame meta struct.
Raises:
CurlError: if failed.
"""
if self._curl is None:
raise CurlError("Cannot receive websocket data on closed handle.")
ret = lib.curl_ws_recv(
self._curl,
self._ws_recv_buffer,
self._WS_RECV_BUFFER_SIZE,
self._ws_recv_n_recv,
self._ws_recv_p_frame,
)
self._check_error(ret, "WS_RECV")
# Frame meta explained: https://curl.se/libcurl/c/curl_ws_meta.html
frame = self._ws_recv_p_frame[0]
return ffi.buffer(self._ws_recv_buffer)[: self._ws_recv_n_recv[0]], frame
def ws_send(self, payload: bytes, flags: CurlWsFlag = CurlWsFlag.BINARY) -> int:
"""Send data to a websocket connection.
Args:
payload: content to send.
flags: websocket flag to set for the frame, default: binary.
Returns:
0 if no error.
Raises:
CurlError: if failed.
"""
if self._curl is None:
raise CurlError("Cannot send websocket data on closed handle.")
buffer = ffi.from_buffer(payload)
ret = lib.curl_ws_send(
self._curl, buffer, len(payload), self._ws_send_n_sent, 0, flags
)
self._check_error(ret, "WS_SEND")
return self._ws_send_n_sent[0]
def ws_close(self, code: int = 1000, message: bytes = b"") -> int:
"""Close a websocket connection. Shorthand for :meth:`ws_send`
with close code and message. Note that to completely close the connection,
you must close the curl handle after this call with :meth:`close`.
Args:
code: close code.
message: close message.
Returns:
0 if no error.
Raises:
CurlError: if failed.
"""
payload = struct.pack("!H", code) + message
return self.ws_send(payload, flags=CurlWsFlag.CLOSE)
class CurlMime:
"""Wrapper for the ``curl_mime_`` API."""
def __init__(self, curl: Optional[Curl] = None):
"""
Args:
curl: Curl instance to use.
"""
self._curl = curl if curl else Curl()
self._form = lib.curl_mime_init(self._curl._curl)
def addpart(
self,
name: str,
*,
content_type: Optional[str] = None,
filename: Optional[str] = None,
local_path: Optional[Union[str, bytes, Path]] = None,
data: Optional[bytes] = None,
) -> None:
"""Add a mime part for a mutlipart html form.
Note: You can only use either local_path or data, not both.
Args:
name: name of the field.
content_type: content_type for the field. for example: ``image/png``.
filename: filename for the server.
local_path: file to upload on local disk.
data: file content to upload.
"""
part = lib.curl_mime_addpart(self._form)
ret = lib.curl_mime_name(part, name.encode())
if ret != 0:
raise CurlError("Add field failed.")
# mime type
if content_type is not None:
ret = lib.curl_mime_type(part, content_type.encode())
if ret != 0:
raise CurlError("Add field failed.")
# remote file name
if filename is not None:
ret = lib.curl_mime_filename(part, filename.encode())
if ret != 0:
raise CurlError("Add field failed.")
if local_path and data:
raise CurlError("Can not use local_path and data at the same time.")
# this is a filename
if local_path is not None:
if isinstance(local_path, Path):
local_path_str = str(local_path)
elif isinstance(local_path, bytes):
local_path_str = local_path.decode()
else:
local_path_str = local_path
if not Path(local_path_str).exists():
raise FileNotFoundError(f"File not found at {local_path_str}")
ret = lib.curl_mime_filedata(part, local_path_str.encode())
if ret != 0:
raise CurlError("Add field failed.")
if data is not None:
if not isinstance(data, bytes):
data = str(data).encode()
ret = lib.curl_mime_data(part, data, len(data))
@classmethod
def from_list(cls, files: list[dict]):
"""Create a multipart instance from a list of dict, for keys, see ``addpart``"""
form = cls()
for file in files:
form.addpart(**file)
return form
def attach(self, curl: Optional[Curl] = None) -> None:
"""Attach the mime instance to a curl instance."""
c = curl if curl else self._curl
c.setopt(CurlOpt.MIMEPOST, self._form)
def close(self) -> None:
"""Close the mime instance and underlying files. This method must be called
after ``perform`` or ``request``."""
lib.curl_mime_free(self._form)
self._form = ffi.NULL
def __del__(self) -> None:
self.close()