
Operating Hudi with Scala

Date: 2023-07-23
Table of Contents

1. Starting the Client
2. Configuration
3. Creating a Table
4. Inserting Data
5. Querying Data
6. Updating Data
7. Incremental Query
8. Point-in-Time Query
9. Deleting Data
10. Insert Overwrite

1. Starting the Client

# Spark 3.1
spark-shell --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.1.2 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

# Spark 3.0
spark-shell --packages org.apache.hudi:hudi-spark3.0.3-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.0.3 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
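If you would rather run the same steps from a standalone Scala application instead of spark-shell, the Kryo serializer setting from the commands above can be supplied when the SparkSession is built (with the Hudi and Avro packages on the classpath, e.g. via spark-submit --packages). A minimal sketch; the app name and master here are placeholders:

import org.apache.spark.sql.SparkSession

// Build a SparkSession with the same serializer setting used by the
// spark-shell commands above (illustrative app name and local master).
val spark = SparkSession.builder()
  .appName("hudi-quickstart")
  .master("local[*]")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()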

2. Configuration

// Spark shell setup: Hudi quickstart helpers, table name, base path, and data generator
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
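The DataGenerator from QuickstartUtils produces sample trip records. Before writing anything, you can load one generated record into a DataFrame and inspect it, using only the helpers imported above:

// Peek at one generated record: fields include uuid, ts, partitionpath, fare, etc.
val sample = convertToStringList(dataGen.generateInserts(1))
val sampleDF = spark.read.json(spark.sparkContext.parallelize(sample, 1))
sampleDF.printSchema()
sampleDF.show(truncate = false)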

3. Creating a Table

Hudi does not require a separate table-creation step: if the table does not exist when data is first written to the base path, it is created automatically, so there is no CREATE TABLE statement in this walkthrough.
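Hudi keeps the table's metadata in a .hoodie directory under the base path. As a quick check that the table really was created, you can list the base path after the first insert in step 4; a small sketch using the Hadoop FileSystem API:

import org.apache.hadoop.fs.{FileSystem, Path}

// List the table's base path; a .hoodie directory indicates the table exists.
val fs = FileSystem.get(new java.net.URI(basePath), spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path(basePath)).foreach(s => println(s.getPath))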

4. Inserting Data

// Generate 10 sample records and write them to the Hudi table
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

Option details

- mode: the write mode; Overwrite drops and recreates the table if it already exists
- RECORDKEY_FIELD_OPT_KEY: the record key (primary key) field
- PRECOMBINE_FIELD_OPT_KEY: the pre-combine field; when two records share the same key, the record with the larger value of this field wins (see the sketch below)
- PARTITIONPATH_FIELD_OPT_KEY: the partition path field
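To make the pre-combine behavior concrete, here is a small hand-written sketch (not part of the quickstart data, and written to a separate throwaway path so the trips table is untouched): two records share the key "a", and after the write only the one with the larger ts should remain.

// Illustrative only: the path and table name below are throwaway values.
val demoPath = "file:///tmp/hudi_precombine_demo"
val demo = Seq(
  """{"uuid": "a", "ts": 1, "partitionpath": "p1", "note": "older"}""",
  """{"uuid": "a", "ts": 2, "partitionpath": "p1", "note": "newer"}"""
)
val demoDF = spark.read.json(spark.sparkContext.parallelize(demo, 1))
demoDF.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, "hudi_precombine_demo").
  mode(Overwrite).
  save(demoPath)

// Expect a single row with ts = 2 and note = "newer"
spark.read.format("hudi").load(demoPath).select("uuid", "ts", "note").show()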

5. Querying Data

// Load the table as a snapshot and register a temp view
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

// Run queries against the snapshot view
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
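The same snapshot query can also be expressed directly with the DataFrame API, without registering a temp view:

// DataFrame-API equivalent of the first SQL query above
tripsSnapshotDF.
  select("fare", "begin_lon", "begin_lat", "ts").
  filter("fare > 20.0").
  show()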

6. Updating Data

// Generate updates for existing records and append them to the table
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
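Although the write uses mode(Append), the updates carry the same record keys, so existing rows are rewritten rather than duplicated. One way to confirm this is to reload the snapshot and look at _hoodie_commit_time, which should show the newer commit for the updated records:

// Reload the snapshot after the update and check commit times per record
spark.read.format("hudi").load(basePath).createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, fare from hudi_trips_snapshot order by _hoodie_commit_time desc").show(false)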

7. Incremental Query

// Incremental query
// Reload the table and register the snapshot view
spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")

// Collect the commit times and pick a begin instant
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2)

// Read only the records written after beginTime
val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
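The incremental view should only contain records from commits newer than beginTime (the begin instant is exclusive). A quick sanity check is to list the distinct commit times in the incremental view and compare them against beginTime:

// Commits visible in the incremental view should all be newer than beginTime
spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_incremental order by commitTime").show(false)
println(s"beginTime = $beginTime")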

8. Point-in-Time Query

// Point-in-time query
val beginTime = "000" // Represents all commits > this time.
val endTime = commits(commits.length - 2) // commit time we are interested in

// Incremental read bounded by an end instant
val tripsPointInTimeDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  option(END_INSTANTTIME_OPT_KEY, endTime).
  load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")

// Run the point-in-time query
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

9. Deleting Data

// Delete data
// Count all current records
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

// Pick two records to delete
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

// Generate delete records for those keys and issue the delete write
val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY, "delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

// Run the same query again
val roAfterDeleteViewDF = spark.
  read.
  format("hudi").
  load(basePath)
roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")

// The count should now be two less than before
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

10. Insert Overwrite

// Show the current contents of the table
spark.
  read.format("hudi").
  load(basePath).
  select("uuid", "partitionpath").
  sort("partitionpath", "uuid").
  show(100, false)

// Generate new records for one partition and overwrite it
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
  read.json(spark.sparkContext.parallelize(inserts, 2)).
  filter("partitionpath = 'americas/united_states/san_francisco'")
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION.key(), "insert_overwrite").
  option(PRECOMBINE_FIELD.key(), "ts").
  option(RECORDKEY_FIELD.key(), "uuid").
  option(PARTITIONPATH_FIELD.key(), "partitionpath").
  option(TBL_NAME.key(), tableName).
  mode(Append).
  save(basePath)

// Query again to see the effect of the overwrite
spark.
  read.format("hudi").
  load(basePath).
  select("uuid", "partitionpath").
  sort("partitionpath", "uuid").
  show(100, false)
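Note that insert_overwrite replaces only the partitions present in the incoming data; here only americas/united_states/san_francisco was written, so rows in the other partitions should remain. A quick way to check, counting rows per partition after the overwrite:

// Rows per partition after insert_overwrite; only san_francisco was replaced
spark.
  read.format("hudi").
  load(basePath).
  groupBy("partitionpath").
  count().
  show(false)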
