Effective Python Async like a PRO πŸπŸ”€

gui commited a year ago · 🐍 Python async

I noticed some people using the async syntax without knowing what they were doing.

First, they think async is parallel which is not true as I explain in another article.

Then they write code that doesn't take any advantage of Python async. In other words, they write sync code with async syntax.

The goal of this post is to point out these performance issues and help you benefit the most from async code.

πŸ€” When to use Python Async

Async only makes sense if you're doing IO.

There's ZERO benefit in using async to stuff like this that is CPU-bound:

import asyncio


async def sum_two_numbers_async(n1: int, n2: int) -> int:
    return n1 + n2


async def main():
    await sum_two_numbers_async(2, 2)
    await sum_two_numbers_async(4, 4)


asyncio.run(main())

Your code might even get slower by doing that due to the Event Loop.

That's because Python async only optimizes IDLE time!

If these concepts are new to you, read this article first:

Async python in real life πŸπŸ”€
Await Async Python applied with real examples. I show a slow API server and a slow database, and explain why async is not parallel but concurrent....

IO-bound operations are related to reading/writing operations.

A good example would be:

πŸ‘† All these operations consist of waiting for the data to be available.

While the data is UNAVAILABLE the EVENT LOOP does something else.

This is Concurrency.

NOT Parallelism.

πŸ–ΌοΈ Python Async Await Example

Let's set up a scenario to get started.

We need to build a simple Pokedex that queries for 3 pokemons simultaneously (so we benefit from async).

After querying the pokemons we're going to build an object with them, so:

Step Operation type
Query pokeapi.co IO-bound
Build an object holding the data CPU-bound

I'll be using pydantic for model parsing and httpx for HTTP as its syntax is compatible with requests.

🐌 Use Python async and await

Let's start with the basic scenario that everybody writes and proudly says: "the code is async".

Take your time to visualize:

import asyncio
from datetime import timedelta
import time
import httpx
from pydantic import BaseModel

class Pokemon(BaseModel): # πŸ‘ˆ Defines model to parse pokemon
    name: str
    types: list[str]

def parse_pokemon(pokemon_data: dict) -> Pokemon: # πŸ‘ˆ CPU-bound operation
    print("πŸ”„ Parsing pokemon")

    poke_types = []
    for poke_type in pokemon_data["types"]:
        poke_types.append(poke_type["type"]["name"])

    return Pokemon(name=pokemon_data['name'], types=poke_types)

async def get_pokemon(name: str) -> dict | None: # πŸ‘ˆ IO-bound operation
    async with httpx.AsyncClient() as client:
        print(f"πŸ” Querying for '{name}'")
        resp = await client.get(f"https://pokeapi.co/api/v2/pokemon/{name}")
        print(f"πŸ™Œ Got data for '{name}'")

        try:
            resp.raise_for_status()

        except httpx.HTTPStatusError as err:
            if err.response.status_code == 404:
                return None

            raise

        else:
            return resp.json()

async def get_all(*names: str): # πŸ‘ˆ Async
    started_at = time.time()

    for name in names: # πŸ‘ˆ Iterates over all names
        if data := await get_pokemon(name): # πŸ‘ˆ Invokes async function
            pokemon = parse_pokemon(data)
            print(f"πŸ’ {pokemon.name} is of type(s) {','.join(pokemon.types)}")
        else:
            print(f"❌ No data found for '{name}'")

    finished_at = time.time()
    elapsed_time = finished_at - started_at
    print(f"⏲️ Done in {timedelta(seconds=elapsed_time)}")


POKE_NAMES = ["blaziken", "pikachu", "lugia", "bad_name"]
asyncio.run(get_all(*POKE_NAMES))

This produces the following output:

πŸ” Querying for 'blaziken'
πŸ™Œ Got data for 'blaziken'
πŸ”„ Parsing pokemon
πŸ’ blaziken is of type(s) fire,fighting

πŸ” Querying for 'pikachu'
πŸ™Œ Got data for 'pikachu'
πŸ”„ Parsing pokemon
πŸ’ pikachu is of type(s) electric

πŸ” Querying for 'lugia'
πŸ™Œ Got data for 'lugia'
πŸ”„ Parsing pokemon
πŸ’ lugia is of type(s) psychic,flying

πŸ” Querying for 'bad_name'
πŸ™Œ Got data for 'bad_name'
❌ No data found for 'bad_name'

