欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

flink-connect-kafka

时间:2023-05-01
1、参数设置

以下参数都必须/建议设置上1.订阅的主题2.反序列化规则3.消费者属性-集群地址4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)5.消费者属性-offset重置规则,如earliest/latest…6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)7.Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中————————————————

2、参数说明 3、kafka的水印策略 4、kafka动态发现分区、主题

//正则匹配动态发现主题 Properties properties = new Properties(); properties.setProperty("bootstrap.servers",kafka_server_dev); properties.setProperty("group.id","testtttt"); Pattern topicPattern = Pattern.compile("topic[0-9]]"); // topic设置成正则匹配 FlinkKafkaConsumerbase kafkaDataPattern = new FlinkKafkaConsumer<>( topicPattern, new SimpleStringSchema(), properties ).setStartFromEarliest();//动态发现分区properties.setProperty("FlinkKafkaConsumerbase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS",30*1000+"");

5、指定分区偏移量开始消费

String topic = "odsEventDetail";String groupId = "console-con-new-offline-final"; //指定分区的偏移量开始消费Map specificStartOffsets = new HashMap<>();specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);// myConsumer.setStartFromSpecificOffsets(specificStartOffsets);FlinkKafkaConsumerbase kafkaSource = MyKafkaUtil.getKafkaSource_ObjectNode(topic, groupId) .setStartFromSpecificOffsets(specificStartOffsets);// .setStartFromEarliest(); DataStreamSource kafkaDS = env.addSource(kafkaSource);

6、设置空闲等待

//kafka单分区有序,多分区无序。StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers",""); properties.setProperty("group.id",""); FlinkKafkaConsumer kafkaData = new FlinkKafkaConsumer<>( "flinktest", new SimpleStringSchema(), properties );//当数据许久没来时,是否需要设置watermark,此处可以设置一个空闲等待时间 kafkaData.assignTimestampsAndWatermarks( (AssignerWithPeriodicWatermarks) WatermarkStrategy .forBoundedOutOfOrderness(Duration.ofMinutes(2)) //从数据源生成watermark .withIdleness(Duration.ofMinutes(5)) //设置空闲等待 );

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。