
Common Spark-Shell Snippets

Date: 2023-06-14

Startup

/home/work/<path>/bin/spark-shell --master yarn --num-executors 40 --conf spark.speculation=true --queue <queue-name>

1 Sum the values for a given domain at a given timestamp in files written to disk

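val path = "/xxxx/*"

val data = sc.textFile(path)

// textFile already yields one record per line; the split on "\n" only guards against embedded newlines
val data1 = data.flatMap(_.split("\n"))

// Keep only the lines that mention the domain
val data2 = data1.filter(x => x.contains("<domain>"))

// Keep only the lines that mention the timestamp
val data3 = data2.filter(x => x.contains("<timestamp>"))

// Key = first tab-separated field, value = last field converted to Long
val res = data3.map(x => x.split("\t")).map(x => (x(0), x(x.length - 1).toDouble.toLong))

// Sum the values per key
val res2 = res.reduceByKey(_ + _)

res2.take(10)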

Array[(String, Long)] = Array((cdninfo.proc.topic.cnt.node,3499825092))
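The parsing logic of this pipeline can be checked without a cluster. The following is a minimal sketch using plain Scala collections instead of an RDD; the object name, the sample lines, and the field layout (key first, value last, tab-separated) are assumptions for illustration, and `groupBy` followed by a sum stands in for `reduceByKey(_ + _)`.

```scala
object SumByKeySketch {
  // Hypothetical sample lines: key, domain, timestamp, value (tab-separated).
  val sampleLines: Seq[String] = Seq(
    "metric.a\texample.com\t1632816000\t10.0",
    "metric.a\texample.com\t1632816000\t5.0",
    "metric.b\texample.com\t1632816000\t7.0",
    "metric.a\tother.org\t1632816000\t100.0",
    "metric.a\texample.com\t1632816060\t1.0"
  )

  // Same steps as the RDD version: two substring filters, split on tab,
  // (first field, last field as Long), then a per-key sum.
  def sumByFirstField(lines: Seq[String], domain: String, timestamp: String): Map[String, Long] =
    lines
      .filter(_.contains(domain))
      .filter(_.contains(timestamp))
      .map(_.split("\t"))
      .map(cols => (cols(0), cols(cols.length - 1).toDouble.toLong))
      .groupBy(_._1)
      .map { case (key, pairs) => key -> pairs.map(_._2).sum }
}
```

Calling `SumByKeySketch.sumByFirstField(SumByKeySketch.sampleLines, "example.com", "1632816000")` keeps only the three matching lines and sums them per key.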

2 Analyze the afs data source

import java.text.SimpleDateFormat

import java.util.{Date, Locale}

// Input timestamps look like [dd/MMM/yyyy:HH:mm:ss Z]; output uses yyyy-MM-dd HH:mm:ss
val format = new SimpleDateFormat("[dd/MMM/yyyy:HH:mm:ss Z]", Locale.US)

val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

val path = "path1,path2..."

// Take the value of each record, strip the first and last tab-separated fields, then split into lines
val data = sc.sequenceFile[Long, String](path).map(_._2).map(l => l.substring(l.indexOf("\t") + 1, l.lastIndexOf("\t"))).flatMap(_.split("\n")).filter(x => x != null && x.length > 0)

val c = data.filter(x => x.contains("<xxx domain>"))

// Keep records whose timestamp (epoch seconds) falls inside the window
val d = c.filter(x => {
  val ts = format.parse(x.split("\t")(2)).getTime / 1000 // perf log
  ts >= 1632816000 && ts <= 1632819600
})

// Two record layouts (41 or 39 fields); key = (timestamp floored to the minute, field), value = Long
val tmp = d.map(x => x.split("\t")).filter(x => x.length == 41 || x.length == 39).filter(x => x(6).equals("xxx") || x(4).equals("xxx")).map(x => {
  if (x.length == 41) {
    ((format.parse(x(4)).getTime / 1000 / 60 * 60, x(6)), x(9).toDouble.toLong)
  } else {
    ((format.parse(x(2)).getTime / 1000 / 60 * 60, x(4)), x(7).toDouble.toLong)
  }
}).reduceByKey(_ + _)

// Format the minute and scale the per-minute sum by 8 / 60
val result = tmp.map(x => (fm.format(new Date(x._1._1 * 1000)), x._2 * 8 / 60)).map { case (str, long) => s"$str $long" }

result.repartition(1).saveAsTextFile("/<output path>")
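The time handling above can be tried in isolation. The sketch below parses one timestamp in the `[dd/MMM/yyyy:HH:mm:ss Z]` format, floors it to the minute with `/ 60 * 60`, and applies the `* 8 / 60` scaling. Reading that scaling as "bytes per minute to bits per second" is an assumption, as are the object and method names and the sample timestamp; the output formatter is pinned to UTC here only to make the result reproducible, whereas the original uses the JVM default time zone.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale, TimeZone}

object MinuteBucketSketch {
  // SimpleDateFormat is not thread-safe, so a new instance is created per call.
  def parseEpochSeconds(field: String): Long = {
    val in = new SimpleDateFormat("[dd/MMM/yyyy:HH:mm:ss Z]", Locale.US)
    in.parse(field).getTime / 1000
  }

  // Integer division drops the seconds, multiplying back gives the minute start.
  def floorToMinute(epochSeconds: Long): Long = epochSeconds / 60 * 60

  // Assumed meaning: a byte count summed over one minute, expressed in bits per second.
  def bytesPerMinuteToBitsPerSecond(bytes: Long): Long = bytes * 8 / 60

  // UTC is set explicitly (an assumption) so the formatted string does not depend on the machine.
  def formatUtc(epochSeconds: Long): String = {
    val out = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    out.setTimeZone(TimeZone.getTimeZone("UTC"))
    out.format(new Date(epochSeconds * 1000))
  }
}
```

For the sample value `"[28/Sep/2021:16:00:37 +0800]"`, `parseEpochSeconds` returns 1632816037 and `floorToMinute` returns 1632816000, which is the lower bound of the filter window used above.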


val t = d.map(x => x.split("\t")).filter(x => x.length == 41)

// Assumption: the original uses mutableSet without declaring it, so it is declared here.
// Each Spark task works on its own copy of this set, so the deduplication is not global across partitions.
val mutableSet = scala.collection.mutable.Set[String]()

// Assumption: the original filters an undefined value z; t (defined above) is used instead.
val tmp3 = t.filter(x => {
  val remoteAddr = x(2).split(":")(0)
  !(remoteAddr.contains("xxx") || remoteAddr.startsWith("xxx"))
}).filter(x => {
  val ua = x(27)
  !((ua.equals("xxx") && x(0).split(":")(0).startsWith("10.")) || ua.contains("xxx"))
}).filter(x => {
  val uri = x(7).split(" ")(2)
  !(uri.startsWith("xxx") || uri.startsWith("xxx"))
}).filter(x => {
  // Deduplicate on fields 23 and 40
  if (mutableSet.contains(x(23) + "#" + x(40))) {
    false
  } else {
    mutableSet.add(x(23) + "#" + x(40))
    true
  }
}).filter(x => {
  x(38).indexOf("xxx") == -1 // drop parent-layer records
})
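The deduplication step keeps the first record for each composite key. A minimal sketch of that idea on a local collection follows; the object name, the helper signature, and the sample rows are hypothetical. It is only a local illustration: in a Spark job the closure (including any captured mutable set) is serialized to each task, so a set captured from the driver does not deduplicate across partitions.

```scala
object DedupSketch {
  // Keeps the first row seen for each key, preserving input order.
  // mutable.Set.add returns true only when the element was not already present.
  def dedupByKey(rows: Seq[Array[String]], key: Array[String] => String): Seq[Array[String]] = {
    val seen = scala.collection.mutable.Set.empty[String]
    rows.filter(row => seen.add(key(row)))
  }
}
```

On an RDD, one possible equivalent (untested here, and assuming a pair-RDD approach is acceptable) would be `t.keyBy(x => x(23) + "#" + x(40)).reduceByKey((first, _) => first).values`, which deduplicates across all partitions at the cost of a shuffle and without a guarantee about which duplicate is kept.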

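3 Tracking-point calculation

val domain = "xxx"

val path = s"<xxx path>*"

val s = sc.textFile(path).filter(x => x != null && x.length > 0)

// String.split takes a regex, so the pipe must be escaped; sum field 14 across all lines
s.map(x => x.split("\\|")(14).toLong).sum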
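One detail worth checking when splitting on a pipe: `String.split(String)` interprets its argument as a regular expression, and an unescaped `|` is the alternation operator, so it does not split on the literal character. The sketch below, with a hypothetical object name and sample lines, sums one pipe-separated field using the `Char` overload of `split`, which treats the separator literally.

```scala
object PipeSplitSketch {
  // Sums the field at `index` over all non-empty lines.
  // split('|') uses the Char overload, so no regex escaping is needed.
  def sumField(lines: Seq[String], index: Int): Long =
    lines
      .filter(l => l != null && l.nonEmpty)
      .map(_.split('|')(index).toLong)
      .sum

  // Escaped regex form: splits on the literal pipe character.
  val escaped: List[String] = "a|b|c".split("\\|").toList

  // Unescaped form: "|" is an empty alternation, so the string is not split into 3 fields.
  val unescapedLength: Int = "a|b|c".split("|").length
}
```

With the unescaped form, indexing field 14 would pick out a single character rather than the fifteenth column, and `toLong` would then either fail or return a wrong value.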
