尚硅谷大数据技术之数据湖Hudi-1

尚硅谷大数据技术之数据湖Hudi-1

Hudi概述

Hudi简介

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
接下来我们先来了解一下什么是hudi。可以简单理解为这么几个单词啊。Hadoop相关的upsurts就是支持插入及更新,并且呢支持一个删除,还有增量的一个处理。那么hudi其实就是咱们经常讲的一个什么数据湖的一个框架。那么官方更愿意称它为一个平台。因为啊它提供了一个平台化的能力,还有很多的功能,并且呢它是支持什么呢牛市的啊,这一点就特别关键的。那么继继续看啊,apache将核心仓库还有数据库的功能直接引入了数据库。也就是说大家使用起来还是应该是比较熟悉的

那关键是后面这里你看它提供了一个表表,怎么理解呢?那么相信大家啊来了解hudi的一定都了解过,hive这么一个框架,咱们经常用hive来做一个离线数仓,对吧?那其实咱们大数据,比如说是基于hadoop的那我们的分布式的存储都是基于HDFS。那这个时候对于这个上面的数据怎么像一张表一样来管理跟使用它呢?那就借助hive这个框架,他给我们提供了表的管理,像表的一些schema啊,那我们就可以通过一些hive sql来对数据进行一个分析。

那hudi同样的它提供了它特有的一些表的格式啊,这一点也是一个基本功能。那后面呢大家注意看有一个什么事物,这也可以说是解决了一个hive的一个痛点。那么海普大家知道早期版本并没有支持呃事物这种ACID的这种语义。当然呃有一个版本它是出了支持的,但是呢它底层的实现还是insert overwrite。还是这么来实现的。那么大家可想而知,这种方式来实现的话,整个覆盖的话,那它的效率呃应该是高不了了。所以现在呢使用它这个特性的人也很少。但是hudi不一样,它提供的事物是比较清亮的啊,也不会这么重的操作啊,这也是它非常重要的一个特点啊,这个是大家需要去重点记住。

另外一个呢就是高效的upsert。那么大家在平时啊做那个离线数仓的时候啊,也就是说咱们常规用了一个hive表,应该有一个巨大的痛点。就是哎比如说我只需要更新某几行数据,那或者说我只要更新某一个字段的值,那么其实对于hive来讲,我们能做的也就是银色的override。你要么整个分区覆盖,要么整张表覆盖。那这种呢同样的很低下。但数据湖不一样,它可以找到呃你需要的这一行,将它更新掉就可以了。这也是依赖于hudi架构上的一些设计啊,这也是它最最重要的特性之一啊,upsert.


另外呢呃对于数据的删除,那同样也可以实现。另外呢他也提供了一些索引的能力,那索引大家都了解对吧?那索引呢通常是提高咱们的一个效率。

另外呢流摄取服务,也就是说咱们可以从像一些分布式文件系统,还有消息队列kafka这些地方流式的,将数据采集进湖底这么一个数据库里边。那他提供了一个像delta streamer啊这么一个工具,是依赖于Spark的。当然后续的版本现在也支持使用flink来做这个事儿啊,都可以。

并且有数据的一个聚簇,还有压缩。那呃怎么来理解这个压缩呢?其实咱们用它原生的说法大家就懂了。Compassion这个东西嗯在hbase还有一些OLAP数据库里面应该都很常见。其实就是什么呢?呃,可能是我不断的去写入啊,大家简单理解,我不断的在写入的话,那可能每次写入你多次写入的话,可能会生成多个一个文件系统上的文件。这个时候呢它可以进行什么呢?呃,类似于合并这么一个操作吧。啊有的喜欢翻译为合并,有的翻译为压缩,但都是这个单词啊,大家应该都熟啊,像click house hbase都有这个compassion的一个操作。并且呢支持一个并发的一个操作。那大家从这边看下来,其实他我们列出了几个重要特点啊,都是解决传统数据仓库,尤其是像典型的hive这种一些痛点。那hudi就是专门来解决这些事儿啊,简单来讲是这样。另外呢呃他保持了数据的开源文件格式,也就是说他不会再使用自己特有的一种文件格式。这样的话如果保持开源格式的话,那咱们跟其他地方的兼容性就会特别好了。那它主要用到的是这么两种格式啊,一个是.parquet这种列式存储的行式存储呢,它是会使用一个.avro,哎,好。那这个两种格式大家应该都特别常见啊,那这样的话回头你要需要用其他的框架来跟它做一个集成啊,或者去读取它的数据啊,都是特别的方便。

那hudi呢还有一个特性啊,它可以做流式的一个处理。那同样它也可以做什么批处理。所以hudi其实我们也可以说它可以支持流批一体,既支持流也支持批。另外呢大家注意它这个批是什么增量的,而且是高效的。像比如说hive咱们做离线数仓的时候,经常是什么做T加一的分析,什么是T加一啊?也就是昨天的数据,我可能需要一整天的数据都嗯累计完了,我再统一采集过来,对吧?过了晚上12点我采集过来,再统一对一整天的数据进行一个分析。

那hudi这边批处理的时候,它可以增量。比如说我每5分钟,我可以将5分钟内新增的数据先做一个处理,做一个输出。那下一个5分钟再接着处理5分钟内的一个增量数据啊,也允许这么来用。那同样你也可以用一些流的方式来处理啊,可以说它是呃你想怎么用都可以。另外呢还有一些细节啊,hudi呢可以在任何的云存储平台上使用啊,也就是说跟各大的云平台呃,它都是做了一个兼容的。那么跟hudi我们通常结合一起使用的,有大家特别常见的一些分析引擎啊,像一个什么呢?Spark应该特别熟,不管是离线还是实时都有很多企业会去使用。那么还有最近比较火的一个flink还有呢Presto、Trino,还有hive等等,都可以去集成使用啊,特别的方便。

image-20240305165504157

1
2
3
4
5
那我们也可以简单搂一眼,这个架构图也不算架构啊,这是官网放的一张图啊。那么大家可以看到呃属于数据源,就我们的数据来源可能有各种各样的对吧?啊,可能有数据库的像什么mysql,还有其他一些,还有我们的APP产生的,还有微服务产生的一些日志啊等等这些你的埋点信息也可以。那这些呢通过可我们可以统一采集到一个什么呢事件流。什么叫事件流呢?那比如说我们最常用的就是一个什么kafka这种消息队列,还有什么lock ket MQ这些都可以啊,也就是说消息队列这么一种框架。那那我们可数据到了呃消息队列之后,我们可以把它通过采集入湖做一个ETL。那这边可选的工具有特别多啊,可以用一些CDC的工具,可以用hudi提供的这个delta streamer都可以啊,甚至呢我们做的一些Spark跟flink都可以将它采呃数据入湖。那么到了数据湖之后呢,这些数据进来之后啊,它会形成一个什么呢?一特定的hudi表的一些管理之后呢,我们可以基于这个再做一个增量的ETL,还是结合咱们熟悉的一些引擎,像Spark have flink这一些啊进一步的处理,这是进入到湖底之后的操作。

