离线数仓(9):ODS层实现之导入业务数据

13 篇文章 12 订阅
订阅专栏

目录

0. 相关文章链接

1. ODS层概述

1.1. 数仓分层图

1.2. 数仓数据走向图

1.3. 实现目标

2. 数据准实时进入文件系统

3. 创建外部表

3.1. ods_binlog_data建表语句

3.2. 业务库对应ods层快照表建表语句

3.3. 业务库对应ods层历史变更记录表建表语句

4. ods层快照表调度任务创建

4.1. 概述

4.2. ods_fmys_goods_ext_dt_start节点说明

4.3. ods_fmys_goods_ext_dt_CDM任务节点说明

4.4. ods_fmys_goods_ext_dt_DLI任务节点说明

4.5. ods_fmys_goods_ext_视图创建节点说明

5. ods层历史变更记录表调度任务创建

5.1. 概述

5.2. ods_fmys_goods_ext_record_dt_start节点

5.3. ods_fmys_goods_ext_record_dt_跑数脚本

6. 总结


0. 相关文章链接

  离线数仓文章汇总 

1. ODS层概述

1.1. 数仓分层图

1.2. 数仓数据走向图

1.3. 实现目标

根据如上数仓分层图和数仓数据走向图,可以看出ods层的主要目的有如下3点:

  1. 保持数据原貌不做任何修改,起到备份数据的作用
  2. 数据采用压缩,减少磁盘存储空间(例如:原始数据100G,可以压缩到10G左右)
  3. 创建分区表,防止后续的全表扫描

所以在此次的数仓分层重构中,ods层分别实现了上述3点:

  1. 将业务数据写入到数仓中,并每天进行一个全量快照备份(其实可以只备份新增和变化的数据,但我们根据实际业务情况和其他各方面等考虑,进行了全量快照备份,并且如果进行全量快照备份的话,保留180天数据完全足够)
  2. 除binlog数据通过FlinkSQL写入到文件系统使用了parquet格式外,其他的分区表均使用了orc格式压缩(之所以使用orc压缩,是因为在华为云平台中,有个GaussDBForDWS数据库的服务,用于数据分析的,该数据库底层是PG库,可以支持ORC格式的外部表)
  3. 在建表时创建了分区表(除了每天一个全量快照备份,还将binlog中各个表的修改记录进行了保存,在后续如果使用拉链表时可以用到)

2. 数据准实时进入文件系统

根据上述数仓数据走向图,使用Cancal、FlinkCDC或者华为云的数据复制服务DRS将binlog数据采集写入到kafka中就不详细介绍了,只介绍数据到达kafka后的后续流向。

跟流量日志数据类似,博主这里还是使用了华为云DLI服务中的FlinkSQL将kafka中的数据进行获取,并将数据写入到OBS文件系统中,具体实现代码如下(需要现在OBS并行文件系统中创建对应目录):



-- kafka的 bigdata_mysql_binlog_avro 主题中示例数据如下

-- 注意:其中的nanoTime是使用Java中的System.nanoTime()函数生成的,也就是说,如果程序失败,需要将分区数据删除,然后在flink作业中设置拉取时间重跑

-- 具体字段描述如下:
-- db:             在MySQL中对应的数据库
-- tb:             在MySQL中对应的表
-- columns:        一个JSONArray,表示所有的字段,如下示例,一个JSONArray中存在该表中所有字段,存储的json的key为字段名,在该json中又有value这个属性,表示该字段对应的值
-- event:          event对应操作类型和数值: insert = 0 ; delete = 1 ; update = 2 ; alter = 3 ;
-- sql:            触发该更改或删除等操作的sql语句
-- primary_key:    该更改的主键
-- create_time:    该数据从binlog中获取出来的时间(分区就用此时间)
-- sendTime:       该数据进入到kafka的时间(该时间不用)
-- nano_time:      纳秒级时间戳,用了区分数据的前后,使用此时间可以保证数据唯一

-- {
--     "columns": {
--         "admin_note": {
--             "key": false,
--             "mysqlType": "varchar(255)",
--             "name": "admin_note",
--             "null_val": false,
--             "update": false,
--             "value": ""
--         },
--         "allot_num": {
--             "key": false,
--             "mysqlType": "smallint(5) unsigned",
--             "name": "allot_num",
--             "null_val": false,
--             "update": false,
--             "value": "0"
--         }
--     },
--     "createTime": 1649213973044,
--     "db": "yishou",
--     "event": 2,
--     "nanoTime": 23494049498709146,
--     "primaryKey": "157128418",
--     "sendTime": 1649213973045,
--     "sql": "",
--     "tb": "fmys_order_infos"
-- }

-- 命名规范
--      流类型_数据源_主题(表)
-- 例: source_kafka_bigdata_mysql_binlog_avro

-- 自定义日期格式化方法
CREATE FUNCTION date_formatted AS 'com.yishou.cdc.udf.DateFormattedUDF';


-- ==================================================================================
-- 将数据写入到obs中,通过DLI中的外部表关联

