Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


project reactor - Handling connection errors in Spring reactive webclient

I have a Spring WebClient that makes HTTP calls to an external service and is backed by a reactive circuit breaker factory (Resilience4j implementation). The WebClient and the circuit breaker behave as expected when the client establishes a connection and the failure comes from the response (any 5XX internal server error or 4XX error). However, when the client fails to establish a connection at all, with either Connection Refused or UnknownHost, the error handling breaks down.

  1. I cannot catch the error inside the WebClient pipeline and use it to trigger the circuit breaker.
  2. The circuit breaker never opens; instead it throws a TimeoutException:
    java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 1000ms in 'circuitBreaker' (and no fallback has been configured)

Error from the WebClient:
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:9000

Here's my code, with the points where the errors originate marked in comments. I tried mapping ConnectException to my custom exception so that the circuit breaker would pick it up, but that did not work. Can someone help me handle errors that occur without any response from the remote server?

public Mono<String> toSink(
    Envelope envelope, ConsumerConfiguration webClientConfiguration) {

  return getWebClient()
    .post()
    .uri(
        uriBuilder -> {
          if (webClientConfiguration.getPort() != null) {
            uriBuilder.port(webClientConfiguration.getPort());
          }
          return uriBuilder.path(webClientConfiguration.getServiceURL()).build();
        })
    .headers(
        httpHeaders ->
            webClientConfiguration.getHttpHeaders().forEach((k, v) -> httpHeaders.add(k, v)))
    .bodyValue(envelope.toString())
    .retrieve()
    .bodyToMono(Map.class)
    // Convert 5XX internal server error and throw CB exception
    .onErrorResume(
        throwable -> {
          log.error("Inside the error resume callback of webclient {}", throwable.toString());
          if (throwable instanceof WebClientResponseException) {
            WebClientResponseException r = (WebClientResponseException) throwable;
            if (r.getStatusCode().is5xxServerError()) {
              return Mono.error(new CircuitBreakerOpenException());
            }
          }
          return Mono.error(new CircuitBreakerOpenException());
        })
    .map(
        map -> {
          log.info("Response map:{}", Any.wrap(map).toString());
          return Status.SUCCESS.name();
        })
    .transform(
        it -> {
          ReactiveCircuitBreaker rcb =
              reactiveCircuitBreakerFactory.create(
                  webClientConfiguration.getCircuitBreakerId());
          return rcb.run(
              it,
              throwable -> {
                // Error observed here: "Did not observe any item or terminal signal within 1000ms ..."
                log.info("throwable in CB {}", throwable.toString());
                if (throwable instanceof CygnusBusinessException) {
                  return Mono.error(throwable);
                }
                return Mono.error(
                    new CircuitBreakerOpenException(
                        throwable, new CygnusContext(), null, null, null));
              });
        })
    // Error logged here: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:9000
    .onErrorContinue((throwable, o) -> log.error(throwable.toString()))
    .doOnError(throwable -> log.error("error from webclient:{}", throwable.toString()));

}

Question from: https://stackoverflow.com/questions/65841285/handling-connection-errors-in-spring-reactive-webclient


1 Answer


I fixed it by adding an onErrorContinue block that re-throws the exception as a custom exception, which is then handled in my circuit breaker code.

.onErrorContinue(
        (throwable, o) -> {
          log.info("throwable => {}", throwable.toString());
          if (throwable instanceof ReadTimeoutException || throwable instanceof ConnectException) {
            throw new CircuitBreakerOpenException();
          }
        })
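
One thing to watch with the instanceof check above: the exception in the log, AnnotatedConnectException, is a subclass of java.net.ConnectException, but depending on the Spring version the WebClient may deliver it wrapped in another exception (newer versions wrap request failures in WebClientRequestException), in which case a direct instanceof test on the outer throwable would not match. A minimal sketch of a check that walks the cause chain is shown here. The class name ConnectionFailures and the method isConnectionFailure are hypothetical and not part of the original post, and the sketch uses only the JDK.

```java
import java.net.ConnectException;
import java.net.UnknownHostException;

/** Hypothetical helper (an assumption for illustration, not from the original post). */
final class ConnectionFailures {

  // Upper bound on how many causes to inspect, to guard against cyclic cause chains.
  private static final int MAX_DEPTH = 32;

  private ConnectionFailures() {}

  /**
   * Returns true if the throwable itself, or any exception in its cause chain,
   * is a ConnectException or an UnknownHostException.
   */
  static boolean isConnectionFailure(Throwable throwable) {
    int depth = 0;
    for (Throwable t = throwable; t != null && depth < MAX_DEPTH; t = t.getCause(), depth++) {
      if (t instanceof ConnectException || t instanceof UnknownHostException) {
        return true;
      }
    }
    return false;
  }
}
```

Inside the callback above, ConnectionFailures.isConnectionFailure(throwable) could stand in for the ConnectException check, so that wrapped connection errors (and UnknownHost errors, which the question also mentions) are re-thrown as CircuitBreakerOpenException too. The ReadTimeoutException check is Netty-specific and would stay as it is.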
