Operating Hudi with Scala
Contents

1. Start the client
2. Configuration
3. Create a table
4. Insert data
5. Query data
6. Update data
7. Incremental query
8. Point-in-time query
9. Delete data
10. Insert overwrite

1. Start the client
# Spark 3.1
spark-shell --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.1.2 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

# Spark 3.0
spark-shell --packages org.apache.hudi:hudi-spark3.0.3-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.0.3 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
2. Configuration

// spark-shell session setup
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
3. Create a table

Hudi does not require an explicit table-creation step: if the table does not exist when data is first written to it, it is created automatically, so Hudi has no CREATE TABLE statement.
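As a quick check, a minimal sketch (assuming the insert from section 4 below has already been run against basePath): listing basePath shows the .hoodie metadata directory that Hudi lays down on the first write, alongside the partition directories.

// Sketch: confirm that the first write created the table automatically
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(
  new java.net.URI(basePath),
  spark.sparkContext.hadoopConfiguration)

// .hoodie and the partition directories only exist after the first write
fs.listStatus(new Path(basePath)).foreach(s => println(s.getPath))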
4. Insert data

// Insert data
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
Write options explained

mode: the save mode for the write; Overwrite recreates the table if it already exists.
RECORDKEY_FIELD_OPT_KEY: the record key (primary key) field.
PRECOMBINE_FIELD_OPT_KEY: the pre-combine field; when several incoming records share the same key, the record with the largest value in this field wins.
PARTITIONPATH_FIELD_OPT_KEY: the partitioning field.

The raw configuration keys behind these constants are shown in the sketch below.
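For reference, a minimal sketch of the same write from section 4 expressed with the raw configuration keys that the constants resolve to (reusing the df, tableName and basePath defined above):

// Sketch: equivalent write using raw Hudi configuration keys
//   PRECOMBINE_FIELD_OPT_KEY    -> hoodie.datasource.write.precombine.field
//   RECORDKEY_FIELD_OPT_KEY     -> hoodie.datasource.write.recordkey.field
//   PARTITIONPATH_FIELD_OPT_KEY -> hoodie.datasource.write.partitionpath.field
//   TABLE_NAME                  -> hoodie.table.name
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.datasource.write.partitionpath.field", "partitionpath").
  option("hoodie.table.name", tableName).
  mode(Overwrite).
  save(basePath)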
5. Query data

// Snapshot query
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

// Run queries against the temporary view
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
6. Update data

// Update (upsert) data
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

Note that the save mode is now Append: the generated updates are upserted into the existing table instead of replacing it.
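To see the effect, a small sketch (assuming the snapshot view from section 5): reload the table and query it again; records whose uuid matches one of the generated updates now carry the new values.

// Sketch: reload the table after the upsert and re-run a snapshot query
spark.read.format("hudi").load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select _hoodie_commit_time, uuid, fare from hudi_trips_snapshot").show()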
7. Incremental query

// Incremental query
// Reload the table into the snapshot view
spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")

// Collect the commit times
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
// Use the second-to-last commit as the start of the incremental window
val beginTime = commits(commits.length - 2)

// Incrementally query data written after beginTime
val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
8. Point-in-time query

// Point-in-time query
val beginTime = "000" // Represents all commits > this time
val endTime = commits(commits.length - 2) // The commit time we are interested in

// Incremental query bounded by an end instant
val tripsPointInTimeDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  option(END_INSTANTTIME_OPT_KEY, endTime).
  load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")

// Run the point-in-time query
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()
9. Delete data

// Delete data
// Count the current records
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
// Pick two records to delete
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
// Generate delete records for those keys
val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
// Write the deletes back to the table
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY, "delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

// Run the same count query as before
val roAfterDeleteViewDF = spark.
  read.
  format("hudi").
  load(basePath)
roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
// The count should now be two lower than before
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
10. Insert overwrite

// Insert overwrite
// Show the current keys per partition
spark.
  read.format("hudi").
  load(basePath).
  select("uuid","partitionpath").
  sort("partitionpath","uuid").
  show(100, false)

// Generate new inserts for a single partition and overwrite that partition
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
  read.json(spark.sparkContext.parallelize(inserts, 2)).
  filter("partitionpath = 'americas/united_states/san_francisco'")
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION.key(), "insert_overwrite").
  option(PRECOMBINE_FIELD.key(), "ts").
  option(RECORDKEY_FIELD.key(), "uuid").
  option(PARTITIONPATH_FIELD.key(), "partitionpath").
  option(TBL_NAME.key(), tableName).
  mode(Append).
  save(basePath)

// Query again: only the san_francisco partition has been replaced
spark.
  read.format("hudi").
  load(basePath).
  select("uuid","partitionpath").
  sort("partitionpath","uuid").
  show(100, false)
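insert_overwrite replaces only the partitions matched by the incoming records. If the goal is to replace the entire table in one write, Hudi also provides an insert_overwrite_table operation; a minimal sketch under the same setup:

// Sketch: overwrite the whole table instead of only the matching partitions
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION.key(), "insert_overwrite_table").
  option(PRECOMBINE_FIELD.key(), "ts").
  option(RECORDKEY_FIELD.key(), "uuid").
  option(PARTITIONPATH_FIELD.key(), "partitionpath").
  option(TBL_NAME.key(), tableName).
  mode(Append).
  save(basePath)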