Project Loom
If you want maximum throughput from threaded code that spends most of its time blocked (as opposed to CPU-bound code), use the virtual threads (formerly called fibers) being developed in Project Loom. Preliminary builds are available now, based on early-access Java 16.
Virtual threads
Virtual threads can be dramatically faster because a virtual thread that blocks is “parked”: it is set aside, releasing the underlying platform thread so another virtual thread can make progress. Parking is so cheap for blocking tasks that the number of virtual threads can run into the millions.
Drop the streams approach. Instead, submit each input as its own task to an executor service that runs every task on a new virtual thread.
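To see the parking effect in isolation, here is a minimal sketch. It assumes Java 21 or later, where virtual threads are final and the factory method is Executors.newVirtualThreadPerTaskExecutor(). The class name VirtualThreadDemo and the method runSleepers are hypothetical names chosen for illustration. Each task only sleeps, so with one virtual thread per task the whole batch should finish in not much more than a single sleep interval, rather than in (task count ÷ pool size) intervals.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; assumes Java 21+ (virtual threads final, ExecutorService is AutoCloseable).
public class VirtualThreadDemo
{
    // Starts `count` tasks, each blocking in Thread.sleep for `pause`,
    // one virtual thread per task. Returns how many tasks completed their sleep.
    static int runSleepers ( int count , Duration pause )
    {
        AtomicInteger completed = new AtomicInteger();
        try ( ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor() )
        {
            for ( int i = 0 ; i < count ; i++ )
            {
                executor.submit( ( ) -> {
                    try
                    {
                        // Blocking call: the virtual thread parks and frees its carrier platform thread.
                        Thread.sleep( pause );
                    }
                    catch ( InterruptedException e )
                    {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    completed.incrementAndGet();
                } );
            }
        } // close() waits for all submitted tasks to finish.
        return completed.get();
    }

    public static void main ( String[] args )
    {
        Instant start = Instant.now();
        int done = runSleepers( 100_000 , Duration.ofSeconds( 1 ) );
        System.out.println( done + " tasks finished in " + Duration.between( start , Instant.now() ) );
    }
}
```

The AtomicInteger is only there so the sketch can report how many tasks actually ran. In real code each task would do its own I/O.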
Full example code
Let's define classes for Answer and Reply, our inputs and outputs. We will use record, a feature finalized in Java 16, as an abbreviated way to define an immutable, data-driven class. The compiler implicitly creates a constructor, accessor methods (here text(), not getText()), and default implementations of equals & hashCode, and toString.
public record Answer (String text)
{
}
…and:
public record Reply (String text)
{
}
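The implicitly generated members can be observed directly. This is a small sketch that assumes Java 16 or later. RecordDemo is a hypothetical wrapper class, and its nested Answer mirrors the record above.

```java
// Hypothetical demo class showing what the compiler generates for a record.
public class RecordDemo
{
    // Same shape as the Answer record above: a single component named `text`.
    record Answer ( String text ) { }

    public static void main ( String[] args )
    {
        Answer a = new Answer( "42" );   // Generated canonical constructor.
        Answer b = new Answer( "42" );
        System.out.println( a.text() );                       // prints "42" (accessor is text(), not getText())
        System.out.println( a.equals( b ) );                  // prints "true" (value-based equals)
        System.out.println( a.hashCode() == b.hashCode() );   // prints "true" (consistent with equals)
        System.out.println( a );                              // prints "Answer[text=42]"
    }
}
```

Because equals and hashCode are value-based, two Answer objects with the same text land on the same entry when used as keys in the ConcurrentMap later on.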
Define our task to be submitted to an executor service. We write a class named ReplierTask that implements Runnable (has a run method).
Within the run method, we sleep the current thread to simulate waiting for a call to a database, file system, and/or remote service.
package work.basil.example;

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;

public class ReplierTask implements Runnable
{
    private final Answer answer;
    private final ConcurrentMap < Answer, Reply > map;

    public ReplierTask ( Answer answer , ConcurrentMap < Answer, Reply > map )
    {
        this.answer = answer;
        this.map = map;
    }

    private Reply getReplyForAnswerCombination ( Answer answer )
    {
        // Simulating a call to some service to produce a `Reply` object.
        try
        {
            Thread.sleep( Duration.ofSeconds( 1 ) ); // Simulate blocking to wait for a call to a service or db or such.
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt(); // Restore the interrupt flag rather than swallowing it.
        }
        return new Reply( UUID.randomUUID().toString() );
    }

    // `Runnable` interface
    @Override
    public void run ( )
    {
        System.out.println( "`run` method at " + Instant.now() + " for answer: " + this.answer );
        Reply reply = this.getReplyForAnswerCombination( this.answer );
        this.map.put( this.answer , reply );
    }
}
Lastly, some code to do the work. We make a class named Mapper that contains a main method.
We simulate some input by populating a list of Answer objects. We create an empty ConcurrentMap in which to collect the results. Then we submit each Answer object as a task to the executor service, which runs it on its own new thread. There we call for a new Reply object and store the Answer/Reply pair as an entry in the map.
package work.basil.example;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Mapper
{
    public static void main ( String[] args )
    {
        System.out.println( "Runtime.version(): " + Runtime.version() );
        System.out.println( "availableProcessors: " + Runtime.getRuntime().availableProcessors() );
        System.out.println( "maxMemory: " + Runtime.getRuntime().maxMemory() + " | maxMemory/(1024*1024) -> megs: " + Runtime.getRuntime().maxMemory() / ( 1024 * 1024 ) );
        Mapper app = new Mapper();
        app.demo();
    }

    private void demo ( )
    {
        // Simulate our inputs, a list of `Answer` objects.
        int limit = 10_000;
        List < Answer > answers = new ArrayList <>( limit );
        for ( int i = 0 ; i < limit ; i++ )
        {
            answers.add( new Answer( String.valueOf( i ) ) );
        }

        // Do the work.
        Instant start = Instant.now();
        System.out.println( "Starting work at: " + start + " on count of tasks: " + limit );
        ConcurrentMap < Answer, Reply > results = new ConcurrentHashMap <>();
        try
        (
            // Early-access Loom API. In Java 21 and later, the equivalent is `Executors.newVirtualThreadPerTaskExecutor()`.
            ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
            // Alternatives for comparison:
            // Executors.newFixedThreadPool( 5 )
            // Executors.newFixedThreadPool( 10 )
            // Executors.newFixedThreadPool( 1_000 )
        )
        {
            for ( Answer answer : answers )
            {
                ReplierTask task = new ReplierTask( answer , results );
                executorService.submit( task );
            }
        }
        // Closing the executor service at the end of the try-with-resources block waits until all submitted tasks are done.
        // So by this point every task has finished and the executor service is closed.
        Duration elapsed = Duration.between( start , Instant.now() );
        System.out.println( "results.size() = " + results.size() + ". Elapsed: " + elapsed );
    }
}
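The code above hard-wires one executor inside the try-with-resources. One possible refactoring passes the executor in as a parameter, so the same blocking workload can be timed against different executor implementations without editing the code. This is a sketch under our own assumptions: it targets Java 21+ (where ExecutorService is AutoCloseable and newVirtualThreadPerTaskExecutor exists), and ExecutorBenchmark and time are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical harness; assumes Java 21+.
public class ExecutorBenchmark
{
    // Runs `taskCount` simulated blocking tasks on the executor built by `executorFactory`,
    // and returns the wall-clock time until all of them have finished.
    static Duration time ( Supplier < ExecutorService > executorFactory , int taskCount , Duration blockFor )
    {
        Instant start = Instant.now();
        try ( ExecutorService executor = executorFactory.get() )
        {
            for ( int i = 0 ; i < taskCount ; i++ )
            {
                executor.submit( ( ) -> {
                    try { Thread.sleep( blockFor ); } // Stand-in for a blocking db or service call.
                    catch ( InterruptedException e ) { Thread.currentThread().interrupt(); }
                } );
            }
        } // close() waits for the submitted tasks to complete.
        return Duration.between( start , Instant.now() );
    }

    public static void main ( String[] args )
    {
        Duration pause = Duration.ofMillis( 100 );
        System.out.println( "10 platform threads: " + time( ( ) -> Executors.newFixedThreadPool( 10 ) , 1_000 , pause ) );
        System.out.println( "virtual threads:     " + time( Executors::newVirtualThreadPerTaskExecutor , 1_000 , pause ) );
    }
}
```

Using a Supplier rather than a ready-made ExecutorService means each measurement starts with a fresh executor, and the try-with-resources closes it when that run ends.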
We can swap out Executors.newVirtualThreadExecutor() for a pool of platform threads, to compare against virtual threads. Let's try pools of 5, 10, 1,000, and 10,000 platform threads on an Intel Mac mini running macOS Mojave, with 6 real cores, no hyper-threading, 32 GB of memory, and the OpenJDK special build version 16-loom+9-316 assigned a maxMemory of 8 GB.
| 10,000 tasks at 1 second each | Total elapsed time |
|---|---|
| 5 platform threads | About half an hour (PT33M29.755792S) |
| 10 platform threads | About a quarter-hour (PT16M43.318973S) |
| 1,000 platform threads | About 10 seconds (PT10.487689S) |
| 10,000 platform threads | Error… unable to create native thread: possibly out of memory or process/resource limits reached |
| Virtual threads | Under 3 seconds (PT2.645964S) |
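The platform-thread rows line up with a simple back-of-the-envelope model. It assumes each task spends essentially all of its time blocked and the pool keeps every thread busy. With N tasks, P pool threads, and per-task blocking time t, the work proceeds in roughly ⌈N/P⌉ rounds of length t:

```latex
\begin{aligned}
T_{\text{pool}} &\approx \left\lceil \frac{N}{P} \right\rceil t, \qquad N = 10\,000,\; t = 1\,\text{s} \\
P = 5 &: \quad T \approx 2000\,\text{s} = 33\,\text{min}\;20\,\text{s} \\
P = 10 &: \quad T \approx 1000\,\text{s} = 16\,\text{min}\;40\,\text{s} \\
P = 1000 &: \quad T \approx 10\,\text{s}
\end{aligned}
```

The measured times sit slightly above these floors (for example PT33M29.755792S versus 33 min 20 s), which is plausibly per-task overhead such as the console printing. Virtual threads are not capped at a fixed P, so all 10,000 tasks can be blocked at once. That run approaches a single t = 1 s plus scheduling overhead.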