Why SQLAlchemy 2.0 Is the Most Powerful Python ORM Yet
James Reed
Infrastructure Engineer · Leapcell

SQLAlchemy Tutorial
SQLAlchemy is the most popular Object Relational Mapping (ORM) library in the Python ecosystem. Its design is elegant and split into two layers: the low-level Core and the traditional ORM on top. Most ORMs, in Python and in other languages alike, lack this kind of layered design; in Django's ORM, for example, database connection handling and the ORM itself are completely intertwined.
Why Do We Need the Core?
The Core layer mainly implements the client-side connection pool. Relational databases sit at the heart of modern web applications, but their capacity for concurrent connections is limited. Opening many short-lived connections is generally discouraged, so in most cases a connection pool is needed. Connection pools come in roughly two kinds:
- Server-side connection pool: dedicated middleware that assigns a reusable long-lived connection to each incoming short-lived connection.
- Client-side connection pool: usually brought into the code as a third-party library.
SQLAlchemy's pool is a client-side connection pool. It maintains a certain number of long-lived connections: when `connect` is called, a connection is actually retrieved from the pool, and when `close` is called, the connection is actually returned to the pool.
Creating a Connection
In SQLAlchemy, use `create_engine` to create a connection (pool). The argument to `create_engine` is the database URL.
```python
from sqlalchemy import create_engine

# MySQL connection example
engine = create_engine(
    "mysql://user:password@localhost:3306/dbname",
    echo=True,          # Print every executed SQL statement, convenient for debugging
    future=True,        # Use the SQLAlchemy 2.0 API, which is backward-compatible
    pool_size=5,        # Pool size; the default is 5, and 0 means no limit
    pool_recycle=3600,  # Automatically recycle connections after this many seconds
)

# Create an in-memory SQLite database. check_same_thread=False is required,
# otherwise the connection cannot be used in a multithreaded environment
engine = create_engine(
    "sqlite:///:memory:",
    echo=True,
    future=True,
    connect_args={"check_same_thread": False},
)

# Another way to connect to MySQL
# pip install mysqlclient
engine = create_engine("mysql+mysqldb://user:password@localhost/foo?charset=utf8mb4")
```
The Core Layer -- Using SQL Directly
CRUD
```python
from sqlalchemy import text

with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM users"))
    print(result.all())

with engine.connect() as conn:
    # The result is iterable, and each row is a Row object
    result = conn.execute(text("SELECT x, y FROM some_table"))
    for row in result:
        # A Row supports several access styles
        print(row.x, row.y)                          # by attribute
        print(row[0], row[1])                        # by index
        print(row._mapping["x"], row._mapping["y"])  # by name, via the mapping view

    # Pass parameters using the `:var` placeholder style
    result = conn.execute(
        text("SELECT x, y FROM some_table WHERE y > :y"),
        {"y": 2},
    )

    # Parameters can also be bound in advance
    stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)

    # An INSERT can take a list of dictionaries to insert several rows at once
    conn.execute(
        text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
        [{"x": 11, "y": 12}, {"x": 13, "y": 14}],
    )
```
Transactions and Commit
SQLAlchemy provides two ways to commit: manual `commit`, and semi-automatic commit via `engine.begin()`, which the official documentation recommends. There is also a fully automatic `autocommit` mode that commits every statement individually; it is not recommended.
# "commit as you go" requires manual commit with engine.connect() as conn: conn.execute(text("CREATE TABLE some_table (x int, y int)")) conn.execute( text("INSERT INTO some_table (x, y) VALUES (:x, :y)"), [{"x": 1, "y": 1}, {"x": 2, "y": 4}] ) conn.commit() # Note the commit here # "begin once" semi-automatic commit with engine.begin() as conn: conn.execute( text("INSERT INTO some_table (x, y) VALUES (:x, :y)"), [{"x": 6, "y": 8}, {"x": 9, "y": 10}] )
ORM
Session
The `Session` is not thread-safe. But since a web framework should obtain a fresh `session` at the start of each request, this is usually not a problem.
```python
from sqlalchemy.orm import Session

with Session(engine) as session:
    session.add(foo)
    session.commit()

# sessionmaker creates a factory function, so the arguments
# don't have to be repeated every time
from sqlalchemy.orm import sessionmaker

new_session = sessionmaker(engine)
with new_session() as session:
    ...
```
Declarative API
- Use `__tablename__` to specify the database table name.
- Use `Mapped` and native Python types to declare each field.
- Use `Integer`, `String`, etc. to specify the column type.
- Use the `index` parameter to create an index.
- Use the `unique` parameter to create a unique index.
- Use `__table_args__` for other table-level options, such as composite indexes.
```python
from datetime import datetime

from sqlalchemy import DateTime, Integer, String, Text, UniqueConstraint, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    # Must be a tuple, not a list
    __table_args__ = (UniqueConstraint("name", "time_created"),)

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String(30), index=True)
    fullname: Mapped[str] = mapped_column(String, unique=True)
    # Particularly large fields can be deferred, so they are not loaded by default
    description: Mapped[str] = mapped_column(Text, deferred=True)
    # Client-side default; note that a callable is passed, not the current time:
    # time_created: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.now)
    # Or a server-side default; it must be set when the table is created
    # and becomes part of the table's schema
    time_created: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    time_updated: Mapped[datetime] = mapped_column(DateTime(timezone=True), onupdate=func.now())

class Address(Base):
    __tablename__ = "address"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    email_address: Mapped[str] = mapped_column(String, nullable=False)

# create_all creates the tables for all models
Base.metadata.create_all(engine)

# To create the table for just one model:
User.__table__.create(engine)
```
Foreign Keys
Use `relationship` to declare associations between models.
Bi-directional Mapping of One-to-Many Relationship
```python
from sqlalchemy import ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship

class Group(Base):
    __tablename__ = "groups"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String)
    # The group's users; the model name is passed as a string
    members = relationship("User")

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String)
    # group_id is the actual foreign-key column in the database;
    # ForeignKey names the referenced column
    group_id: Mapped[int] = mapped_column(Integer, ForeignKey("groups.id"))
    # The group attribute on the model; overlaps declares which attribute
    # on the other model it shares the relationship with
    group = relationship("Group", overlaps="members")
```
Many-to-Many Mapping Requires an Association Table
```python
# Association table
class UserPermissions(Base):
    __tablename__ = "user_permissions"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    # Foreign keys to both sides
    user_id: Mapped[int] = mapped_column(Integer, ForeignKey("users.id"))
    permission_id: Mapped[int] = mapped_column(Integer, ForeignKey("permissions.id"))

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String)
    # secondary names the association table; overlaps names the
    # corresponding attribute on the other model
    permissions = relationship("Permission", secondary="user_permissions", overlaps="users")

class Permission(Base):
    __tablename__ = "permissions"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String)
    # Same as above
    users = relationship("User", secondary="user_permissions", overlaps="permissions")

user1 = User(name="user1", group_id=1)  # group_id comes from the one-to-many example above
user2 = User(name="user2")
group1 = Group(name="group1")
group2 = Group(name="group2", members=[user2])
permission1 = Permission(name="open_file")
permission2 = Permission(name="save_file")
user1.permissions.append(permission1)

session.add_all([user1, user2, group1, group2, permission1, permission2])
session.commit()
print(user1.permissions[0].id)
```
Most other tutorials use `backref` to generate the attribute on the other model automatically. Here, it is preferable to declare the accessible attribute explicitly on each model.
CRUD
Unlike the 1.x API, the 2.0 API no longer uses `query`; data is queried with `select` instead.
```python
from sqlalchemy import select

# The argument to where is an expression built with `==`; the advantage
# is that typos in attribute names are caught while writing the code
stmt = select(User).where(User.name == "john").order_by(User.id)

# filter_by takes **kwargs instead
stmt = select(User).filter_by(name="some_user")

# order_by can also use User.id.desc() for descending order
result = session.execute(stmt)

# When selecting whole objects, use the scalars method;
# otherwise each row is a tuple containing one object
for user in result.scalars():
    print(user.name)

# When querying individual attributes of the model, scalars is not needed
result = session.execute(select(User.name))
for row in result:
    print(row.name)

# There is also a shortcut for querying by primary key:
user = session.get(User, 1)

# Updating data requires an update statement
from sqlalchemy import update

# synchronize_session can be False, "fetch", or "evaluate"
# (in 2.0 the default is "auto")
#   False      — do not update the objects in Python at all
#   "fetch"    — reload the objects from the database
#   "evaluate" — while updating the database, try to apply the same
#                change to the objects in Python as far as possible
stmt = (
    update(User)
    .where(User.name == "john")
    .values(name="John")
    .execution_options(synchronize_session="fetch")
)
session.execute(stmt)

# Or simply assign to the attribute
user.name = "John"
session.commit()

# Beware of a possible race condition here.
# Wrong! If two processes update this value at the same time, one update
# may be lost: both write the value they computed locally, say 2,
# while the correct result of 1 + 1 + 1 is 3
# SQL: UPDATE users SET visit_count = 2 WHERE users.id = 1
user.visit_count += 1

# Correct: note the capital U — using the model attribute makes the
# database perform the increment on the server side
# SQL: UPDATE users SET visit_count = visit_count + 1 WHERE users.id = 1
user.visit_count = User.visit_count + 1

# To add an object, use session.add
session.add(user)
# Or add_all
session.add_all([user1, user2, group1])

# To get the inserted ID before committing, flush first.
# flush is not a commit: the transaction is still open, and what other
# transactions see depends on the database's isolation level
session.flush()
print(user.id)

# To delete, use session.delete
session.delete(user)
```
Loading Associated Models
If you read a list of N records and then fetch each record's details from the database one by one, N+1 queries are issued. This is the most common database mistake: the N+1 problem.
By default, models referenced through a foreign key are not loaded by a query. The `selectinload` option loads them eagerly, avoiding the N+1 problem.
```python
from sqlalchemy.orm import selectinload

# Related models not loaded
session.execute(select(User)).scalars().all()

# Related models loaded eagerly
session.execute(select(User).options(selectinload(User.groups))).scalars().all()
```
The principle behind `selectinload` is a `SELECT ... IN` subquery. Besides `selectinload`, the traditional `joinedload` is also available; its principle is the familiar table join.
```python
from sqlalchemy.orm import joinedload

# Use joinedload to load related models. Note that the unique method
# is required here; this is mandated by the 2.0 API
session.execute(select(User).options(joinedload(User.groups))).unique().scalars().all()
```
In 2.0, `selectinload` is recommended over `joinedload`: it generally performs better, and `unique` is not needed.
Writing Foreign Keys
In SQLAlchemy, related collections can be manipulated just like ordinary lists.
```python
user.permissions.append(open_permission)  # Add
user.permissions.remove(save_permission)  # Remove

# Clear all related objects
user.permissions.clear()
user.permissions = []
```
Special Handling of JSON Fields
Most databases now support JSON columns. In SQLAlchemy, you can read a JSON object from a column or write one to it directly. But never update the JSON object in place and expect the change to be written back to the database — that is unreliable. Always copy it, modify the copy, and assign it back.
```python
import copy

article = session.get(Article, 1)
tags = copy.copy(article.tags)
tags.append("iOS")
article.tags = tags
session.commit()
```
Batch Insertion
When a large amount of data must be inserted, inserting rows one at a time wastes most of the time on round-trips to the database, which is very inefficient. Most databases, MySQL included, provide a batch API of the form `INSERT ... VALUES (...), (...), ...`, and SQLAlchemy can take good advantage of it.
```python
from sqlalchemy.orm import Session

s = Session(engine)

# bulk_save_objects inserts multiple objects directly
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3"),
]
s.bulk_save_objects(objects)
s.commit()

# bulk_insert_mappings saves the overhead of creating objects
# by inserting plain dictionaries
users = [
    {"name": "u1"},
    {"name": "u2"},
    {"name": "u3"},
]
s.bulk_insert_mappings(User, users)
s.commit()

# bulk_update_mappings updates in batches; the id in each dictionary
# becomes the WHERE condition, and all other fields are updated
s.bulk_update_mappings(User, users)
```
DeclarativeBase
DeclarativeBase fully embraces Python's native type system.
```python
from typing import Optional

from sqlalchemy import Integer
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    # Optional[str] maps to a nullable column
    fullname: Mapped[Optional[str]]
```
Asyncio
Use one `AsyncSession` per task. The `AsyncSession` is a mutable, stateful object that represents a single ongoing database transaction. When running concurrent tasks with `asyncio`, for example via `asyncio.gather()`, each individual task should use its own `AsyncSession`.
```python
import asyncio

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

engine = create_async_engine(url, echo=True)
new_session = async_sessionmaker(engine)

async def main():
    # Create the tables
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # Insert data
    async with new_session() as db:
        db.add(...)
        await db.commit()

    # Query data
    async with new_session() as db:
        stmt = select(A)
        result = await db.execute(stmt)
        for obj in result.scalars():
            print(obj.id)

    await engine.dispose()

asyncio.run(main())
```
Using in a Multiprocessing Environment
Because of Python's Global Interpreter Lock (GIL), multiprocessing is needed to make use of multiple cores. Processes cannot share resources — for SQLAlchemy, that means the connection pool cannot be shared — so this must be handled manually.
Generally speaking, do not try to share the same `Session` among multiple processes. It is best to create a `Session` when each process is initialized.
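A minimal sketch of that advice: each worker process builds its own engine, and therefore its own pool, instead of inheriting pooled connections from the parent. The in-memory SQLite URL is just for illustration:

```python
import multiprocessing

from sqlalchemy import create_engine, text

def worker(n: int) -> int:
    # A fresh engine (and pool) for this process
    engine = create_engine("sqlite://")
    with engine.connect() as conn:
        value = conn.execute(text("SELECT :n + 1"), {"n": n}).scalar()
    engine.dispose()  # close this process's pool
    return value

if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        print(pool.map(worker, [1, 2, 3]))  # [2, 3, 4]
```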
Adding Where Conditions Only When a Value is Set
In web APIs, it is often necessary to filter results according to which query parameters the user has supplied.
```python
query = select(User)
if username is not None:
    query = query.where(User.username == username)
if password is not None:
    query = query.where(User.password == password)
```
Leapcell: The Next-Gen Serverless Platform for Web Hosting, Async Tasks, and Redis
Finally, I would like to recommend the platform best suited for deploying Python services: Leapcell.
1. Multi-Language Support
- Develop with JavaScript, Python, Go, or Rust.
2. Deploy unlimited projects for free
- pay only for usage — no requests, no charges.
3. Unbeatable Cost Efficiency
- Pay-as-you-go with no idle charges.
- Example: $25 supports 6.94M requests at a 60ms average response time.
4. Streamlined Developer Experience
- Intuitive UI for effortless setup.
- Fully automated CI/CD pipelines and GitOps integration.
- Real-time metrics and logging for actionable insights.
5. Effortless Scalability and High Performance
- Auto-scaling to handle high concurrency with ease.
- Zero operational overhead — just focus on building.
Explore more in the documentation!
Leapcell Twitter: https://x.com/LeapcellHQ