作者:何李夫、范欣欣
上文简要介绍了Join在大数据领域中的使用背景以及常用的几种算法-broadcast hash join 、shuffle hash join以及sort merge join等,对每一种算法的核心应用场景也做了相关介绍,这里再重点说明一番:大表与小表进行join会使用broadcast hash join,一旦小表稍微大点不再适合广播分发就会选择shuffle hash join,最后,两张大表的话无疑选择sort merge join。
好了,问题来了,说是这么一说,但到底选择哪种算法归根结底是SQL执行引擎干的事情,按照上文逻辑,SQL执行引擎肯定要知道参与Join的两表大小,才能选择最优的算法喽!那么斗胆问一句,怎么知道两表大小?衡量两表大小的是物理大小还是纪录多少抑或两者都有?其实,这是另一门学问-基于代价优化(Cost Based Optimization,简称CBO),它不仅能够解释Join算法的选择问题,更重要的,它还能确定多表联合Join场景下的Join顺序问题。
RF预备知识:bloom filter
RF说白了是使用bloomfilter对参与join的表进行过滤,减少实际参与join的数据量。为了下文详细解释整个流程,有必要先解释一下bloomfilter这个数据结构(对之熟悉的看官可以绕道)。Bloom Filter使用位数组来实现过滤,初始状态下位数组每一位都为0,如下图所示:
假如此时有一个集合S = {x1, x2, … xn},Bloom Filter使用k个独立的hash函数,分别将集合中的每一个元素映射到{1,…,m}的范围。对于任何一个元素,被映射到的数字作为对应的位数组的索引,该位会被置为1。比如元素x1被hash函数映射到数字8,那么位数组的第8位就会被置为1。下图中集合S只有两个元素x和y,分别被3个hash函数进行映射,映射到的位置分别为(0,3,6)和(4,7,10),对应的位会被置为1:

现在假如要判断另一个元素是否是在此集合中,只需要被这3个hash函数进行映射,查看对应的位置是否有0存在,如果有的话,表示此元素肯定不存在于这个集合,否则有可能存在。下图所示就表示z肯定不在集合{x,y}中:

RF算法理论


Step 1:将item表的join字段(item.id)经过多个hash函数映射处理为一个bloomfilter(如果对bloomfilter不了解,自行google)
Step 2:将映射好的bloomfilter分别广播到order表的所有partition上,准备进行过滤
Step 3:以Partition2为例,存储进程(比如DataNode进程)将order表中join列(order.item_id)数据一条一条读出来,使用bloomfilter进行过滤。淘汰该订单数据不是书籍相关商品的订单,这条数据直接跳过;否则该条订单数据有可能是待检索订单,将该行数据全部扫描出来。
Step 4:将所有未被bloomfilter过滤掉的订单数据,通过本地socket通信发送到计算进程(impalad)。
RF算法分析
- RF本质:通过谓词( bloomfilter)下推,在存储层通过bloomfilter对数据进行过滤,可以从三个方面实现对Join的优化。其一,如果可以跳过很多记录,就可以减少了数据IO扫描次数。这点需要重点解释一下,许多朋友会有这样的疑问:既然需要把数据扫描出来使用BloomFilter进行过滤,为什么还会减少IO扫描次数呢?这里需要关注一个事实:大多数表存储行为都是列存,列之间独立存储,扫描过滤只需要扫描join列数据(而不是所有列),如果某一列被过滤掉了,其他对应的同一行的列就不需要扫描了,这样减少IO扫描次数。其二,减少了数据从存储层通过socket(甚至TPC)发送到计算层的开销,其三,减少了最终hash join执行的开销。
- RF代价:对照未使用RF的Broadcast Hash Join来看,前者主要增加了bloomfilter的生成、广播以及大表根据bloomfilter进行过滤这三个开销。通常情况下,这几个步骤在小表较小的情况下代价并不大,基本可以忽略。
- RF优化效果:基本取决于bloomfilter的过滤效果,如果大量数据被过滤掉了,那么join的性能就会得到极大提升;否则性能提升就会有限。
- RF实现:和常见的谓词下推(’=‘,’>’,’<‘等)一样,RF实现需要在计算层以及存储层分别进行相关逻辑实现,计算层要构造bloomfilter并将bloomfilter下传到存储层,存储层要实现使用该bloomfilter对指定数据进行过滤。
RF效果验证


