Parse large CSV file in Scala

I have a CSV file (~1 GB, 10 million rows). I want to parse it in batches and then write the output to a new text/CSV file, where each line is a JSON array.

import java.io.{BufferedWriter, File, FileWriter}
import scala.io.Source

val outputFile = new File(outputFileName)
val bw = new BufferedWriter(new FileWriter(outputFile)) // was `new FileWriter(file)`; `file` is undefined

Source
  .fromFile(fileName)
  .getLines()           // lazy iterator -- the whole file is never held in memory
  .grouped(batchSize)   // Iterator[Seq[String]] of batchSize-line chunks
  .foreach { chunk =>
    val jsonArray = doChunkTransformation(chunk)
    bw.write(jsonArray)
  }

bw.close()
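
For reference, doChunkTransformation isn't shown in the post. A minimal sketch of what it might look like, assuming a naive comma split (no handling of quoted fields) and one JSON array per input line:

// Hypothetical sketch -- not the actual function from the post.
// Assumes fields never contain commas or quotes; real CSV needs a proper parser.
def doChunkTransformation(chunk: Seq[String]): String =
  chunk
    .map(_.split(",").map(field => "\"" + field + "\"").mkString("[", ",", "]"))
    .mkString("", "\n", "\n")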

Is this approach efficient? Or should I wrap it in a Future? I'm quite new to Scala, so maybe I'm not aware of all the available methods and solutions.

Important Note
Unfortunately, I'm constrained: I cannot use any external library, so the solution has to be written in pure Scala.

Question from: https://stackoverflow.com/questions/65844822/parse-large-csv-file-in-scala


1 Answer


The only optimization I can think of is to parallelize the doChunkTransformation execution, and of course only if it's a relatively expensive operation; otherwise it doesn't make sense, because the IO will take longer anyway. For example:

import java.io.{BufferedWriter, File, FileWriter}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.io.Source

implicit val ec = scala.concurrent.ExecutionContext.global
val parallelism = Runtime.getRuntime().availableProcessors() * 2
val timeout = 1.minute

val outputFile = new File(outputFileName)
val bw = new BufferedWriter(new FileWriter(outputFile))

Source
  .fromFile(fileName)
  .getLines()
  .grouped(batchSize * parallelism)   // read enough lines to feed every worker
  .foreach { chunk =>
    // Run the transformations in parallel.
    // Note: the actual parallelism level depends on the exact `ExecutionContext`
    // implementation; here each big chunk is split into `parallelism` batches
    // of `batchSize` lines each.
    // Future.traverse preserves the order of the results.
    val computations = Future.traverse(chunk.grouped(batchSize)) { smallChunk =>
      Future(doChunkTransformation(smallChunk))
    }
    // Block until the whole chunk is transformed, then write it out in order.
    Await.result(computations, timeout).foreach(bw.write)
  }

bw.close()
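
One design note: the blocking Await inside foreach is what keeps memory bounded; only one big chunk is in flight at a time, so at most batchSize * parallelism lines are held in memory. It's also safer to close the writer in a finally block so the file handle isn't leaked if a batch fails; a hypothetical helper (not part of the original answer):

// Hypothetical convenience wrapper -- an assumption, not the answerer's code.
def withWriter(f: File)(body: BufferedWriter => Unit): Unit = {
  val bw = new BufferedWriter(new FileWriter(f))
  try body(bw) finally bw.close()
}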

UPD: I did not find any mention of preserving the original order in the Future.traverse docs (https://www.scala-lang.org/api/current/scala/concurrent/Future.html), but I've prepared an example that shows the order remains the same even when the futures complete out of order: https://scastie.scala-lang.org/DUvwX1CXTl2kxrcez3uDgw

import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.Future
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random

import scala.language.postfixOps

val random = new Random()
val computes = Future.traverse((1 to 10).toList) { num: Int =>
  Future {
    // Sleep inside the Future so the tasks really do complete out of order;
    // sleeping before constructing the Future would serialize the work.
    val delay = random.nextInt(1000)
    println(s"Delay is $delay")
    Thread.sleep(delay)
    num.toString
  }
}
println(Await.result(computes, 1 minute))

At the end it prints: List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

