欢迎您访问365答案网，请分享给您的朋友！
生活常识 学习资料

Spark 读取 Hive 数据并写入 Redis

时间:2023-04-19

import java.util

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import redis.clients.jedis.{Jedis, JedisPool}
import redis.clients.util.Pool

import scala.collection.mutable.ArrayBuffer

/**
 * Spark job that reads the story play-count ranking from the Hive table
 * `ads.ads_ks_log_story_play_hsjj_14h_sum_a_d` and mirrors it into the Redis
 * sorted set `EXPLAIN_BOOK_ZS` (member = story_id, score = story_14d_play_cts).
 */
object GoodBookSpeakWellDateToRedis {

  /** Redis sorted-set key that holds the ranking. */
  private[this] val RankingKey = "EXPLAIN_BOOK_ZS"

  // Connection pool shared by every Redis helper below; created by `init`.
  private[this] var jedisPool: Pool[Jedis] = _

  /**
   * Creates the Redis connection pool. Must be called before any other Redis helper.
   *
   * @param host     Redis host
   * @param port     Redis port
   * @param timeout  timeout passed to `JedisPool` (milliseconds per the Jedis docs)
   * @param password Redis password; `null` or empty means "no AUTH"
   */
  def init(host: String, port: Int, timeout: Int, password: String): Unit = {
    // Jedis sends AUTH whenever the password is non-null, so an empty string would
    // issue `AUTH ""`, which a password-less Redis server rejects. Map it to null.
    val effectivePassword = Option(password).filter(_.nonEmpty).orNull
    jedisPool = new JedisPool(new GenericObjectPoolConfig, host, port, timeout, effectivePassword)
  }

  /**
   * Borrows a connection from the pool, runs `f`, and always returns the connection,
   * even when `f` throws.
   */
  private def withJedis[T](f: Jedis => T): T = {
    require(jedisPool != null, "init() must be called before using Redis")
    val jedis = jedisPool.getResource
    try f(jedis) finally jedis.close()
  }

  /** Adds (or updates the score of) `story_id` in the sorted set `key`. */
  def zadd(key: String, story_14d_play_cts: Int, story_id: String): Unit =
    withJedis { jedis => jedis.zadd(key, story_14d_play_cts.toDouble, story_id); () }

  /** Removes `member` from the sorted set `key`. */
  def zrem(key: String, member: String): Unit =
    withJedis { jedis => jedis.zrem(key, member); () }

  /**
   * Reads today's ranking from Hive and makes the Redis sorted set mirror it.
   *
   * @param spark active session with Hive support enabled
   */
  def getResultToRedis(spark: SparkSession): Unit = {
    val resultDataNew: DataFrame = spark.sql(
      """
        |select story_id
        |      ,cast(story_14d_play_cts as int) as story_14d_play_cts
        |from ads.ads_ks_log_story_play_hsjj_14h_sum_a_d
        |order by story_14d_play_cts desc
        |""".stripMargin)
    resultDataNew.show(20)

    // A NULL id or count (e.g. a value that failed the int cast) cannot be stored as a
    // sorted-set member/score. Row.getInt would throw on it, so such rows are skipped.
    val mapRDD: RDD[(String, Int)] = resultDataNew.rdd
      .filter(row => !row.isNullAt(0) && !row.isNullAt(1))
      .map(row => (row.getString(0), row.getInt(1)))

    // Today's ranking, collected to the driver.
    val todayRanking: Array[(String, Int)] = mapRDD.collect()

    syncRanking(todayRanking)
  }

  /**
   * Makes the Redis ranking contain exactly the members of `today`.
   *
   * Members present in Redis but absent from `today` are removed. Every member of
   * `today` is (re)written with its current score. The update is not atomic, so readers
   * may briefly observe a partially updated set.
   */
  private def syncRanking(today: Array[(String, Int)]): Unit = withJedis { jedis =>
    import scala.collection.JavaConverters._

    // Members currently stored (the previous run's ranking).
    val current: util.Set[String] = jedis.zrevrange(RankingKey, 0, -1)
    val todayIds: Set[String] = today.iterator.map(_._1).toSet

    val stale = current.asScala.filterNot(todayIds.contains).toSeq
    // ZREM with no members is a Redis error, so only issue it when there is work.
    if (stale.nonEmpty) jedis.zrem(RankingKey, stale: _*)

    today.foreach { case (storyId, plays) =>
      jedis.zadd(RankingKey, plays.toDouble, storyId)
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("GoodBookSpeakWellDateToRedis")
      // .master("local[*]")
      .config("spark.driver.allowMultipleContexts", true)
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    // Connection settings are blank in this source; fill them in before running.
    val password = ""
    val host = ""
    val port = 6379
    val timeout = 1000
    init(host, port, timeout, password)

    try {
      // Write the data to Redis.
      getResultToRedis(spark)
    } finally {
      // Release pooled connections and the Spark session even if the job failed.
      try jedisPool.destroy() finally spark.stop()
    }
  }
}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网，版权归原作者所有，如有侵权请联系我们，我们将在三个工作日内妥善处理。