SQLAlchemy的session
在更改 SQLAlchemy Session 从每次请求都创建到共享同⼀个 Session 之后遇到了如下问题:
StatementError: (InvalidRequestError) Can’t reconnect until invalid transaction is rolled back [SQL: ]
或者是
raised unexpected: OperationalError(“(_mysql_exceptions.OperationalError) (2006, ‘MySQL server has gone away’)”,)
错误是 SQLAlchemy 抛出。原因是你从 pool 拿的 connection 没有以 sessionmit 或 llback 或者 session.close 放回 pool ⾥。这时 connection 的 transaction 没有完结(rollback or commit)。⽽不知什么原因(recyle 了,timeout 了)你的 connection ⼜死掉了,你的sqlalchemy 尝试重新连接。由于 transaction 还没完结,⽆法重连。
正确⽤法是确保 session 在使⽤完成后⽤ session.close, sessionmit 或者 llback 把连接还回 pool。SQLAlchemy 数据库连接池使⽤
不是相同的东西, session 使⽤连接来操作数据库,⼀旦任务完成 session 会将数据库 connection 交还给 pool。
在使⽤create_engine创建引擎时,如果默认不指定连接池设置的话,⼀般情况下,SQLAlchemy 会使⽤⼀个 QueuePool 绑定在新创建的引擎上。并附上合适的连接池参数。
在以默认的⽅法 create_engine 时(如下),就会创建⼀个带连接池的引擎。
engine = create_engine('mysql+mysqldb://root:password@127.0.0.1:3306/dbname')
在这种情况下,当你使⽤了 session 后就算显式地调⽤ session.close(),也不能把连接关闭。连接会由 QueuePool 连接池进⾏管理并复⽤。
这种特性在⼀般情况下并不会有问题,不过当数据库服务器因为⼀些原因进⾏了重启的话。最初保持的数据库连接就失效了。随后进⾏的session.query() 等⽅法就会抛出异常导致程序出错。
如果想禁⽤ SQLAlchemy 提供的数据库连接池,只需要在调⽤ create_engine 是指定连接池为 NullPool,SQLAlchemy 就会在执⾏session.close() 后⽴刻断开数据库连接。当然,如果 session 对象被析构但是没有被调⽤ session.close(),则数据库连接不会被断开,直到程序终⽌。
下⾯的代码就可以避免 SQLAlchemy 使⽤连接池:
#!/usr/bin/env python
#-*- coding: utf-8 -*-
from sqlalchemy import create_engine
import sessionmaker
from sqlalchemy.pool import NullPool
engine = create_engine('mysql+mysqldb://root:password@127.0.0.1:3306/dbname', poolclass=NullPool)
Session = sessionmaker(bind=engine)
session = Session()
usr_obj_list = session.query(UsrObj).all()
print usr_obj_list[0].id
create_engine()函数和连接池相关的参数有:
-pool_recycle, 默认为 -1, 推荐设置为 7200, 即如果 connection 空闲了 7200 秒,⾃动重新获取,以防⽌ connection 被 db server 关闭。
-pool_size=5, 连接数⼤⼩,默认为 5,正式环境该数值太⼩,需根据实际情况调⼤
-max_overflow=10, 超出 pool_size 后可允许的最⼤连接数,默认为 10, 这 10 个连接在使⽤过后,不放在 pool 中,⽽是被真正关闭的。
-pool_timeout=30, 获取连接的超时阈值,默认为 30 秒
直接只⽤create_engine时,就会创建⼀个带连接池的引擎
engine = create_engine('postgresql://postgres@127.0.0.1/dbname')
当使⽤ session 后就显⽰地调⽤ session.close(),也不能把连接关闭,连接由 QueuePool 连接池管理并复⽤。
引发问题
当数据库重启,最初保持的连接就会失败,随后进⾏session.query()就会失败抛出异常 mysql 数据,interactive_timeout 等参数处理连接的空闲时间超过(配置时间),断开
何时定义 session,何时提交,何时关闭
基本
通常来说,将 session 的⽣命周期和访问操作数据库的⽅法对象隔离和独⽴。
确保 transaction 有⾮常清晰的开始和结束,保持 transaction 简短,也就意味着让 transaction 能在⼀系列操作之后终⽌,⽽不是⼀直开放着。
from contextlib import contextmanager
@contextmanager
def session_scope():
“"”
Provide a transactional scope around a series of operations.
”””
session = Session()
try:
yield session
sessionmit()
except:
raise
finally:
session.close()
是否线程安全
Session 不是为了线程安全⽽设计的,因此确保只在同⼀个线程中使⽤。
如果实际上有多个线程参与同⼀任务,那么您考虑在这些线程之间共享 Session 及其对象;但是在这种
极不寻常的情况下,应⽤程序需要确保实现正确的 locking scheme,以便不会同时访问 Session 或其状态。处理这种情况的⼀种更常见的⽅法是为每个并发线程维护⼀个Session,⽽是将对象从⼀个 Session 复制到另⼀个 Session,通常使⽤ () ⽅法将对象的状态复制到本地的新对象中。scoped session
想要线程安全时使⽤scoped_session(),⽂档解释
the scoped_session() function is provided which produces a thread-managed registry of Session objects. It is commonly used in web applications so that a single global variable can be used to safely represent transactional sessions with sets of objects,
localized to a single thread.
using transactional=False is one solution, but a better one is to simply rollback(), commit(), or close() the Session when operations are complete - transactional mode (which is called “autocommit=False” in 0.5) has the advantage that a series of select operations will all share the same isolated his can be more or less important depending on the isolation mode in effect and the kind of application.
DBAPI has no implicit “autocommit” mode so there is always a transaction implicitly in progress when queries are made.
This would be a fairly late answer. This is what happens: While using the session, a sqlalchemy Error is raised (anything which would also throw an error when be used as pure SQL: syntax errors, unique constraints, key collisions etc.).
You would have to find this error, wrap it into a try/except-block and perform llback().
After this you can reinstate your session.
flush 和 commit 区别
flush 预提交,等于提交到数据库内存,还未写⼊数据库⽂件;
commit 就是把内存⾥⾯的东西直接写⼊,可以提供查询了;
使⽤总结
sqlalchemy:
>>> t() as conn:
... result = ute(text("SELECT x, y FROM some_table"))
... for row in result:
... print(f"x: {row.x} y: {row.y}")
上⾯的result是查询的结果集,可迭代,迭代出的每个对象(row),包含了查询出的每⼀⾏数据的信息。可以直接通过'.字段名'⽅式获取到字段值。row本⾝类似于⼀个元组,如这种⽤法:
for x, y in result: # 获取到每⼀⾏数据,元组数据按查询的select语句后的字段名顺序依次排序
# ...
Result.all() ⽅法,获取到所有的row,如:[(x1, y1), (x2, y2)]
Result.mappings()⽅法获取每⼀⾏的字典对象,如[{'x': x1, 'y':y1}, {'x': x2, 'y':y2}]
sql语句中参数传递:
>>> t() as conn:
... result = ute(
... text("SELECT x, y FROM some_table WHERE y > :y"),
... {"y": 2}
... )
... for row in result:
... print(f"x: {row.x} y: {row.y}")
sql语句中使⽤:参数名形式定义站位符,
excute()第⼆个参数通过字典传递参数
多组参数的传递:
>>> t() as conn:
... ute(
.
.. text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
... [{"x": 11, "y": 12}, {"x": 13, "y": 14}]
... )
... connmit()
excute()⽅法第⼆个参数不是传⼀个字典,⽽是传⼀个字典列表。此时底层会遍历这个列表然后拼接成多个sql语句执⾏。但是在执⾏多个sql 前,DBAPI会通过各种⽅式对sql进⾏优化如这⾥的insert语句会被优化,优化后,只会执⾏⼀条insert语句:
INSERT INTO some_table (x, y) VALUES (11, 12), (13, 14)
参数直接和sql语句绑定,通过text()⽅法实现:
>>> stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)
>>> t() as conn:represent的用法
... result = ute(stmt)
... for row in result:
... print(f"x: {row.x} y: {row.y}")
Session是基础的事务/数据库交互对象称为,这个对象的使⽤⽅式与 Connection相似,事实上 Session 通过内部的 Connection 才操作sql 的。
>>> import Session
>>> stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)
>>> with Session(engine) as session:
... result = ute(stmt)
... for row in result:
... print(f"x: {row.x} y: {row.y}")
>>> with Session(engine) as session:
... result = ute(
... text("UPDATE some_table SET y=:y WHERE x=:x"),
... [{"x": 9, "y":11}, {"x": 13, "y": 15}]
... )
... sessionmit()
这⾥事务提交后session不再继续持有connection,下⼀次需要操作数据库时,会重新从engine中获取connection

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。