Based on the 3.2 release branch of Spark.
Figure: a simplified flow of BypassMergeSortShuffleWriter (example)
In other words, within each ShuffleMapTask every reduce partition gets its own FileSegment, and each FileSegment can be viewed as a temporary file. The files behind these FileSegments are then merged into a single data file, and an index file records each partition's start offset within that data file.
This write strategy keeps a flood of small files from overwhelming the file system: the task first writes one temporary file per partition, then merges them all into a single data file, while the index file records each partition's offset in the data file, so the data of any given RDD partition can be accessed randomly.
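To make the index file's role concrete, here is a minimal sketch of how a reader could locate one partition's byte range in the merged data file. This is not Spark's actual code; it assumes the layout used by Spark's index files (numPartitions + 1 cumulative offsets stored as big-endian longs, starting with 0), and the class name IndexLookup is made up for illustration.

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

// Illustrative sketch, not Spark code: assuming the index file holds
// (numPartitions + 1) cumulative offsets as big-endian longs, find
// partition `partitionId`'s byte range in the merged data file.
final class IndexLookup {
  /** Returns {offset, length} of the given reduce partition in the data file. */
  static long[] locate(String indexPath, int partitionId) throws IOException {
    try (DataInputStream in = new DataInputStream(new FileInputStream(indexPath))) {
      // Skip the first `partitionId` longs to reach this partition's entry
      in.skipBytes(8 * partitionId);
      long start = in.readLong(); // start offset of partition i
      long end = in.readLong();   // start offset of partition i + 1
      return new long[] { start, end - start };
    }
  }
}

Because only two adjacent offsets are needed, a reader can serve any single partition without scanning the whole data file.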
If you are interested, read through the full implementation of BypassMergeSortShuffleWriter#write; the key steps are annotated below:
// BypassMergeSortShuffleWriter.java
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
      .createMapOutputWriter(shuffleId, mapId, numPartitions);
  try {
    // No records to write: commit empty partitions and return early
    if (!records.hasNext()) {
      partitionLengths = mapOutputWriter.commitAllPartitions(
        ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE).getPartitionLengths();
      mapStatus = MapStatus$.MODULE$.apply(
        blockManager.shuffleServerId(), partitionLengths, mapId);
      return;
    }
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartTime = System.nanoTime();
    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    partitionWriterSegments = new FileSegment[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      // Create a temporary ShuffleBlock for each reduce partition; it consists of
      // a blockId and a file handle, where the file is named after the blockId
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
        blockManager.diskBlockManager().createTempShuffleBlock();
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      // Create one writer per reduce partition
      // (getDiskWriter constructs a new DiskBlockObjectWriter)
      DiskBlockObjectWriter writer =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
      if (partitionChecksums.length > 0) {
        writer.setChecksum(partitionChecksums[i]);
      }
      partitionWriters[i] = writer;
    }
    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

    while (records.hasNext()) {
      // Compute which reduce partition each record belongs to,
      // then hand the record to that partition's writer
      final Product2<K, V> record = records.next();
      final K key = record._1();
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }

    for (int i = 0; i < numPartitions; i++) {
      try (DiskBlockObjectWriter writer = partitionWriters[i]) {
        // Commit and obtain the corresponding FileSegment. Committing forces a
        // flush and records the pre-write position and the number of bytes
        // written into the FileSegment
        partitionWriterSegments[i] = writer.commitAndGet();
      }
    }

    // Merge the data of all FileSegments above into a single file, and generate
    // an index file recording each partition's start offset in the merged file
    partitionLengths = writePartitionedData(mapOutputWriter);
    mapStatus = MapStatus$.MODULE$.apply(
      blockManager.shuffleServerId(), partitionLengths, mapId);
  } catch (Exception e) {
    // ...
  }
}
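For intuition about the final merge step, the sketch below shows conceptually what writePartitionedData accomplishes: concatenating the per-partition temp files into one data file while collecting each partition's length. This is a simplified illustration, not Spark's implementation; the real code goes through the ShuffleMapOutputWriter API (and may use zero-copy transfers), and the class name ConcatSketch is invented here.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;

// Conceptual sketch only: append each partition's temp file to a single
// data file and record its length. The cumulative sums of the returned
// lengths are what the index file stores as partition start offsets.
final class ConcatSketch {
  static long[] mergeSegments(File[] partitionFiles, File dataFile) throws IOException {
    long[] lengths = new long[partitionFiles.length];
    try (FileOutputStream out = new FileOutputStream(dataFile)) {
      for (int i = 0; i < partitionFiles.length; i++) {
        File f = partitionFiles[i];
        if (f != null && f.exists()) {
          lengths[i] = f.length();
          Files.copy(f.toPath(), out); // append partition i's bytes
          f.delete();                  // temp file is no longer needed
        }
      }
    }
    return lengths;
  }
}

The returned lengths play the same role as partitionLengths in write() above: they are reported in the MapStatus so reducers know how much data to fetch for each partition.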