-- source端,对接kafka,从kafka中获取数据
CREATE SOURCE STREAM source_kafka_bigdata_mysql_binlog_avro (
    db STRING,
    tb STRING,
    `columns` STRING,
    event INT,
    `sql` STRING,
    `primary_key` STRING,
    create_time bigint,
    send_time bigint,
    nano_time bigint
) WITH (
    type = "kafka",
    kafka_bootstrap_servers = "xxx1:9092,xxx2:9092,xxx3:9092",
    kafka_group_id = "business_data_to_obs",
    kafka_topic = "bigdata_mysql_binlog_avro",
    encode = "user_defined",
    encode_class_name = "com.yishou.cdc.source.YishouAvroDerializationSchema",
    encode_class_parameter = ""
);


-- sink端,对接obs,将数据写入到obs中
CREATE SINK STREAM sink_obs_ods_binlog_data (
    db STRING,
    tb STRING,
    `columns` STRING,
    event bigint,
    `sql` STRING,
    `primary_key` STRING,
    create_time bigint,
    send_time bigint,
    nano_time bigint,
    dt STRING
) PARTITIONED BY(dt,tb) WITH (
    type = "filesystem",
    file.path = "obs://yishou-test/yishou_data_qa_test.db/ods_binlog_data",
    encode = "parquet",
    ak = "akxxx",
    sk = "skxxx"
);

-- 从source表获取数据,写出到sink表中
INSERT INTO
    sink_obs_ods_binlog_data
SELECT
    db,
    tb,
    `columns`,
    cast(event as bigint) event,
    `sql`,
    `primary_key`,
    create_time,
    send_time,
    nano_time,
    date_formatted(
        cast(coalesce(create_time, UNIX_TIMESTAMP_MS()) / 1000 - 25200 as VARCHAR(10)),
        'yyyyMMdd'
    ) as dt
FROM
    source_kafka_bigdata_mysql_binlog_avro;
    

-- ==================================================================================


注意:

  • 上述是使用华为云的FlinkSQL实现的,跟开源的FlinkSQL有些不同,但大体类似,可以根据实际情况来进行开发;
  • 上述的FlinkSQL是将所有的业务数据变更的binlog均写入了OBS文件系统中,在文件系统中使用了双重分区(日期和表名),可以获取每一天每张表的更新修改删除的binlog数据;
  • 上述使用了一个自定义函数,将10位时间戳转换成了8位的年月日格式,在开源的FlinkSQL中已有类似函数,不过华为云的FlinkSQL这个版本中还没有,所以添加了自定义函数,具体情况具体开发即可;
  • 另外博主在将数据写入到kafka中时,还使用了avro压缩格式,所以在拉取数据时,还需要进行了解压,其实在kafka中只存储近7天数据,在使用时不进行压缩也可以;
  • 博主这里因为业务原因,所以在写入时将数据减去了7个小时(即25200秒),统计的不是0点到第二天0点的数据,而是7点到第二天7点的数据;

在华为云的DLI服务中,运行界面如下所示:

  • FlinkSQL作业详情配置图

  • FlinkSQL任务列表图:

  • OBS文件系统(正常运行后每分钟每张表生成一个新的文件【该表在这分钟要有数据变化】):

3. 创建外部表

3.1. ods_binlog_data建表语句

drop table if exists ${yishou_data_dbname}.ods_binlog_data;
CREATE TABLE if not exists ${yishou_data_dbname}.ods_binlog_data (
    `db` STRING COMMENT '数据库名',
    `columns` STRING COMMENT '列数据',
    `event` BIGINT COMMENT '操作类型',
    `sql` STRING COMMENT '执行的SQL',
    `primary_key` STRING COMMENT '主键的值',
    `create_time` BIGINT COMMENT '从MySQL中获取binlog的13位时间戳',
    `send_time` BIGINT COMMENT '发送binlog到kafka的13位时间戳',
    `nano_time` BIGINT COMMENT '纳秒级更新时间戳',
    `dt` BIGINT COMMENT '日期分区',
    `tb` STRING COMMENT '表名'
) USING parquet PARTITIONED BY (dt, tb) COMMENT 'ods层binlog表(所有通过binlog进入数仓的数据均在此表中)' 
LOCATION 'obs://yishou-test/yishou_data_qa_test.db/ods_binlog_data' 
;

以上述通过FlinkSQL写入到OBS文件系统中的目录为数据目录,创建对应的外部表,查询数据结果如下所示:

3.2. 业务库对应ods层快照表建表语句

drop table if exists ${yishou_data_dbname}.ods_fmys_goods_ext_dt;
CREATE EXTERNAL TABLE ${yishou_data_dbname}.ods_fmys_goods_ext_dt (
    `goods_id` BIGINT COMMENT '*',
    `limit_day` BIGINT COMMENT '*',
    `auto_time` STRING COMMENT '*',
    `is_new` BIGINT COMMENT '-1 无效(下架) 0 旧款(有效/上架) 1 新款',
    `grade` BIGINT COMMENT '商品档次',
    `season` BIGINT COMMENT '季节',
    `goods_type` BIGINT COMMENT '商品类型{1:特价,}'
) COMMENT '商品拓展信息表(ods层快照分区表)' 
PARTITIONED BY (`dt` BIGINT COMMENT '日期分区(yyyymmdd)') 
STORED AS orc LOCATION 'obs://yishou-test/yishou_data_qa_test.db/ods_fmys_goods_ext_dt'
;

