/**
 * Migrates a Hive table between two clusters in two steps:
 * `saveAsCsv` dumps the source table to delimited CSV on a staging path,
 * then `readCsvToHive` loads that CSV into a Parquet-backed Hive table
 * on the target cluster.
 *
 * NOTE(review): "源集群" / "目标集群" / "表名" are placeholder strings
 * (source metastore URI / target metastore URI / source table name) —
 * replace them with real values before running.
 */
object names {

  // Staging location and formats shared by both steps — keep writer and reader in sync.
  private val StagingPath     = "/tmp/app_download_detail"
  private val CsvDelimiter    = "~"
  private val TimestampFormat = "yyyy-MM-dd HH:mm:ss"

  def main(args: Array[String]): Unit = {
    saveAsCsv()     // step 1: export from the source cluster to CSV
    readCsvToHive() // step 2: load the CSV into the target cluster's Hive table
  }

  /** Reads the staged CSV and writes it as a Parquet Hive table on the target cluster. */
  def readCsvToHive(): Unit = {
    val spark = SparkSession
      .builder()
      .appName("name1")
      .config("hive.metastore.uris", "目标集群")
      .enableHiveSupport()
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    try {
      spark.read
        .options(Map("delimiter" -> CsvDelimiter, "header" -> "true"))
        .csv(StagingPath)
        // create_time arrives as epoch seconds in the CSV; render as a timestamp string.
        .withColumn("create_time", from_unixtime(col("create_time"), TimestampFormat))
        // ETL load timestamp, captured at write time.
        .withColumn("etl_created_time", from_unixtime(unix_timestamp(), TimestampFormat))
        .write
        .mode("overwrite")
        .format("parquet")
        .saveAsTable("dwd.dwd_bury_app_download_detail_dd")
    } finally {
      // Release the session even if the load fails (original leaked it on error).
      spark.close()
    }
  }

  /** Exports the source Hive table to delimited CSV on the staging path. */
  def saveAsCsv(): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[4]")
      .appName("bury app download 01")
      .config("hive.metastore.uris", "源集群")
      .enableHiveSupport()
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    try {
      // Original bound this Unit-returning write chain to an unused `val df`
      // and imported spark.implicits._ without using it — both removed.
      spark.read
        .table("表名")
        .repartition(4) // bound the number of output CSV files
        .write
        .mode("overwrite")
        .option("sep", CsvDelimiter)
        .option("header", "true")
        .csv(StagingPath) // .csv() already selects the format; the explicit .format("csv") was redundant
    } finally {
      spark.close()
    }
  }
}
基于 Spark 的 Hive 集群到 Hive 集群数据迁移方案：先将源集群表导出为 CSV，再加载为目标集群的 Hive 表
时间:2023-05-01
相关推荐