query = Facility.select() 等价于SQL SELECT * FROM facilities
Facility.select(Facility.name, Facility.membercost) 等价于 SELECT name, membercost FROM facilities;
Facility.select().where(Facility.membercost > 0) 等价于 SELECT * FROM facilities WHERE membercost > 0
Facility.select().where(Facility.name.contains('tennis')) 等价 SELECT * FROM facilities WHERE name ILIKE '%tennis%';
或者 Facility.select().where(Facility.name ** '%tennis%')
query = Facility.select().where(Facility.facid.in_([1, 5])) in操作
SELECT name,CASE WHEN monthlymaintenance > 100 THEN 'expensive' ELSE 'cheap' END FROM facilities;
对应ORM语句
cost = Case(None, [(Facility.monthlymaintenance > 100, 'expensive')], 'cheap') query = Facility.select(Facility.name, cost.alias('cost'))
比较日期
SELECT memid, surname, firstname, joindate FROM members WHERE joindate >= '2012-09-01';
Member.select(Member.memid, Member.surname, Member.firstname, Member.joindate).where(Member.joindate >= datetime.date(2012, 9, 1))
删除重复项并且排序
SELECT DISTINCT surname FROM members ORDER BY surname LIMIT 10;
Member.select(Member.surname).order_by(Member.surname).limit(10).distinct()
UNION 操作 把查询到的数据 合并 到一起 前提:列数相同
SELECT surname FROM members UNION SELECT name FROM facilities;
lhs = Member.select(Member.surname)rhs = Facility.select(Facility.name)query = lhs | rhs
|
代表UNION 相同的只显示一次
+
代表UNION ALL 都显示
&
代表INTERSECT 只显示相同的
-
代表EXCEPT
SELECT MAX(join_date) FROM members;
query = Member.select(fn.MAX(Member.joindate)) query.scale() 取得里面值 上面的只会生成SQL语句并不会真正执行 取得最大值的那一条数据
SELECT firstname, surname, joindate FROM member WHERE joindate = (SELECT MAX(joindate) FROM members);
MemberAlias = Member.alias() subq = MemberAlias.select(fn.MAX(MemberAlias.joindate)) 先进行子查询 再将子查询作为条件 query = Member .select(Member.firstname, Member.surname, Member.joindate) .where(Member.joindate == subq)
连接查询
SELECT starttime FROM bookingsINNER JOIN members ON (bookings.memid = members.memid) WHERE surname = 'Farrell' AND firstname = 'David';
query = Booking.select(Booking.starttime).join(Member) .where((Member.surname == 'Farrell') & (Member.firstname == 'David'))
SELECT starttime, name FROM bookings INNER JOIN facilities ON (bookings.facid = facilities.facid) WHERE date_trunc('day', starttime) = '2012-09-21':: date AND name ILIKE 'tennis%' ORDER BY starttime, name;
query = (Booking.select(Booking.starttime, Facility.name).join(Facility)
.where( (fn.date_trunc('day', Booking.starttime) == datetime.date(2012, 9, 21)) & Facility.name.startswith('Tennis')).order_by(Booking.starttime, Facility.name))
批量插入 不要在循环中使用 Model.create() 1.每次调用该方法都是在自己的事务中,这会很慢 2.每次执行这个方法,都会执行大量的python逻辑 3.发送大量数据到数据库需要解析大量数据 4.检索最后一个插入索引,会导致某些情况进行其他查询 怎么改进?
with db.atomic(): for data_dict in data_source: MyModel.create(**data_dict)
在同一个事务中
使用 insert_many()
MyModel.insert_many(data, fields=[MyModel.field1, MyModel.field2]).execute()
with db.atomic(): MyModel.insert_many(data, fields=fields).execute()
在使用sqlite中可能有绑定变量的限制在SQL
根据数据源,可能需要分成多个快 sqlite限制是 999
可以编写一个循环将数据批量分组
with db.atomic(): for idx in range(0, len(data_source), 100): MyModel.insert_many(data_source[idx:idx+100]).execute()
或者使用辅助函数来分组
from peewee import chunked# Insert rows 100 at a time. 一次插入100条数据with db.atomic(): for batch in chunked(data_source, 100): MyModel.insert_many(batch).execute()
Model.bulk_create() 函数和 insert_many 非常相似,加入批量大小参数
with db.atomic(): User.bulk_create(users, batch_size=100)
可以高效高效更新
User.bulk_update([u1, u2, u3], fields=[User.username])
对于大型列表,最好加上 batch_size 来调用这个更新方法
with database.atomic(): User.bulk_update(list_of_users, fields=['username'], batch_size=50)
for row in db.batch_commit(row_data, 100): User.create(**row)
插入的数据从别的表来时
res = (TweetArchive .insert_from( Tweet.select(Tweet.user, Tweet.message), fields=[TweetArchive.user, TweetArchive.message]) .execute())
INSERT INTO "tweet_archive" ("user_id", "message")SELECT "user_id", "message" FROM "tweet";
Tweet.update(is_published=True).where(Tweet.creation_date < today).execute()
不要在循环中 使用 save() 不仅速度慢,而且当是多进程是,容易受到竞争影响
query = Stat.update(counter=Stat.counter + 1).where(Stat.url == request.url)
query = Employee.update(bonus=(Employee.bonus + (Employee.salary * .1)))
user_id = (User .replace(username='the-user', last_login=datetime.now()) .execute())
user_id = (User .insert(username='the-user', last_login=datetime.now()) .on_conflict_replace() .execute())
Tweet.delete().where(Tweet.creation_date < one_year_ago).execute()
选择单条数据
Model.get() Model.get_by_id() Model[num] 如果没有触发 DoesnotExist
也可以对已有的 SQL 执行get() 操作 get_or_create() 会先索引,没有就创建
user, created = User.get_or_create(username=username)
select() 是智能的,可以多次切片操作,但是只会查询一次
结果被缓存以后,如果要禁用该功能,节约内存 使用 select.Iterator()
除了返回对象,还可以封装成其他对象
, ,
当返回的行数太多的时候 可以使用 obejects() 避免建模 提高查询性能可以使用 curosr
query = Tweet.select(Tweet.content, User.username).join(User)cursor = database.execute(query)for (content, username) in cursor: print(username, '->', content)
尽量使用 & | 而不是使用 and or 原因是Python会将 and / or 的返回值转化为 bool值 in则是相同的原理
提取随机记录
LotteryNumber.select().order_by(fn.Random()).limit(5)
LotterNumber.select().order_by(fn.Rand()).limit(5)
统计
Tweet.select().count()
Tweet.select().where(Tweet.id > 50).count()
(User.select(User, fn.Count(Tweet.id).alias('count')) .join(Tweet, JOIN.LEFT_OUTER) .group_by(User))
Employee.select( fn.Min(Employee.salary), fn.Max(Employee.salary)).scalar(as_tuple=True)
操作符
<< x in y >> x is y % ** like
.in_(value) .not_in(value) .is_null(is_null) .contains(substr)
.startswith(prefix) .endswith(suffix) .between(low, high)
.regexp(exp) .iregexp(exp) .bin_and(value) .bin_or(value)
.concat(other) .distinct() .collate(collation) .cast(type)
fn.Lower(fn.Substr(User.username, 1, 1)) == 'g'
执行原始SQL
query = MyModel.raw('SELECT * FROM my_table WHERE data = %s', user_data)
query = MyModel.select().where(SQL('Some SQL expression %s', user_data))