欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

编程实现Spark的WordCount的单词统计

时间:2023-08-18
需求一:使用PyCharm编程实现SparkCore的WordCount单词统计,并保存在HDFS中

from pyspark import SparkConf,SparkContextimport osos.environ['SPARK_HOME'] = '/export/server/spark'PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"# 当存在多个版本时,不指定很可能会导致出错os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHONos.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONif __name__ == '__main__': # 1.创建上下文,指定应用的名字和用谁的资源来跑 conf=SparkConf().setAppName("first_wordcount").setMaster("local[*]") sc = SparkContext(conf=conf) # 2.加载words.txt文件形成一个RDD,RDD的每个元素是文本的每一行 rdd1=sc.textFile('file://file:///export/servers/data/words.txt') # 过滤空行 rdd1 = rdd1.filter(lambda line: len(line.strip()) > 0) # 3.进一步将文本内容打散成单词 rdd2=rdd1.flatMap(lambda line:line.split(" ")) # 4.为每个单词标记上1,形成一个元组,具有键值对数据结构,方便做bykey的操作 rdd3=rdd2.map(lambda word:(word,1)) # 5.进一步做reduceByKey,得到wordcount结果 rdd4=rdd3.reduceByKey(lambda x,y:x+y) # 6.结果打印到控制台 arr=rdd4.collect() print('wordcount结果是:',arr) # 7.结果输出到本地文件 rdd4.saveAsTextFile("hdfs://node1:8020/output/file1")

需求二:使用PyCharm编程实现SparkSQL的DSL和SQL方式WordCount单词统计

 

from pyspark.sql import SparkSession,Rowfrom pyspark.sql.types import *import pyspark.sql.functions as Fimport osos.environ['SPARK_HOME'] = '/export/server/spark'PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"# 当存在多个版本时,不指定很可能会导致出错os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHONos.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONif __name__ == '__main__': # 1.创建上下文对象 spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate() file_df=spark.read.text('file:///export/pyworkspace/pyspark_sz26/pyspark-sparksql-3.1.2/data/words.txt') file_df.printSchema() file_df.show(truncate=False) # 3.注册成临时表 file_df.createOrReplaceTempView('words_t') # 4.做wordcount print('SQL风格做wordcount') spark.sql(''' select t.word, count(*) as cnt from (select explode(split(value,' ')) as word from words_t) t group by t.word order by cnt desc ''').show() print('DSL做wordcount') file_df.select(F.explode(F.split('value',' ')).alias('word')) .groupBy('word') .count() .orderBy('count',ascending=False) .show() spark.stop()

 需求三:使用PySpark读取json数据格式,多种方式查询字段并进行统计分析

 

from pyspark.sql import SparkSession, Rowfrom pyspark.sql.types import *import osos.environ['SPARK_HOME'] = '/export/server/spark'PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"# 当存在多个版本时,不指定很可能会导致出错os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHONos.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHONif __name__ == '__main__': # 1-创建上下文对象 spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate() sc = spark.sparkContext df = spark.read.json('file:///export/pyworkspace/pyspark_sz26/pyspark-sparksql-3.1.2/data/employee.json') # (1)查询所有数据; df.show() # (2)查询所有数据,并去除重复的数据; df.distinct().show() # (3)查询所有数据,打印时去除id字段; df.drop('id').show() # (4)筛选出age>30的记录; df.where('age>30').show() # (5)将数据按age分组; df.groupBy('age').count().show() # (6)将数据按name升序排列; df.orderBy('name').show() # (7)取出前3行数据; print(df.take(3)) # (8)查询所有记录的name列,并为其取别名为username; df.select(df.name.alias('username')).show() # (9)查询年龄age的平均值; from pyspark.sql.functions import avg df.agg(avg('age')).show() # (10)查询年龄age的最小值。 from pyspark.sql.functions import min df.agg(min('age')).show()

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。