那么我们的查询呢要对hudi表进行查询。也支持很多的东西,大家可以上面有一坨对吧?啊,像常见的什么trainal Crystal啊,这些都可以have impala啊,还有阿里云的、亚马逊云的啊一些东西。Spark那其实flink也可以啊,那包括我们构建一个pipeline的话,Spark、flink hive啊都特别熟。这上面呢就是它的计算引擎,还有一些查询引擎。

那么再往底层去看呢,就是它数据存在哪里啊啊,那么大家可以看到一些分布式的存储文件系统都可以像亚马逊的S3,我们hadoop的HDFS,还有后面的一些啊都可以。那接下来我们目前呢呃会在使用hudi的一些企业,这也是官方放出来的。当然呃不是所有的啊,咱们只是列出了一些像比较出名的,像由于什么呢?字节跳动对吧?沃尔玛、ebay、亚马逊、推特、苹果啊等等等等啊,还有百度这些什么华为啊啊蚂蚁,也就阿里系的啊,阿里云啊等等,这些通通都在使用一个hudi。并且咱们hudi现在呢可以说是在数据湖有几个框架嘛,一个是hudi,还有一些像冰山啊,还有德尔塔内克啊啊这些来讲,那hudi相对来讲它的社区会更活跃一点,并且呢它的迭代速度也是特别快的啊。

image-20240305173018792

发展历史

1
2
3
4
5
接下来我们来了解一下湖底的发展历史。那在2015年的时候啊啊发表过这么一篇文章,是关于增量处理的。它里面阐述了增量处理的一个核心思想还有原则。这个主要区别于咱们传统的全量处理。全量处理呢不仅处理的数据量大,操作重,而且呢效率也没法得到一个很大的提升。那如果我们采用增量处理的方式啊啊首先你每次处理的数据量就小了,并且呢就更及时了啊,这样显然显然是更好的对吧?那么在2016年的时候啊啊,uber也就是美国的这个优步,他创建的湖底这么一个框架,并且呢对他们的内部的数据库还有关联业务提供了一个支持。这个时候还属于他们自个儿用,对吧?那好东西呢那自然要分享嘛。那在2017年的时候,也其实大家可以看到,又经过了一年。那这个优步呢就将hudi给开源了。而且能支撑到百PB的这么一个数据库框架,可以说啊非常强劲。那么这是刚刚开始开源。那么到了2018年呢,这个时候开始吸引了大量的使用者。哎,主要也是这个时候呢呃依赖于云云计算啊,进一步的普及开了。当然在18年的时候,咱们还呃咱们国内应该是用的还不多。那么在2019年呢啊这也算是一个起点呢,成为阿帕奇的一个孵化项目19年并且呢增加了更多的平台组件。也就是说它相关的支撑呃支持的集成的一些组件就更多了啊,而且它内部的一些呃功能组件也更多了啊,总而言之呢啊就是功能越来越多。那么经过一年的时间,他就快速的毕业了,从孵化项目毕业,成为阿帕奇的顶级项目。啊,这也是几个关键的时间点。那这个时候进入了一个快速增长的时间啊,他们的社区啊下载量啊啊采用率啊超过了十倍的一个增长。那么在2021年的时候啊,已经开始支持优步内部啊啊500pb的这么一个数据库了。并且呢一些sql语句啊,还有跟flink的集成啊啊这些都进一步的引入了啊。因为flink也是比较火嘛,那么增加了优化的一些项,所以呢原数据的一些服务啊,还有它的一些缓存等等这些东西。那么到目前呢可以说这个护底在国内是越来越火了啊。这是大概的一个它的发展历史。

那么从这些历史啊,大家大概就是总结出这么几点。第一啊,谁做的,谁开源的,是美国的uber公司。呃,另外呢呃我们关注的点就是从阿帕奇毕业成为顶级项目。
这是在20年,可以说也是比较相对来讲比较新兴的啊,比较热门。

Hudi特性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
好,另外我们了解一下hudi的一个特性啊,咱们列了几点啊。第一个呢关于它的索引引,它是一个可插拔拔方方式啊。什么叫插插拔呢?就是随随可以插入入,随时可以拔走,对吧?也就是说你可随随时对某张表添加索引,也可以对某张表删除其索引。那么基于这个索引机制呢,就能够帮助我们支持快速的upsert还有delete。这个upsert其实就是插入集更新嘛啊如果不存在就插入,如果存在那就更新。

那删除呢?那这个你可能听起来觉得很常见啊,也没什么特别的,但是你要区别对比啊,咱们时刻拿Hive来做一个对比啊,也不是说贬低hive啊,只是说hive呢呃他们的擅长的地方不一样啊。那have我们前面也聊过了,我们对于某行数据或者某一个字段值的更新啊效率不是特别高,对吧?啊,但是hudi对这方面它就很擅长了啊,这就要依赖于它的一个索引机制有了,所以你才知道哎我要更新的这一行在哪里啊,并且能快速的定位到这一行。另外呢就是删除数据也就很方便了。那在呃一张表里面我要删除某一条数据,对吧?那这个呢hive也不是很擅长,那hudi呢也可以实现。这也是我们吸引的比较重要的一个特点。

还有一个就是什么呢?增量它可以支持增量的拉取表的变更来进行一个处理。那这个增量就是我们说的了啊它的设计支出的一个核心思想,就是增量这么来设计。这也是呼底这个I这个单词啊,这个字母的一个含义啊,增量的处理。另外呢表变更呃,变更有两个含义啊,一个是表结构变更,第二个呢是表的数据变更。比如说你可能是新增的数据,可能是啊更新的,可能是删除的啊等等。这些变更都可以进行一个处理啊,这样用起来就很舒服了吧,对吧?

呃,那其次呢还有它的一个很重要特性,支持事物对吧?事物的提交及回滚,而且呢可以去做一个并发控制,那这也是很重要特点,也是区别于谁啊hive啊。来我们再往看,呃,那他这边可以集成的一些第三方的计算引擎有特别多呀。像大家熟知的Spark、Presto、Trino、Hive、Flink啊,都是大家比较常用的。这些应该说是咱们大数据领域非常高频、非常常用、非常常见的一些引擎。那么都可以跟hudi进行一个很好的集成一起使用。而且呢可以去sql的操作啊,很舒服。

还有一个也是很有亮点的地方,自动管理小文件。那么大家都知道分布式存储啊,也特别是分布式的文件存储系统、文件系统啊,比如说常见的HDFS天生是不是就害怕小文件了。那我们也知道小文件一多,第一呢是对name node造成的压力。因为不管这个文件多大或者多小啊,它分的每一个文件块都会占用固定的原数据啊,150个字节,对吧?如果小文件过多,每一个文件啊都是一个块啊,那这个原数据存的就就大了。第2个呢,咱们回头用计算引擎去读取这些数据的时候,根据你不同的规则或者说读取格式吧,它大部分来讲是要依赖于切片来读取的。启动一个并行的,比如说M8的map任务对吧?那就会导致map任务过多,但是每个map呢又处理的数据量又小啊,那这样就浪费资源,效率也不高。那这个一直是HDFS呃害怕小文件的一个原因。那么利用户底呢它能自动的管理这个小文件了。相信大家平时在做离线数仓,不管你是用什么引擎,MI也好,Spark也好,或者test也好,对于这个小文件问题都是很头痛的。或者说常见做法就是呃分析完之后再起一个job来合并小文件对吧?啊,那么即利用hudi呢它就能自动管理了,这也是非常呃有亮点的一个地方。

