尚硅谷大数据技术之数据湖Hudi-3

尚硅谷大数据技术之数据湖Hudi-3

数据写

image-20240311223824434

1
2
3
4
5
6
7
8
9
那我们来聊一个hoodie当中的一个核心操作,也就数据写入的一些操作啊,还有一些基本的原理,这个大家就视情况来了解。如果你仅仅是应用呢,你这一块就简单搂一眼就可以。如果你想啊多了解深一点,那你就好好去了解这一块,研究这一块。啊

那我们基本要知道的是hudi的写入操作啊,大概可以分为三大类啊。第一类是所谓的upsert,也就是说插入或者更新。那这个时候它是严格依赖于什么呢?索引。它是必须依赖于索引的。也就是说upset的必须依赖于索引啊,这也是hudi的一个特点特性,高效的upsert啊,还有结合索引。另外呢他会去标记你是插入还是更新的数据。另外呢根据不同表不同场景啊,也就是说他有一定的策略啊去写做一些写入的行为啊,这是upsert的我们具体看啊

另外一种就是insert,就是指追加的方式。其实啊那就是跳过索引啊,就是你这个索引我不管了,反正我有新的数据来,我管你是什么样的,我都直接就是追加追加追加追加啊。但是这个呢嗯回头你要自己去做一个去重的一个处理啊。

还有一种是bulk_insert,是那个批量插入,批量插入这种是后面新版本提供的一种呃优化功能啊,这样的性能会更好。它会做一个排序,然后进行一个写入啊,它是对大数据量的hudi表呃初始化的话,用这种方式会更好一点啊。嗯,而且对文件大小的限制是最好的一个效果。行,那就是粗略的知道有upsert,有insert,有bulk_insert对吧?

啊,那最重要的默认的就是这个upsert是我们一定要了解的。

写流程(UPSERT)

COW

image-20240311225750511

1
2
3
4
5
6
7
好,那我们具体来看呢,upsert咱们就按照两种表来解读。第一种是COW表它怎么做的。第21个呢就是MOR怎么做的。其实前面多多少少聊过,我们再捋一遍啊,对于COW表我们进行一个upsert,它是先什么呢?按先对我们的这一批数据按照指定的key去重

举个例子啊,1A2B3C,比如说是这批数,这是一批数据对吧?啊,那如果我们指定的第一列为我们的键,那就就这个id值就是这样了,对吧?那比如说你数据还有一条1A是不是有重复啊啊,它会先按照指定这个key,哎,先是给你去个重啊,然后呢对这一批数据创建索引。所以我们讲的就是record key呃,对吧?加上分区路径啊,然后呢跟我们的文件id文件组的id呃形成一个映射关系啊,也就是说会记录哎这一条数据呃对应的是哪一个file id啊。行,这个索引细节不聊了。呃,另外这个索引还有一个作用是区分我们是更新还是insert的。insert呢就表示这个这个key是第一次写入啊。如果不是第一次写入,那就是需要更新嘛,这个好理解吧,也就是通过创建索引,并且区分更新还是插入。

那接下来既然要区分,那就有不同的处理方案了。一个是对于update,一个对于insert的。如果是更新了,他会找到这个key啊。咱们这个hudikey对应的这个呃文件片,咱们所以不是维护了这个key跟文件片的关系嘛,对吧?啊,他能找到他找到这个文件片之后啊啊,不是文件组ID啊,那文件组ID再找最新的文件片,然后再找到里面的base文件。Base文件什么意思呢?我给你说啊,parquet就是它的呃,就是如果你看到说什么基本文件呢,列文件呢呃parquet指的都是这么一回事儿,就是指的这种parquet文件好吧,并且呢做一个什么merge完之后写入到新的什么base也也就新的文件片就生成新的parquet嘛。

这个就是更新。那如果是纯粹的插入呢,我不是更新呢啊他就有点不一样了。呃,他会先扫描当前分区下面的小文件,也就是说有一个阈值啊,这个参数可以控制。比如说是十兆,哎,我说小于十兆的这个parquet文件,呃,如果有,那我这个插入的消息我就往里面去写,往里面追加啊。如果没有这种小的parquet文件small file我怎么办呢?啊,没有小的让你蹭啊,你蹭不了啊,那怎么办?那你就自己来呗,你写一个新的不就完事了吗?啊,就写入一个新的文件组,文件组里面的一个新的文件片能理解吧?也就是说有地方蹭那就蹭啊,没地方蹭就自己去开一间,对吧?啊,这个简单啊,这是COW

MOR

image-20240311231611456

1
2
3
4
5
6
7
那MOR就麻烦了啊,也不是麻烦啊,就是跟索引有很大关系啊。MOR的话来如果是MOR的upsert写入啊,那也好理解啊。哪里呃,首先还是一样,根据咱们指定的数记录键或者叫数据键进行一个去重。就刚才例子啊有1A2B3C啊,呃然后再来一条1A你指定第一个字段ID为为这个键的话啊,那就先按照这个键去重。哎,哎发现有两个一哎去个重我只保留一个啊。好,然后呢同样是什么创建索引,然后这个索引也会区分是update还是insert。这个这两步跟COW是一样的啊是一样的。

好,接下来就不一样了啊,它对insert的跟update分别是怎么处理呢?我们来看啊,对于insert,如果因为正常咱们增量提交嘛,MOR是不是要写点log文件?前面我也简单提了得区分能不能建索引,是不是?如果是不可建索引,也就是你用的是布隆啊,说白了就是布隆嘛,这也是默认的,对不对?他会干什么呢?默认去分区内最小的base file。如果建不了索引,他就去蹭别人的。能理解这个意思吧?你建不了索引,那我就去蹭别人了,这是插入的消息啊。然后蹭别人之后,跟别人合并成新的一个文件片啊。那如果没有小的基本文件可以蹭,那就一样自己写一个新的呗。新的文件组里面的新的片的新的parquet,这是见不了索引的时候啊,就生活不能自理呗。

那如果你能够建索引,也就是说如果你用的是flink对吧?咱们就直接把这些联系起来,你就好理解了。如果你用的是flink它基于状态可以对log file建索引,它直接干嘛呢?它是追加log文件,什么样的log呢?小的log。如果没有小的log文件可以蹭。那就自己开个新的呗,新的文件组新的文件片新的base file。

好吧?嗯,能不能理解这个意思啊?简单来讲,如果是插入消息,呃,log索引啊,就区分可不可以建。所以不可见他就去蹭人家的小parquet。然后合并成新的啊。如果蹭不了,就自己写入一个新的新的文件组就行了啊。如果可以见索引引它也是蹭,但是它蹭的是什么小log文件,如果蹭不了就自己写一个新的啊,一样的道理啊,简单理解就是这样吧。
1
那如果是更新的消息,这个就简单了,你也不用说蹭不蹭的了,你更新你就要找到你要你的老数据在哪,对不对啊?它会写对应的这个file group,还有对应的文件片啊,就老数据所在的位置啊,直接什么呢呃,append最新的log file对吧?那如果刚巧是最小的小文件,也会进行一个合并,生成新的文件片啊,这个就你就不用管这个细节了啊,反正如果是更新消息,那就找到对应的是哪个组的啊,你是要更新的数据在哪个组啊,然后对应的新的文件片,然后去追加log就行了。因为你一直追加,你是不是不可能无限膨胀啊?对吧你一直在追加对吧?所以它达到一定的阈值会做一个滚动啊滚动生成一个新的log。好吧?这个是upsert的一些细节。其实你说它有用吧,也没用,但是方便我们去理解呃一些现象。比如说你去看HDFS的时候啊,就看到那些文件的怎么有时候几个log有时候又是怎么样怎么样,对吧?啊,去方便理解。

写流程(INSERT)

COW

image-20240312235702570

1
呃,那其他的像insert这种写入流程就简单了啊,咱们快速过啊啊还是区分两种表,COW跟MOR表啊。那如果是COW它一样的,先对呃通过这个key啊进行一个去重啊,你也可以选择不去重啊,大家注意是可选的。另外呢我们前面讲了index啊,insert不会创建索引。呃,接下来就是蹭的问题了啊,如果有小的基本文件,那就去合并啊生成新的,否则呢没有小的可以蹭,就写新的呗啊,这个就insert了,很粗暴很简单,

MOR

image-20240313000158116

1
也简单一样,我们可以按照这个key去重,也是可选,也不创建索引。那接下来无非又是那一坨式,就是你这个log文件有没有索引啊,能不能索引,对吧?如果可索引,并且有小的可以蹭,那你就去追加啊,如果没有,那你就写呗,写呃追加或者写新的一个文件。呃,这边是为什么是或写最新的文件?因为咱们前面讲的不是有一个什么呢滚动吗?就是说log有大小限制啊。好,那么如果这个log file也不能够索引,那直接就自己写个新的就完事了啊,新的文件片大家注意是新的文件片啊

写流程(INSERT OVERWRITE)

image-20240313000718129

1
2
3
啊那insert overwrite呢,这个是后面才支持的一个功能啊,叫覆盖写。呃,他会在同一个分区中创建新的文件组,并且呢现有的文件组被标记为删除。这个就是区别它创建新的同时将旧的组,大家注意它操作对象是组啊啊,它将旧的组标记为删除啊,根据新记录的数量创建新的文件组啊,说白了就是重来啊。

