欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

spark-redis实现redisSQL统计

时间:2023-06-24

spark-redis是基于jedis实现的redis rdd,可对redis的String, Hash, List, Set and Sorted Set,XStream数据结构进行转换读写,支持将redis数据转换成Dataframes的方式,以Spark SQL进行统计运算,支持官方Redis cluster的集群读取模型,可自动感知Redis分区,亦可根据配置调整分区数。

支持版本:

Spark-Redis

Spark

Redis

Supported Scala Versions

master

3.0.x

>=2.9.0

2.12

2.4, 2.5, 2.6

2.4.x

>=2.9.0

2.11, 2.12

2.3

2.3.x

>=2.9.0

2.11

1.4

1.4.x

2.10

使用方式,如使用2.4.x版本可引用Maven库:

com.redislabs spark-redis_2.11 2.4.2

or

com.redislabs spark-redis_2.12 2.4.2

使用2.3.x版本需要下载源代码:

git clone https://github.com/RedisLabs/spark-redis.git

更改Spark版本号

然后进入到相应的目录进行编译:

cd spark-redismvn clean package -DskipTests

编译后,将编译好的包install到本地的maven库

mvn install:install-file -DgroupId=com.redislabs -DartifactId=spark-redis_2.11 -Dversion=3.0.0-SNAPSHOT -Dpackaging=jar -Dfile=.spark-redis_2.11-3.0.0-SNAPSHOT.jar

在对应的POM中加载对应包:

com.redislabs spark-redis_2.11 3.0.0-SNAPSHOT com.alibaba fastjson 1.2.73 redis.clients jedis 3.4.1 jar compile

编写测试代码:

SparkConf sparkConf = new SparkConf().setAppName(appName).set("spark.redis.host", "127.0.0.1") .set("spark.redis.port", "6000") .set("spark.redis.auth","123456"); SparkSession spark = SparkSession .builder().master("local[4]") .config(sparkConf) .getOrCreate();Dataset df = spark.createDataframe(Arrays.asList( new Person("John", 35), new Person("Peter", 40)), Person.class);df.write() .format("org.apache.spark.sql.redis") .option("table", "person2") .option("key.column", "name") .mode(SaveMode.Overwrite) .save(); //对redis进行写入操作//结果://redis手动初始化数据 hset person:1 name mike age 30// hset person:2 name ken age 40List schemaFields = new ArrayList();schemaFields.add(new StructField("_id",StringType,false,metadata.empty())); //其 hash key值默认为_idschemaFields.add(new StructField("name",StringType,true,metadata.empty()));schemaFields.add(new StructField("age",IntegerType,true,metadata.empty()));StructType structType=DataTypes.createStructType(schemaFields);Dataset ds=spark.read() .format("org.apache.spark.sql.redis") .option("keys.pattern", "person:*") .schema(structType).load();ds.show();ds.createOrReplaceTempView("persontbl");Dataset ds2=spark.sql("select * from persontbl");ds2.show();//年龄汇总Dataset ds3=spark.sql("select sum(age) from persontbl");ds3.show();

探其代码实现,通过redis的scan方法获取到key值后,通过单节点的连接器,一条条的对key的value进行遍历,该操作在大数据量时,可能导致性能问题,需在网络上进行相应的优化。

同样,其String,List, Set and Sorted Set的实现方式亦是如此。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。