那包括一些数据的聚簇啊、压缩啊、清理啊这些它都是自动实现啊,自动托管。那这个压缩就是咱们前面聊的这个compassion。啊,也可以理解为合并啊也可以称之为合并吧。啊清理呢就是一些过期的数据啊,它有一个cleaner啊不同的策略来清理我们的数据。说白了这就是什么一条龙啊,同学们对吧?给我们做到了一条龙服务。

另外就是一个流式摄入哎就是流式采集呗。首先呢它内置了一个CDC的源,还有工具,就像咱们前面聊到了这个delta streamer,这是flink官方封装好的一个工具啊,底层是依赖于Spark,用这个可以很方便的去进行一个流失摄入。不管是从卡不卡这种消息队列啊,还是从其他一些地方啊,从文件系统都可以用这个。那如果你习惯于用flink也可以,你也可以用flink等等都可以。

另外呢,就是他做了一个内置的原数据。那我们通常现在企业里面嗯,不管你是做数仓也好,做什么也好,到后期来讲,你应该都会做一个数据治理方面的事儿。那其中治理当中很重要的一块就是咱们一个原数据的血缘关系,对吧?也就是原数据理理。那这一块呢hudi在这方面嗯做的也是有了有一定的考虑,你可以很方便的去通过它的原数据做一些其他其他处理。

另外呢通过向后兼容的方式实现什么表结构变更的支持啊,也就是说我表原来有ABC3个字段啊,比如说mysql来的啊,我都呃入到hudi,正常的数据变更能拿到。那如果比如说我修改了一个字段,我把C字段改成了D字段,或者说我增加了一个字段D啊,像这种表结构的变更,咱们hudi啊啊也是能够兼容的。好这个呢其实这些特性大家可以看到,没有一句是废话。每一个都能解决咱们生产使用的一些 痛点问题。这也是为什么hudi越来越受关注啊,对吧?他能抓住你的痛点。

使用场景

image-20240305175215745

1
2
3
4
5
那么了解hudi的,咱们再来聊一聊咱们用户地可以用来做什么事情啊,也就是说我们能够落地的一些使用场景啊,那简单的咱们就总结了这么几个啊,第一个是可以近实时的写入啊,近实时。那么可以减少咱们一些碎片化工具的使用。可以通过CDC工具增量的导入咱们关系型数据库的数据,像MYSQ这种对吧?关系型数据库那还可以呢
咱们前面也讲了自动管理小文件啊,那它可以限制小文件的大小跟数量,也就是说咱们可以设一些参数啊,呃,通过这些参数来控制文件的数量,控制文件的大小,这个可以很灵活的来配置。
另外一个就是近实时的分析啊,为什么说近实时呢?呃,因为咱们正常来讲hudi的话,如果你想做到那种毫秒级的响应,毫秒级出结果,毫秒级的延迟,或者说是很短的秒,比如说几秒钟之内5秒。3秒5秒这样子,呃,那可能实现是可以实现,但是可能没有那么尽如人意,更多的来讲,咱们能达到这个分钟级的这种延迟啊,已经非常理想了,也能满足满足咱们大部分的一个要求了。相对于这些秒级的存储,就时序数据库啊,对吧,这种是时序数据库,它会更加的节省资源啊,这也是从性能延迟,还有一个呃,资源方面的一个衡量啊,我咱们肯定是这种更合适的,没必要追求极致,对吧,提供分钟级的时效性啊,支持更高效的查询,我们说了之所以支撑到分钟性能更好呢,是因为这会更高效啊。还有一点就hudi作为一个依赖,它非常的轻量,这句话怎么理解呢?哎,大家想想,咱们用一些,比如就就以例子为例吧,Hive,咱们要部署它,咱们以人工,呃,就手工部署为例,你首先是不是要有一个安装包。编译好的安装包,这个安装包咱们是不是要上传服务器。上传服务器之后,咱们是不是进行一个解压对吧,解压完之后是不是修改它的配置文件,修改完之后呢,是不是去启动呃,Hive相关的服务,像hive的那个原数据服务,或者have serve two这些服务啊,也就是说它是需要部署需要单独启动的一个东西,对不对啊,那作为hudi它需要这样吗?不需要,你只需要什么呢?编译完的hudi相关的,比如说我举个例子,你要跟flink集成,你只需要什么呢?将你编译好了flink hudi这么一个jar包,这只是一个jar包,你把它放到flink的class pass,简单来讲就是放到flink的一代路径之后呢,你用flink就能去操作hudi表了,很简单吧。另外呢,比如说你用的是Spark引擎也可以,你只需要将编译完的这个Spark hudi的这个架包,一个架包也是一个架包。也是放到Spark的依赖路径就可以了,那么你启动一个Spark就能够去呃读写hudi表啊,去查询hudi表。那同样的道理,像pstal啊啊也一样的啊,都是通过一个架包的方式作为一个依赖,你把它放到对应的引擎当中就可以了啊,非常的轻量啊

是还有呢,增量pipeline。它这里区分的是到达时间跟事件时间,如果熟悉flink或者熟悉流式处理场景下的数据乱序问题啊,就时间语义了。那其实它也是区分这两个东西,可以处理一些延迟的数据,也就是说对乱序有一定的支撑吧,啊另外呢,更短的调度间隔,减少端到端的延迟,对吧,那也就是说其实还说了这个事儿,咱们可以达到一通过增量达到一个分钟级的啊时效性。作为一个增量管道来处理啊,就不不间断的一直在增量的处理,你可以流逝的,也可以短周期的调度啊。还有一个就是增量的导出。我们可以替代部分kafka的场景,数据导出到在线的服务存储,那么增量导出对吧?区别于你每一次的全量导出,也就是说你看增量,增量实就咱们提炼这么几个场景来讲,几个关键词是什么?实时,实时这个是写分析,就是读呗,读出来再进一步的,呃,你要做成分析都可以,对吧,那不管读还是写都能做到进实时对吧,也就分钟题啊,这是咱们的一个场景,第二个场景呢,就是做增量的。不管你是从数据进来还是读出来做处理分析啊,得到一个结果,或者是往其他地方去写啊,入也好出也好,都是分钟级。啊,或者说增量的方式啊,也是分钟级,这就是咱们使用的一些场景啊。

编译安装

Hudi编译_版本兼容&Maven安装配置

1
2
3
4
5
6
7
8
9
10
11
12
接下来我们来尝试编译一个hudi。因为hudi并没有提供一个编译好的包,没有提供编译好的二进制包,所以呢需要我们去手动编译。并且呢这中间涉及到很多细节,呃,很多版本的兼容性问题,就一些注意的地方我都会给大家讲,呃,一些坑呢我也给大家踩完了。只要照着我的方式去改相应的pom文件,改相应的源码,就能够很顺利的编译完成来使用了。

