
Flink Hudi Analysis

Date: 2023-05-03

This code analysis is based on Flink 1.13.1 and Hudi 0.10.0.

-- generate test data
create table datagen (
  id bigint,
  prod_id bigint,
  price int,
  uid bigint,
  buy_time timestamp
) with (
  'connector' = 'datagen'
);

-- Hudi table
create table t_hudi (
  id bigint,
  prod_id bigint,
  price int,
  uid bigint,
  buy_time timestamp
) with (
  'connector' = 'hudi',
  'path' = '${path}',
  'table.type' = 'MERGE_ON_READ'
);

insert into t_hudi select * from datagen;

By examining the Hudi connector's META-INF/services/org.apache.flink.table.factories.Factory file

(this covers HoodieTableFactory and HoodieTableSink, which follow Flink's dynamic table mechanism),

we can see that the entire Flink Hudi write path is assembled inside HoodieTableSink.
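For background, here is a hedged sketch of how Flink resolves 'connector' = 'hudi' to that factory through the service file above. The class below is illustrative only, not the actual HoodieTableFactory source; in particular the HoodieTableSink constructor shape is an assumption.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.HoodieTableSink;

// Flink scans META-INF/services/org.apache.flink.table.factories.Factory for Factory
// implementations, matches factoryIdentifier() against the 'connector' option in the DDL,
// and asks the matched factory to create the table sink.
public class HoodieTableFactorySketch implements DynamicTableSinkFactory {

  @Override
  public String factoryIdentifier() {
    return "hudi"; // matches 'connector' = 'hudi' in the DDL above
  }

  @Override
  public DynamicTableSink createDynamicTableSink(Context context) {
    // copy the table options ('path', 'table.type', ...) into a Flink Configuration
    Configuration conf = Configuration.fromMap(context.getCatalogTable().getOptions());
    ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
    return new HoodieTableSink(conf, schema); // constructor shape is an assumption
  }

  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    Set<ConfigOption<?>> options = new HashSet<>();
    options.add(FlinkOptions.PATH);
    return options;
  }

  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    return Collections.emptySet();
  }
}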

Below is an analysis of the Pipelines that HoodieTableSink builds, covering bootstrap, hoodieStreamWrite, and compact.
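Before diving into each stage, here is a condensed sketch of how the three pipelines are chained together, modeled on HoodieTableSink#getSinkRuntimeProvider. It is a simplification, not the actual source: the MOR/compaction condition is inlined, and the Pipelines.compact/clean signatures are assumed to mirror the snippets analyzed below.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.utils.Pipelines;

public class HoodieSinkPipelineSketch {

  public static DataStreamSink<?> buildWritePipeline(
      Configuration conf, RowType rowType, DataStream<RowData> dataStream, boolean overwrite) {
    int parallelism = dataStream.getParallelism();

    // 1. bootstrap: RowData -> HoodieRecord, optionally loading the existing index first
    //    ('bounded' = false, i.e. a streaming job)
    DataStream<HoodieRecord> hoodieRecords =
        Pipelines.bootstrap(conf, rowType, parallelism, dataStream, false, overwrite);

    // 2. stream write: bucket assignment + writing into Hudi log/base files
    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecords);

    // 3. MERGE_ON_READ tables with async compaction enabled append the compaction pipeline;
    //    the real code delegates this check to a resolver utility, the condition is simplified here
    if (HoodieTableType.MERGE_ON_READ.name().equals(conf.getString(FlinkOptions.TABLE_TYPE))
        && conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
      return Pipelines.compact(conf, pipeline);
    }
    return Pipelines.clean(conf, pipeline);
  }
}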

Pipelines.bootstrap

For stream processing, Flink calls streamBootstrap:

public static DataStream<HoodieRecord> bootstrap(
    Configuration conf,
    RowType rowType,
    int defaultParallelism,
    DataStream<RowData> dataStream,
    boolean bounded,
    boolean overwrite) {
  final boolean globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
  if (overwrite) {
    return rowDataToHoodieRecord(conf, rowType, dataStream);
  } else if (bounded && !globalIndex && OptionsResolver.isPartitionedTable(conf)) {
    return boundedBootstrap(conf, rowType, defaultParallelism, dataStream);
  } else {
    return streamBootstrap(conf, rowType, defaultParallelism, dataStream, bounded);
  }
}

private static DataStream<HoodieRecord> streamBootstrap(
    Configuration conf,
    RowType rowType,
    int defaultParallelism,
    DataStream<RowData> dataStream,
    boolean bounded) {
  DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);

  if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
    dataStream1 = dataStream1
        .transform(
            "index_bootstrap",
            TypeInformation.of(HoodieRecord.class),
            new BootstrapOperator<>(conf))
        .setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(defaultParallelism))
        .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
  }

  return dataStream1;
}

public static DataStream<HoodieRecord> rowDataToHoodieRecord(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
  return dataStream.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
}

RowDataToHoodieFunction

private HoodieRecord toHoodieRecord(I record) throws Exception {
  GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
  final HoodieKey hoodieKey = keyGenerator.getKey(gr);

  HoodieRecordPayload payload = payloadCreation.createPayload(gr);
  HoodieOperation operation = HoodieOperation.fromValue(record.getRowKind().toByteValue());
  return new HoodieRecord<>(hoodieKey, payload, operation);
}

Here we only expand on RowDataToHoodieFunction, which is responsible for converting Flink RowData into HoodieRecord:

1. First, the RowData is converted into a GenericRecord (Avro format).

2. A HoodieKey is generated according to the key generator rules.

3. The HoodieOperation is derived from the RowData's RowKind, which lets Hudi handle insert, update, and delete events.
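As a quick illustration of that RowKind-to-HoodieOperation mapping, the small demo below prints the pairing; it only uses the two public enums and the same fromValue/toByteValue calls as the snippet above.

import org.apache.flink.types.RowKind;
import org.apache.hudi.common.model.HoodieOperation;

public class RowKindMappingDemo {
  public static void main(String[] args) {
    // RowKind and HoodieOperation share the same byte codes (INSERT = 0 ... DELETE = 3),
    // which is exactly what HoodieOperation.fromValue(rowKind.toByteValue()) relies on
    for (RowKind kind : RowKind.values()) {
      HoodieOperation op = HoodieOperation.fromValue(kind.toByteValue());
      System.out.println(kind + " -> " + op);
    }
  }
}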

BootstrapOperator

1. Loads the Hudi index.

2. waitForBootstrapReady waits until the other subtasks have finished bootstrapping.
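Conceptually, the bootstrap replays the table's existing key-to-file mapping downstream before any new input is processed, so that the BucketAssignFunction can rebuild its keyed index state. The sketch below is a plain-Java model of that idea only, not the BootstrapOperator API; the real operator obtains the mapping by scanning the table's base and log files.

import java.util.Map;
import java.util.function.BiConsumer;

public class IndexBootstrapSketch {
  /** Replays each existing (recordKey -> fileId) pair to the downstream bucket assigner. */
  public void bootstrap(Map<String, String> existingKeyToFileId,
                        BiConsumer<String, String> emitIndexRecordDownstream) {
    existingKeyToFileId.forEach(emitIndexRecordDownstream);
    // waitForBootstrapReady(...) then blocks until every subtask has finished replaying
    // its share, so no record is bucketed against an incomplete index
  }
}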

Pipelines.hoodieStreamWrite

public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream<HoodieRecord> dataStream) {
  WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
  return dataStream
      // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
      .keyBy(HoodieRecord::getRecordKey)
      .transform(
          "bucket_assigner",
          TypeInformation.of(HoodieRecord.class),
          new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
      .uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
      .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
      // shuffle by fileId(bucket id)
      .keyBy(record -> record.getCurrentLocation().getFileId())
      .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
      .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
      .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
}

BucketAssignFunction

Decides which file group (bucket) each record is written to, which is also how Hudi avoids the small-file problem.
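A conceptual model of that assignment is sketched below (plain Java, not the real implementation: BucketAssignFunction keeps this mapping in Flink keyed state and delegates the bucket choice to BucketAssigner).

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class BucketAssignSketch {
  // recordKey -> fileId of the file group the key was written to
  private final Map<String, String> index = new HashMap<>();

  /** Returns the fileId (bucket) the record with this key should be written to. */
  public String assign(String recordKey, String partitionPath) {
    String existingFileId = index.get(recordKey);
    if (existingFileId != null) {
      // key seen before: route to the same file group so the update lands next to the insert
      return existingFileId;
    }
    // new key: the real BucketAssigner first tries to reuse a file group that is still below
    // the target file size (this is what avoids lots of tiny files) before opening a new one
    String fileId = pickSmallFileOrCreate(partitionPath);
    index.put(recordKey, fileId);
    return fileId;
  }

  private String pickSmallFileOrCreate(String partitionPath) {
    // placeholder: always open a new file group
    return UUID.randomUUID().toString();
  }
}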

WriteOperatorFactory

This is the most complex part of the Hudi Flink sink; it consists of StreamWriteFunction and a CoordinatedOperatorFactory.

StreamWriteFunction

private boolean flushBucket(DataBucket bucket) {
  String instant = instantToWrite(true);

  if (instant == null) {
    // in case there are empty checkpoints that has no input data
    LOG.info("No inflight instant when flushing data, skip.");
    return false;
  }

  List<HoodieRecord> records = bucket.writeBuffer();
  ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
  if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
    records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
  }
  bucket.preWrite(records);
  final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
  records.clear();
  final WriteMetadataEvent event = WriteMetadataEvent.builder()
      .taskID(taskID)
      .instantTime(instant) // the write instant may shift but the event still use the currentInstant.
      .writeStatus(writeStatus)
      .lastBatch(false)
      .endInput(false)
      .build();

  this.eventGateway.sendEventToCoordinator(event);
  writeStatuses.addAll(writeStatus);
  return true;
}

private void initWriteFunction() {
  final String writeOperation = this.config.get(FlinkOptions.OPERATION);
  switch (WriteOperationType.fromValue(writeOperation)) {
    case INSERT:
      this.writeFunction = (records, instantTime) -> this.writeClient.insert(records, instantTime);
      break;
    case UPSERT:
      this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime);
      break;
    case INSERT_OVERWRITE:
      this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwrite(records, instantTime);
      break;
    case INSERT_OVERWRITE_TABLE:
      this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwriteTable(records, instantTime);
      break;
    default:
      throw new RuntimeException("Unsupported write operation : " + writeOperation);
  }
}

1. Obtains the instantTime to write to on the Hudi Timeline.

2. Calls HoodieFlinkWriteClient to write the data into Hoodie log files (see HoodieLogFormatWriter for the on-disk format).

3. Builds a WriteMetadataEvent and sends it to the OperatorEventGateway.
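For intuition on when flushBucket above actually runs, here is a conceptual model with invented class and field names (not the StreamWriteFunction code): a bucket is flushed once its buffer exceeds the batch-size threshold, and every checkpoint flushes whatever is left and reports it to the coordinator as the last batch of the current instant. The real thresholds come from the write.batch.size and write.task.max.size options.

import java.util.ArrayList;
import java.util.List;

public class WriteBufferSketch {
  private final List<String> buffer = new ArrayList<>();
  private final long batchSizeBytes = 64L * 1024 * 1024; // stand-in for write.batch.size
  private long bufferedBytes = 0;

  /** Called per record: flush when the per-bucket batch size is exceeded. */
  public void bufferRecord(String record) {
    buffer.add(record);
    bufferedBytes += record.length();
    if (bufferedBytes >= batchSizeBytes) {
      flush(false);
    }
  }

  /** Called from snapshotState(): every checkpoint flushes the remaining data. */
  public void onCheckpoint() {
    flush(true);
  }

  private void flush(boolean lastBatch) {
    // 1. hand the buffered records to HoodieFlinkWriteClient (insert/upsert/...)
    // 2. send a WriteMetadataEvent carrying the lastBatch flag to the operator coordinator
    buffer.clear();
    bufferedBytes = 0;
  }
}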

WriteOperatorFactory

@Override
public <T extends StreamOperator<Object>> T createStreamOperator(StreamOperatorParameters<Object> parameters) {
  final OperatorID operatorID = parameters.getStreamConfig().getOperatorID();
  final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher();

  this.operator.setOperatorEventGateway(eventDispatcher.getOperatorEventGateway(operatorID));
  this.operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
  this.operator.setProcessingTimeService(this.processingTimeService);
  eventDispatcher.registerEventHandler(operatorID, operator);
  return (T) operator;
}

@Override
public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
  return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf);
}

1. createStreamOperator initializes the StreamWriteOperator (and with it the StreamWriteFunction).

2. getCoordinatorProvider creates the StreamWriteOperatorCoordinator.

The OperatorCoordinator runs on the JobManager; data is exchanged between the coordinator and the operators through the SubtaskGateway and the OperatorEventHandler. StreamWriteOperatorCoordinator records the Hoodie log commits on the Timeline and guarantees that only one thread executes the commit at any given time. From StreamWriteOperatorCoordinator.java:

@Override
public void notifyCheckpointComplete(long checkpointId) {
  executor.execute(
      () -> {
        final boolean committed = commitInstant(this.instant, checkpointId);
        if (tableState.scheduleCompaction) {
          // if async compaction is on, schedule the compaction
          CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
        }
      }, "commits the instant %s", this.instant
  );
}

private boolean commitInstant(String instant, long checkpointId) {
  if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
    // The last checkpoint finished successfully.
    return false;
  }

  List<WriteStatus> writeResults = Arrays.stream(eventBuffer)
      .filter(Objects::nonNull)
      .map(WriteMetadataEvent::getWriteStatuses)
      .flatMap(Collection::stream)
      .collect(Collectors.toList());

  if (writeResults.size() == 0) {
    // No data has written, reset the buffer and returns early
    reset();
    // Send commit ack event to the write function to unblock the flushing
    sendCommitAckEvents(checkpointId);
    return false;
  }
  doCommit(instant, writeResults);
  return true;
}

private void doCommit(String instant, List<WriteStatus> writeResults) {
  // commit or rollback
  long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
  long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
  boolean hasErrors = totalErrorRecords > 0;

  if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
    HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
    if (hasErrors) {
      LOG.warn("Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total="
          + totalErrorRecords + "/" + totalRecords);
    }

    final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite
        ? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults)
        : Collections.emptyMap();
    boolean success = writeClient.commit(instant, writeResults, Option.of(checkpointCommitMetadata),
        tableState.commitAction, partitionToReplacedFileIds);
    if (success) {
      reset();
      LOG.info("Commit instant [{}] success!", instant);
    } else {
      throw new HoodieException(String.format("Commit instant [%s] failed!", instant));
    }
  } else {
    LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
    LOG.error("The first 100 error messages");
    writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws -> {
      LOG.error("Global error for partition path {} and fileID {}: {}",
          ws.getGlobalError(), ws.getPartitionPath(), ws.getFileId());
      if (ws.getErrors().size() > 0) {
        ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " and value " + value));
      }
    });
    // Rolls back instant
    writeClient.rollback(instant);
    throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", instant));
  }
}

Pipelines.compact

CompactionPlanOperator

Generates a compaction plan from the previously written Hoodie log files and splits it into operations that are handed to CompactFunction.

HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
    table.getMetaClient(), compactionInstantTime);
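The plan is then fanned out so that each CompactionOperation becomes one element sent to CompactFunction. Below is a condensed sketch of that fan-out; it assumes the CompactionPlanEvent constructor and the Avro-to-model conversion helper keep the names shown, and the wrapper class/method are illustrative.

import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.sink.compact.CompactionPlanEvent;

public class CompactionFanOutSketch {
  /** Emits one CompactionPlanEvent per file group to compact, all tagged with the same instant. */
  static void emitPlan(HoodieCompactionPlan compactionPlan,
                       String compactionInstantTime,
                       Output<StreamRecord<CompactionPlanEvent>> output) {
    List<CompactionOperation> operations = compactionPlan.getOperations().stream()
        .map(CompactionOperation::convertFromAvroRecordInstance)
        .collect(Collectors.toList());
    for (CompactionOperation operation : operations) {
      output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));
    }
  }
}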

CompactFunction

Executes the Hoodie log compaction and generates Parquet base files.

private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
  HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
  List<WriteStatus> writeStatuses = compactor.compact(
      new HoodieFlinkCopyOnWriteTable<>(
          writeClient.getConfig(),
          writeClient.getEngineContext(),
          writeClient.getHoodieTable().getMetaClient()),
      writeClient.getHoodieTable().getMetaClient(),
      writeClient.getConfig(),
      compactionOperation,
      instantTime,
      writeClient.getHoodieTable().getTaskContextSupplier());
  collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID));
}

CompactionCommitSink

Waits until all operations of the compaction plan have completed, then commits the compaction.
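A conceptual model of that wait-then-commit logic is sketched below, with invented names: the real CompactionCommitSink buffers the CompactionCommitEvents per instant, compares the count against the plan's operation count, calls writeClient.commitCompaction when complete, and rolls the compaction back if any event reports errors.

import java.util.HashMap;
import java.util.Map;

public class CompactionCommitSketch {
  // compaction instant -> number of file groups already compacted
  private final Map<String, Integer> finishedOperations = new HashMap<>();

  /** Called once per CompactionCommitEvent; commits when the whole plan has reported back. */
  public void onCompactionCommitEvent(String instant, int operationsInPlan) {
    int finished = finishedOperations.merge(instant, 1, Integer::sum);
    if (finished == operationsInPlan) {
      finishedOperations.remove(instant);
      commitCompaction(instant); // placeholder for writeClient.commitCompaction(...)
    }
  }

  private void commitCompaction(String instant) {
    System.out.println("compaction " + instant + " committed");
  }
}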
