The only optimization I can think of is parallelizing the doChunkTransformation calls. Of course, this only pays off if the transformation is relatively expensive; otherwise it does not make sense, because the IO will dominate anyway.
For example:
import java.io.{BufferedWriter, File, FileWriter}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.io.Source

implicit val ec: ExecutionContext = ExecutionContext.global

val parallelism = Runtime.getRuntime().availableProcessors() * 2
val outputFile = new File(outputFileName)
val bw = new BufferedWriter(new FileWriter(outputFile))

Source
  .fromFile(fileName)
  .getLines()
  .grouped(batchSize * parallelism)
  .foreach { chunk =>
    // Run the transformations in parallel: splitting the chunk into
    // `parallelism` sub-chunks of `batchSize` lines gives one Future per
    // sub-chunk. Note that the actual parallelism level depends on the
    // `ExecutionContext` implementation; the global context sizes its pool
    // by the number of available processors.
    // Future.traverse preserves the original order of the sub-chunks.
    val computations = Future.traverse(chunk.grouped(batchSize))(smallChunk => Future(doChunkTransformation(smallChunk)))
    // Assuming doChunkTransformation returns the serialized strings for its chunk
    val jsonArray = Await.result(computations, 1.minute).flatten
    bw.write(jsonArray.mkString)
  }
bw.close()
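If you want the parallelism level to be explicit rather than inherited from whatever sizing the global context uses, one option is to back the `ExecutionContext` with a fixed-size thread pool. A minimal sketch (the `Future.traverse` workload here is a stand-in for the real pipeline above):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

val parallelism = Runtime.getRuntime().availableProcessors() * 2
val pool = Executors.newFixedThreadPool(parallelism)
// At most `parallelism` tasks run at once, regardless of how the default
// global context would size its pool.
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

val work = Future.traverse((1 to 20).toList)(n => Future(n * n))
println(Await.result(work, 1.minute))

pool.shutdown() // release the threads once the pipeline is done
```

Unlike the global context, a pool you create yourself must be shut down explicitly, or the JVM will keep running.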
UPD:
I did not find any mention of preserving the original order in the Future.traverse
docs: https://www.scala-lang.org/api/current/scala/concurrent/Future.html
But I've prepared an example showing that the order is in fact preserved: https://scastie.scala-lang.org/DUvwX1CXTl2kxrcez3uDgw
import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.Future
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random
import scala.language.postfixOps
val random = new Random()

val computes = Future.traverse((1 to 10).toList) { num: Int =>
  Future {
    // Sleep inside the Future, so the tasks really complete out of order
    val delay = random.nextInt(1000)
    println(s"Delay is $delay")
    Thread.sleep(delay)
    num.toString
  }
}

println(Await.result(computes, 1 minute))
Despite the random delays, it prints List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) at the end.
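Future.sequence gives the same guarantee: results are collected in the order of the input collection, not in completion order. A self-contained sketch along the same lines as the example above:

```scala
import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Random

val futures = (1 to 10).toList.map { n =>
  Future {
    Thread.sleep(Random.nextInt(100)) // tasks finish in random order
    n
  }
}

// Results still come back aligned with the input order
println(Await.result(Future.sequence(futures), 1.minute))
// prints List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
```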