那么我们本次课程呢使用的相关组件的版本,基本上是跟我们是用我们是上硅谷的一些版本保持一致。比如说hadoop我们用的是3.1.3,那么hive呢用的是3.1.2,flink呢用的是1.13.6。注意scala版本我们用的是2.12。
那除此之外还有一个Spark,那如果是Spark的话,由于我们用的是0.12。那0.12呢对Spark的支持,可能就是二系列的话是支持2.4。3的话是3.1、3.2、3.3啊,不支持3.0啊,这是0.12的一个特性。所以呢我这边Spark直接用了一个3.2啊,你想用3.1、3.3都可以啊。Scala版本是2.12,那关于这个版本就呃前面这两个啊,主要是hadoop跟hive这个东西是需要我们去修改一些东西来适配啊来兼容的,否则啊你是用不起来的

那关于flink呢,它0.12是支持flink13,flink14,还有15啊都支持。那么如果你用的呼底是0.11版本啊,我们现在是0.12,对吧?如果是0.11版本,它只支持flink的1.13,1.14。那如果是hudi的0.10版本,它最高也就支持到flink的1.13版本。这个是大家需要注意的地方啊。关于flink的支持
另外一个就是spark的支持。我刚才讲了,其实从0.11开始,它对3.0就不再支持了啊,它支持的是2.4啊、3.1,还有3.2。那么从0.12开始,它支持的还是2.4、3.1、3.2、3.3啊,这个是大家需要注意的一些地方。
如果你的Spark是其他版本,要么你自己去修改去适配,要么你就用更老一点的版本啊。比如说你用0.10的hudi,它是可以支持,比如说3.0的Spark等等。那关于这些版本兼容性怎么找呢?

啊,我给大家提供一个思路啊,呃你百度就行了啊。比如说你来一个POO底啊0.1,我们先看一下零点一啊,搜一个release,呃,随便找一片子就行了啊。比如说这个嘛,你看这都是官方号发的啊,很权威的啊。那我们看一下啊,呃我直接往下跳。嗯,你看嗯一些零点一的一些alt table语法为3.1跟3.2的Spark才支持的对吧?那你3.0就没法支持了,所以会有一些问题。再比如说Spark这一块,一些语法时间履行仅限什么3.2以上的版本,你3.1甚至不支持这个特性。再往下看呢呃Spark版本和绑定的包增加了对3.2的支持,对吧?3.1将继续支持,2.4继续。那你再往下随便搂一眼啊,这个是flink的啊,就是有各个重要组件的一些兼容版本,它都有说。呃,你看对于3.2的支持啊,还有什么趴回的。我记得还有一个啊啊你看看这里从0.1一开始,它不再正式的支持3.0的什么Spark,鼓励用户升级到3.2或3.1,这个是大家要注意的地方。那0.11的flink只支持到一四啊,那看完一一之后,你再看一下一二。嗯,这一篇应该是别人翻译过的。没事啊,我们还是同样的看一下,哎,你看它支持了什么是8G3.3,并且呢继续支持3.2、3.12.4,还是没有3.0,这个是大家要注意的地方啊。另外呢,这个新版的foody0点一二支持了flink1.15啊,也是比较新的版本。那14跟1三继续得到支持,这个是跟大家强调的一些小细节啊,所以你这个版本的兼容性要特别特别注意啊。

另外这个hadoop hive啊,我会教你怎么改源码啊,怎么来适配。因为它默认了这个hadoop的编译版本用的是2.7还是2.6来着,反正是二系列的,你编译不过去的啊。那那hive也一样啊,它也好像是2.3还是2.4,我也忘了啊,所以这一块呢需要修改一些源码啊。行,呃,后续再给大家一一演示啊。那在做编译之前,那么需要大家最好是在linux上面去做一个编译。那我的环境是cetOS7.5。那么其实如果你的这些组件版本跟我基本一样,呃,操作系统也基本一样,那你直接拿我编译好的包去用就可以了啊。那如果不一样,那你还是自己编译一下啊。
1
那行,那首先咱们得有一个maven,那那maven呢这个资料呢。你可以给比如说我给到你的资料,这个文件夹有放了一些需要的东西啊,比如说这个编译需要的依赖啊,还有maven在这儿啊,我放了一个安装包啊,还有一些其他的东西啊,让你把这个安装包上传到你的服务器上,我这边是已经上传并且安装过了。你上传之后啊,你该tar命令去解压,你正常去解压就行了啊。解压完之后呢,你想改名就改个名啊,无所谓。改完名之后,咱们将maven配到环境变量里,对吧?在/etc/profile里面配一个maven home啊,再拼接到path里面啊,注意后面这个路径写你自己的路径啊,这个写你自个儿的路径,就比如说我这边吧。拉到最后,哎,大家看我这边是添加了一个maven,当然我用的是.3,无所谓啊,一样啊,只要是3.6的应该都没问题。好,配完之后别忘了什么,做一个source让它生效啊,这样就ok了。这样你的maven就安装成功了。但是除了安装之外,咱们还需要做一些额外的配置来加速咱们的依赖下载。因为每maven编译的时候需要下载需要用到的一些依赖架包。那默认它的仓库地址是什么?是国外的呀?那国外如果你没有一些科学上网呃,可能会特别特别慢。所以我们常规做法是什么?是改成阿里的进像仓库地址,这样你国内去他去下的时候就特别快了。那你怎么测试你没有安装成功呢?就执行个命命令呗啊,mvn -v那如果跳出这个maven的信息版本信息,那说明是ok的啊。那CDK的版本我用的是1.8啊,你也要稍微注意一下,至少是1.8了。好,这个是关于maven的安装。那修改的配置呢,比如说哎我来到我的maven路径下面,你解压完应该是有这么几个文件夹的那bin就是命令脚本conf这个配置对吧?我们来到conf文件夹,然后呢这里有一个settings。来settings里面呢,我是配好了,我就告诉你在哪这里你直接搜什么呢?/mirrors镜像对吧?这个默认是注释掉的那下面你自己配一下啊,就比如说。我这边自己你看加一个mirrors标签啊,后面对应有一个结尾啊mirrors那中间这一段注释我就呃是我注释掉了。但是你看我这里加了一个什么阿里的地址是吧?阿里镜像的地址啊,你加上这么一个东西就可以了。那你后面这些你不用管,因为大家知道maven仓库生效,只有其实只有第一个生效,你背的再多也没用啊,中央仓库只会有一个啊。行,这边大家应该都会了,那改完就保存退出就可以了啊。那就按照我给你们的这个添加一个镜像啊。好,那这个就是我们的maven安装及准备。

Hudi编译_解决与hadoop3.x的兼容问题

image-20240307171417454

image-20240307171851824

