SQLALchemy异步操作以及在FastAPI中的⽤例mysql为例,不做额外配置。
创建异步引擎
asyncio import create_async_engine
url = "mysql+aiomysql://root:*****@127.0.0.1:3306/demodemo?charset=utf8"
async_egn = create_async_engine(url)
metadata
两种⽅式 ⼀种是Table收集 ⼀种是Mapper类声明
Table收集
第⼀个参数是表名
from sqlalchemy import MetaData
metadata = MetaData()
from sqlalchemy import Table
pes import Integer,String
User = Table(
await和async使用方法
"user",
metadata,
Column('id', Integer, primary_key=True),
Column('name', String(30))
)
Mapper类声明
获取Base的⽅法⾮常多
import registry
mapper_registry = registry()
Base = ate_base()
# adata即为MetaData单例
class User(Base):
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
name = Column(String(30))
# User.__table__  可以获取到Table版的User
execute
分为连接和会话两种 区别不⼤
使⽤连接
async with t() as conn:
result = ute(...)
print(result.fetchall())
async with t() as conn:
async_result = await conn.stream(...)
async for row in async_result:
print(row)
增删改查很简单,table可以是Table收集 也可以是 Mapper类声明
from sqlalchemy import insert,delete,update,select
# 增⼀条
# 增批量
# 删
# 改
# 查⽤lumns.id, id, table.id 皆可
query = select([table]).order_by(id)).offset(1).limit(2)
使⽤会话
text可以执⾏sql字符串 不推荐 除⾮是存储过程
from sqlalchemy import text
asyncio import AsyncSession
async with AsyncSession(async_egn) as session:
result = ute(text('select * from demotable'))
print(result.all())
FastAPI中使⽤
在fastapi中使⽤时 官⽹推荐使⽤sessionmaker 它是预设配置的会话类⼯⼚
⽬前按照这个
/
├── app.py            # 程序⼊⼝
├── config.ini        # 配置⽂件
├── api
│├── __init__.py
│├── auth.py        # Oauth2授权认证登录以及注册api
│├── schemas.py    # 类型检查以及⽣成⽂档所需类型声明
│└── ...            # 其他api ⼦路由等⽂件
├── db
│├── __init__.py
│├── base.py        # ORM mapper基类以及收集的数据元
│└── models.py      # 数据库模型
├── util              # 封装的⼯具包
...                    # 其他测试⽂件
配置sessionmaker 在⾃⼰写的db模块的init⽂件⾥(注意 ⾃动commit和⾃动flush都关闭了)
from .base import table, metadata
asyncio import create_async_engine, AsyncSession import sessionmaker
from . import models
# 读取配置⽂件
from pathlib import Path
from configparser import ConfigParser
ini = ConfigParser()
cfg = ini['db']
# 数据库引擎,也是连接池
async_egn = create_async_('mysql'))
# 创建session元类
async_session_local = sessionmaker(
class_=AsyncSession,
boolean('autocommit', False),
boolean('autoflush', False),
bind=async_egn
)
async def db_session() -> AsyncSession:
'''session⽣成器作为fastapi的Depends选项'''
async with async_session_local() as session:
yield session
使⽤sessionmaker测试查询:db⽬录的models.py⾥写好了Mapper 这是查询Demo表⾥主键id为1的⾏的两种⽅式
from sqlalchemy import select
dels import Demo
from db import async_session_local
async def test_select():
async with async_session_local() as session:
_orm = select(Demo).where(Demo.id == 1)
result: Demo = (ute(_orm)).scalars().first()
print(dict(result))
async def test_select_by_pk():
async with async_session_local() as session:
if (result := (Demo, 1)) is not None:
print(dict(result))
在api路由中使⽤
demo = APIRouter()
@('/demo/{id}')
async def get_demo(id: int, dbs: AsyncSession = Depends(db_session)):    return (Demo, id)
增加⼀条Demo并返回其id的两种⽅式:
async def test_insert_get_new_id_by_autocommit():
async with async_session_local() as session:
async with session.begin():# 这个专门配合⾃动提交的我个⼈不推荐这种
new_demo = Demo(name='test_insert_get_new_id_by_autocommit')
session.add(new_demo)
await session.flush()
print(new_demo.id)
async def test_insert_get_new_id():
async with async_session_local() as session:
new_demo = Demo(name='test_insert_get_new_id')
session.add(new_demo)
await session.flush()
print(new_demo.id)
await sessionmit()  # 需要⼿动提交因为没async with session.begin
Mapper的特殊配置
Table收集  Mapper类声明 区别不⼤  但是后者的查询结果集只能点列属性去取 ⽽前者⽀持使⽤下标可以转为字典如何让Mapper类声明的结果集也能转字典:
实现keys()和__getitem___():
import registry
from sqlalchemy import Table
# 声明映射
mapper_registry = registry()
Base = ate_base()
metadata = adata
def table(mapper: Base) -> Table:
"""表映射声明类转表对象"""
return mapper.__table__
class mapper_to_dict_able_mixin:
'''混⼊继承混⼊后结果集能直接转化为dict'''
def keys(self):
return map(lambda c: c.key, table(self).columns)
def __getitem__(self, key:str):
return getattr(self, key)
class User(mapper_to_dict_able_mixin, Base):
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
username = Column(String(63))
password = Column(String(63))
届时 结果集可以直接dict(),即fastapi中能可以直接return结果集(前⽂⽤了这个)

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。