上述建表语句和后端业务库的字段需要一一对应,并将对应的数据类型转换成数仓中数据类型。 

3.3. 业务库对应ods层历史变更记录表建表语句

drop table if exists ${yishou_data_dbname}.ods_fmys_goods_ext_record_dt;
CREATE EXTERNAL TABLE ${yishou_data_dbname}.ods_fmys_goods_ext_record_dt(
    `goods_id` BIGINT COMMENT '*',
    `limit_day` BIGINT COMMENT '*',
    `auto_time` STRING COMMENT '*',
    `is_new` BIGINT COMMENT '-1 无效(下架) 0 旧款(有效/上架) 1 新款',
    `grade` BIGINT COMMENT '商品档次',
    `season` BIGINT COMMENT '季节',
    `goods_type` BIGINT COMMENT '商品类型{1:特价}',
    `event` BIGINT COMMENT '数据操作类型(0:插入;1:删除;2:修改)',
    `record_time` BIGINT COMMENT '从MySQL中获取binlog的13位时间戳',
    `nano_time` BIGINT COMMENT '纳秒级更新时间戳(当record_time相同时,使用此时间戳来判断先后)'
) COMMENT '商品拓展信息历史更改记录表(一个分区内保存该业务表历史所有的变更记录)' 
PARTITIONED BY (`dt` BIGINT COMMENT '日期分区') 
STORED AS orc LOCATION 'obs://yishou-test/yishou_data_qa_test.db/ods_fmys_goods_ext_record_dt'
;

上述建表语句和后端业务库的字段需要一一对应,并且其中添加了数据操作类型、记录时间戳和纳秒级更新时间戳这3个字段,用了记录这条数据历史上什么时候做了什么变更。

4. ods层快照表调度任务创建

4.1. 概述

架构图:

