691 lines
24 KiB
Python
691 lines
24 KiB
Python
from __future__ import annotations
|
||
|
||
import re
|
||
import locale
|
||
import struct
|
||
import sys
|
||
import warnings
|
||
from http.cookies import SimpleCookie
|
||
from pathlib import Path
|
||
from typing import TYPE_CHECKING, Any, Literal, Optional, Union, cast
|
||
|
||
import certifi
|
||
|
||
from ._wrapper import ffi, lib
|
||
from .const import CurlECode, CurlHttpVersion, CurlInfo, CurlOpt, CurlWsFlag
|
||
from .utils import CurlCffiWarning
|
||
|
||
DEFAULT_CACERT = certifi.where()
|
||
REASON_PHRASE_RE = re.compile(rb"HTTP/\d\.\d [0-9]{3} (.*)")
|
||
STATUS_LINE_RE = re.compile(rb"HTTP/(\d\.\d) ([0-9]{3}) (.*)")
|
||
|
||
if TYPE_CHECKING:
|
||
|
||
class CurlWsFrame:
|
||
age: int
|
||
flags: int
|
||
offset: int
|
||
bytesleft: int
|
||
len: int
|
||
|
||
|
||
class CurlError(Exception):
|
||
"""Base exception for curl_cffi package"""
|
||
|
||
def __init__(self, msg, code: Union[CurlECode, Literal[0]] = 0, *args, **kwargs):
|
||
super().__init__(msg, *args, **kwargs)
|
||
self.code: Union[CurlECode, Literal[0]] = code
|
||
|
||
|
||
CURLINFO_TEXT = 0
|
||
CURLINFO_HEADER_IN = 1
|
||
CURLINFO_HEADER_OUT = 2
|
||
CURLINFO_DATA_IN = 3
|
||
CURLINFO_DATA_OUT = 4
|
||
CURLINFO_SSL_DATA_IN = 5
|
||
CURLINFO_SSL_DATA_OUT = 6
|
||
|
||
CURL_WRITEFUNC_PAUSE = 0x10000001
|
||
CURL_WRITEFUNC_ERROR = 0xFFFFFFFF
|
||
|
||
|
||
@ffi.def_extern()
|
||
def debug_function(curl, type_: int, data, size: int, clientp) -> int:
|
||
"""ffi callback for curl debug info"""
|
||
callback = ffi.from_handle(clientp)
|
||
text = ffi.buffer(data, size)[:]
|
||
callback(type_, text)
|
||
return 0
|
||
|
||
|
||
def bytes_to_hex(b: bytes, uppercase: bool = False) -> str:
|
||
"""
|
||
Convert a bytes object to a space-separated hex string, e.g. "0a ff 3c".
|
||
If uppercase=True, letters will be A–F instead of a–f.
|
||
"""
|
||
fmt = "{:02X}" if uppercase else "{:02x}"
|
||
return " ".join(fmt.format(byte) for byte in b)
|
||
|
||
|
||
def debug_function_default(type_: int, data: bytes) -> None:
|
||
PREFIXES = {
|
||
CURLINFO_TEXT: "*",
|
||
CURLINFO_HEADER_IN: "<",
|
||
CURLINFO_HEADER_OUT: ">",
|
||
CURLINFO_DATA_IN: "< DATA",
|
||
CURLINFO_DATA_OUT: "> DATA",
|
||
CURLINFO_SSL_DATA_IN: "< SSL",
|
||
CURLINFO_SSL_DATA_OUT: "> SSL",
|
||
}
|
||
MAX_SHOW_BYTES = 40
|
||
prefix = PREFIXES.get(type_, "*")
|
||
|
||
# always show ssl data in binary format
|
||
if type_ in (CURLINFO_SSL_DATA_IN, CURLINFO_SSL_DATA_OUT):
|
||
hex_str = bytes_to_hex(data[:MAX_SHOW_BYTES])
|
||
postfix = "" if len(data) <= MAX_SHOW_BYTES else "..."
|
||
sys.stderr.write(f"{prefix} [{len(data)} bytes]: {hex_str}{postfix}\n")
|
||
else:
|
||
try:
|
||
text = data.decode("utf-8")
|
||
sys.stderr.write(f"{prefix} {text}")
|
||
if type_ not in (CURLINFO_TEXT, CURLINFO_HEADER_IN, CURLINFO_HEADER_OUT):
|
||
sys.stderr.write("\n")
|
||
except UnicodeDecodeError:
|
||
# Fallback to hex representation of first MAX_SHOW_BYTES bytes
|
||
hex_str = bytes_to_hex(data[:MAX_SHOW_BYTES])
|
||
postfix = "" if len(data) <= MAX_SHOW_BYTES else "..."
|
||
sys.stderr.write(f"{prefix} [{len(data)} bytes]: {hex_str}{postfix}\n")
|
||
|
||
|
||
@ffi.def_extern()
|
||
def buffer_callback(ptr, size, nmemb, userdata):
|
||
"""ffi callback for curl write function, directly writes to a buffer"""
|
||
# assert size == 1
|
||
buffer = ffi.from_handle(userdata)
|
||
buffer.write(ffi.buffer(ptr, nmemb)[:])
|
||
return nmemb * size
|
||
|
||
|
||
def ensure_int(s):
|
||
if not s:
|
||
return 0
|
||
return int(s)
|
||
|
||
|
||
@ffi.def_extern()
|
||
def write_callback(ptr, size, nmemb, userdata):
|
||
"""ffi callback for curl write function, calls the callback python function"""
|
||
# although similar enough to the function above, kept here for performance reasons
|
||
callback = ffi.from_handle(userdata)
|
||
wrote = callback(ffi.buffer(ptr, nmemb)[:])
|
||
wrote = ensure_int(wrote)
|
||
if wrote == CURL_WRITEFUNC_PAUSE or wrote == CURL_WRITEFUNC_ERROR: # noqa: SIM109
|
||
return wrote
|
||
# should make this an exception in future versions
|
||
if wrote != nmemb * size:
|
||
warnings.warn("Wrote bytes != received bytes.", CurlCffiWarning, stacklevel=2)
|
||
return nmemb * size
|
||
|
||
|
||
# Credits: @alexio777 on https://github.com/lexiforest/curl_cffi/issues/4
|
||
def slist_to_list(head) -> list[bytes]:
|
||
"""Converts curl slist to a python list."""
|
||
result = []
|
||
ptr = head
|
||
while ptr:
|
||
result.append(ffi.string(ptr.data))
|
||
ptr = ptr.next
|
||
lib.curl_slist_free_all(head)
|
||
return result
|
||
|
||
|
||
class Curl:
|
||
"""
|
||
Wrapper for ``curl_easy_*`` functions of libcurl.
|
||
"""
|
||
|
||
_WS_RECV_BUFFER_SIZE = 128 * 1024 # 128 kB
|
||
|
||
def __init__(self, cacert: str = "", debug: bool = False, handle=None) -> None:
|
||
"""
|
||
Parameters:
|
||
cacert: CA cert path to use, by default, certs from ``certifi`` are used.
|
||
debug: whether to show curl debug messages.
|
||
handle: a curl handle instance from ``curl_easy_init``.
|
||
"""
|
||
self._curl = handle if handle else lib.curl_easy_init()
|
||
self._headers = ffi.NULL
|
||
self._proxy_headers = ffi.NULL
|
||
self._resolve = ffi.NULL
|
||
self._cacert = cacert or DEFAULT_CACERT
|
||
self._is_cert_set = False
|
||
self._write_handle: Any = None
|
||
self._header_handle: Any = None
|
||
self._debug_handle: Any = None
|
||
self._body_handle: Any = None
|
||
# TODO: use CURL_ERROR_SIZE
|
||
self._error_buffer = ffi.new("char[]", 256)
|
||
self._debug = debug
|
||
self._set_error_buffer()
|
||
|
||
# Pre-allocated CFFI objects for WebSocket performance
|
||
self._ws_recv_buffer = ffi.new("char[]", self._WS_RECV_BUFFER_SIZE)
|
||
self._ws_recv_n_recv = ffi.new("size_t *")
|
||
self._ws_recv_p_frame = ffi.new("struct curl_ws_frame **")
|
||
self._ws_send_n_sent = ffi.new("size_t *")
|
||
|
||
def _set_error_buffer(self) -> None:
|
||
ret = lib._curl_easy_setopt(self._curl, CurlOpt.ERRORBUFFER, self._error_buffer)
|
||
if ret != 0:
|
||
warnings.warn("Failed to set error buffer", CurlCffiWarning, stacklevel=2)
|
||
if self._debug:
|
||
self.debug()
|
||
|
||
def debug(self) -> None:
|
||
"""Set debug to True"""
|
||
self.setopt(CurlOpt.VERBOSE, 1)
|
||
self.setopt(CurlOpt.DEBUGFUNCTION, True)
|
||
|
||
def __del__(self) -> None:
|
||
self.close()
|
||
|
||
def _check_error(self, errcode: int, *args: Any) -> None:
|
||
if errcode == 0:
|
||
return
|
||
|
||
error = self._get_error(errcode, *args)
|
||
if error is not None:
|
||
raise error
|
||
|
||
def _get_error(self, errcode: int, *args: Any):
|
||
if errcode != 0:
|
||
errmsg = ffi.string(self._error_buffer).decode(errors="backslashreplace")
|
||
action = " ".join([str(a) for a in args])
|
||
return CurlError(
|
||
f"Failed to {action}, curl: ({errcode}) {errmsg}. "
|
||
"See https://curl.se/libcurl/c/libcurl-errors.html first for more "
|
||
"details.",
|
||
code=cast(CurlECode, errcode),
|
||
)
|
||
|
||
def setopt(self, option: CurlOpt, value: Any) -> int:
|
||
"""Wrapper for ``curl_easy_setopt``.
|
||
|
||
Args:
|
||
option: option to set, using constants from CurlOpt enum
|
||
value: value to set, strings will be handled automatically
|
||
|
||
Returns:
|
||
0 if no error, see ``CurlECode``.
|
||
"""
|
||
if self._curl is None:
|
||
return 0 # silently ignore if curl handle is None
|
||
input_option = {
|
||
# this should be int in curl, but cffi requires pointer for void*
|
||
# it will be convert back in the glue c code.
|
||
0: "long*",
|
||
10000: "char*",
|
||
20000: "void*",
|
||
30000: "int64_t*", # offset type
|
||
40000: "void*", # blob type
|
||
}
|
||
# print("option", option, "value", value)
|
||
|
||
# Convert value
|
||
value_type = input_option.get((option // 10000) * 10000)
|
||
if value_type == "long*" or value_type == "int64_t*":
|
||
c_value = ffi.new(value_type, value)
|
||
elif option == CurlOpt.WRITEDATA:
|
||
c_value = ffi.new_handle(value)
|
||
self._write_handle = c_value
|
||
lib._curl_easy_setopt(
|
||
self._curl, CurlOpt.WRITEFUNCTION, lib.buffer_callback
|
||
)
|
||
elif option == CurlOpt.HEADERDATA:
|
||
c_value = ffi.new_handle(value)
|
||
self._header_handle = c_value
|
||
lib._curl_easy_setopt(
|
||
self._curl, CurlOpt.HEADERFUNCTION, lib.buffer_callback
|
||
)
|
||
elif option == CurlOpt.WRITEFUNCTION:
|
||
c_value = ffi.new_handle(value)
|
||
self._write_handle = c_value
|
||
lib._curl_easy_setopt(self._curl, CurlOpt.WRITEFUNCTION, lib.write_callback)
|
||
option = CurlOpt.WRITEDATA
|
||
elif option == CurlOpt.HEADERFUNCTION:
|
||
c_value = ffi.new_handle(value)
|
||
self._header_handle = c_value
|
||
lib._curl_easy_setopt(
|
||
self._curl, CurlOpt.HEADERFUNCTION, lib.write_callback
|
||
)
|
||
option = CurlOpt.HEADERDATA
|
||
elif option == CurlOpt.DEBUGFUNCTION:
|
||
if value is True:
|
||
value = debug_function_default
|
||
c_value = ffi.new_handle(value)
|
||
self._debug_handle = c_value
|
||
lib._curl_easy_setopt(self._curl, CurlOpt.DEBUGFUNCTION, lib.debug_function)
|
||
option = CurlOpt.DEBUGDATA
|
||
elif value_type == "char*":
|
||
if isinstance(value, str):
|
||
# Windows/libcurl expects ANSI code page for file paths (char*).
|
||
# Non-ASCII paths encoded as UTF-8 can trigger ErrCode 77.
|
||
# Encode file-path-like options using the system encoding on Windows.
|
||
filepath_opts = {
|
||
CurlOpt.CAINFO,
|
||
CurlOpt.CAPATH,
|
||
CurlOpt.PROXY_CAINFO,
|
||
CurlOpt.PROXY_CAPATH,
|
||
CurlOpt.SSLCERT,
|
||
CurlOpt.SSLKEY,
|
||
CurlOpt.CRLFILE,
|
||
CurlOpt.ISSUERCERT,
|
||
CurlOpt.SSH_PUBLIC_KEYFILE,
|
||
CurlOpt.SSH_PRIVATE_KEYFILE,
|
||
CurlOpt.COOKIEFILE,
|
||
CurlOpt.COOKIEJAR,
|
||
CurlOpt.NETRC_FILE,
|
||
CurlOpt.UNIX_SOCKET_PATH,
|
||
}
|
||
if sys.platform.startswith("win") and option in filepath_opts:
|
||
# Use the process ANSI code page to match what CRT fopen expects.
|
||
enc = locale.getpreferredencoding(False)
|
||
c_value = value.encode(enc, errors="strict")
|
||
else:
|
||
c_value = value.encode()
|
||
else:
|
||
c_value = value
|
||
# Must keep a reference, otherwise may be GCed.
|
||
if option == CurlOpt.POSTFIELDS:
|
||
self._body_handle = c_value
|
||
else:
|
||
raise NotImplementedError(f"Option unsupported: {option}")
|
||
|
||
if option == CurlOpt.HTTPHEADER:
|
||
for header in value:
|
||
self._headers = lib.curl_slist_append(self._headers, header)
|
||
ret = lib._curl_easy_setopt(self._curl, option, self._headers)
|
||
elif option == CurlOpt.PROXYHEADER:
|
||
for proxy_header in value:
|
||
self._proxy_headers = lib.curl_slist_append(
|
||
self._proxy_headers, proxy_header
|
||
)
|
||
ret = lib._curl_easy_setopt(self._curl, option, self._proxy_headers)
|
||
elif option == CurlOpt.RESOLVE:
|
||
for resolve in value:
|
||
if isinstance(resolve, str):
|
||
resolve = resolve.encode()
|
||
self._resolve = lib.curl_slist_append(self._resolve, resolve)
|
||
ret = lib._curl_easy_setopt(self._curl, option, self._resolve)
|
||
else:
|
||
ret = lib._curl_easy_setopt(self._curl, option, c_value)
|
||
self._check_error(ret, "setopt", option, value)
|
||
|
||
if option == CurlOpt.CAINFO:
|
||
self._is_cert_set = True
|
||
|
||
return ret
|
||
|
||
def getinfo(self, option: CurlInfo) -> Union[bytes, int, float, list]:
|
||
"""Wrapper for ``curl_easy_getinfo``. Gets information in response after
|
||
curl.perform.
|
||
|
||
Parameters:
|
||
option: option to get info of, using constants from ``CurlInfo`` enum
|
||
|
||
Returns:
|
||
value retrieved from last perform.
|
||
"""
|
||
ret_option = {
|
||
0x100000: "char**",
|
||
0x200000: "long*",
|
||
0x300000: "double*",
|
||
0x400000: "struct curl_slist **",
|
||
0x500000: "long*",
|
||
0x600000: "int64_t*",
|
||
}
|
||
ret_cast_option = {
|
||
0x100000: ffi.string,
|
||
0x200000: int,
|
||
0x300000: float,
|
||
0x400000: list,
|
||
0x500000: int,
|
||
0x600000: int,
|
||
}
|
||
|
||
option_type = option & 0xF00000
|
||
|
||
if self._curl is None:
|
||
if option_type == 0x100000:
|
||
return b""
|
||
return ret_cast_option[option_type]()
|
||
|
||
c_value = ffi.new(ret_option[option_type])
|
||
ret = lib.curl_easy_getinfo(self._curl, option, c_value)
|
||
self._check_error(ret, "getinfo", option)
|
||
# cookielist and ssl_engines starts with 0x400000, see also: const.py
|
||
if option_type == 0x400000:
|
||
return slist_to_list(c_value[0])
|
||
if c_value[0] == ffi.NULL:
|
||
return b""
|
||
|
||
return ret_cast_option[option_type](c_value[0])
|
||
|
||
def version(self) -> bytes:
|
||
"""Get the underlying libcurl version."""
|
||
return ffi.string(lib.curl_version())
|
||
|
||
def impersonate(self, target: str, default_headers: bool = True) -> int:
|
||
"""Set the browser type to impersonate.
|
||
|
||
Parameters:
|
||
target: browser to impersonate.
|
||
default_headers: whether to add default headers, like User-Agent.
|
||
|
||
Returns:
|
||
0 if no error.
|
||
"""
|
||
if self._curl is None:
|
||
return 0 # silently ignore if curl handle is None
|
||
return lib.curl_easy_impersonate(
|
||
self._curl, target.encode(), int(default_headers)
|
||
)
|
||
|
||
def _ensure_cacert(self) -> None:
|
||
if not self._is_cert_set:
|
||
ret = self.setopt(CurlOpt.CAINFO, self._cacert)
|
||
self._check_error(ret, "set cacert")
|
||
ret = self.setopt(CurlOpt.PROXY_CAINFO, self._cacert)
|
||
self._check_error(ret, "set proxy cacert")
|
||
|
||
def perform(self, clear_headers: bool = True, clear_resolve: bool = True) -> None:
|
||
"""Wrapper for ``curl_easy_perform``, performs a curl request.
|
||
|
||
Parameters:
|
||
clear_headers: clear header slist used in this perform
|
||
clear_resolve: clear resolve slist used in this perform
|
||
|
||
Raises:
|
||
CurlError: if the perform was not successful.
|
||
"""
|
||
if self._curl is None:
|
||
raise CurlError("Cannot perform request on closed handle.")
|
||
|
||
# make sure we set a cacert store
|
||
self._ensure_cacert()
|
||
|
||
# here we go
|
||
ret = lib.curl_easy_perform(self._curl)
|
||
|
||
try:
|
||
self._check_error(ret, "perform")
|
||
finally:
|
||
# cleaning
|
||
self.clean_handles_and_buffers(clear_headers, clear_resolve)
|
||
|
||
def upkeep(self) -> int:
|
||
if self._curl is None:
|
||
return 0 # silently ignore if curl handle is None
|
||
return lib.curl_easy_upkeep(self._curl)
|
||
|
||
def clean_handles_and_buffers(
|
||
self, clear_headers: bool = True, clear_resolve: bool = True
|
||
) -> None:
|
||
"""Clean up handles and buffers after ``perform`` and ``close``,
|
||
called at the end of ``perform`` and ``close``."""
|
||
self._write_handle = None
|
||
self._header_handle = None
|
||
self._debug_handle = None
|
||
self._body_handle = None
|
||
|
||
if clear_resolve:
|
||
if self._resolve != ffi.NULL:
|
||
lib.curl_slist_free_all(self._resolve)
|
||
self._resolve = ffi.NULL
|
||
|
||
if clear_headers:
|
||
if self._headers != ffi.NULL:
|
||
lib.curl_slist_free_all(self._headers)
|
||
self._headers = ffi.NULL
|
||
|
||
if self._proxy_headers != ffi.NULL:
|
||
lib.curl_slist_free_all(self._proxy_headers)
|
||
self._proxy_headers = ffi.NULL
|
||
|
||
def duphandle(self) -> Curl:
|
||
"""Wrapper for ``curl_easy_duphandle``.
|
||
|
||
This is not a full copy of entire curl object in python. For example, headers
|
||
handle is not copied, you have to set them again."""
|
||
if self._curl is None:
|
||
raise CurlError("Cannot duplicate closed handle.")
|
||
new_handle = lib.curl_easy_duphandle(self._curl)
|
||
c = Curl(cacert=self._cacert, debug=self._debug, handle=new_handle)
|
||
return c
|
||
|
||
def reset(self) -> None:
|
||
"""Reset all curl options, wrapper for ``curl_easy_reset``."""
|
||
self._is_cert_set = False
|
||
if self._curl is not None:
|
||
lib.curl_easy_reset(self._curl)
|
||
self._set_error_buffer()
|
||
self._resolve = ffi.NULL
|
||
|
||
def parse_cookie_headers(self, headers: list[bytes]) -> SimpleCookie:
|
||
"""Extract ``cookies.SimpleCookie`` from header lines.
|
||
|
||
Parameters:
|
||
headers: list of headers in bytes.
|
||
|
||
Returns:
|
||
A parsed cookies.SimpleCookie instance.
|
||
"""
|
||
cookie: SimpleCookie = SimpleCookie()
|
||
for header in headers:
|
||
if header.lower().startswith(b"set-cookie: "):
|
||
cookie.load(header[12:].decode()) # len("set-cookie: ") == 12
|
||
return cookie
|
||
|
||
@staticmethod
|
||
def get_reason_phrase(status_line: bytes) -> bytes:
|
||
"""Extract reason phrase, like ``OK``, ``Not Found`` from response status
|
||
line."""
|
||
m = REASON_PHRASE_RE.match(status_line)
|
||
return m.group(1) if m else b""
|
||
|
||
@staticmethod
|
||
def parse_status_line(status_line: bytes) -> tuple[CurlHttpVersion, int, bytes]:
|
||
"""Parse status line.
|
||
|
||
Returns:
|
||
http_version, status_code, and reason phrase
|
||
"""
|
||
m = STATUS_LINE_RE.match(status_line)
|
||
if not m:
|
||
return CurlHttpVersion.V1_0, 0, b""
|
||
if m.group(1) == "2.0":
|
||
http_version = CurlHttpVersion.V2_0
|
||
elif m.group(1) == "1.1":
|
||
http_version = CurlHttpVersion.V1_1
|
||
elif m.group(1) == "1.0":
|
||
http_version = CurlHttpVersion.V1_0
|
||
else:
|
||
http_version = CurlHttpVersion.NONE
|
||
status_code = int(m.group(2))
|
||
reason = m.group(3)
|
||
|
||
return http_version, status_code, reason
|
||
|
||
def close(self) -> None:
|
||
"""Close and cleanup curl handle, wrapper for ``curl_easy_cleanup``."""
|
||
self.clean_handles_and_buffers()
|
||
|
||
if self._curl:
|
||
lib.curl_easy_cleanup(self._curl)
|
||
self._curl = None
|
||
ffi.release(self._error_buffer)
|
||
|
||
if self._ws_recv_buffer is not None:
|
||
ffi.release(self._ws_recv_buffer)
|
||
self._ws_recv_buffer = None
|
||
|
||
def ws_recv(self) -> tuple[bytes, CurlWsFrame]:
|
||
"""Receive a frame from a websocket connection.
|
||
|
||
Returns:
|
||
a tuple of frame content and curl frame meta struct.
|
||
|
||
Raises:
|
||
CurlError: if failed.
|
||
"""
|
||
if self._curl is None:
|
||
raise CurlError("Cannot receive websocket data on closed handle.")
|
||
ret = lib.curl_ws_recv(
|
||
self._curl,
|
||
self._ws_recv_buffer,
|
||
self._WS_RECV_BUFFER_SIZE,
|
||
self._ws_recv_n_recv,
|
||
self._ws_recv_p_frame,
|
||
)
|
||
self._check_error(ret, "WS_RECV")
|
||
|
||
# Frame meta explained: https://curl.se/libcurl/c/curl_ws_meta.html
|
||
frame = self._ws_recv_p_frame[0]
|
||
return ffi.buffer(self._ws_recv_buffer)[: self._ws_recv_n_recv[0]], frame
|
||
|
||
def ws_send(self, payload: bytes, flags: CurlWsFlag = CurlWsFlag.BINARY) -> int:
|
||
"""Send data to a websocket connection.
|
||
|
||
Args:
|
||
payload: content to send.
|
||
flags: websocket flag to set for the frame, default: binary.
|
||
|
||
Returns:
|
||
0 if no error.
|
||
|
||
Raises:
|
||
CurlError: if failed.
|
||
"""
|
||
if self._curl is None:
|
||
raise CurlError("Cannot send websocket data on closed handle.")
|
||
|
||
buffer = ffi.from_buffer(payload)
|
||
ret = lib.curl_ws_send(
|
||
self._curl, buffer, len(payload), self._ws_send_n_sent, 0, flags
|
||
)
|
||
self._check_error(ret, "WS_SEND")
|
||
return self._ws_send_n_sent[0]
|
||
|
||
def ws_close(self, code: int = 1000, message: bytes = b"") -> int:
|
||
"""Close a websocket connection. Shorthand for :meth:`ws_send`
|
||
with close code and message. Note that to completely close the connection,
|
||
you must close the curl handle after this call with :meth:`close`.
|
||
|
||
Args:
|
||
code: close code.
|
||
message: close message.
|
||
|
||
Returns:
|
||
0 if no error.
|
||
|
||
Raises:
|
||
CurlError: if failed.
|
||
"""
|
||
payload = struct.pack("!H", code) + message
|
||
return self.ws_send(payload, flags=CurlWsFlag.CLOSE)
|
||
|
||
|
||
class CurlMime:
|
||
"""Wrapper for the ``curl_mime_`` API."""
|
||
|
||
def __init__(self, curl: Optional[Curl] = None):
|
||
"""
|
||
Args:
|
||
curl: Curl instance to use.
|
||
"""
|
||
self._curl = curl if curl else Curl()
|
||
self._form = lib.curl_mime_init(self._curl._curl)
|
||
|
||
def addpart(
|
||
self,
|
||
name: str,
|
||
*,
|
||
content_type: Optional[str] = None,
|
||
filename: Optional[str] = None,
|
||
local_path: Optional[Union[str, bytes, Path]] = None,
|
||
data: Optional[bytes] = None,
|
||
) -> None:
|
||
"""Add a mime part for a mutlipart html form.
|
||
|
||
Note: You can only use either local_path or data, not both.
|
||
|
||
Args:
|
||
name: name of the field.
|
||
content_type: content_type for the field. for example: ``image/png``.
|
||
filename: filename for the server.
|
||
local_path: file to upload on local disk.
|
||
data: file content to upload.
|
||
"""
|
||
part = lib.curl_mime_addpart(self._form)
|
||
|
||
ret = lib.curl_mime_name(part, name.encode())
|
||
if ret != 0:
|
||
raise CurlError("Add field failed.")
|
||
|
||
# mime type
|
||
if content_type is not None:
|
||
ret = lib.curl_mime_type(part, content_type.encode())
|
||
if ret != 0:
|
||
raise CurlError("Add field failed.")
|
||
|
||
# remote file name
|
||
if filename is not None:
|
||
ret = lib.curl_mime_filename(part, filename.encode())
|
||
if ret != 0:
|
||
raise CurlError("Add field failed.")
|
||
|
||
if local_path and data:
|
||
raise CurlError("Can not use local_path and data at the same time.")
|
||
|
||
# this is a filename
|
||
if local_path is not None:
|
||
if isinstance(local_path, Path):
|
||
local_path_str = str(local_path)
|
||
elif isinstance(local_path, bytes):
|
||
local_path_str = local_path.decode()
|
||
else:
|
||
local_path_str = local_path
|
||
|
||
if not Path(local_path_str).exists():
|
||
raise FileNotFoundError(f"File not found at {local_path_str}")
|
||
ret = lib.curl_mime_filedata(part, local_path_str.encode())
|
||
if ret != 0:
|
||
raise CurlError("Add field failed.")
|
||
|
||
if data is not None:
|
||
if not isinstance(data, bytes):
|
||
data = str(data).encode()
|
||
ret = lib.curl_mime_data(part, data, len(data))
|
||
|
||
@classmethod
|
||
def from_list(cls, files: list[dict]):
|
||
"""Create a multipart instance from a list of dict, for keys, see ``addpart``"""
|
||
form = cls()
|
||
for file in files:
|
||
form.addpart(**file)
|
||
return form
|
||
|
||
def attach(self, curl: Optional[Curl] = None) -> None:
|
||
"""Attach the mime instance to a curl instance."""
|
||
c = curl if curl else self._curl
|
||
c.setopt(CurlOpt.MIMEPOST, self._form)
|
||
|
||
def close(self) -> None:
|
||
"""Close the mime instance and underlying files. This method must be called
|
||
after ``perform`` or ``request``."""
|
||
lib.curl_mime_free(self._form)
|
||
self._form = ffi.NULL
|
||
|
||
def __del__(self) -> None:
|
||
self.close()
|