欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

pyspark第五章共享变量

时间:2023-07-26


from pyspark import SparkConf, SparkContext

# Submit with: /opt/module/spark/bin/spark-submit /opt/Code/broadcast.py
#
# Demo: use a broadcast variable so the (small) driver-side student list is
# shipped to each executor ONCE, instead of being serialized into every task.
if __name__ == '__main__':
    conf = SparkConf().setAppName("WorldCount").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Driver-side lookup data: (student_id, name, age).
    stu_info_list = [(1, '张大仙', 11),
                     (2, '王晓晓', 13),
                     (3, '张甜甜', 11),
                     (4, '王大力', 11),
                     ]
    # Wrap in a broadcast variable; executors read it via broadcast.value.
    broadcast = sc.broadcast(stu_info_list)

    # Scores keyed by student id: (student_id, subject, score).
    score_info_rdd = sc.parallelize([(1, '语文', 99),
                                     (2, '语文', 89),
                                     (3, '语文', 79),
                                     (4, '语文', 69)
                                     ])

    def map_func(data):
        """Replace the student id in a score record with the student name.

        Looks the id up in the broadcast student list; returns
        (name, subject, score). Unknown ids yield an empty name.
        """
        sid = data[0]  # renamed from `id` to avoid shadowing the builtin
        name = ''
        for stu in broadcast.value:
            if sid == stu[0]:
                name = stu[1]
        return (name, data[1], data[2])

    print(score_info_rdd.map(map_func).collect())


from unittest import result
from pyspark import SparkConf, SparkContext
import json

# /opt/module/spark/bin/spark-submit /opt/Code/broadcast.py
#
# Demo: a plain driver-side variable is NOT shared with executors. Each task
# mutates its own deserialized copy, so the driver's `count` never changes.
if __name__ == '__main__':
    conf = SparkConf().setAppName("WorldCount").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Ten elements across two partitions, so two tasks each see their own copy.
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)

    count = 0

    def map_func(data):
        # `global` only affects the executor-side copy of this module.
        global count
        count += 1
        print(count)

    rdd.map(map_func).collect()
    # Still prints 0 on the driver — this is why Accumulators exist.
    print(count)


from unittest import result
from pyspark import SparkConf, SparkContext
import json

# /opt/module/spark/bin/spark-submit /opt/Code/broadcast.py
#
# Demo: unlike a plain variable, a SparkContext accumulator aggregates
# executor-side increments back to the driver.
if __name__ == '__main__':
    conf = SparkConf().setAppName("WorldCount").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)

    # Shared counter, initialized to 0 on the driver.
    acmlt = sc.accumulator(0)

    def map_func(data):
        global acmlt
        acmlt += 1          # executor-side adds are merged into the driver value
        print(acmlt)

    rdd.map(map_func).collect()
    # Prints 10 on the driver: all task increments were accumulated.
    print(acmlt)


from unittest import result
from pyspark import SparkConf, SparkContext
import json

# /opt/module/spark/bin/spark-submit /opt/Code/broadcast.py
#
# Demo: without cache(), the second action would recompute rdd2 and run
# map_func again, double-counting into the accumulator. Caching rdd2 after
# the first collect() keeps the final count at 10 instead of 20.
if __name__ == '__main__':
    conf = SparkConf().setAppName("WorldCount").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)

    acmlt = sc.accumulator(0)

    def map_func(data):
        global acmlt
        acmlt += 1
        print(acmlt)

    rdd2 = rdd.map(map_func)
    rdd2.cache()            # pin results so a later action does not re-run map_func
    rdd2.collect()

    rdd3 = rdd2.map(lambda x: x)
    rdd3.collect()          # served from cache; accumulator is not incremented again

    print(acmlt)


from pyspark import SparkConf, SparkContext
import re
from operator import add

# /opt/module/spark/bin/spark-submit /opt/Code/broadcast.py
#
# Word count that combines both shared-variable kinds: a broadcast list of
# punctuation characters (read-only lookup on executors) and an accumulator
# counting how many punctuation tokens were filtered out.
if __name__ == '__main__':
    conf = SparkConf().setAppName("WorldCount").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    file_rdd = sc.textFile('file:///opt/Data/accumulator_broadcast_data.txt')
    line_rdd = file_rdd.filter(lambda line: line.strip())   # drop blank lines
    data_rdd = line_rdd.map(lambda x: x.strip())            # trim surrounding whitespace
    # BUG FIX: the original used re.split('s+', x), which splits on the
    # literal letter "s". r'\s+' splits on runs of whitespace as intended.
    words_rdd = data_rdd.flatMap(lambda x: re.split(r'\s+', x))

    # Punctuation tokens to exclude, broadcast once to every executor.
    abnormal_char = [',', '.', '!', '#', '$', '%']
    broadcast = sc.broadcast(abnormal_char)
    # Counts filtered-out punctuation tokens across all tasks.
    acmlt = sc.accumulator(0)

    def filter_func(data):
        """Return True for normal words; count and drop punctuation tokens."""
        global acmlt
        if data in broadcast.value:
            acmlt += 1
            return False
        return True

    normal_words_rdd = words_rdd.filter(filter_func)
    result_rdd = normal_words_rdd.map(lambda x: (x, 1)).reduceByKey(add)
    print('正常单词计数结果: ', result_rdd.collect())
    print('特殊字符数量: ', acmlt)


相关资料:
黑马全网首发PySpark视频教程
链接:https://pan.baidu.com/s/1AY7FmO6w-jCAj7t_tD9oGw
提取码:1234

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。