查看原文
其他

SQL on DataFrame,多快好省

alitrack alitrack 2022-10-01

今天介绍一种可以让查询速度更快, 代码更简洁,省去记忆大量的 Pandas 命令的方法。

或许你听说过 pandasql(或者 sqldf),这里说的不是它,因为它太慢了。

为了方便理解,做了三种方法的对比, 分别为直接对 DuckDB 的表(视图)进行 SQL 操作、Pandas API 以及把 Pandas DataFrame 当做表,通过 SQL 操作。

环境:Notebook

准备工作

下载数据

wget -q https://github.com/cwida/duckdb-data/releases/download/v1.0/lineitemsf1.snappy.parquet
wget -q https://github.com/cwida/duckdb-data/releases/download/v1.0/orders.parquet

安装包

!pip install pandasql ipython-sql duckdb-engine==0.1.8rc4  duckdb ipython-autotime

导入包及初始化

#自动计算运行时间
%load_ext autotime
#下面6行和ipython-sql有关
%load_ext sql
%config SqlMagic.autopandas=True
%config SqlMagic.feedback = False
%config SqlMagic.displaycon = False

%sql duckdb:///:memory:
%sql pragma threads=4
#导入必要的包
import pandas as pd
import numpy as np
from pandasql import sqldf
from duckdb import query
#方便需要定义的两个函数
sqlite = lambda q: sqldf(q, globals())
ducksql= lambda q:  query(q).df()

准备数据

DuckDB 以两种方式测试,第一种,通过创建视图的方式,直接访问 parquet 文件,不把数据放内存

%%sql
create view lineitem as select * from 'lineitemsf1.snappy.parquet';
create view orders as SELECT * FROM 'orders.parquet';
show tables;
#返回
 name
0 lineitem
1 orders

第二种方式就是把 parquet 加载到 DataFrame,然后 SQL on Pandas DataFrame

lineitem_df = %sql SELECT * FROM 'lineitemsf1.snappy.parquet'
orders_df = %sql SELECT * FROM 'orders.parquet'

对比测试

count


这一步 pandasql 基本就可以出局了,太慢了。

Limit