那这边有一个小案例,大家看就行了啊。呃在插入之前呢,这个时候分区有一个文件组一、文件组2,对吧?啊,他们有一个版本的,都有一个PACK的文件啊,那如果你是插入相同数量的记录覆盖,也就是说原先有呃这个组有十条数据这个组有十条数据。那这个时候你是hudi的overwrite还是20条写过来,这个时候它会生成新的组3跟新的组四啊,并且是呃也各有一个pty的。然后呢,老的一跟二要被覆盖的这两个就在原数据中被标记为无效,标记为删除,就这么简单,就是这么粗暴啊,生成新的再把脑袋干掉,过河拆桥对吧?就这么简单。那插入如果是更多的记录,那其实没什么营养啊。比如说原先只有20条,你覆盖写入是100条,那无非就是生成更多的新的什么文件组。那同样的呢你要将呃。翻译一翻案2,就老的在T1后,原数据被标记为无效,就是老的又被过河拆迁了。啊,好吧,那其实这几个只是啰嗦一下啊啰嗦一下。

image-20240313000818437

1
2
3
4
5
那如果是MOR表有什么特点一样啊,一模一样,好吧,这个就不啰嗦了。

嗯,你看他们执行方面非常相似啊,而且不会干扰咱们的这个compassion。呃,然后可以减少parquet文件的大小啊,对吧?因为你重新规划了嘛,就相当于说你你旧改嘛,你家老的自建房都推都都拆了,然后通过规划去盖房子,那肯定是呃空间使用更合理嘛,是不是啊,不浪费空间嘛。啊,行,不需要更新关键路径中的外部索引,对吧?我不用去更新收引。好吧,还可以扩展清理策略。

呃,缺点呢也有需要转发以前提交的原数据。另外呢呃还有一些小问题,当然这个缺点是在呃官方的一个呃啊设计当中的一个考虑啊。这个咱们现在也不用去操心,也就是说有一定的小代价吧。一个是转发元数据,第二个呢是将忽略老数据呃。但是这个事不用我们关心,他呃,会有方案来忽略他们。这个就简单介绍一下啊,那咱们在这里面最常用的还是这个什么upsert,这是最重要的一个特性。

Key生成策略&删除策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
那其他的一些小东西咱们就简单了解一下就可以了啊。比如说有一个key的生成策略,咱们前面一直讲索引对吧?索引对应的是不是有个hoodie key啊?那hooodue key由咱们的记录键跟分区路径组成的

那这个地方这个记录键那可以怎么做啊?然后分区路径又有哪些限制啊?就简单说一下。那首先这个record key是可以有多个字段组合的啊,比如说我们的字段有啊ID啊name,然后比如说性别、age这么多个来几条数据吧。一啊张三,性别男,呃,年龄十八啊。比如说这种数据

你可以只以一个字段ID作为record key,你也可以拼接这个id跟性别作一个key啊,也可以啊就联合的的key。

另外这个分区路径呢是可以有多个字段组合的。呃,多个字段组合呢为什么呢?因为你分区是不是可以有多级分区啊,也就多层目录呗,多层那这个也是允许的。另外呢如果你是非分区表也是可以的啊,这个都无所谓啊。

另外就是比较重要删除策略,hudi对于数据的删除
删除大家简单呃大家联想一下hive怎么删除数据呢?嗯,对吧?或者说我在某一张分区表某个分区,我里面有12345啊,某呃别管是不是hive了啊,就比如说某个分区下面有12345呃五条数据,我想把第三条数据删了怎么办呢?如果以前的传统做法,你是不是insert overwrite,你是不是overwrite一个1245进去,是不是就把删给给删了,对吧?但这样大家操作比较重了,如果我这个数据有一条,你只删一条再写一遍啊,没必要啊

所以它有两种,第一种呢是逻辑删。我将value字段全部标记为null就可以了是吧?除了key之外的,其他的我都全部标记为null,这是逻辑上的删除。

那第二种呢就是物理删,就正儿八经删啊,通过配置一个key删除所有的输入记录,就OPERATION_OPT_KEY对吧?另外一个就是利用它的payload机制,指定我们的payload类为一个空的PAYLOAD_CLASS_OPT_KEY = org.apache.hudi.EmptyHoodieRecordPayload,这样的话它会删除所有的记录。

啊,另外呢还有一种方式呢,就手动添加一个字段。添加这个字段呢就是固定的字段名啊,添加这个字段_hoodie_is_deleted,然后就可以了啊,就标记为删除了。这种是一些删除的策略啊

那么整体来看呢,核心优势最重要的是什么?就是upsert啊。那基于upsert,咱们对更新也好,删除也好,它非常高效。为什么呢?索引对不对?索引啊所以前面聊了这么多呀,描了这么多概念。其实最最重要的啊,大家有必要去了解的,好好理解的一个是什么?时间轴这是基本的吧啊文件布局要了解吧。那这个索引对不对?表类型、查询类型简单了解啊,因为你回头一直用啊,你要去用,你肯定会涉及到。那写操作最重要的就是这个upsert啊就够了。

数据读与Compaction

1
2
3
那接下来就剩一点小概念了啊,数据读这一块前面都其实都介绍过了,我不再啰嗦了。就一个快照读取,一个增量读取

那还有一个叫流读,那这个主要是在0.8版本之后的flink这一边支持流读。那当然后面我们也会去演示啊,有一些实际案例去操作啊。真正的是一个流式的读取,它是source会定期监控新增的改动文件,将读取任务下派对吧给读了这个task,这个后面再唠吧。你简单留个印象
1
2
3
4
5
6
7
那我们聊一个compassion,compassion来一起回忆一下它是什么表。有的呀MOR表啊,记住啊,COW不用compassion。那如果呃我是数据刚刚开始写,那这个时候是不是没有base file,也就是没有parquet。那这个时候你要compassion就是什么呢?它走的是类似COW表呃,COW这种insert流程,直接什么merge所有的log文件,生成一个parquet文件,就这么理解就完事儿了。

那如果呃现在已经有parquet文件,并且呢也有log那怎么办呢?啊,他会先什么呢?其实就是一个cow upsert流程,他会先读log文件,建立一个索引,再读什么呢?老的parquet文件。建完索引读part quet,读完parquet读log之后呢,将它俩合并到新的一个parquet文件。啊,就这么简单嘛。

啊另外要想强调的是,咱们这个compassion是可以走异步的。什么叫异步啊啊我相信对同步异步这个概念应该还是有些人搞不太明白。这是呃应该说作为一名开发基本的一人基本都要了解的吧。那所谓的异步就是各管各的呗,对吧?就有点像咱们田径比赛当中的什么呢啊,比如说400米比赛吧啊是不是每个赛道一名选手,对不对?那你开始之后啊,裁判发令之后是不是各跑各的?你总不会说你跑得快,你跑到前面说,哎,我等你一会儿,你没有这回事吧,你专心跑你自己的就行了,是不是?这是一个异步呢?那还有一种同步,你可以怎么理解400米接力啊400米接力啊。如果上一棒400米没跑完,你下一棒不能跑,你是不是得等啊,是不是得阻塞呀?啊那一般来讲这么来粗糙的理解就够了啊

那这边的异步指的是什么呢?呃,就是咱们写入的这个动作跟我们执行compassion,他俩之间不用互相看脸色,不用互相等待啊。Compassion由你自己的条件来触发执行就行了。我写入管我写入的就行了。嗯,那这个compassion的呃compassion的策略可以按照呃几次commit,或者说按照固定的时间来触发,这是有不同的策略对吧?那一般来讲呃会用数量更多一点啊数量更多一点。行,这是咱们聊的一些小概念了,后面几个就简单过啊。
1
接下来我们动手体验一下跟Spark集成来操作护底表这么一个操作。那么首先呢咱们还是要回顾一下HOOO底支持Spark的一个版本。那么前面也跟大家聊到了不同的呼底版本支持呢,Spark系列是不一样的啊。那这边我列出了对于Spark三的支持啊,那要注意在0.12当中呃支持的是3.1、3.2、3.3,没有3.0啊,没有3.0。那如果你想要3.0,你可能呢你得用最新呢,只能用到0.10。当然呢对于二系列还支持2.4啊,这个我就不啰嗦了。那一些新特性啊也是在3.2才支持的那本教程呢咱们就以3.2的Spark为例啊,那首先需要大家先去安装部署一下,那么你可以通过这个地址直接w get去下载下来。那我这边是有了,下载下来之后呢,你将这个踏包呃通过踏命令去做一个解压,解压到对应的目录就可以了。那么大家可以看一下,我的model下面已经解压了一份Spark302.2,就是这个啊。另外呢解压完之后,最好还是配个环境变量啊,方便使用。哎,provide点d my ENV啊,我习惯上写在这个文件里面。在这呢我已经配了一个Spark home,那这个路径啊写成你的路径就可以了。修改完环境变量,不要忘了做一个source常规操作啊。扫描一下这个环境啊,那就可以了。那么接下来呢呃我们需要集成,只需要做一件事情就可以了。将我们编译完的Spark hoody包放到Spark的呃依赖路径下面即可。啊,那在哪呢?我先把我老的那个删掉啊。好。我们听到soft沃尔弧底呃0点一二里面有一个package,在这里呢有一个Spark谋害啊who蝶Spark这个那么进来之后呢有一个target,target就是咱们编译完的一个路径,那里面有一个架包对吧嗯。呃,呃咱们要的是上面这个呃名字最短的这个就是好,那我们拷贝它,拷贝到OPT module Spark3点2.2里面的驾驶,放到这个里面就可以了啊,我们拷贝过去。那拷贝完事儿之后啊,我们搂一眼吧。那这个包已经在这里了,这是我们编译完成的包。那么接下来你我们只需要去启动我们的Spark就可以。那通呃Spark呢怎么来操作这个护底呢?呃,你可以直接通过share Spark share直接去快速的体验。那你也可以去到你编呃打写成一个代码,然后打成价包的方式去提交也可以啊,那我们先介绍一下这个渲染的方式啊,这样比较快啊,快速的体验。那么大家别忘了这个它Spark对于依赖的加载,它是静态加载的。也就是说如果你已经是启动了一个呃什么样的集群,比如说是真的弄的Spark,那么你添加新依赖需要重启一下它才能够加载到啊。行,这个不啰嗦了。那么接下来呢大家注意这个启动命令啊,share里面呃启动的时候不同的版本它可能写法不太一样啊,包括它的类不太一样。这个大家可以去官网查阅啊,有一个Spark指南,你看一下你对应版本有什么区别啊。那如果是3.2跟我一样就行了。那么大家看一下啊share然后呢指定几个参数,那这些参数分别是什么?哎,一个序列化器指定为KRYO,第21个呢呃使用了一个catalogue啊,咱们用的是什么hody catalog,还有一个呢是一个拓展项啊,用的是hohoddy spck session啊,指定这三个就可以了。那么如果你需要用到HDFS之类的啊,呃你记得启动一下hadoop。那如果没有的话,你不启动也行啊,这个我就不不去啰嗦了。那这个命令啊你就不要手敲了吧,这个可是全类名啊啊,也是一个固定写法啊。如果你有其他参数要指定,再去指定就好了啊,咱们快速演示啊。好,回车。好,那这样就进入了Spark share这个交互式的命令行了,就可以我们来一个快速的体验了。

