Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share

scala - Difference between map and mapAsync

Can anyone please explain the difference between map and mapAsync in Akka Streams? The documentation says:

Stream transformations and side effects involving external non-stream based services can be performed with mapAsync or mapAsyncUnordered

Why can't we simply use map here? I assume that Flow, Source, and Sink are all monadic in nature, and thus map should work fine with respect to the delay inherent in them?


1 Answer


Signature

The difference is best highlighted in the signatures: Flow.map takes in a function that returns a type T while Flow.mapAsync takes in a function that returns a type Future[T].
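To make the two signatures concrete, here is a minimal sketch. `parse` and `parseAsync` are hypothetical functions made up for illustration; only `Flow.map` and `Flow.mapAsync` come from Akka Streams.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.concurrent.Future

// Hypothetical stand-ins, not part of any library
val parse      : String => Int         = _.trim.toInt
val parseAsync : String => Future[Int] = s => Future.successful(s.trim.toInt)

// map takes String => Int
val viaMap : Flow[String, Int, NotUsed] =
  Flow[String].map(parse)

// mapAsync takes String => Future[Int]; the stage waits for each Future,
// so the element type seen downstream is still Int, not Future[Int]
val viaMapAsync : Flow[String, Int, NotUsed] =
  Flow[String].mapAsync(parallelism = 1)(parseAsync)
```

Both flows have the same type, `Flow[String, Int, NotUsed]`; the difference is only in what the supplied function returns and how the stage waits for it.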

Practical Example

As an example, suppose that we have a function which queries a database for a user's full name based on a user id:

type UserID   = String
type FullName = String

val databaseLookup : UserID => FullName = ???  //implementation unimportant

Given an akka stream Source of UserID values we could use Flow.map within a stream to query the database and print the full names to the console:

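import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

implicit val system : ActorSystem = ActorSystem()  // supplies the materializer that run() needs (Akka 2.6+)

val userIDSource : Source[UserID, _] = ???

val stream = 
  userIDSource.via(Flow[UserID].map(databaseLookup))
              .to(Sink.foreach[FullName](println))
              .run()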

One limitation of this approach is that the stream will only make one database query at a time, because Flow.map does not process the next element until databaseLookup has returned. This serial querying is a bottleneck and will likely prevent maximum throughput in our stream.

We could try to improve performance through concurrent queries using a Future:

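import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global  // any implicit ExecutionContext will do

def concurrentDBLookup(userID : UserID) : Future[FullName] = 
  Future { databaseLookup(userID) }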

val concurrentStream = 
  userIDSource.via(Flow[UserID].map(concurrentDBLookup))
              .to(Sink.foreach[Future[FullName]](_ foreach println))
              .run()

The problem with this simplistic change is that we have effectively eliminated backpressure.

The Sink just pulls in each Future and attaches a foreach println to it, which is fast compared to a database query. The stream will therefore continuously propagate demand to the Source and spawn more Futures inside the Flow.map, so there is no limit on the number of databaseLookup calls running concurrently. Unfettered parallel querying could eventually overload the database.
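A rough way to see this is to count how many lookups have been started but not yet finished. The sketch below assumes Akka 2.6+ (an implicit ActorSystem provides the materializer); `slowLookup`, the 20 ms sleep, the counters, and the system name are all hypothetical stand-ins for a real database call.

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val system : ActorSystem = ActorSystem("unbounded-demo") // hypothetical name
implicit val ec : ExecutionContext = ExecutionContext.global      // runs the simulated lookups

val pending     = new AtomicInteger(0) // lookups started but not yet finished
val peakPending = new AtomicInteger(0) // highest value of `pending` seen so far

// Hypothetical stand-in for Future { databaseLookup(userID) }
def slowLookup(id : Int) : Future[String] = {
  val now = pending.incrementAndGet()
  peakPending.accumulateAndGet(now, (a, b) => math.max(a, b))
  Future {
    try { Thread.sleep(20); s"user-$id" }
    finally pending.decrementAndGet()
  }
}

// map passes each Future downstream as soon as it is created,
// so the stream never waits for one lookup before starting the next
val futures : Seq[Future[String]] =
  Await.result(Source(1 to 50).map(slowLookup).runWith(Sink.seq[Future[String]]), 10.seconds)

val names = Await.result(Future.sequence(futures), 10.seconds)
println(s"peak lookups pending with map: ${peakPending.get}")

system.terminate()
```

Nothing in this stream bounds `peakPending`; it grows with the number of elements the Source can emit before the first lookups finish.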

Flow.mapAsync to the rescue: it stops pulling new elements from upstream once the given number of Futures are outstanding, so we can have concurrent db access while at the same time capping the number of simultaneous lookups:

val maxLookupCount = 10

val maxLookupConcurrentStream = 
  userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
              .to(Sink.foreach[FullName](println))
              .run()

Also notice that the Sink.foreach got simpler: it no longer takes in a Future[FullName] but just a FullName, because mapAsync waits for each Future and emits its result downstream.
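The cap can be checked with a small experiment. This is only a sketch, assuming Akka 2.6+ (an implicit ActorSystem provides the materializer); `slowLookup`, the 20 ms sleep, the counters, and the value 4 are hypothetical choices for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val system : ActorSystem = ActorSystem("bounded-demo") // hypothetical name
implicit val ec : ExecutionContext = ExecutionContext.global    // runs the simulated lookups

val pending     = new AtomicInteger(0) // lookups started but not yet finished
val peakPending = new AtomicInteger(0) // highest value of `pending` seen so far

// Hypothetical stand-in for Future { databaseLookup(userID) }
def slowLookup(id : Int) : Future[String] = {
  val now = pending.incrementAndGet()
  peakPending.accumulateAndGet(now, (a, b) => math.max(a, b))
  Future {
    try { Thread.sleep(20); s"user-$id" }
    finally pending.decrementAndGet()
  }
}

val maxLookupCount = 4

// mapAsync calls slowLookup only while fewer than maxLookupCount results are outstanding,
// and it emits the results in the same order as the input elements
val names : Seq[String] =
  Await.result(
    Source(1 to 20).mapAsync(maxLookupCount)(slowLookup).runWith(Sink.seq[String]),
    10.seconds)

println(s"peak lookups pending with mapAsync: ${peakPending.get}")

system.terminate()
```

Here `peakPending` never exceeds `maxLookupCount`, and `names` comes out in input order even though the lookups overlap.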

Unordered Async Map

If maintaining a sequential ordering of the UserIDs to FullNames is unnecessary, then you can use Flow.mapAsyncUnordered, which emits each result as soon as its Future completes rather than waiting for earlier elements. For example, you might just need to print all of the names to the console without caring about the order in which they are printed.
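The ordering difference can be sketched with lookups of different durations. This assumes Akka 2.6+; `delayedEcho`, the delays, and the system name are hypothetical and only simulate lookups that finish at different times.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val system : ActorSystem = ActorSystem("unordered-demo") // hypothetical name
implicit val ec : ExecutionContext = system.dispatcher

// Hypothetical lookup: completes after `delayMs` milliseconds and returns `delayMs`
def delayedEcho(delayMs : Int) : Future[Int] =
  after(delayMs.millis, system.scheduler)(Future.successful(delayMs))

val delays = List(300, 100, 200)

// mapAsync holds back finished results until all earlier elements are done,
// so the output keeps the input order: 300, 100, 200
val ordered : Seq[Int] =
  Await.result(Source(delays).mapAsync(3)(delayedEcho).runWith(Sink.seq[Int]), 5.seconds)

// mapAsyncUnordered emits results in completion order instead,
// so the shorter delays are free to overtake the longer one
val unordered : Seq[Int] =
  Await.result(Source(delays).mapAsyncUnordered(3)(delayedEcho).runWith(Sink.seq[Int]), 5.seconds)

println(s"mapAsync:          $ordered")
println(s"mapAsyncUnordered: $unordered")

system.terminate()
```

Both variants still cap the number of outstanding Futures at the given parallelism; the unordered one just avoids stalling behind a slow element.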

