一、什么是拉链表?
- 定义
- 针对数据仓库设计中表存储数据的方式而定义的,设计表的时候添加 start_date
和 end_date 两个字段,数据更新时,通过修改 end_date 来设置数据的有效时间。 所谓拉链,就是记录历史
- 记录一个事物
从开始,一直到当前状态
的所有变化的信息。 - 我们可以使用这张表拿到最新的
当天的最新数据
以及之前的历史数据
。 - 既能满足反应数据的历史状态,又可以最大限度地节省存储空间
- 目的
- 解决SCD(Slowly Changing Dimensions),缓慢变化维。
- 收益
- 最大程度的节省存储
- 快速、高效的获取历史上任意一天的快照数据
- 适用场景
数据量很大
且业务系统不会长期保留历史数据,需要在大数据平台保存- 表字段会被
update更新操作
需要查看某一个时间点或者时间段的历史快照信息
- 表中的记录
变化的比例和频率很小
- 拉链表和流水表
- 流水表存放的是一个用户的变更记录,比如在一张流水表中,一天的数据中,会存放一个用户的每条修改记录,但是在拉链表中只有一条记录。
- 这是拉链表设计时需要注意的一个粒度问题。我们当然也可以设置的粒度更小一些,一般按天就足够。
二、缓慢变化维
2.1 谈到拉链表就不得不谈SCD(缓慢变化维问题)
- 缓慢变化维,简称SCD(Slowly Changing Dimensions)
一些维度表的数据不是静态的,而是会随着时间而缓慢地变化
(这里的缓慢是相对事实表而言
,事实表数据变化的速度比维度表快)- 这种随着时间发生变化的维度称之为缓慢变化维
- 把处理维度表数据历史变化的问题,称为缓慢变化维问题,简称SCD问题
- 举例
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 |
- 这个用户的数据不是一直不变,而是有可能发生变化。例如:用户修改了出生日期 或者用户修改了住址。
2.2 缓慢变化维怎么解决?(粗看有五种)
2.2.1 保留初始值(不让改)
用户体验不好
- 如出生日期的数据,始终按照用户第一次填写的数据为准
2.2.2 改写属性值
- 对其相应需要重写维度行中的旧值,以当前值替换。因此其始终反映最近的情况。
- 当一个维度值的数据源发生变化,并且不需要在维度表中保留变化历史时,通常用新数据来覆盖旧数据。这样的处理使属性所反映的中是最新的赋值。
这种方法有个前提,用户不关心这个数据的变化。
- 用户需要历史数据怎么办?
- 我要分析历史变化数据怎么办?
2.2.3 增加维度新行
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 |
修改后:
- 那么怎么区分那条数据是最新的,或者怎么看历史呢?这里可以考虑拉链表的方式,在后面新增两列。。。。。。(具体的话我们接着往下看拉链~)
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 |
9527 | 114 | 张三 | 1992-09-08 | 北京市海淀区 |
2.2.4 增加维度新列
- 用不同的字段来保存不同的值,就是在表中增加一个字段,这个字段用来保存变化后的当前值,而原来的值则被称为变化前的值。总的来说,这种方法通过添加字段来保存变化后的痕迹。
用户改多少列我要增加多少列??
- 修改前:
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 |
编号 | 用户ID | 用户名 | 出生日期 | 出生日期2 | 住址 | 现住址 |
---|
9527 | 114 | 张三 | 1988-09-08 | 1992-09-08 | 北京市朝阳区 | 北京市海淀区 |
2.2.5 使用历史表
- 另外建一个表来保存历史记录,这种方式就是将历史数据与当前数据完全分开来,在维度中只保存当前最新的数据。
- 优点
- 缺点
- 修改前:
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 |
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|
9527 | 114 | 张三 | 1992-09-08 | 北京市海淀区 |
编号 | 用户ID | 用户名 | 出生日期 | 住址 |
---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 |
三、拉链表的使用场景
- 在数据仓库的数据模型设计过程中,经常会遇到下面这种表的设计:
- 有一些表的数据量很大,比如一张用户表,大约10亿条记录,50个字段,这种表,即使使用ORC压缩,单张表的存储也会超过100G,在HDFS使用双备份或者三备份的话就更大一些。
- 表中的部分字段会被update更新操作,如用户联系方式,产品的描述信息,订单的状态等等。
- 需要查看某一个时间点或者时间段的历史快照信息,比如,查看某一个订单在历史某一个时间点的状态。
- 表中的记录变化的比例和频率不是很大,比如,总共有10亿的用户,每天新增和发生变化的有200万左右,变化的比例占的很小。
- 那么对于这种表我该如何设计呢?下面有几种方案可选:
- 方案一:每天只留最新的一份,比如我们每天抽取最新的一份全量数据到Hive中。
- 优点
- 节省空间,一些普通的使用也很方便,不用在选择表的时候加一个时间分区什么的。
- 缺点
- 没有历史数据,想翻翻旧账只能通过其它方式,比如从流水表里面抽。
- 方案二:每天保留一份全量的切片数据。
- 优点
- 每天一份全量的切片是一种比较稳妥的方案,而且历史数据也在。
- 缺点
- 就是存储空间占用量太大,如果对这边表每天都保留一份全量,那么每次全量中会保存很多不变的信息,对存储是极大的浪费,这点我感触还是很深的…
- 当然我们也可以做一些取舍,比如只保留近一个月的数据?但是,需求是无耻的,数据的生命周期不是我们能完全左右的。
- 方案三:使用拉链表。
- 优点
- 在空间上做了一个取舍,虽说不像方案一那样占用量那么小,但是它每日的增量可能只有方案二的千分之一甚至是万分之一。
- 既能获取最新的数据,也能添加筛选条件也获取历史的数据。
四、拉链表的设计和实现
- 这里举例测试一下
- 2023-05-01 首次抽取
- 2023-05-02 修改 出生日期
- 2023-05-03 修改 地址
编号 | 用户ID | 用户名 | 出生日期 | 住址 | 更新时间 |
---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 | 2023-01-01 10:00:00 |
9527 | 114 | 张三 | 1992-09-08 | 北京市朝阳区 | 2023-05-02 10:00:00 |
9527 | 114 | 张三 | 1992-09-08 | 北京市海淀区 | 2023-05-03 10:00:00 |
- 普通拉链表的话在后面增加两个字段,即start_date和end_date
id | user_id | user_name | date_of_birth | address_of_birth | update_time | start_date | end_date |
---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 | 2023-01-01 10:00:00 | 2023-05-01 | 2023-05-02 |
9527 | 114 | 张三 | 1992-09-08 | 北京市朝阳区 | 2023-05-02 10:00:00 | 2023-05-02 | 2023-05-03 |
9527 | 114 | 张三 | 1992-09-08 | 北京市海淀区 | 2023-05-03 10:00:00 | 2023-05-03 | 9999-12-31 |
CREATE TABLE tmp.temp_ods_user(
`id` int comment 'id',
`user_id` int comment '用户id',
`user_name` string comment '用户名称',
`date_of_birth` string comment '出生日期',
`address_of_birth` string comment '出生地址',
`update_time` string comment '更新时间'
)
comment '测试拉链表,用户信息表ods抽取层'
PARTITIONED BY ( `dt` string COMMENT '增量抽取日期')
stored as orc
;
CREATE TABLE tmp.temp_dw_user_chain(
`start_date` string comment '起始日期',
`change_code` string comment '字段MD5值',
`id` int comment 'id',
`user_id` int comment '用户id',
`user_name` string comment '用户名称',
`date_of_birth` string comment '出生日期',
`address_of_birth` string comment '出生地址',
`update_time` string comment '更新时间'
)
comment '测试拉链表,用户信息表dw明细层拉链处理'
PARTITIONED BY ( `status` string COMMENT '状态'
,`end_date` string COMMENT '截止日期'
)
stored as orc
;
insert overwrite table tmp.temp_ods_user partition (dt='2023-05-01')
select 9527 as id
,114 as user_id
,'张三' as user_name
,'1988-09-08' as date_of_birth
,'北京市朝阳区' as address_of_birth
,'2023-01-01 10:00:00' as update_time
;
insert overwrite table tmp.temp_dw_user_chain partition (status='expired',end_date='2023-05-01')
select
case when h.change_code<>c.change_code then h.start_date else e.start_date end as start_date
,case when h.change_code<>c.change_code then h.change_code else e.change_code end as change_code
,case when h.change_code<>c.change_code then h.id else e.id end as id
,case when h.change_code<>c.change_code then h.user_id else e.user_id end as user_id
,case when h.change_code<>c.change_code then h.user_name else e.user_name end as user_name
,case when h.change_code<>c.change_code then h.date_of_birth else e.date_of_birth end as date_of_birth
,case when h.change_code<>c.change_code then h.address_of_birth else e.address_of_birth end as address_of_birth
,case when h.change_code<>c.change_code then h.update_time else e.update_time end as update_time
from(select *
from tmp.temp_dw_user_chain
where status = 'active'
and id is not null
) h
full join(select `(dt|rank)?+.+`
from (select id,user_id,user_name,date_of_birth,address_of_birth,update_time,change_code
,row_number(id) as rank
from (select *,md5(concat_ws('_',id,user_id,user_name,date_of_birth,address_of_birth,update_time)) as change_code
from tmp.temp_ods_user
where dt = '2023-05-01'
and id is not null
distribute by id sort by id desc
) x
) t
where t.rank = 1
) c
on h.id = c.id
full join(select *
from tmp.temp_dw_user_chain
where status='expired'
and end_date='2023-05-01'
) e
on e.id = c.id
where h.id is not null and c.id is not null and (( h.change_code <> c.change_code ) or ( h.change_code = c.change_code and e.id is not null))
;
insert overwrite table tmp.temp_dw_user_chain partition (status='active',end_date='9999-12-31')
select if(h.id is null or (c.id is not null and (h.change_code <> c.change_code)),'2023-05-01',h.start_date) as start_date
,case
when h.id is null then
c.change_code
when h.id is not null and c.id is not null and h.change_code <> c.change_code then c.change_code
else
h.change_code
end as change_code
,case when c.id is not null then c.id else h.id end as id
,case when c.id is not null then c.user_id else h.user_id end as user_id
,case when c.id is not null then c.user_name else h.user_name end as user_name
,case when c.id is not null then c.date_of_birth else h.date_of_birth end as date_of_birth
,case when c.id is not null then c.address_of_birth else h.address_of_birth end as address_of_birth
,case when c.id is not null then c.update_time else h.update_time end as update_time
from(select *
from tmp.temp_dw_user_chain
where status = 'active'
and id is not null
) h
full join(select `(dt|rank)?+.+`
from (select id,user_id,user_name,date_of_birth,address_of_birth,update_time,change_code
,row_number(id) as rank
from (select *
,md5(concat_ws('_',id,user_id,user_name,date_of_birth,address_of_birth,update_time)) as change_code
from tmp.temp_ods_user
where dt = '2023-05-01'
and id is not null
distribute by id sort by id desc
) x
) t
where t.rank = 1
) c
on h.id = c.id
;
start_date | change_code | id | user_id | user_name | date_of_birth | address_of_birth | update_time | status | end_date |
---|
2023-05-01 | 4ac4beee336ffcc0c6afaab74ed6405f | 9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 | 2023-01-01 10:00:00 | active | 9999-12-31 |
- step3:2023-05-02 第一次变更后抽取
insert overwrite table tmp.temp_ods_user partition (dt='2023-05-02')
select 9527 as id
,114 as user_id
,'张三' as user_name
,'1992-09-08' as date_of_birth
,'北京市朝阳区' as address_of_birth
,'2023-05-02 10:00:00' as update_time
;
-- 模拟第一次变更抽取 拉链
insert overwrite table tmp.temp_dw_user_chain partition (status='expired',end_date='2023-05-02')
select
case when h.change_code<>c.change_code then h.start_date else e.start_date end as start_date
,case when h.change_code<>c.change_code then h.change_code else e.change_code end as change_code
,case when h.change_code<>c.change_code then h.id else e.id end as id
,case when h.change_code<>c.change_code then h.user_id else e.user_id end as user_id
,case when h.change_code<>c.change_code then h.user_name else e.user_name end as user_name
,case when h.change_code<>c.change_code then h.date_of_birth else e.date_of_birth end as date_of_birth
,case when h.change_code<>c.change_code then h.address_of_birth else e.address_of_birth end as address_of_birth
,case when h.change_code<>c.change_code then h.update_time else e.update_time end as update_time
from(select *
from tmp.temp_dw_user_chain
where status = 'active'
and id is not null
) h -- 上次的active数据
full join(select `(dt|rank)?+.+`
from (select id,user_id,user_name,date_of_birth,address_of_birth,update_time,change_code
,row_number(id) as rank
from (select *,md5(concat_ws('_',id,user_id,user_name,date_of_birth,address_of_birth,update_time)) as change_code
from tmp.temp_ods_user
where dt = '2023-05-02'
and id is not null
distribute by id sort by id desc
) x
) t
where t.rank = 1
) c -- 抽取的增量数据
on h.id = c.id
full join(select *
from tmp.temp_dw_user_chain
where status='expired'
and end_date='2023-05-02'
) e -- 过期数据
on e.id = c.id
where h.id is not null and c.id is not null and (( h.change_code <> c.change_code ) or ( h.change_code = c.change_code and e.id is not null))
;
insert overwrite table tmp.temp_dw_user_chain partition (status='active',end_date='9999-12-31')
select if(h.id is null or (c.id is not null and (h.change_code <> c.change_code)),'2023-05-02',h.start_date) as start_date
,case
when h.id is null then
c.change_code
when h.id is not null and c.id is not null and h.change_code <> c.change_code then c.change_code
else
h.change_code
end as change_code
,case when c.id is not null then c.id else h.id end as id
,case when c.id is not null then c.user_id else h.user_id end as user_id
,case when c.id is not null then c.user_name else h.user_name end as user_name
,case when c.id is not null then c.date_of_birth else h.date_of_birth end as date_of_birth
,case when c.id is not null then c.address_of_birth else h.address_of_birth end as address_of_birth
,case when c.id is not null then c.update_time else h.update_time end as update_time
from(select *
from tmp.temp_dw_user_chain
where status = 'active'
and id is not null
) h -- 上次的active数据
full join(select `(dt|rank)?+.+`
from (select id,user_id,user_name,date_of_birth,address_of_birth,update_time,change_code
,row_number(id) as rank
from (select *
,md5(concat_ws('_',id,user_id,user_name,date_of_birth,address_of_birth,update_time)) as change_code
from tmp.temp_ods_user
where dt = '2023-05-02'
and id is not null
distribute by id sort by id desc
) x
) t
where t.rank = 1
) c -- 抽取的增量数据
on h.id = c.id
;
start_date | change_code | id | user_id | user_name | date_of_birth | address_of_birth | update_time | status | end_date |
---|
2023-05-02 | ee3915fc4f4ecad9ea1570e391b4e | 9527 | 114 | 张三 | 1992-09-08 | 北京市朝阳区 | 2023-05-02 10:00:00 | active | 9999-12-31 |
2023-05-01 | 4ac4beee336ffcc0c6afaab74ed6405f | 9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 | 2023-01-01 10:00:00 | expired | 2023-05-02 |
- step4:2023-05-03 第二次变更后抽取
insert overwrite table tmp.temp_ods_user partition (dt='2023-05-03')
select 9527 as id
,114 as user_id
,'张三' as user_name
,'1992-09-08' as date_of_birth
,'北京市海淀区' as address_of_birth
,'2023-05-03 10:00:00' as update_time
;
-- 模拟第二次变更抽取 拉链
insert overwrite table tmp.temp_dw_user_chain partition (status='expired',end_date='2023-05-03')
select
case when h.change_code<>c.change_code then h.start_date else e.start_date end as start_date
,case when h.change_code<>c.change_code then h.change_code else e.change_code end as change_code
,case when h.change_code<>c.change_code then h.id else e.id end as id
,case when h.change_code<>c.change_code then h.user_id else e.user_id end as user_id
,case when h.change_code<>c.change_code then h.user_name else e.user_name end as user_name
,case when h.change_code<>c.change_code then h.date_of_birth else e.date_of_birth end as date_of_birth
,case when h.change_code<>c.change_code then h.address_of_birth else e.address_of_birth end as address_of_birth
,case when h.change_code<>c.change_code then h.update_time else e.update_time end as update_time
from(select *
from tmp.temp_dw_user_chain
where status = 'active'
and id is not null
) h -- 上次的active数据
full join(select `(dt|rank)?+.+`
from (select id,user_id,user_name,date_of_birth,address_of_birth,update_time,change_code
,row_number(id) as rank
from (select *,md5(concat_ws('_',id,user_id,user_name,date_of_birth,address_of_birth,update_time)) as change_code
from tmp.temp_ods_user
where dt = '2023-05-03'
and id is not null
distribute by id sort by id desc
) x
) t
where t.rank = 1
) c -- 抽取的增量数据
on h.id = c.id
full join(select *
from tmp.temp_dw_user_chain
where status='expired'
and end_date='2023-05-03'
) e -- 过期数据
on e.id = c.id
where h.id is not null and c.id is not null and (( h.change_code <> c.change_code ) or ( h.change_code = c.change_code and e.id is not null))
;
insert overwrite table tmp.temp_dw_user_chain partition (status='active',end_date='9999-12-31')
select if(h.id is null or (c.id is not null and (h.change_code <> c.change_code)),'2023-05-03',h.start_date) as start_date
,case
when h.id is null then
c.change_code
when h.id is not null and c.id is not null and h.change_code <> c.change_code then c.change_code
else
h.change_code
end as change_code
,case when c.id is not null then c.id else h.id end as id
,case when c.id is not null then c.user_id else h.user_id end as user_id
,case when c.id is not null then c.user_name else h.user_name end as user_name
,case when c.id is not null then c.date_of_birth else h.date_of_birth end as date_of_birth
,case when c.id is not null then c.address_of_birth else h.address_of_birth end as address_of_birth
,case when c.id is not null then c.update_time else h.update_time end as update_time
from(select *
from tmp.temp_dw_user_chain
where status = 'active'
and id is not null
) h -- 上次的active数据
full join(select `(dt|rank)?+.+`
from (select id,user_id,user_name,date_of_birth,address_of_birth,update_time,change_code
,row_number(id) as rank
from (select *
,md5(concat_ws('_',id,user_id,user_name,date_of_birth,address_of_birth,update_time)) as change_code
from tmp.temp_ods_user
where dt = '2023-05-03'
and id is not null
distribute by id sort by id desc
) x
) t
where t.rank = 1
) c -- 抽取的增量数据
on h.id = c.id
;
start_date | change_code | id | user_id | user_name | date_of_birth | address_of_birth | update_time | status | end_date |
---|
2023-05-01 | 4ac4beee336ffcc0c6afaab74ed6405f | 9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 | 2023-01-01 10:00:00 | expired | 2023-05-02 |
2023-05-02 | ee3915fc4f4ecad9ea1570e391b4e | 9527 | 114 | 张三 | 1992-09-08 | 北京市朝阳区 | 2023-05-02 10:00:00 | expired | 2023-05-03 |
2023-05-03 | a68c8f5e5a4982d33ccc9834f6ddb96b | 9527 | 114 | 张三 | 1992-09-08 | 北京市海淀区 | 2023-05-03 10:00:00 | active | 9999-12-31 |
五、数据拉链常见问题
5.1 抽取方式和拉链方式含义&组合
- 增量抽取
- 每天从线上库抽取数据到仓库,通过给定的增量字段(时间字段),来过滤出目标表新增的、新修改的数据,以减少不必要的存储和计算,一般针对于流水数据,历史数据不允许修改。
- 全量抽取
- 每天从线上库抽取数据到仓库, 抽取目标表所有的数据,一般适用于非流水数据,历史数据经常被修改。
- 增量拉链
- 仓库表的加工方式,active分区只保留主键列最新的那条记录,不感知源表删除数据。
- 全量拉链
- 仓库表的加工方式,active分区只保留主键列最新的那条记录,感知源表删除数据,本次抽取的所有数据,将会overwrite active分区。
- 最优搭配 : 增量抽取+增量拉链 ,全量抽取+全量拉链
- 可接受搭配 :全量抽取+增量拉链
- 错误搭配 :增量抽取+全量拉链 ,这将会丢失大量数据(active分区)
5.2 增量拉链和全量拉链详解
- 增量拉链使用场景:
- 适用有创建时间、更新时间的增量时间戳数据,不适用线上数据存在物理删除
- 1、第一次抽取初始化
- 2、以后每天增量抽取,和历史数据对比
- 3、有变化的数据,将原数据更新为expired,新数据存放active
- 4、保留最新active数据,用于下一次对比
- 5、如果线上存在删除数据,active数据依然会保留,请慎用
- 拉链任务可能在一天内手工跑多次,当天第一次跑拉链任务时,expired分区中是没有数据的,此时会将被更新的旧数据写入expired分区中。当天第二次手工重跑拉链任务时,expired分区中已有数据,会直接将expired分区数据写入expired分区。
- 拉链sql中expired分区是必须使用的。拉链任务当天第二次重跑时active分区数据已经更新,不是昨天的状态,不使用expired分区中已有的数据会清空expired分区数据
- 全量拉链使用场景:
- 适用于无时间戳的数据存储,或存在物理删除
- 1、每天抽取全量数据
- 2、有变化的数据和删除数据更新为EXPIRED,新增数据和无变化的数据插入ACTIVE
- 全量拉链与增量拉链的加工过程类似。当天第一次跑拉链任务时,expired分区中是没有数据的,此时会将被更新和被删除的的旧数据写入expired分区中。当天第二次手工重跑拉链任务时,expired分区中已有数据,会直接将expired分区数据写入expired分区。表中是全量最新数据,直接写入active分区
六、拉链表使用
————普通拉链表,无HISTORY分区————
select *
from dw_xxx_chain
where status='active'
————有结转的拉链表,即有HISTORY分区————
select *
from dw_xxx_chain
where status in ( 'active','history')
select *
from dw_xxx_chain
where start_date<=xxx_date
and end_date>xxx_date
所有评论(0)