Hudi集成Spark

环境准备&启动Shell

image-20240314144140822

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
接下来我们动手体验一下跟Spark集成来操作hudi表这么一个操作。那么首先呢咱们还是要回顾一下hudi支持Spark的一个版本。那么前面也跟大家聊到了不同的呼底版本支持呢,Spark系列是不一样的啊。那这边我列出了对于Spark三的支持啊,那要注意在0.12当中呃支持的是3.1、3.2、3.3,没有3.0啊,没有3.0。那如果你想要3.0,你可能呢你得用最新呢,只能用到0.10。当然呢对于二系列还支持2.4啊,这个我就不啰嗦了。

那一些新特性啊也是在3.2才支持的那本教程呢咱们就以3.2的Spark为例啊,那首先需要大家先去安装部署一下,那么你可以通过这个地址直接wget去下载下来。那我这边是有了,下载下来之后呢,你将这个踏包呃通过踏命令去做一个解压,解压到对应的目录就可以了。另外呢解压完之后,最好还是配个环境变量啊,方便使用。哎,/etc/profile.d/my_env.sh,我习惯上写在这个文件里面。在这呢我已经配了一个Spark home,那这个路径啊写成你的路径就可以了。修改完环境变量,不要忘了做一个source /etc/profile.d/my_env.sh常规操作啊。扫描一下这个环境啊,那就可以了。

那么接下来呢呃我们需要集成,只需要做一件事情就可以了。将我们编译完的Spark hoodie包放到Spark的呃依赖路径下面即可。hudi下面有一个packaging,在这里呢有一个Spark模块。那么进来之后呢有一个target,target就是咱们编译完的一个路径,那里面有一个jar包对吧。 咱们要的是上面这个呃名字最短的这个就是hudi-spark3.2-bundle_2.12-0.12.0.jar

拷贝到/spark3.2/jars,放到这个里面就可以了啊,我们拷贝过去。那么接下来你我们只需要去启动我们的Spark就可以。那Spark呢怎么来操作这个护底呢?呃,你可以直接通过spark shell直接去快速的体验。那你也可以去到你编呃打写成一个代码,然后打成jar包的方式去提交也可以啊

那我们先介绍一下这个shell的方式啊,这样比较快啊,快速的体验。那么大家别忘了这个它Spark对于依赖的加载,它是静态加载的。也就是说如果你已经是启动了一个呃什么样的集群,比如说是真的弄的Spark集群,那么你添加新依赖需要重启一下它才能够加载到啊。行,这个不啰嗦了。那么接下来呢大家注意这个启动命令啊,shell里面呃启动的时候不同的版本它可能写法不太一样啊,包括它的类不太一样。这个大家可以去官网查阅啊,有一个Spark指南,你看一下你对应版本有什么区别啊。那如果是3.2跟我一样就行了。

#针对Spark 3.2(官网还有一个jars要指定啊)
spark-shell \
--conf
'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

那么大家看一下啊share然后呢指定几个参数,那这些参数分别是什么?哎,一个序列化器指定为KRYO,第21个呢呃使用了一个catalogue啊,咱们用的是什么hody catalog,还有一个呢是一个拓展项啊,用的是hohoddy spck session啊,指定这三个就可以了。

那么如果你需要用到HDFS之类的啊,呃你记得启动一下hadoop。那如果没有的话,你不启动也行啊,这个我就不不去啰嗦了。那这个命令啊你就不要手敲了吧,这个可是全类名啊啊,也是一个固定写法啊。如果你有其他参数要指定,再去指定就好了啊,咱们快速演示啊。好,回车。好,那这样就进入了Spark shell这个交互式的命令行了,就可以我们来一个快速的体验了。

Shell方式_准备及插入数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
那接下来我们要做什么事儿呢?哎,就是呃准备一些数据啊,准备一些咱们来测试的一个数据啊。数据有了之后,呃,来具体的操作一下各种操作,像插入啊、查询啊、更新啊,还有增量查询对吧?啊,增量的那还有指定时间点的查询啊,删除覆盖等等这些方式。

那么这边呢不建议大家手敲了,为什么呢?啊,有一些需要导入的包啊,那这一块呢咱们就直接先拷贝啊,来拷贝这些import。好,那都import完成之后,接下来我们是呃通过它自带的一个类可以去生成数据。那指定一个变量是表明,那还有呢指定一个它的数据的路径。那这一边为了快速演示,咱们就用一个本地的就好了。OK啊那么大家注意这个数据生成器,这个类是谁呢?是hudi提供的啊,hudi提供的。

好了,那么接下来呢呃我们就来演示一下插入数据表,我们没有创建对吧?呃,无所谓啊。第一批写入它会判断表是否存在,不存在它就会自己创建的啊所以我们就直接写就可以了。
那么看一下这个写法啊,首先呢我是将呃生成了什么呢?十条数据啊,生成十条数据,然后呢做一个插入啊,这是它固定它的一个api啊,这个就按照这么写就可以了。然后呢将它转成一个list啊,是string字符串啊,之后呢将这个数据读成什么呢?通过Spark上下文然后并行化读取,读取成一个什么呢?读取进来啊,那分区数我指定为2啊,然后呢直接去read Json就可以了,那读成了一个DF

那通常来讲我们对hudi操作都是用的Spark sql语法啊。你可以用DF去操作啊,也可以写sql啊。好,来,我们把它拿过来。稍等一会儿。好,那其实上面这里就是生成的数据和呃我们转成了这个list,大概就是我们看一条就好了啊。到这里啊,这是一条数据啊,有TS啊、UID啊啊,有rider driver啊对吧?等等一些字段,这是一个出行的一个交通数据。

好,那接下来我们要将这个DF里的数据写入到hudi里面,那怎么写呢?哎,大家看啊是其实这个写法如果熟悉Spark应该都熟了,呃,通过DF的write方式去写就可以了。那只要格式指定为hudi。那之所以能够指定为hudi,就是因为咱们已经集成好了那个jar包,你已经放进去了,那它就能识别这个hudi格式。接下来下面的这一些写法是hudi特有的啊,跟hudi集成的时候特有的一些写法。呃,那这边我们是设置了一个quick star啊,快速入门的一个案例配置啊,就简单的配置。

另外呢有几个东西先给大家讲一下,一个叫precombine field,这个东西是什么呢?呃,precome by就是可以理解为预聚合,为什么呢?预聚合字段这个东西主要是通过来去重使用的。另外一个东西是不是叫key啊?Record key啊,这个就是咱们前面一直在在一直在讲的record key,指定record key为UUID这个字段,对吧?


那接下来就讲讲这个precombine有什么用啊。举一个例子啊,比如说我的呃record key是呃有一条数据是1 a 10,呃,那么呃值是比如说a然后这个时间戳比如说是十。那么如果我又来了一条数据,1 a 11,其实这个语句和字段就指定了。哎,我当我的record key相同的时候,因为我们record key是要保持唯一性,对不对?那如果出现重复,它取哪一条啊哪个值,它会取预聚合字段这个值较大的那个数据。比如说有这两条数据啊,record key都是这样啊,都是1重复了对吧?那么它就会取下面这条字段啊,假设咱们precombine filed指定的是timestamp啊,它就取大的啊,所以这个可以简单理解为就是去重的时候用啊。所以你就得呃考虑这个预聚合字段用什么比较合适啊。通常来讲常规的就是用一个时间戳字呢就就比较理想啊。那这个record key你也得去考虑啊,用什么样的好一点,最好你不要说出现大量重复的那就不合适了啊。就类似于你去设计数据库,一张表主键你来怎么设

