from contextlib import suppress import queue import re import warnings from concurrent.futures import Future from typing import Any, Callable, Optional, Union from collections.abc import Awaitable from ..curl import Curl from ..utils import CurlCffiWarning from .cookies import Cookies from .exceptions import HTTPError, RequestException from .headers import Headers # Use orjson if present try: from orjson import loads except ImportError: from json import loads with suppress(ImportError): from markdownify import markdownify as md import readability as rd CHARSET_RE = re.compile(r"charset=([\w-]+)") STREAM_END = object() def clear_queue(q: queue.Queue): with q.mutex: q.queue.clear() q.all_tasks_done.notify_all() q.unfinished_tasks = 0 class Request: """Representing a sent request.""" def __init__(self, url: str, headers: Headers, method: str): self.url = url self.headers = headers self.method = method class Response: """Contains information the server sends. Attributes: url: url used in the request. content: response body in bytes. text: response body in str. status_code: http status code. reason: http response reason, such as OK, Not Found. ok: is status_code in [200, 400)? headers: response headers. cookies: response cookies. elapsed: how many seconds the request cost. encoding: http body encoding. charset: alias for encoding. primary_ip: primary ip of the server. primary_port: primary port of the server. local_ip: local ip used in this connection. local_port: local port used in this connection. charset_encoding: encoding specified by the Content-Type header. default_encoding: encoding for decoding response content if charset is not found in headers. Defaults to "utf-8". Can be set to a callable for automatic detection. redirect_count: how many redirects happened. redirect_url: the final redirected url. http_version: http version used. history: history redirections, only headers are available. """ def __init__(self, curl: Optional[Curl] = None, request: Optional[Request] = None): self.curl = curl self.request = request self.url = "" self.content = b"" self.status_code = 200 self.reason = "OK" self.ok = True self.headers = Headers() self.cookies = Cookies() self.elapsed = 0.0 self.default_encoding: Union[str, Callable[[bytes], str]] = "utf-8" self.redirect_count = 0 self.redirect_url = "" self.http_version = 0 self.primary_ip: str = "" self.primary_port: int = 0 self.local_ip: str = "" self.local_port: int = 0 self.history: list[dict[str, Any]] = [] self.infos: dict[str, Any] = {} self.queue: Optional[queue.Queue] = None self.stream_task: Optional[Future] = None self.astream_task: Optional[Awaitable] = None self.quit_now = None @property def charset(self) -> str: """Alias for encoding.""" return self.encoding @property def encoding(self) -> str: """ Determines the encoding to decode byte content into text. The method follows a specific priority to decide the encoding: 1. If ``.encoding`` has been explicitly set, it is used. 2. The encoding specified by the ``charset`` parameter in the ``Content-Type`` header. 3. The encoding specified by the ``default_encoding`` attribute. This can either be a string (e.g., "utf-8") or a callable for charset autodetection. """ if not hasattr(self, "_encoding"): encoding = self.charset_encoding if encoding is None: if isinstance(self.default_encoding, str): encoding = self.default_encoding elif callable(self.default_encoding): encoding = self.default_encoding(self.content) self._encoding = encoding or "utf-8" return self._encoding @encoding.setter def encoding(self, value: str) -> None: if hasattr(self, "_text"): raise ValueError("Cannot set encoding after text has been accessed") self._encoding = value @property def charset_encoding(self) -> Optional[str]: """Return the encoding, as specified by the Content-Type header.""" content_type = self.headers.get("Content-Type") if content_type: charset_match = CHARSET_RE.search(content_type) return charset_match.group(1) if charset_match else None return None @property def text(self) -> str: if not hasattr(self, "_text"): if not self.content: self._text = "" else: self._text = self._decode(self.content) return self._text def markdown(self) -> str: doc = rd.Document(self.content) title = doc.title() summary = doc.summary(html_partial=True) body_as_md = md(f"