sync.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655
  1. import asyncio
  2. import asyncio.coroutines
  3. import contextvars
  4. import functools
  5. import inspect
  6. import os
  7. import sys
  8. import threading
  9. import warnings
  10. import weakref
  11. from concurrent.futures import Future, ThreadPoolExecutor
  12. from typing import (
  13. TYPE_CHECKING,
  14. Any,
  15. Awaitable,
  16. Callable,
  17. Coroutine,
  18. Dict,
  19. Generic,
  20. List,
  21. Optional,
  22. TypeVar,
  23. Union,
  24. overload,
  25. )
  26. from .current_thread_executor import CurrentThreadExecutor
  27. from .local import Local
  28. if sys.version_info >= (3, 10):
  29. from typing import ParamSpec
  30. else:
  31. from typing_extensions import ParamSpec
  32. if TYPE_CHECKING:
  33. # This is not available to import at runtime
  34. from _typeshed import OptExcInfo
  35. _F = TypeVar("_F", bound=Callable[..., Any])
  36. _P = ParamSpec("_P")
  37. _R = TypeVar("_R")
  38. def _restore_context(context: contextvars.Context) -> None:
  39. # Check for changes in contextvars, and set them to the current
  40. # context for downstream consumers
  41. for cvar in context:
  42. cvalue = context.get(cvar)
  43. try:
  44. if cvar.get() != cvalue:
  45. cvar.set(cvalue)
  46. except LookupError:
  47. cvar.set(cvalue)
  48. # Python 3.12 deprecates asyncio.iscoroutinefunction() as an alias for
  49. # inspect.iscoroutinefunction(), whilst also removing the _is_coroutine marker.
  50. # The latter is replaced with the inspect.markcoroutinefunction decorator.
  51. # Until 3.12 is the minimum supported Python version, provide a shim.
  52. if hasattr(inspect, "markcoroutinefunction"):
  53. iscoroutinefunction = inspect.iscoroutinefunction
  54. markcoroutinefunction: Callable[[_F], _F] = inspect.markcoroutinefunction
  55. else:
  56. iscoroutinefunction = asyncio.iscoroutinefunction # type: ignore[assignment]
  57. def markcoroutinefunction(func: _F) -> _F:
  58. func._is_coroutine = asyncio.coroutines._is_coroutine # type: ignore
  59. return func
  60. class AsyncSingleThreadContext:
  61. """Context manager to run async code inside the same thread.
  62. Normally, AsyncToSync functions run either inside a separate ThreadPoolExecutor or
  63. the main event loop if it exists. This context manager ensures that all AsyncToSync
  64. functions execute within the same thread.
  65. This context manager is re-entrant, so only the outer-most call to
  66. AsyncSingleThreadContext will set the context.
  67. Usage:
  68. >>> import asyncio
  69. >>> with AsyncSingleThreadContext():
  70. ... async_to_sync(asyncio.sleep(1))()
  71. """
  72. def __init__(self):
  73. self.token = None
  74. def __enter__(self):
  75. try:
  76. AsyncToSync.async_single_thread_context.get()
  77. except LookupError:
  78. self.token = AsyncToSync.async_single_thread_context.set(self)
  79. return self
  80. def __exit__(self, exc, value, tb):
  81. if not self.token:
  82. return
  83. executor = AsyncToSync.context_to_thread_executor.pop(self, None)
  84. if executor:
  85. executor.shutdown()
  86. AsyncToSync.async_single_thread_context.reset(self.token)
  87. class ThreadSensitiveContext:
  88. """Async context manager to manage context for thread sensitive mode
  89. This context manager controls which thread pool executor is used when in
  90. thread sensitive mode. By default, a single thread pool executor is shared
  91. within a process.
  92. The ThreadSensitiveContext() context manager may be used to specify a
  93. thread pool per context.
  94. This context manager is re-entrant, so only the outer-most call to
  95. ThreadSensitiveContext will set the context.
  96. Usage:
  97. >>> import time
  98. >>> async with ThreadSensitiveContext():
  99. ... await sync_to_async(time.sleep, 1)()
  100. """
  101. def __init__(self):
  102. self.token = None
  103. async def __aenter__(self):
  104. try:
  105. SyncToAsync.thread_sensitive_context.get()
  106. except LookupError:
  107. self.token = SyncToAsync.thread_sensitive_context.set(self)
  108. return self
  109. async def __aexit__(self, exc, value, tb):
  110. if not self.token:
  111. return
  112. executor = SyncToAsync.context_to_thread_executor.pop(self, None)
  113. if executor:
  114. executor.shutdown()
  115. SyncToAsync.thread_sensitive_context.reset(self.token)
  116. class AsyncToSync(Generic[_P, _R]):
  117. """
  118. Utility class which turns an awaitable that only works on the thread with
  119. the event loop into a synchronous callable that works in a subthread.
  120. If the call stack contains an async loop, the code runs there.
  121. Otherwise, the code runs in a new loop in a new thread.
  122. Either way, this thread then pauses and waits to run any thread_sensitive
  123. code called from further down the call stack using SyncToAsync, before
  124. finally exiting once the async task returns.
  125. """
  126. # Keeps a reference to the CurrentThreadExecutor in local context, so that
  127. # any sync_to_async inside the wrapped code can find it.
  128. executors: "Local" = Local()
  129. # When we can't find a CurrentThreadExecutor from the context, such as
  130. # inside create_task, we'll look it up here from the running event loop.
  131. loop_thread_executors: "Dict[asyncio.AbstractEventLoop, CurrentThreadExecutor]" = {}
  132. async_single_thread_context: "contextvars.ContextVar[AsyncSingleThreadContext]" = (
  133. contextvars.ContextVar("async_single_thread_context")
  134. )
  135. context_to_thread_executor: "weakref.WeakKeyDictionary[AsyncSingleThreadContext, ThreadPoolExecutor]" = (
  136. weakref.WeakKeyDictionary()
  137. )
  138. def __init__(
  139. self,
  140. awaitable: Union[
  141. Callable[_P, Coroutine[Any, Any, _R]],
  142. Callable[_P, Awaitable[_R]],
  143. ],
  144. force_new_loop: bool = False,
  145. ):
  146. if not callable(awaitable) or (
  147. not iscoroutinefunction(awaitable)
  148. and not iscoroutinefunction(getattr(awaitable, "__call__", awaitable))
  149. ):
  150. # Python does not have very reliable detection of async functions
  151. # (lots of false negatives) so this is just a warning.
  152. warnings.warn(
  153. "async_to_sync was passed a non-async-marked callable", stacklevel=2
  154. )
  155. self.awaitable = awaitable
  156. try:
  157. self.__self__ = self.awaitable.__self__ # type: ignore[union-attr]
  158. except AttributeError:
  159. pass
  160. self.force_new_loop = force_new_loop
  161. self.main_event_loop = None
  162. try:
  163. self.main_event_loop = asyncio.get_running_loop()
  164. except RuntimeError:
  165. # There's no event loop in this thread.
  166. pass
  167. def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
  168. __traceback_hide__ = True # noqa: F841
  169. if not self.force_new_loop and not self.main_event_loop:
  170. # There's no event loop in this thread. Look for the threadlocal if
  171. # we're inside SyncToAsync
  172. main_event_loop_pid = getattr(
  173. SyncToAsync.threadlocal, "main_event_loop_pid", None
  174. )
  175. # We make sure the parent loop is from the same process - if
  176. # they've forked, this is not going to be valid any more (#194)
  177. if main_event_loop_pid and main_event_loop_pid == os.getpid():
  178. self.main_event_loop = getattr(
  179. SyncToAsync.threadlocal, "main_event_loop", None
  180. )
  181. # You can't call AsyncToSync from a thread with a running event loop
  182. try:
  183. asyncio.get_running_loop()
  184. except RuntimeError:
  185. pass
  186. else:
  187. raise RuntimeError(
  188. "You cannot use AsyncToSync in the same thread as an async event loop - "
  189. "just await the async function directly."
  190. )
  191. # Make a future for the return information
  192. call_result: "Future[_R]" = Future()
  193. # Make a CurrentThreadExecutor we'll use to idle in this thread - we
  194. # need one for every sync frame, even if there's one above us in the
  195. # same thread.
  196. old_executor = getattr(self.executors, "current", None)
  197. current_executor = CurrentThreadExecutor(old_executor)
  198. self.executors.current = current_executor
  199. # Wrapping context in list so it can be reassigned from within
  200. # `main_wrap`.
  201. context = [contextvars.copy_context()]
  202. # Get task context so that parent task knows which task to propagate
  203. # an asyncio.CancelledError to.
  204. task_context = getattr(SyncToAsync.threadlocal, "task_context", None)
  205. # Use call_soon_threadsafe to schedule a synchronous callback on the
  206. # main event loop's thread if it's there, otherwise make a new loop
  207. # in this thread.
  208. try:
  209. awaitable = self.main_wrap(
  210. call_result,
  211. sys.exc_info(),
  212. task_context,
  213. context,
  214. # prepare an awaitable which can be passed as is to self.main_wrap,
  215. # so that `args` and `kwargs` don't need to be
  216. # destructured when passed to self.main_wrap
  217. # (which is required by `ParamSpec`)
  218. # as that may cause overlapping arguments
  219. self.awaitable(*args, **kwargs),
  220. )
  221. async def new_loop_wrap() -> None:
  222. loop = asyncio.get_running_loop()
  223. self.loop_thread_executors[loop] = current_executor
  224. try:
  225. await awaitable
  226. finally:
  227. del self.loop_thread_executors[loop]
  228. if self.main_event_loop is not None:
  229. try:
  230. self.main_event_loop.call_soon_threadsafe(
  231. self.main_event_loop.create_task, awaitable
  232. )
  233. except RuntimeError:
  234. running_in_main_event_loop = False
  235. else:
  236. running_in_main_event_loop = True
  237. # Run the CurrentThreadExecutor until the future is done.
  238. current_executor.run_until_future(call_result)
  239. else:
  240. running_in_main_event_loop = False
  241. if not running_in_main_event_loop:
  242. loop_executor = None
  243. if self.async_single_thread_context.get(None):
  244. single_thread_context = self.async_single_thread_context.get()
  245. if single_thread_context in self.context_to_thread_executor:
  246. loop_executor = self.context_to_thread_executor[
  247. single_thread_context
  248. ]
  249. else:
  250. loop_executor = ThreadPoolExecutor(max_workers=1)
  251. self.context_to_thread_executor[
  252. single_thread_context
  253. ] = loop_executor
  254. else:
  255. # Make our own event loop - in a new thread - and run inside that.
  256. loop_executor = ThreadPoolExecutor(max_workers=1)
  257. loop_future = loop_executor.submit(asyncio.run, new_loop_wrap())
  258. # Run the CurrentThreadExecutor until the future is done.
  259. current_executor.run_until_future(loop_future)
  260. # Wait for future and/or allow for exception propagation
  261. loop_future.result()
  262. finally:
  263. _restore_context(context[0])
  264. # Restore old current thread executor state
  265. self.executors.current = old_executor
  266. # Wait for results from the future.
  267. return call_result.result()
  268. def __get__(self, parent: Any, objtype: Any) -> Callable[_P, _R]:
  269. """
  270. Include self for methods
  271. """
  272. func = functools.partial(self.__call__, parent)
  273. return functools.update_wrapper(func, self.awaitable)
  274. async def main_wrap(
  275. self,
  276. call_result: "Future[_R]",
  277. exc_info: "OptExcInfo",
  278. task_context: "Optional[List[asyncio.Task[Any]]]",
  279. context: List[contextvars.Context],
  280. awaitable: Union[Coroutine[Any, Any, _R], Awaitable[_R]],
  281. ) -> None:
  282. """
  283. Wraps the awaitable with something that puts the result into the
  284. result/exception future.
  285. """
  286. __traceback_hide__ = True # noqa: F841
  287. if context is not None:
  288. _restore_context(context[0])
  289. current_task = asyncio.current_task()
  290. if current_task is not None and task_context is not None:
  291. task_context.append(current_task)
  292. try:
  293. # If we have an exception, run the function inside the except block
  294. # after raising it so exc_info is correctly populated.
  295. if exc_info[1]:
  296. try:
  297. raise exc_info[1]
  298. except BaseException:
  299. result = await awaitable
  300. else:
  301. result = await awaitable
  302. except BaseException as e:
  303. call_result.set_exception(e)
  304. else:
  305. call_result.set_result(result)
  306. finally:
  307. if current_task is not None and task_context is not None:
  308. task_context.remove(current_task)
  309. context[0] = contextvars.copy_context()
  310. class SyncToAsync(Generic[_P, _R]):
  311. """
  312. Utility class which turns a synchronous callable into an awaitable that
  313. runs in a threadpool. It also sets a threadlocal inside the thread so
  314. calls to AsyncToSync can escape it.
  315. If thread_sensitive is passed, the code will run in the same thread as any
  316. outer code. This is needed for underlying Python code that is not
  317. threadsafe (for example, code which handles SQLite database connections).
  318. If the outermost program is async (i.e. SyncToAsync is outermost), then
  319. this will be a dedicated single sub-thread that all sync code runs in,
  320. one after the other. If the outermost program is sync (i.e. AsyncToSync is
  321. outermost), this will just be the main thread. This is achieved by idling
  322. with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent,
  323. rather than just blocking.
  324. If executor is passed in, that will be used instead of the loop's default executor.
  325. In order to pass in an executor, thread_sensitive must be set to False, otherwise
  326. a TypeError will be raised.
  327. """
  328. # Storage for main event loop references
  329. threadlocal = threading.local()
  330. # Single-thread executor for thread-sensitive code
  331. single_thread_executor = ThreadPoolExecutor(max_workers=1)
  332. # Maintain a contextvar for the current execution context. Optionally used
  333. # for thread sensitive mode.
  334. thread_sensitive_context: "contextvars.ContextVar[ThreadSensitiveContext]" = (
  335. contextvars.ContextVar("thread_sensitive_context")
  336. )
  337. # Contextvar that is used to detect if the single thread executor
  338. # would be awaited on while already being used in the same context
  339. deadlock_context: "contextvars.ContextVar[bool]" = contextvars.ContextVar(
  340. "deadlock_context"
  341. )
  342. # Maintaining a weak reference to the context ensures that thread pools are
  343. # erased once the context goes out of scope. This terminates the thread pool.
  344. context_to_thread_executor: "weakref.WeakKeyDictionary[ThreadSensitiveContext, ThreadPoolExecutor]" = (
  345. weakref.WeakKeyDictionary()
  346. )
  347. def __init__(
  348. self,
  349. func: Callable[_P, _R],
  350. thread_sensitive: bool = True,
  351. executor: Optional["ThreadPoolExecutor"] = None,
  352. context: Optional[contextvars.Context] = None,
  353. ) -> None:
  354. if (
  355. not callable(func)
  356. or iscoroutinefunction(func)
  357. or iscoroutinefunction(getattr(func, "__call__", func))
  358. ):
  359. raise TypeError("sync_to_async can only be applied to sync functions.")
  360. self.func = func
  361. self.context = context
  362. functools.update_wrapper(self, func)
  363. self._thread_sensitive = thread_sensitive
  364. markcoroutinefunction(self)
  365. if thread_sensitive and executor is not None:
  366. raise TypeError("executor must not be set when thread_sensitive is True")
  367. self._executor = executor
  368. try:
  369. self.__self__ = func.__self__ # type: ignore
  370. except AttributeError:
  371. pass
  372. async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R:
  373. __traceback_hide__ = True # noqa: F841
  374. loop = asyncio.get_running_loop()
  375. # Work out what thread to run the code in
  376. if self._thread_sensitive:
  377. current_thread_executor = getattr(AsyncToSync.executors, "current", None)
  378. if current_thread_executor:
  379. # If we have a parent sync thread above somewhere, use that
  380. executor = current_thread_executor
  381. elif self.thread_sensitive_context.get(None):
  382. # If we have a way of retrieving the current context, attempt
  383. # to use a per-context thread pool executor
  384. thread_sensitive_context = self.thread_sensitive_context.get()
  385. if thread_sensitive_context in self.context_to_thread_executor:
  386. # Re-use thread executor in current context
  387. executor = self.context_to_thread_executor[thread_sensitive_context]
  388. else:
  389. # Create new thread executor in current context
  390. executor = ThreadPoolExecutor(max_workers=1)
  391. self.context_to_thread_executor[thread_sensitive_context] = executor
  392. elif loop in AsyncToSync.loop_thread_executors:
  393. # Re-use thread executor for running loop
  394. executor = AsyncToSync.loop_thread_executors[loop]
  395. elif self.deadlock_context.get(False):
  396. raise RuntimeError(
  397. "Single thread executor already being used, would deadlock"
  398. )
  399. else:
  400. # Otherwise, we run it in a fixed single thread
  401. executor = self.single_thread_executor
  402. self.deadlock_context.set(True)
  403. else:
  404. # Use the passed in executor, or the loop's default if it is None
  405. executor = self._executor
  406. context = contextvars.copy_context() if self.context is None else self.context
  407. child = functools.partial(self.func, *args, **kwargs)
  408. func = context.run
  409. task_context: List[asyncio.Task[Any]] = []
  410. # Run the code in the right thread
  411. exec_coro = loop.run_in_executor(
  412. executor,
  413. functools.partial(
  414. self.thread_handler,
  415. loop,
  416. sys.exc_info(),
  417. task_context,
  418. func,
  419. child,
  420. ),
  421. )
  422. ret: _R
  423. try:
  424. ret = await asyncio.shield(exec_coro)
  425. except asyncio.CancelledError:
  426. cancel_parent = True
  427. try:
  428. task = task_context[0]
  429. task.cancel()
  430. try:
  431. await task
  432. cancel_parent = False
  433. except asyncio.CancelledError:
  434. pass
  435. except IndexError:
  436. pass
  437. if exec_coro.done():
  438. raise
  439. if cancel_parent:
  440. exec_coro.cancel()
  441. ret = await exec_coro
  442. finally:
  443. if self.context is None:
  444. _restore_context(context)
  445. self.deadlock_context.set(False)
  446. return ret
  447. def __get__(
  448. self, parent: Any, objtype: Any
  449. ) -> Callable[_P, Coroutine[Any, Any, _R]]:
  450. """
  451. Include self for methods
  452. """
  453. func = functools.partial(self.__call__, parent)
  454. return functools.update_wrapper(func, self.func)
  455. def thread_handler(self, loop, exc_info, task_context, func, *args, **kwargs):
  456. """
  457. Wraps the sync application with exception handling.
  458. """
  459. __traceback_hide__ = True # noqa: F841
  460. # Set the threadlocal for AsyncToSync
  461. self.threadlocal.main_event_loop = loop
  462. self.threadlocal.main_event_loop_pid = os.getpid()
  463. self.threadlocal.task_context = task_context
  464. # Run the function
  465. # If we have an exception, run the function inside the except block
  466. # after raising it so exc_info is correctly populated.
  467. if exc_info[1]:
  468. try:
  469. raise exc_info[1]
  470. except BaseException:
  471. return func(*args, **kwargs)
  472. else:
  473. return func(*args, **kwargs)
  474. @overload
  475. def async_to_sync(
  476. *,
  477. force_new_loop: bool = False,
  478. ) -> Callable[
  479. [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]],
  480. Callable[_P, _R],
  481. ]:
  482. ...
  483. @overload
  484. def async_to_sync(
  485. awaitable: Union[
  486. Callable[_P, Coroutine[Any, Any, _R]],
  487. Callable[_P, Awaitable[_R]],
  488. ],
  489. *,
  490. force_new_loop: bool = False,
  491. ) -> Callable[_P, _R]:
  492. ...
  493. def async_to_sync(
  494. awaitable: Optional[
  495. Union[
  496. Callable[_P, Coroutine[Any, Any, _R]],
  497. Callable[_P, Awaitable[_R]],
  498. ]
  499. ] = None,
  500. *,
  501. force_new_loop: bool = False,
  502. ) -> Union[
  503. Callable[
  504. [Union[Callable[_P, Coroutine[Any, Any, _R]], Callable[_P, Awaitable[_R]]]],
  505. Callable[_P, _R],
  506. ],
  507. Callable[_P, _R],
  508. ]:
  509. if awaitable is None:
  510. return lambda f: AsyncToSync(
  511. f,
  512. force_new_loop=force_new_loop,
  513. )
  514. return AsyncToSync(
  515. awaitable,
  516. force_new_loop=force_new_loop,
  517. )
  518. @overload
  519. def sync_to_async(
  520. *,
  521. thread_sensitive: bool = True,
  522. executor: Optional["ThreadPoolExecutor"] = None,
  523. context: Optional[contextvars.Context] = None,
  524. ) -> Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]]:
  525. ...
  526. @overload
  527. def sync_to_async(
  528. func: Callable[_P, _R],
  529. *,
  530. thread_sensitive: bool = True,
  531. executor: Optional["ThreadPoolExecutor"] = None,
  532. context: Optional[contextvars.Context] = None,
  533. ) -> Callable[_P, Coroutine[Any, Any, _R]]:
  534. ...
  535. def sync_to_async(
  536. func: Optional[Callable[_P, _R]] = None,
  537. *,
  538. thread_sensitive: bool = True,
  539. executor: Optional["ThreadPoolExecutor"] = None,
  540. context: Optional[contextvars.Context] = None,
  541. ) -> Union[
  542. Callable[[Callable[_P, _R]], Callable[_P, Coroutine[Any, Any, _R]]],
  543. Callable[_P, Coroutine[Any, Any, _R]],
  544. ]:
  545. if func is None:
  546. return lambda f: SyncToAsync(
  547. f,
  548. thread_sensitive=thread_sensitive,
  549. executor=executor,
  550. context=context,
  551. )
  552. return SyncToAsync(
  553. func,
  554. thread_sensitive=thread_sensitive,
  555. executor=executor,
  556. context=context,
  557. )