This question is related to a possible resolution of another question.
I have two pipelines: I receive a client request on one channel, fire off a request on a second channel, and then copy the response content into a new DefaultHttpResponse, which I attempt to write to the original channel. However, this results in the following exception:
java.lang.IllegalArgumentException: unsupported message type: class org.jboss.netty.handler.codec.http.DefaultHttpResponse
at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:53)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:468)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:432)
at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:366)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processWriteTaskQueue(AbstractNioWorker.java:350)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:246)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
Does anyone know why this might be happening? For reference, here's the code for my two pipelines (I know I'm doing some thread-unsafe things here, but I'm trying to get this working for a single-threaded client before I move on):
private static Channel channel;
private static Map<Channel, Channel> proxyToClient = new ConcurrentHashMap<Channel, Channel>();

public static void main(String[] args) throws Exception {
    ChannelFactory clientFactory =
            new NioClientSocketChannelFactory(
                    Executors.newCachedThreadPool(),
                    Executors.newCachedThreadPool());
    final ClientBootstrap cb = new ClientBootstrap(clientFactory);
    cb.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            return Channels.pipeline(
                    new HttpRequestEncoder(),
                    new HttpResponseDecoder(),
                    new ResponseHandler());
        }
    });
    ChannelFuture cf = cb.connect(new InetSocketAddress("localhost", 18080));
    channel = cf.awaitUninterruptibly().getChannel();

    ChannelFactory factory =
            new NioServerSocketChannelFactory(
                    Executors.newCachedThreadPool(),
                    Executors.newCachedThreadPool());
    ServerBootstrap sb = new ServerBootstrap(factory);
    sb.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            return Channels.pipeline(
                    new HttpRequestDecoder(),
                    new RequestHandler());
        }
    });
    sb.setOption("child.tcpNoDelay", true);
    sb.setOption("child.keepAlive", true);
    sb.bind(new InetSocketAddress(2080));
}

private static class ResponseHandler extends SimpleChannelHandler {
    @Override
    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
        final HttpResponse proxyResponse = (HttpResponse) e.getMessage();
        Channel clientChannel = proxyToClient.get(e.getChannel());
        HttpResponse clientResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        clientResponse.setContent(proxyResponse.getContent());
        clientChannel.write(clientResponse).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                Channel ch = future.getChannel();
                ch.close();
            }
        });
    }
}

private static class RequestHandler extends SimpleChannelHandler {
    @Override
    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
        final HttpRequest request = (HttpRequest) e.getMessage();
        proxyToClient.put(channel, e.getChannel());
        channel.write(request);
    }
}
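
One thing that stands out in the code above, offered as a likely explanation rather than a confirmed diagnosis: the server pipeline contains only an HttpRequestDecoder and the RequestHandler, so nothing on the outbound side turns an HttpResponse into bytes. In Netty 3.x the NIO transport can only send already-encoded data such as a ChannelBuffer, which would explain why SocketSendBufferPool.acquire rejects the DefaultHttpResponse. The client pipeline does not have this problem because it includes an HttpRequestEncoder. A minimal sketch of the change, assuming Netty 3.x and written as a fragment that replaces the sb.setPipelineFactory(...) call in main():

```java
// Fragment, not a full program: it replaces the existing
// sb.setPipelineFactory(...) call in main() and additionally needs
//   import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
sb.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() {
        return Channels.pipeline(
                new HttpRequestDecoder(),   // inbound: bytes -> HttpRequest
                new HttpResponseEncoder(),  // outbound: HttpResponse -> ChannelBuffer
                new RequestHandler());      // unchanged handler from the code above
    }
});
```

With the encoder in place, the clientChannel.write(clientResponse) call in ResponseHandler should pass through HttpResponseEncoder before it reaches the socket. The copied response may still need its own headers (for example Content-Length) set explicitly, since only the content is copied from the proxied response.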