Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
382 views
in Technique[技术] by (71.8m points)

scala - Does cats mapN run all the futures in parallel?

As mentioned in the jump-start guide, mapN will run all the futures in parallel, so I created the below simple Scala program, but a sample run shows diff to be 9187 ms and diffN to be 9106 ms. So it looks like that the mapN is also running the futures sequentially, isn't it? Please let me know if I am missing something?

package example

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import java.time.LocalDateTime
import java.time.Duration
import scala.util.Failure
import scala.util.Success
import java.time.ZoneOffset

import cats.instances.future._
import cats.syntax.apply._

object FutureEx extends App {

  val before = LocalDateTime.now()

  val sum = for {
    a <- getA
    b <- getB
    c <- getC
  } yield (a + b + c)

  sum onComplete {
    case Failure(ex) => println(s"Error: ${ex.getMessage()}")

    case Success(value) =>
      val after = LocalDateTime.now()
      println(s"before: $before")
      println(s"after: $after")

      val diff = getDiff(before, after)
      println(s"diff: $diff")
      println(s"sum: $value")
  }

  // let the above finish
  Thread.sleep(20000)
  
  val beforeN = LocalDateTime.now()
  val usingMapN = (getA, getB, getC).mapN(add)

  usingMapN onComplete {
    case Failure(ex) => println(s"Error: ${ex.getMessage()}")

    case Success(value) =>
      val afterN = LocalDateTime.now()
      println(s"beforeN: $beforeN")
      println(s"afterN: $afterN")

      val diff = getDiff(beforeN, afterN)
      println(s"diffN: $diff")
      println(s"sum: $value")
  }

  def getA: Future[Int] = {
    println("inside A")
    Thread.sleep(3000)
    Future.successful(2)
  }

  def getB: Future[Int] = {
    println("inside B")
    Thread.sleep(3000)
    Future.successful(3)
  }

  def getC: Future[Int] = {
    println("inside C")
    Thread.sleep(3000)
    Future.successful(4)
  }

  def add(a: Int, b: Int, c: Int) = a + b + c

  def getDiff(before: LocalDateTime, after: LocalDateTime): Long = {
    Duration.between(before.toInstant(ZoneOffset.UTC), after.toInstant(ZoneOffset.UTC)).toMillis()
  }
}
question from:https://stackoverflow.com/questions/65901410/does-cats-mapn-run-all-the-futures-in-parallel

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Because you have sleep outside Future it should be like:

def getA: Future[Int] = Future {
    println("inside A")
    Thread.sleep(3000)
    2
  }

So you start async Future with apply - Future.successful on the other hand returns pure value, meaning you execute sleep in same thread.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...