Flink1.7.2 Dataset 文件切片计算方式和切片数据读取源码分析

  • 时间:
  • 浏览:1

调置当前分片

调置当前分片的长度

首先遍历路径是文件或目录,计算出所有文件放到List files = new ArrayList<>()中存储,计算出所有文件总大小totalLength,计算文件切片,当然是所有文件总大小来计算

调置当前分片的长度

计算实际切片的大小,blockSize 此处为文件大小,maxSplitSize 一般都小于blockSize,只是 最后取的是切片的最大长度maxSplitSize

调用FileInputFormat.createInputSplits(并行度)再实际解决

调置当前分片的结束了了位置

每个切片最大长度计算,totalLength = 9 为文件总长度,minNumSplits = 2 为并行度,也只是 9能够整除并行度2,说明有余数,将会把余数的数据单独在分配好几个 切片,有将会这好几个 切片的数据量很少,就浪费资源了,这里的做法是,余数的最大值,也只是 每个切片+1,就把这里多的余数分配到前面的每个切片中,也只是 每个切片的最大值为 9 / 2 + 1 = 5

将会有换行符,需用删除换行符,在readBuffer

当前切片默认值设置

调置当前分片的结束了了位置

从缓存区readBuffer一键复制当前行数据到 wrapBuffer

切片拆分的计算依据 ,初使值 bytesUnassigned = len(文件总数据长度),每分一次bytesUnassigned会减去当前切片的大小,也只是 bytesUnassigned每次全部时会还剩下总的数据大小,当bytesUnassigned > maxBytesForLastSplit 就老会 循环拆分切片,切片的长度为splitSize(切片大小) = 5, 结束了了位置从0结束了了,并且每个切片结束了了位置都需用打上去并且所有切片大小 position += splitSize ;





long maxSplitSize = totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1);

将会有换行符,需用删除换行符,在readBuffer

读取一行数据,也只是 读到第好几个 换行符

随机读到好几个 切片,给当前DataSourceTask使用,将会在Source读取数据时是不按key分区,也就不分谁解决,有任务来解决,就给好几个 切片解决就行,每给出好几个 从总的切片中移除

实际计算时,当计算最后好几个 切片时,将会剩下的数据大小小于 切片大小的1.1倍,就放到好几个 切片中,没得切分了,直接把剩下的数据放到最后好几个 切片中,将会将会切并且,是意味最后一切片数据量很小,浪费资源

对当前切片进行解决 ,调用 DelimitedInputFormat.open(),//open还没结束了了真正的读数据,只是 定位,把第好几个 换行符,分到前好几个 分片,另一方从第好几个 换行符结束了了读取数据

调置当前分片

-转志Integer

把JobVertex 转化为ExecutionJobVertex,调用new ExecutionJobVertex(),ExecutionJobVertex中存了inputSplits,只是 会根据并行并来计算inputSplits的个数

将会while循环拆分切片是有条件的,bytesUnassigned > maxBytesForLastSplit,那将会bytesUnassigned <= maxBytesForLastSplit,就需用把剩下的数据,都放到最后好几个 切片中

当前切片信息

Source 的操作链(ChainedFlatMapDriver,ChainedMapDriver,SynchronousChainedCombineDriver) 即 FlatMap -> Map -> Combine (SUM(1)),也只是 source读到的数据,都需用经过链上的算子操作

本示例拆分的结果

当前切片默认值设置

从缓存区readBuffer一键复制当前行数据到 wrapBuffer

当前切片信息

对当前切片进行解决 ,调用 DelimitedInputFormat.open(),//open还没结束了了真正的读数据,只是 定位,把第好几个 换行符,分到前好几个 分片,另一方从第好几个 换行符结束了了读取数据

把jobGraph是由JobVertex组成,调用executionGraph.attachJobGraph(sortedTopology) 把JobGraph转成ExecutionGraph,ExecutionGraph由ExecutionJobVertex组成,即把JobVertex转成ExecutionJobVertex

第一次,startPos =0 ,count = 0,没读到数据

流定位到结束了了位置

Source 的操作链(ChainedFlatMapDriver,ChainedMapDriver,SynchronousChainedCombineDriver) 即 FlatMap -> Map -> Combine (SUM(1)),也只是 source读到的数据,都需用经过链上的算子操作

end

流定位到结束了了位置

随机读到好几个 切片,给当前DataSourceTask使用,将会在Source读取数据时是不按key分区,也就不分谁解决,有任务来解决,就给好几个 切片解决就行,每给出好几个 从总的切片中移除