If you don't mind a very localised var, you can serialise the asynchronous processing (each f(item)) as follows (flatMap does the serialization):
val fSerialized = {
  var fAccum = Future{()}
  for(item <- it) {
    println(s"Processing ${item}")
    // f(item) is only called once the previous Future has completed
    fAccum = fAccum flatMap { _ => f(item) }
  }
  fAccum
}
fSerialized.onComplete{case resTry => println("All Done.")}
In general, avoid Await operations - they block, which defeats the point of async, ties up a thread and, in sloppy designs, can deadlock.
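As a rough illustration of the non-blocking alternative, the sketch below transforms a result inside the Future instead of waiting for it. The names compute and doubled are hypothetical, made up for this example only.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical async computation, standing in for any real background work.
def compute(): Future[Int] = Future { 21 }

// Blocking style (avoid): Await.result(compute(), someTimeout)
// Non-blocking style: describe what to do with the value once it arrives.
val doubled: Future[Int] = compute().map(_ * 2)

// The callback runs on the execution context when the value is ready.
doubled.foreach(n => println(s"Result: $n"))
```

The calling thread never waits here; it only registers what should happen next.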
Cool Trick 1:
You can chain together Futures via that usual suspect, flatMap - it serializes asynchronous operations. Is there anything it can't do? ;-)
def f1 = Future { /* some background running logic here... */ }
def f2 = Future { /* other background running logic here... */ }
val fSerialized: Future[Unit] = f1 flatMap(res1 => f2)
fSerialized.onComplete{case resTry => println("Both Done: Success=" + resTry.isSuccess)}
None of the above blocks - the main thread runs straight through without waiting for any of the background work. In every case, Futures are used to run the work on other threads, to keep track of asynchronous state/results, and to chain logic.
fSerialized represents a composite of two different asynchronous operations chained together. As soon as the val is evaluated, it immediately starts f1 (running asynchronously). f1 runs like any Future - when it eventually finishes, it calls its onComplete callback block. Here's the cool bit - flatMap installs its argument as the f1 onComplete callback block - so f2 is initiated as soon as f1 completes, with no blocking, polling or wasteful resource usage. When f2 is complete, then fSerialized is complete - so it runs the fSerialized.onComplete callback block - printing "Both Done".
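To make that ordering observable, here is a minimal sketch that records when each step runs. The names events, step1, step2 and chained are hypothetical and exist only for this illustration.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical thread-safe event log, used only to make the ordering visible.
val events = new ConcurrentLinkedQueue[String]()

def step1: Future[Int] = Future { events.add("step1 ran"); 1 }
def step2: Future[Int] = Future { events.add("step2 ran"); 2 }

// step2 is only called inside the flatMap callback,
// so it cannot start before step1 has completed.
val chained: Future[Int] = step1.flatMap(_ => step2)

// Runs once step2 (and therefore the whole chain) has completed.
chained.onComplete(res => events.add("chained done: " + res.isSuccess))
```

Because step2 is a def that is only invoked inside the callback, the log always shows step1 before step2.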
Not only that, but you can chain flatMaps as much as you like with neat non-spaghetti code:
f1 flatMap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) ...
If you were to do that via Future.onComplete, you would have to embed the successive operations as nested onComplete layers:
f1.onComplete{case res1Try =>
f2.onComplete{case res2Try =>
f3.onComplete{case res3Try =>
f4.onComplete{ ...
Not as nice.
Test to prove:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  blocking{Thread.sleep(item.seconds.toMillis)}
}
val fSerial = f(4) flatMap(res1 => f(16)) flatMap(res2 => f(2)) flatMap(res3 => f(8))
fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
Cool Trick 2:
for-comprehensions like this:
for {a <- aExpr; b <- bExpr; c <- cExpr; d <- dExpr} yield eExpr
are nothing but syntactic-sugar for this:
aExpr.flatMap{a => bExpr.flatMap{b => cExpr.flatMap{c => dExpr.map{d => eExpr} } } }
That's a chain of flatMaps, followed by a final map.
That means that
f1 flatMap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) map(res4 => "Did It!")
is identical to
for {res1 <- f1; res2 <- f2; res3 <- f3; res4 <- f4} yield "Did It!"
Test to Prove (following on from previous test):
val fSerial = for {res1 <- f(4); res2 <- f(16); res3 <- f(2); res4 <- f(8)} yield "Did It!"
fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
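The same desugaring also threads values from one step to the next. A small sketch, assuming two hypothetical Futures a and b that each produce an Int:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical Futures, defined as defs so each use starts a fresh computation.
def a: Future[Int] = Future(1)
def b: Future[Int] = Future(2)

// For-comprehension form: b is only started once a has produced x.
val sugared: Future[Int] = for { x <- a; y <- b } yield x + y

// Hand-written equivalent: one flatMap per generator, with a final map for the yield.
val desugared: Future[Int] = a.flatMap { x => b.map { y => x + y } }
```

Both values complete with the same result; the for-comprehension is just easier to read once there are more than two steps.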
Not-So-Cool Trick 3:
Unfortunately you can't mix iterators & futures in the same for-comprehension. Compile error:
val fSerial = {for {nextItem <- itemIterable; nextRes <- f(nextItem)} yield "Did It"}.last
And nesting fors creates a challenge. The following doesn't serialize, but runs the async blocks in parallel: the outer comprehension desugars to itemIterable.map{item => f(item).map{...}}, so each f(item) is started as the collection is traversed, rather than being chained to the previous Future with flatMap/map - not the same!
val fSerial = {for {nextItem <- itemIterable} yield
for {nextRes <- f(nextItem)} yield "Did It"}.last
Also, using foldLeft/foldRight plus flatMap as below doesn't work as you'd expect; all async blocks are processed in parallel. The likely cause is that f(item) is evaluated eagerly as an argument to serialize, so each Future is already running before flatMap gets a chance to chain it:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  blocking{Thread.sleep(item.seconds.toMillis)}
}
val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
val empty = Future[Unit]{()}
def serialize(f1: Future[Unit], f2: Future[Unit]) = f1 flatMap(res1 => f2)
//val fSerialized = itemIterable.iterator.foldLeft(empty){(fAccum, item) => serialize(fAccum, f(item))}
val fSerialized = itemIterable.iterator.foldRight(empty){(item, fAccum) => serialize(fAccum, f(item))}
fSerialized.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
But this works (var involved):
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  blocking{Thread.sleep(item.seconds.toMillis)}
}
val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
var fSerial = Future{()}
for {nextItem <- itemIterable} fSerial = fSerial.flatMap(accumRes => f(nextItem))
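If you would rather not have the var, a fold can serialize the work too, provided f(item) is called inside the flatMap callback instead of being passed in as an already-started Future. This is a sketch under that assumption; the names order, g, items and serial are hypothetical, and the delays are shortened to milliseconds so it runs quickly.

```scala
import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical log of the order in which items finish.
val order = new ConcurrentLinkedQueue[Int]()

// Stand-in for f(item): sleeps for item * 10 ms, then records the item.
def g(item: Int): Future[Unit] = Future {
  blocking { Thread.sleep(item * 10L) }
  order.add(item)
  ()
}

val items = List(4, 16, 2, 8)

// g(item) is invoked inside the callback, so each Future only starts
// after the accumulated Future has completed.
val serial: Future[Unit] =
  items.foldLeft(Future.successful(())) { (acc, item) => acc.flatMap(_ => g(item)) }

serial.onComplete(res => println("All done, in order: " + order))
```

If the items ran in parallel, the shortest sleep would finish first; with this fold they finish in list order.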