kafka地址为:
创建测试topic。
首先找到创建脚本:kafka-topics.sh
命令:find /opt/ -name 'kafka-topics*'
查看所有topic:kafka-topics --zookeeper localhost:2181 --list
创建一个mgtest
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mgtest
代码如下:
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

/** Collects local data into Kafka.
  *
  * Reads a local text file line by line, parses every line into a [[KafkaSinkTest.Test]]
  * record, and writes the record's `toString` form to a Kafka topic through
  * `FlinkKafkaProducer011`. The same stream is also passed to `print()`.
  */
object KafkaSinkTest {

  /** Local input file; each line is expected to look like `id,timestamp,temperature`. */
  private val InputPath = "/Users/edy/IdeaProjects/flinksql/src/main/resources/sensor.txt"

  /** Comma-separated Kafka broker list handed to the producer. */
  private val BrokerList = "172.16.104.2:9092,172.16.104.3:9092,172.16.104.4:9092"

  /** Target Kafka topic. */
  private val Topic = "mgtest"

  /** Builds and runs the file-to-Kafka streaming job.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream = env.readTextFile(InputPath)

    // Split each line on commas and parse the first three trimmed fields.
    // A line with fewer than three fields, or with a non-numeric timestamp or
    // temperature, throws from this function (same as the original code).
    val datastream = inputStream.map { line =>
      val arr = line.split(",")
      Test(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble).toString
    }

    // sink
    datastream.addSink(
      new FlinkKafkaProducer011[String](BrokerList, Topic, new SimpleStringSchema())
    )
    datastream.print()

    env.execute(" kafka sink test")
  }

  /** One parsed input record.
    *
    * @param id          first field of the input line, trimmed
    * @param timestamp   second field, parsed as `Long`
    * @param temperature third field, parsed as `Double`
    */
  case class Test(id: String, timestamp: Long, temperature: Double)
}