Pulpcode

捕获,搅碎,拼接,吞咽

0%

sqlalchemy使用手册

Introduction

我们在开发过程中需要使用python去连接mysql,pymysql过于“老土”,所以我选择使用sqlalchemy。

初始化

我一般在配置文件中,配置mysql相关的用户名,密码,等参数。

1
2
3
4
5
6
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

DB_CONNECT_STRING = "mysql+mysqldb://%s:%s@%s:%s/%s?charset=utf8"
engine = create_engine(DB_CONNECT_STRING % (config.MYSQL_USER, config.MYSQL_PASSWD, config.MYSQL_HOST, config.MYSQL_PORT, config.MYSQL_DB), echo=config.SQLALCHEMY_ECHO)
DB_Session = scoped_session(sessionmaker(bind=engine))

其中:create_engine() 会返回一个数据库引擎,sessionmaker() 会生成一个数据库会话类。

我常常在tornado中使用这sqlalchemy,所以我会在两个钩子函数中,创建session,关闭session。

1
2
3
4
5
6
class BaseHandler(tornado.web.RequestHandler):
def initialize(self):
self.session = DB_Session()

def on_finish(self):
self.session.close()

直接执行sql

sqlalchemy是支持直接执行sql的,

1
2
session.execute('select * from customers').fetchall()
session.execute('select * from customers where id = :id', {'id': 1}).first()

需要注意的第一点是,我的第二种写法并没有“拼sql”,所以它是可以防止sql注入的。

还要注意,返回数据的“容器”是怎样的,下面给几个例子。

使用fetchall()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

sql = 'select a, b from A;'
session.execute(sql).fetchall()
# 如果返回多条

[(a1, b1),
(a2, b2)]

# 如果返回一条,(你会发现,即使是一条,结构也没有发生变化)

[(a1, b1)]

# 如果一条数据都没有呢?

[]

使用first()

1
2
3
4
5
6
7

sql = 'select a, b from A;'
session.execute(sql).first()
# 返回一条
(a1, b2)
# 如果一条数据都没有
None

不过我不建议使用直接执行sql的方式,orm才是sqlalchemy的强项。

使用 orm

这里直接给出几个常用的例子,方便查阅使用:

对于一个表b,有一个对应的B model。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class B(BaseModel)

__tablename__ = 'b'

id = Column(BigInteger, nullable=False, primary_key=True)
application_id = Column(CHAR(5), nullable=False)
type = Column(SMALLINT, nullable=False)
count = Column(Integer, nullable=False, default=0)

b = session.query(B).first()
#直接拿B,将拿出一个对象,你可以直接把它当对象使用。
b.id
b.count
b.type

b = session.query(B.id, B.type).first()
#这样会拿出一个元组(id1, type1)
# 类似直接执行,al()操作,会拿出一个list,里面都是()

# filter的操作就是where,要注意的一点是,filter默认是and,要使用or或者其它操作需要导入
query.filter(B.application_id=='00001', B.type=3)
from sqlalchemy import func, or_, not_
query.filter(or_(B.type==1, B.type==2)).all() # or

一个有趣的操作是scalar,它会拿出一条数据的第一个字段,那么你想获得一条数据的第一条记录,直接使用scalar就行了,省的first之后再去[0]。

不过要注意的是,scalar的前提是单条,或没有(没有返回None),如果是多条记录,会抛出异常,所以我一般用它,是因为filter部分是靠主键来取的。

(“Multiple rows were found for one()”)

还有其它的操作,比如获取主键可以直接使用get,而不用filter。

1
session.query(User).get(1).name

还有就是删除操作:

1
session.query(User).filter(User.id == 1).delete()

当然修改的操作要记得commit,抛出异常的时候要记得回滚。

1
2
3
4
5
6
7
try:
....
session.commit()
except:
session.rollback()
else:
....

连接池

pool_size

pool_size是连接池中的连接个数, 如果你设置的太少,那么很有可能连接不够用。你一个server的其它请求进来要获取数据库连接的时候,就TimeoutError: QueuePool limit of size x overflow y reached, connection timed out, timeout z

那么你的server最大能有多少连接呢? 在数据库中使用这条命令: show variables like 'max_connections';

pool_recycle

你可以再mysql中使用如下命令:show global variables like '%timeout%'; 里面有个变量叫interactive_timeout

mysql建立的连接,在interactive_timeout时间内都没有访问请求的话,mysql server将主动断开这条连接,那么你的后续操作如果都在sqlalchemy上进行操作肯定会出错的。这就会出现:error 2006 (MySQL server has gone away)

那么如果设置了pool_recycle(该值必须小于数据库服务器的interactive_timeout),连接池中的空闲连接超过此时间后,自动释放。

其它

create_engine有一个echo参数,在调试的时候将其开启,可以看见每一条执行的sql,这个在开发时很方便。

还有一个要注意的是:

1
scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))

我一般会将autocommit和autoflush默认关闭。