离线数仓(8):ODS层实现之导入流量日志
0. 相关文章链接
1. ODS层概述
1.1. 数仓分层图
1.2. 数仓数据走向图
1.3. 实现目标
根据如上数仓分层图和数仓数据走向图,可以看出ods层的主要目的有如下3点:
- 保持数据原貌不做任何修改,起到备份数据的作用
- 数据采用压缩,减少磁盘存储空间(例如:原始数据100G,可以压缩到10G左右)
- 创建分区表,防止后续的全表扫描
所以在此次的数仓分层重构中,ods层分别实现了上述3点:
- 将流量数据不做任何修改的写入到文件系统中
- 使用了parquet格式,但由于平台和业务原因没有在parquet上再使用例如lzo、snappy等压缩了,但单独的parquet格式压缩也对文本进行了较好的压缩了
- 在建表时创建了分区表
2. 数据准实时进入文件系统
根据上述数仓数据走向图,前面的日志服务器接收日志和通过flume将日志写入到kafka中就不详细介绍了,只介绍数据到达kafka后的后续流向。
数据到达kafka后,可以使用flume来对接kafka,然后将数据写入到文件系统中,并且flume还能设置文件大小时间等滚动,但是由于博主使用的是华为云的DLI服务,没有找到flume组件,所以直接使用FlinkSQL从kafka中获取数据,并按时更新到OBS文件系统中了。
同时,在将数据写入到OBS文件系统中时,还设置了parquet格式,这样只要表中对应分区的元数据有了,就能做到查询数据时分钟级更新(更新时间按照FlinkSQL的Checkpoint时间来算,博主这里使用的是每3分钟更新,即1天会生成480个文件,此更新时间可以根据实际情况来设置,综合考虑小文件和Flink算子反压以及业务需要的数据更新时间等,一般来说在1分钟到5分钟内均可),这为后续实现准实时需求(即要统计当天截止到现在的数据,又没达到完全实时的地步)提供了基础。
具体实现如下代码所示(需要现在OBS并行文件系统中创建对应目录):
-- 创建日期格式化函数,将10位时间戳转换成8位年月日(yyyyMMdd)
CREATE FUNCTION date_formatted AS 'com.fumi.flink.sql.udf.DateFormattedUDF';
-- ============================================================================
-- bigdata_new_app_log_store 主题数据 (APP埋点中的点击日志数据)
-- 创建kafka的source源
CREATE SOURCE STREAM bigdata_new_app_log_store_source (
__time__ STRING,
__topic__ STRING,
scdata STRING,
__source__ STRING,
__receive_time__ STRING,
`topic` STRING,
`__client_ip__` STRING
) WITH (
type = "kafka",
kafka_bootstrap_servers = "xxx1:9092,xxx2:9092,xxx3:9092",
kafka_group_id = "behavior_log_to_obs",
kafka_topic = "bigdata_new_app_log_store",
encode = "json",
json_config = "__time__=__time__;__topic__=__topic__;scdata=scdata;__source__=__source__;__receive_time__=__tag__:__receive_time__;__client_ip__=__tag__:__client_ip__;topic=topic;"
);
-- 创建obs的sink源
CREATE SINK STREAM bigdata_new_app_log_store_sink (
__time__ BIGINT,
__topic__ STRING,
scdata STRING,
__source__ STRING,
__receive_time__ BIGINT,
`topic` STRING,
`__client_ip__` STRING,
`dt` STRING
) PARTITIONED BY(dt) WITH (
type = "filesystem",
file.path = "obs://yishou-test/yishou_data_qa_test.db/ods_new_app_log_store_dt",
encode = "parquet",
ak = "akxxx",
sk = "skxxx"
);
-- 将source源数据写入到sink源中
INSERT INTO
bigdata_new_app_log_store_sink
SELECT
cast(__time__ as BIGINT) as __time__,
__topic__ ,
scdata ,
__source__ ,
cast(__receive_time__ as BIGINT) as __receive_time__ ,
`topic`,
`__client_ip__`,
date_formatted(__time__, 'yyyyMMdd') as dt
FROM
bigdata_new_app_log_store_source;
-- ============================================================================
-- bigdata_yishou_log_exposure 主题数据 (APP埋点中的曝光日志数据)
-- 创建kafka的source源
CREATE SOURCE STREAM bigdata_yishou_log_exposure_source (
__time__ STRING,
__topic__ STRING,
scdata STRING,
__source__ STRING,
__receive_time__ STRING,
`topic` STRING,
`__client_ip__` STRING
) WITH (
type = "kafka",
kafka_bootstrap_servers = "xxx1:9092,xxx2:9092,xxx3:9092",
kafka_group_id = "behavior_log_to_obs",
kafka_topic = "bigdata_yishou_log_exposure",
encode = "json",
json_config = "__time__=__time__;__topic__=__topic__;scdata=scdata;__source__=__source__;__receive_time__=__tag__:__receive_time__;__client_ip__=__tag__:__client_ip__;topic=topic;"
);
-- 创建obs的sink源
CREATE SINK STREAM bigdata_yishou_log_exposure_sink (
__time__ BIGINT,
__topic__ STRING,
scdata STRING,
__source__ STRING,
__receive_time__ BIGINT,
`topic` STRING,
`__client_ip__` STRING,
`dt` STRING
) PARTITIONED BY(dt) WITH (
type = "filesystem",
file.path = "obs://yishou-test/yishou_data_qa_test.db/ods_yishou_log_exposure_dt",
encode = "parquet",
ak = "akxxx",
sk = "skxxx"
);
-- 将source源数据写入到sink源中
INSERT INTO
bigdata_yishou_log_exposure_sink
SELECT
cast(__time__ as BIGINT) as __time__,
__topic__ ,
scdata ,
__source__ ,
cast(__receive_time__ as BIGINT) as __receive_time__ ,
`topic`,
`__client_ip__`,
date_formatted(__time__, 'yyyyMMdd') as dt
FROM
bigdata_yishou_log_exposure_source;
-- ============================================================================
-- ......
--
注意:
- 上述是使用华为云的FlinkSQL实现的,跟开源的FlinkSQL有些不同,但大体类似,可以根据实际情况来进行开发;
- 博主的流量日志一共有9个大的主题,上述只贴出来了2个,同样可以根据实际情况来添加修改;
- 上述使用了一个自定义函数,将10位时间戳转换成了8位的年月日格式,在开源的FlinkSQL中已有类似函数,不过华为云的FlinkSQL这个版本中还没有,所以添加了自定义函数,具体情况具体开发即可。
在华为云的DLI服务中,运行界面如下所示:
- FlinkSQL作业详情配置图
- FlinkSQL任务列表图
- OBS文件系统(正常运行后每3分钟生成一个新的文件)
3. 创建外部表
-- ods_new_app_log_store_dt 主题
DROP TABLE if exists ${yishou_data_dbname}.ods_new_app_log_store_dt;
create table if not exists ${yishou_data_dbname}.ods_new_app_log_store_dt (
__time__ BIGINT comment '时间',
__topic__ STRING comment '埋点模块主题名',
scdata STRING comment '核心数据',
__source__ STRING comment '数据来源',
__receive_time__ BIGINT comment '日志服务器接收时间',
`topic` STRING comment '埋点模块主题名',
`__client_ip__` STRING comment '客户端IP地址',
`dt` STRING comment '日期分区'
) USING parquet PARTITIONED BY (dt)
COMMENT 'ods层ods_new_app_log_store主题数据,所有该主题的原始数据都在此表中'
LOCATION 'obs://yishou-test/yishou_data_qa_test.db/ods_new_app_log_store_dt'
;
-- ods_yishou_log_exposure_dt 主题
DROP TABLE if exists ${yishou_data_dbname}.ods_yishou_log_exposure_dt;
CREATE TABLE ${yishou_data_dbname}.ods_yishou_log_exposure_dt (
__time__ BIGINT comment '时间',
__topic__ STRING comment '埋点模块主题名',
scdata STRING comment '核心数据',
__source__ STRING comment '数据来源',
__receive_time__ BIGINT comment '日志服务器接收时间',
`topic` STRING comment '埋点模块主题名',
`__client_ip__` STRING comment '客户端IP地址',
`dt` STRING comment '日期分区'
) USING parquet PARTITIONED BY (dt)
COMMENT 'ods层ods_yishou_log_exposure主题数据,所有该主题的原始数据都在此表中'
LOCATION 'obs://yishou-test/yishou_data_qa_test.db/ods_yishou_log_exposure_dt'
;
注意:第一次建表后需要对表的分区进行msck,执行如下命令:
MSCK REPAIR TABLE ${yishou_data_dbname}.ods_new_app_log_store_dt;
MSCK REPAIR TABLE ${yishou_data_dbname}.ods_yishou_log_exposure_dt;
之后查看数据条数可以看到每3分钟会更新一次最新数据:
由于这是重构,之前的历史数据是存在旧表里,可以直接从旧表中查出数据,写入新表中即可。
4. 创建调度任务
在此层中调度任务只需要在凌晨添加当天分区即可,因为数据会实时进入obs文件系统,所以只要添加分区在元数据中有该分区就能直接查询最新的数据,脚本如下:
-- 在 ods_new_app_log_store_dt 表中增加对应日期分区(每天凌晨执行,添加当天分区即可)
ALTER TABLE ${yishou_data_dbname}.ods_new_app_log_store_dt ADD if not exists PARTITION (dt=${gmtdate});
作业调度如下(可以凌晨0点0分调度):
注意:对历史数据,当集群资源空闲时,还可以通过同一张表自己写入自己来合并小文件,但其实感觉没有必要;因为当每3分钟生成一个文件时,能看到一般情况下文件大小为60M左右,这不会造成文件系统小文件过多,并且自己写入自己也存在一定风险;并且可以对更新时间进行调节,控制一定文件数量即可。
注:其他 离线数仓 相关文章链接由此进 -> 离线数仓文章汇总
fzw_lk11: 这里看用户对商品的兴趣后续的目的是啥,如果是进行商品的推荐,感觉购买信息不需要放在这里面。
Sql_Boy_A_Liang: select province, concat_ws(',',collect_list(city)) as con_city, concat_ws(',',collect_list(rn)) as con_rn from (SELECT province , city , score , row_number() over (partition by province order by score desc) as rn FROM temp) group by province 这个方式更方便
减减h: 如果有找不到docker-compose命令的话,sudo env PATH=$PATH /usr/local/bin/docker-compose -version alias sudo='sudo env PATH=$PATH' source ~/.bashrc 可以解决一下
Am最温柔: 有帮助 谢谢~
ZWEiKE: windows 上面的docker 可以安装吗?