启动 spark-shell（YARN 模式）
# Launch spark-shell on YARN with 40 executors and speculative execution enabled,
# submitted to the given queue. Replace the install-path and queue-name placeholders first.
/home/work/路径/bin/spark-shell \
  --master yarn \
  --num-executors 40 \
  --conf spark.speculation=true \
  --queue 队列名
1 分析落盘文件：统计某域名在某时间点下的值总和（按首列分组）
// Section 1: among lines that contain both the domain marker and the timestamp marker,
// sum the last tab-separated field grouped by the first field.
val path = "/xxxx/*"
// sc.textFile already yields one element per line, so no extra newline split is needed
// (the former split("n") cut lines at every letter 'n' -- a lost "\n" escape).
val data = sc.textFile(path)
val data2 = data.filter(x => x.contains("域名"))
val data3 = data2.filter(x => x.contains("时间戳"))
// Key = first field; value = last field parsed as Double and truncated to Long.
// Fields are separated by a TAB character (the former split("t") split on the letter 't').
val res = data3.map(x => x.split("\t")).map(x => (x(0), x(x.length - 1).toDouble.toLong))
val res2 = res.reduceByKey(_ + _)
res2.take(10)
输出示例（res2.take(10) 的结果）：Array[(String, Long)] = Array((cdninfo.proc.topic.cnt.node,3499825092))
2 分析afs数据源
// Section 2: per-minute totals for one domain from AFS SequenceFile data, written as text.
// Paste with :paste in spark-shell so the multi-line chained calls parse as single statements.
import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import scala.util.Try

// Input log time format (bracketed, with zone offset) and output time format.
// SimpleDateFormat is kept rather than java.time.DateTimeFormatter because it is
// Serializable, and these instances are captured by the Spark closures below.
val format = new SimpleDateFormat("[dd/MMM/yyyy:HH:mm:ss Z]", Locale.US)
val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val path = "path1,path2..."
val targetDomain = "xxx"  // value the record's own domain column must equal
val startTs = 1632816000L // window start, epoch seconds (inclusive)
val endTs = 1632819600L   // window end, epoch seconds (inclusive)

// Column indices (time, domain, value) for the two record widths handled here.
// Any other width yields None and the record is skipped.
def columns(fields: Array[String]): Option[(Int, Int, Int)] = fields.length match {
  case 41 => Some((4, 6, 9))
  case 39 => Some((2, 4, 7))
  case _  => None
}

// Request time in epoch seconds, read from the layout-specific time column.
// None for unsupported widths or an unparsable time field (such records are skipped).
def requestTs(fields: Array[String]): Option[Long] =
  columns(fields).flatMap { case (timeIdx, _, _) =>
    Try(format.parse(fields(timeIdx)).getTime / 1000).toOption
  }

// Each SequenceFile value carries newline-separated log lines between its first and last TAB.
// Values without two TABs have no payload (substring used to throw on them).
val data = sc.sequenceFile[Long, String](path)
  .map(_._2)
  .flatMap { l =>
    val first = l.indexOf('\t')
    val last = l.lastIndexOf('\t')
    if (first >= 0 && last > first) l.substring(first + 1, last).split("\n")
    else Array.empty[String]
  }
  .filter(x => x != null && x.nonEmpty)

val c = data.filter(x => x.contains("xxx域名"))

// Perf-log time window. The time column depends on the record width: parsing column 2 for
// every record (as before) only works for 39-field records and threw on 41-field ones.
val d = c.filter(x => requestTs(x.split("\t")).exists(ts => ts >= startTs && ts <= endTs))

// ((minute bucket in epoch seconds, domain), summed value), keeping only records whose own
// domain column equals targetDomain (previously columns 4 and 6 were checked on both layouts).
val tmp = d
  .map(_.split("\t"))
  .flatMap { x =>
    columns(x).collect {
      case (timeIdx, domainIdx, valueIdx) if x(domainIdx).equals(targetDomain) =>
        val minute = format.parse(x(timeIdx)).getTime / 1000 / 60 * 60
        ((minute, x(domainIdx)), x(valueIdx).toDouble.toLong)
    }
  }
  .reduceByKey(_ + _)

// One "yyyy-MM-dd HH:mm:ss N" line per minute, sorted by time into a single output file.
// N = total * 8 / 60 -- presumably bytes per minute to bits per second; confirm the column's unit.
val result = tmp
  .sortBy(_._1._1, ascending = true, numPartitions = 1)
  .map { case ((minute, _), total) => s"${fm.format(new Date(minute * 1000))} ${total * 8 / 60}" }
result.saveAsTextFile("/输出路径")
val t = d.map(x => x.split("\t")).filter(x => x.length == 41)

val tmp3 = t
  .filter { x =>
    // Exclude matching remote addresses (host part of column 2, before ':').
    val remoteAddr = x(2).split(":")(0)
    !(remoteAddr.contains("xxx") || remoteAddr.startsWith("xxx"))
  }
  .filter { x =>
    // Exclude the given user agent (column 27) when the host part of column 0 starts with "10.",
    // and any user agent containing the marker.
    val ua = x(27)
    !((ua.equals("xxx") && x(0).split(":")(0).startsWith("10.")) || ua.contains("xxx"))
  }
  .filter { x =>
    // Exclude matching URIs: the third space-separated token of column 7.
    // NOTE(review): index 2 assumed to be the URI for this field's format -- confirm.
    // Fields with fewer tokens are kept instead of throwing.
    val parts = x(7).split(" ")
    val uri = if (parts.length > 2) parts(2) else ""
    !(uri.startsWith("xxx") || uri.startsWith("xxx"))
  }
  // Dedupe on column 23 + "#" + column 40. A driver-side mutable Set cannot do this in Spark:
  // every task mutates its own deserialized copy, so duplicates in different tasks survive.
  // reduceByKey keeps exactly one record per key (which one is unspecified).
  .map(x => (x(23) + "#" + x(40), x))
  .reduceByKey((kept, _) => kept)
  .values
  // Drop parent-layer records: column 38 contains the marker.
  .filter(x => x(38).indexOf("xxx") == -1)
3 打点计算：对打点日志每行以 | 分隔的第 15 个字段（下标 14）求和
// Section 3: sum the 0-based field 14 of every non-empty '|'-separated line under the path.
val domain = "xxx" // NOTE(review): not referenced below -- perhaps meant for the path or a filter; confirm.
val path = s"xxx路径*"
val s = sc.textFile(path).filter(x => x != null && x.length > 0)
// split('|') splits on the literal character. split("|") is a regex that matches the empty
// string, so it split between every character and index 14 picked a single character.
s.map(x => x.split('|')(14).toLong).sum