NLineInputFormat by design simply doesn't perform the operation you expect it to:
NLineInputFormat which splits N lines of input as one split. (...) that splits the input file such that by default, one line is fed as a value to one map task.
As you can see, it modifies how splits (partitions in Spark nomenclature) are computed, not how records are determined.
If the description is not clear, we can illustrate this with the following example:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat
import org.apache.spark.SparkContext

def nline(n: Int, path: String) = {
  val sc = SparkContext.getOrCreate
  val conf = new Configuration(sc.hadoopConfiguration)
  conf.setInt("mapreduce.input.lineinputformat.linespermap", n)
  sc.newAPIHadoopFile(
    path,
    classOf[NLineInputFormat], classOf[LongWritable], classOf[Text], conf
  )
}
require(nline(1, "README.md").glom.map(_.size).first == 1)
require(nline(2, "README.md").glom.map(_.size).first == 2)
require(nline(3, "README.md").glom.map(_.size).first == 3)
As shown above, each partition (possibly excluding the last one) contains exactly n lines. While you could try to retrofit this to your case, it is not recommended for small values of the linespermap parameter: each split becomes its own partition, so the partition count (and the accompanying task-scheduling overhead) grows quickly.
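The distinction (splits change, records don't) can also be sketched in plain Scala with no Spark at all; here `grouped` stands in for the split computation, as a conceptual analogy rather than the actual Hadoop implementation:

```scala
// Conceptual analogy: NLineInputFormat groups consecutive lines into
// splits of size n, but each element (record) remains a single line.
val lines = Seq("a", "b", "c", "d", "e")
val splits = lines.grouped(2).toList
// Each inner group holds up to 2 lines, mirroring linespermap = 2;
// the lines themselves are untouched.
```

Just as in the `nline` example, only the grouping boundary moved; no "record" was merged or split.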