How to Avoid Issues When Waiting On Multiple Events in Python Asyncio

Asyncio is a powerful way to implement multitasking in Python, but failing to use it properly can lead to poorly performing code.

David Plummer
Python in Plain English


What is asyncio?

Asyncio is a Python library that implements cooperative multitasking for code that frequently needs to pause and wait for input/output (I/O) operations. It is cooperative in the sense that each piece of code must explicitly stop and hand control back to the scheduler so that other parts of the code may run, and only one piece of code can run at any one time.

This approach to multitasking works efficiently when the amount of time tasks spend executing is very short relative to the time they spend waiting. A common use case is a web server, which spends a lot of time waiting for a user to make a request relative to the amount of time needed to process it.

Asyncio executes tasks one at a time. Each task must pause execution for other tasks to run. A task is scheduled to run if it is not waiting on an external event. When tasks complete they are removed from the event loop.
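The hand-over between tasks can be seen in a minimal sketch. The names worker and run_workers are hypothetical and exist only for this illustration; each worker records a step and then awaits asyncio.sleep(0), which does nothing except give the event loop the chance to run another task.

```python
import asyncio


async def worker(name: str, steps: int, log: list[str]) -> None:
    # Hypothetical worker: records each step, then yields to the event loop.
    for step in range(steps):
        log.append(f"{name}:{step}")
        # Awaiting hands control back to the event loop so other tasks can run.
        await asyncio.sleep(0)


async def run_workers() -> list[str]:
    log: list[str] = []
    first = asyncio.create_task(worker("a", 2, log))
    second = asyncio.create_task(worker("b", 2, log))
    # Keep both handles and wait for both tasks to finish.
    await first
    await second
    return log


if __name__ == "__main__":
    print(asyncio.run(run_workers()))
```

The log shows the two workers alternating rather than one running to completion before the other starts, because each await is a point at which the other task gets its turn.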

In the case where a task is waiting on a single event, such as a timer or the response to a web request, implementation using the asyncio await keyword is straightforward. However, waiting for several tasks to complete, for example waiting to get data from multiple sources in order to complete a web response, can lead to problems that are not immediately apparent from the documentation.

Hold tight to the handle

Python memory management (in the standard CPython interpreter) uses reference counting to determine when an object is no longer needed and can be removed from memory. In most situations the programmer doesn’t need to consciously think about reference counting because it is clear from the code when variables go out of scope: at that point the reference count drops to zero and the memory can be released.

However, with asyncio this is not always the case. In the following contrived code snippet, we create two tasks: boil a kettle and clean cups.

async def make_cups_of_tea():
    boiling_kettle = asyncio.create_task(boil_kettle(1.5))
    cleaning_cups = asyncio.create_task(clean_cups(4))
    # The names boiling_kettle and cleaning_cups are discarded when the
    # function returns.

When the function make_cups_of_tea completes, the local names boiling_kettle and cleaning_cups go out of scope and those references to the created tasks are deleted. However, it is only the references that are deleted; the tasks themselves have already been scheduled on the event loop and, in practice, usually carry on running.

When create_task is called, a reference to the task (the task handle) is returned to the caller, and the task is also scheduled on the event loop so that tasks can be executed in rotation. As stated above, an object is normally deleted when its reference count drops to zero. When execution in the above snippet passes outside the scope of the function, the tasks are typically not deleted because the event loop machinery still refers to them while they are scheduled or waiting. This should not be relied upon: the asyncio documentation warns that the event loop only keeps weak references to tasks, so a task with no other reference may be garbage collected before it finishes.

The newly created task is referenced from two places: the code that created it and the event loop that runs it.

To remove a task from the event loop before it completes, the task has to be canceled. The straightforward way to do this is with the task handle (a reference to the task). If the task handle is lost before the task completes then canceling the task becomes awkward: you could query the event loop for a list of unfinished tasks and work out which task needs to be canceled, though this is time-consuming and requires the forethought to name tasks individually.
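The two routes can be compared in a short sketch. The function names cancel_by_handle and cancel_by_name are hypothetical, and boil_kettle here simply sleeps for an hour to stand in for a long-running task. The second function deliberately drops the handle to show the workaround; it is not a recommendation.

```python
import asyncio


async def boil_kettle() -> str:
    await asyncio.sleep(3600)  # stands in for a task that runs for a long time
    return "Kettle boiled."


async def cancel_by_handle() -> bool:
    # Easy case: we kept the handle, so we can cancel the task directly.
    task = asyncio.create_task(boil_kettle(), name="kettle")
    await asyncio.sleep(0)  # let the task start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


async def cancel_by_name(name: str) -> int:
    # Awkward case: the handle is deliberately thrown away (an anti-pattern).
    asyncio.create_task(boil_kettle(), name=name)
    await asyncio.sleep(0)  # let the task start
    # Search every unfinished task on the running loop for a matching name.
    matches = [t for t in asyncio.all_tasks() if t.get_name() == name]
    for task in matches:
        task.cancel()
    # Wait for the cancellations to be processed.
    await asyncio.gather(*matches, return_exceptions=True)
    return len(matches)


if __name__ == "__main__":
    print(asyncio.run(cancel_by_handle()))
    print(asyncio.run(cancel_by_name("kettle")))
```

The search only works because the task was given a name when it was created, which is the forethought mentioned above.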

The programmer must make sure that the task handle is retained until the task completes. This is not a default behaviour in Python.
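One common way to retain handles, following the pattern suggested in the asyncio documentation for create_task, is to keep each task in a collection and let the task remove itself when it finishes. The helper name spawn and the scaled-down sleep are assumptions made for this sketch.

```python
import asyncio

# Strong references to tasks that have not finished yet.
background_tasks: set[asyncio.Task] = set()


def spawn(coro) -> asyncio.Task:
    # Hypothetical helper: create a task and hold its handle until it is done.
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # When the task finishes it removes its own handle from the set.
    task.add_done_callback(background_tasks.discard)
    return task


async def clean_cups(number_of_cups: int) -> str:
    await asyncio.sleep(number_of_cups * 0.01)  # scaled down for the example
    return "Cups cleaned."


async def main() -> tuple[int, int]:
    spawn(clean_cups(2))
    running = len(background_tasks)  # handle is held while the task runs
    await asyncio.sleep(0.1)         # give the task time to finish
    finished = len(background_tasks)  # handle released once the task is done
    return running, finished


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the set holds every unfinished task, the same collection can be iterated later to cancel anything that is still running.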

Waiting on multiple events

Most asyncio examples wait on a single event, so retaining a task handle until the task completes is not a concern. In these cases the calling code is suspended until the awaited coroutine completes, so nothing is left on the event loop by the time the coroutine returns a value.

result = await coroutine()
# Nothing is left on the event loop once coroutine finishes and
# returns a value.

However, when waiting on multiple coroutines there is no guarantee that all tasks will terminate at the same time and we may wish to wait until all tasks are complete.

For example, if we want to make a cup of tea, we may need to wait for the kettle to boil and cups to be cleaned before we can put a tea bag in the cup and add hot water. If we use two await statements sequentially we can boil the kettle and then clean the cups. This allows other tasks to run whilst we wait for the kettle to boil and the cups to be cleaned; however, it doesn’t allow us to boil the kettle and clean the cups at the same time.

import asyncio

async def boil_kettle(water_volume: float) -> str:
    await asyncio.sleep(30 + water_volume * 30)
    return "Kettle boiled."

async def clean_cups(number_of_cups: int) -> str:
    await asyncio.sleep(number_of_cups * 15)
    return "Cups cleaned."

async def main():
    # Sequential execution, awaiting each coroutine in turn.
    await boil_kettle(1.5)  # 1.5 litres in kettle
    await clean_cups(4)  # Four cups to clean

asyncio.run(main())

To execute the tasks concurrently we can use the asyncio wait function. This function takes an iterable of tasks and returns when the condition given by the return_when parameter is met. The conditions are:

  • FIRST_COMPLETED: Returns when the first task completes or is canceled.
  • ALL_COMPLETED: Returns when all tasks are complete. If an exception is raised in a task then it is stored on that task; instead of stopping execution, all other tasks are allowed to complete.
  • FIRST_EXCEPTION: Same as ALL_COMPLETED, except that the function returns immediately if an exception is raised by a task, irrespective of whether other tasks have completed or not.
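The three conditions can be compared with a small experiment. The names pause and count_done are hypothetical, and the sleep times are arbitrary short values chosen so that one task always finishes well before the other.

```python
import asyncio


async def pause(seconds: float, fail: bool = False) -> float:
    # Hypothetical coroutine: sleeps, then either fails or returns.
    await asyncio.sleep(seconds)
    if fail:
        raise RuntimeError("task failed")
    return seconds


async def count_done(return_when, fail_fast_task: bool = False) -> tuple[int, int]:
    tasks = [
        asyncio.create_task(pause(0.01, fail=fail_fast_task)),  # quick task
        asyncio.create_task(pause(0.1)),                        # slow task
    ]
    done, pending = await asyncio.wait(tasks, return_when=return_when)
    counts = (len(done), len(pending))
    # Tidy up: cancel whatever is still running and collect every outcome.
    for task in pending:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return counts


if __name__ == "__main__":
    for condition in (asyncio.FIRST_COMPLETED, asyncio.ALL_COMPLETED,
                      asyncio.FIRST_EXCEPTION):
        print(condition, asyncio.run(count_done(condition)))
    # With a failing quick task, FIRST_EXCEPTION returns early.
    print(asyncio.run(count_done(asyncio.FIRST_EXCEPTION, fail_fast_task=True)))
```

Each call reports how many tasks were in done and pending at the moment asyncio.wait returned. Note that FIRST_EXCEPTION only differs from ALL_COMPLETED when a task actually raises.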

Using our example for making a cup of tea, the following code waits for both the kettle to boil and cups to be cleaned. The ability to pass coroutines into the wait function was deprecated in Python 3.8 and removed in Python 3.11, but it is included here to illustrate why this change was desirable.

# Calling with coroutines was deprecated in Python 3.8 and removed in 3.11
done, pending = await asyncio.wait(
    [boil_kettle(1.5), clean_cups(4)],
    return_when=asyncio.ALL_COMPLETED
)

Coroutines do not run simply by being called: they must be awaited or scheduled as a task on the event loop so that they can be executed at some time in the future. Therefore, when coroutines are passed to wait, each one is wrapped in a task which is scheduled to be executed in the future. As the tasks are created within the wait function, the task handles are not immediately available to the outer calling routine. Instead, it is the tasks, not the return values, that are passed back to the calling routine in the two sets done and pending.

This creates two key issues:

  • If we want to check whether a coroutine completed we could search the done set using coroutine_name in done. However, this will not work because the set contains tasks, not the coroutines that were passed in.
  • It is not obvious that the function wraps coroutines in tasks on the event loop. Therefore, if we are executing coroutines that don’t return values, it is too easy to write await asyncio.wait(...) without capturing handles to the tasks. If there are unfinished tasks when the function returns we are unable to terminate them. This may leave never-ending tasks on the event loop which won't terminate until the program ends, potentially filling memory if the routine is called frequently.
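The first issue can be demonstrated on current Python versions by creating the task explicitly and then testing both the coroutine object and the task for membership in done. This is a minimal sketch; the sleep is scaled down and the variable names are chosen only for illustration.

```python
import asyncio


async def boil_kettle(water_volume: float) -> str:
    await asyncio.sleep(water_volume * 0.01)  # scaled down for the example
    return "Kettle boiled."


async def main() -> tuple[bool, bool]:
    kettle_coro = boil_kettle(1.5)  # a coroutine object, not yet running
    kettle_task = asyncio.create_task(kettle_coro, name="kettle")
    done, _pending = await asyncio.wait([kettle_task])
    # done holds Task objects, so only the task handle can be found in it.
    return kettle_coro in done, kettle_task in done


if __name__ == "__main__":
    coro_found, task_found = asyncio.run(main())
    print(f"coroutine in done: {coro_found}, task in done: {task_found}")
```

Only the task handle is found, which is why keeping the handle returned by create_task is the reliable way to ask whether a particular piece of work has finished.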

Managing the lifecycle of tasks

Therefore, it is better practice to explicitly manage the lifecycle of the tasks by creating them prior to calling asyncio.wait and then testing that all tasks completed, cancelling any uncompleted tasks if necessary. Using the example of making a cup of tea, the following code creates two tasks and then waits for one of the tasks to complete.
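A minimal sketch of this approach is shown here. It reuses boil_kettle and clean_cups from earlier, with the waiting times divided by an arbitrary SPEED_UP factor (an assumption for this example) so that it finishes in under a second.

```python
import asyncio

SPEED_UP = 200  # assumption: shrink the waits so the example runs quickly


async def boil_kettle(water_volume: float) -> str:
    await asyncio.sleep((30 + water_volume * 30) / SPEED_UP)
    return "Kettle boiled."


async def clean_cups(number_of_cups: int) -> str:
    await asyncio.sleep(number_of_cups * 15 / SPEED_UP)
    return "Cups cleaned."


async def main() -> list[str]:
    # Create the tasks ourselves so that we hold every handle.
    tasks = [
        asyncio.create_task(boil_kettle(1.5), name="kettle"),
        asyncio.create_task(clean_cups(4), name="cups"),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    # Cancel whatever is still running before asking for any results.
    for task in pending:
        print(f"Cancelling {task.get_name()}")
        task.cancel()
    # Wait for the cancellations to be processed.
    await asyncio.gather(*pending, return_exceptions=True)

    return [task.result() for task in done]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With these timings the cups are cleaned before the kettle boils, so the kettle task is the one that gets cancelled.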

Tasks are created using asyncio.create_task. Note, it is not necessary to name tasks although it does make it easier to identify tasks when debugging and when error messages are generated.

When the first task completes, asyncio.wait returns two sets: done and pending. We can iterate over each of these sets:

  • pending contains tasks that have not been completed. These are still scheduled to execute and, if we exit the main function without canceling them, they will remain on the event loop until they terminate, which could be never.
  • done contains tasks that have been completed (in this case only one task, the first to complete). The result of a task is obtained by calling task.result().

Note: Pending tasks should be canceled before the result of any completed task is requested. If a completed task terminated because of an exception then that exception will only be re-raised when task.result() is called. If this occurs before pending tasks are canceled then they may remain on the event loop. Exceptions will be explored in more detail in the next article.
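The ordering described in the note can be sketched as follows. The failing kettle and its error message are hypothetical, and clean_cups sleeps for an hour to stand in for a task that would otherwise be left behind.

```python
import asyncio


async def boil_kettle() -> str:
    await asyncio.sleep(0.01)
    raise RuntimeError("Kettle has no water.")  # hypothetical failure


async def clean_cups() -> str:
    await asyncio.sleep(3600)  # would linger on the loop if never cancelled
    return "Cups cleaned."


async def main() -> tuple[str, bool]:
    kettle = asyncio.create_task(boil_kettle(), name="kettle")
    cups = asyncio.create_task(clean_cups(), name="cups")
    done, pending = await asyncio.wait(
        [kettle, cups], return_when=asyncio.FIRST_COMPLETED
    )

    # Step 1: cancel the unfinished tasks while we still have control.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    # Step 2: only now ask for the result; result() re-raises the exception.
    try:
        outcome = kettle.result()
    except RuntimeError as error:
        outcome = f"failed: {error}"
    return outcome, cups.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Had result() been called first without a try block, the exception would have left main before the loop that cancels the pending tasks was reached.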

asyncio.gather

This article would not be complete without including asyncio.gather. The function may be considered a high-level wrapper around asyncio.wait with the parameter return_when=ALL_COMPLETED. A common use case is when the code is waiting for several tasks to complete before continuing.

async def main():
    results = await asyncio.gather(boil_kettle(1.5), clean_cups(4))
    print(results)

The most significant difference from asyncio.wait is that asyncio.gather accepts coroutines directly, passed as separate arguments to the function rather than as an iterable of tasks. The function waits for all coroutines to complete before returning their results in a list (with the same ordering as the coroutines in the argument list). So, for our kettle example, the results are as follows.

['Kettle boiled.', 'Cups cleaned.']

asyncio.gather wraps the coroutines in tasks automatically, and nothing is left behind if every task runs to completion before the calling function continues. However, this simplicity can hide a nasty surprise. With the default setting, return_exceptions=False, the first exception raised by a coroutine is propagated straight to the calling function, even when other coroutines are still executing. Those coroutines are not cancelled, which could leave tasks on the event loop that cannot easily be accessed and may run indefinitely. It is essential that coroutines are designed to terminate so that they do not persist should another coroutine throw an exception.
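The behaviour can be observed in a short sketch. The failing kettle, the log list and the function names default_behaviour and collect_exceptions are all hypothetical; the sleeps are short values chosen so that the kettle fails before the cups are finished.

```python
import asyncio


async def boil_kettle() -> str:
    await asyncio.sleep(0.01)
    raise RuntimeError("Kettle has no water.")  # hypothetical failure


async def clean_cups(log: list[str]) -> str:
    await asyncio.sleep(0.05)
    log.append("cups finished")
    return "Cups cleaned."


async def default_behaviour() -> list[str]:
    log: list[str] = []
    try:
        await asyncio.gather(boil_kettle(), clean_cups(log))
    except RuntimeError:
        log.append("gather raised")
    # clean_cups was not cancelled; give it time to show it is still running.
    await asyncio.sleep(0.1)
    return log


async def collect_exceptions() -> list:
    log: list[str] = []
    # With return_exceptions=True every outcome is returned, in argument order.
    return await asyncio.gather(
        boil_kettle(), clean_cups(log), return_exceptions=True
    )


if __name__ == "__main__":
    print(asyncio.run(default_behaviour()))
    print(asyncio.run(collect_exceptions()))
```

In the first function the exception reaches the caller while clean_cups is still running, and the log shows it finishing afterwards. In the second, gather waits for both coroutines and returns the exception object alongside the normal result.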

This article is focused on asyncio.wait assuming that coroutines do not throw exceptions; the next article will look at how exceptions are captured and propagated to the calling function.

