Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


java - Netty buffering inbound data until I have enough data to execute some logic

I would like to proxy an arbitrary protocol based on incoming data. For example, I have a bot system that consists of a few bots. The user chats with a generic bot until he types some keyword. When the keyword is found, the generic bot starts proxying all the data from this bot to a specific backend bot. The first thing the generic bot does after connecting to the backend bot is stream the entire chat history to that bot.

This is what I have come up with so far.


It starts with a predefined pipeline of bots ending in the generic bot.

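import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Log4J2LoggerFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ChatBot {
    private static final Logger LOG = LogManager.getLogger(ChatBot.class);

    // bootstrap() blocks until the server channel closes and can be interrupted,
    // so main has to declare the checked exception.
    public static void main(String[] args) throws InterruptedException {
        InternalLoggerFactory.setDefaultFactory(Log4J2LoggerFactory.INSTANCE);
        final var reverseProxy = new ChatBot();
        reverseProxy.bootstrap();
    }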

    public void bootstrap() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        try {
            LOG.info("Starting generic chat bot");
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .localAddress(9025)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                                      // Predefined pipeline of bots ending in the generic bot
                                      @Override
                                      protected void initChannel(SocketChannel socketChannel){
                                          socketChannel.pipeline()
                                                  .addLast("workerLog",new LoggingHandler(LogLevel.INFO))
                                                  .addLast("Router",new RouterBot())
                                                  .addLast("Generic",new GenericBot());
                                      }
                                  }
                    ).bind().sync().channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

The Router bot keeps forwarding messages to the Generic bot, which just echoes them back. When it finds "KEYWORD", we change the channel pipeline and replace the Generic bot with a ProxyChannel.

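import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class RouterBot extends ChannelInboundHandlerAdapter {
    private final Logger log = LogManager.getLogger(getClass());
    // Per-channel state: a new RouterBot is created for every channel in the ChannelInitializer.
    private boolean proxy;
    private final List<Object> messageHistory = new ArrayList<>();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        final var message = in.toString(CharsetUtil.UTF_8);
        if (!proxy) {
            // Record history only until the switch; otherwise the list grows without bound.
            messageHistory.add(message);
            if (message.contains("KEYWORD")) {
                proxy = true;
                log.info("Routing messages to keyword bot");
                ctx.pipeline().replace("Generic", "Proxy",
                        new ProxyChannel(new ArrayList<>(messageHistory),
                                "127.0.0.1",
                                4040));
                messageHistory.clear();
                // The channel is already active, so trigger channelActive for the new handler by hand.
                ctx.fireChannelActive();
            }
        }
        ctx.fireChannelRead(msg);
    }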

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }
}
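
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ProxyChannel extends SimpleChannelInboundHandler<ByteBuf> {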
    private final List<Object> history;
    private Logger log = LogManager.getLogger(getClass());
    private String remoteHost;
    private int remotePort;
    private Channel outboundChannel;

    public ProxyChannel(List<Object> history, String remoteHost, int remotePort) {
        this.history = history;
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Creating Proxy channel");
        Bootstrap bootstrap = new Bootstrap();
        ChannelFuture connectFuture;
        //Use the same event group for server channel and client
        bootstrap.group(ctx.channel().eventLoop());
        bootstrap.channel(ctx.channel().getClass())
                .handler(new BackendChannel(ctx.channel()));
        connectFuture = bootstrap.connect(new InetSocketAddress(remoteHost, remotePort));
        connectFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // connection complete start to read first data
                    ctx.channel().read();
                } else {
                    // Close the connection if the connection attempt has failed.
                    log.error("Failed to connect remote {}",future.cause().getMessage());
                    ctx.channel().close();
                }
            }
        });
        outboundChannel = connectFuture.channel();
    }

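    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
        // Note: messages that arrive before the backend connection is active are dropped here.
        if (outboundChannel.isActive()) {
            if (!history.isEmpty()) {
                // Replay the history in its own buffer, written before the current message,
                // so the backend receives everything in the original order.
                ByteBuf replay = ctx.alloc().buffer();
                for (Object msg : history) {
                    replay.writeBytes(msg.toString().getBytes(StandardCharsets.UTF_8));
                }
                history.clear();
                outboundChannel.write(replay);
            }
            log.info("streaming to proxy");
            // retain(): SimpleChannelInboundHandler releases byteBuf once this method returns.
            outboundChannel.writeAndFlush(byteBuf.retain()).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        // was able to flush out data, start to read the next chunk
                        ctx.channel().read();
                    } else {
                        log.error("Failed to write to remote {}", future.cause().getMessage());
                        future.channel().close();
                    }
                }
            });
        }
    }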
}
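
One thing to watch in RouterBot: it tests each inbound chunk separately with message.contains("KEYWORD"), and TCP gives no guarantee that the keyword arrives inside a single read. Below is a minimal, framework-free sketch of a scanner that carries the tail of earlier chunks forward. KeywordScanner and feed are hypothetical names, not Netty API, and the sketch assumes the chunks have already been decoded to text.

```java
// Hypothetical helper, not part of Netty: detects a keyword even when it is
// split across consecutive inbound chunks.
final class KeywordScanner {
    private final String keyword;
    // Holds at most keyword.length() - 1 trailing characters of earlier chunks.
    private final StringBuilder tail = new StringBuilder();

    KeywordScanner(String keyword) {
        if (keyword.isEmpty()) {
            throw new IllegalArgumentException("keyword must not be empty");
        }
        this.keyword = keyword;
    }

    /** Feeds one decoded chunk; returns true if the keyword is complete in the data seen so far. */
    boolean feed(String chunk) {
        String window = tail + chunk;
        boolean found = window.contains(keyword);
        // Keep only the suffix that could still be the start of a keyword.
        int keep = Math.min(window.length(), keyword.length() - 1);
        tail.setLength(0);
        tail.append(window, window.length() - keep, window.length());
        return found;
    }

    public static void main(String[] args) {
        KeywordScanner scanner = new KeywordScanner("KEYWORD");
        System.out.println(scanner.feed("hello KEY"));   // prints false
        System.out.println(scanner.feed("WORD please")); // prints true
    }
}
```

Because each handler instance belongs to one channel, a scanner like this could live in a member field of RouterBot next to messageHistory.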

My questions:

  1. Are the private fields on the RouterBot handler thread safe, or is there a risk of mixing messages?
  2. I have heard claims that manipulating the pipeline is expensive. Is that critical? What are the alternatives?
  3. Any suggestions on how to make the history replay more interactive, i.e. evaluating the responses to the replayed messages?
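
To make question 3 concrete, here is a rough sketch of what an "interactive" replay could mean: send the history one message at a time, inspect each reply, and stop at the first reply that is rejected. HistoryReplayer, the backend function, and the accepted predicate are all hypothetical stand-ins. The blocking call is a simplification, since a real Netty handler must not block the event loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch: every name here is illustrative and not part of Netty.
final class HistoryReplayer {
    private final Function<String, String> backend; // sends one message, returns the reply
    private final Predicate<String> accepted;       // decides whether the replay continues

    HistoryReplayer(Function<String, String> backend, Predicate<String> accepted) {
        this.backend = backend;
        this.accepted = accepted;
    }

    /** Replays messages in order; returns the replies up to and including the first rejected one. */
    List<String> replay(List<String> history) {
        List<String> replies = new ArrayList<>();
        for (String message : history) {
            String reply = backend.apply(message);
            replies.add(reply);
            if (!accepted.test(reply)) {
                break; // stop replaying as soon as the backend rejects a message
            }
        }
        return replies;
    }

    public static void main(String[] args) {
        HistoryReplayer replayer = new HistoryReplayer(
                msg -> msg.contains("bad") ? "ERR" : "OK " + msg,
                reply -> reply.startsWith("OK"));
        System.out.println(replayer.replay(List.of("hi", "bad input", "never sent")));
        // prints [OK hi, ERR]
    }
}
```

In a Netty pipeline the same loop would presumably be driven by callbacks: the handler on the backend channel receives a reply in channelRead, evaluates it, and only then writes the next history entry.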

Update: After more research

  1. Are the private fields on the RouterBot handler thread safe? According to the ChannelHandler javadoc this sounds like a reasonable approach, since I am creating a new handler instance in the child ChannelInitializer:

A ChannelHandler often needs to store some stateful information. The simplest and recommended approach is to use member variables. Because the handler instance has a state variable which is dedicated to one connection, you have to create a new handler instance for each new channel to avoid a race condition where a unauthenticated client can get the confidential information (refer to the auth example):

  2. Is manipulating the pipeline expensive? According to an answer by trustin, this is an expensive approach. I fail to understand how the simple switch approach would work here, apart from just moving all the logic into RouterBot and throwing away ProxyChannel.
  3. Still no solution.
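
For reference, one possible reading of the "simple switch" idea is a single per-connection object that holds a mode flag and decides on every message whether to echo or to forward, so the pipeline is never modified. The sketch below models only that decision logic in plain Java. RoutingSession, onMessage, and the backend consumer (standing in for the outbound channel) are hypothetical names, not Netty API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical model of one connection's routing state; not Netty code.
final class RoutingSession {
    private final List<String> history = new ArrayList<>();
    private final Consumer<String> backend; // stands in for the outbound channel
    private boolean proxying;

    RoutingSession(Consumer<String> backend) {
        this.backend = backend;
    }

    /** Handles one inbound message; returns the echo reply, or empty once proxying. */
    Optional<String> onMessage(String message) {
        if (proxying) {
            backend.accept(message); // proxy mode: forward as-is
            return Optional.empty();
        }
        history.add(message);
        if (message.contains("KEYWORD")) {
            proxying = true;
            history.forEach(backend); // replay the whole history first, in order
            history.clear();
            return Optional.empty();
        }
        return Optional.of(message); // generic mode: echo back
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        RoutingSession session = new RoutingSession(sent::add);
        System.out.println(session.onMessage("hello")); // prints Optional[hello]
        session.onMessage("KEYWORD please");
        session.onMessage("more");
        System.out.println(sent); // prints [hello, KEYWORD please, more]
    }
}
```

As with the member fields in RouterBot, this state is only safe if one instance is created per channel.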
Question from: https://stackoverflow.com/questions/66066914/netty-buffering-inbound-data-until-i-have-enough-data-to-execute-some-logic