那其他的就没什么了。这个是一个什么分区的字段啊PARTITION FIELD,用哪一个作为一个分区字的,然后呢表名tablename就没了。然后呢,模式mode选择一个覆盖,然后保存到路径。这个路径就是hudi表的路径啊 。那你看我们上面已经定义了一对吧?这个base path这是一个基本路径,对吧?路径就写到以表名作为一个文件夹的名称就可以了。好吧?好,那就是关于这几个基本参数的解释

image-20240314171428150

1
2
3
那我们来做一下哈。嗯。好,等他执行完执行完了对吧?那么接下来我们怎么看呢?在我们指定的这个路径下面,应该是有hudi的目录生成的hudi表的目录,还有它对应的什么原数据目录啊数据目录啊。然后我们来搂一眼啊,这个路径我们定义的是本地文件系统tmp。下面来我们瞅瞅一眼cd /tmp,然后呢看一眼啊,这个有点多对吧?但是我们看到了有一个什么hudi_trips_cow

大家可以看到现在的这个文件夹是什么东西呢?啊,就是分区,就是我们的分区目录,呃,这个就是亚洲对吧?这个是美洲啊,我们的分区是按照这一来的。我们看一下这个数据啊,这里有一个字段叫做partitionpath啊,我们指定它为分区字段对吧?它的只有什么呢什什么啊美洲的啊什么什么什么,你大家可以看到这个值是不是什么嵌套的,有三层对吧?那我们随便看一个吧,比如说这这是其中一个分区啊,是一级分区啊。 ,哎,你是不是看到又有一层文件夹了,那是因为咱们使用了多级分区啊,也就是说多级目录了啊,每周下面的这里还有这里啊,哎随便看一个在里面,还有一层对吧?还有一层好再来再看就没了吧。这里面只有一个什么文件点parquet。

image-20240314171543022

image-20240314171600194

Shell方式_查询数据&文件命名源码

转换成DF

1
那现在有了hudi表,我们来尝试着查询一下那查的话怎么查呢?其实还是一样,从Spark去读取对应路径下的hudi表。啊,这个Spark这个contest我们直接去Spark session嘛啊直接read之后呢,格式指定为hudi。啊,同样的道理,由于我们集成的那个jar包,所以他就能够识别了。另外不要忘了他需要写一个load,也就写到对应的表路径啊,你要读取的hudi表的表路径啊,一定要写到表路径啊。然后呢我们需要呃我们再把它创建成一张临时的视图啊,起个名字OK那这边要注意的是什么呢?这张表我们知道分区字段它里面包含了多级分区啊,对吧?我们是有三级分区的那如果是老版本的hudi,0.9以前的hudi,呃,这个时候我们需要在表路径,然后在拼接上load(basepath+"/*/*/*/")。呃,这样子也就是说有几层路径你就要拼接几层,那么当前版本就不需要,它自己会去识别啊,这是一个小事情。也就是说你加载路径必须把每一级的分区路径都得表示出来啊,用星号去拼接啊。

image-20240314213029582

查询

1
2
3
4
5
那我们就快速的把它注册成为一个temporal view啊。好,直接还是粘贴就行了啊,这种写法就没必要去敲了啊,浪费时间啊,就很简单的语法。好,有了之后我们是不是可以直接查了呀?啊你可以用DF去show(),也可以用spark sql的语法来写啊,然后可以写一个select语句啊,然后再show()一下啊。我们搂一眼吧啊,这个也没必要敲了,那字段都是我们那个模拟数据。我这边想说明的一点是什么呢?我我尽量缩小,哎,看不清楚啊。我想说的是其实每张呼底表它都有几个隐藏字段,分别是什么呢?你看hoodie_commit_time,那么你看这个是什么?这就是我说的拼接年月日时分秒。那我们仔细看一下啊,这是年月日时分秒毫秒对吧?它是直接拼在一块的啊,而不是说距离1970年有多少时间戳,好吧?

然后呢还有一个commit的一个序列号啊,其实它基本就是一个时间戳,然后再拼接上其他东西。

还有这个是什么record key,这个是不是由我们去写入手指定的record key,就是用的UUID啊,还有呢分区路径啊,那这个是多级的,看到没有?再一块看到hoodie_file_name啊,就就文件名对吧?前面基本有几个五个隐藏字段啊,一个小事情。
1
这是一个正常的查询,你就会Spark就就会了。接下来呃说白了咱们的集成只是放一个jar包,然后格式指定为hudi,还有一些配置项,该指定的去指定就ok了。接下来就是正常的一个操作了。所以对大家来讲应该是没有什么学习成本的啊,无非就是前面我们花了大量的时间去讲那个核心概念啊,那些你把它理解就ok了,用起来的话倒是没有什么难度啊,没有难度。

image-20240314221827909

时间旅行查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spark.read.
format("hudi").
option("as.of.instant", "20210728141108100").
load(basePath)

spark.read.
format("hudi").
option("as.of.instant", "2021-07-28 14:11:08.200").
load(basePath)

// 表示 "as.of.instant = 2021-07-28 00:00:00"
spark.read.
format("hudi").
option("as.of.instant", "2021-07-28").
load(basePath)
1
2
3
4
5
另外一个事儿呢就是hudi。还有一种查询方式是叫时间旅行的查询。什么叫时间旅行啊啊,就是我要查询指定时间点的一个commit。就比如说呃现在COMMIT的都多次了啊,时间分是T0、T1、T2。咱不是有个时间线嘛,对不对?时间轴timeline嘛。那我们说timeline呢是一个一个的instant,对不对啊?每个instant都有对应的什么呢?一个是它的instant的时间,还有呢他的动作action,还有他的状态,是不是啊?

那这个时候已经到了T2以后了,那如果你想查T1的时候,这个时刻的呃数据它的状态也是可以的,你只需要指定一个instant的时间即可啊。那我看一下咱们目前的这个commit time,应该都是一一回事儿啊,应该都是一回事儿。呃,那具体呢就是这么一个配置项就可以了,那我们试一下呗。但是这个时间戳不对啊,这个是以前的那我想想啊,我们现在都一样。

呃,那也简单,怎么简单呢?我们只需要再重复执行一下前面这个就行了,就插入数据我们再插入一遍呗,再插入一遍啊,就这这呃overwrite我们改成append就可以了呗,稍等一下嗯。好append不就ok了吗?说。但是appen end咱们一会儿再来吧,因为后面咱们要讲更新数据,更新数据其实有点类似于插入新的数据,那回头咱们就要end了啊。这个咱们先呃简单理解一下,一会儿再来玩嘛啊讲到更新的时候再来玩这个事儿啊。

image-20240314214908287

1
2
3
4
好,那接下来我给大家补充一点啊,就是咱们可以看到这个地方这个pofeket文件前缀这么长,那它到底是什么意思呢?那我听我一一道来吧。啊那么注意它的连接符是什么?是下划线,也就是说咱们根据下划线分成几块来看啊。哎我先在这里切一刀,呃,然后这个下划线再切一刀,哎,然后这个后缀切移到它,其实是由四部分组成。第一部分是什么呢?这么长的一串东西,就是咱们前面一直讲的file ID还记得吧?咱们聊到索引的时候讲过呀,呃每一个它它里面是不是按照文件组来组织的?每一个文件组有一个唯一的file ID啊,当这个呃生成这个文件组生成的时候,这个file ID就已经固定了,不会再变了啊,它默认情况下是通过UUID来取的啊,也就是挺随机的啊。


然后中间这个东西呢嗯怎么理解呢?这是它拼接了一个token啊,你可以不用管,呃,那后面这个东西应该就很熟了吧,这个其实就是instant time, 也就是说咱们那个commit time啊。好,那最后一个是什么?就后缀啊。由于咱们COW它只有一个parquet对吧?没有什么点log啊,就简单了。

image-20240314215302423

1
那如果你要看源码也简单啊,呃这样呃我这边有一个写好的,你直接搜什么呢?我相应的依赖都放好了,呃,相应的依赖hudi Spark的这个依赖。好,那么接下来咱们直接搜啊control shift加F搜一个什么呢?呃,应该是叫base file,你看啊就这里在FSUTS,这是hudi的类啊,它在这里有个方法叫make base。啊,当然它有两个,有一个回头要被遗弃掉,那这个东西呢大家就看一眼。你看这字符串格式的话,也就你看它是通过什么下划线来拼接的 。第一个是什么?第二个是什么?第三个,第四个。那我们来看吧,第一个是什么呢?File ID。第2个是啥呢?一个token啊,第三个呢是一个instant time。第4个呢就是它的后缀啊扩展名。嗯,就知道这个文件起名是什么规律。

image-20240314215906159

1
那说到这儿,我顺便提一嘴,那个如果是log文件呢,在同样这个类里面有一个makelog啊,我搜一下control加F12有1个makeLogFileName啊,那它同样的它是拼接了什么fileID commit time,还有log的后缀啊扩展名,后面它还会跟上一个叫版本啊版本。行,嗯,那就先到这儿了啊。

image-20240314220456197

Shell方式_更新数据&时间旅行查询

