
Flink: collecting a local file into Kafka (running locally)

Date: 2023-05-15

The Kafka address is:

Create a test topic.
First, locate the topic-creation script, kafka-topics.sh.

Command: find /opt/ -name 'kafka-topics*'

List all topics: kafka-topics --zookeeper localhost:2181 --list

Create a topic named mgtest (note: the command below creates mgtest1, while the Flink code further down writes to mgtest):

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mgtest1
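To confirm the topic was created, you can describe it with the same script (a quick sanity check, assuming the same ZooKeeper address as above):

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic mgtest1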

The code is as follows:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

// Collect local file data into Kafka
object KafkaSinkTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the local file as a stream of lines
    val inputStream = env.readTextFile("/Users/edy/IdeaProjects/flinksql/src/main/resources/sensor.txt")

    // Parse each comma-separated line into a Test record, then convert it back to a String
    val datastream = inputStream.map(x => {
      val arr = x.split(",")
      Test(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble).toString
    })

    // Sink: write each record to the mgtest topic on the Kafka cluster
    datastream.addSink(new FlinkKafkaProducer011[String](
      "172.16.104.2:9092,172.16.104.3:9092,172.16.104.4:9092",
      "mgtest",
      new SimpleStringSchema()))

    datastream.print()
    env.execute("kafka sink test")
  }

  case class Test(id: String, timestamp: Long, temperature: Double)
}
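Each line of sensor.txt is assumed here to be a comma-separated record of id, timestamp, and temperature, e.g. a line like sensor_1,1547718199,35.8 (a hypothetical sample; use whatever matches your own file). Once the job is running, you can check that records are reaching Kafka with the console consumer (assuming the same broker and topic as in the sink above):

./kafka-console-consumer.sh --bootstrap-server 172.16.104.2:9092 --topic mgtest --from-beginning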
