欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

101.Spark2Streaming在Kerberos环境下的读写

时间:2023-05-01
101.1 演示环境介绍

CM版本:5.14.3CDH版本:5.14.2CDK版本:2.2.0Apache Kafka版本:0.10.2SPARK版本:2.2.0Redhat版本:7.3已启用Kerberos,用root用户进行操作 101.2 操作演示

1.准备环境

使用xst命令导出keytab文件,准备访问Kafka的Keytab文件

[root@cdh01 ~]# kadmin.local Authenticating as principal hbase/admin@FAYSON.COM with password.kadmin.local: xst -norandkey -k fayson.keytab fayson@FAYSON.COM

用klist命令检查导出的keytab文件是否正确

[root@cdh01 ~]# klist -ek fayson.keytab

jaas.cof文件内容

把fayson.keytab和jaas.conf文件拷贝至集群的所有节点统一的/data/disk1/0286-kafka-shell/conf目录下

KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab" principal="fayson@FAYSON.COM";};Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab" principal="fayson@FAYSON.COM";};

根据需求将conf下面的配置文件修改为自己集群的环境即可,发送至Kafka的JSON数据示例如下:

{ "occupation": "劳动者、运输工作和部分体力生产工作", "address": "山东东三路18号-6-6", "city": "长江", "marriage": "1", "sex": "1", "name": "魏淑芬", "mobile_phone_num": "13508268580", "bank_name": "广发银行32", "id": "510105198906185189", "child_num": "1", "fix_phone_num": "16004180180"}