1
2
3
4
5
6
7
那接下来我们了解一下怎么来做一个数据的更新。那其实更新很简单,我们直接用一个插入操作就可以了啊,它如果判断recort存在,那么就相当来说会做一个更新了啊,因为我们说的upsert对吧?依赖于索引。如果我判断存在,那我就是做更新操作update,如果不存在,那我就是做insert操作,对吧?就这么简单啊。好,那这个时候我们可以再一次的生成几条数据啊,然后呢呃还是一样通过DF.write的方式往里写。

那这个时候注意了,咱们的这个写模式要改为append,其他一模一样都不变,跟我们前面插入的都一样啊,那这个我们就直接拷贝就行了。那完事之后呢,就可以去做一个查询了。那么大家注意保存模式通常是就是append啊不要写overwrite啊,除非是第一次建表啊,第一次建表咱们还是用overwrite保险一点啊,或者说你你有需求你就overwrite。

那这个时候呢,我们再来查一下吧。嗯,我看一下啊,前面的是多少啊啊,什么38342对吧,这个也是38342嗯,都一样啊。我们先看一下这边的变化吧啊呃可能数据没有在对应的这个分区里有体现啊。你看哎这个有啊,

这是我们刚才这个路径呃,这个表里面的这个分区下面之前呢只有上面这一个什么,那现在多了一个什么呢?这个这个时间点的对吧?第二次commit,那我们知道上面这个呃是属于这个是file ID,你看是一样对不对?也就是说这两个文件属于同一个文件组,但是是不同的fileslice啊。那你看这个token就不一样了啊,然后这个提交时间也不一样了。也就是说,前面这个已经是旧的了,这个是新的了。

image-20240314223119580

1
2
3
那我看看我怎么把它查出来啊,我们现在不是流式处理,对吧?所以这个DF可能要重新加载一下啊,这是之前加载的DF我们虽然数据已经更新了hudi表,但是这个DF还是老的。老的话那也简单,我重新加载一个DF1啊

然后呢,路径还是这个表名改一下,对吧?这样的话就ok了嘛。这一次应该没问题。那么大家可以看到现在的commit time有什么呢?有老的这个什么38342,也有新的这个57670啊,那是不是跟这个一样对吧?因为它拼接的这个实力时间其实就是呃我们的commit time啊,这个地方大家就看得懂啊,那record giase是UUID啊生成的,咱们就不用去看了啊,

image-20240314224033225

1
2
3
4
5
6
7
行。那接下来我们顺便来玩一下时间旅行呗,对吧。那时间旅行有几种写法啊,就是你去read的时候啊,就重新去创建这个DF,然后呢你指定一个as.of.instant。然后格式有3种写法。那现在我的话肯定是要用这个完整的这个来写吧。

哎我先构造一下啊,哎拿过来呃。好,那这个这里的time我要改成我自个儿了,因为我现在只有两个嘛,那就改成老的这个吧,这个38342啊,我旅行回去啊。好,然后load完之后是不是可以生成一个DF啊,对吧?

呃,通过这个方式重新加载一遍,我们能查查找到全量最新的,那现在我希望恢复到某一个instant的对应的那个时刻的快照。其实这个时间旅行就是查看某一个历史快照嘛

回车。那么大家看现在一共也就只有十条数据。当时我们第一次是不是插入十条,第二次又插了十条,对吧?第二次的十条可能有更新,有新插入的那你看现在我也没有什么过滤条件啊,一共只有什么这十条。而且这个COMMIT time都是当时的。这个就是所谓的时间旅行啊

image-20240314224713827

Shell方式_增量查询&指定时间点查询

增量查询

1
2
3
那么接下来我们来聊一下这个增量查询它是怎么来实现的。其实也很简单,就是依赖于咱们这个胎卖还是这个时间轴啊,所以说这个概念很重要。那我们在这个轴上有多个instant,对吧?每个instant对应可能是我们一个动作啊,那么他怎么来实现的?比如说我这时间轴上有多个instant啊,那所谓的增量查询其实很简单,就是我们既然是时间轴,我们就应该能够指定一个时间范围了。啊,所以他我们可以去指定一个叫begin,还可以指定一个end。

那所谓的增量查询啊,比如说我写个T0、T1、T2、T3、T4。我举个例子啊,如果你想增量查询T4以后新增的数据啊,T4之前的那些我不要啊,我不想查,我只想查T4之后有哪些新增的那简单你只需要怎么做呢?你将begin指向T4,那end你不指定就行了,表示说结束都要啊,就后面的全都要,这个就是所谓的增量查询。那基于此呢,你也可以想一想,那如果我想要查询什么呢?我想要类似前面咱们讲到一个时间旅行,对吧?你是不是可以begin跟end指定为同一个时刻就可以了,对吧?那也可以实现一个时间旅行啊,那包括后面这个指定时间点查询,其实也是通过控制这两个东西的范围来实现的。

image-20240315111301329

1
2
3
4
5
6
7
8
9
10
11
12
好吧,那现在废话少说,我们先来实现一个增量查询。那么我们前面有过两次commit对吧?那我们对这一张表的还是要重新加载一下啊,我之前就重新加载一下吧,好吧,先加载进来。好,加载完之后我们来获取指定的开始时间啊。那这个你也可以手动指定说具体的某一个time,但是如果你不想指定的话,也可以通过咱们下面这个sql来实现。就是什么呢?哎,我order by commit time啊. 这个commit time就是这个隐藏列hoodie_commit_time啊。那我们直接对它进行一个distinct啊去重啊,然后再呢一个排序之后呢,我再用map算子啊去getstring(0)啊,然后呢再take(50),那当然咱们没有50了。好,先看一眼这个commits吧。好,那其实大家可以看到我们这个commans只有两个啊,只有这么两个。好,那既然只有这么两个呢,那接下来呃开始的时间那就是看你也需要哪一个了。

那那为了更好的演示,这样吧,咱们再插入几次啊,咱们再更新数据。这里不是有一个哎啊是上面啊更新数据,这里不是有一个appen的方式吗?啊,我们多执行几次就行了呗啊,那我再执行三次吧。也就是说呃我一共呢是插入过五次了啊,应该有五个了。你看这里有五个parquet的对吧?每一个版本它都保留下来了,当然有对应的清理策略啊。行,那既然有这么多,我再执行一下。刚才这个哪一个呢?就是咱们增量查询,这里不是呃,我先重新加载一下哈。在读取这个路径啊,创建为相同的这个表明好了。那之后呢还是通过对commit time去重,然后排序啊,封装成为一个commits。再给大家看一下。那大家也可以看到这里已经变了,对吧?那接下来我就直接指定为上一次啊,上一次啊,也就是说我们现在这里有五次了啊,我指定从上一次这个地方开始增量的查询啊,那我们来试一下呗

那接下来我们就创建这个增量查询的表,怎么创建呢?还是一样read格式维固定。接下来关键是什么?查询类型要改一下,这个查询类型我们更改为叫增量查询啊,这个就固定写法,它封装好的一个常量啊,你就照这么写就行。接下来我们指定一个什么开始时间啊,这个也是一个固定它封装好的一个常量啊。然后把我们得到了这个begintime传进来,然后呢就正常去load的啊,这就ok了呗。然后呢表明我表名就命名为一个增量表,这个一个表名啊。那接下来不要忘了这个begin time先获取一下啊val begintime=commits(commits.length-2),也就是说获取的是这个什么呃52秒023毫秒了。这个好,那么接下来就是创建这一张增量表。


增量表创建完了,我们来查询这一张增量表,看看它的commit time啊,是不是从我们指定的这个开始时间往后的这一些呢,好吧。那么可以看到,这个是我们的开始时间啊。好,那我们看到现在能查出来的数据,它的committime都是怎么样?是不是都是大于这个时间呢?啊,前面应该都是一样,都是56秒570的了啊,前面这个分钟都是42啊,一样啊。大家看你执行的时间了啊。这个其实就是啊指定一个时间范围过滤出来,所以它的实现原理特别简单。但往往就是我们说对高端的食材,往往只需要最简单的烹饪方式,对吧?

这就是一个增量查询啊,很简单啊。呃,那么其实大家也可以看到,我们每一次数据变更都重新加载是为啥呢?因为咱们基于这种Spark渲染,它纯粹就是一种什么批处理啊,对吧?我们不是实时的流处理啊。所以呢利用增量查询,我们能够在批处理的数据上创建啊流式的这种管道效果。

我举个例子,呃,咱们比如说用spark写的,我可以是不是可以设置每5分钟调度一次,也就是说每5分钟执行一次代码。那这个时候你是不是可以每5分钟通过,不写死开始时间,每一次获取的是不是都不一样,对吧?这样就可以实现一个哎5分钟,我只拿到这5分钟的增量数据啊,对不对?就看起来像是一个流式的效果,这是通过Spark来做这么来一个实现啊。
重新加载数据
1
2
3
4
5
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")
获取指定beginTime
1
2
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2)
创建增量查询的表
1
2
3
4
5
val tripsIncrementalDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
查询增量表
1
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()
指定时间点查询
1
2
3
好,另外我们一起看一下这个指定时间点吧。所谓的指定时间点是什么意思呢?就是我现在有多次呃commit呃多个instant,那比如说你指定到你要指定的是这一段时间啊它的一个状态就好了。最新的一次我不要这个就是指定时间点,那怎么实现呢?其实就是什么将begin设置为最开始的地方,将end设置为指定的这个time,那这样就可以获取这一段时间的一个一个状态和数据了,对不对?这个跟时间旅行前面讲的那个什么asof那个还是有点区别的啊