1
2
3
4
好,那环境准备好之后呢,咱们可以去将我们的源码包上传到你的服务器上面。那这个源码包你可以从官网下载,也可以从我这边啊我这边放的资料里面放了一个源码包,这个你从官网下就行了啊,这个哪都有,那上传你的服务器应该都会啊。我比如说上传到你的OPT softwere,那在这里呢已经有一个了,对吧?那有了它之后,它是个tar包,我们就解压呗。-ZXVF那杠-C指定你要解压的路径,那我还是放在本路径就好了啊。
简单看一眼这么多文件夹大家都看懵了,对吧?那其实对我们来讲,回头编译完需要的一些jar包在哪呢?在这个里面就是说里面其他这些东西你就你不修改源码的话,你就不用去看啊,反正反正别人一玩都在这个里面packaging啊。那说这么多,咱们现在要来做一些修改。

第一呢就是对我们hadoop还有hive的版本兼容。就我前面讲的这个事儿,来我给大家看一下啊,我打开它的pom文件,大家注意就是这个文件夹根目录下的pom文件。好,打开我显示一下行数set nu。呃,我往下拉一点啊,往下拉一点,那么呃不是这里啊啊,比如说好在一百多行这里,那么大家可以看到它默认的一些版本是什么?Hadoop版本用的是2.10,总而言之它是不是二系列的。那么跟三系列的hadoop还是存在一些API的不一样啊,会报一些兼容性的问题。另外一个就是hive,你看它默认是用2.3来去做编译的那还如果你用的像我们是3.1.2,那使用上会有问题。所以在这里呢我们首先要修改它依赖的版本啊,那比如说我直接修改。呃,或者我先拷贝一行吧,YY P对吧?然后呢修改我把原来的注掉啊,还是留着吧,这个习惯还是保留。那hadoop的版本我就改成我的3.1.3。另外一个咱们要改的就是hive啊,还是要YY P拷贝一个,先把原来的住掉。好,改成我需要的版本3.1.2。好,这边就修改了。但是你光修改这个地方并不会万事大吉啊。那我们先改完这两个,保证退出啊。如果光改这个地方,你还会报很多错的。

image-20240307172609034

1
2
3
4
5
比如说我先来把拿到执行命令这里指定了各种各样的版本是吧?啊,我需要的版本。好,那这个时候你直接来执行这个maven命令来回车,大家注意看它的报错。啊,稍等一会。好,现在报错了对吧?大家可以看到什么呢?是库迪的COMM门模块啊。然后呢他说找不到合适的构造器,那为什么呢?实际参数列表和形式参数列表长度不同,那么大家注意其实就是这么一个方法啊。就是你看是hadoop相关的,然后呢是FS相关的。它这里面传参2系列它是一个参数,三系列它是两个参数。这个就是它的一个调用的api的兼容性问题。那这个地方要解决起来很简单啊啊,我们呃由于第二个参数我们是没什么用的啊,或者说对我们来讲没有实际意义,所以你给他传个null就好了。哎,所以呢你在这个地方要修改它啊。

好,那修改之前呢我们再加一个什么东西呢?就是一个仓库地址。就是还是刚才这个pom文件啊,在里边我们呢再手动加上一个仓库地址,让他依赖优先来这里下。你搜一下仓库这个单词,那你可以看到他默认写了一堆仓库,对吧?你看也就是说虽然呢咱们maven已经配了阿里镜像了,但是是不是以呃咱们工程里边的pom文件优先级更高啊?你这里指定的第一个优先级是国外的,这个仓库地址还是会从这里下的啊,所以我们这个地方还是要调一下啊。那我们我看一下大概在第几行啊,大概在一千多行,反正你就搜这个就可以了。好在这里面我们加一个就可以了哎。这样是加速咱们的编译。

好,第二个事情呢就是刚才那个修改组件版本,我们改了啊。

image-20240307173137150

1
那第三个,我刚才讲到的这个报错,这个怎么解决呢?他报哪个类,你就修改哪个类就行了啊,就是这个类嘛,这是一个java文件啊,张好写的第110行第44个字符,对吧?所以你直接拷贝这个类内名啊,vim这个类名我直接粘贴,哎,回车我显示一下行数。呃,大概是110行对吧?那我们看其实就这一行对吧,是不是FS的这个输出流的这么一个构造。这个是2.10版本或者说二系列哈。那现在我们用的是三,那我就在构造第二个参数给他。全都是细节啊,哎你加个null就行了。呃,如果你想研究一下这个参数的意义,你自己去对比吧,这里我们就先不展开了啊。现在目前对我们来说是没意义的,我传个null就行。好,保存退出。这样的话刚才这个报错,哎,也就是这个报错就不会再出现了啊,这是解决跟hadoop版本兼容的一个问题,好吧。

Hudi编译_手动安装需要的kafka依赖

image-20240307173428868

image-20240307173933063

1
2
3
4
5
那解决完了跟hadoop的依赖问题,那还得再来解决一个跟kafka的依赖问题。啊,为什么呢?因为我们在编译过程中有一个栈是hudi的工具栈,在编译这个栈的时候,它需要用到kafka的一些依赖,对吧,像kafka-avro序列化器呀,啊等等这些东西,呃,比如说我刚才的编译命令再执行一遍吧,那个hadoop已经解决完了,应不会再报那个错了,但是他会报其他的错啊,我还是执行这个命令给大家看一下啊。那让他跑着我们继续讲啊,那这个这些依赖你在maven中央仓库是下不到,或者在各个地方下不到,呃,因为这个是已经要在confluent里面去下,我们需要手动下载,并且呢,手动安装到你的本地maven仓库啊,那这个东西怎么下呢?呃,我已经把链接给你了,去confluent官网去下就行,他报错需要的版本是5.3.4,那你就下5.3.4啊

那么看一下你在编译过程中最后遇到什么,就像我文档截的图一样,他说不能够解决这些依赖,对吧?什么依赖呢?是关于confluent的一些kafka依赖啊,是版本是5.3.4,那我给你的链接啊,也就这个链接,你下载完之后,这是它的官网啊,如果你是其他的版本,你就看报错要什么版本,你就对应的去找什么版本就行了,那下完之后,比如说在这个它是一个zip包,你将它解压。那解压完之后你会发现这里面东西很多呀。呃,你就是解压完之后,比如说来到他的share里面,share里面有很多的他的相关依赖。啊啊,就在在这个Java里面share\Java,你或者直接在那个直接搜索啊,那那那报错几个包,哪几个包呢?就这个啊,你看啊,一个是他这个包,这是一个,然后再往下这个什么common一个包,还有这个,还有这个common一个包,还有这个卡不卡这个一个包。啊,一共是这么4个包啊,需要我们手动去安装啊,你在这个目录里面share\Java里面将这几个包搜索到啊,刚才这里包含了很多其他乱七八糟的价包,但是呢,我已经给你准备好了。你只需要到哪呢?资料还有什么编译需要的依赖也就是什么这4个包。我已经帮你摘出来了啊,5.3.4,如果你跟我一样
直接拿这四个就OK了,这四个包呢,你就是上传你的服务器啊,上传完之后通过maven命令手动安装到你的本地仓库就可以了啊,那怎么装呢?命令我都给你准备好了,诶用main的install命令就可以了,好吧,那你这边需要修改的可能有几个地方,这个什么group ID啊,就它的GV啊,这边是固定写法,你就按我这么写就行了那。这个-dfile这个路径就看你安装在哪了,我写的是相对路径啊,这个大家要注意的,我提醒一下啊,我写的是我的相对路径,你们如果路径在其他地方啊,你可以写你的路径,全路径也可以,绝对路径也可以,那由于我现在是不是四个包都在这个文件夹里面,都在这个路径下啊,所以我直接在这里执行命令就OK了啊,就不会出错了,来我们比如说我执行第一个啊,粘贴回车。好了,Success,那就OK了。
来再看第二个。在安装。啊成功,那其实你四个一起拷贝就行,我只是为了让大家看清楚啊,我一个一个执行,第三个完事了,第4个。就OK了,那么你只需要将这4个架包安装到本地仓库之后再编译,就不会再报那个错了,这个是必须做的啊,好,那现在回到hudi的目录吧啊,好,这个是怎么来解决需要的kafka依赖啊,这个需要手动安装的啊。

