
Defining parallelism for the Flink Kafka source

Date: 2023-05-02

The Kafka SQL connector does not expose a source parallelism option out of the box. The two patched classes below register a 'source.parallelism' table option in the connector factory and apply it when the planner turns the table scan into a transformation.

KafkaDynamicTableFactory.java:

package org.apache.flink.streaming.connectors.kafka.table;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.format.Format;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;

import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS_PREFIX;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FORMAT;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_GROUP_ID;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_PARTITIONER;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_SEMANTIC;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.StartupOptions;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC_PATTERN;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.VALUE_FIELDS_INCLUDE;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.VALUE_FORMAT;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createKeyFormatProjection;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createValueFormatProjection;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getFlinkKafkaPartitioner;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getStartupOptions;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSinkOptions;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;

@Internal
public class KafkaDynamicTableFactory
        implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    // Added option: custom parallelism for the Kafka source.
    public static final ConfigOption<Integer> SOURCE_PARALLELISM =
            ConfigOptions.key("source.parallelism")
                    .intType()
                    .noDefaultValue()
                    .withDescription(
                            "Defines a custom parallelism for the source. "
                                    + "By default, if this option is not defined, the planner will derive the parallelism "
                                    + "for each statement individually by also considering the global configuration.");

    public static final String IDENTIFIER = "kafka";

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PROPS_BOOTSTRAP_SERVERS);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(FactoryUtil.FORMAT);
        options.add(KEY_FORMAT);
        options.add(KEY_FIELDS);
        options.add(KEY_FIELDS_PREFIX);
        options.add(VALUE_FORMAT);
        options.add(VALUE_FIELDS_INCLUDE);
        options.add(TOPIC);
        options.add(TOPIC_PATTERN);
        options.add(PROPS_GROUP_ID);
        options.add(SCAN_STARTUP_MODE);
        options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
        options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
        options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
        options.add(SINK_PARTITIONER);
        options.add(SINK_SEMANTIC);
        options.add(SINK_PARALLELISM);
        // Register the new option so that option validation accepts it.
        options.add(SOURCE_PARALLELISM);
        return options;
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        final ReadableConfig tableOptions = helper.getOptions();

        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
                getKeyDecodingFormat(helper);
        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
                getValueDecodingFormat(helper);

        helper.validateExcept(PROPERTIES_PREFIX);
        validateTableSourceOptions(tableOptions);
        validatePKConstraints(
                context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);

        final StartupOptions startupOptions = getStartupOptions(tableOptions);
        final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
        // add topic-partition discovery
        properties.setProperty(
                FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
                String.valueOf(
                        tableOptions
                                .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
                                .map(Duration::toMillis)
                                .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));

        final DataType physicalDataType =
                context.getCatalogTable().getSchema().toPhysicalRowDataType();
        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);

        return createKafkaTableSource(
                physicalDataType,
                keyDecodingFormat.orElse(null),
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                KafkaOptions.getSourceTopics(tableOptions),
                KafkaOptions.getSourceTopicPattern(tableOptions),
                properties,
                startupOptions.startupMode,
                startupOptions.specificOffsets,
                startupOptions.startupTimestampMillis);
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        final ReadableConfig tableOptions = helper.getOptions();

        final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
                getKeyEncodingFormat(helper);
        final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
                getValueEncodingFormat(helper);

        helper.validateExcept(PROPERTIES_PREFIX);
        validateTableSinkOptions(tableOptions);
        validatePKConstraints(
                context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);

        final DataType physicalDataType =
                context.getCatalogTable().getSchema().toPhysicalRowDataType();
        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
        final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null);

        return createKafkaTableSink(
                physicalDataType,
                keyEncodingFormat.orElse(null),
                valueEncodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                tableOptions.get(TOPIC).get(0),
                getKafkaProperties(context.getCatalogTable().getOptions()),
                getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
                getSinkSemantic(tableOptions),
                parallelism);
    }

    // --------------------------------------------------------------------------------------------

    private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat(
            TableFactoryHelper helper) {
        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
                helper.discoverOptionalDecodingFormat(
                        DeserializationFormatFactory.class, KEY_FORMAT);
        keyDecodingFormat.ifPresent(
                format -> {
                    if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
                        throw new ValidationException(
                                String.format(
                                        "A key format should only deal with INSERT-only records. "
                                                + "But %s has a changelog mode of %s.",
                                        helper.getOptions().get(KEY_FORMAT),
                                        format.getChangelogMode()));
                    }
                });
        return keyDecodingFormat;
    }

    private static Optional<EncodingFormat<SerializationSchema<RowData>>> getKeyEncodingFormat(
            TableFactoryHelper helper) {
        final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
                helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT);
        keyEncodingFormat.ifPresent(
                format -> {
                    if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
                        throw new ValidationException(
                                String.format(
                                        "A key format should only deal with INSERT-only records. "
                                                + "But %s has a changelog mode of %s.",
                                        helper.getOptions().get(KEY_FORMAT),
                                        format.getChangelogMode()));
                    }
                });
        return keyEncodingFormat;
    }

    private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
            TableFactoryHelper helper) {
        return helper.discoverOptionalDecodingFormat(
                        DeserializationFormatFactory.class, FactoryUtil.FORMAT)
                .orElseGet(
                        () ->
                                helper.discoverDecodingFormat(
                                        DeserializationFormatFactory.class, VALUE_FORMAT));
    }

    private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
            TableFactoryHelper helper) {
        return helper.discoverOptionalEncodingFormat(
                        SerializationFormatFactory.class, FactoryUtil.FORMAT)
                .orElseGet(
                        () ->
                                helper.discoverEncodingFormat(
                                        SerializationFormatFactory.class, VALUE_FORMAT));
    }

    private static void validatePKConstraints(
            ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
        if (catalogTable.getSchema().getPrimaryKey().isPresent()
                && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
            Configuration options = Configuration.fromMap(catalogTable.getOptions());
            String formatName =
                    options.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
            throw new ValidationException(
                    String.format(
                            "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
                                    + " on the table, because it can't guarantee the semantic of primary key.",
                            tableName.asSummaryString(), formatName));
        }
    }

    // --------------------------------------------------------------------------------------------

    protected KafkaDynamicSource createKafkaTableSource(
            DataType physicalDataType,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            @Nullable List<String> topics,
            @Nullable Pattern topicPattern,
            Properties properties,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets,
            long startupTimestampMillis) {
        return new KafkaDynamicSource(
                physicalDataType,
                keyDecodingFormat,
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topics,
                topicPattern,
                properties,
                startupMode,
                specificStartupOffsets,
                startupTimestampMillis,
                false);
    }

    protected KafkaDynamicSink createKafkaTableSink(
            DataType physicalDataType,
            @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
            EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            String topic,
            Properties properties,
            FlinkKafkaPartitioner<RowData> partitioner,
            KafkaSinkSemantic semantic,
            Integer parallelism) {
        return new KafkaDynamicSink(
                physicalDataType,
                keyEncodingFormat,
                valueEncodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topic,
                properties,
                partitioner,
                semantic,
                false,
                parallelism);
    }
}

CommonPhysicalTableSourceScan.scala:

package org.apache.flink.table.planner.plan.nodes.common

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.dag.Transformation
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.connector.source.{DataStreamScanProvider, InputFormatProvider, ScanTableSource, SourceFunctionProvider, SourceProvider}
import org.apache.flink.table.data.RowData
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.schema.TableSourceTable
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelWriter
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.TableScan

import scala.collection.JavaConverters._

abstract class CommonPhysicalTableSourceScan(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    relOptTable: TableSourceTable)
  extends TableScan(cluster, traitSet, relOptTable) {

  // cache table source transformation.
  protected var sourceTransform: Transformation[_] = _

  protected val tableSourceTable: TableSourceTable =
    relOptTable.unwrap(classOf[TableSourceTable])

  protected[flink] val tableSource: ScanTableSource =
    tableSourceTable.tableSource.asInstanceOf[ScanTableSource]

  override def deriveRowType(): RelDataType = {
    // TableScan row type should always keep same with its
    // internal RelOptTable's row type.
    relOptTable.getRowType
  }

  override def explainTerms(pw: RelWriter): RelWriter = {
    super.explainTerms(pw)
      .item("fields", getRowType.getFieldNames.asScala.mkString(", "))
  }

  protected def createSourceTransformation(
      env: StreamExecutionEnvironment,
      name: String): Transformation[RowData] = {
    val runtimeProvider = tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE)
    val outRowType = FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType)
    val outTypeInfo = InternalTypeInfo.of(outRowType)

    val transformation = runtimeProvider match {
      case provider: SourceFunctionProvider =>
        val sourceFunction = provider.createSourceFunction()
        env
          .addSource(sourceFunction, name, outTypeInfo)
          .getTransformation
      case provider: InputFormatProvider =>
        val inputFormat = provider.createInputFormat()
        createInputFormatTransformation(env, inputFormat, name, outTypeInfo)
      case provider: SourceProvider =>
        // TODO: Push down watermark strategy to source scan
        val strategy: WatermarkStrategy[RowData] = WatermarkStrategy.noWatermarks()
        env.fromSource(provider.createSource(), strategy, name).getTransformation
      case provider: DataStreamScanProvider =>
        provider.produceDataStream(env).getTransformation
    }
    // Added: apply the user-defined 'source.parallelism' option, if present,
    // to the generated source transformation.
    val parallelism = tableSourceTable.catalogTable.toProperties.get("source.parallelism")
    if (parallelism != null) {
      transformation.setParallelism(parallelism.toInt)
    }
    transformation
  }

  protected def createInputFormatTransformation(
      env: StreamExecutionEnvironment,
      inputFormat: InputFormat[RowData, _],
      name: String,
      outTypeInfo: InternalTypeInfo[RowData]): Transformation[RowData]
}
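A quick way to confirm the factory change took effect is to check that the option shows up in optionalOptions(). This is only a sketch and assumes the patched KafkaDynamicTableFactory shadows the bundled one on the classpath; the class name SourceParallelismOptionCheck is made up for illustration.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory;

public class SourceParallelismOptionCheck {
    public static void main(String[] args) {
        KafkaDynamicTableFactory factory = new KafkaDynamicTableFactory();
        // The patched factory should list the new option, so that
        // option validation accepts 'source.parallelism' in the DDL.
        boolean registered =
                factory.optionalOptions().stream()
                        .map(ConfigOption::key)
                        .anyMatch("source.parallelism"::equals);
        System.out.println("source.parallelism registered: " + registered);
    }
}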

Two classes need to be overridden: KafkaDynamicTableFactory and CommonPhysicalTableSourceScan.
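For completeness, a minimal usage sketch, assuming the two patched classes are packaged so that they shadow the originals on the classpath. The table name, topic, bootstrap servers, and group id below are placeholders; the only new piece is the 'source.parallelism' option in the WITH clause.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaSourceParallelismDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 'source.parallelism' is read by the patched CommonPhysicalTableSourceScan
        // and applied to the Kafka source transformation.
        tEnv.executeSql(
                "CREATE TABLE user_events (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'user_events',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'properties.group.id' = 'demo',\n"
                        + "  'scan.startup.mode' = 'latest-offset',\n"
                        + "  'format' = 'json',\n"
                        + "  'source.parallelism' = '4'\n"
                        + ")");

        tEnv.executeSql("SELECT * FROM user_events").print();
    }
}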
