AN INTRODUCTION TO ASYNCHRONOUS PROGRAMMING IN PYTHON

Overview

Asynchronous programming is a software paradigm in which many small tasks are scheduled and then invoked when events occur. It is also known as event-driven programming. It is an alternative (although it can also be a complement) to both multi-threading and multiprocessing. Asynchronous programming is well suited to applications which are IO bound rather than CPU bound, because it allows other tasks to run while one task is blocked, waiting on some external process to complete. Because control is only given up explicitly with the await keyword, you do not have to worry as much about common multi-threading issues such as data races. It is not well suited to CPU bound applications because the event loop runs in a single thread, and so does not make use of multiple cores/CPUs.
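To make the IO bound point concrete, here is a minimal sketch in which asyncio.sleep() stands in for a real IO wait such as a network request. The names fetch() and fetch_all() are made up for this illustration and are not part of asyncio.

```python
import asyncio
import time

async def fetch(name: str, delay_s: float) -> str:
    # asyncio.sleep() is a stand-in for a real IO wait (network, disk, ...)
    await asyncio.sleep(delay_s)
    return f'{name} done'

async def fetch_all() -> list:
    # All three coroutines wait at the same time, so the total run time is
    # roughly one delay (0.3s) rather than the sum of the delays (0.9s).
    return await asyncio.gather(
        fetch('a', 0.3),
        fetch('b', 0.3),
        fetch('c', 0.3),
    )

if __name__ == '__main__':
    start = time.perf_counter()
    results = asyncio.run(fetch_all())
    elapsed_s = time.perf_counter() - start
    print(results)
    print(f'Took {elapsed_s:.2f}s')
```

If fetch() did CPU-heavy work instead of waiting, the three calls would run one after the other and nothing would be gained.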

Two core parts of Python's asynchronous capabilities are provided through the await and async keywords. The rest of the functionality is largely supplied by the asyncio library. The asyncio library provides event loops and functions to create, run and await tasks. Event loops are the “runners” of asynchronous functions. They keep track of all the coroutines which are currently blocked waiting for an event, and continue these coroutines from where they left off once the event occurs.
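As a rough sketch of creating and awaiting a task, the snippet below schedules a coroutine with asyncio.create_task() and records the order in which things happen. The names background_job() and the log list are hypothetical and exist only for this example.

```python
import asyncio

async def background_job(log: list) -> str:
    log.append('job started')
    await asyncio.sleep(0.1)
    log.append('job finished')
    return 'result'

async def main() -> list:
    log = []
    # create_task() schedules the coroutine on the running event loop
    task = asyncio.create_task(background_job(log))
    log.append('task created')
    # The task only gets a chance to run once main() awaits something
    value = await task
    log.append(f'got {value}')
    return log

if __name__ == '__main__':
    for entry in asyncio.run(main()):
        print(entry)
```

Note that 'task created' is logged before 'job started': creating a task does not run it immediately, it only hands it to the event loop.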

When you wait for an event with the await keyword, Python saves the state of the function (i.e. the values of all the local variables and the point of execution) and returns control to the active event loop. While the function is waiting, the event loop can respond to other events. Once the specific event you waited on occurs, Python restores the state of the function and resumes execution at the exact point at which it was suspended (in this respect coroutines are very similar to Python generators).
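The following sketch tries to show this save-and-restore behaviour. Each coroutine keeps a local running total, gives up control on every loop iteration, and still has the correct total when it is resumed. count_up() and the shared log list are illustrative names only, and asyncio.sleep(0) is used simply as a way to yield to the event loop.

```python
import asyncio

async def count_up(name: str, steps: int, log: list) -> int:
    total = 0  # local state that must survive every suspension
    for i in range(1, steps + 1):
        total += i
        log.append(f'{name}: step {i}, total {total}')
        # Suspend here; the event loop is free to run another coroutine
        await asyncio.sleep(0)
    return total

async def main():
    log = []
    totals = await asyncio.gather(
        count_up('A', 3, log),
        count_up('B', 3, log),
    )
    return totals, log

if __name__ == '__main__':
    totals, log = asyncio.run(main())
    print('\n'.join(log))
    print(totals)
```

With the default event loop the log entries alternate between A and B, yet each coroutine ends with the same total it would have had if it had run on its own.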

Python's style of asynchronous programming goes a long way to prevent call-back hell. Call-back hell was a common problem in JavaScript (and many other languages) before the use of futures and promises became popular. It occurred because the only way to perform asynchronous programming was to provide callbacks (often anonymous functions). These callbacks nested within each other, broke the flow of the code, and severely hindered the readability of the software.

However smart and flexible asynchronous programming may be, synchronous programming is still the bread-and-butter of the Python language. Unfortunately, the two don't mix that well (you can't await a synchronous function, and calling an asynchronous function without await will just return a coroutine object). You can think of synchronous Python and asynchronous Python as two separate programming styles, and most of your libraries have to be specifically designed to work with the style you are using.
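Here is a small sketch of what "just return a coroutine object" means in practice. get_answer() is a made-up function for this example, and inspect.iscoroutine() from the standard library is used to check what the un-awaited call returned.

```python
import asyncio
import inspect

async def get_answer() -> int:
    return 42

async def main():
    # No await: the function body has NOT run yet, this is a coroutine object
    forgotten = get_answer()
    is_coro = inspect.iscoroutine(forgotten)
    # Awaiting the coroutine object is what actually runs the body
    value = await forgotten
    return is_coro, value

if __name__ == '__main__':
    print(asyncio.run(main()))
```

The first element of the returned tuple is True (the call produced a coroutine object) and the second is the real return value, which is only available after the await.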

What Is A Coroutine?

A coroutine is a Python function that has the keyword async before the def, e.g.:

async def my_coroutine():
  print('Hello')

Calling a coroutine normally won't actually do what you expect!

my_coroutine() # "Nothing" happens!

It would be wrong to say that nothing at all happens. Instead of calling the function, my_coroutine() creates and returns a coroutine object. This coroutine object can be waited on with:

await my_coroutine() # This time, 'Hello' will be printed

But please remember, await can only be used within an asynchronous function. So in reality, the call would have to look something like this:

async def my_coroutine_1():
  print('Hello')

async def my_coroutine_2():
  await my_coroutine_1() # This time, 'Hello' will be printed

So now you are probably thinking: since the parent function, and the parent's parent function, and the parent's parent's parent function all have to be defined with async to be able to use await…where does it stop? What if my main() is not async? And even if it was, how would I call it? This is where the asyncio library comes into play.

So actually, I lied: you can call an async function from a non-async function, but you have to use asyncio to do so. The simplest way is to use asyncio.run() (available since Python v3.7), which takes a coroutine object, runs it in a new event loop, and returns once the coroutine has finished.

import asyncio

async def my_coroutine():
  await asyncio.sleep(1)
  print('Hello after 1s')

def main(): # NOTE: main() is not async!
  # We can call an async function from a non-async function by using the asyncio library
  asyncio.run(my_coroutine()) # Note the (): asyncio.run() expects a coroutine object, not the function itself

if __name__ == '__main__':
  main()

If you forget to await a coroutine, Python will print a warning similar to:

main.py:6: RuntimeWarning: coroutine 'my_coroutine' was never awaited

Before Python v3.5

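Before Python v3.5, the async and await keywords were not available. Instead, you could use a decorator (introduced along with asyncio in Python v3.4) to define a generator-based coroutine. Note that this decorator was deprecated in Python v3.8 and removed in v3.11, so the following only applies to legacy code: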

@asyncio.coroutine
def my_coroutine():
  print('Hello')

And instead of using await to call the above coroutine, you would use the yield from syntax:

yield from my_coroutine()

Calling Async Code From Sync

Invariably, at some point you will want to call asynchronous code from a synchronous function. What you can't do is:

def main():
  await my_coroutine() # ERROR: We can't use `await` inside a synchronous function (main() is synchronous)

However, remember that we can always pass control over to the event loop from synchronous code. The easiest way to do this is with asyncio.run():

def main():
  asyncio.run(my_coroutine()) # Passes control to the event loop, which will run my_coroutine, and then return control to here.
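asyncio.run() also hands back whatever the coroutine returns, which is handy when synchronous code needs a result from asynchronous code. A minimal sketch, where add_later() is a hypothetical coroutine invented for this example:

```python
import asyncio

async def add_later(a: int, b: int) -> int:
    await asyncio.sleep(0.1)  # stand-in for some asynchronous work
    return a + b

def main() -> int:  # NOTE: main() is not async!
    # asyncio.run() blocks until the coroutine has finished, then returns
    # the coroutine's return value to the synchronous caller
    return asyncio.run(add_later(2, 3))

if __name__ == '__main__':
    print(main())
```

Keep in mind that asyncio.run() cannot be called while another event loop is already running in the same thread, so it is best used as the single entry point from synchronous code.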

Creating A Worker Model

Below is a Python snippet showing a worker/job application using asynchronous programming. Ten jobs are created, along with three workers which will process them. Each worker is started as a task with asyncio.create_task(). The jobs are fed to the workers via an asyncio.Queue. Each worker awaits a job on the queue, processes the job, and then waits for another one. Once all of the jobs are processed, the workers are terminated and the application exits.

import asyncio
import random

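async def worker_fn(worker_id: int, job_queue: asyncio.Queue) -> None:
    while True:
        # Wait until a job (a sleep duration in seconds) is available
        sleep_for = await job_queue.get()

        print(f'Worker {worker_id} sleeping for {sleep_for:.2}s.')
        await asyncio.sleep(sleep_for)
        print(f'Worker {worker_id} woke up.')

        # Tell the queue this job is complete, so that queue.join() can return
        job_queue.task_done()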

async def main() -> None:

    queue = asyncio.Queue()

    # Create jobs for workers to complete
    print(f'Creating jobs...')
    for i in range(0, 10):
        sleep_for_s = random.uniform(0.1, 1.0)
        queue.put_nowait(sleep_for_s)

    # Create three worker tasks
    print(f'Creating and starting workers...')
    workers = []
    for i in range(3):
        worker = asyncio.create_task(worker_fn(i, queue))
        workers.append(worker)

    print(f'Waiting for jobs to be completed.')
    await queue.join()
    print(f'Jobs finished.')

    print(f'Terminating workers...')
    for worker in workers:
        worker.cancel()

    await asyncio.gather(*workers, return_exceptions=True)
    print(f'Workers terminated. Example finished.')
    
if __name__ == '__main__':
    asyncio.run(main())

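Running this produces output similar to the following (the sleep durations are random, so the exact values and ordering will differ between runs):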

$ python worker_example.py 
Creating jobs...
Creating and starting workers...
Waiting for jobs to be completed.
Worker 0 sleeping for 0.15s.
Worker 1 sleeping for 0.39s.
Worker 2 sleeping for 0.49s.
Worker 0 woke up.
Worker 0 sleeping for 0.12s.
Worker 0 woke up.
Worker 0 sleeping for 0.7s.
Worker 1 woke up.
Worker 1 sleeping for 0.52s.
Worker 2 woke up.
Worker 2 sleeping for 0.63s.
Worker 1 woke up.
Worker 1 sleeping for 0.98s.
Worker 0 woke up.
Worker 0 sleeping for 0.33s.
Worker 2 woke up.
Worker 2 sleeping for 0.39s.
Worker 0 woke up.
Worker 2 woke up.
Worker 1 woke up.
Jobs finished.
Terminating workers...
Workers terminated. Example finished.

Make sure that you terminate all the tasks before terminating the application. If you terminate while a task is still waiting on a queue you will get the following warning:

Task was destroyed but it is pending!
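The clean-up pattern can be sketched on its own as follows. The worker here, idle_worker(), is a made-up example that never receives a job, so it is guaranteed to still be waiting on the queue when it is cancelled.

```python
import asyncio

async def idle_worker(queue: asyncio.Queue) -> None:
    while True:
        await queue.get()  # waits forever, because no job ever arrives
        queue.task_done()

async def main() -> list:
    queue = asyncio.Queue()
    tasks = [asyncio.create_task(idle_worker(queue)) for _ in range(2)]
    await asyncio.sleep(0)  # give the workers a chance to start and block

    # Request cancellation of every worker...
    for task in tasks:
        task.cancel()
    # ...then wait for the cancellations to take effect. return_exceptions=True
    # collects each CancelledError as a result instead of raising it here.
    await asyncio.gather(*tasks, return_exceptions=True)

    return [task.cancelled() for task in tasks]

if __name__ == '__main__':
    print(asyncio.run(main()))
```

Calling cancel() only requests cancellation. Awaiting the tasks afterwards is what ensures they have actually finished before the application exits.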
