Flink1.7.2 Dataset 文件切片计算方式和切片数据读取源码分析

  • 时间:
  • 浏览:1
  • 来源:uu快3苹果版_uu快3单双_套路

long maxSplitSize = totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1);

从缓存区readBuffer一键复制当前行数据到 wrapBuffer

首先遍历路径是文件或目录,计算出所有文件放在List files = new ArrayList<>()中存储,计算出所有文件总大小totalLength,计算文件切片,当然是所有文件总大小来计算

当前切片信息

计算实际切片的大小,blockSize 此处为文件大小,maxSplitSize 一般都小于blockSize,好多好多 最后取的是切片的最大长度maxSplitSize

当前切片信息

对当前切片进行出理 ,调用 DelimitedInputFormat.open(),//open还没现在结速真正的读数据,只是我 定位,把第一个 多换行符,分到前一个 多分片,被委托人从第一个换行符现在结速读取数据

切片拆分的计算法律妙招,初使值 bytesUnassigned = len(文件总数据长度),每分一次bytesUnassigned会减去当前切片的大小,也只是我 bytesUnassigned每次也有还剩下总的数据大小,当bytesUnassigned > maxBytesForLastSplit 就老是 循环拆分切片,切片的长度为splitSize(切片大小) = 5, 现在结速位置从0现在结速,过后每个切片现在结速位置都不需要 上加过后所有切片大小 position += splitSize ;

调置当前分片的现在结速位置

Source 的操作链(ChainedFlatMapDriver,ChainedMapDriver,SynchronousChainedCombineDriver) 即 FlatMap -> Map -> Combine (SUM(1)),也只是我 source读到的数据,都不需要 经过链上的算子操作

第一次,startPos =0 ,count = 0,没读到数据

把jobGraph是由JobVertex组成,调用executionGraph.attachJobGraph(sortedTopology) 把JobGraph转成ExecutionGraph,ExecutionGraph由ExecutionJobVertex组成,即把JobVertex转成ExecutionJobVertex

只是 有换行符,不需要 删除换行符,在readBuffer

把JobVertex 转化为ExecutionJobVertex,调用new ExecutionJobVertex(),ExecutionJobVertex中存了inputSplits,好多好多 会根据并行并来计算inputSplits的个数

对当前切片进行出理 ,调用 DelimitedInputFormat.open(),//open还没现在结速真正的读数据,只是我 定位,把第一个 多换行符,分到前一个 多分片,被委托人从第一个换行符现在结速读取数据

调置当前分片





调置当前分片的现在结速位置

-转志Integer

调置当前分片的长度

读取一行数据,也只是我 读到第一个 多换行符

流定位到现在结速位置

从缓存区readBuffer一键复制当前行数据到 wrapBuffer

Source 的操作链(ChainedFlatMapDriver,ChainedMapDriver,SynchronousChainedCombineDriver) 即 FlatMap -> Map -> Combine (SUM(1)),也只是我 source读到的数据,都不需要 经过链上的算子操作

调用FileInputFormat.createInputSplits(并行度)再实际出理

随机读到一个 多切片,给当前DataSourceTask使用,只是 在Source读取数据时是不按key分区,也就不分谁出理 ,有任务来出理 ,就给一个 多切片出理 就行,每给出一个 多从总的切片中移除

end

当前切片默认值设置

本示例拆分的结果

随机读到一个 多切片,给当前DataSourceTask使用,只是 在Source读取数据时是不按key分区,也就不分谁出理 ,有任务来出理 ,就给一个 多切片出理 就行,每给出一个 多从总的切片中移除

调置当前分片的长度

每个切片最大长度计算,totalLength = 9 为文件总长度,minNumSplits = 2 为并行度,也只是我 9没办法整除并行度2,说明有余数,只是 把余数的数据单独在分配一个 多切片,有只是 你你是什么 个 多切片的数据量很少,就浪费资源了,这里的做法是,余数的最大值,也只是我 每个切片+1,就把这里多的余数分配到前面的每个切片中,也只是我 每个切片的最大值为 9 / 2 + 1 = 5

只是 while循环拆分切片是有条件的,bytesUnassigned > maxBytesForLastSplit,那只是 bytesUnassigned <= maxBytesForLastSplit,就不需要 把剩下的数据,都放在最后一个 多切片中

当前切片默认值设置

调置当前分片

流定位到现在结速位置

实际计算时,当计算最后一个 多切片时,只是 剩下的数据大小小于 切片大小的1.1倍,就放在一个 多切片中,没了切分了,直接把剩下的数据放在最后一个 多切片中,只是 只是 切过后,原应着最后一切片数据量很小,浪费资源

只是 有换行符,不需要 删除换行符,在readBuffer