First, create the test data:
Start the ZooKeeper, Hadoop, and Spark clusters.
Create a /spark directory in HDFS, then upload the local file to it:
[root@hadoop01 data]# hdfs dfs -mkdir /spark
[root@hadoop01 data]# hdfs dfs -put /export/data/person.txt /spark
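Two errors may appear when reading the file:
Failed to get database global_temp, returning NoSuchObjectException.
Error creating transactional connection factory.
They can be resolved as follows:
1. Copy the hive-site.xml configuration file from Hive's conf directory into Spark's conf directory.
2. Because MySQL serves as the metastore database, its JDBC driver jar must be added when spark-shell starts, using the --jars option.
How to start spark-shell:
Creating a DataFrame by reading a data source with Spark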
scala> val personDF = spark.read.text("/spark/person.txt")
personDF: org.apache.spark.sql.DataFrame = [value: string]

scala> personDF.printSchema()
root
 |-- value: string (nullable = true)
The DataFrame show() method displays the data currently held in the DataFrame:
scala> personDF.show()
+-------------+
|        value|
+-------------+
|1 zhangsan 20|
|    2 lisi 29|
|  3 wangwu 25|
| 4 zhaoliu 30|
|  5 tianqi 35|
|   6 jerry 40|
+-------------+
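Reading with spark.read.text yields a single string column named value, so each line still has to be split before id, name, and age can be queried separately. The following is a minimal sketch of one way to do that with the DataFrame API. The names rawDF and typedDF, the in-memory sample rows (standing in for /spark/person.txt), and the local[*] master are assumptions for illustration, not part of the original walkthrough.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, split}

// Inside spark-shell the existing session is reused; local[*] is only
// an assumption so the snippet can also run standalone.
val spark = SparkSession.builder().master("local[*]").appName("split-value").getOrCreate()
import spark.implicits._

// Stand-in for spark.read.text("/spark/person.txt"): one string column "value".
val rawDF = Seq("1 zhangsan 20", "2 lisi 29", "3 wangwu 25").toDF("value")

// Split each line on a single space and cast the numeric fields.
val fields = split(col("value"), " ")
val typedDF = rawDF.select(
  fields.getItem(0).cast("int").as("id"),
  fields.getItem(1).as("name"),
  fields.getItem(2).cast("int").as("age")
)

typedDF.printSchema()
typedDF.show()
```

This keeps everything in the DataFrame API; the RDD-based route with toDF() is an alternative that reaches the same three columns.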
The RDD toDF() method converts an RDD into a DataFrame object:
scala> val lineRDD = sc.textFile("/spark/person.txt").map(_.split(" "))
lineRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at map at
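The transcript above stops after the split, while the later queries rely on a personDF with id, name, and age columns. A common way to get there is to map each split line onto a case class and then call toDF(). The sketch below assumes a case class named Person and uses a few in-memory lines in place of sc.textFile("/spark/person.txt"); both are illustrative choices rather than the author's exact code.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case class; its field names become the DataFrame column names.
case class Person(id: Int, name: String, age: Int)

// Inside spark-shell the existing session is reused; local[*] is only
// an assumption so the snippet can also run standalone.
val spark = SparkSession.builder().master("local[*]").appName("rdd-to-df").getOrCreate()
import spark.implicits._ // brings toDF() into scope for RDDs of case classes

// Stand-in for sc.textFile("/spark/person.txt").
val lines = spark.sparkContext.parallelize(Seq("1 zhangsan 20", "2 lisi 29", "3 wangwu 25"))

// Split each line, then build a typed Person from the three fields.
val personRDD = lines.map(_.split(" ")).map(f => Person(f(0).toInt, f(1), f(2).toInt))

// Convert the RDD[Person] into a DataFrame with columns id, name, age.
val personDF = personRDD.toDF()
personDF.printSchema()
```

Because the schema is derived from the case class, the age column is numeric, which is what makes comparisons such as age >= 25 behave as expected.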
View the name column of the personDF object:
scala> personDF.select(personDF.col("name")).show()
+--------+
|    name|
+--------+
|zhangsan|
|    lisi|
|  wangwu|
| zhaoliu|
|  tianqi|
|   jerry|
+--------+
The select() operation can also rename a column, here with as():
scala> personDF.select(personDF("name").as("username"),personDF("age")).show()
+--------+---+
|username|age|
+--------+---+
|zhangsan| 20|
|    lisi| 29|
|  wangwu| 25|
| zhaoliu| 30|
|  tianqi| 35|
|   jerry| 40|
+--------+---+
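Renaming inside select() also decides which columns are kept. When the goal is only to rename one column and leave the rest untouched, withColumnRenamed is an alternative. The sketch below rebuilds personDF from in-memory rows matching the output shown earlier; the name renamedDF and the local[*] master are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Inside spark-shell the existing session is reused; local[*] is only
// an assumption so the snippet can also run standalone.
val spark = SparkSession.builder().master("local[*]").appName("rename-column").getOrCreate()
import spark.implicits._

// In-memory stand-in for the personDF used in this walkthrough.
val personDF = Seq(
  (1, "zhangsan", 20), (2, "lisi", 29), (3, "wangwu", 25),
  (4, "zhaoliu", 30), (5, "tianqi", 35), (6, "jerry", 40)
).toDF("id", "name", "age")

// Rename "name" to "username" while keeping every other column.
val renamedDF = personDF.withColumnRenamed("name", "username")
renamedDF.show()
```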
Keep only the rows where age is greater than or equal to 25:
scala> personDF.filter(personDF("age") >= 25).show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|   lisi| 29|
|  3| wangwu| 25|
|  4|zhaoliu| 30|
|  5| tianqi| 35|
|  6|  jerry| 40|
+---+-------+---+
Group by age and count the number of people of each age:
scala> personDF.groupBy("age").count().show()
+---+-----+
|age|count|
+---+-----+
| 20|    1|
| 40|    1|
| 35|    1|
| 25|    1|
| 29|    1|
| 30|    1|
+---+-----+
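The grouped rows above come back in no particular order (20, 40, 35, ...), because groupBy() does not guarantee any ordering of its result. If a stable order matters, an explicit orderBy can be appended. This is a minimal sketch using in-memory rows that mirror the data in this walkthrough; countsDF is an illustrative name.

```scala
import org.apache.spark.sql.SparkSession

// Inside spark-shell the existing session is reused; local[*] is only
// an assumption so the snippet can also run standalone.
val spark = SparkSession.builder().master("local[*]").appName("group-count").getOrCreate()
import spark.implicits._

// In-memory stand-in for the personDF used in this walkthrough.
val personDF = Seq(
  (1, "zhangsan", 20), (2, "lisi", 29), (3, "wangwu", 25),
  (4, "zhaoliu", 30), (5, "tianqi", 35), (6, "jerry", 40)
).toDF("id", "name", "age")

// Count people per age, then sort by age so the output order is deterministic.
val countsDF = personDF.groupBy("age").count().orderBy("age")
countsDF.show()
```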
Sort by age in descending order:
scala> personDF.sort(personDF("age").desc).show()
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  6|   jerry| 40|
|  5|  tianqi| 35|
|  4| zhaoliu| 30|
|  2|    lisi| 29|
|  3|  wangwu| 25|
|  1|zhangsan| 20|
+---+--------+---+
SQL-style operations require that the DataFrame first be registered as a temporary table:
scala> personDF.registerTempTable("t_person")
warning: there was one deprecation warning; re-run with -deprecation for details
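The deprecation warning appears because registerTempTable has been deprecated since Spark 2.0; createOrReplaceTempView is its replacement and registers the same kind of session-scoped view. The sketch below shows the non-deprecated call under the same view name, using in-memory rows in place of the HDFS file; oldestDF is an illustrative name.

```scala
import org.apache.spark.sql.SparkSession

// Inside spark-shell the existing session is reused; local[*] is only
// an assumption so the snippet can also run standalone.
val spark = SparkSession.builder().master("local[*]").appName("temp-view").getOrCreate()
import spark.implicits._

// In-memory stand-in for the personDF used in this walkthrough.
val personDF = Seq(
  (1, "zhangsan", 20), (2, "lisi", 29), (3, "wangwu", 25),
  (4, "zhaoliu", 30), (5, "tianqi", 35), (6, "jerry", 40)
).toDF("id", "name", "age")

// Register (or overwrite) a session-scoped view without the deprecation warning.
personDF.createOrReplaceTempView("t_person")

// The view can then be queried with plain SQL.
val oldestDF = spark.sql("select * from t_person order by age desc limit 2")
oldestDF.show()
```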
Query the two oldest people:
scala> spark.sql("select * from t_person order by age desc limit 2").show()
+---+------+---+
| id|  name|age|
+---+------+---+
|  6| jerry| 40|
|  5|tianqi| 35|
+---+------+---+
Query the people older than 25:
scala> spark.sql("select * from t_person where age > 25").show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|   lisi| 29|
|  4|zhaoliu| 30|
|  5| tianqi| 35|
|  6|  jerry| 40|
+---+-------+---+
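Both SQL queries can also be expressed in the DSL style, without a temporary table. The sketch below shows what are assumed to be equivalent formulations, again on in-memory rows mirroring this walkthrough; topTwoDF and over25DF are illustrative names.

```scala
import org.apache.spark.sql.SparkSession

// Inside spark-shell the existing session is reused; local[*] is only
// an assumption so the snippet can also run standalone.
val spark = SparkSession.builder().master("local[*]").appName("dsl-vs-sql").getOrCreate()
import spark.implicits._

// In-memory stand-in for the personDF used in this walkthrough.
val personDF = Seq(
  (1, "zhangsan", 20), (2, "lisi", 29), (3, "wangwu", 25),
  (4, "zhaoliu", 30), (5, "tianqi", 35), (6, "jerry", 40)
).toDF("id", "name", "age")

// DSL counterpart of: select * from t_person order by age desc limit 2
val topTwoDF = personDF.orderBy($"age".desc).limit(2)

// DSL counterpart of: select * from t_person where age > 25
val over25DF = personDF.filter($"age" > 25)

topTwoDF.show()
over25DF.show()
```

Note that the strict comparison age > 25 excludes wangwu (age 25), which is why that row appears in the earlier >= 25 filter but not here.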