
Developing a Word Count Application with Kafka Streams

Date: 2023-07-09


pom.xml

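```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kafkaspace</groupId>
    <artifactId>kafkaWorkspace</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <!-- Main sources are read from here, so LogProcessor.java and App.java
             should be placed under src/main/scala (not src/main/java) -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <!-- Drop signature files so the fat jar is not rejected as tampered -->
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- Kafka 2.0 requires Java 8 or later -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <properties>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.3.2</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>
        <!-- The Kafka dependencies below are the ones this example uses;
             kafka-streams also pulls in kafka-clients transitively -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
</project>
```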

LogProcessor.java

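```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Counts the words in each incoming record and forwards the counts downstream.
public class LogProcessor implements Processor<byte[], byte[]> {

    private ProcessorContext processorContext;

    @Override
    public void init(ProcessorContext processorContext) {
        this.processorContext = processorContext;
    }

    @Override
    public void process(byte[] key, byte[] value) {
        if (value == null) {
            return; // nothing to count
        }
        String inputOri = new String(value, StandardCharsets.UTF_8);
        Map<String, Integer> map = new HashMap<String, Integer>();
        // Split the line into words on whitespace; a one-word line is counted too
        for (String word : inputOri.trim().split("\\s+")) {
            if (word.isEmpty()) {
                continue; // a blank line yields a single empty token
            }
            if (map.containsKey(word)) {
                map.put(word, map.get(word) + 1);
            } else {
                map.put(word, 1);
            }
        }
        // Forward the counts for this record (e.g. {hello=2, world=1}; HashMap order is not fixed)
        processorContext.forward(key, map.toString().getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void close() {
    }
}
```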
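The word counting inside `process` is plain Java, so it can be tried without a cluster. The sketch below is a hypothetical stand-alone helper (`WordCountSketch.countWords` is not part of Kafka or of the original code) that applies the same per-record idea to a `byte[]` value, assuming UTF-8 text and whitespace-separated words. It uses a `LinkedHashMap` only so that the printed result has a stable order.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone version of the per-record counting logic.
// Counts are per message only; nothing is remembered between records.
class WordCountSketch {

    static Map<String, Integer> countWords(byte[] value) {
        // LinkedHashMap keeps first-seen order so toString() output is predictable
        Map<String, Integer> counts = new LinkedHashMap<>();
        if (value == null) {
            return counts; // empty record: nothing to count
        }
        String line = new String(value, StandardCharsets.UTF_8).trim();
        if (line.isEmpty()) {
            return counts;
        }
        // split on one or more whitespace characters
        for (String word : line.split("\\s+")) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        byte[] record = "hello kafka hello streams".getBytes(StandardCharsets.UTF_8);
        // prints {hello=2, kafka=1, streams=1}
        System.out.println(countWords(record));
    }
}
```

Returning a `Map` rather than a string keeps the counting testable; the processor only needs to call `toString()` and encode the result before forwarding it.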

App.java

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

import java.util.Properties;

public class App {
    public static void main(String[] args) {
        // Source topic
        String fromTopic = "testStreams1";
        // Target topic
        String toTopic = "testStreams2";
        // Kafka Streams settings (the default key/value serdes in Kafka 2.0 are
        // byte-array serdes, which matches LogProcessor's byte[] types)
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Wrap the properties in a StreamsConfig object
        StreamsConfig config = new StreamsConfig(props);
        // Build the topology
        Topology topology = new Topology();
        // Add the source node: its name and the topic it subscribes to
        topology.addSource("SOURCE", fromTopic)
                // Add the custom processor node: its supplier and the name of its parent node
                .addProcessor("PROCESSOR", new ProcessorSupplier<byte[], byte[]>() {
                    @Override
                    public Processor<byte[], byte[]> get() {
                        return new LogProcessor();
                    }
                }, "SOURCE")
                // Add the sink node: its name, the target topic and the name of its parent node
                .addSink("SINK", toTopic, "PROCESSOR");
        // Create the KafkaStreams instance
        final KafkaStreams streams = new KafkaStreams(topology, config);
        // Close the client cleanly when the JVM shuts down (e.g. on Ctrl+C)
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                streams.close();
            }
        });
        streams.start();
    }
}
```
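The topology in `App` is a chain of named nodes, each attached to its parent by name, and a record read by the source is forwarded node by node until the sink writes it out. The toy model below illustrates only that wiring idea in plain Java; it is not Kafka's `Topology` class, and every name in it (`ToyTopology`, `push`, the in-memory output list standing in for a topic) is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy in-memory model of a SOURCE -> PROCESSOR -> SINK chain.
// NOT the Kafka Topology API; all names here are hypothetical.
class ToyTopology {

    static final class Node {
        final Function<String, String> fn;          // transformation applied at this node
        final List<Node> children = new ArrayList<>();
        final List<String> sunk;                     // non-null only for sink nodes

        Node(Function<String, String> fn, List<String> sunk) {
            this.fn = fn;
            this.sunk = sunk;
        }

        void receive(String record) {
            String out = fn.apply(record);
            if (sunk != null) {
                sunk.add(out);                       // sink: "write" to the output topic
            }
            for (Node child : children) {
                child.receive(out);                  // forward to every downstream node
            }
        }
    }

    private final Map<String, Node> nodes = new HashMap<>();

    ToyTopology addSource(String name) {
        nodes.put(name, new Node(Function.identity(), null));
        return this;
    }

    ToyTopology addProcessor(String name, Function<String, String> fn, String parent) {
        return attach(name, new Node(fn, null), parent);
    }

    ToyTopology addSink(String name, List<String> outputTopic, String parent) {
        return attach(name, new Node(Function.identity(), outputTopic), parent);
    }

    // Look up the parent by name and register the new node as its child
    private ToyTopology attach(String name, Node node, String parent) {
        Node p = nodes.get(parent);
        if (p == null) {
            throw new IllegalArgumentException("unknown parent node: " + parent);
        }
        p.children.add(node);
        nodes.put(name, node);
        return this;
    }

    // Simulate one record arriving at the named source node
    void push(String sourceName, String record) {
        nodes.get(sourceName).receive(record);
    }

    public static void main(String[] args) {
        List<String> toTopic = new ArrayList<>();
        ToyTopology t = new ToyTopology()
                .addSource("SOURCE")
                .addProcessor("PROCESSOR", String::toUpperCase, "SOURCE")
                .addSink("SINK", toTopic, "PROCESSOR");
        t.push("SOURCE", "hello streams");
        System.out.println(toTopic); // prints [HELLO STREAMS]
    }
}
```

Referring to parents by name is what lets the builder calls be chained in any readable order, and a misspelled parent name fails at build time rather than when records flow.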

Start the ZooKeeper and Kafka clusters on every node (ZooKeeper first, since the Kafka brokers connect to it).

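On hadoop01, create the two topics, testStreams1 (input) and testStreams2 (output), each with 3 partitions and a replication factor of 2. The --zookeeper option matches Kafka 2.0; it was removed in Kafka 3.0, where --bootstrap-server hadoop01:9092 is used instead.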

kafka-topics.sh --create --topic testStreams1 --partitions 3 --replication-factor 2 --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181

kafka-topics.sh --create --topic testStreams2 --partitions 3 --replication-factor 2 --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181

Start a console producer on hadoop01:

kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testStreams1

Start a console consumer on hadoop02:

kafka-console-consumer.sh --from-beginning --topic testStreams2 --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092

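Run App.java, then type lines of space-separated words into the producer. For each line, the consumer prints that line's word counts as a map such as {hello=2, world=1}.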
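The processor keeps no state between records, so each line's counts are reported on their own and the same word sent in two separate lines does not accumulate. A running total would require keeping counts between records, which in Kafka Streams is normally done with a state store attached to the processor. The sketch below only simulates that idea in plain Java: the hypothetical `RunningWordCount` class uses an in-memory map as a stand-in for the store, so, unlike a real state store, its totals are neither persisted nor restored after a restart.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of running word totals across messages.
// `totals` stands in for a Kafka Streams state store; it lives in memory only.
class RunningWordCount {
    private final Map<String, Long> totals = new LinkedHashMap<>();

    // Adds one input line to the totals and returns the updated totals
    // for just the words in that line (what could be forwarded downstream).
    Map<String, Long> onMessage(String line) {
        Map<String, Long> updated = new LinkedHashMap<>();
        String trimmed = line == null ? "" : line.trim();
        if (trimmed.isEmpty()) {
            return updated;
        }
        for (String word : trimmed.split("\\s+")) {
            long n = totals.merge(word, 1L, Long::sum); // new running total
            updated.put(word, n);
        }
        return updated;
    }

    public static void main(String[] args) {
        RunningWordCount wc = new RunningWordCount();
        System.out.println(wc.onMessage("hello kafka"));   // prints {hello=1, kafka=1}
        System.out.println(wc.onMessage("hello streams")); // prints {hello=2, streams=1}
    }
}
```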
