Effective Python Async like a PRO 🚀🐍
I noticed some people using the async syntax without knowing what they were doing.
First, they think async means parallel, which is not true, as I explain in another article.
Then they write code that doesn't take any advantage of Python async. In other words, they write sync code with async syntax.
The goal of this post is to point out these performance issues and help you benefit the most from async code.
🤔 When to use Python Async
Async only makes sense if you're doing IO.
There's ZERO benefit in using async for CPU-bound code like this:
```python
import asyncio


async def sum_two_numbers_async(n1: int, n2: int) -> int:
    return n1 + n2


async def main():
    await sum_two_numbers_async(2, 2)
    await sum_two_numbers_async(4, 4)


asyncio.run(main())
```
Your code might even get slower that way, due to the overhead of the event loop.
That's because Python async only optimizes IDLE time!
If these concepts are new to you, read this article first:
IO-bound operations are related to reading/writing operations.
Good examples would be:
- Requesting some data from HTTP
- Reading/writing some `json`/`txt` file
- Reading data from a database
👆 All these operations consist of waiting for the data to be available.
While the data is UNAVAILABLE, the EVENT LOOP does something else.
This is Concurrency.
NOT Parallelism.
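To make "optimizing idle time" concrete, here's a minimal sketch (my own toy example, not from the original code) that fakes IO with `asyncio.sleep`: three 1-second "requests" complete in about 1 second total, because the event loop switches to another coroutine whenever one of them is idle (`asyncio.gather` is covered in detail below).

```python
import asyncio
import time


async def fake_io(name: str) -> str:
    await asyncio.sleep(1)  # 👈 Yields control to the event loop, like real IO waiting
    return name


async def main():
    started_at = time.time()
    # All three "requests" wait concurrently, so this takes ~1s instead of ~3s
    results = await asyncio.gather(fake_io("a"), fake_io("b"), fake_io("c"))
    print(results, f"done in {time.time() - started_at:.2f}s")


asyncio.run(main())
```

Replace `asyncio.sleep` with a CPU-heavy loop and the speedup disappears: there's no idle time for the event loop to fill.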
🖼️ Python Async Await Example
Let's set up a scenario to get started.
We need to build a simple Pokedex that queries for 3 pokemons concurrently (so we benefit from async).
After querying the pokemons we're going to build an object with them, so:

| Step | Operation type |
|---|---|
| Query pokeapi.co | IO-bound |
| Build an object holding the data | CPU-bound |
I'll be using `pydantic` for model parsing and `httpx` for HTTP, as its syntax is compatible with `requests`.
🐍 Use Python `async` and `await`
Let's start with the basic scenario that everybody writes and proudly says: "the code is async".
Take your time to visualize:
- The model class
- The `parse_pokemon` function (CPU-bound)
- The `get_pokemon` function (IO-bound)
- The `get_all` function
```python
import asyncio
from datetime import timedelta
import time

import httpx
from pydantic import BaseModel


class Pokemon(BaseModel):  # 👈 Defines model to parse pokemon
    name: str
    types: list[str]


def parse_pokemon(pokemon_data: dict) -> Pokemon:  # 👈 CPU-bound operation
    print("🔄 Parsing pokemon")
    poke_types = []
    for poke_type in pokemon_data["types"]:
        poke_types.append(poke_type["type"]["name"])
    return Pokemon(name=pokemon_data['name'], types=poke_types)


async def get_pokemon(name: str) -> dict | None:  # 👈 IO-bound operation
    async with httpx.AsyncClient() as client:
        print(f"🔍 Querying for '{name}'")
        resp = await client.get(f"https://pokeapi.co/api/v2/pokemon/{name}")
        print(f"📦 Got data for '{name}'")
        try:
            resp.raise_for_status()
        except httpx.HTTPStatusError as err:
            if err.response.status_code == 404:
                return None
            raise
        else:
            return resp.json()


async def get_all(*names: str):  # 👈 Async
    started_at = time.time()
    for name in names:  # 👈 Iterates over all names
        if data := await get_pokemon(name):  # 👈 Invokes async function
            pokemon = parse_pokemon(data)
            print(f"😎 {pokemon.name} is of type(s) {','.join(pokemon.types)}")
        else:
            print(f"❌ No data found for '{name}'")
    finished_at = time.time()
    elapsed_time = finished_at - started_at
    print(f"⏲️ Done in {timedelta(seconds=elapsed_time)}")


POKE_NAMES = ["blaziken", "pikachu", "lugia", "bad_name"]
asyncio.run(get_all(*POKE_NAMES))
```
This produces the following output:
```
🔍 Querying for 'blaziken'
📦 Got data for 'blaziken'
🔄 Parsing pokemon
😎 blaziken is of type(s) fire,fighting
🔍 Querying for 'pikachu'
📦 Got data for 'pikachu'
🔄 Parsing pokemon
😎 pikachu is of type(s) electric
🔍 Querying for 'lugia'
📦 Got data for 'lugia'
🔄 Parsing pokemon
😎 lugia is of type(s) psychic,flying
🔍 Querying for 'bad_name'
📦 Got data for 'bad_name'
❌ No data found for 'bad_name'
⏲️ Done in 0:00:02.152331
```
This is bad usage of async for this scenario.
If you analyze the output you'll understand that we're requesting one HTTP resource at a time, so it doesn't matter whether we use async or not: each `await get_pokemon(name)` waits for that request to finish before the loop starts the next one.
Let's fix that! 🧑‍🚀
Use Python `asyncio.create_task` and `asyncio.gather`
If you want 2 or more functions to run concurrently, you need `asyncio.create_task`.
Creating a task schedules the coroutine to start running on the event loop, and it still needs to be awaited at some point.
For example:

```python
task = asyncio.create_task(my_async_function("arg1"))
result = await task
```

As we're creating many tasks, we need `asyncio.gather`, which awaits until all tasks are done.
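A detail worth knowing before we wire this in: by default, `gather` raises the first exception any task throws. If you'd rather collect failures alongside successful results, it accepts a `return_exceptions=True` flag; here's a minimal sketch (the `might_fail` function is just an illustration):

```python
import asyncio


async def might_fail(n: int) -> int:
    if n == 2:
        raise ValueError(f"bad value: {n}")
    return n * 10


async def main():
    # With return_exceptions=True, failures come back as values instead of raising
    results = await asyncio.gather(*(might_fail(n) for n in range(4)), return_exceptions=True)
    for result in results:
        print(f"failed: {result}" if isinstance(result, Exception) else f"ok: {result}")


asyncio.run(main())
```

Our Pokedex doesn't need it because `get_pokemon` already turns a 404 into `None`, but it matters once tasks can fail independently.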
This is our code now (check the `get_all` function):
```python
import asyncio
from datetime import timedelta
import time

import httpx
from pydantic import BaseModel


class Pokemon(BaseModel):
    name: str
    types: list[str]


def parse_pokemon(pokemon_data: dict) -> Pokemon:
    print("🔄 Parsing pokemon")
    poke_types = []
    for poke_type in pokemon_data["types"]:
        poke_types.append(poke_type["type"]["name"])
    return Pokemon(name=pokemon_data['name'], types=poke_types)


async def get_pokemon(name: str) -> dict | None:
    async with httpx.AsyncClient() as client:
        print(f"🔍 Querying for '{name}'")
        resp = await client.get(f"https://pokeapi.co/api/v2/pokemon/{name}")
        print(f"📦 Got data for '{name}'")
        try:
            resp.raise_for_status()
        except httpx.HTTPStatusError as err:
            if err.response.status_code == 404:
                return None
            raise
        else:
            return resp.json()


async def get_all(*names: str):
    started_at = time.time()
    # 👈 Create tasks, so we start requesting all of them concurrently
    tasks = [asyncio.create_task(get_pokemon(name)) for name in names]
    # 👈 Await ALL
    results = await asyncio.gather(*tasks)
    for result in results:
        if result:
            pokemon = parse_pokemon(result)
            print(f"😎 {pokemon.name} is of type(s) {','.join(pokemon.types)}")
        else:
            print("❌ No data found for...")
    finished_at = time.time()
    elapsed_time = finished_at - started_at
    print(f"⏲️ Done in {timedelta(seconds=elapsed_time)}")


POKE_NAMES = ["blaziken", "pikachu", "lugia", "bad_name"]
asyncio.run(get_all(*POKE_NAMES))
```
And this is the output:
```
🔍 Querying for 'blaziken'
🔍 Querying for 'pikachu'
🔍 Querying for 'lugia'
🔍 Querying for 'bad_name'
📦 Got data for 'lugia'
📦 Got data for 'blaziken'
📦 Got data for 'pikachu'
📦 Got data for 'bad_name'
🔄 Parsing pokemon
😎 blaziken is of type(s) fire,fighting
🔄 Parsing pokemon
😎 pikachu is of type(s) electric
🔄 Parsing pokemon
😎 lugia is of type(s) psychic,flying
❌ No data found for...
⏲️ Done in 0:00:00.495780
```
We dropped from ~2s to ~500ms just by using Python async correctly.
Note how:
- We query everything right away, in the order passed (e.g. `blaziken` first)
- We retrieve the data in a random order, as responses become available (e.g. now `lugia` comes first)
- We parse the data in sequence (it's CPU-bound anyway)
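There's one rough edge left: by the time results come back from `gather`, we no longer know which name produced each one, which is why the failure message above says "No data found for...". Since `gather` returns results in the same order as the tasks you pass in, a minimal sketch of a fix (my variation, not part of the original code) is to zip the names back in:

```python
async def get_all(*names: str):
    tasks = [asyncio.create_task(get_pokemon(name)) for name in names]
    results = await asyncio.gather(*tasks)
    # 👈 gather preserves input order, so zip pairs each result with its name
    for name, result in zip(names, results):
        if result:
            pokemon = parse_pokemon(result)
            print(f"😎 {pokemon.name} is of type(s) {','.join(pokemon.types)}")
        else:
            print(f"❌ No data found for '{name}'")
```

(Timing code omitted for brevity.)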
Use Python `asyncio.as_completed`
There will be moments when you don't have to wait for every single task to finish before processing results.
That's similar to our scenario: we can start parsing the data right after the first response becomes available.
We do this by using `asyncio.as_completed`, which returns an iterator of awaitables that yield results in completion order:
```python
import asyncio
from datetime import timedelta
import time

import httpx
from pydantic import BaseModel


class Pokemon(BaseModel):
    name: str
    types: list[str]


def parse_pokemon(pokemon_data: dict) -> Pokemon:
    print(f"🔄 Parsing pokemon '{pokemon_data['name']}'")
    poke_types = []
    for poke_type in pokemon_data["types"]:
        poke_types.append(poke_type["type"]["name"])
    return Pokemon(name=pokemon_data['name'], types=poke_types)


async def get_pokemon(name: str) -> dict | None:
    async with httpx.AsyncClient() as client:
        print(f"🔍 Querying for '{name}'")
        resp = await client.get(f"https://pokeapi.co/api/v2/pokemon/{name}")
        print(f"📦 Got data for '{name}'")
        try:
            resp.raise_for_status()
        except httpx.HTTPStatusError as err:
            if err.response.status_code == 404:
                return None
            raise
        else:
            return resp.json()


async def get_all(*names: str):
    started_at = time.time()
    tasks = [asyncio.create_task(get_pokemon(name)) for name in names]
    # 👈 Process the tasks individually as they become available
    for coro in asyncio.as_completed(tasks):
        result = await coro  # 👈 You still need to await
        if result:
            pokemon = parse_pokemon(result)
            print(f"😎 {pokemon.name} is of type(s) {','.join(pokemon.types)}")
        else:
            print("❌ No data found for...")
    finished_at = time.time()
    elapsed_time = finished_at - started_at
    print(f"⏲️ Done in {timedelta(seconds=elapsed_time)}")


POKE_NAMES = ["blaziken", "pikachu", "lugia", "bad_name"]
asyncio.run(get_all(*POKE_NAMES))
```
The benefit is not easily visible:
```
🔍 Querying for 'blaziken'
🔍 Querying for 'pikachu'
🔍 Querying for 'lugia'
🔍 Querying for 'bad_name'
📦 Got data for 'blaziken'
🔄 Parsing pokemon 'blaziken'
😎 blaziken is of type(s) fire,fighting
📦 Got data for 'bad_name'
📦 Got data for 'lugia'
📦 Got data for 'pikachu'
❌ No data found for...
🔄 Parsing pokemon 'lugia'
😎 lugia is of type(s) psychic,flying
🔄 Parsing pokemon 'pikachu'
😎 pikachu is of type(s) electric
⏲️ Done in 0:00:00.316266
```
We still query everything at once (which is good).
Note how the order is completely mixed up, though.
It means that Python processed each piece of data as soon as it became available, giving the remaining requests time to finish in the background.
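The zip trick from before no longer works here, because `as_completed` yields results in completion order, not input order. A minimal sketch (the `get_named` wrapper is my own addition) is to make each task carry its name along with its result:

```python
async def get_named(name: str) -> tuple[str, dict | None]:
    return name, await get_pokemon(name)  # 👈 Bundle the name with the result


async def get_all(*names: str):
    tasks = [asyncio.create_task(get_named(name)) for name in names]
    for coro in asyncio.as_completed(tasks):
        name, result = await coro  # 👈 Completion order no longer matters
        if result:
            pokemon = parse_pokemon(result)
            print(f"😎 {pokemon.name} is of type(s) {','.join(pokemon.types)}")
        else:
            print(f"❌ No data found for '{name}'")
```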
You'll become a better developer if you understand when/why to use `async`, `await`, `create_task`, `gather`, and `as_completed`.
This is part of the book I'm currently writing. If you want to stop writing 'OK-code' that works and start writing 'GREAT-code', you should consider getting your copy before I finish writing it (I'll increase the price once it's released).
Get your copy here:
🚀 Real-life scenario using Async IO
I'm currently working for another SF startup, Silk Security, and we rely a lot on third-party integrations and their APIs.
We query a lot of data, and we need to do it as fast as possible.
For example, we query Snyk's API to collect code vulnerabilities.
Snyk's data is composed of Organizations that contain many Projects that contain many Issues.
It means that we need to list all organizations and their projects before getting any issues.
So picture it as a tree: each organization fans out into many projects, and each project fans out into many issues.
Note how many queries we need to do! We do them concurrently.
We need to be careful with rate-limiting errors that the API may throw. To resolve that, we limit the number of queries we do in a single shot, and we start running some processing before querying for more data.
This allows us to gain time without hitting any rate limits imposed by the API.
See a redacted code snippet from a real project running in production:
```python
def _iter_grouped(self, issues: list[ResultType], group_count: int):
    group_count = min([len(issues), group_count])
    # 👈 The zip trick yields fixed-size groups (a trailing partial group is dropped)
    return zip(*[iter(issues)] * group_count)


async def get_issue_details(self):
    ...
    # NOTE: We need to be careful here, we can't create tasks for every issue or Snyk will raise 449
    # Instead, let's do it in chunks, and let's yield as it's done, so we can spend some time processing it
    # and we can query Snyk again.
    chunk_count = 4  # 👈 Limit to 4 queries at a time
    coro: Awaitable[tuple[ResultType | None]]
    for issues in self._iter_grouped(issues, chunk_count):
        tasks = [asyncio.create_task(self._get_data(project, issue)) for issue in issues]
        for coro in asyncio.as_completed(tasks):
            issue, details = await coro
            yield issue, details
```
We can represent it as: query a chunk of 4 issues concurrently, process each result as soon as it completes, then move on to the next chunk.
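Chunking is one way to stay under the limit; another common pattern (not what the snippet above uses, just a sketch reusing `get_pokemon` from the earlier examples) is an `asyncio.Semaphore`, which caps how many requests are in flight at any moment while still streaming results back as they complete:

```python
import asyncio

MAX_IN_FLIGHT = 4  # 👈 Hypothetical cap, mirroring chunk_count above


async def fetch_limited(semaphore: asyncio.Semaphore, name: str) -> dict | None:
    async with semaphore:  # 👈 At most MAX_IN_FLIGHT requests run at once
        return await get_pokemon(name)


async def get_all(*names: str):
    semaphore = asyncio.Semaphore(MAX_IN_FLIGHT)
    tasks = [asyncio.create_task(fetch_limited(semaphore, name)) for name in names]
    for coro in asyncio.as_completed(tasks):
        if result := await coro:
            print(f"Got '{result['name']}'")
```

The semaphore keeps the pipeline full (a new request starts the moment one finishes), whereas fixed chunks wait for the slowest request in each batch.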
If you learned something new today consider giving me a follow on Twitter. I frequently share Python content and cool projects. DMs are open to any feedback.