咱们来试一下吧。好,那你看我怎么做啊啊我将开始时间指定为000,表示最开始,然后end的time呢我指定为上一次,对吧?这个写法我们前面唠过了啊。好,先获取这两个,也就是说我指定的范围了。指定完范围之后,我们创建一张表啊,叫时间点查询,一样的格式为hudi啊,咱们去read一下,然后查询类型还是一样,还是这种增量查询啊,增量查询就可以指定begin跟end啊,这是固定写法啊。然后把咱们获取到了两个时间传进来,这个是000,这个是上一次,也就是说还是这个啊从最开始一直到上一次。然后呢,创建了这个表名啊,然后再进行一个查询。很简单吧啊所以你把他的思想本质理解了就可以了。

image-20240315114151514

1
2
3
那么大家可以看到这个committime有哪一些啊?这些是不是通通都小于42分52秒023呢23毫秒对吧?啊,那这个是结束时间的这个时刻啊,那剩下的这些都是比他小比他早的,对不对啊?有这么四次提交的时间,我们能明显看到啊

接下来可能大家会跟那个时间旅行搞混啊。呃,我刚才是不是创建了一张时间旅行的这个表,对不对啊?所以其实说白了这个时间旅行查询跟这种指定时间点,呃,其实咱们理解起来是一样啊。只要你的开始时间是000,然后end指定为某个点儿,这个就等价于asof一个时间点一个意思啊一个意思呃,行。这个是咱们这个事儿啊。

实操

1
它这个案例数据的生成方式是:假如我每次生成同样的条数,这些数据都是同样的数据,只是数值发生了变化。所以所以时间点只有10条数据

第一次插入后查询

image-20240315130013095

第一次更新

image-20240315130049905

时间旅行

image-20240315130123850

第二次更新

image-20240315130145417

增量查询 一次插入 两次更新 查第一次更新之后

image-20240315130219453

增量查询 一次插入 两次更新 查第一次插入之后

image-20240315130231468

Shell方式_删除数据&覆盖数据

删除数据

1
2
3
4
5
6
7
8
9
那么接下来就该到删除数据这个事儿了。删除数据前面我跟大家聊到了。来我们看一下第三章核心概念有一个哎应该是在数据写里面有个删除的策略。

那来那我们说第一种是配置一个什么key。第二种是指定一个PAYLOAD的为空啊空这个类删除所有的。第三是添加一个标记字段对吧?

那接下来咱们是怎么做呢?啊,执行删除的话,其实我这边做的是第一种方式。指定一个Operation_opt_key啊,然后呢指定为delete啊,这就是第一种方式了。那这种方式呢就是说咱们现在还是什么,其实就是写入啊。你写入比如说你你原先一这张表,12345这五条数据,然后呢你现在要删除3跟4,那简单你构造一个3和4这2条数据啊,然后呢指定为delete做一个什么呢?Insert操作就可以了。啊,做一个write吧啊做一个写入吧那这样的话它就会把3跟4给删掉。其实他也是先标记的一样的,就是这么简单嘛。

那我们看一下啊,那首先我们先取要删的,先看一下现在这张表啊,原始的这张快照表现在有几条了,先count一下。嗯,呃有十条对吧?啊,有十条好,还是这十条数据。那么取其中的两条哎,来我用了一个limit2,取其中的两条啊用来做删除,构造成一个DS啊,名字叫DS嘛。然后呢。将待删除的两条数据构建DF什么意思呢?把这个前面获取到了这个东西啊,生呃用他官方的一个类啊一个方法,叫生成删除的数据啊就可以了。再把这个deletes最终构造成一个DF好吧。构造完之后就是执行write啊,关键在哪里啊?指定一个key为delete啊,然后呢你正常去write就可以了啊,其他的什么预聚合字段呃,record key啊,分区字段表明保持一致即可。

好了,删除完之后我们再来统计一下行数有没有变化。你要统计变化后的是不是要重新加载一下,对吧?所以不要忘了重新read一下啊,我这边叫一个新的DF啊。无所谓啊,然后还是这个表名啊。你看原来的十现在变成了8,那这样的话就是实现一个删除的效果。

覆盖数据

1
2
3
4
5
6
7
那聊完删除呢,咱们也来聊聊另外一种场景,就是覆盖。对于我们的表或者一个分区来说,如果呀你基本上整个分区的数据都要动了大变了,那这个时候还不如直接覆盖掉来得快,比你说呃一条条拎出来去做更新或者删除,你还不如直接全部覆盖对吧?啊,破罐子破摔啊,这样更好一点,更简单一点。那类似于hive的这种insert overwrite,那咱们目前也是支持的呃。好,那我们看一下啊,那首先我们统计一下当前表的key啊,那我们还是读取格式为hudi读取这张表的路径。然后我们将它的UUID字段,我们不是以它为key吗?对吧?还有它的分区路径啊,把它查出来啊,然后排个序啊之后呢,展现一下当前的key。

那其实你查隐藏字段也一样啊。它的隐藏字段其实就是这个拼这个嘛,对吧?那现在我就展示的更全一点。好,那现在的key有这么一些,那我们先生成一些新的要用来覆盖的数据,那我还是再什么生成10个呗啊,然后呢呃再把这个生成的这些数据读成一个DF啊,然后过滤一下啊。为什么要过滤呢?因为我生成的数据可能是每个分区的都有。现在我只想演示的是我覆盖这个分区americas,然后这个united states,然后san_fransco这个分区了啊,其他的我就不想动了啊。好执行。

接下来我们要覆盖这个圣弗兰西斯科这个分区了,那怎么写呢?其他这个写法都一样,还是write。那么注意了,我这个模式还是append,你不要说直接来一个overwrite啊,这个我们说嗯当然也可以,但是不太理想啊。你得考虑你的索引这些是原数据啊,你覆盖的只是数据本身,那原数据这些是不是也要呃稍微更新一下?

另外呢就是指定一个操作的key,为Insert overwrite就可以了啊,那其他的什么预聚合字段record key啊,还有分区字段该什么是什么啊,表名该什么就什么。好,覆盖完之后,我们再次执行这个东西,来看一下key有没有变化,好不好啊。你看是不不一样了,你看前缀都知道不一样了啊。这个就是要注意的是我刚才跟大家提到,你不要说直接这个DF的这个mode为overwrite就完事了啊,这种不规范啊不规范还是用hudi方式来overwrite,为什么呢?又涉及到原数据的。嗯。
查看当前表的key
1
2
3
4
5
6
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)

image-20240315154033633

生成一些新的行程数据
1
2
3
4
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
read.json(spark.sparkContext.parallelize(inserts, 2)).
filter("partitionpath = 'americas/united_states/san_francisco'")
覆盖指定分区
1
2
3
4
5
6
7
8
9
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION.key(),"insert_overwrite").
option(PRECOMBINE_FIELD.key(), "ts").
option(RECORDKEY_FIELD.key(), "uuid").
option(PARTITIONPATH_FIELD.key(), "partitionpath").
option(TBL_NAME.key(), tableName).
mode(Append).
save(basePath)
查询覆盖后的key,发生了变化
1
2
3
4
5
6
spark.
read.format("hudi").
load(basePath).
select("uuid","partitionpath").
sort("partitionpath","uuid").
show(100, false)

SQL方式_环境准备&创建表

环境准备

image-20240315234439932

1
nohup hive --service metastore & 
1
前面我们尝试的用shell的方式来操作,但毕竟还是要写一些代码,对吧?啊,写DF等等这些可能不太方便,那如果我们能够用sql方式来操作,那就很舒服了啊,那它也是支持的,那么接下来我们来做啊,第一个呢,就是由于我们要启动一个Spark sql的这么一个交互式的命令行客户端,呃,所以呢,它会默认会去连接hive的啊,所以我们需要呃,需要做的就是将hive的原数据服务启动起来,那这边就要求你hive的原数据服务是配置为外置的单独的服务,而不是默认的内嵌啊,什么意思呢?啊,说白了就是你的hive,你看一下。在这里你有没有配这个东西?hive.metastore.uris啊,如果配的这个就表示它需要单独启动的一个原数据的外置服务啊9083啊

image-20240315234649889

1
2
3
4
5
#针对Spark 3.2
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
1
2
3
4
5
6
行,那我们起一下呗,嗯,啊,去哪个路径都行啊,我配的环境变量的啊啊我直接呢,呃,我直接后台启动吧,nohup hive --service metastore &
日志也不要了,那些我就不管了啊。好,那接下来我们看一下端口监听了没有,netstat -anp|grep 9083。好,现在已经是监听状态啊,这个metastore服务应该是启动起来了

那接下来我们就要启动这个Spark sql这么一个客户端就可以了,那同样要指定三个配置啊,第一个kryo,第二个catog啊,第三个就extension,那么catalog还是hudi提供的,那extension呢,用的hudi的。那为什么会用到hive呢?其实跟这个catalog有关系

那么大家知道什么叫catalog呢?啊,学过的应该都知道啊,Catalog直接翻译是目录的意思啊,那如果我们简单的不讲那些专业术语,简单来讲就是在数据库之上的一层,呃,抽象,也就是说先有catalog,再有数据库叫database,比如说哪个库哪个库,那我们知道每个库里面是不是有一张一张的table啊啊,这是常规的一个组织关系,对吧?那现在再多一层叫catalog啊

image-20240315234954110

