# testing.py
import asyncio
import contextvars
import time

from .compatibility import guarantee_single_callable
from .timeout import timeout as async_timeout

  6. class ApplicationCommunicator:
  7. """
  8. Runs an ASGI application in a test mode, allowing sending of
  9. messages to it and retrieval of messages it sends.
  10. """
  11. def __init__(self, application, scope):
  12. self._future = None
  13. self.application = guarantee_single_callable(application)
  14. self.scope = scope
  15. self._input_queue = None
  16. self._output_queue = None
  17. # For Python 3.9 we need to lazily bind the queues, on 3.10+ they bind the
  18. # event loop lazily.
  19. @property
  20. def input_queue(self):
  21. if self._input_queue is None:
  22. self._input_queue = asyncio.Queue()
  23. return self._input_queue
  24. @property
  25. def output_queue(self):
  26. if self._output_queue is None:
  27. self._output_queue = asyncio.Queue()
  28. return self._output_queue
  29. @property
  30. def future(self):
  31. if self._future is None:
  32. # Clear context - this ensures that context vars set in the testing scope
  33. # are not "leaked" into the application which would normally begin with
  34. # an empty context. In Python >= 3.11 this could also be written as:
  35. # asyncio.create_task(..., context=contextvars.Context())
  36. self._future = contextvars.Context().run(
  37. asyncio.create_task,
  38. self.application(
  39. self.scope, self.input_queue.get, self.output_queue.put
  40. ),
  41. )
  42. return self._future
  43. async def wait(self, timeout=1):
  44. """
  45. Waits for the application to stop itself and returns any exceptions.
  46. """
  47. try:
  48. async with async_timeout(timeout):
  49. try:
  50. await self.future
  51. self.future.result()
  52. except asyncio.CancelledError:
  53. pass
  54. finally:
  55. if not self.future.done():
  56. self.future.cancel()
  57. try:
  58. await self.future
  59. except asyncio.CancelledError:
  60. pass
  61. def stop(self, exceptions=True):
  62. future = self._future
  63. if future is None:
  64. return
  65. if not future.done():
  66. future.cancel()
  67. elif exceptions:
  68. # Give a chance to raise any exceptions
  69. future.result()
  70. def __del__(self):
  71. # Clean up on deletion
  72. try:
  73. self.stop(exceptions=False)
  74. except RuntimeError:
  75. # Event loop already stopped
  76. pass
  77. async def send_input(self, message):
  78. """
  79. Sends a single message to the application
  80. """
  81. # Make sure there's not an exception to raise from the task
  82. if self.future.done():
  83. self.future.result()
  84. # Give it the message
  85. await self.input_queue.put(message)
  86. async def receive_output(self, timeout=1):
  87. """
  88. Receives a single message from the application, with optional timeout.
  89. """
  90. # Make sure there's not an exception to raise from the task
  91. if self.future.done():
  92. self.future.result()
  93. # Wait and receive the message
  94. try:
  95. async with async_timeout(timeout):
  96. return await self.output_queue.get()
  97. except asyncio.TimeoutError as e:
  98. # See if we have another error to raise inside
  99. if self.future.done():
  100. self.future.result()
  101. else:
  102. self.future.cancel()
  103. try:
  104. await self.future
  105. except asyncio.CancelledError:
  106. pass
  107. raise e
  108. async def receive_nothing(self, timeout=0.1, interval=0.01):
  109. """
  110. Checks that there is no message to receive in the given time.
  111. """
  112. # Make sure there's not an exception to raise from the task
  113. if self.future.done():
  114. self.future.result()
  115. # `interval` has precedence over `timeout`
  116. start = time.monotonic()
  117. while time.monotonic() - start < timeout:
  118. if not self.output_queue.empty():
  119. return False
  120. await asyncio.sleep(interval)
  121. return self.output_queue.empty()