select
w_state
,i_item_id
,sum(case when (cast(d_date as date) < cast (‘1998-04-08’ as date))
then cs_sales_price – coalesce(cr_refunded_cash,0) else 0 end) as sales_before
,sum(case when (cast(d_date as date) >= cast (‘1998-04-08’ as date))
then cs_sales_price – coalesce(cr_refunded_cash,0) else 0 end) as sales_after
from
catalog_sales left outer join catalog_returns on
(catalog_sales.cs_order_number = catalog_returns.cr_order_number
and catalog_sales.cs_item_sk = catalog_returns.cr_item_sk)
,warehouse
,item
,date_dim
where
i_current_price between 0.99 and 1.49
and item.i_item_sk = catalog_sales.cs_item_sk
and catalog_sales.cs_warehouse_sk = warehouse.w_warehouse_sk
and catalog_sales.cs_sold_date_sk = date_dim.d_date_sk
and date_dim.d_date between ‘1998-03-09’ and ‘1998-05-08’
group by
w_state,i_item_id
order by w_state,i_item_id
limit 100;
|
impala on kudu(without runtime filter) | impala on parquet(with runtime filter) | |
total time | 43s996ms | 2s385ms |
bloomfilter生成 |
Filter 0 arrival: 857ms Filter 1 arrival: 879ms Filter 2 arrival: 939ms |
|
大表scan扫描 |
HDFS_SCAN_NODE (id=0):(Total: 3s479ms) – RowsRead: 72.01M – RowsReturned: 72.01M – RowsReturnedRate: 20.69 M/s |
HDFS_SCAN_NODE (id=0):(Total: 2s011ms) – RowsRead: 72.01M – RowsReturned: 35.92K – RowsReturnedRate: 17.86 K/sec Filter 0 (1.00 MB): – Rows processed: 72.01M – Rows rejected: 71.43M – Rows total: 72.01M Filter 1 (1.00 MB): – Rows processed: 49.15K – Rows rejected: 126 – Rows total: 49.15K Filter 2 (1.00 MB): – Rows processed: 584.38K – Rows rejected: 548.46K – Rows total: 584.38K |
数据加载计算进程内存 |
DataStreamSender (dst_id=11):(Total: 15s984ms) – NetworkThroughput(*): 298.78 MB/sec – OverallThroughput: 100.85 MB/sec – RowsReturned: 72.01M – SerializeBatchTime: 10s567ms – TransmitDataRPCTime: 5s395ms |
DataStreamSender (dst_id=11):(Total: 10.725ms) – NetworkThroughput(*): 244.06 MB/sec – OverallThroughput: 71.23 MB/sec – RowsReturned: 35.92K – SerializeBatchTime: 7.544ms – TransmitDataRPCTime: 3.130ms
|
Hash Join |
HASH_JOIN_NODE (id=5):(Total: 19s104ms) – BuildPartitionTime: 862.560ms – BuildRows: 8.99M – BuildRowsPartitioned: 8.99M – BuildTime: 373.855ms – …… – ProbeRows: 90.00M – ProbeRowsPartitioned: 0 (0) – ProbeTime: 17s628ms – RowsReturned: 90.00M – RowsReturnedRate: 985.85 K/s – SpilledPartitions: 0 (0) – UnpinTime: 960.000ns |
HASH_JOIN_NODE (id=6):(Total: 21.707ms) – BuildPartitionTime: 3.487ms – BuildRows: 18.81K (18814) – BuildRowsPartitioned: 18.81K – BuildTime: 646.817us – …… – ProbeRows: 85.28K (85278) – ProbeRowsPartitioned: 0 (0) – ProbeTime: 6.396ms – RowsReturned: 85.27K – RowsReturnedRate: 38.88 K/s – SpilledPartitions: 0 (0) – UnpinTime: 915.000ns |
1. 确认经过RF之后大表的数据量得到大量滤除,只剩下少量数据参与最终的HashJoin。参见第二行大表scan扫描结果,未使用rf的返回结果有7千万行+纪录,而经过RF过滤之后满足条件的只有3w+纪录。3万相比7千万,性能优化效果自然不言而喻。
2. 经过RF滤除之后,少量数据经过网络从存储进程加载到计算进程内存的网络耗时大量减少。参见第三行“数据加载到计算进程内存”,前者耗时15s,后者耗时仅仅11ms。主要耗时分为两部分,其中数据序列化时间占到2/3-10s左右,数据经过RPC传输时间占另外1/3 -5s左右。
说好的谓词下推呢?
- 其一是逻辑执行计划优化层面的说法,比如SQL语句:select * from order ,item where item.id = order.item_id and item.category = ‘book’,正常情况语法解析之后应该是先执行Join操作,再执行Filter操作。通过谓词下推,可以将Filter操作下推到Join操作之前执行。即将where item.category = ‘book’下推到 item.id = order.item_id之前先行执行。
- 其二是真正实现层面的说法,谓词下推是将过滤条件从计算进程下推到存储进程先行执行,注意这里有两种类型进程:计算进程以及存储进程。计算与存储分离思想,这在大数据领域相当常见,比如最常见的计算进程有SparkSQL、Hive、impala等,负责SQL解析优化、数据计算聚合等,存储进程有HDFS(DataNode)、Kudu、HBase,负责数据存储。正常情况下应该是将所有数据从存储进程加载到计算进程,再进行过滤计算。谓词下推是说将一些过滤条件下推到存储进程,直接让存储进程将数据过滤掉。这样的好处显而易见,过滤的越早,数据量越少,序列化开销、网络开销、计算开销这一系列都会减少,性能自然会提高。
范神出品,必属精品!!!
最后对bloomfilter的定性和谓语下推的解释特别精彩!!!
精彩
有两个图片打不开是什么鬼啊!
看了下没问题嘛 换个浏览器试试 如果还有问题 请说明是哪个图
那如果我用item.category = ‘book’ and item.id = order.item_id就不算谓词下推了吗?
算啊 这个就是谓词下推 个人觉得谓词下推包含两种 一种是这个 一种是文中所说的
范大哥,那在原生的HDFS上(没有设置列存),如何使用布隆过滤器在存储层过滤,程序是mapereduce的
原生HDFS没有布隆过滤器机制,布隆过滤器主要用来加速随机查询,HDFS这样的大规模扫描任务用不上
那在mapreduce中实现两个巨大表(TB级)的关联,只能通过shuffle-join这种缓慢的方式吗?
嗯 如果是两张大表join那没办法 不可能快的
要是能加上参考文档链接就好了。
谢谢建议,后面文章都会添加参考文章
算法第三步,是不是比较反了啊?倘若这么做好像会很慢,而且肯定没有直接比较来的块吧。应该是一个partition有一个描述数据的BloomFilter,而谓词中的值算出一个位置数值,来匹配这个BloomFilter吧。这样比较次数只和谓词个数有关,与partition中数据个数无关。
没有很理解,可以在邮件中详细沟通。libisthanks@gmail.com
谢谢分享,有学习到,同样希望提供参考文档链接
这篇文章基于我们内部做的实验和本人的一些认知 想bloomfilter相关的资料可以网上查找
上文在哪儿?
谢谢分享,最近一直在看您的文章,不过这篇文章里面有一些图片引用了您本地的路径,能去掉么?能看到您的邮箱~
我假期统一处理一下 谢谢反馈