Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

concurrency - java parallelStreams on different machines

I have a function that iterates over a list using parallelStream() and, for each item, calls an API with that item as a parameter. I then collect the results into a map.

    try {
        return answerList.parallelStream()
                .map(answer -> getReplyForAnswerCombination(answer))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    } catch (final NullPointerException e) {
        log.error("Error in generating final results.", e);
        return null;
    }
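A possible reason the snippet above needs to catch NullPointerException: Collectors.toMap does not accept null values and throws NullPointerException when the value mapper returns null. The following is a minimal sketch (Java 9+ for List.of). `lookup` is a hypothetical stand-in for getReplyForAnswerCombination that returns a null reply for odd inputs. The sketch shows the failure and one alternative: filtering out null replies before collecting, so that a single failed lookup does not throw away every result.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ToMapNullDemo {
    // Hypothetical stand-in for getReplyForAnswerCombination:
    // produces an entry whose value is null for odd inputs.
    static Map.Entry<Integer, String> lookup(int answer) {
        return new SimpleEntry<>(answer, answer % 2 == 0 ? "reply-" + answer : null);
    }

    // Collectors.toMap throws NullPointerException as soon as it sees a null value.
    static boolean toMapRejectsNullValues(List<Integer> answers) {
        try {
            answers.parallelStream()
                    .map(ToMapNullDemo::lookup)
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Dropping entries with null values before collecting avoids the exception.
    static Map<Integer, String> collectNonNull(List<Integer> answers) {
        return answers.parallelStream()
                .map(ToMapNullDemo::lookup)
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        List<Integer> answers = List.of(1, 2, 3, 4);
        System.out.println(toMapRejectsNullValues(answers)); // true
        System.out.println(collectNonNull(answers));
    }
}
```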

When I run it on Laptop1, it takes 1 hour, but on Laptop2 it takes 5 hours.

After some basic research, I found that parallel streams use the default ForkJoinPool.commonPool(), which by default has one fewer thread than the number of processors.
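These defaults can be checked directly on each laptop. A minimal sketch (`ParallelismInfo` is an illustrative name) that prints the processor count the JVM sees and the common pool's target parallelism. Keep in mind that the thread invoking a parallel stream's terminal operation also does work, so a parallel stream typically runs on up to parallelism + 1 threads.

```java
import java.util.concurrent.ForkJoinPool;

public class ParallelismInfo {
    // Number of processors the JVM reports for this machine.
    static int processors() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target parallelism of the common pool used by parallel streams.
    // Unless java.util.concurrent.ForkJoinPool.common.parallelism is set,
    // this defaults to processors() - 1 (but at least 1).
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("availableProcessors: " + processors());
        System.out.println("common pool parallelism: " + commonPoolParallelism());
    }
}
```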

Laptop1 and laptop2 have different processors.

  • Is there a way to find out how many tasks a parallel stream can run in parallel on Laptop1 and Laptop2?
  • Can I use the suggestion given here to safely increase the parallelism of parallel streams on Laptop2?
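
    long start = System.currentTimeMillis();
    IntStream s = IntStream.range(0, 20);
    // The common pool reads this property only once, when the ForkJoinPool class is first used,
    // so it must be set before any earlier parallel stream (or passed as a -D JVM option).
    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
    s.parallel().forEach(i -> {
        try { Thread.sleep(100); } catch (Exception ignore) {}
        System.out.print((System.currentTimeMillis() - start) + " ");
    });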
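Setting the system property changes the common pool for the whole JVM, which every other parallel stream (and, by default, CompletableFuture's async methods) shares. A commonly used alternative is to run the stream from inside a dedicated ForkJoinPool, whose worker threads then execute the stream's subtasks. That routing is an implementation detail rather than documented API, so treat this as a sketch. `callApi` and `fetchAll` are hypothetical names, and the pool size of 20 is an arbitrary assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CustomPoolStream {
    // Hypothetical stand-in for a blocking API call.
    static String callApi(int item) {
        try {
            Thread.sleep(50); // simulate network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "reply-" + item;
    }

    // Runs the parallel stream inside a dedicated pool of the given size,
    // leaving the JVM-wide common pool untouched.
    static Map<Integer, String> fetchAll(List<Integer> items, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Callable<Map<Integer, String>> job = () -> items.parallelStream()
                    .collect(Collectors.toMap(i -> i, CustomPoolStream::callApi));
            return pool.submit(job).join(); // join() waits without checked exceptions
        } finally {
            pool.shutdown(); // release the pool's threads once the work is done
        }
    }

    public static void main(String[] args) {
        List<Integer> items = IntStream.range(0, 40).boxed().collect(Collectors.toList());
        long start = System.currentTimeMillis();
        Map<Integer, String> replies = fetchAll(items, 20);
        System.out.println(replies.size() + " replies in "
                + (System.currentTimeMillis() - start) + " ms");
    }
}
```

Because the API calls block rather than compute, a pool larger than the core count can help here; for heavily blocking work, a plain ExecutorService may be a more natural fit than a ForkJoinPool.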

He who fights dragons too long becomes a dragon himself; gaze too long into the abyss, and the abyss will gaze back into you…

1 Answer

Project Loom

If you want maximum performance on threaded code that blocks (as opposed to CPU-bound code), use the virtual threads (formerly called fibers) provided by Project Loom. At the time of writing, preliminary builds were available, based on early-access Java 16; virtual threads were later finalized in Java 21 (JEP 444).

Virtual threads

Virtual threads can be dramatically faster for this kind of work because a virtual thread that blocks is “parked”: set aside so that another virtual thread can make progress on the same underlying platform thread. This is so efficient for blocking tasks that virtual threads can number in the millions.
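A minimal sketch of that parking behavior, assuming Java 21 (where Thread.startVirtualThread is final API). Each task sleeps to simulate a blocking call. Because a sleeping virtual thread is parked, 100,000 of them can wait at the same time instead of queuing behind a handful of platform threads. The class and method names are illustrative.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ManyVirtualThreads {
    // Starts `count` virtual threads that each block (sleep) for `blockMillis`,
    // waits for all of them, and returns how many completed normally.
    static int runBlockingTasks(int count, long blockMillis) {
        AtomicInteger finished = new AtomicInteger();
        List<Thread> threads = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            threads.add(Thread.startVirtualThread(() -> {
                try {
                    Thread.sleep(blockMillis); // parks the virtual thread, freeing its carrier
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                finished.incrementAndGet();
            }));
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        Instant start = Instant.now();
        int done = runBlockingTasks(100_000, 1_000);
        System.out.println(done + " tasks finished in " + Duration.between(start, Instant.now()));
    }
}
```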

Drop the streams approach. Merely send off each input to a virtual thread.
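Applied to the question's code, a sketch might look like the following. It assumes Java 21, for Executors.newVirtualThreadPerTaskExecutor() and for try-with-resources on ExecutorService. Each answer gets one task on its own virtual thread, and results come back through Futures instead of a stream collector. `getReply` is a hypothetical stand-in for getReplyForAnswerCombination.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class VirtualThreadReplies {
    // Hypothetical stand-in for the question's blocking API call.
    static String getReply(String answer) throws InterruptedException {
        Thread.sleep(100); // simulate waiting on a remote service
        return "reply-" + answer;
    }

    // Submits one task per answer, each to its own virtual thread, then gathers the results.
    static Map<String, String> repliesFor(List<String> answers) {
        Map<String, Future<String>> futures = new LinkedHashMap<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (String answer : answers) {
                futures.put(answer, executor.submit(() -> getReply(answer)));
            }
        } // close() waits here until every submitted task has finished
        Map<String, String> replies = new HashMap<>();
        for (Map.Entry<String, Future<String>> e : futures.entrySet()) {
            try {
                replies.put(e.getKey(), e.getValue().get()); // already complete, so get() returns at once
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted", ex);
            } catch (ExecutionException ex) {
                throw new IllegalStateException("Task failed for " + e.getKey(), ex.getCause());
            }
        }
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(repliesFor(List.of("a", "b", "c")));
    }
}
```

Returning values through Future avoids sharing a mutable map across threads; the full example in this answer takes the other route, with each task writing into a shared ConcurrentMap.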

Full example code

Let's define classes for Answer and Reply, our inputs & outputs. We will use records, a feature finalized in Java 16, as a concise way to define an immutable, data-carrying class. The compiler implicitly generates a canonical constructor, an accessor method for each component, equals & hashCode, and toString.

public record Answer (String text)
{
}

…and:

public record Reply (String text)
{
}
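Because Answer is later used as a map key, the generated equals and hashCode matter: two records with equal components are equal and share a hash code. A small check, assuming Java 16+, with the two records declared as nested types so the snippet is self-contained:

```java
import java.util.HashMap;
import java.util.Map;

public class RecordDemo {
    // Same shape as the Answer and Reply records above, nested here for self-containment.
    record Answer(String text) {}
    record Reply(String text) {}

    public static void main(String[] args) {
        Answer a1 = new Answer("42");
        Answer a2 = new Answer("42");
        System.out.println(a1.text());     // 42 (generated accessor)
        System.out.println(a1.equals(a2)); // true (generated equals compares components)
        System.out.println(a1);            // Answer[text=42] (generated toString)

        // Value-based equals/hashCode: equal answers land on the same map entry.
        Map<Answer, Reply> map = new HashMap<>();
        map.put(a1, new Reply("first"));
        map.put(a2, new Reply("second"));
        System.out.println(map.size());    // 1
    }
}
```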

Define our task to be submitted to an executor service. We write a class named ReplierTask that implements Runnable (has a run method).

Within the run method, we sleep the current thread to simulate waiting for a call to a database, file system, and/or remote service.

package work.basil.example;

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;

public class ReplierTask implements Runnable
{
    private final Answer answer;
    private final ConcurrentMap < Answer, Reply > map;

    public ReplierTask ( Answer answer , ConcurrentMap < Answer, Reply > map )
    {
        this.answer = answer;
        this.map = map;
    }

    private Reply getReplyForAnswerCombination ( Answer answer )
    {
        // Simulating a call to some service to produce a `Reply` object.
        try { Thread.sleep( Duration.ofSeconds( 1 ) ); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); }  // Simulate blocking to wait for call to service or db or such. Restore the interrupt flag rather than swallowing it.
        return new Reply( UUID.randomUUID().toString() );
    }

    // `Runnable` interface
    @Override
    public void run ( )
    {
        System.out.println( "`run` method at " + Instant.now() + " for answer: " + this.answer );
        Reply reply = this.getReplyForAnswerCombination( this.answer );
        this.map.put( this.answer , reply );
    }
}

Lastly, some code to do the work. We make a class named Mapper that contains a main method.

We simulate some input by populating an array of Answer objects. We create an empty ConcurrentMap in which to collect the results. And we assign each Answer object to a new thread where we call for a new Reply object and store the Answer/Reply pair as an entry in the map.

package work.basil.example;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Mapper
{
    public static void main ( String[] args )
    {
        System.out.println("Runtime.version(): " + Runtime.version() );
        System.out.println("availableProcessors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("maxMemory: " + Runtime.getRuntime().maxMemory() + " | maxMemory/(1024*1024) -> megs: " +Runtime.getRuntime().maxMemory()/(1024*1024)  );
        Mapper app = new Mapper();
        app.demo();
    }

    private void demo ( )
    {
        // Simulate our inputs, a list of `Answer` objects.
        int limit = 10_000;
        List < Answer > answers = new ArrayList <>( limit );
        for ( int i = 0 ; i < limit ; i++ )
        {
            answers.add( new Answer( String.valueOf( i ) ) );
        }

        // Do the work.
        Instant start = Instant.now();
        System.out.println( "Starting work at: " + start + " on count of tasks: " + limit );
        ConcurrentMap < Answer, Reply > results = new ConcurrentHashMap <>();
        try
                (
                        ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor() ;  // Java 21 name; early Loom builds called it `newVirtualThreadExecutor()`.
                        // Executors.newFixedThreadPool( 5 )
                        // Executors.newFixedThreadPool( 10 )
                        // Executors.newFixedThreadPool( 1_000 )
                        // Executors.newVirtualThreadPerTaskExecutor()
                )
        {
            for ( Answer answer : answers )
            {
                ReplierTask task = new ReplierTask( answer , results );
                executorService.submit( task );
            }
        }
        // At this point the flow-of-control blocks until all submitted tasks are done.
        // The executor service is automatically closed by this point as well.
        Duration elapsed = Duration.between( start , Instant.now() );
        System.out.println( "results.size() = " + results.size() + ". Elapsed: " + elapsed );
    }
}
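The try-with-resources form above relies on ExecutorService being AutoCloseable, which is only the case since Java 19, where close() waits for the submitted tasks to finish. On older Java versions, a sketch of the explicit equivalent with a fixed pool of platform threads might look like this. The class name, pool size, and one-hour timeout are illustrative assumptions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExplicitShutdown {
    // Runs `count` short blocking tasks on a fixed pool, then shuts down and waits,
    // roughly mirroring what ExecutorService.close() does in Java 19+.
    static int runAll(int count, int poolSize) {
        ConcurrentMap<Integer, String> results = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < count; i++) {
            final int n = i;
            executor.submit(() -> {
                try {
                    Thread.sleep(10); // simulate a blocking call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                results.put(n, "reply-" + n);
            });
        }
        executor.shutdown(); // stop accepting new tasks; already-submitted ones still run
        try {
            if (!executor.awaitTermination(1, TimeUnit.HOURS)) {
                executor.shutdownNow(); // give up on stragglers after the timeout
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return results.size();
    }

    public static void main(String[] args) {
        System.out.println(runAll(100, 10)); // 100
    }
}
```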

We can swap out Executors.newVirtualThreadExecutor() (called Executors.newVirtualThreadPerTaskExecutor() in Java 21) for a pool of platform threads to compare against virtual threads. Let's try pools of 5, 10, and 1,000 platform threads (plus an attempt at 10,000) on an Intel Mac mini running macOS Mojave, with 6 real cores (no hyper-threading), 32 GB of memory, and the OpenJDK special build 16-loom+9-316 with maxMemory set to 8 GB.

| 10,000 tasks at 1 second each | Total elapsed time |
|---|---|
| 5 platform threads | About half an hour (PT33M29.755792S) |
| 10 platform threads | About a quarter-hour (PT16M43.318973S) |
| 1,000 platform threads | About 10 seconds (PT10.487689S) |
| 10,000 platform threads | Error: unable to create native thread: possibly out of memory or process/resource limits reached |
| Virtual threads | Under 3 seconds (PT2.645964S) |
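The platform-thread timings are close to what a back-of-the-envelope estimate predicts, ignoring scheduling overhead. With P threads each handling one 1-second task at a time, N tasks need about ⌈N/P⌉ rounds:

```latex
\begin{aligned}
T &\approx \left\lceil \tfrac{N}{P} \right\rceil \cdot t, \qquad N = 10\,000,\ t = 1\ \text{s} \\
P = 5:&\quad T \approx 2000\ \text{s} = 33\ \text{min}\ 20\ \text{s} \quad (\text{measured: } 33\ \text{min}\ 29.8\ \text{s}) \\
P = 10:&\quad T \approx 1000\ \text{s} = 16\ \text{min}\ 40\ \text{s} \quad (\text{measured: } 16\ \text{min}\ 43.3\ \text{s}) \\
P = 1000:&\quad T \approx 10\ \text{s} \quad (\text{measured: } 10.5\ \text{s})
\end{aligned}
```

With virtual threads, all 10,000 tasks can be blocked at once, so the ideal is about 1 s. The measured 2.6 s presumably includes the cost of creating, scheduling, and logging 10,000 tasks. By the same reasoning, if the question's per-item API calls are mostly spent waiting on the network, run time would scale roughly inversely with the number of threads the parallel stream gets. That may account for part of the difference between the two laptops.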

...