一、读取二、转换三、写入四、适配分析器方案五、示例
一、读取
使用spark-sql读取es数据如下代码所示:
SparkSession session = SparkSession.builder() .appName("esTest") .master("local[*]") .getOrCreate();Dataset
注意:
索引名使用多个或通配符时,mapping不能存在冲突,否则会导致读取时失败
存储嵌套结构时,mapping可以嵌套,也可以平铺(字段名包含点号),但数据必须嵌套,不能平铺,否则会导致读取时失败。示例如下。
(1)嵌套mapping
{ "properties": { "ip": { "type": "object", "properties": { "src": { "type": "long" } } } }}
(2)平铺mapping
{ "properties": { "ip.src": { "type": "long" } } }}
(3)嵌套数据
{ "ip": { "src": 123456 }}
(4)平铺数据(不能这样存储)
{ "ip.src": 123456}
若某字段格式为数组,由于mapping不区分数组,会导致访问时失败,解决方式为设置es.read.field.as.array.include参数,值为所有以数组格式存储的字段名,多个以逗号分隔
更多参数参考官网
二、转换
Row一般使用GenericRow或GenericRowWithSchema,从es读取到的为GenericRowWithSchema。
GenericRow:可以使用RowFactory.create(...values)创建。只能使用下标获取字段值,可以通过schema.fieldIndex("fieldName")获取字段下标
GenericRowWithSchema:使用构造方法创建。可以直接使用row.getAs("fieldName")获取字段值
创建schema比较简便的方式如下
StructType schema = DataTypes.createStructType(new StructField[]{ // p1: 字段名 p2: 数据类型 p3: 允许为空 DataTypes.createStructField("ip", DataTypes.LongType, true), // 数组类型 DataTypes.createStructField("ports", DataTypes.createArrayType(DataTypes.IntegerType), true)});
创建Row类型的Encoder比较简便的方式如下
expressionEncoder
读数组
WrappedArray
写数组。可以不区分类型,直接写Object数组即可
new Object[]{value1, value2, ...}
读对象
Row ip = row.getAs("ip");Long sip = ip.getAs("src");
写对象
// 带schemanew GenericRowWithSchema(new Object[]{value1, value2, ...}, schema)// 不带schemanew RowFactory.create(value1, value2, ...)
三、写入
使用spark-sql将Dataset写入es如下代码所示:
JavaEsSparkSQL.saveToEs( dataset, "test2", // 索引名 Collections.singletonMap("es.nodes", "192.168.6.3:9200") // 参数设置);
注意:
建议提前创建mapping,参考官网说明四、适配分析器方案
基本思路如下
使用spark-sql从es读取数据(seaTunnel已支持)使用flatMap算子,将元数据转换为分析数据(analyse阶段)使用groupByKey算子,按id分组使用reduceGroups算子,聚合各组数据(merge阶段)使用map算子,将数据格式转换回Dataset我们需要做的就是开发一个插件,集成第2到5步,通过读取资产配置,生成分析和合并工具,放到flatMap和reduceGroups中执行。目前资产配置已有,分析和合并工具需要做部分改动。
五、示例
业务介绍:
原始数据包含源IP、目的IP、源端口、目的端口统计每个IP出现的次数和对应的所有端口号原始数据表为test,mapping格式如下
{ "properties": { "sip": { "type": "long" }, "dip": { "type": "long" }, "sp": { "type": "integer" }, "dp": { "type": "integer" } }}
统计数据表为test2,mapping格式如下
{ "properties": { "ip": { "type": "long" }, "ports": { "type": "integer" }, "count": { "type": "integer" } }}
计算代码如下
package com.example.spark;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.MapFunction;import org.apache.spark.api.java.function.ReduceFunction;import org.apache.spark.sql.*;import org.apache.spark.sql.catalyst.encoders.expressionEncoder;import org.apache.spark.sql.catalyst.encoders.RowEncoder;import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;import scala.Tuple2;import scala.collection.mutable.WrappedArray;import java.util.ArrayList;import java.util.Collections;import java.util.List;public class ElasticsearchTest { public static void main(String[] args) { SparkSession session = SparkSession.builder().appName("esTest").master("local[*]").getOrCreate(); StructType schema = DataTypes.createStructType(new StructField[]{ DataTypes.createStructField("ip", DataTypes.LongType, true), DataTypes.createStructField("ports", DataTypes.createArrayType(DataTypes.IntegerType), true), DataTypes.createStructField("count", DataTypes.IntegerType, false) }); expressionEncoder