1
2
3
4
5
那外部系统啊,比如说Spark就可以通过这种catalog的东西,也比如说跟hive去做一个打通啊。他有一个hive catalog。好,那这边hudi它默认实现hudi catalog,其实它用的就是底层源码,用的是hive catalog啊,所以这也是为什么如果你不启动hive这个东西,你直接去启动啊,会报错什么原数据服务什么啊,拒绝连接呀啊,无法实例化原数据啊这些东西啊,那是因为这个事啊

行啊,那如果你的机器上面没有装hive,你就简单装一下呗啊,那装完之后最好配一下环境变量,这样的话你就不用做额外的操作了,如果没有hive环境变量啊,也就这个时候你要手动拷贝你的hive配置文件到Spark的配置目录下,说白了就一句话,你要能够让Spark能读取到hive的配置就可以了啊,要么你配环境变量,要么你拷贝呃,它的配置文件

那我现在是只要原数据服务启动的,呃,我环境是能读到hive那个配置的。所以我直接输入命令来启动我的Spark sql,那这个时候就进来了,这个时候我比如说我show databases。呃。看一眼,那么大家能看到有这么多库对不对啊,这些其实是我的什么,我的hive当中存在的一些数据库啊,这个就是我们Spark sql的启动方式啊

创建表

1
2
3
4
5
6
7
那接下来我们讲一下这个建表语句啊在呃,简单先下面看一个例子吧,啊,就是正常的写一个DDL那就是create table,然后定义字段的类型啊就行了,那无非是在后面加一个using hudi啊,表示为是hudi表啊,另外呢,可以通过指定一些表的参数来指定hudi相关参数,像比如说最常见的是这三个,第一个呢是type,也就是说表的类型,你是MOR还是COW,在这边可以去做一个指定,默认不指定的话,都是COW。

我们在前面渲的时候,并没有指定什么表类型,对不对,那是因为默认就是cow啊。另外一个primary key,这个就等于所谓的record key啊,就是我们的记录键啊

那还有一个预聚合的字段,这个我们说了主要是我主键相同啊,这个时候该取哪一条数据去重的时候对吧?啊以指定的预合并字段值大的为准啊,那一般我们用时间戳会更好一点啊。

好,这个就是一一些建表参数啊,那这个主键如果我们不指定的话,它默认是取得UUID啊,一个随唯一的随机值啊。
创建非分区表

image-20240316000626130

1
2
3
4
5
create table hudi_cow_nonpcf_tbl (
uuid int,
name string,
price double
) using hudi;
1
2
3
4
5
那我们尝试建一个吧啊。那比如说我们先来创建一个非分区表,这个地方呢,我们建一个cow表。呃,那你看我这个语法只起了表明指定的字段,Using的who,后面那个表参数都不指定,也就是说这个时候我用的都是什么,都是默认值,默认值呢就是CoW表,默认的主键就是UUID,那默认的预聚合字段就是没有啊,就是没有啊,就这个意思啊

我们来执行一下啊,呃,我们创建一个database吧的spark_hudi,当然你这个库无所谓啊,那我show tables.你看这张表就好了吧,啊,你desc hudi_cow_nonpcf_tbl一下啊。对吧?啊,那这张表我们建表时指定的这三个字段啊,

