High-Performance Python: Asyncio
James Reed
Infrastructure Engineer · Leapcell
Concurrent programming is an approach in which multiple tasks make progress over overlapping periods of time. In Python, asyncio is the standard library's tool for asynchronous programming. Built on coroutines, asyncio handles I/O-intensive tasks efficiently. This article introduces the basic principles and usage of asyncio.
Why We Need asyncio
When handling I/O operations, multithreading can be far more efficient than a plain single-threaded program. So why do we still need asyncio?
Multithreading has many advantages and is widely used, but it also has limitations:
- A running thread can be interrupted at almost any point, so race conditions may occur when threads share state.
- Thread switching has a cost of its own, and the number of threads cannot be increased indefinitely. If your program has to juggle a very large number of I/O operations, multithreading may not deliver the efficiency you need.
asyncio emerged precisely to solve these problems.
Sync VS Async
Let's first distinguish between the concepts of Sync (synchronous) and Async (asynchronous).
- Sync means that operations are executed one after another. The next operation can only be executed after the previous one is completed.
- Async means that different operations can be interleaved. If one operation is waiting (for example, on I/O), the program does not sit idle but switches to another operation that is ready to run.
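To make the distinction concrete, here is a minimal sketch that runs the same three waits first one after another and then interleaved. The helper names (operation, run_one_after_another, run_interleaved) and the 0.1-second delay are assumptions chosen for illustration, and asyncio.sleep() stands in for real I/O.

```python
import asyncio
import time


async def operation(delay):
    # asyncio.sleep() is a stand-in for waiting on I/O (illustrative only).
    await asyncio.sleep(delay)


async def run_one_after_another():
    # Sync-style ordering: each wait must finish before the next one starts.
    start = time.perf_counter()
    for _ in range(3):
        await operation(0.1)
    return time.perf_counter() - start


async def run_interleaved():
    # Async-style ordering: all three waits overlap on one thread.
    start = time.perf_counter()
    await asyncio.gather(*(operation(0.1) for _ in range(3)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print("one after another:", asyncio.run(run_one_after_another()))
    print("interleaved:", asyncio.run(run_interleaved()))
```

The first version should take roughly the sum of the delays, while the second should take roughly as long as the longest single delay.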
How asyncio Works
- Coroutines: asyncio uses coroutines to achieve asynchronous operations. A coroutine is a special function defined with the async keyword. Inside a coroutine, the await keyword pauses the current coroutine until an asynchronous operation completes.
- Event Loop: The event loop is the core mechanism of asyncio. It schedules and executes coroutines and handles the switching between them. The event loop keeps checking for tasks that are ready to run. Once a task is ready (such as when an I/O operation completes or a timer expires), the event loop puts it into the ready queue, runs it until its next await, and moves on to the next task.
- Async Tasks: In asyncio, coroutines are run concurrently by wrapping them in tasks. The asyncio.create_task() function wraps a coroutine in a Task, which is an awaitable object, and schedules it on the running event loop.
- Asynchronous I/O Operations: asyncio provides a set of asynchronous I/O operations (such as network connections and subprocess pipes) that integrate with coroutines and the event loop through the await keyword. Because waiting for I/O no longer blocks the thread, other tasks can make progress in the meantime, which improves throughput and concurrency.
- Callbacks: asyncio also supports callback functions for handling the results of asynchronous operations. The asyncio.ensure_future() function wraps a coroutine or other awaitable in a Future (a Task, in the case of a coroutine) and schedules it on the event loop; a callback can then be attached to it with add_done_callback().
- Concurrent Execution: asyncio can execute multiple coroutine tasks concurrently. The event loop schedules coroutines automatically according to which tasks are ready, thus achieving efficient concurrent programming.
In summary, asyncio is built on coroutines and the event loop. Coroutines express the asynchronous operations, and the event loop schedules and runs them; together they give asyncio an efficient asynchronous programming model.
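The task and callback mechanisms described above can be sketched as follows. This is only an illustration: fetch_number() and its delays are hypothetical, and asyncio.sleep() stands in for a real I/O wait.

```python
import asyncio


async def fetch_number(delay, value):
    # Hypothetical workload: asyncio.sleep() stands in for an I/O wait.
    await asyncio.sleep(delay)
    return value


async def main():
    finished = []

    # create_task() wraps each coroutine in a Task and schedules it right away.
    task_a = asyncio.create_task(fetch_number(0.05, "a"))
    task_b = asyncio.create_task(fetch_number(0.01, "b"))

    # A done-callback is called with the finished Task as its only argument.
    task_a.add_done_callback(lambda t: finished.append(t.result()))
    task_b.add_done_callback(lambda t: finished.append(t.result()))

    # Awaiting a Task returns the result of the wrapped coroutine.
    result_a = await task_a
    result_b = await task_b

    # Callbacks are run by the event loop, so yield once more to be sure
    # every scheduled callback has had a chance to run.
    await asyncio.sleep(0)
    return result_a, result_b, finished


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Both tasks start running as soon as they are created, so the total time is close to the longer of the two delays rather than their sum.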
Coroutines and Asynchronous Programming
Coroutines are the central concept in asyncio. They are lightweight execution units that can switch between tasks quickly, without the overhead of thread switching. A coroutine is defined with the async keyword, and the await keyword pauses it until a given operation completes, after which it resumes.
Here is a simple example demonstrating how to use coroutines for asynchronous programming:

    import asyncio


    async def hello():
        print("Hello")
        await asyncio.sleep(1)  # Simulate a time-consuming operation
        print("World")


    # Create an event loop, run the coroutine to completion, then close the loop
    asyncio.run(hello())

In this example, hello() is a coroutine defined with the async keyword. Inside the coroutine, await pauses its execution; here, asyncio.sleep(1) simulates a time-consuming operation. asyncio.run() creates an event loop, runs the coroutine until it completes, and closes the loop. It has been available since Python 3.7 and replaces the older pattern of calling asyncio.get_event_loop() followed by loop.run_until_complete().
Asynchronous I/O Operations
asyncio is mainly used for I/O-intensive tasks such as network requests. It provides a set of APIs for asynchronous I/O that combine with the await keyword to make asynchronous programming straightforward. Note that asyncio has no native asynchronous API for reading and writing regular files; that work is usually handed to a thread or a third-party library.
Here is a simple example showing how to use asyncio for asynchronous network requests:

    import asyncio

    import aiohttp


    async def fetch(session, url):
        # Issue a GET request; the connection is released when the block exits
        async with session.get(url) as response:
            return await response.text()


    async def main():
        # Create one ClientSession that can be reused across requests
        async with aiohttp.ClientSession() as session:
            html = await fetch(session, 'https://www.example.com')
            print(html)


    asyncio.run(main())

In this example, we use the aiohttp library for network requests. fetch() is a coroutine: it issues an asynchronous GET request through session.get() and uses await to wait for the response body. main() is another coroutine: it creates a ClientSession object that can be reused across requests, then calls fetch() to get the web page content and prints it.
Note: We use aiohttp instead of the requests library because requests is blocking: a call made from inside a coroutine would stall the whole event loop until it returns, whereas aiohttp is built on asyncio. To get the most out of asyncio, you often need libraries written specifically for it.
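When no asyncio-compatible library exists for a blocking call, one common workaround is to push the call onto a worker thread with asyncio.to_thread() (available since Python 3.9). The sketch below is not from the original article; blocking_io() is a hypothetical stand-in for any blocking library call, with time.sleep() simulating the wait.

```python
import asyncio
import time


def blocking_io(seconds):
    # Hypothetical stand-in for a blocking library call.
    time.sleep(seconds)
    return seconds


async def main():
    start = time.perf_counter()
    # Each blocking call runs in a worker thread, so the event loop stays free
    # and the two waits overlap instead of running back to back.
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.2),
        asyncio.to_thread(blocking_io, 0.2),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

This keeps the event loop responsive, but it reintroduces threads, so it is best treated as a bridge for the occasional blocking call rather than a replacement for an asyncio-native library.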
Concurrent Execution of Multiple Tasks
asyncio also provides mechanisms for executing multiple tasks concurrently, such as asyncio.gather() and asyncio.wait(). The following example uses asyncio.gather() to execute multiple coroutine tasks concurrently:

    import asyncio


    async def task1():
        print("Task 1 started")
        await asyncio.sleep(1)
        print("Task 1 finished")


    async def task2():
        print("Task 2 started")
        await asyncio.sleep(2)
        print("Task 2 finished")


    async def main():
        # Run both coroutines concurrently and wait for both to finish
        await asyncio.gather(task1(), task2())


    asyncio.run(main())

In this example, we define two coroutines, task1() and task2(), each of which performs a time-consuming operation. The coroutine main() starts both through asyncio.gather() and waits for them to complete. Because the two waits overlap, the program finishes in about 2 seconds rather than 3.
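For comparison, here is a minimal sketch of asyncio.wait(), the other mechanism mentioned above. Unlike asyncio.gather(), it returns two sets of tasks (done and pending), which makes it possible to react as soon as the first task finishes. The work() coroutine and its delays are assumptions for illustration.

```python
import asyncio


async def work(name, delay):
    # asyncio.sleep() stands in for a real I/O wait (illustrative only).
    await asyncio.sleep(delay)
    return name


async def main():
    # asyncio.wait() expects Tasks (or Futures), not bare coroutines.
    tasks = [
        asyncio.create_task(work("fast", 0.01)),
        asyncio.create_task(work("slow", 0.2)),
    ]

    # Return as soon as at least one task has finished.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first_results = [task.result() for task in done]
    pending_count = len(pending)

    # Cancel whatever is still running and wait for the cancellation to settle.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    return first_results, pending_count


if __name__ == "__main__":
    print(asyncio.run(main()))
```

asyncio.gather() is usually the simpler choice when all results are needed; asyncio.wait() is useful when the program should proceed on the first completion or after a timeout.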
How to Choose?
In actual projects, should we choose multithreading or asyncio? A widely quoted rule of thumb sums it up in pseudocode:

    if io_bound:
        if io_slow:
            print('Use Asyncio')
        else:
            print('Use multi-threading')
    elif cpu_bound:
        print('Use multi-processing')

- If it is I/O bound and the I/O operations are slow, requiring the cooperation of many tasks/threads, then using asyncio is more appropriate.
- If it is I/O bound but the I/O operations are fast and only a limited number of tasks/threads are needed, then multithreading will do.
- If it is CPU bound, then multi-processing is required to improve program running efficiency.
Practice
Given a list of numbers, for each element we want to calculate the sum of the squares of all integers from 0 up to, but not including, that element.
Synchronous Implementation

    import time


    def cpu_bound(number):
        return sum(i * i for i in range(number))


    def calculate_sums(numbers):
        for number in numbers:
            cpu_bound(number)


    def main():
        start_time = time.perf_counter()
        numbers = [10000000 + x for x in range(20)]
        calculate_sums(numbers)
        end_time = time.perf_counter()
        print('Calculation takes {} seconds'.format(end_time - start_time))


    if __name__ == '__main__':
        main()

The output is: Calculation takes 16.00943413000002 seconds
Parallel Implementation with concurrent.futures

    import time
    from concurrent.futures import ProcessPoolExecutor


    def cpu_bound(number):
        return sum(i * i for i in range(number))


    def calculate_sums(numbers):
        with ProcessPoolExecutor() as executor:
            # executor.map() returns a lazy iterator; list() collects the results
            results = list(executor.map(cpu_bound, numbers))
        print(results)


    def main():
        start_time = time.perf_counter()
        numbers = [10000000 + x for x in range(20)]
        calculate_sums(numbers)
        end_time = time.perf_counter()
        print('Calculation takes {} seconds'.format(end_time - start_time))


    if __name__ == '__main__':
        main()

The output is: Calculation takes 7.314132894999999 seconds
In this improved code, we use concurrent.futures.ProcessPoolExecutor to create a process pool and the executor.map() method to submit the tasks. Note that executor.map() returns a lazy iterator, so to collect the results you need to iterate over it, for example by converting it to a list.
Multiprocessing Implementation

    import time
    import multiprocessing


    def cpu_bound(number):
        return sum(i * i for i in range(number))


    def calculate_sums(numbers):
        with multiprocessing.Pool() as pool:
            pool.map(cpu_bound, numbers)


    def main():
        start_time = time.perf_counter()
        numbers = [10000000 + x for x in range(20)]
        calculate_sums(numbers)
        end_time = time.perf_counter()
        print('Calculation takes {} seconds'.format(end_time - start_time))


    if __name__ == '__main__':
        main()

The output is: Calculation takes 5.024221667 seconds
concurrent.futures.ProcessPoolExecutor and multiprocessing both provide multi-process concurrency in Python's standard library. There are some differences:
- Interface-based Encapsulation: concurrent.futures.ProcessPoolExecutor is a high-level interface provided by the concurrent.futures module. It is built on top of multiprocessing and encapsulates the underlying multi-process functions, making it easier to write multi-process code. multiprocessing provides complete multi-process support and allows direct operation on processes.
- API Usage: concurrent.futures.ProcessPoolExecutor is used in the same way as a thread pool. Its submit() method hands a callable object (such as a function) to the process pool for execution and returns a Future object, which can be used to get the execution result. multiprocessing provides lower-level process management and communication interfaces. Processes can be explicitly created, started, and controlled, and they can communicate through queues or pipes.
- Scalability and Flexibility: Since multiprocessing provides lower-level interfaces, it is more flexible than concurrent.futures.ProcessPoolExecutor. Operating on processes directly gives finer-grained control over each one, such as starting and terminating individual processes and sharing data between them. concurrent.futures.ProcessPoolExecutor is better suited to simple task parallelization, hiding many underlying details and making multi-process code easier to write.
- Cross-platform Support: Both concurrent.futures.ProcessPoolExecutor and multiprocessing provide cross-platform multi-process support and can be used on various operating systems.
In summary, concurrent.futures.ProcessPoolExecutor is a high-level interface that encapsulates the underlying multi-process functions and suits simple task parallelization. multiprocessing is a lower-level library that offers more control and flexibility and suits scenarios that need fine-grained control of processes. Choose according to your requirements: for simple task parallelization, concurrent.futures.ProcessPoolExecutor keeps the code short; if you need lower-level control and inter-process communication, use multiprocessing.
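As an illustration of the Future-based interface described above, here is a minimal sketch that uses submit() and as_completed() instead of map(). The function names sum_of_squares() and run_in_pool() and the small inputs are assumptions chosen to keep the example quick to run.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed


def sum_of_squares(number):
    # Same computation as cpu_bound() in the examples above.
    return sum(i * i for i in range(number))


def run_in_pool(numbers):
    results = {}
    with ProcessPoolExecutor() as executor:
        # submit() returns a Future immediately for each call.
        futures = {executor.submit(sum_of_squares, n): n for n in numbers}
        # as_completed() yields each Future as soon as its result is ready,
        # which may differ from the order of submission.
        for future in as_completed(futures):
            results[futures[future]] = future.result()
    return results


if __name__ == "__main__":
    # The guard matters: worker processes may re-import this module.
    print(run_in_pool([10, 100, 1000]))
```

Compared with map(), this form makes it possible to handle each result (or exception) as soon as it arrives, at the cost of a little more bookkeeping.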
Summary
Unlike multithreading, asyncio is single-threaded, but its internal event loop lets it run many different tasks concurrently, and it gives the programmer more control over when tasks switch than multithreading does.
Tasks in asyncio are never interrupted in the middle of an operation; they give up control only at an await, so the race conditions caused by thread preemption do not occur.
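The sketch below illustrates this point and its limit. It is an illustration under assumed names (safe_increment, unsafe_increment, run_many), not code from the original article: an update with no await between the read and the write cannot be interleaved with other tasks, but splitting the update across an await lets other tasks run in between and lose updates.

```python
import asyncio


async def safe_increment(state):
    # No await between the read and the write, so no other task can
    # run in the middle of this update.
    state["count"] = state["count"] + 1
    await asyncio.sleep(0)


async def unsafe_increment(state):
    current = state["count"]
    await asyncio.sleep(0)        # control returns to the event loop here
    state["count"] = current + 1  # may overwrite updates made by other tasks


async def run_many(worker, n):
    state = {"count": 0}
    await asyncio.gather(*(worker(state) for _ in range(n)))
    return state["count"]


async def main():
    safe = await run_many(safe_increment, 100)
    unsafe = await run_many(unsafe_increment, 100)
    return safe, unsafe


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The first count should reach 100, while the second falls short because every task reads the counter before any of them writes it back. When an update must span an await, asyncio.Lock can protect it.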
asyncio is especially efficient in scenarios with heavy I/O, because switching between tasks in asyncio costs far less than switching between threads, and asyncio can run far more tasks than a program can run threads.
However, using asyncio often requires the support of specific third-party libraries, such as aiohttp in the earlier example. And if the I/O operations are fast and not heavy, multithreading can solve the problem just as effectively.
- asyncio is a Python library for implementing asynchronous programming.
- Coroutines are the core concept of asyncio, achieving asynchronous operations through the async and await keywords.
- asyncio provides powerful APIs for asynchronous I/O operations and can easily handle I/O-intensive tasks.
- Through mechanisms such as asyncio.gather(), multiple coroutine tasks can be executed concurrently.