说明:

  • 该作业为每天早晨7点零5分调度(因业务原因,统计的是昨天7点到今天7点的数据,并且给数据归档和传输的时间,所以7点零5分开始调度执行);
  • 该作业有2条分支,1条为通过CDM任务(即通过Sqoop拉取全量业务库的数据),1条为DLI任务(即通过昨天的全量数据和昨天的binlog数据进行汇总聚合得出最新的业务库的表数据);
  • 通过华为云的EL表达式来判断执行哪条分支(CDM的EL表达式:#{DateUtil.format(Job.planTime,"HHmmss") != '070500' ? 'true' : 'false'} ; DLI的EL表达式:#{DateUtil.format(Job.planTime,"HHmmss") == '070500' ? 'true' : 'false'});正常调度时(早晨7点零5分运行)会执行DLI任务,运行速度很快,一般2分钟内执行完成;当感觉数据异常时,使用补数据的方法执行作业时(不是7点零5分运行),会执行CDM任务,执行速度慢,会全量拉取数据;
  • 会将全量结果数据输出到业务库表对应的ods表中,并生成一个以昨天日期的分区,这就是按天全量快照备份;
  • 最后会创建一个视图,该视图为分钟级更新,会和业务库中对应表的数据一模一样(下述具体介绍);

4.2. ods_fmys_goods_ext_dt_start节点说明

该节点为SQL节点,具体代码如下:

-- 在ods_binlog_data表中增加对应日期和表名的分区
ALTER TABLE ${yishou_data_dbname}.ods_binlog_data ADD if not exists PARTITION (dt=${today}, tb = 'fmys_goods_ext');

主要目的是新增ods_binlog_data表对应日期和业务表的分区,后续使用ods_binlog_data表可以直接使用对应的双重分区。

4.3. ods_fmys_goods_ext_dt_CDM任务节点说明

此节点为CMD(即Sqoop)作业,会将业务库中的全量数据拉取到数仓的ods表中(通过overwrite的方式,将数据写入到昨天的分区中),当手动调度时才执行此节点,主要目的是当感觉数据有误时,可以使用此作业来更新替换最新最全最正确的数据。

CDM任务配置如下:

4.4. ods_fmys_goods_ext_dt_DLI任务节点说明

当正常调度时,会执行此节点,会使用ods前天的分区然后和昨天以及之后的binlog数据合并,再将数据插入到ods的昨天的分区中;因为是跑的SQL作业,不需要拉取数据,所以会执行的较快,正常调度时就使用此种方法运行。

代码如下所示:


-- DLI sql 
-- ******************************************************************** --
-- author: yangshibiao
-- create time: 2022/09/01 10:15:46 GMT+08:00
-- ******************************************************************** --

-- 从ods_fmys_goods_ext_dt表中获取出前天分区的数据(即相当于昨天7点之前的所有数据)
-- 从ods_binlog_data表中获取出昨天分区以及之后分区的数据(即昨天7点之后的增量更新的binlog数据)
-- 然后2份数据对主键开窗,获取最新的,并且不是被删除的数据,然后将数据插入到ods_fmys_goods_ext_dt昨天的分区中(T-1,即今天7点之前的所有历史数据)
-- 注意:从ods_fmys_goods_ext_dt表中获取出前天分区的数据时,因为在这张表里没有 nano_time 和 event 字段,所以需要手动设置,设置nano_time为一个很小的值,设置event为0,即插入的数据即可
insert overwrite table ${yishou_data_dbname}.ods_fmys_goods_ext_dt partition(dt)
select
    goods_id
    , limit_day
    , auto_time
    , is_new
    , grade
    , season
    , goods_type
    , ${one_day_ago} as dt
from (
    select
        goods_id
        , limit_day
        , auto_time
        , is_new
        , grade
        , season
        , goods_type
        , nano_time
        , event
        , row_number() over(partition by goods_id order by nano_time desc) as row_number
    from (
        select
            goods_id
            , limit_day
            , auto_time
            , is_new
            , grade
            , season
            , goods_type
            , -987654321012345 as nano_time
            , 0 as event
        from ${yishou_data_dbname}.ods_fmys_goods_ext_dt
        where dt = ${two_day_ago}

        union all

        select
            get_json_object(columns ,'$.goods_id.value') as goods_id
            , get_json_object(columns ,'$.limit_day.value') as limit_day
            , get_json_object(columns ,'$.auto_time.value') as auto_time
            , get_json_object(columns ,'$.is_new.value') as is_new
            , get_json_object(columns ,'$.grade.value') as grade
            , get_json_object(columns ,'$.season.value') as season
            , get_json_object(columns ,'$.goods_type.value') as goods_type
            , nano_time as nano_time
            , event as event
        from ${yishou_data_dbname}.ods_binlog_data
        where 
            dt >= ${one_day_ago}
            and tb = 'fmys_goods_ext'
    )
)
-- event为1就是删除的数据,当最后一次记录为删除数据的时候,就不用这个主键了,因为已经删除了
where row_number = 1 and event != 1

DISTRIBUTE BY floor(rand()*10)
;

4.5. ods_fmys_goods_ext_视图创建节点说明

此节点的目的是使用ods昨天的分区和binlog今天以及之后的分区生成一张ods的视图,因为binlog的数据是实时更新的,所以这个ods的视图每次查询的都是最新的,跟业务库可以做到准实时。

代码如下所示:

-- DLI sql 
-- ******************************************************************** --
-- author: yangshibiao
-- create time: 2022/09/01 10:25:56 GMT+08:00
-- ******************************************************************** --

-- 目的:
-- 创建业务库对应的ods层视图,主要目的是按照分钟级别,将业务库的数据同步到数据仓库中,即每分钟在数仓中查询这个视图会和业务库中对应表的数据一模一样

-- 实现思路:
-- 1、从ods_fmys_goods_ext_dt表中获取出昨天分区的数据(即相当于今天7点之前的所有数据)
-- 2、从ods_binlog_data表中获取出今天分区以及之后分区的数据(即今天7点开始该表增量更新的binlog数据)
-- 3、然后2份数据对主键开窗,获取最新的,并且不是被删除的数据,因为binlog数据每分钟更新一次,所以可以实现业务表数据分钟级进入数据仓库
drop view if exists ${yishou_data_dbname}.ods_fmys_goods_ext;
create view if not exists ${yishou_data_dbname}.ods_fmys_goods_ext(
    goods_id
    , limit_day
    , auto_time
    , is_new
    , grade
    , season
    , goods_type
) comment 'fmys_goods_ext在ods层视图(根据业务库数据,分钟级别更新)'
as
select
    goods_id
    , limit_day
    , auto_time
    , is_new
    , grade
    , season
    , goods_type
from (
    select
        goods_id
        , limit_day
        , auto_time
        , is_new
        , grade
        , season
        , goods_type
        , nano_time
        , event
        , row_number() over(partition by goods_id order by nano_time desc) as row_number
    from (
        select
            goods_id
            , limit_day
            , auto_time
            , is_new
            , grade
            , season
            , goods_type
            , -987654321012345 as nano_time
            , 0 as event
        from ${yishou_data_dbname}.ods_fmys_goods_ext_dt
        where dt = ${one_day_ago}

        union all

        select
            get_json_object(columns ,'$.goods_id.value') as goods_id
            , get_json_object(columns ,'$.limit_day.value') as limit_day
            , get_json_object(columns ,'$.auto_time.value') as auto_time
            , get_json_object(columns ,'$.is_new.value') as is_new
            , get_json_object(columns ,'$.grade.value') as grade
            , get_json_object(columns ,'$.season.value') as season
            , get_json_object(columns ,'$.goods_type.value') as goods_type
            , nano_time as nano_time
            , event as event
        from ${yishou_data_dbname}.ods_binlog_data
        where 
            dt >= ${today}
            and tb = 'fmys_goods_ext'
    )
)
where row_number = 1 and event != 1
;

注意:ods_binlog_data表中对应数据格式如下所示,上述DLI节点和视图节点均使用get_json_object函数进行了数据解析

{
    "columns": {
        "admin_note": {
            "key": false,
            "mysqlType": "varchar(255)",
            "name": "admin_note",
            "null_val": false,
            "update": false,
            "value": ""
        },
        "allot_num": {
            "key": false,
            "mysqlType": "smallint(5) unsigned",
            "name": "allot_num",
            "null_val": false,
            "update": false,
            "value": "0"
        }
    },
    "createTime": 1649213973044,
    "db": "yishou",
    "event": 2,
    "nanoTime": 23494049498709146,
    "primaryKey": "157128418",
    "sendTime": 1649213973045,
    "sql": "",
    "tb": "fmys_order_infos"
}

5. ods层历史变更记录表调度任务创建

5.1. 概述

架构图如下:

说明:

  • 该作业为每天早晨7点零5分调度(因业务原因,统计的是昨天7点到今天7点的数据,并且给数据归档和传输的时间,所以7点零5分开始调度执行);
  • 该作业用记录表前天的分区和binlog昨天的分区汇总,即是记录表昨天分区的数据;

5.2. ods_fmys_goods_ext_record_dt_start节点

该节点为SQL节点,具体代码如下:

-- 在ods_binlog_data表中增加对应日期和表名的分区
ALTER TABLE ${yishou_data_dbname}.ods_binlog_data ADD if not exists PARTITION (dt=${today}, tb = 'fmys_goods_ext');

主要目的是新增ods_binlog_data表对应日期和业务表的分区,后续使用ods_binlog_data表可以直接使用对应的双重分区。在上述的ods层快照表调度任务中已有该脚本执行,但在实际跑数时,不确定这2个哪个先行执行,而如果添加依赖也不符合常理,所以添加一段这样的脚本即可,其中使用 if not exists语法。

5.3. ods_fmys_goods_ext_record_dt_跑数脚本

此节点会输出最新的业务表对应的历史记录表分区,脚本如下:

-- DLI sql 
-- ******************************************************************** --
-- author: yangshibiao
-- create time: 2022/08/15 11:45:36 GMT+08:00
-- ******************************************************************** --

-- 执行脚本(记录表前天分区的数据和binlog昨天分区的数据 union all 再写入昨天的分区即可)
insert overwrite table ${yishou_data_dbname}.ods_fmys_goods_ext_record_dt partition(dt)
select
    goods_id
    , limit_day
    , auto_time
    , is_new
    , grade
    , season
    , goods_type
    , event
    , record_time
    , nano_time
    , ${one_day_ago} as dt
from (
    select
        goods_id
        , limit_day
        , auto_time
        , is_new
        , grade
        , season
        , goods_type
        , event
        , record_time
        , nano_time
    from ${yishou_data_dbname}.ods_fmys_goods_ext_record_dt
    where dt = ${two_day_ago}

    union all

    select
        get_json_object(columns ,'$.goods_id.value') as goods_id
        , get_json_object(columns ,'$.limit_day.value') as limit_day
        , get_json_object(columns ,'$.auto_time.value') as auto_time
        , get_json_object(columns ,'$.is_new.value') as is_new
        , get_json_object(columns ,'$.grade.value') as grade
        , get_json_object(columns ,'$.season.value') as season
        , get_json_object(columns ,'$.goods_type.value') as goods_type
        , event
        , create_time as record_time
        , nano_time as nano_time
    from ${yishou_data_dbname}.ods_binlog_data
    where 
        dt = ${one_day_ago}
        and tb = 'fmys_goods_ext'
)
DISTRIBUTE BY floor(rand()*10)
;



-- 初始化脚本(任务调度之前手动执行初始化脚本)(将昨天的分区快照表数据写入到记录表昨天的分区,并设置event为0,设置record_time为0,设置nano_time为一个很小的数)
-- 注意:执行初始化脚本时,需要注意昨天的分区快照表数据是否有,一般为前置表上线第二天进行初始化,然后就可以启动作业调度,自动调度执行
-- insert overwrite table ${yishou_data_dbname}.ods_fmys_goods_ext_record_dt partition(dt)
-- select
--     goods_id
--     , limit_day
--     , auto_time
--     , is_new
--     , grade
--     , season
--     , goods_type
--     , 0 as event
--     , 0 as record_time
--     , -987654321012345 as nano_time
--     , ${one_day_ago}
-- from ${yishou_data_dbname}.ods_fmys_goods_ext_dt
-- where dt = ${one_day_ago}
-- ;


-- 创建记录表视图(该记录表昨天的分区 union all 今天以及之后的binlog的数据,准实时更新)
drop view if exists ${yishou_data_dbname}.ods_fmys_goods_ext_record;
create view if not exists ${yishou_data_dbname}.ods_fmys_goods_ext_record(
    goods_id
    , limit_day
    , auto_time
    , is_new
    , grade
    , season
    , goods_type
    , event
    , record_time
    , nano_time
) comment 'ods_fmys_goods_ext_record在ods层视图(根据业务库数据,分钟级别更新)'
as
select
    goods_id
    , limit_day
    , auto_time
    , is_new
    , grade
    , season
    , goods_type
    , event
    , record_time
    , nano_time
from (
    select
        goods_id
        , limit_day
        , auto_time
        , is_new
        , grade
        , season
        , goods_type
        , event
        , record_time
        , nano_time
    from ${yishou_data_dbname}.ods_fmys_goods_ext_record_dt
    where dt = ${one_day_ago}

    union all

    select
        get_json_object(columns ,'$.goods_id.value') as goods_id
        , get_json_object(columns ,'$.limit_day.value') as limit_day
        , get_json_object(columns ,'$.auto_time.value') as auto_time
        , get_json_object(columns ,'$.is_new.value') as is_new
        , get_json_object(columns ,'$.grade.value') as grade
        , get_json_object(columns ,'$.season.value') as season
        , get_json_object(columns ,'$.goods_type.value') as goods_type
        , event
        , create_time as record_time
        , nano_time as nano_time
    from ${yishou_data_dbname}.ods_binlog_data
    where 
        dt >= ${today}
        and tb = 'fmys_goods_ext'
)
;

注意:在调度之前需要执行初始化脚本(脚本在上述代码中),并且执行初始化脚本之前需要注意昨天的分区快照表数据是否有,一般为前置表上线第二天进行初始化,然后就可以启动作业调度,自动调度执行。并且跟ods层快照表一样,同样的生成了一个视图,用于分钟级更新数据。

6. 总结

在ODS层业务数据类型表中,业务库一张表对应数仓中4张表或者视图:

  • ods层快照分区表:ods_xxx_dt,每天一个分区,用来存储全量快照;
  • ods层快照视图:ods_xxx,一个视图,分钟级更新业务库的数据;
  • ods层历史变更记录表:ods_xxx_record_dt,每天一个分区,用来存储业务表的历史变更记录,可以只保留近1个月的数据;
  • ods层历史变更记录视图:ods_xxx_record,一个视图,分钟级更新业务库的变更记录;

注意:

  • 上述示例是已一张业务表为例进行编写,其他的业务表同样操作即可,在ods中全部业务表没有其他区别,都是使用同样的策略。
  • 在上述的2个作业调度中(快照表作业和历史变更记录表作业)都需要进行自依赖,即必须得前一天的作业正常运行完成,才能进行今天的作业;因为这2个作业都是需要依赖之前的分区,当之前分区因为运行失败没有数据或者数据错误时,不能跑新的作业;不过在代码中都有重跑措施,当运行失败时,直接点击重跑或者点击测试运行然后将作业置成功即可。

数据具体走向图:


注:其他 离线数仓 相关文章链接由此进 ->  离线数仓文章汇总


sqoop导入数据到hive ods
weixin_42219734的博客
05-18 2007
# !/bin/bash# 功能:把源数据表加载到hive ods# 输入参数据:$1 源数据库 $2 源表名 $3 增全量 $4加载数据日期 $5增量字段1 $6增量字段2 $增量字段3# 如果是全量 增量字段不需要输入# $4加载日期如果不输入 则认为是加载昨日数据# $5,$6,$7 根据实时情况输入# 例:sh sqoop_mysql_hive_table.sh athletics c_...
业务数据导入数仓
m0_66857498的博客
03-30 2165
业务数据导入数仓 sqoop增量数据导入 bin/sqoop import \ --connect jdbc:mysql://linux01:3306/myword \ --username root \ --password root \ --target-dir /sqoopdata/t_md_areas \ --中间存储在hdfs 的目录 --hive-import \ --hive-database mydata \ --存入hive的数据库 --hive-tabl...
数据仓库ODS详解- 功能、设计与最佳实践
最新发布
一个8年大数据开发工程师的碎碎念
07-28 4707
数据仓库ODS大数据分析的基石,为企业决策提供可靠数据源。本文深入探讨ODS设计原则、实施要点和最佳实践,涵盖金融、零售等行业应用。重点关注云环境下ODS部署策略,以及实时数据集成、数据湖技术等创新趋势。文章还分析了数据体量增长、实时性需求等挑战,提供实用解决方案。whether助您构建高效、安全、可扩展的ODS,为数字化转型奠定坚实基础。#数据仓库 #ODS #大数据分析 #云计算 #数据
离线数仓搭建_09_ODS数据导入
勇敢牛牛,不怕困难!
09-21 1117
本文为数仓搭建的第一,即为ODS原始数据,主要是将原始数据导入到hive中。
数仓项目之数据采集实战及ODS数据初步导入
周星星
01-13 1397
在实际生产开发当中,适当的设计agent的数量和模式,并很好的将数据采集过来,是我们分析数据的第一步,即先要有数据业务系统那边进行埋点,记录日志,到服务器本地磁盘当中考虑使用高可用模式,并使用级联模式,上游一个agent,下游两个agent,因为要对数据进行简单的清洗 、处理,所以需要一个自定义拦截器上游agent高可用模式1个source taildir类型1个channel file类型高可...
数据集成在ODS项目的应用模式
dinongxu8804的博客
12-22 136
ODS(The operational data store),也叫运营数据存储,是用于支持企业日常的全局应用的数据集合,它是介于DB和DW 之间的一种数据存储技术。从专家给出的定义上来讲,它是一个面向主题的、集成的、当前的并且是可"挥发"的数据集合,它反映了在某一个时间切片瞬间,经营分析系统和外围系统用以相互交换数据的集合,主要用于经营分析系统与外围系统关键数据一致性校验、以及经营...
Hive+Spark离线数仓工业项目--ODS及DWD构建(1)
技术专家
12-26 1262
Hive+Spark离线数仓工业项目--ODS及DWD构建
[数仓]十三、离线数仓数据质量管理)
weixin_44428807的博客
07-13 735
本文使用Python和Shell脚本实现数据质量监控的各项功能,故需先搭建相应的开发环境,Python开发可选择IDEA(需安装Python插件),或PyCharm等工具,本文使用IDEA作为开发工具。(2)点击“Plugins”,点击右上角的“Marketplace”,然后在搜索框中输入“python”,在搜索结果列表中找到Python插件,点击“Install”,安装插件。(1)点击Idea中的“File”,在下列列表中点击“New”,在右侧弹出的列表中点击“Project…
大数据-案例-离线数仓-在线教育:MySQL(业务数据)-ETL(Sqoop)->Hive数仓ODS-数据清洗->DW(DWD-统计分析->DWS)】-导出(Sqoop)->MySQL->可视化
u013250861的博客
07-13 2030
一、访问咨询主题看板 1. 需求分析 目的: 分析每一个调研需求需要计算什么指标, 以及计算这个指标需要通过那些维度,而且还包括计算这个需求涉及 到那些表和那些字段 需求1: 统计指定时间段内,访问客户的总数量。能够下钻到小时数据 指标: 访问量 维度: 时间维度: 年 季度 月 天 小时 涉及到哪些表: web_chat_ems_2019_12 涉及到哪些字段: 时间维度: create_time 说明: 发现create_time字段中包含有年 月 天 小时
实时数仓离线数仓的概念
一个记录普通男孩在IT界学习思考感悟的地方
03-06 2576
目录 1、数据仓库的发展趋势 1.1数据仓库的趋势 1.2 数据仓库的发展 2、数据仓库架构的演变 2.1 传统数仓架构 2.2 离线大数据架构 2.3 Lambda架构 2.4 Kappa架构 2.5混合架构 3、三种大数据数据仓库架构 3.1 离线大数据架构 3.2 Lambda 架构 3.3 Kappa 架构 3.4 Lambda 架构与 Kappa 架构的对比 4、实时数仓建设思路 5、菜鸟实时数仓案例 5.1 整体设计 5.2 数据模型 6、美团点评基于...
Hive+Spark离线数仓工业项目实战--数仓设计及数据采集(1)
技术专家
12-23 2035
Hive+Spark离线数仓工业项目实战--数仓设计及数据采集
离线数仓 (十一) -------- ODS 搭建
在森林里麋了鹿
10-26 572
ODS 搭建
ods-json日志数据处理和导入脚本
S1124654的博客
06-21 380
hive json日志格式装换成hive表数据
离线仓库ODS-DWD-DWS-ADS
jshxk的博客
05-17 657
离线数仓设计
ods数据导入mysql
hjckevin的专栏
03-20 2512
之前记录了mdb格式中的数据如何导出,这次是ods格式。     为了导出数据,还专门查阅了用Java操作openoffice的相关库,并做了简单的测试,确实很方面就能读取到ods中表的内容。但是,读取出来以后如何放到针对mysql数据表的JavaBean中还是个问题。首先的思路是写一个一劳永逸的通用方法,使用具体JavaBean作为参数,在方法内获取ods数据后,利用反射机制得到JavaBe
数据同步到数仓解决方案
杨宇昌的博客
05-03 1495
数据同步到数仓解决方案
实时数仓(二)业务数据库到ods(kafka)
qq_44665283的博客
03-24 2062
利用flink cdc将业务数据库到ods(kafka) (1)主要代码 package com.yyds.app.ods; import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions; import com.alibaba.ververica.cdc.debezium.DebeziumSourceF
大数据项目实战之数据仓库:电商数据仓库系统——第7章 数仓开发之ODS
yiluohan0307的专栏
04-17 1145
大数据项目实战之数据仓库:电商数据仓库系统——第7章 数仓开发之ODS
ods-数据导入(hive)示例
S1124654的博客
06-21 647
hdfs文件装载到ods(hive)
电商实时数仓ODS数据采集解析
"本文主要介绍了电商实时数仓的分结构以及ODS数据采集...电商实时数仓通过ODS数据采集和多结构,实现数据的高效管理和复用,同时结合离线和实时计算,以及即席查询工具,以满足不同场景下的数据分析需求。
写文章