我们说了hudi表它有个特点,就是最前面会有5个隐藏列啊,那分别是什么?提交时间,提交序列号啊,Record key啊,分区路径啊,还有文件名啊这五个。
1
2
3
4
5
6
7
8
9
10
create table hudi_mor_tbl (
id int,
name string,
price double,
ts bigint
) using hudi
tblproperties (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts'
1
2
3
这个就是一个简单的建表,那接下来来我们建一个MOR的非分区表,怎么建呢?那简单前面都一样,我不啰嗦了啊,咱们快速过,那就是表属性,然后呢,类型为MOR啊,那指定主键为ID字段啊,指定预合并为ts字段啊,这个是最完整的一个写法啊啊,这个同样直接拿过来啊。

好,desc hudi_mor_tbl。前面同样有5个什么隐藏列,而且默认类型都是string啊,都是string。
创建分区表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
create table hudi_cow_pt_tbl (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';
1
2
3
那么如果我们要创建分区表,用怎么样的语法呢?啊,也简单了解一下啊,如果是个分区表呢,无非就是大家注意位置啊,是写在后面啊,那么这个表属性是在分区语法的前面啊,也就是说前面这个是我们刚刚试过的,只要在后面加上partitioned by,那分区字段可以是多个字段表示多级分区啊,就多级目录啊。大家注意这个分区这个关键词是有ED结尾的啊不要忘了啊,大家容易忘这个事儿啊。然后指定表的路径,这个跟hive的写法有点像吧啊,所以对于大家来讲没有什么学习成本。

那我就手动敲一下吧。啊比如说我的分区要按照dt跟hh啊来做一个分区。那这个时候这张表就建好了,我们desc下啊。这呃你看那现在DSC我们都看到了一个什么分区信息啊,由于我们指定了两个分区字段啊,好,回头我们去插入数据的时候再来去看。
在已有的hudi表上创建新表-非分区
1
2
create table hudi_existing_tbl0 using hudi
location 'file:///tmp/hudi/dataframe_hudi_nonpt_table';
1
2
3
4
5
那还有一种用的方式就是在我们已经存在的hudi表再创建一张新表啊,这种东西呢。你可能以为是CTAS对吧?什么叫CPAS呢?就什么create table,然后呢as或者like这种,呃,主要是like吧,但呃是支持没错啊,这个是后面呃后面咱们来讲。

那还有一种用的方式就是我直接指定什么这个location。因为这个路径是已经存在的那这个时候呢呃我们不需要再去指定什么字段呢,还有表属性这些东西,为什么呢?因为它可以自动识别模式还有配置。因为大家还记得吧,我们说表路径下面会有什么?表路径下面会存储了咱们的原数据信息,还记得吧?

因为它有一个.hoodie。所以咱们直接指定到location,它就会自动识别模式,还有咱们的配置。那如果你不要分区,那你就不要写分区呗,要分区你就partitioned by就可以了,就这么简单啊。
在已有的hudi表上创建新表-分区表
1
2
3
create table hudi_existing_tbl1 using hudi
partitioned by (dt, hh)
location 'file:///tmp/hudi/dataframe_hudi_pt_table';

通过CTAS(Create Table As Select)建表-非分区

1
2
3
4
5
create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
as
select 1 as id, 'a1' as name, 10 as price;
1
2
3
4
5
6
7
那就是将一个表的查询结果啊,将它作为我的表啊,这个语法我们应该是特别常见嘛。呃,另外大家注意它CTAS这种语法的话,它是用批量插入作为写操作的啊啊批量插入也就是说帮将整个结果集一次性插入咱们这个table啊。好,那CTAS呢咱们这边举了三种例子,对吧?啊,分别是什么呢?呃,比如说我创建COW非分区表,那说白了就是还是那几件事儿啊。你表属性要不要指定type,要不要指定主键,要不要指定预合并,对吧?呃,另外呢就是要不要有partitioned by字段啊,就这几个都可以啊,都灵活来设啊。

那你看下面就是既有type又有主键,又有啊预聚合又有分区,这个是最完整的对吧?好,

那这边哦对,有一个事提醒,就是咱们hive建表的时候啊,我们的分区字段是不是不会写在建表的字段列表当中啊,对吧?这是额外的一个字段是不是?但是在hudi当中我们是这样的啊,就是说你这个分区字段必须出现在你的定义的字段列表当中啊。

那么大家注意一下,我是不是没有指定location啊?没有指定location,它默认会保存到Spark的默认那个配置路径spark-3.2/spark-warehouse/...

image-20240316004342426

通过CTAS(Create Table As Select)建表-分区
1
2
3
4
5
6
create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
as
select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;
通过CTAS从其他表加载数据
1
2
3
4
5
6
7
8
9
10
# 创建内部表
create table parquet_mngd using parquet location 'file:///tmp/parquet_dataset/*.parquet';

# 通过CTAS加载数据
create table hudi_ctas_cow_pt_tbl2 using hudi location 'file:/tmp/hudi/hudi_tbl/' options (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (datestr) as select * from parquet_mngd;
1
2
3
那还有一种呢CTAS还可以什么用location的方式来加载。咱们前面讲的这个location是不是包含了什么呢?包含了一些属性配置,还有字段信息啊。那比如说你要创建一张内部表啊,所谓那个表是咱们说的的啊,那using就是什么呢?parquet,大家注意这边不是hudi了啊。如果你写的是hudi,那是一种外部表。什么叫外部表呢?外部表意思就是说我数据的路径啊可能在某个地方我单独指定就行了。我的DDL建表只是一个建表语句原数据而已。好,那location可能是某个地方的parquet对吧?

啊,那如果是通过CTAS加载数据,那除呃你可以用hoodie,然后location之后可以去单独指定一些配置啊,这个咱们就了解一下就行啊,partitioned by,然后呢再去stat。他是不是也是as select按是起来就将这个数据加载到这张表里面。那其实相当于就是什么?就是说一些属性我要自己来定义,而不是使用默认配置,就这样而已嘛。这个是简单说一下一个CTAS的用法。

插入&查询

1
现在呢我们就来给前面创建了几张表啊,我们去插入一些数据啊,然后我们先看一下这些表,呃,so对吧?那我们现在是有这么几张表了,那比如说来一个非分区表啊,就最开始建的这张表,我们先insert in two。这张表啊,然后呢它的表结构是啥来着啊,无所谓,咱们直接拷贝吧。啊是这张表没错啊,就插入一条数据。哎,多了一个insert,呃,select好回车。由于咱们没指定啊啊,所以它的路径会是在哪呢?在这个地方啊,咱们没有指定location,没没有指定location对吧?啊,那这个时候我们就来到默认路径呗。这里有一个Spark warehouse,那来了一个spa y house。再看一看里面,你看有一个Spark hooody啊,这么一个数据库的路径都是依赖于文件系统的啊。好,那你看啊咱们现在有这么几个了。那我们是这个固底COW看一下这里面现在是不是有一个pfet,对吧?啊,我们一起来回顾一下,看一下原数据怎么看呢?IS杠AR这里能看到什么呢?点弧底点蝴蝶,那还有一个什么分区的云数据啊,我们说了没有指定分区字段,那整个表的路径就是大的一个分区路径啊,相当于是这样啊,就相当于说有个默认的大分区啊。行,嗯,呵呵。好,那同样对另一张表,MOR表我们也去插入一下。呵呵。对吧啊,也是执行成功了。我们来看一下数据有没有生成,就这张啊M2你也可以直接去那个select一下啊。那你看也有帕灰特了啊。好,行啊,这个应该都ok另外一种就是咱们的动态分区写法。哎,我就是说我我是一张分区表,但是呢分区我不写死,分区的值不写示,哎,我只需要传一个什么呢分区字段就可以。那么回头呢咱们大家注意了,你后面不是跟上一个senlight吗?啊,反正总而言之,跟上一个结果集,最后的字段就跟分区字段匹配啊。那比如说你分区字段是两个,那么倒数第二个字段的值那就是呃这个DT的值。那最后的一个值就是这个HH的值啊,它是根据这个字段的顺序来对应的的这跟hif的一个用法还是基本一致的。好,那我们来尝试着,哎,我不指定分区让他去哎,不对啊,在这里让他去插入之后呢,咱们只需要去看一看这个呃。好,那我们先看一下这张表呗,呃,COWPTTVL是哪张表来着?进来太多COWPTTTB22这张表是吧?看确认一下啊,PTTBR啊,行,那就这个我们先看一下这里有没有。呃,不在这儿啊,因为我们当时建表时候指定一个什么location啊,指定一个location。好,那既然指定了location,我们就去在这看呗啊,我再拷贝一个窗口出来。哦,这个是HDFS路径了,现在就是那这样的话,我的hadoop我看起了没有,他豆粉是起的,那么啊这里有啊。我来到跟路径看看咱们指定的那个TNP,哎,然后有个HO底啊,然后你看这张表是不是现在就有了,它不存在就会自动帮我们创建。哎,点进来哎发现了哎,它自动在这个表路径下面生成了一个呃分区路径啊。就咱们是动态插入的嘛啊还有原数据对吧?这里面呢还有第二层分区HH还有呢哎这里就是一个part回的文件了。好,嗯,这个就是像分区呃动态插入啊,动态分区插入这个可以说应该是特别常用,特别好用啊。另外一种就是往斜死的分区去做一个插入操作啊,就还是同样的表,咱们在银色的时候指定塔金水为具体的值,那这样的话就是一个指定分区的插入了。那么也是快速执行一下啊,这些语法都跟have或者说Spark sql的语法啊。如果你要指定为本地文件系统啊,就比如说咱们创建表的时候,这个地方呃你没有加协议,默认就是HDFS是吧?如果你要本地的话,你应该是这么写呃,fire啊,对吧?啊,这样来写,那就这就是一个本地的。好,那完事之后我们只需要确认什么,刚才是不是只有一个十这么一个分区,我们看看有没有生成新的分区啊,你看现在是不是生成了一个11的这个二级分区啊,这里面也有数据了,这个就是一些插入啊。另外一个就单独介绍一个我们的这个boook insert批量导入这么一种插入方式。这种一般是用来做一个呃历史数据的初始化啊,举个例子啊,呃你现在呢有大量的数据,有比如说有1亿条历史数据,现在你们要开始上数据湖了啊,要上foody了,要数据路湖了。那这个时候呢我建议说历史数据的导入就用这种方式啊,boook insert. 啊,那要用它呢其实就是呃开启两个配置。第一个呢就是这个,第二个呢就指定为插入模式,为非研究模式啊非研究模式。来,我们先来执行一下,比如说MOR这张表,我们再插入一条数据啊,这个时候呃我们都不去指定这个参数,对吧?还没指定这个时候默认为什么upset啊?我就是插入或更新啊,插入或更新,我们先查一下这张表啊。之前呢我们是插入过一条数据,是不是啊?1A1 21A1 20。那现在相当于说我要把A一的名字改为A一下划线一啊,这样子。好,先插入。好,完事之后呢我们再来查询那个隐藏字段,咱们就不看了对吧?我们只要看它的结果值,哎,你看这个name是不是从原先的,我看在哪个地方,原先的A一现在变成了A一下划线一了啊,这就实现了一个更新的效果嘛,这个TS也被我们更新掉了嘛,对吧?啊,我们说我们指定了那个precombine预合并字段,如果咱们的record d相同,它就会取预合并字段值大的那条数据,对吧?那我们指定的ID为什么呢?Id就是咱们的record key啊都是一啊,然后呢这个TS大的就是这个1001啊,就取它,这就实现了一个更新覆盖的效果。嗯,行,哼那我们要用boke银色的。注意啊,要求你这张表指定的玉合并的key啊才可以啊,要不然会报错的啊。呃数据也会不准确,这样的话就可以保证一个数据的一致性的插入的时候,好,那我们先把这两个参数给开启一下啊,我们set的方式让它临时生效啊就ok了啊,set一下。好,现在已经是触跟非严格了,开启之后呢咱们再去插入,大家可以看一下啊。我再插入,相当于说嗯这个内蒙又变了,ts呢是1002啊,也就看一下它的方式会是什么。来我们再查询一遍,这个时候你会发现哎,居然有相同ID为一的存在,也就是说它并没有执行upset。我们这张表是有预合并字段的,但是没有执行F4。如果是前面这种upset的效果,大家也看到它是不是直接覆盖掉,更新掉啊,对吧?那现在是批量导入的话就不会了啊,他就会把这条新的数据也插入进来而已啊而已嗯。好,那接下来查询就不用讲了,查询就是一个select语法就可以了啊。那这个我们其实我们已经执行过seat了啊,所以这个没什么好演示的,这个大家都会啊,谁都会啊。那么查询这里呢我们需要单独来强调一下时间旅行。那么在渲染方式当中我们已经试过了,那么接下来我们用circle方式来实现啊。那注意版本要求hoooding是要0.9以上,另外Spark你要circle方式要求3.2以上,不然circle方式是不支持的。那么为了避免前面的影响呢,咱们不是开了一个box insert吗?对吧?呃,那这个地方我们先把它关掉,把它制成一个false。那那个非严格模式还是保持非严格就可以了啊,其他不动。那我们再建一张新的表啊,这个什么表一对吧?字段还是这几个啊,ID name TSDT还有HH分区呢咱们还是以两个字段分区啊,路径呢也指定一下。呃,COW呃组件ID预合并字段TS啊,就这样子跟前面建表基本一样啊,为了不干扰我们再建一张新的。建完之后呢,咱们开始插入数据了啊,先插入这么一条简单的数据啊,1ID为一,内为A0,TS为1000啊,后面是两个分区字段。好,插入完咱们在现查吧。好,现在说没找到,他会去给我们创建的。好,我们查询一下数据呢,缩小一点啊,数据现在是进来了,你看ID name TS还有两个分区。好,那接下来我们对它进行修改,现在我们是upset的模式,对不对啊?那这个时候ID呃组件相同,那就会取域合并字段大的这一条,也就是它也就可以将A一更A0更新成了A一啊,你看后面这个分区一样的啊一样的好做一个更新操作。哎,嗯好,更新完之后我直接上翻查询。那我们缩小一点看一下这个值已经变了,对不对啊?已经从A0变成了A1。Ok也就是说现在进行了两次的commit,那现在呢我们要基于第一次这个时候做一个时间旅行,怎么写呢?呃,这么写啊,就是你正常的查询加上一个什么语法呢?Time stand time stand as of. As of of什么什么东西呢?当时的那个instant type,那我们刚才查过是有的,对吧?当时的时间是这个啊,第二次提交的时间是这个,那么就指定时间履行到这个时候啊去查。如果查出来是A0 1000,那就说明就是没毛病了,对吧?啊,那当然这边的写法有几种啊几种。呃,一个是写完整的时间戳,年月日10分秒毫秒。另外一个呢是写一个日期date格式的啊,有有连接符的这种啊也可以。再有一个呢,你只写到日,表示从0点0分开始啊,那我们现在用就用第一种呗。那这个时间戳我们就从刚才查询的A零它对应的这个commit time来就可以了。好拷贝,那这边注意加一个单引号啊,单引号。好,行,来回车。哎,注意看我们查出来的是什么A零是吧?哎,我们把它的历史状态给查出来了。如果你不加这个,那查出来就是最新的A一了。这个就是世界旅行,这个应该说还是比较好用啊,可以查询多版本啊,这个是世界旅行啊。

更新数据_Update

更新数据_MergeInto

删除&覆盖数据

修改表结构、分区&存储过程