
Concurrent data retrieval

2024-10-25

In data-intensive applications, many processes are I/O-bound: their speed is limited not by CPU power but by the time spent waiting for I/O operations, such as network requests or file reads, to complete. For example, when a client application requests data from a server, the request is sent, and while the server processes it and sends back a response, the client is essentially waiting, or "idle." This waiting time adds up, especially when multiple requests are needed to retrieve large datasets. One solution is to send the requests asynchronously instead of synchronously, so that the waiting periods overlap.

Python is synchronous by default, running code line by line. Many applications in the code base I currently work with are synchronous, and I believe some of them would benefit greatly from asynchronicity. Hence, I did some extra research in the context of the Python ecosystem. The examples throughout this post use asyncio, the standard-library implementation of coroutine-based concurrency. It uses the async and await syntax to run coroutines on an event loop, which runs on a single thread. That's right, async Python code runs concurrently and not in parallel: tasks take turns on one thread instead of executing at the same time. This can be confusing at first, but it's important to understand the difference. Before showing an example of concurrent data retrieval, let's go back to the origin.

Iterators

A prerequisite for understanding async functions is an understanding of iterators. An iterator is an object that provides access to one element of a collection at a time and changes its internal state to move on to the next element (traversal). In Python, an iterator implements the special methods __next__() and __iter__(). Now, you might think of a list, but that is actually an iterable: it only has the __iter__() method, which produces an iterator when it is called. You can get an iterator from a list and call next() until the iterator raises StopIteration.

lst = [1, 2, 3]
iterator = iter(lst)
print(next(iterator))  # Output: 1
print(next(iterator))  # Output: 2
print(next(iterator))  # Output: 3
print(next(iterator))
...
StopIteration
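To make the two special methods concrete, here is a minimal sketch of a hand-written iterator. The class name Countdown is made up for illustration; it is not part of any library.

```python
class Countdown:
    """Iterator that counts down from start to 1 (hypothetical example class)."""

    def __init__(self, start: int) -> None:
        self.current = start

    def __iter__(self) -> "Countdown":
        # an iterator returns itself from __iter__
        return self

    def __next__(self) -> int:
        if self.current <= 0:
            raise StopIteration  # signals that the traversal is finished
        value = self.current
        self.current -= 1
        return value


countdown = Countdown(3)
first_pass = list(countdown)   # [3, 2, 1]
second_pass = list(countdown)  # [] because the iterator is exhausted
```

Because the iterator carries its own state, a second traversal finds nothing left, which is the same behavior generators show further down.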

Generators

A generator is a kind of iterator, but instead of having all the values available up front, each value is lazily evaluated. In Python terms, a value is yielded at each iteration. Also, a value from a generator can only be yielded once, since the generator only knows its current state each time it's yielding a value. This is why it can be memory-efficient to use a generator instead of a list.
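A rough way to see the memory difference is to compare the size of a list with the size of an equivalent generator expression. This is only a sketch: sys.getsizeof reports the size of the container object itself, not of the integers it refers to, and the exact numbers depend on the Python version.

```python
import sys

n = 1_000_000
as_list = [i for i in range(n)]       # all values are materialized up front
as_generator = (i for i in range(n))  # values are produced on demand

list_size = sys.getsizeof(as_list)            # grows with n
generator_size = sys.getsizeof(as_generator)  # stays small regardless of n

print(generator_size < list_size)  # True
total = sum(as_generator)          # consuming the generator still visits every value
```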

def n_generator(n):
    for i in range(n):
        yield i

generator = n_generator(3)

for i in generator:
    print(i)
...
0
1
2

for i in generator:
    print(i)
...

The second time we try to print out items from the generator, they're gone. It is also possible to send values to a generator.

from typing import Generator

def square() -> Generator:
    while True:
        x = yield
        if x is not None:
            yield x ** 2

squarer = square()
squarer.send(2)
...
TypeError: can't send non-None value to a just-started generator

Interestingly, when we try to send a value to the generator, a TypeError is raised. As pointed out in PEP 342, generators begin execution at the top of the function body, so at that point no yield expression has been reached that could receive a value other than None. This is why we first need to call next with the generator as argument. David Beazley called this "priming", which covers it well in my opinion, since this operation is necessary before using the generator.

squarer = square()
next(squarer)
squarer.send(2)
...
4

next(squarer)  # continue to the next x = yield
squarer.send(3)
...
9

squarer.send(None)  # we can also 'prime' the generator like this
squarer.send(4)
...
16
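Since priming is easy to forget, it can be folded into a decorator. The sketch below is my own take on that idea; the names primed and running_total are hypothetical and only serve the illustration.

```python
from functools import wraps
from typing import Callable, Generator


def primed(func: Callable[..., Generator]) -> Callable[..., Generator]:
    """Advance a new generator to its first yield so it can receive values."""

    @wraps(func)
    def start(*args, **kwargs) -> Generator:
        gen = func(*args, **kwargs)
        next(gen)  # the priming step
        return gen

    return start


@primed
def running_total() -> Generator[int, int, None]:
    total = 0
    while True:
        x = yield total  # hand back the total, wait for the next value
        total += x


acc = running_total()
print(acc.send(2))  # 2
print(acc.send(3))  # 5
```

The caller can now use send right away, without a separate next call.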

Now that we understand how to send data to a generator, let's examine what happens when the generator is wrapped in another function. This is where yield from comes into play. This Python feature is well described in this Stack Overflow question. The following wrapper function is adapted from one of its answers and complemented with type annotations, which were introduced for generators in PEP 484.

from typing import Generator

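def wrapper(gen: Generator) -> Generator:
    result = next(gen)  # prime the sub-generator
    while True:
        try:
            x = yield result  # pass the sub-generator's value back to the caller
            result = gen.send(x)  # forward the sent value to the sub-generator
        except StopIteration:
            return  # the sub-generator is finished, so the wrapper is too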

squarer = square()
w = wrapper(squarer)
w.send(None)
w.send(2)
...
4

The wrapper primes the generator (gen) by implicitly calling __next__ with next(gen). It needs to handle StopIteration because this is raised from within gen when the generator is finished (PEP 255👴). All this logic can be refactored into something much more elegant:

def wrapper(gen: Generator) -> Generator:
    yield from gen

As in the previous wrapper declaration, yield from hands control to the sub-generator: values sent to the wrapper are passed on to gen, and values yielded by gen are passed back to the caller. The second wrapper declaration produces the same outcome as the first, with far more concise code.
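Besides forwarding values, yield from also evaluates to the return value of the sub-generator once it finishes (PEP 380). Here is a small sketch of that behavior; the averager generator is a hypothetical example and not part of the code above.

```python
from typing import Generator, Optional


def averager() -> Generator[None, Optional[float], float]:
    """Collect sent numbers until None arrives, then return their mean."""
    total, count = 0.0, 0
    while True:
        x = yield
        if x is None:
            break
        total += x
        count += 1
    return total / count


def wrapper(results: list) -> Generator[None, Optional[float], None]:
    # yield from forwards every send() to averager and captures its return value
    mean = yield from averager()
    results.append(mean)


results: list = []
w = wrapper(results)
next(w)  # prime: runs averager up to its first yield
w.send(10)
w.send(20)
try:
    w.send(None)  # averager returns, the wrapper finishes, StopIteration is raised
except StopIteration:
    pass
print(results)  # [15.0]
```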

Coroutines

Coroutines are program components that can be paused and resumed. Since Python 3.4 it has been possible to create a coroutine using the asyncio standard library and its @asyncio.coroutine decorator in combination with the yield from syntax above, as code execution in the wrapper is momentarily paused in favor of code execution in the sub-generator. As a result, yield from was used for two different purposes: to transfer control bidirectionally from one generator to a sub-generator, and to create coroutines. That is why the more intuitive and non-conflicting async and await syntax was introduced in Python 3.5 (PEP 492). The @asyncio.coroutine decorator has since been deprecated and was removed in Python 3.11.

import asyncio

async def square(x: float) -> float:
    return x ** 2

async def wrapper(x: float) -> float:
    return await square(x)

asyncio.run(wrapper(2))
...
4

You might have noticed that the wrapper function is now called using the asyncio.run function. This is needed to execute code on the event loop.
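To see why a runner is needed at all, it helps to look at what calling an async function returns. The sketch below reuses the square coroutine from above: the call only creates a coroutine object, and nothing executes until the event loop drives it.

```python
import asyncio
import inspect


async def square(x: float) -> float:
    return x ** 2


coro = square(3)                  # nothing has run yet
print(inspect.iscoroutine(coro))  # True
result = asyncio.run(coro)        # the event loop runs it to completion
print(result)                     # 9
```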

Event loop

An event loop is an orchestrator of coroutines and can pause and resume code efficiently, provided that the code is I/O-bound. asyncio is Python's standard-library package for achieving concurrency through coroutines and asynchronous programming. The diagram shows how the event loop "waits efficiently" on I/O operations.

  sequenceDiagram
    participant EL as "Event Loop"
    participant C1 as "Coroutine 1"
    participant C2 as "Coroutine 2"
    participant IO as "IO Operation"

    EL->>+C1: Start Coroutine 1
    C1-->>EL: Await IO Operation
    EL->>+C2: Start Coroutine 2
    C2-->>EL: Await IO Operation
    EL->>IO: Monitor IO Events
    IO-->>EL: IO Operation Complete
    EL->>+C1: Resume Coroutine 1
    C1-->>EL: Complete
    EL->>+C2: Resume Coroutine 2
    C2-->>EL: Complete
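The interleaving in the diagram can be reproduced with a small experiment. In this sketch, asyncio.sleep stands in for a real I/O operation, and the fake_io helper and its delays are assumptions made for the demonstration.

```python
import asyncio
import time

events: list[str] = []


async def fake_io(name: str, delay: float) -> None:
    events.append(f"{name} started")
    await asyncio.sleep(delay)  # stands in for a network request or file read
    events.append(f"{name} resumed")


async def main() -> float:
    start = time.perf_counter()
    await asyncio.gather(fake_io("coroutine 1", 0.1), fake_io("coroutine 2", 0.2))
    return time.perf_counter() - start


elapsed = asyncio.run(main())
print(events)
print(f"{elapsed:.2f}s")
```

Both coroutines start before either one resumes, and the elapsed time should be close to the longer delay rather than the sum of the two.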

You might be asking yourself how the event loop knows when an I/O operation has completed. For that, asyncio's default event loop on Unix relies on the selectors module, which builds on the select module, an interface to the Unix system calls that examine the status of the file descriptors of I/O channels. What does this mean? A file descriptor is like a receipt that a program receives when it opens a file or connection. The receipt can be used to read from, write to, or close the associated file or connection. asyncio indirectly uses system calls such as select, poll and epoll to find out which file descriptors are ready, and therefore which waiting tasks can be resumed.
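The readiness check can be tried out directly with the standard-library selectors module. This is a sketch of the mechanism only, using a local socket pair instead of a real network connection; it is not how you would normally write asyncio code.

```python
import selectors
import socket

selector = selectors.DefaultSelector()  # picks the best mechanism available on the platform
reader, writer = socket.socketpair()    # two connected sockets for the demo
reader.setblocking(False)

# ask to be notified when `reader` has data available
selector.register(reader, selectors.EVENT_READ, data="reader")

ready_before = selector.select(timeout=0)  # nothing has been written yet
writer.send(b"ping")
ready_after = selector.select(timeout=1)   # reader is now readable

labels = [key.data for key, _ in ready_after]
message = reader.recv(4)
print(ready_before, labels, message)  # [] ['reader'] b'ping'

selector.unregister(reader)
selector.close()
reader.close()
writer.close()
```

An event loop does essentially this in a loop: it asks which file descriptors are ready and resumes the coroutines that were waiting on them.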

Concurrency in a data retrieval context

Let's assume that we have an application that wants to request many pages of data from another application. Often we know in advance how many pages of data we need to request, because this information is returned with the first response. To reduce the idle time of our single thread, we can create a list of tasks (requests), one for each page of data, and hand it to the event loop.

from typing import Any
import httpx
import asyncio

async def get_items(client: httpx.AsyncClient, page: int = 0, page_size: int = 10) -> dict[str, Any]:
    r = await client.get("https://www.data_provider.com/api/items", params={"page": page, "pageSize": page_size})
    r.raise_for_status()
    print(f"Fetched items {page * page_size} - {(page + 1) * page_size}")  # show range of items
    return r.json()

async def fetch_all_items(client: httpx.AsyncClient, total_pages: int, page_size: int = 10) -> list[dict[str, Any]]:
    # create list of tasks
    tasks = [
        get_items(client, page=page, page_size=page_size)
        for page in range(total_pages)
    ]

    # run the coroutines concurrently on the event loop
    try:
        results = await asyncio.gather(*tasks)
    except httpx.HTTPStatusError as e:
        print(f"Error fetching items: {e}")  # that's right, asyncio will propagate exceptions
        raise  # re-raise: without results there is nothing to flatten below

    # flatten list of lists of items
    all_items = [item for result in results for item in result.get("items", [])]
    return all_items

In this example, a task is a get_items call. For demonstration purposes, I printed the range of items that was retrieved. Since the tasks are run concurrently, each one resumes whenever its response arrives, which makes the order in which they complete nondeterministic. Note that asyncio.gather still returns the results in the order in which the tasks were passed in.

async def main():
    async with httpx.AsyncClient() as client:
        items = await fetch_all_items(client, 100)

if __name__ == "__main__":
    asyncio.run(main())
...
Fetched items 0 - 10
Fetched items 20 - 30
Fetched items 10 - 20
Fetched items 30 - 40

The next time fetch_all_items is called, the printed output might look different:

...
Fetched items 20 - 30
Fetched items 0 - 10
Fetched items 30 - 40
Fetched items 10 - 20
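The difference between completion order and result order can be checked without a real server. In the sketch below, fake_get_items is a hypothetical stand-in for get_items, and the delays are chosen so that the last page finishes first.

```python
import asyncio

completion_order: list[int] = []


async def fake_get_items(page: int, delay: float) -> int:
    await asyncio.sleep(delay)  # simulated network latency
    completion_order.append(page)
    return page


async def main() -> list[int]:
    delays = [0.15, 0.1, 0.05]  # page 0 is the slowest, page 2 the fastest
    return await asyncio.gather(
        *(fake_get_items(page, delay) for page, delay in enumerate(delays))
    )


results = asyncio.run(main())
print(completion_order)  # [2, 1, 0]
print(results)           # [0, 1, 2]
```

So even though the printed lines can appear in any order, the flattened list of items keeps the page order.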

When dealing with concurrent data fetching applications, you will probably run into several problems, like overloading the server or race conditions. Some of these problems can be solved with synchronization, which I will write about in more detail in a future post.
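As a small preview, here is a sketch of one synchronization primitive, asyncio.Semaphore, used to cap the number of requests in flight. The limit of 3 and the fake_get_items helper are assumptions for the demonstration; a real limit depends on what the server tolerates.

```python
import asyncio

MAX_CONCURRENT = 3  # hypothetical limit
stats = {"in_flight": 0, "peak": 0}


async def fake_get_items(page: int, semaphore: asyncio.Semaphore) -> int:
    async with semaphore:  # at most MAX_CONCURRENT coroutines pass this point
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        await asyncio.sleep(0.01)  # simulated request
        stats["in_flight"] -= 1
    return page


async def main() -> list[int]:
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    return await asyncio.gather(*(fake_get_items(p, semaphore) for p in range(10)))


pages = asyncio.run(main())
print(stats["peak"])  # 3
```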
