Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

java - Push data to websocket with Reactive Spring and WebFlux

I'm trying to send data to a JS client, but none of the .textMessage calls seem to work, and it looks like the user just disconnects. I got return Mono.zip(inputMessage, outputMessage).then(); from the reactive docs. The basic idea is to send a message whenever something is published on Redis, but I can't even connect and receive messages properly over the WebSocket.

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        HttpHeaders headers = webSocketSession.getHandshakeInfo().getHeaders();
        final String proxyIp = headers.getFirst("x-forwarded-for");
        final String remoteIp = headers.getFirst("x-real-ip");
        final String ip = (proxyIp == null || proxyIp.isEmpty()) ? remoteIp : proxyIp;
        String rawCookieHeader = headers.getFirst("cookie");
        if (rawCookieHeader == null) {
            webSocketSession.textMessage("Missing cookie header");
            // return webSocketSession.close(CloseStatus.PROTOCOL_ERROR);
        }

        final Map<String, String> cookieMap = new HashMap<>();
        CookieParser parser = new CookieParser(rawCookieHeader);
        parser.iterator().forEachRemaining(cookie -> cookieMap.put(cookie.getName(), cookie.getValue()));

        String userId = cookieMap.getOrDefault("userId", "");

        // Handle too many sessions open
        RedisAtomicInteger numberOfOpenWebSockets = getUserWebsocket(userId, chatCode);
        if (numberOfOpenWebSockets.get() >= 3) {
            webSocketSession.textMessage("Too many websocket connections opened by this account");
            return webSocketSession.close(CloseStatus.SERVICE_OVERLOAD);
        }

        Mono<Void> outputMessage = webSocketSession
                .send(messageDirectProcessor.flatMap(JsonConverter::toJson).map(webSocketSession::textMessage)
                        .doOnError(e -> log.info("Error Occurred while sending message to client.", e)));

        // When a user sends a message
        Mono<Void> inputMessage = webSocketSession.receive()
                .flatMap(msg -> redisGlobalPublisher.processMessage(msg.getPayloadAsText(), userId, ip, sessionId))
                .doOnSubscribe(subscription -> {
                    log.info("User '{}' Connected. Connections: {}, Active Users: {}", userId,
                            numberOfOpenWebSockets.incrementAndGet(), activeUserCounter.incrementAndGet());
                }).doOnError(throwable -> log.info("Error Occurred while sending message to Redis.", throwable))
                .doFinally(signalType -> {
                    log.info("User '{}' Disconnected. Connections: {}, Active Users: {}", userId,
                            numberOfOpenWebSockets.decrementAndGet(), activeUserCounter.decrementAndGet());
                }).then();

        return Mono.zip(inputMessage, outputMessage).then();
    }
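
A side note on the cookie handling in the handler above: because the early return is commented out, rawCookieHeader can still be null when it reaches CookieParser. The API of CookieParser is not shown, so the following is only a minimal, dependency-free sketch of null-safe parsing. The class CookieHeaders and its parse method are hypothetical names introduced for illustration, not part of Spring or of the original project.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not a Spring class and not the CookieParser used above. */
final class CookieHeaders {
    private CookieHeaders() {}

    /** Parses a raw Cookie header such as "a=1; b=2"; a null or blank header yields an empty map. */
    static Map<String, String> parse(String rawCookieHeader) {
        Map<String, String> cookies = new LinkedHashMap<>();
        if (rawCookieHeader == null || rawCookieHeader.trim().isEmpty()) {
            return cookies;
        }
        for (String pair : rawCookieHeader.split(";")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // skip fragments with no '=' or with nothing before it
            }
            String name = pair.substring(0, eq).trim();
            String value = pair.substring(eq + 1).trim();
            if (!name.isEmpty()) {
                cookies.put(name, value);
            }
        }
        return cookies;
    }

    public static void main(String[] args) {
        // A present header: the userId cookie is found.
        System.out.println(parse("userId=42; theme=dark").get("userId"));
        // A missing header: no exception, the default is used instead.
        System.out.println(parse(null).getOrDefault("userId", "").isEmpty());
    }
}
```

With a helper along these lines, cookieMap.getOrDefault("userId", "") would fall back to the empty string when the header is absent, rather than depending on how CookieParser treats null.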

The client is very simple:

        const ws = new WebSocket("ws://localhost:8082/ws")
        ws.onopen = (e) => {
          console.log(e);
          //@ts-ignore
          this.connectedToWebSocket = true;
          // console.log(JSON.stringify({"command": "subscribe","identifier":`{"channel":"MESSAGE-${cookies.get('userId')}"}`}));
          // ws.send(JSON.stringify({"command": "subscribe","identifier":`{"channel":"MESSAGE-${cookies.get('userId')}"}`}))
          console.log("Successfully connected to the echo websocket server...")
        };
        ws.onmessage = (event) => {
          console.log(event);
        }
        ws.onclose = (e) => {
          console.log(e);
        };
        ws.onerror = (e) => {
          console.log(e);
        }

I'm totally lost as to why inputMessage and outputMessage are not pushed properly. Any help is appreciated.
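
One thing that may explain part of this: in Spring WebFlux, WebSocketSession.textMessage(String) only builds a WebSocketMessage and writes nothing to the socket. A message goes out only when it is part of the Publisher passed to WebSocketSession.send(...), and the Mono<Void> that send returns does nothing until it is subscribed (normally by returning it from handle). The sketch below models just that distinction in plain Java so it runs without Spring. ToySession and ToyMessage are hypothetical stand-ins, not Spring types, and Runnable.run() stands in for subscribing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy stand-in for a WebSocket session; hypothetical, not a Spring type. */
final class ToySession {

    /** Toy stand-in for a WebSocket message. */
    static final class ToyMessage {
        final String payload;

        ToyMessage(String payload) {
            this.payload = payload;
        }
    }

    /** Payloads that actually reached the "client". */
    final List<String> wire = new ArrayList<>();

    /** Factory only: builds a message object and writes nothing. */
    ToyMessage textMessage(String payload) {
        return new ToyMessage(payload);
    }

    /** Deferred send: messages are written only when the returned task is run. */
    Runnable send(List<ToyMessage> messages) {
        return () -> {
            for (ToyMessage m : messages) {
                wire.add(m.payload);
            }
        };
    }

    public static void main(String[] args) {
        ToySession session = new ToySession();

        // Mirrors the handler above: the message is created, then discarded.
        session.textMessage("Missing cookie header");
        System.out.println("after textMessage only: " + session.wire.size());

        // Building the send pipeline is still not enough on its own.
        Runnable pending = session.send(Arrays.asList(session.textMessage("Too many connections")));
        System.out.println("after send, before run: " + session.wire.size());

        // Running the task plays the role of subscribing to the Mono.
        pending.run();
        System.out.println("after run: " + session.wire);
    }
}
```

Applied to the handler above, one possible (untested here) shape for the "too many connections" branch would be to chain the send before the close, for example:

    return webSocketSession.send(Mono.just(webSocketSession.textMessage("..."))).then(webSocketSession.close(CloseStatus.SERVICE_OVERLOAD));

This does not explain every symptom in the question (for example why the client disconnects), only why the two bare textMessage calls never reach the client.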

question from:https://stackoverflow.com/questions/65951924/push-data-to-websocket-with-reactive-spring-and-webflux