Hudi编译_解决Spark写入Hudi的兼容性问题

image-20240307175357561

1
2
3
再接下来呢就是要还有一个跟Spark的问题。这个问题呢在你编译的过程中并不会暴露出来,但是在你使用的时候才会出问题。就如果你我看我没有把报错写上来啊,看这里就好,我就不再演示了啊。也就是说如果你没有解决我这个事儿的话,你在编译完成之后,你用spack来操作hudi表啊,你能进去。但是呢你再往一张hudi表插入数据的时候,你执行一个insert,它会报错。哎,他说找不到一个方法,什么方法呢?什么apache的jetty,这个相关的错那这个东西呢是为什么呢?我跟大家讲一下啊,因为它这边呢是Spark模块,它会用到一些hive的依赖。那由于hive的依赖,咱们已经改成了3.1.2,对不对?那他本身携带了jetty?那hudi它本身的COMM模块,呃,它也有一个jetty,说白了就是什么把就jetty的依赖冲突问题,所以咱们这个问题要手动去解决一下啊,解决一下嗯,让大家可以看到呃。hive携带的是0.9.3,hudi本身用的是0.9.4,啊,存在一个依赖冲突。所以这个时候我们要去修改一个依赖啊,排除低版本的阶梯。

那么大家注意我们改的是哪一个里面啊,是hudi-saprk-bundle绑定的这个pom模块啊,你不要改错地方了,这个模块才需要改啊。嗯好,那具体在哪呢?来我告诉大家,呃,在源码路路径下面呢,你找这个packaging对吧?在这个里面好在这个里面大家看一下,它有各种绑定的什么什么bundle对吧?那有hadoop的有hive的,有presto,Spark是呃有flink对吧?那我们需要改的是这个hudi-Spark-bundle。好,进来之后直接打开啊看一下吧,它里面其实就是什么一个源码src,一个pom文件。那么直接打开pom文件,它这里面你搜hive。再往下你看它会包含一些hive的依赖的,还有了医院。好,那再往下搜。啊,不搜了,我们直接看吧。呃大概在三百多行的位置有一个,呃,比如说我搜它吧。Hive-service, 哎,你看这边是have的一代啊have的一代。那在这里呢我们它引入了,你看hip版本,我们已经改为什么3.1.2啊,3.1.2,所以会有问题啊,那就挨个加呗。来了看一下啊,应该大概是在382行的位置啊啊我给你改一下位置呢,382

image-20240307180241975

1
2
3
4
5
6
7
那在后面我们加一个排除啊,你直接拷贝就行了啊。这是我已经改过了,哎,他已经排除了一个了,那么直接添加这几个就行,添加这三个啊。在这里添加这三个还没完呢,这只是第一个依赖要改。

再往下这个什么service RPC再往下翻,service RPC不用动,再往下hive-jDBC还是要排除。啊注意这边就需要大家细心一点,不要粘错了啊。好,hive meta store一样的啊,要排除一些。再往下走还有一个hive common啊,也有一些jetty。那最后呢我们再手动加一个jetty啊,咱们需要的啊增加一个jetty版本。那我标红的部分你就整体怎么一起拷贝就行了啊,直接在zookeeper的前面,在这里啊直接粘贴,哎,加这么几个依赖,那这样的话就能搞定我前面说的这个问题。

当然如果你用不到Spark,或者说你只想要flink那这个事你可以不做啊,也就是说Spark的这个包你不用呗,好吧?那由为可能大家不一定会用什么,所以我就都讲啊。好,保存退出,那这样就ok了。那前面讲到的这几个问题就是一些坑啊,给大家说一说。那跟spark使用了一些依赖问题。

那还有一个地方呃是另一个模块,是我们的hudi-utilities这个模块啊,因为在这个模块当中有一个功能,有一个导入的工具叫做delta streamer啊。那在我们后面也会介绍到,在Spark集成这一块有个4.5啊导入工具。这个模块里边也可以去跟呃HI普做一个同步。那这个时候就涉及到跟HI普3.1.2这个阶梯的问题了。所以呢这个模块同样要做相同的操作,要排除阶体。好,那就同样的我们来做呗。好,我们看一下,就是有这边有一个UTS啊进来hooody uties哎白斗,然后修改它的home文件,那么大概应该是在四百多行的位置啊,我看一下405行的位置啊,它这里你看到没有?它也有一个害补依赖,那也有一堆东西都没有排除,那我们还是相同的啊呃第一个service还是添加一个排除依赖好。再往下这个RPC不用管,再往下这个ZDBC还是要做一个排除。再往下看了,这个meta store也要应该也是要做排除的啊。好,再往下有一个have common也排除一下,嗯,往下走,往下走,哎,这是最后一个了。那同样的再往下就到其他的地方去了,对吧?那这个时候我们再引入一个凸底对应版本的阶梯就可以了啊,把这个添加进来。我加在这个后面吧,在zookeeper前面。好,嗯,那么这样呢就可以了。那回头这个呢我们再去使用,就也不会出现说那个jetty的报错了啊,这样就ok了。

Hudi编译_执行编译命令&jar包位置

