Mastering Python Async IO with FastAPI
Grace Collins
Solutions Engineer · Leapcell
Since Python is an interpreted language, when used for back-end development, such as in the combination of Python + Django, compared to Java + Spring, its response time will be a bit longer. However, as long as the code is reasonable, the difference is not too significant. Even when Django uses the multi-process mode, its concurrent processing ability is still much weaker. Python has some solutions to improve concurrent processing capabilities. For example, using the asynchronous framework FastAPI, with its asynchronous capabilities, the concurrent processing ability of I/O-intensive tasks can be greatly enhanced. FastAPI is one of the fastest Python frameworks.
FastAPI as example
Let's first take a brief look at how to use FastAPI.
Example 1: Default Network Asynchronous IO
Installation:
pip install fastapi
Simple Server-side Code:
# main.py
from typing import Union
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def read_root():
return {"Hello": "World"}
Startup:
uvicorn main:app --reload
We can see that, compared with other frameworks, the interface of FastAPI only has an additional async
keyword. The async
keyword defines the interface as asynchronous. From the return result alone, we can't tell the difference between FastAPI and other Python frameworks. The difference lies in concurrent access. When the server threads of FastAPI handle route requests, such as http://127.0.0.1:8000/
, if they encounter network I/O, they will no longer wait for it but handle other requests instead. When the network I/O is completed, the execution will resume. This asynchronous ability improves the processing ability of I/O-intensive tasks.
Example 2: Explicit Network Asynchronous IO
Let's look at another example. In the business code, an explicit asynchronous network request is initiated. For this network I/O, just like route requests, FastAPI will also handle it asynchronously.
# app.py
from fastapi import FastAPI, HTTPException
import httpx
app = FastAPI()
# Example of an asynchronous GET request
@app.get("/external-api")
async def call_external_api():
url = "https://leapcell.io"
async with httpx.AsyncClient() as client:
response = await client.get(url)
if response.status_code!= 200:
raise HTTPException(status_code=response.status_code, detail="Failed to fetch data")
return response.json()
If you want database I/O to be asynchronous, you need the support of asynchronous operations from the database driver or ORM.
Asynchronous IO
The core implementation of FastAPI's asynchrony is asynchronous I/O
. We can start a server with asynchronous processing capabilities directly using asynchronous I/O without using FastAPI.
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(1) # Simulate I/O operation
return web.Response(text='{"Hello": "World"}', content_type='application/json')
async def init(loop):
# Use the event loop to monitor web requests
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
# Start the server, and the event loop monitors and processes web requests
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv
# Explicitly get an event loop
loop = asyncio.get_event_loop()
# Start the event loop
loop.run_until_complete(init(loop))
loop.run_forever()
When this example is started, the return result of http://127.0.0.1:8000/
is the same as that of Example 1. The underlying implementation principle of asynchronous I/O is "coroutines" and "event loops".
Coroutines
async def index(request): await asyncio.sleep(1) # Simulate I/O operation return web.Response(text='{"Hello": "World"}', content_type='application/json')
The function index
is defined with async def
, which means it is a coroutine. The await
keyword is used before an I/O operation to tell the execution thread not to wait for this I/O operation. The calls of normal functions are implemented through the stack, and functions can only be called and executed one by one. However, a coroutine is a special kind of function (not a collaborative thread). It allows the thread to pause execution at the await
mark and switch to execute other tasks. When the I/O operation is completed, the execution will continue.
Let's take a look at the effect of multiple coroutines executing concurrently.
import asyncio from datetime import datetime async def coroutine3(): print(f"Coroutine 3 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 3 finished at {datetime.now()}") async def coroutine2(): print(f"Coroutine 2 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 2 finished at {datetime.now()}") async def coroutine1(): print(f"Coroutine 1 started at {datetime.now()}") await asyncio.sleep(1) # Simulate I/O operation print(f"Coroutine 1 finished at {datetime.now()}") async def main(): print("Main started") # Create tasks to make coroutines execute concurrently task1 = asyncio.create_task(coroutine1()) task2 = asyncio.create_task(coroutine2()) task3 = asyncio.create_task(coroutine3()) # Wait for all tasks to complete await task1 await task2 await task3 print("Main finished") # Run the main coroutine asyncio.run(main())
Output:
Main started
Coroutine 1 started at 2024-12-27 12:28:01.661251
Coroutine 2 started at 2024-12-27 12:28:01.661276
Coroutine 3 started at 2024-12-27 12:28:01.665012
Coroutine 1 finished at 2024-12-27 12:28:02.665125
Coroutine 2 finished at 2024-12-27 12:28:02.665120
Coroutine 3 finished at 2024-12-27 12:28:02.665120
Main finished
We can see that the thread does not execute the three tasks one by one. When it encounters an I/O operation, it switches to execute other tasks. After the I/O operation is completed, it continues to execute. It can also be seen that the three coroutines basically start waiting for the I/O operation at the same time, so the final execution completion times are basically the same. Although the event loop is not used explicitly here, asyncio.run
will use it implicitly.
Generators
Coroutines are implemented through generators. Generators can pause the execution of functions and also resume it, which are the characteristics of coroutines.
def simple_generator(): print("First value") yield 1 print("Second value") yield 2 print("Third value") yield 3 # simple_generator is a generator function, gen is a generator gen = simple_generator() print(next(gen)) # Output: First value \n 1 print(next(gen)) # Output: Second value \n 2 print(next(gen)) # Output: Third value \n 3
When running the generator with next()
, when it encounters yield
, it will pause. When next()
is run again, it will continue running from the yield
where it was paused last time. Before Python 3.5, coroutines were also written with "annotations" + yeild
. Starting from Python 3.5, async def
+ await
are used.
import asyncio from datetime import datetime @asyncio.coroutine def my_coroutine(): print("Start coroutine", datetime.now()) # Asynchronous call to asyncio.sleep(1): yield from asyncio.sleep(1) print("End coroutine", datetime.now()) # Get the EventLoop loop = asyncio.get_event_loop() # Execute the coroutine loop.run_until_complete(my_coroutine()) loop.close()
The pause and resume features of generators can be used for many things besides coroutines. For example, it can calculate while looping and store algorithms. For instance, implementing a Pascal's triangle (both ends of each row are 1, and the numbers in other positions are the sum of the two numbers above it).
def pascal_triangle():
row = [1]
while True:
yield row
new_row = [1] # The first element of each row is always 1
for i in range(1, len(row)):
new_row.append(row[i - 1] + row[i])
new_row.append(1) # The last element of each row is always 1
row = new_row
# Generate and print the first 5 rows of Pascal's triangle
triangle = pascal_triangle()
for _ in range(5):
print(next(triangle))
Output:
[1]
[1, 1]
[1, 2, 1]
[1, 3, 3, 1]
[1, 4, 6, 4, 1]
Event Loops
Since coroutine execution can be paused, when will the coroutine resume execution? This requires the use of an event loop to tell the execution thread.
# Get the EventLoop loop = asyncio.get_event_loop() # The event loop executes the coroutine loop.run_until_complete(my_coroutine()) loop.close()
The event loop uses the I/O multiplexing technology, constantly cycling to monitor events where coroutines can continue to execute. When they can be executed, the thread will continue to execute the coroutines.
I/O Multiplexing Technology
To understand I/O multiplexing in a simple way: I'm the boss of a courier station. I don't need to actively ask each courier about the completion of their tasks. Instead, the couriers will come to me on their own after completing their tasks. This improves my task processing ability, and I can do more things.
select
, poll
, and epoll
can all achieve I/O multiplexing. Compared with select
and poll
, epoll
has better performance. Linux generally uses epoll
by default, and macOS uses kqueue
, which is similar to epoll
and has similar performance.
Socket Server Using Event Loops
import selectors
import socket
# Create a selectors object, equivalent to the implementation of epoll, when running on Linux
sel = selectors.DefaultSelector()
# Request reception event handling function. Accept new connections and register read events
def accept(sock, mask):
conn, addr = sock.accept() # Accept the connection
print('Accepted connection from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read) # Register the read event
# Request reading event handling function. Read request data and send an HTTP response, then close the connection.
def read(conn, mask):
data = conn.recv(100) # Read data from the connection
print('response to')
response = "HTTP/1.1 200 OK\r\n" \
"Content-Type: application/json\r\n" \
"Content-Length: 18\r\n" \
"Connection: close\r\n" \
"\r\n" \
"{\"Hello": \"World\"}"
conn.send(response.encode()) # Echo the data
print('Closing connection')
sel.unregister(conn) # Unregister the event
conn.close() # Close the connection
# Create a server socket
sock = socket.socket()
sock.bind(('localhost', 8000))
sock.listen()
sock.setblocking(False)
# Register the accept event
sel.register(sock, selectors.EVENT_READ, accept)
print("Server is running on port 8000...")
# Event loop
while True:
# This will block when there are no requests
events = sel.select() # Select the file descriptors (events) that are ready
print("events length: ", len(events))
for key, mask in events:
callback = key.data # Get the event handling function
print("handler_name:", callback.__name__)
callback(key.fileobj, mask) # Call the event handling function
Start the server socket to monitor the specified port. If running on a Linux system, selectors
uses epoll
as its implementation by default. The code uses epoll
to register a request reception event (accept event). When a new request arrives, epoll
will trigger and execute the event handling function, and at the same time, register a read event (read event) to process and respond to the request data. When accessed from the web side with http://127.0.0.1:8000/
, the return result is the same as that of Example 1. Server running log:
Server is running on port 8000...
events length: 1
handler_name: accept
Accepted connection from ('127.0.0.1', 60941)
events length: 1
handler_name: read
response to
Closing connection
Socket Server
Directly use Socket to start a server. When accessed with a browser at http://127.0.0.1:8080/
or using curl http://127.0.0.1:8080/
, it will return {"Hello": "World"}
import socket from datetime import datetime # Create a TCP socket server_socket = socket.socket() # Bind the socket to the specified IP address and port number server_socket.bind(('127.0.0.1', 8001)) # Start listening for incoming connections server_socket.listen(5) # Loop to accept client connections while True: print("%s Waiting for a connection..." % datetime.now()) client_socket, addr = server_socket.accept() # This will block, waiting for client connections print(f"{datetime.now()} Got connection from {addr}") # Receive client data data = client_socket.recv(1024) print(f"Received: {data.decode()}") # Send response data response = "HTTP/1.1 200 OK\r\n" \ "Content-Type: application/json\r\n" \ "Content-Length: 18\r\n" \ "Connection: close\r\n" \ "\r\n" \ "{\"Hello": \"World\"}" client_socket.sendall(response.encode()) # Close the client socket client_socket.close()
When accessed with curl http://127.0.0.1:8001/
, Server running log:
2024-12-27 12:53:36.711732 Waiting for a connection...
2024-12-27 12:54:30.715928 Got connection from ('127.0.0.1', 64361)
Received: GET / HTTP/1.1
Host: 127.0.0.1:8001
User-Agent: curl/8.4.0
Accept: */*
Summary
Asynchronous I/O is implemented at the bottom layer using "coroutines" and "event loops". "Coroutines" ensure that when the thread encounters marked I/O operations during execution, it doesn't have to wait for the I/O to complete but can pause and let the thread execute other tasks without blocking. "Event loops" use the I/O multiplexing technology, constantly cycling to monitor I/O events. When a certain I/O event is completed, the corresponding callback is triggered, allowing the coroutine to continue execution.
Leapcell: The Ideal Platform for FastAPI and Other Python Applications:
Finally, let me introduce the ideal platform for deploying Flask/FastAPI: Leapcell.
Leapcell is a cloud computing platform designed specifically for modern distributed applications. Its pay-as-you-go pricing model ensures no idle costs, meaning users only pay for the resources they actually use.
The unique advantages of Leapcell for WSGI/ASGI applications:
1. Multi-Language Support
- Supports development in JavaScript, Python, Go, or Rust.
Free Deployment of Unlimited Projects
- Only charge based on usage. No charge when there are no requests.
2. Unmatched Cost-Effectiveness
- Pay-as-you-go, with no idle fees.
- For example, $25 can support 6.94 million requests, with an average response time of 60 milliseconds.
3. Simplified Developer Experience
- Intuitive user interface for easy setup.
- Fully automated CI/CD pipelines and GitOps integration.
- Real-time metrics and logs, providing actionable insights.
4. Effortless Scalability and High Performance
- Automatic scaling to handle high concurrency with ease.
- Zero operation overhead, allowing developers to focus on development.
Learn more in the documentation!
Leapcell Twitter: https://x.com/LeapcellHQ