I'm trying to send data to a JS client, but for some reason none of the .textMessage
calls work, and it looks like the user just disconnects. I got the return Mono.zip(inputMessage, outputMessage).then();
from the reactive docs. The basic idea is to send a message whenever something is published on Redis, but I can't even connect and receive messages properly over the WebSocket.
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    HttpHeaders headers = webSocketSession.getHandshakeInfo().getHeaders();
    final String proxyIp = headers.getFirst("x-forwarded-for");
    final String remoteIp = headers.getFirst("x-real-ip");
    final String ip = (proxyIp == null || proxyIp.isEmpty()) ? remoteIp : proxyIp;
    String rawCookieHeader = headers.getFirst("cookie");
    if (rawCookieHeader == null) {
        webSocketSession.textMessage("Missing cookie header");
        // return webSocketSession.close(CloseStatus.PROTOCOL_ERROR);
    }
    final Map<String, String> cookieMap = new HashMap<>();
    CookieParser parser = new CookieParser(rawCookieHeader);
    parser.iterator().forEachRemaining(cookie -> cookieMap.put(cookie.getName(), cookie.getValue()));
    String userId = cookieMap.getOrDefault("userId", "");
    // Handle too many sessions open
    RedisAtomicInteger numberOfOpenWebSockets = getUserWebsocket(userId, chatCode);
    if (numberOfOpenWebSockets.get() >= 3) {
        webSocketSession.textMessage("Too many websocket connections opened by this account");
        return webSocketSession.close(CloseStatus.SERVICE_OVERLOAD);
    }
    Mono<Void> outputMessage = webSocketSession
            .send(messageDirectProcessor.flatMap(JsonConverter::toJson).map(webSocketSession::textMessage)
                    .doOnError(e -> log.info("Error Occurred while sending message to client.", e)));
    // When a user sends a message
    Mono<Void> inputMessage = webSocketSession.receive()
            .flatMap(msg -> redisGlobalPublisher.processMessage(msg.getPayloadAsText(), userId, ip, sessionId))
            .doOnSubscribe(subscription -> {
                log.info("User '{}' Connected. Connections: {}, Active Users: {}", userId,
                        numberOfOpenWebSockets.incrementAndGet(), activeUserCounter.incrementAndGet());
            }).doOnError(throwable -> log.info("Error Occurred while sending message to Redis.", throwable))
            .doFinally(signalType -> {
                log.info("User '{}' Disconnected. Connections: {}, Active Users: {}", userId,
                        numberOfOpenWebSockets.decrementAndGet(), activeUserCounter.decrementAndGet());
            }).then();
    return Mono.zip(inputMessage, outputMessage).then();
}
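To make the cookie step concrete: the handler reads userId from the raw cookie header, and because the early return is commented out, the parser is reached even when that header is null. Below is a minimal, JDK-only sketch of a null-safe version of that step. CookieHeaderSketch and parseCookies are hypothetical names standing in for CookieParser, whose implementation isn't shown here, so treat this as an illustration under that assumption rather than the actual parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the CookieParser used in the handler (an assumption, not the real class).
final class CookieHeaderSketch {

    // Parses a raw "Cookie" request header such as "userId=42; theme=dark".
    // Returns an empty map when the header is null or blank instead of throwing.
    static Map<String, String> parseCookies(String rawCookieHeader) {
        Map<String, String> cookies = new LinkedHashMap<>();
        if (rawCookieHeader == null || rawCookieHeader.isBlank()) {
            return cookies;
        }
        for (String pair : rawCookieHeader.split(";")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // skip pairs that have no '=' or start with '='
            }
            String name = pair.substring(0, eq).trim();
            String value = pair.substring(eq + 1).trim();
            if (!name.isEmpty()) {
                cookies.put(name, value);
            }
        }
        return cookies;
    }

    public static void main(String[] args) {
        Map<String, String> cookies = parseCookies("userId=42; theme=dark");
        System.out.println(cookies.getOrDefault("userId", "")); // prints 42
        System.out.println(parseCookies(null).isEmpty());       // prints true
    }
}
```

With something like this, a missing cookie header yields an empty userId instead of a failure inside the parser, so the handler can decide explicitly what to do with an anonymous connection.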
Client is super simple:
const ws = new WebSocket("ws://localhost:8082/ws")
ws.onopen = (e) => {
    console.log(e);
    //@ts-ignore
    this.connectedToWebSocket = true;
    // console.log(JSON.stringify({"command": "subscribe","identifier":`{"channel":"MESSAGE-${cookies.get('userId')}"}`}));
    // ws.send(JSON.stringify({"command": "subscribe","identifier":`{"channel":"MESSAGE-${cookies.get('userId')}"}`}))
    console.log("Successfully connected to the echo websocket server...")
};
ws.onmessage = (event) => {
    console.log(event);
}
ws.onclose = (e) => {
    console.log(e);
};
ws.onerror = (e) => {
    console.log(e);
}
I'm totally lost as to why inputMessage and outputMessage are not pushed properly. Any help is appreciated.
question from:
https://stackoverflow.com/questions/65951924/push-data-to-websocket-with-reactive-spring-and-webflux