Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
190 views
in Technique[技术] by (71.8m points)

java - ExecutorService threads not working as expected in RxJava code


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Setup

Linux ThinkPad-P50 5.4.0-58-generic #64-Ubuntu SMP Wed Dec 9 08:16:25 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux

openjdk version "14.0.2" 2020-07-14
OpenJDK Runtime Environment AdoptOpenJDK (build 14.0.2+12)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 14.0.2+12, mixed mode, sharing)

I think this has something to do with the JUnit-Runner. Let's have a look at following example:

public class WhileTrueJava {
  public static void main(String[] args) {
    Thread thread =
        new Thread(
            () -> {
              while (true) {}
            });
    thread.setDaemon(false);
    thread.start();

    log("exit");
  }

  private static void log(Object msg) {
    System.out.println(Thread.currentThread().getName() + "-" + msg);
  }
}

When I run given example, the process will not exit. When I use the same with JUnit, the process will finish. I can not yet say, why this is how it is.

Example:

class So65650913 {
  static ExecutorService poolA = newFixedThreadPool(10, threadFactory("Scheduler-A-%d"));
  static Scheduler schedulerA = Schedulers.from(poolA);

  public static void main(String[] args) {
    Utils.log("Starting");
    Observable<String> obs =
        Observable.fromCallable(
            () -> {
              Utils.log("fromCallable lambda called");
              Thread.sleep(1_000);
              Utils.log("fromCallable return value");
              return "42";
            });

    Utils.log("Created");

    Disposable completed =
        obs.doOnNext(Utils::log)
            .map(x -> x + 1)
            .doOnNext(Utils::log)
            .subscribeOn(schedulerA)
            .map(x -> x + 2)
            .doOnNext(Utils::log)
            .subscribe(
                x -> Utils.log("Got " + x),
                Throwable::printStackTrace,
                () -> Utils.log("Completed"));

    Utils.log("exiting");
  }

  private static ThreadFactory threadFactory(String pattern) {
    return new ThreadFactoryBuilder().setDaemon(false).setNameFormat(pattern).build();
  }
}

class Utils {
  private static final long start = System.nanoTime();

  private Utils() {}

  public static void log(Object label) {
    System.out.println(
            (System.nanoTime()
            - start) / 1_000_000
            + "| "
            + Thread.currentThread().getName()
            + "| "
            + label);
  }
}

Output

0   | main  | Starting
62  | main  | Created
77  | main  | exiting
78  | Scheduler-A-0 | fromCallable lambda called
1078    | Scheduler-A-0 | fromCallable return value
1082    | Scheduler-A-0 | 42
1082    | Scheduler-A-0 | 421
1082    | Scheduler-A-0 | 4212
1082    | Scheduler-A-0 | Got 4212
1082    | Scheduler-A-0 | Completed

It looks like the process will not stop, until the ThreadPool is shotdown and all running tasks are executed.

Just add poolA.shutdown(); at the end of the main method and see what happens. I would say, the behavior is as expected.

Update

JUnit behaves differently, because of following code^1

public static void main(String... args) {
    int exitCode = execute(System.out, System.err, args).getExitCode();
    System.exit(exitCode);
}

Looks like, when the main-thread falls through, the process will be killed with System.exit. Therefore it's JUnit ConsoleLaunchers fault.

^1 JUnit Github https://github.com/junit-team/junit5/blob/ae2a336fe4b371d398da386e8b336cc06329da7d/junit-platform-console/src/main/java/org/junit/platform/console/ConsoleLauncher.java


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...