已安装Kafka并正常运行未启用KerberosRedHat版本:7.4CM和CDH版本:cdh5.13.3kafka版本:3.0.0(0.11.0)Kudu版本:1.5.0 98.2 操作演示
1.准备测试环境
创建测试topic
kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic kafka2kudu_topic
通过Hue使用Impala创建一个Kudu表:
CREATE TABLE ods_deal_daily_kudu ( id STRING COMPRESSION snappy, name STRING COMPRESSION snappy, sex STRING COMPRESSION snappy, city STRING COMPRESSION snappy, occupation STRING COMPRESSION snappy, mobile_phone_num STRING COMPRESSION snappy, fix_phone_num STRING COMPRESSION snappy, bank_name STRING COMPRESSION snappy, address STRING COMPRESSION snappy, marriage STRING COMPRESSION snappy, child_num INT COMPRESSION snappy, PRIMARY KEY (id)) PARTITION BY HASH PARTITIONS 16STORED AS KUDU TBLPROPERTIES ('kudu.master_addresses'='master.gzyh.com');
2.生产Kafka消息
创建Maven工程,工程的pom.xml文件:
编写ReadFileToKafka.java文件:
package com.cloudera.nokerberos;import net.sf.json.JSONObject;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import java.io.*;import java.util.HashMap;import java.util.Map;import java.util.Properties;public class ReadFileToKafka { public static String confPath = System.getProperty("user.dir") + File.separator + "conf"; public static void main(String[] args) { if(args.length < 1) { System.out.print("缺少输入参数,请指定要处理的text文件"); System.exit(1); } String filePath = args[0]; BufferedReader reader = null; try { Properties appProperties = new Properties(); appProperties.load(new FileInputStream(new File(confPath + File.separator + "app.properties"))); String brokerlist = String.valueOf(appProperties.get("bootstrap.servers")); String topic_name = String.valueOf(appProperties.get("topic.name")); Properties props = getKafkaProps(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerlist); Producer
将编写好的代码使用mvn命令打包
在工程目录使用mvn cleanpackage命令进行编译打包 编写脚本run.sh脚本运行jar包
run.sh脚本:
[root@master kafka-run]# vim run.sh #!/bin/bash########################################## 创建Topic# kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic ods_deal_daily_topic########################################JAVA_HOME=/usr/java/jdk1.8.0_131#要读取的文件read_file=$1for file in `ls lib/*jar`do CLASSPATH=$CLASSPATH:$filedoneexport CLASSPATH${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m com.cloudera.nokerberos.ReadFileToKafka $read_file
conf目录下的配置文件app.properties:
[root@master kafka-run]# vim conf/app.properties bootstrap.servers=cdh01.gzyh.com:9092,cdh02.gzyh.com:9092,cdh03.gzyh.com:9092topic.name=ods_deal_daily_topic
依赖包可以在命令行使用mvn命令导出:
mvn dependency:copy-dependencies -DoutputDirectory=/tmp/lib
3.创建Pipline
在StreamSets创建一个kafka2kudu的PiplinePipline流程中添加Kafka Consumer作为源并配置Kafka基础信息
配置Kafka相关信息,如Broker、ZK及Topic
配置数据格式化方式,写入Kafka的数据为JSON格式,所以这里选择JSON
添加Kudu模块及配置基本信息
配置Kudu的Master、Table、Operation等
Kudu Masters:可以配置多个,多个地址以“,”分割Table Name:如果使用Impala创建的Kudu表则需要添加impala::前缀Field to ColumnMapping:配置Json中key与Kudu表的column的映射关系,如果字段名称一致则不需要配置。DefaultOpertation:设置操作类型如:insert、upsert、delete
Kudu模块高级配置使用默认配置
4.验证启动kafka2kudu的Pipline
运行run.sh脚本向Kafka发送消息
[root@master kafka-run]# sh run.sh ods_user_600.txt
向Kafka发送消息
查看监控信息
查看Kudu的ods_deal_daily_kudu表内容
入库的数据总条数
可以看到ods_deal_daily_kudu表的总条数与准备的测试数据量一致
大数据视频推荐:
CSDN
大数据语音推荐:
企业级大数据技术应用
大数据机器学习案例之推荐系统
自然语言处理
大数据基础
人工智能:深度学习入门到精通