欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

SparkSQL&Elasticsearch

时间:2023-06-21
Spark SQL & Elasticsearch

一、读取二、转换三、写入四、适配分析器方案五、示例


一、读取

使用spark-sql读取es数据如下代码所示:

SparkSession session = SparkSession.builder() .appName("esTest") .master("local[*]") .getOrCreate();Dataset dataset = session.read() .format("org.elasticsearch.spark.sql") .option("es.nodes", "192.168.6.3:9200") // 设置参数 .load("test"); // es索引名,多个使用逗号分隔,可以使用*号

注意:

索引名使用多个或通配符时,mapping不能存在冲突,否则会导致读取时失败

存储嵌套结构时,mapping可以嵌套,也可以平铺(字段名包含点号),但数据必须嵌套,不能平铺,否则会导致读取时失败。示例如下。

(1)嵌套mapping

{ "properties": { "ip": { "type": "object", "properties": { "src": { "type": "long" } } } }}

(2)平铺mapping

{ "properties": { "ip.src": { "type": "long" } } }}

(3)嵌套数据

{ "ip": { "src": 123456 }}

(4)平铺数据(不能这样存储)

{ "ip.src": 123456}

若某字段格式为数组,由于mapping不区分数组,会导致访问时失败,解决方式为设置es.read.field.as.array.include参数,值为所有以数组格式存储的字段名,多个以逗号分隔

更多参数参考官网


二、转换

Row一般使用GenericRow或GenericRowWithSchema,从es读取到的为GenericRowWithSchema。

GenericRow:可以使用RowFactory.create(...values)创建。只能使用下标获取字段值,可以通过schema.fieldIndex("fieldName")获取字段下标

GenericRowWithSchema:使用构造方法创建。可以直接使用row.getAs("fieldName")获取字段值

创建schema比较简便的方式如下

StructType schema = DataTypes.createStructType(new StructField[]{ // p1: 字段名 p2: 数据类型 p3: 允许为空 DataTypes.createStructField("ip", DataTypes.LongType, true), // 数组类型 DataTypes.createStructField("ports", DataTypes.createArrayType(DataTypes.IntegerType), true)});

创建Row类型的Encoder比较简便的方式如下

expressionEncoder rowEncoder = RowEncoder.apply(schema);

读数组

WrappedArray ports = row.getAs("ports");

写数组。可以不区分类型,直接写Object数组即可

new Object[]{value1, value2, ...}

读对象

Row ip = row.getAs("ip");Long sip = ip.getAs("src");

写对象

// 带schemanew GenericRowWithSchema(new Object[]{value1, value2, ...}, schema)// 不带schemanew RowFactory.create(value1, value2, ...)


三、写入

使用spark-sql将Dataset写入es如下代码所示:

JavaEsSparkSQL.saveToEs( dataset, "test2", // 索引名 Collections.singletonMap("es.nodes", "192.168.6.3:9200") // 参数设置);

注意:

建议提前创建mapping,参考官网说明
四、适配分析器方案

基本思路如下

使用spark-sql从es读取数据(seaTunnel已支持)使用flatMap算子,将元数据转换为分析数据(analyse阶段)使用groupByKey算子,按id分组使用reduceGroups算子,聚合各组数据(merge阶段)使用map算子,将数据格式转换回Dataset将Dataset保存到es(seaTunnel已支持)

我们需要做的就是开发一个插件,集成第2到5步,通过读取资产配置,生成分析和合并工具,放到flatMap和reduceGroups中执行。目前资产配置已有,分析和合并工具需要做部分改动。


五、示例

业务介绍:

原始数据包含源IP、目的IP、源端口、目的端口统计每个IP出现的次数和对应的所有端口号

原始数据表为test,mapping格式如下

{ "properties": { "sip": { "type": "long" }, "dip": { "type": "long" }, "sp": { "type": "integer" }, "dp": { "type": "integer" } }}

统计数据表为test2,mapping格式如下

{ "properties": { "ip": { "type": "long" }, "ports": { "type": "integer" }, "count": { "type": "integer" } }}

计算代码如下

package com.example.spark;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.MapFunction;import org.apache.spark.api.java.function.ReduceFunction;import org.apache.spark.sql.*;import org.apache.spark.sql.catalyst.encoders.expressionEncoder;import org.apache.spark.sql.catalyst.encoders.RowEncoder;import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;import scala.Tuple2;import scala.collection.mutable.WrappedArray;import java.util.ArrayList;import java.util.Collections;import java.util.List;public class ElasticsearchTest { public static void main(String[] args) { SparkSession session = SparkSession.builder().appName("esTest").master("local[*]").getOrCreate(); StructType schema = DataTypes.createStructType(new StructField[]{ DataTypes.createStructField("ip", DataTypes.LongType, true), DataTypes.createStructField("ports", DataTypes.createArrayType(DataTypes.IntegerType), true), DataTypes.createStructField("count", DataTypes.IntegerType, false) }); expressionEncoder rowEncoder = RowEncoder.apply(schema); // 读取原始数据 Dataset dataset = session.read() .format("org.elasticsearch.spark.sql") .option("es.nodes", "192.168.6.3:9200") .load("test"); // 根据原始数据生成统计数据 Dataset flatMapDataset = dataset.flatMap((FlatMapFunction) row -> { List list = new ArrayList<>(); list.add(new GenericRowWithSchema(new Object[] { row.getAs("sip"), new Object[] {row.getAs("sp")}, 1 }, schema)); list.add(new GenericRowWithSchema(new Object[] { row.getAs("dip"), new Object[] {row.getAs("dp")}, 1 }, schema)); return list.iterator(); }, rowEncoder); // 将统计数据按ip分组 KeyValueGroupedDataset groupDataset = flatMapDataset.groupByKey( (MapFunction) row -> row.getAs("ip"), Encoders.LONG()); // 聚合各组的统计数据 Dataset> reduceDataset = groupDataset.reduceGroups((ReduceFunction) (row1, row2) -> { long ip = row1.getAs("ip"); WrappedArray ports1 = row1.getAs("ports"); WrappedArray ports2 = row2.getAs("ports"); int count1 = row1.getAs("count"); int count2 = row2.getAs("count"); Integer[] ports = new Integer[ports1.length() + ports2.length()]; ports1.copyToArray(ports); ports2.copyToArray(ports, ports1.length()); return RowFactory.create(ip, ports, count1 + count2); }); // 转换数据结构 Dataset mapDataset = reduceDataset.map((MapFunction, Row>) tuple -> tuple._2, rowEncoder); // 写入统计数据 JavaEsSparkSQL.saveToEs(mapDataset, "test2", Collections.singletonMap("es.nodes", "192.168.6.3:9200")); }}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。