1
2
3
4
5
那最后就剩一步了,就是执行编译就可以,我们先回退到这个解压的跟路径下面啊,hudi 0.12.0,好,在这大家注意啊,要在这里执行啊,在这个地方执行mvn编译就可以了,那么这边大家注意看一下我的命令。我的命令指定了啥呢?你看mvn,呃,清理clean,然后呢,打包package跳过测试-DskipTests,那后面是关键啊,同学们首先-D,我指定的什么Spark的版本-Dspark3.2,因为我前面看release也跟大家讲了,它是可以支持什么,诶我2.4的Spark 3.1的Spark 3.2的Spark,那这边你不指定默认就是3啊,默认就是3,呃,那我们这边最好是指定具体版本3.2,另外flink是不是支持13 14都支持啊,对吧?那你也要稍微指定一下你具体哪个版本,那我用的是1.13flink,大家注意格式不能变啊,就按我这个格式写。第还有一个呢,就是flink也好,Spark也好,都有不同的scala版本,对吧?啊,那这边你也要指定一下scla版本,我用2.12,大家注意格式是这么写啊,中间得有一个横杠啊,另外呢,就是当然我这边是多此一举啊,就指定哈,hadoop跟hive我已经改完了对吧,但是后面这个最好还是加一下啊,指定我编译集成。这是HAVE3啊,行,另外指定一个哈杜版本,总而言之言的,总之你就照着我这个来改就好了,因为他这这个命令有很多,还有各种简写缩写对吧,比如说你可以只写一个SPARK3,那它默认是用哪一个版本,这啊那的啊,你就指定具体的就好了,好吧,这个东西如果你想看我提一嘴啊,他在这里有一个read me呀,没了吗?好大写的,在这里他有简单的说了几句啊,你可以自己看看啊,一些编译环境要求,比如说JAVA8以上的得有记,呃,对main问得大于3.3.1对吧?啊,然后可以指定Spark怎么指定。对对,这边都我告诉你,你不同的写法,它指定的还有兼容的版本是不一样的啊,你自己在这边找一找啊。那包括弗link也有好吧啊,弗link也一样,你不指定斯GALA版本,它默认就用的2.11啊,这个是大家要注意的地方啊呃,最好的方式就像我一样,所有的东西我都给你指定好了,不啰嗦了,直接把这条命令怎么样拿过来,然后呢,在这个跟路径下面执行,哎,拷贝粘贴回车,接下来就是等待就可以了啊。如果你按照我的步骤,前面都修改完了,接下来就不会报错了啊,那我先停一下,等他编译完。

好,那大概大家可以看到,经过了8分多钟,那终于编译完了,各个模块都编译好了。那接下来的问题就是,诶,我编译好的东西在哪里,怎么用啊?因为我们前面讲的hudi并不需要我们去部署启动做这些事儿,它是通过依赖的方式放到对应的引擎当中就可以了,对吧?呃,那首先我们验证一下是否成功呢?我们可以用一下hudi自带的客户端,看一下能不能启动起来啊,那其实就是什么呢?在咱们的根目录下面,我直接有一个hudi cli模块啊,然后在这个里面呢,有一个hudi-cli脚本啊,你试一下啊,回车。
那如果进来这个界面呢,啊,那就说明你是OK的啊。好,那除了这个之外呢,我看一下quit不用分号啊好。
那接下来就是我们需要的架包在哪?那我们知道前面我反复强调主要在这里啊,Packaging它每个模块,呃,相关的东西都会放到这个里面去啊,那我们直接进到这里面看一下,这个前面也看了是不是有各种各样的模块啊,其实就是什么呢?你需要什么就拿什么,比如说你要跟。Flink集成,那你就到这个文件夹下面,把相关的架包拿出来放到flink,呃,依赖路径下面就可以了,再比如说诶,你需要跟hive集成那里。跟hive相关的,因为hive还得用到hadoop嘛,这两个东西,两个加包放到hive的依赖目录下面就可以了啊就这么简单,再比如说你需要跟呃,再比如说你要用Spark,那就是在这个地方将jar包拿出来,那这边我们以flink为例啊,那比如说我要拿flink的包,我要跟flink集成,那就进入hudi-flink绑定了这个模块里边来看一下,编译完之后这里是多了一个target目录,来进入到target来看一下这个路径啊,看一下这里呢,会有好几个jar包,对吧?啊,那像这种source我们就不用了,Original呢,也不用了啊,这不太依赖的,那我们看直接拿这个包。就行了,就把这个包拿到就OK了。好吧,这个就是我们的编译过程啊,那如果你不想自己编译,并且组件版本基本跟我一致啊,那你直接拿到我给什么资料,有一个编译好的包这里呢,我把几个常用的啊都给你摘出来了啊,像跟flink的呀,那hadoop的呀,跟hive的pstal Spark.这些包都给你准备好了啊。好,这个就是我们的编译过程。

核心概念

基本概念

时间轴TimeLine

image-20240308134501651

instant
1
2
我们了解一个护底,那么它特有的一些概念,我们也要首先了解,那首先呢,我们看一下三点一基本概念。那第一个呢,就是时间轴。那这个时间轴怎么理解呢?其实就是什么?这你看看这个图,这是不是一个时间线对吧?那么中间的每一个点,上面的每一个点都是一个时刻,对吧?那其实就是什么呢?它这个时间轴在不同的时间点上都记录了我们做的每一种操作。那这个怎么理解,拿我们生活中的例子来比喻啊,比如说你一个人在一天当中,你对他做了一个记录,那比如说啊,早上呃,6点他起床刷牙洗脸,诶,你记录一下,比如说你拿个小本子把它记下来,那再者说呢,啊,到了6点半啊,他蹲了个坑啊。那你记录一下,那再比如说7点。他吃完早饭出门了啊,出门啊,然后怎么样怎么样,就按照这个时间的发展,在每一个关键的时刻呀,啊,每一个动作,每一个行为,你都将它记录下来,那记录下来有什么用呢?你是不是可以方便去回看呢?比如说到了晚上,呃,8点了,你想回忆一下,诶早上8点钟我在做什么,你是不是看一下你这个记录就可以了,那同样的道理啊,啊hudi里面的时间轴就是这个意思。它就是一个时间线啊。随着时间推移的一个时间线,那你的每一次操作他都记录了下来。这个就是它的一个时间线啊,时间轴啊,也有的翻译为时间线,还有呢,就比如说我们一些博物馆,不是有一些历史类的博物馆,那它可能会设计一个走廊或者一个展厅是什么呢?呃。每一年的重大事件,比如说1921年这边发生了什么什么事件,再往下一个。资料图片,它是比如说19啊多少年,1931年啊,然后这边有什么重大事件,在1932年什么重大事件,三三年什么重大事件,那这个也像是一个时间轴啊,我们走过这个回廊,就像穿越了历史时空一样。好,那刚才是简单来理解这个时间轴的概念
那我们具体来看一看,在这当中。每一次记录的东西叫做什么啊?它有一些特有的叫法,那首先第一个名词呢,就是instant。这个怎么翻译呢?咱们可以理解为时刻。啊,那我们我这边执译是翻译成了及时时间,也就是说每一个时刻,就像刚才举的例子,哎,这个六点半这个时间点你做了什么事,对吧,那7点钟这个时间点你做了什么事啊,这具体的某一个时间。对应的事啊,就这个具体某一个时刻,这个就是instant啊,所以呢,我们可以想象得到,呃,hudi上面的这个time上面就是有一个一个的instant。每个instant就记录了对应的时间,还有我们的操作。

image-20240308134803234

image-20240308135024365

1
2
那这个instant有这么几个组成部分啊,大家看这个图instant,一个instant有三个部分,第一个你做的动作啊,就像刚才例子啊,你6点半你的动作是什么啊,是起床洗漱对吧?这就是咱们记录下来的action。另外一个是不是时间点呢?time也就是说是什么时候发生的啊,6点半这个就是这个太state就是一个状态,这个是属于呃
它的状态有三种啊,一种是来我们看这里。已经调度尚未执行,或者正在执行或者已完成,也就是说他当前的状态,干这个事是干的怎么样了,是正在准备做呀,啊,还没还没具体做对吧,还是说正在做还是已经做完了啊,这就是一个状态的概念