热门文章

  • 机器学习:欧氏距离(Euclidean Distance) 26678
  • 机器学习:余弦距离(Cosine Dsitance) 13980
  • Typora最后的免费版本 12972
  • 推荐系统(7):推荐算法之基于协同过滤推荐算法 11185
  • 开发工具 文章汇总 8516

分类专栏

  • 开发工具 5篇
  • 开发随笔 9篇
  • 开发语言 1篇
  • Java 6篇
  • Python 18篇
  • 开发环境 1篇
  • Linux 8篇
  • Docker
  • 大数据 1篇
  • 大数据环境搭建 35篇
  • Zookeeper 8篇
  • Hadoop 39篇
  • Hive 21篇
  • Kafka 1篇
  • HBase 9篇
  • Spark 41篇
  • Flink 65篇
  • StreamX 5篇
  • Hudi 44篇
  • 离线数仓 13篇
  • 实时数仓
  • 数据库 2篇
  • MySQL 1篇
  • Doris 7篇
  • Artificial Intelligence
  • 机器学习 9篇
  • Alink 5篇
  • 用户画像 25篇
  • 推荐系统 14篇
  • 个人总结 4篇

最新评论

  • 基于协同过滤的电商推荐系统(2):用户对商品的偏好得分

    fzw_lk11: 这里看用户对商品的兴趣后续的目的是啥,如果是进行商品的推荐,感觉购买信息不需要放在这里面。

  • Hive中使用sort_array函数解决collet_list列表排序混乱问题

    Sql_Boy_A_Liang: select province, concat_ws(',',collect_list(city)) as con_city, concat_ws(',',collect_list(rn)) as con_rn from (SELECT province , city , score , row_number() over (partition by province order by score desc) as rn FROM temp) group by province 这个方式更方便

  • 数据湖之Hudi(7):使用docker进行Hudi的快速体验和使用

    减减h: 如果有找不到docker-compose命令的话,sudo env PATH=$PATH /usr/local/bin/docker-compose -version alias sudo='sudo env PATH=$PATH' source ~/.bashrc 可以解决一下

  • MPP数据库之Doris(5):Doris安装部署之Broker部署

    Am最温柔: 有帮助 谢谢~

  • 数据湖之Hudi(7):使用docker进行Hudi的快速体验和使用

    ZWEiKE: windows 上面的docker 可以安装吗?

