http.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. import base64
  2. import datetime
  3. import ipaddress
  4. import re
  5. import unicodedata
  6. from binascii import Error as BinasciiError
  7. from email.utils import formatdate
  8. from urllib.parse import (
  9. ParseResult,
  10. SplitResult,
  11. _coerce_args,
  12. _splitnetloc,
  13. _splitparams,
  14. quote,
  15. scheme_chars,
  16. unquote,
  17. )
  18. from urllib.parse import urlencode as original_urlencode
  19. from urllib.parse import uses_params
  20. from django.utils.datastructures import MultiValueDict
  21. from django.utils.regex_helper import _lazy_re_compile
  22. # Based on RFC 9110 Appendix A.
  23. ETAG_MATCH = _lazy_re_compile(
  24. r"""
  25. \A( # start of string and capture group
  26. (?:W/)? # optional weak indicator
  27. " # opening quote
  28. [^"]* # any sequence of non-quote characters
  29. " # end quote
  30. )\Z # end of string and capture group
  31. """,
  32. re.X,
  33. )
  34. MONTHS = "jan feb mar apr may jun jul aug sep oct nov dec".split()
  35. __D = r"(?P<day>[0-9]{2})"
  36. __D2 = r"(?P<day>[ 0-9][0-9])"
  37. __M = r"(?P<mon>\w{3})"
  38. __Y = r"(?P<year>[0-9]{4})"
  39. __Y2 = r"(?P<year>[0-9]{2})"
  40. __T = r"(?P<hour>[0-9]{2}):(?P<min>[0-9]{2}):(?P<sec>[0-9]{2})"
  41. RFC1123_DATE = _lazy_re_compile(r"^\w{3}, %s %s %s %s GMT$" % (__D, __M, __Y, __T))
  42. RFC850_DATE = _lazy_re_compile(r"^\w{6,9}, %s-%s-%s %s GMT$" % (__D, __M, __Y2, __T))
  43. ASCTIME_DATE = _lazy_re_compile(r"^\w{3} %s %s %s %s$" % (__M, __D2, __T, __Y))
  44. RFC3986_GENDELIMS = ":/?#[]@"
  45. RFC3986_SUBDELIMS = "!$&'()*+,;="
  46. MAX_URL_LENGTH = 2048
  47. MAX_URL_REDIRECT_LENGTH = 16384
  48. # TODO: Remove when dropping support for PY38.
  49. # Unsafe bytes to be removed per WHATWG spec.
  50. _UNSAFE_URL_BYTES_TO_REMOVE = ["\t", "\r", "\n"]
  51. def urlencode(query, doseq=False):
  52. """
  53. A version of Python's urllib.parse.urlencode() function that can operate on
  54. MultiValueDict and non-string values.
  55. """
  56. if isinstance(query, MultiValueDict):
  57. query = query.lists()
  58. elif hasattr(query, "items"):
  59. query = query.items()
  60. query_params = []
  61. for key, value in query:
  62. if value is None:
  63. raise TypeError(
  64. "Cannot encode None for key '%s' in a query string. Did you "
  65. "mean to pass an empty string or omit the value?" % key
  66. )
  67. elif not doseq or isinstance(value, (str, bytes)):
  68. query_val = value
  69. else:
  70. try:
  71. itr = iter(value)
  72. except TypeError:
  73. query_val = value
  74. else:
  75. # Consume generators and iterators, when doseq=True, to
  76. # work around https://bugs.python.org/issue31706.
  77. query_val = []
  78. for item in itr:
  79. if item is None:
  80. raise TypeError(
  81. "Cannot encode None for key '%s' in a query "
  82. "string. Did you mean to pass an empty string or "
  83. "omit the value?" % key
  84. )
  85. elif not isinstance(item, bytes):
  86. item = str(item)
  87. query_val.append(item)
  88. query_params.append((key, query_val))
  89. return original_urlencode(query_params, doseq)
  90. def http_date(epoch_seconds=None):
  91. """
  92. Format the time to match the RFC 5322 date format as specified by RFC 9110
  93. Section 5.6.7.
  94. `epoch_seconds` is a floating point number expressed in seconds since the
  95. epoch, in UTC - such as that outputted by time.time(). If set to None, it
  96. defaults to the current time.
  97. Output a string in the format 'Wdy, DD Mon YYYY HH:MM:SS GMT'.
  98. """
  99. return formatdate(epoch_seconds, usegmt=True)
  100. def parse_http_date(date):
  101. """
  102. Parse a date format as specified by HTTP RFC 9110 Section 5.6.7.
  103. The three formats allowed by the RFC are accepted, even if only the first
  104. one is still in widespread use.
  105. Return an integer expressed in seconds since the epoch, in UTC.
  106. """
  107. # email.utils.parsedate() does the job for RFC 1123 dates; unfortunately
  108. # RFC 9110 makes it mandatory to support RFC 850 dates too. So we roll
  109. # our own RFC-compliant parsing.
  110. for regex in RFC1123_DATE, RFC850_DATE, ASCTIME_DATE:
  111. m = regex.match(date)
  112. if m is not None:
  113. break
  114. else:
  115. raise ValueError("%r is not in a valid HTTP date format" % date)
  116. try:
  117. tz = datetime.timezone.utc
  118. year = int(m["year"])
  119. if year < 100:
  120. current_year = datetime.datetime.now(tz=tz).year
  121. current_century = current_year - (current_year % 100)
  122. if year - (current_year % 100) > 50:
  123. # year that appears to be more than 50 years in the future are
  124. # interpreted as representing the past.
  125. year += current_century - 100
  126. else:
  127. year += current_century
  128. month = MONTHS.index(m["mon"].lower()) + 1
  129. day = int(m["day"])
  130. hour = int(m["hour"])
  131. min = int(m["min"])
  132. sec = int(m["sec"])
  133. result = datetime.datetime(year, month, day, hour, min, sec, tzinfo=tz)
  134. return int(result.timestamp())
  135. except Exception as exc:
  136. raise ValueError("%r is not a valid date" % date) from exc
  137. def parse_http_date_safe(date):
  138. """
  139. Same as parse_http_date, but return None if the input is invalid.
  140. """
  141. try:
  142. return parse_http_date(date)
  143. except Exception:
  144. pass
  145. # Base 36 functions: useful for generating compact URLs
  146. def base36_to_int(s):
  147. """
  148. Convert a base 36 string to an int. Raise ValueError if the input won't fit
  149. into an int.
  150. """
  151. # To prevent overconsumption of server resources, reject any
  152. # base36 string that is longer than 13 base36 digits (13 digits
  153. # is sufficient to base36-encode any 64-bit integer)
  154. if len(s) > 13:
  155. raise ValueError("Base36 input too large")
  156. return int(s, 36)
  157. def int_to_base36(i):
  158. """Convert an integer to a base36 string."""
  159. char_set = "0123456789abcdefghijklmnopqrstuvwxyz"
  160. if i < 0:
  161. raise ValueError("Negative base36 conversion input.")
  162. if i < 36:
  163. return char_set[i]
  164. b36 = ""
  165. while i != 0:
  166. i, n = divmod(i, 36)
  167. b36 = char_set[n] + b36
  168. return b36
  169. def urlsafe_base64_encode(s):
  170. """
  171. Encode a bytestring to a base64 string for use in URLs. Strip any trailing
  172. equal signs.
  173. """
  174. return base64.urlsafe_b64encode(s).rstrip(b"\n=").decode("ascii")
  175. def urlsafe_base64_decode(s):
  176. """
  177. Decode a base64 encoded string. Add back any trailing equal signs that
  178. might have been stripped.
  179. """
  180. s = s.encode()
  181. try:
  182. return base64.urlsafe_b64decode(s.ljust(len(s) + len(s) % 4, b"="))
  183. except (LookupError, BinasciiError) as e:
  184. raise ValueError(e)
  185. def parse_etags(etag_str):
  186. """
  187. Parse a string of ETags given in an If-None-Match or If-Match header as
  188. defined by RFC 9110. Return a list of quoted ETags, or ['*'] if all ETags
  189. should be matched.
  190. """
  191. if etag_str.strip() == "*":
  192. return ["*"]
  193. else:
  194. # Parse each ETag individually, and return any that are valid.
  195. etag_matches = (ETAG_MATCH.match(etag.strip()) for etag in etag_str.split(","))
  196. return [match[1] for match in etag_matches if match]
  197. def quote_etag(etag_str):
  198. """
  199. If the provided string is already a quoted ETag, return it. Otherwise, wrap
  200. the string in quotes, making it a strong ETag.
  201. """
  202. if ETAG_MATCH.match(etag_str):
  203. return etag_str
  204. else:
  205. return '"%s"' % etag_str
  206. def is_same_domain(host, pattern):
  207. """
  208. Return ``True`` if the host is either an exact match or a match
  209. to the wildcard pattern.
  210. Any pattern beginning with a period matches a domain and all of its
  211. subdomains. (e.g. ``.example.com`` matches ``example.com`` and
  212. ``foo.example.com``). Anything else is an exact string match.
  213. """
  214. if not pattern:
  215. return False
  216. pattern = pattern.lower()
  217. return (
  218. pattern[0] == "."
  219. and (host.endswith(pattern) or host == pattern[1:])
  220. or pattern == host
  221. )
  222. def url_has_allowed_host_and_scheme(url, allowed_hosts, require_https=False):
  223. """
  224. Return ``True`` if the url uses an allowed host and a safe scheme.
  225. Always return ``False`` on an empty url.
  226. If ``require_https`` is ``True``, only 'https' will be considered a valid
  227. scheme, as opposed to 'http' and 'https' with the default, ``False``.
  228. Note: "True" doesn't entail that a URL is "safe". It may still be e.g.
  229. quoted incorrectly. Ensure to also use django.utils.encoding.iri_to_uri()
  230. on the path component of untrusted URLs.
  231. """
  232. if url is not None:
  233. url = url.strip()
  234. if not url:
  235. return False
  236. if allowed_hosts is None:
  237. allowed_hosts = set()
  238. elif isinstance(allowed_hosts, str):
  239. allowed_hosts = {allowed_hosts}
  240. # Chrome treats \ completely as / in paths but it could be part of some
  241. # basic auth credentials so we need to check both URLs.
  242. return _url_has_allowed_host_and_scheme(
  243. url, allowed_hosts, require_https=require_https
  244. ) and _url_has_allowed_host_and_scheme(
  245. url.replace("\\", "/"), allowed_hosts, require_https=require_https
  246. )
  247. # TODO: Remove when dropping support for PY38.
  248. # Copied from urllib.parse.urlparse() but uses fixed urlsplit() function.
  249. def _urlparse(url, scheme="", allow_fragments=True):
  250. """Parse a URL into 6 components:
  251. <scheme>://<netloc>/<path>;<params>?<query>#<fragment>
  252. Return a 6-tuple: (scheme, netloc, path, params, query, fragment).
  253. Note that we don't break the components up in smaller bits
  254. (e.g. netloc is a single string) and we don't expand % escapes."""
  255. url, scheme, _coerce_result = _coerce_args(url, scheme)
  256. splitresult = _urlsplit(url, scheme, allow_fragments)
  257. scheme, netloc, url, query, fragment = splitresult
  258. if scheme in uses_params and ";" in url:
  259. url, params = _splitparams(url)
  260. else:
  261. params = ""
  262. result = ParseResult(scheme, netloc, url, params, query, fragment)
  263. return _coerce_result(result)
  264. # TODO: Remove when dropping support for PY38.
  265. def _remove_unsafe_bytes_from_url(url):
  266. for b in _UNSAFE_URL_BYTES_TO_REMOVE:
  267. url = url.replace(b, "")
  268. return url
  269. # TODO: Remove when dropping support for PY38.
  270. def _check_bracketed_host(hostname):
  271. # Valid bracketed hosts are defined in
  272. # https://www.rfc-editor.org/rfc/rfc3986#page-49 and
  273. # https://url.spec.whatwg.org/.
  274. if hostname.startswith("v"):
  275. if not re.match(r"\Av[a-fA-F0-9]+\..+\Z", hostname):
  276. raise ValueError("IPvFuture address is invalid")
  277. else:
  278. # Throws Value Error if not IPv6 or IPv4.
  279. ip = ipaddress.ip_address(hostname)
  280. if isinstance(ip, ipaddress.IPv4Address):
  281. raise ValueError("An IPv4 address cannot be in brackets")
  282. # TODO: Remove when dropping support for PY38.
  283. # Backport of urllib.parse.urlsplit() from Python 3.9.
  284. def _urlsplit(url, scheme="", allow_fragments=True):
  285. """Parse a URL into 5 components:
  286. <scheme>://<netloc>/<path>?<query>#<fragment>
  287. Return a 5-tuple: (scheme, netloc, path, query, fragment).
  288. Note that we don't break the components up in smaller bits
  289. (e.g. netloc is a single string) and we don't expand % escapes."""
  290. url, scheme, _coerce_result = _coerce_args(url, scheme)
  291. url = _remove_unsafe_bytes_from_url(url)
  292. scheme = _remove_unsafe_bytes_from_url(scheme)
  293. netloc = query = fragment = ""
  294. i = url.find(":")
  295. if i > 0:
  296. for c in url[:i]:
  297. if c not in scheme_chars:
  298. break
  299. else:
  300. scheme, url = url[:i].lower(), url[i + 1 :]
  301. if url[:2] == "//":
  302. netloc, url = _splitnetloc(url, 2)
  303. if ("[" in netloc and "]" not in netloc) or (
  304. "]" in netloc and "[" not in netloc
  305. ):
  306. raise ValueError("Invalid IPv6 URL")
  307. if "[" in netloc and "]" in netloc:
  308. bracketed_host = netloc.partition("[")[2].partition("]")[0]
  309. _check_bracketed_host(bracketed_host)
  310. if allow_fragments and "#" in url:
  311. url, fragment = url.split("#", 1)
  312. if "?" in url:
  313. url, query = url.split("?", 1)
  314. v = SplitResult(scheme, netloc, url, query, fragment)
  315. return _coerce_result(v)
  316. def _url_has_allowed_host_and_scheme(url, allowed_hosts, require_https=False):
  317. # Chrome considers any URL with more than two slashes to be absolute, but
  318. # urlparse is not so flexible. Treat any url with three slashes as unsafe.
  319. if url.startswith("///"):
  320. return False
  321. try:
  322. url_info = _urlparse(url)
  323. except ValueError: # e.g. invalid IPv6 addresses
  324. return False
  325. # Forbid URLs like http:///example.com - with a scheme, but without a hostname.
  326. # In that URL, example.com is not the hostname but, a path component. However,
  327. # Chrome will still consider example.com to be the hostname, so we must not
  328. # allow this syntax.
  329. if not url_info.netloc and url_info.scheme:
  330. return False
  331. # Forbid URLs that start with control characters. Some browsers (like
  332. # Chrome) ignore quite a few control characters at the start of a
  333. # URL and might consider the URL as scheme relative.
  334. if unicodedata.category(url[0])[0] == "C":
  335. return False
  336. scheme = url_info.scheme
  337. # Consider URLs without a scheme (e.g. //example.com/p) to be http.
  338. if not url_info.scheme and url_info.netloc:
  339. scheme = "http"
  340. valid_schemes = ["https"] if require_https else ["http", "https"]
  341. return (not url_info.netloc or url_info.netloc in allowed_hosts) and (
  342. not scheme or scheme in valid_schemes
  343. )
  344. def escape_leading_slashes(url):
  345. """
  346. If redirecting to an absolute path (two leading slashes), a slash must be
  347. escaped to prevent browsers from handling the path as schemaless and
  348. redirecting to another host.
  349. """
  350. if url.startswith("//"):
  351. url = "/%2F{}".format(url[2:])
  352. return url
  353. def _parseparam(s):
  354. while s[:1] == ";":
  355. s = s[1:]
  356. end = s.find(";")
  357. while end > 0 and (s.count('"', 0, end) - s.count('\\"', 0, end)) % 2:
  358. end = s.find(";", end + 1)
  359. if end < 0:
  360. end = len(s)
  361. f = s[:end]
  362. yield f.strip()
  363. s = s[end:]
  364. def parse_header_parameters(line):
  365. """
  366. Parse a Content-type like header.
  367. Return the main content-type and a dictionary of options.
  368. """
  369. parts = _parseparam(";" + line)
  370. key = parts.__next__().lower()
  371. pdict = {}
  372. for p in parts:
  373. i = p.find("=")
  374. if i >= 0:
  375. has_encoding = False
  376. name = p[:i].strip().lower()
  377. if name.endswith("*"):
  378. # Lang/encoding embedded in the value (like "filename*=UTF-8''file.ext")
  379. # https://tools.ietf.org/html/rfc2231#section-4
  380. name = name[:-1]
  381. if p.count("'") == 2:
  382. has_encoding = True
  383. value = p[i + 1 :].strip()
  384. if len(value) >= 2 and value[0] == value[-1] == '"':
  385. value = value[1:-1]
  386. value = value.replace("\\\\", "\\").replace('\\"', '"')
  387. if has_encoding:
  388. encoding, lang, value = value.split("'")
  389. value = unquote(value, encoding=encoding)
  390. pdict[name] = value
  391. return key, pdict
  392. def content_disposition_header(as_attachment, filename):
  393. """
  394. Construct a Content-Disposition HTTP header value from the given filename
  395. as specified by RFC 6266.
  396. """
  397. if filename:
  398. disposition = "attachment" if as_attachment else "inline"
  399. try:
  400. filename.encode("ascii")
  401. file_expr = 'filename="{}"'.format(
  402. filename.replace("\\", "\\\\").replace('"', r"\"")
  403. )
  404. except UnicodeEncodeError:
  405. file_expr = "filename*=utf-8''{}".format(quote(filename))
  406. return f"{disposition}; {file_expr}"
  407. elif as_attachment:
  408. return "attachment"
  409. else:
  410. return None