⏲️ Done in 0:00:02.152331

This is bad usage for this scenario.

If you analyze the output you'll understand that:

We're requesting one HTTP resource at a time thus it doesn't matter if we use async or not.

Let's fix that! πŸ§‘β€πŸ­

Use Python asyncio.create_task and asyncio.gather

If you want 2 or more functions to run concurrently, you need asyncio.create_task.

Creating a task triggers the async operation, and it needs to be awaited at some point.

For example:

task = create_task(my_async_function('arg1'))
result = await task

As we're creating many tasks, we need asyncio.gather which awaits all tasks to be done.

This is our code now (check the get_all function):

import asyncio
from datetime import timedelta
import time
import httpx
from pydantic import BaseModel

class Pokemon(BaseModel):
    name: str
    types: list[str]

def parse_pokemon(pokemon_data: dict) -> Pokemon:
    print("πŸ”„ Parsing pokemon")

    poke_types = []
    for poke_type in pokemon_data["types"]:
        poke_types.append(poke_type["type"]["name"])

    return Pokemon(name=pokemon_data['name'], types=poke_types)

async def get_pokemon(name: str) -> dict | None:
    async with httpx.AsyncClient() as client:
        print(f"πŸ” Querying for '{name}'")
        resp = await client.get(f"https://pokeapi.co/api/v2/pokemon/{name}")
        print(f"πŸ™Œ Got data for '{name}'")

        try:
            resp.raise_for_status()

        except httpx.HTTPStatusError as err:
            if err.response.status_code == 404:
                return None

            raise

        else:
            return resp.json()

async def get_all(*names: str):
    started_at = time.time()

    # πŸ‘‡ Create tasks, so we start requesting all of them concurrently
    tasks = [asyncio.create_task(get_pokemon(name)) for name in names]

    # πŸ‘‡ Await ALL
    results = await asyncio.gather(*tasks)

    for result in results:
        if result:
            pokemon = parse_pokemon(result)
            print(f"πŸ’ {pokemon.name} is of type(s) {','.join(pokemon.types)}")
        else:
            print(f"❌ No data found for...")

    finished_at = time.time()
    elapsed_time = finished_at - started_at
    print(f"⏲️ Done in {timedelta(seconds=elapsed_time)}")


POKE_NAMES = ["blaziken", "pikachu", "lugia", "bad_name"]
asyncio.run(get_all(*POKE_NAMES))

And this is the output:

πŸ” Querying for 'blaziken'
πŸ” Querying for 'pikachu'
πŸ” Querying for 'lugia'
πŸ” Querying for 'bad_name'

πŸ™Œ Got data for 'lugia'
πŸ™Œ Got data for 'blaziken'
πŸ™Œ Got data for 'pikachu'
πŸ™Œ Got data for 'bad_name'

πŸ”„ Parsing pokemon
πŸ’ blaziken is of type(s) fire,fighting
πŸ”„ Parsing pokemon
πŸ’ pikachu is of type(s) electric
πŸ”„ Parsing pokemon
πŸ’ lugia is of type(s) psychic,flying
❌ No data found for...

⏲️ Done in 0:00:00.495780

We dropped from ~2s to 500ms just by using Python async correctly.

Note how:

Use Python asyncio.as_completed

There will be moments when you don't have to await for every single task to be processed right away.

That's similar to our scenario, we can start parsing the data right after the first data becomes available.

We do this by using asyncio.as_completed which returns a generator with completed coroutines:

import asyncio
from datetime import timedelta
import time
import httpx
from pydantic import BaseModel

class Pokemon(BaseModel):
    name: str
    types: list[str]

def parse_pokemon(pokemon_data: dict) -> Pokemon:
    print(f"πŸ”„ Parsing pokemon '{pokemon_data['name']}'")

    poke_types = []
    for poke_type in pokemon_data["types"]:
        poke_types.append(poke_type["type"]["name"])

    return Pokemon(name=pokemon_data['name'], types=poke_types)

async def get_pokemon(name: str) -> dict | None:
    async with httpx.AsyncClient() as client:
        print(f"πŸ” Querying for '{name}'")
        resp = await client.get(f"https://pokeapi.co/api/v2/pokemon/{name}")
        print(f"πŸ™Œ Got data for '{name}'")

        try:
            resp.raise_for_status()

        except httpx.HTTPStatusError as err:
            if err.response.status_code == 404:
                return None

            raise

        else:
            return resp.json()