最新文章

  • 我的创作纪念日
  • Spark(40):Streaming DataFrame 和 Streaming DataSet 中的触发器
  • Spark(39):Streaming DataFrame 和 Streaming DataSet 输出
2023年72篇
2022年241篇
2021年53篇
2019年33篇

目录

目录

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43元 前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

电光闪烁

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或 充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值

玻璃钢生产厂家浙江玻璃钢雕塑公司苏州园林玻璃钢雕塑定做阜阳多彩玻璃钢雕塑订做价格镜面玻璃钢雕塑免费咨询茂名玻璃钢雕塑定制温州玻璃钢雕塑加工价格嘉兴玻璃钢陶瓷雕塑价格商场艺术空间美陈玻璃钢不锈钢雕塑图片玻璃钢西瓜雕塑定做宝鸡玻璃钢花盆批发韶关户外景观玻璃钢雕塑山西仿古校园玻璃钢景观雕塑厂家沈阳玻璃钢卡通雕塑规格云浮玻璃钢雕塑图片亳州玻璃钢雕塑工厂城口玻璃钢仿铜雕塑温州商场开业美陈玻璃钢鹿雕塑泉州海南海口玻璃钢雕塑定制唐山玻璃钢广场雕塑厂家定西玻璃钢景观雕塑定制陕西多彩玻璃钢雕塑定制合肥玻璃钢景观雕塑设计柳州玻璃钢雕塑用途广东商场创意商业美陈怎么样贵州玻璃钢雕塑销售厂家郑州泡沫玻璃钢雕塑毕节商场美陈雕塑河南景观玻璃钢雕塑定做价格香港通过《维护国家安全条例》两大学生合买彩票中奖一人不认账让美丽中国“从细节出发”19岁小伙救下5人后溺亡 多方发声单亲妈妈陷入热恋 14岁儿子报警汪小菲曝离婚始末遭遇山火的松茸之乡雅江山火三名扑火人员牺牲系谣言何赛飞追着代拍打萧美琴窜访捷克 外交部回应卫健委通报少年有偿捐血浆16次猝死手机成瘾是影响睡眠质量重要因素高校汽车撞人致3死16伤 司机系学生315晚会后胖东来又人满为患了小米汽车超级工厂正式揭幕中国拥有亿元资产的家庭达13.3万户周杰伦一审败诉网易男孩8年未见母亲被告知被遗忘许家印被限制高消费饲养员用铁锨驱打大熊猫被辞退男子被猫抓伤后确诊“猫抓病”特朗普无法缴纳4.54亿美元罚金倪萍分享减重40斤方法联合利华开始重组张家界的山上“长”满了韩国人?张立群任西安交通大学校长杨倩无缘巴黎奥运“重生之我在北大当嫡校长”黑马情侣提车了专访95后高颜值猪保姆考生莫言也上北大硕士复试名单了网友洛杉矶偶遇贾玲专家建议不必谈骨泥色变沉迷短剧的人就像掉进了杀猪盘奥巴马现身唐宁街 黑色着装引猜测七年后宇文玥被薅头发捞上岸事业单位女子向同事水杯投不明物质凯特王妃现身!外出购物视频曝光河南驻马店通报西平中学跳楼事件王树国卸任西安交大校长 师生送别恒大被罚41.75亿到底怎么缴男子被流浪猫绊倒 投喂者赔24万房客欠租失踪 房东直发愁西双版纳热带植物园回应蜉蝣大爆发钱人豪晒法院裁定实锤抄袭外国人感慨凌晨的中国很安全胖东来员工每周单休无小长假白宫:哈马斯三号人物被杀测试车高速逃费 小米:已补缴老人退休金被冒领16年 金额超20万

玻璃钢生产厂家 XML地图 TXT地图 虚拟主机 SEO 网站制作 网站优化