%sql select * from lineitem limit 5 # 60.9ms
ducksql("select * from lineitem_df limit 5"#23.5ms
sqlite("select * from lineitem_df limit 5"#3m 57s
lineitem_df.head(5#13ms

这一轮, pandas 胜,pandasql 最慢,接下来放弃它了

选取部分列

lineitem_df[['l_shipdate','l_extendedprice','l_quantity','l_partkey']].head() #271ms
%sql select l_shipdate,l_extendedprice,l_quantity,l_partkey from lineitem limit 5 #18.8ms
ducksql( "select l_shipdate,l_extendedprice,l_quantity,l_partkey from lineitem_df limit 5"#8.1ms

pandas 最慢

排序(Sort/Order by)

lineitem_df.sort_values('l_shipdate',ascending=False).head(5#28s
ducksql("select * from lineitem_df order by l_shipdate desc limit 5"#1.31s
%sql select * from lineitem order by l_shipdate desc limit 5

#1.14s

查询(Where)

%sql select * from lineitem where l_shipmode ='RAIL' limit 5 #38.5ms
ducksql("select * from lineitem_df where l_shipmode ='RAIL' limit 5"#15.3ms
lineitem_df.query("l_shipmode =='RAIL'").head()

#758ms

聚合(Groupby)

%sql select l_shipmode,l_shipinstruct, count(*) as cnt, sum(l_extendedprice*l_quantity) rev from lineitem group by 1,2 order by 3 desc limit 5 # 415ms

ducksql("select l_shipmode,l_shipinstruct, count(*) as cnt, sum(l_extendedprice*l_quantity) rev from lineitem_df group by 1,2 order by 3 desc limit 5"# 866ms

lineitem_df.assign(rev=lineitem_df["l_extendedprice"] * lineitem_df["l_quantity"]).groupby(['l_shipmode','l_shipinstruct']).agg({"rev": np.sum, "l_shipmode": np.size}).head()

#4.45s

差距有些大,而且代码很长,需要记忆很多函数写法了。

Join(连接)

这里只测试一个 inner join,其它的有兴趣可以自己来

%%sql
SELECT l_returnflag,
       l_linestatus,
       sum(l_extendedprice)
FROM  lineitem
JOIN  orders ON (l_orderkey=o_orderkey)
WHERE l_shipdate <= DATE '1998-09-02'
  AND o_orderstatus='O'
GROUP BY l_returnflag,
         l_linestatus
ducksql("""
SELECT l_returnflag,
       l_linestatus,
       sum(l_extendedprice)
FROM  lineitem_df
JOIN  orders_df ON (l_orderkey=o_orderkey)
WHERE l_shipdate <= DATE '1998-09-02'
  AND o_orderstatus='O'
GROUP BY l_returnflag,
         l_linestatus
"""
)
pd.merge(lineitem_df.query("l_shipdate <= '1998-09-02'")[['l_returnflag','l_orderkey','l_linestatus','l_extendedprice']],
         orders_df.query("o_orderstatus=='O'")[['o_orderkey']],how='inner', left_on='l_orderkey',right_on='o_orderkey')\
.groupby(['l_returnflag','l_linestatus']).agg(sum=('l_extendedprice','sum'))
MethodTime
View518ms
SQL on DataFrame854ms
Pandas6.13s

Union(合并)

  • union all(不去重)

pd.concat([lineitem_df.query("l_shipdate =='1992-01-02'  and l_shipmode =='FOB'"),
          lineitem_df.query("l_shipdate <='1992-01-03'  and l_shipmode =='FOB'")],axis=0)
#1.67s
%%sql
select * from lineitem where  l_shipdate ='1992-01-02' and l_shipmode ='FOB'
union all
select * from lineitem where  l_shipdate <='1992-01-03' and l_shipmode ='FOB'

#1.74s
ducksql("""
select * from lineitem_df where  l_shipdate ='1992-01-02' and l_shipmode ='FOB'
union all
select * from lineitem_df where  l_shipdate <='1992-01-03' and l_shipmode ='FOB'
"""
)
#2.31s
  • union(去重)
pd.concat([lineitem_df.query("l_shipdate =='1992-01-02'  and l_shipmode =='FOB'"),
          lineitem_df.query("l_shipdate <='1992-01-03'  and l_shipmode =='FOB'")],axis=0).drop_duplicates()
# 1.16s
%%sql
select * from lineitem where  l_shipdate ='1992-01-02' and l_shipmode ='FOB'
union
select * from lineitem where  l_shipdate <='1992-01-03' and l_shipmode ='FOB'
#1.75s
ducksql("""
select * from lineitem_df where  l_shipdate ='1992-01-02' and l_shipmode ='FOB'
union
select * from lineitem_df where  l_shipdate <='1992-01-03' and l_shipmode ='FOB'
"""
)
# 2.51s

开窗函数(Window Function)

  • row_number()
%%sql
SELECT * FROM (
  SELECT
    t.*,
    ROW_NUMBER() OVER(PARTITION BY l_shipdate ORDER BY l_extendedprice DESC) AS rn
  FROM lineitem t
)
WHERE rn =1
ORDER BY l_extendedprice, rn;
#14s
ducksql("""
SELECT * FROM (
  SELECT
    t.*,
    ROW_NUMBER() OVER(PARTITION BY l_shipdate ORDER BY l_extendedprice DESC) AS rn
  FROM lineitem_df t
)
WHERE rn =1
ORDER BY l_extendedprice, rn;
"""
)
#37.8s
(
    lineitem_df.assign(
        rn=lineitem_df.sort_values(["l_extendedprice"], ascending=False)
        .groupby(["l_shipdate"])
        .cumcount()
        + 1
    )
    .query("rn ==1")
    .sort_values(["l_shipdate""rn"])
)
#24.6s
(
    lineitem_df.assign(
        rnk=lineitem_df.groupby(["l_shipdate"])["l_extendedprice"].rank(
            method="first", ascending=False
        )
    )
    .query("rnk == 1")
    .sort_values(["l_shipdate""rnk"])
)
# 5.81s,这个快很多
  • rank
%%sql
SELECT * FROM (
  SELECT
    t.*,
    RANK() OVER(PARTITION BY l_shipmode ORDER BY l_quantity) AS rnk
  FROM lineitem t
    where l_quantity<10
)
WHERE rnk < 3
ORDER BY l_shipmode, rnk;
#3.03s

ducksql("""
SELECT * FROM (
  SELECT
    t.*,
    RANK() OVER(PARTITION BY l_shipmode ORDER BY l_quantity) AS rnk
  FROM lineitem_df t
    where l_quantity<10
)
WHERE rnk < 3
ORDER BY l_shipmode, rnk;
"""
)
#4.17
(
    lineitem_df[lineitem_df["l_quantity"] < 10]
    .assign(rnk_min=lineitem_df.groupby(["l_shipmode"])["l_quantity"].rank(method="min"))
    .query("rnk_min < 3")
    .sort_values(["l_shipmode""rnk_min"])
)
# 3.31

更新(UPDATE)与删除(DELETE)

SQL on DataFrame 不支持。

从上面的例子可以看出,使用 SQL on DataFrame, 省去了记忆复杂 Pandas 的函数、速度也比较保证(复杂的 SQL 反而快很多),另外如果数据比较大,还是建议直接放在 DuckDB 数据库里,不用担心丢失、节省内存,还保证速度,只在需要的时候才把数据取出来。


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存