local.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import asyncio
  2. import contextlib
  3. import contextvars
  4. import threading
  5. from typing import Any, Dict, Union
  6. class _CVar:
  7. """Storage utility for Local."""
  8. def __init__(self) -> None:
  9. self._data: "contextvars.ContextVar[Dict[str, Any]]" = contextvars.ContextVar(
  10. "asgiref.local"
  11. )
  12. def __getattr__(self, key):
  13. storage_object = self._data.get({})
  14. try:
  15. return storage_object[key]
  16. except KeyError:
  17. raise AttributeError(f"{self!r} object has no attribute {key!r}")
  18. def __setattr__(self, key: str, value: Any) -> None:
  19. if key == "_data":
  20. return super().__setattr__(key, value)
  21. storage_object = self._data.get({}).copy()
  22. storage_object[key] = value
  23. self._data.set(storage_object)
  24. def __delattr__(self, key: str) -> None:
  25. storage_object = self._data.get({}).copy()
  26. if key in storage_object:
  27. del storage_object[key]
  28. self._data.set(storage_object)
  29. else:
  30. raise AttributeError(f"{self!r} object has no attribute {key!r}")
  31. class Local:
  32. """Local storage for async tasks.
  33. This is a namespace object (similar to `threading.local`) where data is
  34. also local to the current async task (if there is one).
  35. In async threads, local means in the same sense as the `contextvars`
  36. module - i.e. a value set in an async frame will be visible:
  37. - to other async code `await`-ed from this frame.
  38. - to tasks spawned using `asyncio` utilities (`create_task`, `wait_for`,
  39. `gather` and probably others).
  40. - to code scheduled in a sync thread using `sync_to_async`
  41. In "sync" threads (a thread with no async event loop running), the
  42. data is thread-local, but additionally shared with async code executed
  43. via the `async_to_sync` utility, which schedules async code in a new thread
  44. and copies context across to that thread.
  45. If `thread_critical` is True, then the local will only be visible per-thread,
  46. behaving exactly like `threading.local` if the thread is sync, and as
  47. `contextvars` if the thread is async. This allows genuinely thread-sensitive
  48. code (such as DB handles) to be kept stricly to their initial thread and
  49. disable the sharing across `sync_to_async` and `async_to_sync` wrapped calls.
  50. Unlike plain `contextvars` objects, this utility is threadsafe.
  51. """
  52. def __init__(self, thread_critical: bool = False) -> None:
  53. self._thread_critical = thread_critical
  54. self._thread_lock = threading.RLock()
  55. self._storage: "Union[threading.local, _CVar]"
  56. if thread_critical:
  57. # Thread-local storage
  58. self._storage = threading.local()
  59. else:
  60. # Contextvar storage
  61. self._storage = _CVar()
  62. @contextlib.contextmanager
  63. def _lock_storage(self):
  64. # Thread safe access to storage
  65. if self._thread_critical:
  66. is_async = True
  67. try:
  68. # this is a test for are we in a async or sync
  69. # thread - will raise RuntimeError if there is
  70. # no current loop
  71. asyncio.get_running_loop()
  72. except RuntimeError:
  73. is_async = False
  74. if not is_async:
  75. # We are in a sync thread, the storage is
  76. # just the plain thread local (i.e, "global within
  77. # this thread" - it doesn't matter where you are
  78. # in a call stack you see the same storage)
  79. yield self._storage
  80. else:
  81. # We are in an async thread - storage is still
  82. # local to this thread, but additionally should
  83. # behave like a context var (is only visible with
  84. # the same async call stack)
  85. # Ensure context exists in the current thread
  86. if not hasattr(self._storage, "cvar"):
  87. self._storage.cvar = _CVar()
  88. # self._storage is a thread local, so the members
  89. # can't be accessed in another thread (we don't
  90. # need any locks)
  91. yield self._storage.cvar
  92. else:
  93. # Lock for thread_critical=False as other threads
  94. # can access the exact same storage object
  95. with self._thread_lock:
  96. yield self._storage
  97. def __getattr__(self, key):
  98. with self._lock_storage() as storage:
  99. return getattr(storage, key)
  100. def __setattr__(self, key, value):
  101. if key in ("_local", "_storage", "_thread_critical", "_thread_lock"):
  102. return super().__setattr__(key, value)
  103. with self._lock_storage() as storage:
  104. setattr(storage, key, value)
  105. def __delattr__(self, key):
  106. with self._lock_storage() as storage:
  107. delattr(storage, key)