async def get_all(*names: str):
    started_at = time.time()

    tasks = [asyncio.create_task(get_pokemon(name)) for name in names]

    # πŸ‘‡ Process the tasks individually as they become available
    for coro in asyncio.as_completed(tasks):
        result = await coro # πŸ‘ˆ You still need to await

        if result:
            pokemon = parse_pokemon(result)
            print(f"πŸ’ {pokemon.name} is of type(s) {','.join(pokemon.types)}")
        else:
            print(f"❌ No data found for...")

    finished_at = time.time()
    elapsed_time = finished_at - started_at
    print(f"⏲️ Done in {timedelta(seconds=elapsed_time)}")


POKE_NAMES = ["blaziken", "pikachu", "lugia", "bad_name"]
asyncio.run(get_all(*POKE_NAMES))

The benefit is not easily visible:

πŸ” Querying for 'blaziken'
πŸ” Querying for 'pikachu'
πŸ” Querying for 'lugia'
πŸ” Querying for 'bad_name'

πŸ™Œ Got data for 'blaziken'
πŸ”„ Parsing pokemon 'blaziken'
πŸ’ blaziken is of type(s) fire,fighting
πŸ™Œ Got data for 'bad_name'
πŸ™Œ Got data for 'lugia'
πŸ™Œ Got data for 'pikachu'
❌ No data found for...
πŸ”„ Parsing pokemon 'lugia'
πŸ’ lugia is of type(s) psychic,flying
πŸ”„ Parsing pokemon 'pikachu'
πŸ’ pikachu is of type(s) electric

⏲️ Done in 0:00:00.316266

We still query everything at once (which is good).

Note how the order is completely mixed up though.

It means that Python processed the data as soon as it got available, giving enough time for other requests to finish later.

You'll become a better developer if you understand when/why to use async, await, create_task, gather, and as_completed.

This is part of the book I'm currently writing. If you want to stop writing 'OK-code' that works and start writing 'GREAT-code', you should consider getting your copy before I finish writing it (I'll increase the price once it's released).

Get your copy here:

Python Like a PRO πŸπŸ“š Book
βš οΈπŸ“š This book is still under development (that’s why it’s so cheap right now, the price will increase once all chapters are published).You need to know what the hell you’re doing πŸ”₯🐍Python is one of the most flexible languages I have had contact with.Everything too flexible enhances the odds of ba…

πŸ”€ Real-life scenario using Async IO

I'm currently working for another SF startup: Silk Security and we rely a lot on third-party integrations and their APIs.

We query a lot of data, and we need to do it as fast as possible.

For example, we query Snyk's API to collect code vulnerabilities.

Snyk's data is composed of Organizations that contain many Projects that contain many Issues.

It means that we need to list all projects and organizations before getting any issues.

So picture it as:

Snyk Flow Overview

Note how many queries we need to do! We do them concurrently.

We need to be careful with rate limiting issues that the API may throw. To resolve that we limit the number of queries we do in a single shot, and we start running some processing before querying for more data.

This allows us to gain time and don't hit any rate limits imposed by the API while.

See a redacted code snippet from a real project running in production:

def _iter_grouped(self, issues: list[ResultType], group_count: int):
    group_count = min([len(issues), group_count])

    return zip(*[iter(issues)] * group_count)


async def get_issue_details(self):
    ...

    # NOTE: We need to be careful here, we can't create tasks for every issue or Snyk will raise 449
    # Instead, let's do it in chunks, and let's yield as it's done, so we can spend some time processing it
    # and we can query Snyk again.
    chunk_count = 4 # πŸ‘ˆ Limit to 4 queries at a time
    coro: Awaitable[tuple[ResultType | None]]
    for issues in self._iter_grouped(issues, chunk_count):
        tasks = [asyncio.create_task(self._get_data(project, issue)) for issue in issues]

        for coro in asyncio.as_completed(tasks):
            issue, details = await coro

            yield issue, details

We can represent it as:

Generators + asyncio.as_completed flow

If you learned something new today consider giving me a follow on Twitter. I frequently share Python content and cool projects. DMs are open to any feedback.

  • LinkedIn
  • Tumblr
  • Reddit
  • Google+
  • Pinterest
  • Pocket