pom.xml
<?xml version="1.0" encoding="UTF-8"?>
LogProcessor.java
// NOTE(review): this snippet is truncated — the class body (process(), init(), close())
// is missing here, so only the declaration is visible. Presumably LogProcessor implements
// the record-handling logic referenced by App's "PROCESSOR" node — confirm against the full file.
import org.apache.kafka.streams.processor.Processor;import org.apache.kafka.streams.processor.ProcessorContext;import java.util.HashMap;public class LogProcessor implements Processor
App.java
import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.Topology;import org.apache.kafka.streams.processor.Processor;import org.apache.kafka.streams.processor.ProcessorSupplier;import java.util.Properties;public class App { public static void main(String[] args) { //声明来源主题 String fromTopic = "testStreams1"; //声明目标主题 String toTopic = "testStreams2"; //设置KafkaStreams参数信息 Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop01:9092,hadoop02:9092,hadoop03:9092"); //实例化StreamsConfig对象 StreamsConfig config = new StreamsConfig(props); //创建拓扑结构 Topology topology = new Topology(); //添加处理节点,为源处理节点指定名称和它订阅的主题 topology.addSource("SOURCE", fromTopic) //添加自定义处理节点,指定处理器类和上一节点的名称 .addProcessor("PROCESSOR", new ProcessorSupplier() { @Override public Processor get() { return new LogProcessor(); } }, "SOURCE") //添加目标处理节点,需要指定目标处理节点和上一节点的名称 .addSink("SINK", toTopic, "PROCESSOR"); //实例化KafkaStreams对象 KafkaStreams streams = new KafkaStreams(topology, config); streams.start(); }}
各节点启动kafka和zookeeper集群
在hadoop01中创建两个主题
kafka-topics.sh --create --topic testStreams1 --partitions 3 --replication-factor 2 --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
kafka-topics.sh --create --topic testStreams2 --partitions 3 --replication-factor 2 --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
hadoop01中启动生产者服务
kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testStreams1
hadoop02中启动消费者服务
kafka-console-consumer.sh --from-beginning --topic testStreams2 --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092
运行App.java,在生产者服务中输入内容,统计后将在消费者中输出。