image-20240308135132319

image-20240308135922210

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
那么具体来看一下啊,这个action最hudi当中有哪一些东西啊,那看图更直观一点啊,你看有commit,就是提交对吧,还有清理cleans,还有增量提交delta commit,还有这个compassion,也就是翻译成压缩或者合并啊,那还有一个什么回滚rollback,保存点savepoint啊,那这个我们先留一个简单的印象啊,但大家知道这些动作。

前提我印象有这么一些啊,那我们具体来看一看吧,一次commit表示将一批数据原子性的写入一个表,那么大家注意了,这边有几个关键词啊,一次提交表示什么?一批数据,另外一个词是什么原子性?写入一个表啊,这边是这个含义。

好,那这个清理呢,就是会清理我们旧版本的文件啊,就是做一个清理操作啊,你不再需要了,不再呃成为垃圾了,那我就是啊,把你清理掉呗

另外一个就是这个delta commit,这是增量提交的意思。增量啊,关键词是增量,另外呢,还是一批数据,还是原子性,大家注意这边特指的是某一种mergeonread类型的表,它有这个增量提交这个概念。这个这个单词也留一个印象啊,后面我们也会介绍啊,一般简称Mor啊,读时合并。这个就涉及到我们后面要介绍表类型啊,它主要有两种啊,那MOr呢,是其中一种,MOr就有一个增量提交这么一个事儿啊,另外个这个

合并呢啊,其实就是什么呢?你看合并内部差异的数据结构。也就是说我有多呃多个版本,或者我做了更新了啊,比如说将更新操作从某一个地方合并到另一个地方。呃,举个例子啊,呃,但是这些概念我们还没闹啊,就是它有一个东西叫基本文件。啊,就是这种mor的表啊,可能有一个基本文件,它是parquet的格式,这个我们先简单了解,简单理解啊,那每一次的增量提交,它是直接写到这个基本文件吗?不是啊,他每次的增量提交都会写入到一个log结尾的文件当中啊,每次提交都会写到一个log文件当中,那既然数据都不在一起,那回头你是不是得干嘛呀,把他们合并到一起去啊(基本文件),对吧?那比如说他们一起合并,所以把这里的数据啊都合并到这里来了啊,这是其中的一种合并

那还有一种就是回滚呢,也就是当我们写入啊,不管是commit还是delta commit啊不成功的时候,他会进行回滚。他会把他的一些临时文件给删掉啊。

还有一个save point将某些文件组标记为已保存,以便什么不会被删除,哎,所以这个时候我们知道这个save point就是什么,避免被这个清理操作给删掉啊,它会做一个标记啊,仅此而已,那它的目的和作用也很明显,就是呃做呃,使我们的数据有个备份嘛。好,这个是action啊,Action就动作啊,我们简单一起输,看这张图啊就行了,其实意思也很直白啊

那时间呢,时间instant time通常是一个时间戳,那你看啊,比如说这个。大家注意,他不是说按我们那个UTC那个时间抽啊,从1970年1月1日00:00:00开始,那个时间不是啊,它是年月日十分秒这么一串数字,那你看2019年1月17日一点。啊,13分49秒是这种时间错,那么这个按照动作时间的顺序是是不是单调增加了,为啥呀,我们的时间是不是正常来讲,记录是不是由早到晚的是吧。

那这个状态刚才讲过了,一个是还没开始。一个是正在正在执行,一个是已经执行完毕啊,这是这个动作对应的一个状态
时间概念
1
2
3
4
5
好,那最后一个呢,我想强调的是一个时间的概念。时间的概念,这边的时间我们要区分两种概念,第一个就是hudi去做提交,也就写入的这个时间,或者说咱们可以理解为数据到达湖底的时间,因为你commit数据不就进湖底了吗?对吧?Commit不就写入湖底了吗?啊,那这个时候其实也就是到达湖底的时间是吧?那么大家知道数据本身是不是也可能携带时间啊,所以另外一个东西叫event time

那这个地方呢,大家可以联系一下flink当中的时间语义。flink的时间语义是不是有3个呀?啊,一个是什么处理时间,注入时间,还有事件时间,那我们有时候在处理数据的乱序这些问题的时候,是不是要采用一个event时间呢?啊,按照数据本身携带的时间来啊处理

那么在hudi当中呢,我们也是可以区分。这两个概念啊,一个是hudi处理的时间,第二个是数据本身的时间,为什么呢?这种其实还是为了处理迟到数据啊,我举个例子啊,有一条数据,它是比如说8点钟产生的。那这个时候这条数据,比如说携带的一个时间戳啊,就是今天早上8点,但是呢,由于我们需要将数据采集,最终写入固底,对吧?或呃,可能中间由于网络延迟或者各种故障异常原因啊,但是最终我们写入hudi的时候可能已经8:10了。对不对,这中间是不是就有一个时间差,好,那我们再来一个极端的例子,那再比如说我是8点,我是按小时来做一个分区,这个分区大家就理解一下hive表的那个分区就可以了啊,比如说我按小时分区,有条数据是8:58生成的啊,然后由于各种原因,我写入hudi的时候已经9:01了,对吧?那么如果你按照这个commit的时间,那么这条数据是不是就会被写入到。9点的那个分区,但是这条数据明明是8:58产生,是不是应该进入八点的分区是不是啊,这个就是我们取事件时间的一个意义所在

image-20240308141143253

1
那么看一下hudi对于这种事是怎么处理的啊,那么看上面这张图就可以,呃,这边采用了还你看啊,也是按照小时作为分区字段,也就是说一个小时是一个分区,对应一个目录,大家用hive的那个分区来理解就可以了啊,那么从10点开始陆续产生各种commit。啊,从10点开始,那么10:20的时候,来了一条9点的数据,意思就是这条数据是9点产生的,但是到了10:20啊,他才commit到咱们的hudi表里面去啊,那这个时候根据事件时间这条数据啊。啊,我们这个分区字段可以用数据的它本身的时间来做分区嘛,对吧,你是9点,那你自然应该落到什么,写入到9点的分区,这个是没问题的,也就是说它能够落入对应的分区里面去,那这问关键就在于那我在读取hudi表数据的时候,我在消费的时候,呃,怎么拿到这条迟到的数据,这个是不是算迟到啊,我时间已经10:20了,但是9点的数据刚来。啊,那这个也没事,我们只需要按照正常的这个到达时间去过滤就可以了,这条数据我们我们增量去消费的时候,比如说我是消费10点钟以后提交的数据,Commit过来的数据,那么9点的这条数据它是10:20才提交过来,那么我们也是能够消费到。啊,说这么多就是想表达一点,呃。我们不用去担心迟到数据消费不到,第二个事呢,迟到的数据,嗯,他该落入哪个分区,还是能够落入哪个分区,当然前提是咱们这个用法是对的啊。啊,这个是一个时间概念,应该也好理解啊,总结起来就是这张图啊,一个时间线啊,每一个时刻都记录了一些事,那每一个记录当中分为什么时间,还有做了什么事啊,现在处于什么状态啊,这么三个东西。