Instead of designing your application in a blocking manner using SynchronousQueue<Response>, design it in a non-blocking manner using SynchronousQueue<Promise<Response>>.
Your public Future<Response> issueCommandToLegacyHardware(Command command) should then use offer() to add a DefaultPromise<>() to the queue, and the Netty pipeline can then use remove() to get the promise for that request. Note that I used remove() instead of take(), since only under exceptional circumstances is there no element present.
A quick implementation of this might be:
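import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import java.util.concurrent.SynchronousQueue;

public class MyLastHandler extends SimpleChannelInboundHandler<Response> {
    private final SynchronousQueue<Promise<Response>> queue;

    public MyLastHandler(SynchronousQueue<Promise<Response>> queue) {
        super();
        this.queue = queue;
    }

    // The following is called messageReceived(ChannelHandlerContext, Response) in 5.0.
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Response msg) {
        // Complete the oldest pending promise with the response.
        this.queue.remove().setSuccess(msg); // Or setFailure(Throwable)
    }
}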
The above handler should be placed last in the chain.
The implementation of public Future<Response> issueCommandToLegacyHardware(Command command) can look like this:
Channel channel = ....;
SynchronousQueue<Promise<Response>> queue = ....;

public Future<Response> issueCommandToLegacyHardware(Command command) {
    return issueCommandToLegacyHardware(command, channel.eventLoop().newPromise());
}

public Future<Response> issueCommandToLegacyHardware(Command command, Promise<Response> promise) {
    queue.offer(promise);
    // In Netty 4, write() only queues the message; writeAndFlush() also sends it.
    channel.writeAndFlush(command);
    return promise;
}
Overloading issueCommandToLegacyHardware this way follows the same design pattern as Channel.write, which makes it really flexible.
This design pattern can be used as follows in client code:
issueCommandToLegacyHardware(
    Command.TAKE_OVER_THE_WORLD_WITH_FIRE,
    channel.eventLoop().newPromise()
).addListener(
    (Future<Response> f) -> {
        System.out.println("We have taken over the world: " + f.get());
    }
);
The advantage of this design pattern is that no unneeded blocking is used anywhere, just plain async logic.
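To make the flow easier to try without Netty, here is a minimal JDK-only sketch of the same idea. It is not the Netty code above, and several things in it are assumptions: CompletableFuture stands in for Netty's Promise, plain Strings stand in for Command and Response, and the class and method names (PromiseQueueSketch, issueCommand, onResponse) are hypothetical. It also swaps the queue type: java.util.concurrent.SynchronousQueue has no internal capacity, so offer() only succeeds while another thread is already waiting to take, which is why this sketch uses a ConcurrentLinkedQueue instead. Like the handler above, it assumes the hardware answers commands in the order they were sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the design above: CompletableFuture plays the role
// of Netty's Promise, and Strings play the roles of Command and Response.
final class PromiseQueueSketch {
    // Promises for commands that were sent but not yet answered, oldest first.
    private final Queue<CompletableFuture<String>> pending = new ConcurrentLinkedQueue<>();
    // Stand-in for the channel: records what would have been written to the wire.
    private final List<String> wire = new ArrayList<>();

    // Counterpart of issueCommandToLegacyHardware(Command).
    CompletableFuture<String> issueCommand(String command) {
        return issueCommand(command, new CompletableFuture<>());
    }

    // Counterpart of the overload that accepts a caller-supplied promise.
    // synchronized so that queue order and wire order cannot diverge.
    synchronized CompletableFuture<String> issueCommand(String command,
                                                        CompletableFuture<String> promise) {
        pending.add(promise); // register the promise before the command goes out
        wire.add(command);    // stand-in for writing the command to the channel
        return promise;
    }

    // Counterpart of channelRead0: the oldest pending promise gets the response.
    void onResponse(String response) {
        // remove() throws NoSuchElementException if nothing is pending,
        // i.e. a response arrived that nobody asked for.
        pending.remove().complete(response);
    }

    public static void main(String[] args) {
        PromiseQueueSketch hardware = new PromiseQueueSketch();
        CompletableFuture<String> first = hardware.issueCommand("STATUS");
        CompletableFuture<String> second = hardware.issueCommand("RESET");

        // Non-blocking callbacks, analogous to addListener(...) on a Netty Future.
        first.thenAccept(r -> System.out.println("first command answered: " + r));
        second.thenAccept(r -> System.out.println("second command answered: " + r));

        // Simulated responses, arriving in the order the commands were issued.
        hardware.onResponse("OK");
        hardware.onResponse("DONE");
    }
}
```

The caller never blocks: it gets a future back immediately, and the callback runs when the matching response is handed to onResponse.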
Appendix I: Javadoc:
Promise Future DefaultPromise