Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share

java - How to ensure that a CompletableFuture has completely finished before executing a piece of code?

I have two questions about the following piece of code.

  1. I'm not sure why I see a TimeoutException, since there is no blocking anywhere.
  2. I have a Collector class that gathers a bunch of metrics. Once the CompletableFuture has completely finished, I want to read the Collector and release those metrics. Is the finally block guaranteed to run last, given that .get() is supposed to block until the future has finished?

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

public class FutureWithCollector
{
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        Collector collector = new Collector();
        finalize(() -> query(collector), collector);
    }

    private static void finalize(Supplier<CompletableFuture<String>> submission, Collector collector) throws ExecutionException, InterruptedException
    {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        CompletableFuture<String> s = CompletableFuture.supplyAsync(submission, executorService).thenCompose(Function.identity());
        try {
            String result = s.get(1000, TimeUnit.MILLISECONDS);
            System.out.println(result);
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            System.out.println(collector.getI());
        }
    }

    private static CompletableFuture<String> query(Collector collector)
    {
        CompletableFuture<String> future = new CompletableFuture<>();
        return future.thenApply(r -> {
            collector.collectStuff();
            return "Hello";
        });
    }

}

class Collector
{
    private volatile int i;

    public void collectStuff()
    {
        i++;
    }

    public int getI()
    {
        return i;
    }
}

Output

java.util.concurrent.TimeoutException
    at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at FutureWithCollector.finalize(FutureWithCollector.java:23)
    at FutureWithCollector.main(FutureWithCollector.java:15)
0



1 Answer


The fix is small: change just one of your methods.

private static CompletableFuture<String> query(Collector collector) {
    return CompletableFuture.supplyAsync(() -> {
        collector.collectStuff();
        return "hello";
    });
}

You are doing:

CompletableFuture<String> future = new CompletableFuture<>();

which is documented as:

Creates a new incomplete CompletableFuture.

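Nothing ever completes this CompletableFuture, so the thenApply stage attached to it never runs and get() always times out, no matter how long the timeout is. That is also why the finally block prints 0: collectStuff() is never called.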
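To make this concrete, here is a minimal sketch (the class name IncompleteFutureDemo and the helper timesOut are made up for illustration, they are not part of your code). It builds the same kind of chain as your query method, shows that get times out while the source future is incomplete, and then shows that the dependent stage finishes as soon as something calls complete on the source:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the original question.
class IncompleteFutureDemo {

    // Returns true if get() timed out, false if a value arrived in time.
    static boolean timesOut(CompletableFuture<String> future)
            throws ExecutionException, InterruptedException {
        try {
            future.get(100, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        // Same shape as query(): a bare future with a dependent stage.
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> dependent = source.thenApply(r -> "Hello");

        // Nobody has completed 'source', so the dependent stage cannot run.
        System.out.println(timesOut(dependent)); // true

        // Completing the source by hand triggers the thenApply stage.
        source.complete("ignored input");
        System.out.println(timesOut(dependent)); // false
        System.out.println(dependent.get());     // Hello
    }
}
```

With supplyAsync, the library completes the future for you when the supplier returns, which is why the changed method above works.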


You can also change your code a bit. If you want to run something, say that explicitly:

private static CompletableFuture<Void> query(Collector collector) {
    return CompletableFuture.runAsync(collector::collectStuff);
}

Also note that collectStuff increments a volatile field, but i++ is a read-modify-write operation and is not atomic, so increments made concurrently from several threads can be lost.
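One possible way to make the counter safe, sketched here under the assumption that a plain counter is all the Collector needs, is to back it with an AtomicInteger. The class name AtomicCollector and the helper countAfter are hypothetical and only exist for this illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant of Collector that uses an atomic counter.
class AtomicCollector {
    private final AtomicInteger i = new AtomicInteger();

    public void collectStuff() {
        i.incrementAndGet(); // atomic read-modify-write
    }

    public int getI() {
        return i.get();
    }

    // Runs 'tasks' async tasks that each increment the counter
    // 'incrementsPerTask' times, waits for all of them, returns the total.
    static int countAfter(int tasks, int incrementsPerTask) {
        AtomicCollector collector = new AtomicCollector();
        CompletableFuture<?>[] futures = new CompletableFuture<?>[tasks];
        for (int t = 0; t < tasks; t++) {
            futures[t] = CompletableFuture.runAsync(() -> {
                for (int n = 0; n < incrementsPerTask; n++) {
                    collector.collectStuff();
                }
            });
        }
        CompletableFuture.allOf(futures).join(); // wait for every task
        return collector.getI();
    }

    public static void main(String[] args) {
        System.out.println(countAfter(4, 10_000)); // 40000
    }
}
```

With the volatile int version, the same experiment may print less than 40000, because two threads can read the same old value before either writes its update back.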

You can also use join instead of get so that you do not have to handle checked exceptions (granted, there is no overload of join that takes a timeout).
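As a rough sketch of what that looks like (JoinDemo and describe are made-up names for illustration): join needs no throws clause, and a failure inside the async task surfaces as an unchecked CompletionException whose cause is the original exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class, not part of the original question.
class JoinDemo {

    // No checked exceptions to declare: join() only throws unchecked ones.
    static String describe(CompletableFuture<String> future) {
        try {
            return future.join();
        } catch (CompletionException e) {
            // The exception thrown inside the task is available as the cause.
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.supplyAsync(() -> "hello");
        CompletableFuture<String> bad = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });

        System.out.println(describe(ok));  // hello
        System.out.println(describe(bad)); // failed: boom
    }
}
```

Keep in mind that join on a future that nobody completes blocks forever, so in your original code it would hang instead of timing out. If you are on Java 9 or later, orTimeout on the future can bound the wait before calling join.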

