PythonSQLAlchemy基本操作和常⽤技巧(包含⼤量实例,
⾮常好)
⾸先说下,由于最新的 0.8 版还是开发版本,因此我使⽤的是 0.79 版,API 也许会有些不同。
因为我是搭配 MySQL InnoDB 使⽤,所以使⽤其他数据库的也不能完全照搬本⽂。
接着就从安装开始介绍吧,以 Debian/Ubuntu 为例(请确保有管理员权限):
1.MySQL
复制代码代码如下:
apt-get install mysql-server
apt-get install mysql-client
apt-get install libmysqlclient15-dev
2.python-mysqldb
复制代码代码如下:
apt-get install python-mysqldb
3.easy_install
复制代码代码如下:
wget lecommunity/dist/ez_setup.py
python ez_setup.py
4.MySQL-Python
复制代码代码如下:
easy_install MySQL-Python
5.SQLAlchemy
复制代码代码如下:
easy_install SQLAlchemy批量更新sql语句
如果是⽤其他操作系统,遇到问题就 Google ⼀下吧。我是在 Mac OS X 上开发的,途中也遇到些问题,不过当时没记下来……
值得⼀提的是我⽤了 MySQL-Python 来连 MySQL,因为不⽀持异步调⽤,所以和 Tornado 不是很搭。不过性能其实很好,因此以后再去研究下其他⽅案吧……
装好后就可以开始使⽤了:
复制代码代码如下:
from sqlalchemy import create_engine
import sessionmaker
DB_CONNECT_STRING = 'mysql+mysqldb://root:123@localhost/ooxx?charset=utf8'
engine = create_engine(DB_CONNECT_STRING, echo=True)
DB_Session = sessionmaker(bind=engine)
session = DB_Session()
这⾥的 DB_CONNECT_STRING 就是连接数据库的路径。“mysql+mysqldb”指定了使⽤ MySQL-Python 来连
接,“root”和“123”分别是⽤户名和密码,“localhost”是数据库的域名,“ooxx”是使⽤的数据库名(可省略),“charset”指定了连接时使⽤的字符集(可省略)。
create_engine() 会返回⼀个数据库引擎,echo 参数为 True 时,会显⽰每条执⾏的 SQL 语句,⽣产环境下可关闭。sessionmaker() 会⽣成⼀个数据库会话类。这个类的实例可以当成⼀个数据库连接,它同时还记录了⼀些查询的数据,并决定什么时候执⾏ SQL 语句。由于 SQLAlchemy ⾃⼰维护了⼀个数据库连接池(默认 5 个连接),因此初始化⼀个会话的开销并不⼤。对 Tornado ⽽⾔,可以在 BaseHandler 的 initialize() ⾥初始化:
复制代码代码如下:
class BaseHandler(tornado.web.RequestHandler):
def initialize(self):
self.session = models.DB_Session()
def on_finish(self):
self.session.close()
对其他 Web 服务器来说,可以使⽤ scoped_session,它能保证每个线程获得的 session 对象都是唯⼀的。不过 Tornado 本⾝就是单线程的,如果使⽤了异步⽅式,就可能会出现问题,因此我并没使⽤它。
拿到 session 后,就可以执⾏ SQL 了:
复制代码代码如下:
ute('show databases').fetchall()
# 建 user 表的过程略
ute('select * from user where id = 1').first()
ute('select * from user where id = :id', {'id': 1}).first()
不过这和直接使⽤ MySQL-Python 没啥区别,所以就不介绍了;我还是喜欢 ORM 的⽅式,这也是我采⽤ SQLAlchemy 的唯⼀原因。
于是来定义⼀个表:
复制代码代码如下:
from sqlalchemy import Column
pes import CHAR, Integer, String
declarative import declarative_base
BaseModel = declarative_base()
def init_db():
def drop_db():
class User(BaseModel):
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
name = Column(CHAR(30)) # or Column(String(30))
init_db()
declarative_base() 创建了⼀个 BaseModel 类,这个类的⼦类可以⾃动与⼀个表关联。
以 User 类为例,它的 __tablename__ 属性就是数据库中该表的名称,它有 id 和 name 这两个字段,分别为整型和 30 个定长字符。Column 还有⼀些其他的参数,我就不解释了。
最后,ate_all(engine) 会到 BaseModel 的所有⼦类,并在数据库中建⽴这些表;drop_all() 则是删除这些表。
接着就开始使⽤这个表吧:
复制代码代码如下:
from sqlalchemy import func, or_, not_
user = User(name='a')
session.add(user)
user = User(name='b')
session.add(user)
user = User(name='a')
session.add(user)
user = User()
session.add(user)
sessionmit()
query = session.query(User)
print query # 显⽰SQL 语句
print query.statement # 同上
for user in query: # 遍历时查询
print user.name
print query.all() # 返回的是⼀个类似列表的对象
print query.first().name # 记录不存在时,first() 会返回 None
# ().name # 不存在,或有多⾏记录时会抛出异常
print query.filter(User.id == 2).first().name
(2).name # 以主键获取,等效于上句
print query.filter('id = 2').first().name # ⽀持字符串
query2 = session.query(User.name)
print query2.all() # 每⾏是个元组
print query2.limit(1).all() # 最多返回 1 条记录
print query2.offset(1).all() # 从第 2 条记录开始返回
der_by(User.name).all()
der_by('name').all()
der_by(User.name.desc()).all()
der_by('name desc').all()
print session.query(User.id).order_by(User.name.desc(), User.id).all()
print query2.filter(User.id == 1).scalar() # 如果有记录,返回第⼀条记录的第⼀个元素
print session.query('id').select_from(User).filter('id = 1').scalar()
print query2.filter(User.id > 1, User.name != 'a').scalar() # and
query3 = query2.filter(User.id > 1) # 多次拼接的 filter 也是 and
query3 = query3.filter(User.name != 'a')
print query3.scalar()
print query2.filter(or_(User.id == 1, User.id == 2)).all() # or
print query2.filter(User.id.in_((1, 2))).all() # in
query4 = session.query(User.id)
print query4.filter(User.name == None).scalar()
print query4.filter('name is null').scalar()
print query4.filter(not_(User.name == None)).all() # not
print query4.filter(User.name != None).all()
unt()
print session.unt('*')).select_from(User).scalar()
print session.unt('1')).select_from(User).scalar()
print session.unt(User.id)).scalar()
print session.unt('*')).filter(User.id > 0).scalar() # filter() 中包含 User,因此不需要指定表
print session.unt('*')).filter(User.name == 'a').limit(1).scalar() == 1 # 可以⽤ limit() 限制 count() 的返回数print session.query(func.sum(User.id)).scalar()
print session.w()).scalar() # func 后可以跟任意函数名,只要该数据库⽀持
print session.query(func.current_timestamp()).scalar()
print session.query(func.md5(User.name)).filter(User.id == 1).scalar()
query.filter(User.id == 1).update({User.name: 'c'})
user = (1)
print user.name
user.name = 'd'
session.flush() # 写数据库,但并不提交
(1).name
session.delete(user)
session.flush()
(1)
(1).name
query.filter(User.id == 1).delete()
sessionmit()
(1)
增删改查都涉及到了,⾃⼰看看输出的 SQL 语句就知道了,于是基础知识就介绍到此了。
下⾯开始介绍⼀些进阶的知识。
可以使⽤⾮ ORM 的⽅式:
复制代码代码如下:
User.__table__.insert(),
[{'name': `randint(1, 100)`,'age': randint(1, 100)} for i in xrange(10000)]
)
sessionmit()
上⾯我批量插⼊了 10000 条记录,半秒内就执⾏完了;⽽ ORM ⽅式会花掉很长时间。
使⽤ query 对象的 prefix_with() ⽅法:
复制代码代码如下:
session.query(User.name).prefix_with('HIGH_PRIORITY').all()
使⽤ () ⽅法替代 session.add(),其实就是 SELECT + UPDATE:
复制代码代码如下:
user = User(id=1, name='ooxx')
<(user)
sessionmit()
或者使⽤ MySQL 的 INSERT … ON DUPLICATE KEY UPDATE,需要⽤到 @compiles 装饰器,有点难懂,⾃⼰搜索看吧:《SQLAlchemy ON DUPLICATE KEY UPDATE》和 sqlalchemy_mysql_ext。
可以使⽤ MySQL 的⽅⾔:
复制代码代码如下:
from sql import INTEGER
id = Column(INTEGER(unsigned=True), primary_key=True)
开发时遇到过⼀个奇怪的需求,有个其他系统的表⾥包含了⼀个“from”字段,这在 Python ⾥是关键字,于是只能这样处理了:
复制代码代码如下:
from_ = Column('from', CHAR(10))
Column 会⽣成⼀个很复杂的对象,想获取长度⽐较⿇烦,这⾥以 User.name 为例:
复制代码代码如下:
User.lumns[0].type.length
最简单的⽅式就是修改数据库的默认配置。如果⾮要在代码⾥指定的话,可以这样:
复制代码代码如下:
class User(BaseModel):
__table_args__ = {
'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8'
}
MySQL 5.5 开始⽀持存储 4 字节的 UTF-8 编码的字符了,iOS ⾥⾃带的 emoji(如  字符)就属于这种。
如果是对表来设置的话,可以把上⾯代码中的 utf8 改成 utf8mb4,DB_CONNECT_STRING ⾥的 charset 也这样更改。
如果对库或字段来设置,则还是⾃⼰写 SQL 语句⽐较⽅便,具体细节可参考《How to support full Unicode in MySQL databases》。
不建议全⽤ utf8mb4 代替 utf8,因为前者更慢,索引会占⽤更多空间。
复制代码代码如下:
from random import randint
from sqlalchemy import ForeignKey
class User(BaseModel):
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
age = Column(Integer)
class Friendship(BaseModel):
__tablename__ = 'friendship'
id = Column(Integer, primary_key=True)
user_id1 = Column(Integer, ForeignKey('user.id'))
user_id2 = Column(Integer, ForeignKey('user.id'))
for i in xrange(100):
session.add(User(age=randint(1, 100)))
session.flush() # 或 sessionmit(),执⾏完后,user 对象的 id 属性才可以访问(因为 id 是⾃增的)
for i in xrange(100):
session.add(Friendship(user_id1=randint(1, 100), user_id2=randint(1, 100)))
sessionmit()
session.query(User).filter(User.age < 50).delete()
执⾏这段代码时,你应该会遇到⼀个错误:
复制代码代码如下:
原因是删除 user 表的数据,可能会导致 friendship 的外键不指向⼀个真实存在的记录。在默认情况下,MySQL 会拒绝这种操作,也就是 RESTRICT。InnoDB 还允许指定 ON DELETE 为 CASCADE 和 SET NULL,前者会删除 friendship 中⽆效的记录,后者会将这些记录的外键设为 NULL。
除了删除,还有可能更改主键,这也会导致 friendship 的外键失效。于是相应的就有 ON UPDATE 了。其中 CASCADE 变成了更新相应的外键,⽽不是删除。
⽽在 SQLAlchemy 中是这样处理的:
复制代码代码如下:
class Friendship(BaseModel):
__tablename__ = 'friendship'
id = Column(Integer, primary_key=True)
user_id1 = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))
user_id2 = Column(Integer, ForeignKey('user.id', ondelete='CASCADE', onupdate='CASCADE'))
复制代码代码如下:
from sqlalchemy import distinct
import aliased

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。