
Spark DataFrame Operations

Date: 2023-07-28

First, create the test data.
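Reconstructed from the query output shown later in this article, /export/data/person.txt holds six space-delimited records:

1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 jerry 40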

Start the ZooKeeper, Hadoop, and Spark clusters.

Create a spark directory in HDFS, then upload the local file into it:

[root@hadoop01 data]# hdfs dfs -mkdir /spark
[root@hadoop01 data]# hdfs dfs -put /export/data/person.txt /spark

Two errors may appear when spark-shell reads the data:

Failed to get database global_temp, returning NoSuchObjectException.
Error creating transactional connection factory.

Fix them as follows:

1. Copy the hive-site.xml configuration file from Hive's conf directory into Spark's conf directory.
2. Because MySQL serves as the metastore database, pass the MySQL JDBC driver jar to spark-shell at startup via the --jars option.


Starting spark-shell:
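A minimal launch sketch; the master URL, port, and connector jar path are assumptions that depend on your installation (any MySQL Connector/J jar matching your MySQL version works):

[root@hadoop01 data]# spark-shell --master spark://hadoop01:7077 --jars /path/to/mysql-connector-java.jar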

Creating a DataFrame by having Spark read a data source:

scala> val personDF = spark.read.text("/spark/person.txt")
personDF: org.apache.spark.sql.DataFrame = [value: string]

scala> personDF.printSchema()
root
 |-- value: string (nullable = true)

The DataFrame show() method displays the current DataFrame's result data:

scala> personDF.show()
+-------------+
|        value|
+-------------+
|1 zhangsan 20|
|    2 lisi 29|
|  3 wangwu 25|
| 4 zhaoliu 30|
|  5 tianqi 35|
|   6 jerry 40|
+-------------+

The RDD toDF() method converts an RDD into a DataFrame object:

scala> val lineRDD = sc.textFile("/spark/person.txt").map(_.split(" "))
lineRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at map at <console>:24

scala> case class Person(id: Int, name: String, age: Int)
defined class Person

scala> val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
personRDD: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[7] at map at <console>:27

scala> val personDF = personRDD.toDF()
personDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> personDF.show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 20|
|  2|    lisi| 29|
|  3|  wangwu| 25|
|  4| zhaoliu| 30|
|  5|  tianqi| 35|
|  6|   jerry| 40|
+---+--------+---+

scala> personDF.printSchema
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
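When the schema should be spelled out rather than inferred from a case class, spark.createDataFrame with an explicit StructType does the same job. A sketch reusing the lineRDD above (personDF2 is a hypothetical name, not from the original article):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}

// Build the schema by hand instead of deriving it from a case class.
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = false)
))

// Convert each Array[String] into a Row matching the schema.
val rowRDD = lineRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))
val personDF2 = spark.createDataFrame(rowRDD, schema)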

View the data in the name column of personDF:

scala> personDF.select(personDF.col("name")).show()
+--------+
|    name|
+--------+
|zhangsan|
|    lisi|
|  wangwu|
| zhaoliu|
|  tianqi|
|   jerry|
+--------+
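personDF.col("name") is only one of several equivalent ways to reference a column. A quick sketch of the interchangeable forms (spark-shell auto-imports spark.implicits._, which enables the $ syntax):

personDF.select("name").show()            // plain column name as a string
personDF.select($"name").show()           // $-string interpolator
personDF.select(personDF("name")).show()  // apply method, same as .col("name")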

The select() operation can also rename columns:

scala> personDF.select(personDF("name").as("username"), personDF("age")).show()
+--------+---+
|username|age|
+--------+---+
|zhangsan| 20|
|    lisi| 29|
|  wangwu| 25|
| zhaoliu| 30|
|  tianqi| 35|
|   jerry| 40|
+--------+---+
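If the goal is just to rename one column while keeping all the others, withColumnRenamed is a common alternative (a sketch, not from the original article):

// Renames name -> username and keeps id and age untouched.
personDF.withColumnRenamed("name", "username").show()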

Filter rows whose age is greater than or equal to 25:

scala> personDF.filter(personDF("age") >= 25).show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|   lisi| 29|
|  3| wangwu| 25|
|  4|zhaoliu| 30|
|  5| tianqi| 35|
|  6|  jerry| 40|
+---+-------+---+
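where() is an alias for filter(), and both also accept a SQL expression string; either line below returns the same five rows:

personDF.where(personDF("age") >= 25).show()  // alias for filter()
personDF.filter("age >= 25").show()           // SQL expression string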

Group by age and count the number of people of each age:

scala> personDF.groupBy("age").count().show()
+---+-----+
|age|count|
+---+-----+
| 20|    1|
| 40|    1|
| 35|    1|
| 25|    1|
| 29|    1|
| 30|    1|
+---+-----+
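count() is just a shorthand; agg() accepts arbitrary aggregate functions, both after groupBy() and at the whole-DataFrame level. A sketch (functions._ is not auto-imported in spark-shell):

import org.apache.spark.sql.functions.{avg, min, max}

// Whole-table aggregates over the age column.
personDF.agg(avg("age"), min("age"), max("age")).show()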

Sort by age in descending order:

scala> personDF.sort(personDF("age").desc).show()
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  6|   jerry| 40|
|  5|  tianqi| 35|
|  4| zhaoliu| 30|
|  2|    lisi| 29|
|  3|  wangwu| 25|
|  1|zhangsan| 20|
+---+--------+---+
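orderBy() is an alias for sort(), and the desc function from org.apache.spark.sql.functions avoids referencing the DataFrame by name (a sketch):

import org.apache.spark.sql.functions.desc

personDF.orderBy(desc("age")).show()  // same ordering as sort(personDF("age").desc)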

SQL-style operations require first registering the DataFrame as a temporary table:

scala> personDF.registerTempTable("t_person")
warning: there was one deprecation warning; re-run with -deprecation for details
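The warning appears because registerTempTable was deprecated in Spark 2.0; the current equivalent is:

// Same effect as registerTempTable, without the deprecation warning.
personDF.createOrReplaceTempView("t_person")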

Query the two oldest people:

scala> spark.sql("select * from t_person order by age desc limit 2").show()
+---+------+---+
| id|  name|age|
+---+------+---+
|  6| jerry| 40|
|  5|tianqi| 35|
+---+------+---+

Query people whose age is greater than 25:

scala> spark.sql("select * from t_person where age > 25").show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|   lisi| 29|
|  4|zhaoliu| 30|
|  5| tianqi| 35|
|  6|  jerry| 40|
+---+-------+---+
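For comparison, the same two queries expressed in the DataFrame DSL rather than SQL (a sketch):

// The two oldest people.
personDF.orderBy(personDF("age").desc).limit(2).show()

// People older than 25.
personDF.filter(personDF("age") > 25).show()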
