欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

98.StreamSets实时采集Kafka

时间:2023-06-13
98.1 演示环境介绍

已安装Kafka并正常运行未启用KerberosRedHat版本:7.4CM和CDH版本:cdh5.13.3kafka版本:3.0.0(0.11.0)Kudu版本:1.5.0 98.2 操作演示

1.准备测试环境

创建测试topic

kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic kafka2kudu_topic

通过Hue使用Impala创建一个Kudu表:

CREATE TABLE ods_deal_daily_kudu ( id STRING COMPRESSION snappy, name STRING COMPRESSION snappy, sex STRING COMPRESSION snappy, city STRING COMPRESSION snappy, occupation STRING COMPRESSION snappy, mobile_phone_num STRING COMPRESSION snappy, fix_phone_num STRING COMPRESSION snappy, bank_name STRING COMPRESSION snappy, address STRING COMPRESSION snappy, marriage STRING COMPRESSION snappy, child_num INT COMPRESSION snappy, PRIMARY KEY (id)) PARTITION BY HASH PARTITIONS 16STORED AS KUDU TBLPROPERTIES ('kudu.master_addresses'='master.gzyh.com');

2.生产Kafka消息

创建Maven工程,工程的pom.xml文件:

cdh-project com.cloudera 1.0-SNAPSHOT 4.0.0 kafka-demo jar kafka-demo http://maven.apache.org UTF-8 org.apache.kafka kafka-clients 0.10.2.0 net.sf.json-lib json-lib 2.4 jdk15

编写ReadFileToKafka.java文件:

package com.cloudera.nokerberos;import net.sf.json.JSONObject;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import java.io.*;import java.util.HashMap;import java.util.Map;import java.util.Properties;public class ReadFileToKafka { public static String confPath = System.getProperty("user.dir") + File.separator + "conf"; public static void main(String[] args) { if(args.length < 1) { System.out.print("缺少输入参数,请指定要处理的text文件"); System.exit(1); } String filePath = args[0]; BufferedReader reader = null; try { Properties appProperties = new Properties(); appProperties.load(new FileInputStream(new File(confPath + File.separator + "app.properties"))); String brokerlist = String.valueOf(appProperties.get("bootstrap.servers")); String topic_name = String.valueOf(appProperties.get("topic.name")); Properties props = getKafkaProps(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerlist); Producer producer = new KafkaProducer(props); reader = new BufferedReader(new FileReader(filePath)); String tempString = null; int line = 1; // 一次读入一行,直到读入null为文件结束 while ((tempString = reader.readLine()) != null) { String detailJson = parseJSON(tempString); ProducerRecord record = new ProducerRecord(topic_name, detailJson); producer.send(record); line++; } reader.close(); producer.flush(); producer.close(); } catch (Exception e) { e.printStackTrace(); } finally { if (reader != null) { try { reader.close(); } catch (IOException e1) { } } } } private static String parseJSON(String tempString) { if(tempString != null && tempString.length() > 0) { Map resultMap = null; String[] detail = tempString.split("01"); resultMap = new HashMap<>(); resultMap.put("id", detail[0]); resultMap.put("name", detail[1]); resultMap.put("sex", detail[2]); resultMap.put("city", detail[3]); resultMap.put("occupation", detail[4]); resultMap.put("mobile_phone_num", detail[5]); resultMap.put("fix_phone_num", detail[6]); resultMap.put("bank_name", detail[7]); resultMap.put("address", detail[8]); resultMap.put("marriage", detail[9]); resultMap.put("child_num", detail[10]); return JSONObject.fromObject(resultMap).toString(); } return null; } private static Properties getKafkaProps() { try{ Properties props = new Properties(); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000); //批量发送消息 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); return props; } catch (Exception e) { e.printStackTrace(); } return null; }}

将编写好的代码使用mvn命令打包

在工程目录使用mvn cleanpackage命令进行编译打包 编写脚本run.sh脚本运行jar包

run.sh脚本:

[root@master kafka-run]# vim run.sh #!/bin/bash########################################## 创建Topic# kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic ods_deal_daily_topic########################################JAVA_HOME=/usr/java/jdk1.8.0_131#要读取的文件read_file=$1for file in `ls lib/*jar`do CLASSPATH=$CLASSPATH:$filedoneexport CLASSPATH${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m com.cloudera.nokerberos.ReadFileToKafka $read_file

conf目录下的配置文件app.properties:

[root@master kafka-run]# vim conf/app.properties bootstrap.servers=cdh01.gzyh.com:9092,cdh02.gzyh.com:9092,cdh03.gzyh.com:9092topic.name=ods_deal_daily_topic

依赖包可以在命令行使用mvn命令导出:

mvn dependency:copy-dependencies -DoutputDirectory=/tmp/lib

3.创建Pipline

在StreamSets创建一个kafka2kudu的PiplinePipline流程中添加Kafka Consumer作为源并配置Kafka基础信息
配置Kafka相关信息,如Broker、ZK及Topic
配置数据格式化方式,写入Kafka的数据为JSON格式,所以这里选择JSON
添加Kudu模块及配置基本信息
配置Kudu的Master、Table、Operation等

Kudu Masters:可以配置多个,多个地址以“,”分割Table Name:如果使用Impala创建的Kudu表则需要添加impala::前缀Field to ColumnMapping:配置Json中key与Kudu表的column的映射关系,如果字段名称一致则不需要配置。DefaultOpertation:设置操作类型如:insert、upsert、delete
Kudu模块高级配置使用默认配置

4.验证启动kafka2kudu的Pipline
运行run.sh脚本向Kafka发送消息

[root@master kafka-run]# sh run.sh ods_user_600.txt

向Kafka发送消息
查看监控信息
查看Kudu的ods_deal_daily_kudu表内容
入库的数据总条数

可以看到ods_deal_daily_kudu表的总条数与准备的测试数据量一致

大数据视频推荐:
CSDN
大数据语音推荐:
企业级大数据技术应用
大数据机器学习案例之推荐系统
自然语言处理
大数据基础
人工智能:深度学习入门到精通

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。