把SPARK2f服务的配置项将spark_kafka_version的kafka版本修改为0.10
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-93QNI2lp-1644806720962)(https://upload-images.jianshu.io/upload_images/19745945-65901f44f676f946.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

2.SparkStreaming开发

pom.xml依赖

使用maven创建scala语言的spark2demo

org.apache.kudu kudu-spark2_2.11 1.6.0-cdh5.14.2 org.apache.kudu kudu-client 1.6.0-cdh5.14.2 org.apache.spark spark-core_2.11 2.2.0.cloudera2 org.apache.spark spark-sql_2.11 2.2.0.cloudera2 org.apache.spark spark-streaming_2.11 2.2.0.cloudera2 org.apache.spark spark-streaming-kafka-0-10_2.11 2.2.0.cloudera2 org.scala-lang scala-library 2.11.8

在resources下创建0288.properties配置文件

kafka.brokers=cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092kafka.topics=Kafka_kudu_topickudumaster.list=cdh01.fayson.com,cdh02.fayson.com,cdh03.fayson.com

创建Kafka2Spark2Kudu.scala文件

package com.cloudera.streamingimport java.io.{File, FileInputStream}import java.util.Propertiesimport org.apache.commons.lang.StringUtilsimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.kudu.client.CreateTableOptionsimport org.apache.kudu.spark.kudu.KuduContextimport org.apache.log4j.{Level, Logger}import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.types.{StringType, StructField, StructType}import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.{SparkConf}import scala.collection.JavaConverters._import scala.util.parsing.json.JSONobject Kafka2Spark2Kudu { Logger.getLogger("com").setLevel(Level.ERROR) //设置日志级别 var confPath: String = System.getProperty("user.dir") + File.separator + "conf/0288.properties" val userInfoSchema = StructType( // col name type nullable? StructField("id", StringType , false) :: StructField("name" , StringType, true ) :: StructField("sex" , StringType, true ) :: StructField("city" , StringType, true ) :: StructField("occupation" , StringType, true ) :: StructField("tel" , StringType, true ) :: StructField("fixPhoneNum" , StringType, true ) :: StructField("bankName" , StringType, true ) :: StructField("address" , StringType, true ) :: StructField("marriage" , StringType, true ) :: StructField("childNum", StringType , true ) :: Nil ) case class UserInfo ( id: String, name: String, sex: String, city: String, occupation: String, tel: String, fixPhoneNum: String, bankName: String, address: String, marriage: String, childNum: String ) def main(args: Array[String]): Unit = { //加载配置文件 val properties = new Properties() val file = new File(confPath) if(!file.exists()) { System.out.println(Kafka2Spark2Kudu.getClass.getClassLoader.getResource("0288.properties")) val in = Kafka2Spark2Kudu.getClass.getClassLoader.getResourceAsStream("0288.properties") properties.load(in); } else { properties.load(new FileInputStream(confPath)) } val brokers = properties.getProperty("kafka.brokers") val topics = properties.getProperty("kafka.topics") val kuduMaster = properties.getProperty("kudumaster.list") println("kafka.brokers:" + brokers) println("kafka.topics:" + topics) println("kudu.master:" + kuduMaster) if(StringUtils.isEmpty(brokers)|| StringUtils.isEmpty(topics) || StringUtils.isEmpty(kuduMaster)) { println("未配置Kafka和KuduMaster信息") System.exit(0) } val topicsSet = topics.split(",").toSet val spark = SparkSession.builder().appName("Kafka2Spark2Kudu-kerberos").config(new SparkConf()).getOrCreate() val ssc = new StreamingContext(spark.sparkContext, Seconds(5)) //设置Spark时间窗口,每5s处理一次 val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers , "auto.offset.reset" -> "latest" , "security.protocol" -> "SASL_PLAINTEXT" , "sasl.kerberos.service.name" -> "kafka" , "key.deserializer" -> classOf[StringDeserializer] , "value.deserializer" -> classOf[StringDeserializer] , "group.id" -> "testgroup" ) val dStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)) //引入隐式 import spark.implicits._ val kuduContext = new KuduContext(kuduMaster, spark.sparkContext) //判断表是否存在 if(!kuduContext.tableExists("user_info")) { println("create Kudu Table :{user_info}") val createTableOptions = new CreateTableOptions() createTableOptions.addHashPartitions(List("id").asJava, 8).setNumReplicas(3) kuduContext.createTable("user_info", userInfoSchema, Seq("id"), createTableOptions) } dStream.foreachRDD(rdd => { //将rdd数据重新封装为Rdd[UserInfo] val newrdd = rdd.map(line => { val jsonObj = JSON.parseFull(line.value()) val map:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String, Any]] new UserInfo( map.get("id").get.asInstanceOf[String], map.get("name").get.asInstanceOf[String], map.get("sex").get.asInstanceOf[String], map.get("city").get.asInstanceOf[String], map.get("occupation").get.asInstanceOf[String], map.get("mobile_phone_num").get.asInstanceOf[String], map.get("fix_phone_num").get.asInstanceOf[String], map.get("bank_name").get.asInstanceOf[String], map.get("address").get.asInstanceOf[String], map.get("marriage").get.asInstanceOf[String], map.get("child_num").get.asInstanceOf[String] ) }) //将RDD转换为Dataframe val userinfoDF = spark.sqlContext.createDataframe(newrdd) kuduContext.upsertRows(userinfoDF, "user_info") }) ssc.start() ssc.awaitTermination() }}

使用mvn命令编译工程

由于是scala工程编译时mvn命令要加scala:compile

mvn clean scala:compile package

将编译好的spark2-demo-1.0-SNAPSHOT.jar包上传至服务

在conf目录下新增0288.properties配置文件

3.运行

用spark2-submit命令向集群提交SparkStreaming作业

spark2-submit --class com.cloudera.streaming.Kafka2Spark2Kudu --master yarn --deploy-mode client --executor-memory 2g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default --principal fayson@FAYSON.COM --keytab /data/disk1/0286-kafka-shell/conf/fayson.keytab --driver-java-options "-Djava.security.auth.login.config=/data/disk1/0286-kafka-shell/conf/jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/data/disk1/0286-kafka-shell/conf/jaas.conf" spark2-demo-1.0-SNAPSHOT.jar

通过CM查看作业是否提交成功通过Kudu Master的管理界面可以看到user_info表已创建

点击Table Id列进入user_info表详情页,获取Impala的建表语句:

CREATE EXTERNAL TABLE `user_info` STORED AS KUDUTBLPROPERTIES( 'kudu.table_name' = 'user_info', 'kudu.master_addresses' = 'cdh01.fayson.com:7051,cdh02.fayson.com:7051,cdh03.fayson.com:7051')

运行脚本向Kafka的Kafka_kudu_topic生产消息登录Hue在Impala中执行上面的建表语句
执行Select查询user_info表中数据,数据已成功入库

4.总结Spark2默认的kafka版本为0.9需要通过CM将默认的Kafka版本修改为0.10在scala代码中访问Kafka是也一样需要添加Kerberos相关的配置security.protocol和sasl.kerberos.service.name参数jaas.conf文件Fayson通过spark2-submit的方式指定,jaas.conf文件及keytab需要在集群的所有节点存在,因为Driver和Executor是随机在集群的节点上启动的在/opt/cloudera/parcels/SPARK2/lib/spark2/jars目录下需要检查下是否有其它版本的spark-streaming-kafka的依赖包,如果存在需要删除,否则会出现版本冲突问题

大数据视频推荐:
CSDN
大数据语音推荐:
企业级大数据技术应用
大数据机器学习案例之推荐系统
自然语言处理
大数据基础
人工智能:深度学习入门到精通

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。