【Question title】: Getting response from Netty server in non-Netty client
【Posted】: 2016-12-03 02:41:00
【Question description】:

This is my first question on StackOverflow, and I hope I have adhered to the expected standards.

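I have taken over some code from someone who no longer works here, and I am pretty much stuck. I searched and asked some colleagues (who unfortunately do not have much Java experience), but nobody seems able to help, and searching did not get me anywhere either.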

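I am sending JSON requests to a Netty server from a client that intentionally does not use Netty. At the moment it is just a simple Java socket, but the intention is to have a Flask client send the requests to the Netty server. The requests arrive (both with Java sockets and with Python Flask) and are processed correctly in the pipeline, but I want to send a response back to the client. I have a suspicion about where in the code the response should be sent, but I am clearly missing something, because I am not getting any response. Any suggestions?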

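The Java Socket client (note that the json1 and json2 strings are omitted from this snippet because they are rather long, but they are formatted correctly). The request is posted using a Socket and its output stream. The response part (using the input stream of the same socket) is just a test that I have my doubts about, but I do not know how else to do it (which is why I left it in). I have seen plenty of examples where the client implements the Netty interfaces, and that seems to work fine, but as I said, I want a client that does not use Netty to be able to receive responses as well (if that is possible).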

String serverResponse;

for (int j = 0; j < 100; j++) {
    for (int i = 0; i < 1000; i++) {
        try {
            Socket s = new Socket("localhost", 12000);
            PrintWriter out = new PrintWriter(s.getOutputStream(), true);
            out.write(json1 + i + json2);
            out.flush();

            // Testing only - trying to get the response back from the server
            BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
            while(true) {
                if ((serverResponse = in.readLine()) != null) {
                    log.info("server says: {}", serverResponse);
                    break;
                }
            }

            out.close();
            s.close();
            Thread.sleep(1000);

        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    Thread.sleep(2000);
}

MFTcpServer.java

/**
 * Abstract TCP Server class. this class should be implemented in the subclass to implement an actual server.
 *
 * @param <R> The data to be read from the socket.
 * @param <W> data to be written (in case of duplex) from the socket.
 */

public abstract class MFTcpServer<R, W> {

    protected final AtomicBoolean started;

    protected MFTcpServer() {
        this.started = new AtomicBoolean();
    }

    /**
     * Start the server.
     *
     * @param initializer the channel initializers. they will be called when a new client connects to the server.
     * @return instance of tcp server
     */
    public final MFTcpServer<R, W> start(ChannelInitializer<Channel> initializer) {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Server already started");
        }

        doStart(initializer);
        return this;
    }

    /**
     * Start the server and wait for all the threads to be finished before shutdown.
     * @param initializer the channel initializers. they will be called when a new client connects to the server.
     */
    public final void startAndAwait(ChannelInitializer<Channel> initializer) {
        start(initializer);
        awaitShutdown();
    }

    /**
     * Shutdown the server
     * @return true if successfully shutdown.
     */
    public final boolean shutdown() {
        return !started.compareAndSet(true, false) || doShutdown();
    }

    /**
     * Wait for all the threads to be finished before shutdown.
     */
    public abstract void awaitShutdown();

    /**
     * Do the shutdown now.
     * @return true if successfully shutdown
     */
    public abstract boolean doShutdown();

    /**
     * start the server
     * @param initializer the channel initializers. they will be called when a new client connects to the server.
     * @return instance of tcp server
     */
    public abstract MFTcpServer<R, W> doStart(ChannelInitializer<Channel> initializer);

    /**
     *
     * @return the port where the server is running.
     */
    public abstract int getPort();
}

MFNetty4TcpServer.java: the actual server implementation

public class MFNetty4TcpServer<R, W> extends MFTcpServer<R, W> {

    private static final Logger logger = LoggerFactory.getLogger(MFNetty4TcpServer.class);
    private static final int BOSS_THREAD_POOL_SIZE = 2;

    private int port;
    private ServerBootstrap bootstrap;
    private ChannelFuture bindFuture;

    /**
     * The constructor.
     *
     * @param port port where to listen
     */
    protected MFNetty4TcpServer(int port) {
        this.port = port;
        final NioEventLoopGroup bossGroup = new NioEventLoopGroup(0, new DefaultEventExecutorGroup
                (BOSS_THREAD_POOL_SIZE));
        final NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultEventExecutorGroup
                (JsonProducerConfig.THREAD_POOL_SIZE));

        bootstrap = new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class);
    }

    @Override
    public MFNetty4TcpServer<R, W> doStart(ChannelInitializer<Channel> initializer) {
        bootstrap.childHandler(new ChannelInitializer<Channel>() {

            @Override
            protected void initChannel(Channel ch) throws Exception {

                if (initializer != null) {
                    ch.pipeline().addLast(initializer);
                }
            }
        });

        try {
            bindFuture = bootstrap.bind(port).sync();
            if (!bindFuture.isSuccess()) {
                // Connection not successful
                throw new RuntimeException(bindFuture.cause());
            }
            SocketAddress localAddress = bindFuture.channel().localAddress();
            if (localAddress instanceof InetSocketAddress) {
                port = ((InetSocketAddress) localAddress).getPort();
                logger.info("Started server at port: " + port);
            }

        } catch (InterruptedException e) {
            logger.error("Error waiting for binding server port: " + port, e);
        }

        return this;
    }

    @Override
    public void awaitShutdown() {
        try {
            bindFuture.channel().closeFuture().await();
        } catch (InterruptedException e) {
            Thread.interrupted(); // Reset the interrupted status
            logger.error("Interrupted while waiting for the server socket to close.", e);
        }
    }

    @Override
    public boolean doShutdown() {
        try {
            bindFuture.channel().close().sync();
            return true;
        } catch (InterruptedException e) {
            logger.error("Failed to shutdown the server.", e);
            return false;
        }
    }

    @Override
    public int getPort() {
        return port;
    }

    /**
     * Creates a tcp server at the defined port.
     *
     * @param port port to listen to
     * @param <R>  data to be read
     * @param <W>  data to be written back. Only in case of duplex connection.
     * @return instance of tcp server.
     */
    public static <R, W> MFTcpServer<R, W> create(int port) {
        return new MFNetty4TcpServer<>(port);
    }

}

JsonProducerConfig.java: the pipeline is set up here.

/**
 * Spring Configuration class of the application.
 */
@Configuration
@Import({DatabusConfig.class})
public class JsonProducerConfig {

    private static final Logger log = LoggerFactory.getLogger(JsonProducerConfig.class);

    public static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;

    public static final String TCP_SERVER = "tcpServer";
    public static final String CHANNEL_PIPELINE_INITIALIZER = "channel_initializer";
    public static final String MF_KAFKA_PRODUCER = "mf_kafka_producer";
    public static final String JSON_AVRO_CONVERTOR = "jsonAvroConvertor";

    @Value("#{systemProperties['tcpserver.port']?:'12000'}")
    private String tcpServerPort;

    @Bean(name = TCP_SERVER)
    @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
    public MFTcpServer nettyTCPServer() {
        return MFNetty4TcpServer.create(Integer.parseInt(tcpServerPort));
    }

    @Bean(name = MF_KAFKA_PRODUCER)
    @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
    public MFKafkaProducer pushToKafka() {
        return new MFKafkaProducer();
    }

    @Bean(name = JSON_AVRO_CONVERTOR)
    @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
    public JsonAvroConvertor jsonAvroConvertor() {
        return new JsonAvroConvertor();
    }

    /**
     * This is where the pipeline is set for processing of events.
     *
     * @param jsonAvroConvertor converts json to avro
     * @param kafkaProducer     pushes to kafka
     * @return channel initializers pipeline.
     */
    @Bean(name = CHANNEL_PIPELINE_INITIALIZER)
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public ChannelInitializer<Channel> channelInitializers(JsonAvroConvertor jsonAvroConvertor,
                                                           MFKafkaProducer kafkaProducer) {
        return new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel channel) throws Exception {

                if (log.isInfoEnabled())
                    log.info("initChannel - initing channel...");

                channel.pipeline().addLast(new NioEventLoopGroup(0, new DefaultEventExecutorGroup(THREAD_POOL_SIZE)));
                channel.pipeline().addLast(new JsonObjectDecoder(1048576));
                channel.pipeline().addLast(jsonAvroConvertor);
                channel.pipeline().addLast(kafkaProducer);

                if (log.isInfoEnabled())
                    log.info("channel = " + channel.toString());
            }
        };
    }

}

JsonProducer.java: the main program

public class JsonProducer {

    private static final Logger log = LoggerFactory.getLogger(JsonProducer.class);

    private static MFTcpServer tcpServer;

    /**
     * Main startup method
     *
     * @param args not used
     */
    public static void main(String[] args) {
        System.setProperty("solschema", "false");

        try {

            // the shutdown hook.
            Runtime.getRuntime().addShutdownHook(new Thread(
                    () -> {
                        if (tcpServer != null) {
                            tcpServer.shutdown();
                        }
                    }
            ));

            AnnotationConfigApplicationContext context = new
                    AnnotationConfigApplicationContext(JsonProducerConfig.class);

            tcpServer = (MFTcpServer) context.getBean(JsonProducerConfig.TCP_SERVER);

            ChannelInitializer<Channel> channelInitializer = (ChannelInitializer<Channel>) context.
                    getBean(JsonProducerConfig.CHANNEL_PIPELINE_INITIALIZER);

            tcpServer.startAndAwait(channelInitializer);

        } catch (Exception t) {
            log.error("Error while starting JsonProducer ", t);
            System.exit(-1);
        }
    }
}

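MFKafkaProducer.java is the last handler in the pipeline. Note the ctx.writeAndFlush(msg) call in the channelRead method, which is where I understand the response should be initiated. But what comes after that? When this runs, channelFuture.isSuccess() evaluates to false. The response object is an attempt at a String response.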

@ChannelHandler.Sharable
public class MFKafkaProducer extends ChannelInboundHandlerAdapter {

    private static final Logger log = LoggerFactory.getLogger(MFKafkaProducer.class);

    @Resource
    ApplicationContext context;

    @Resource(name = DatabusConfig.ADMIN)
    Admin admin;

    private Map<String, IProducer> streams = new HashMap<>();

    @PreDestroy
    public void stop() {
        removeAllStreams(); // then stop writing to producers
    }

    /**
     * @param clickRecord the record to be pushed to kafka
     * @throws Exception
     */
    public void handle(GenericRecord clickRecord) throws Exception {
        Utf8 clientId = null;
        try {
            clientId = (Utf8) clickRecord.get(SchemaUtil.APP_ID);
            stream(producer(clientId.toString()), clickRecord);
        } catch (Exception e) {
            String message = "Could not push click data for clientId:" + clientId;
            log.warn("handle - " + message + "!!!", e);
            assert clientId != null;
            removeStream(clientId.toString());
        }
    }

    /**
     * removes all the streams
     */
    private void removeAllStreams() {
        Set<String> strings = streams.keySet();

        for (String clientId : strings) {
            removeStream(clientId);
        }
    }

    /**
     * removes a particular stream
     *
     * @param clientId the stream to be removed
     */
    private void removeStream(String clientId) {
        Assert.notEmpty(streams);
        IProducer producer = streams.get(clientId);
        producer.stopProducer();
        streams.remove(clientId);
    }

    /**
     * @param producer    the producer where data needs to be written
     * @param clickRecord the record to be written
     */
    private void stream(IProducer producer, GenericRecord clickRecord) {
        producer.send(clickRecord);
    }

    /**
     * This will create a producer in case it is not already created.
     * If already created return the already present one
     *
     * @param clientId stream id
     * @return the producer instance
     */
    private IProducer producer(String clientId) {
        if (streams.containsKey(clientId)) {
            return streams.get(clientId);
        } else {
            IProducer producer = admin.createKeyTopicProducer(SchemaUtil.APP_ID, "test_" + clientId, new ICallback() {
                @Override
                public void onSuccess(long offset) {
                    if (log.isInfoEnabled())
                        log.info("onSuccess - Data at offset:" + offset + " send.");
                }

                @Override
                public void onError(long offset, Exception ex) {
                    if (log.isInfoEnabled())
                        log.info("onError - Data at offset:" + offset + " failed. Exception: ", ex);
                }

                @Override
                public void onStreamClosed() {
                    log.warn("onStreamClosed - Stream:" + clientId + " closed.");
                    removeStream(clientId);
                }
            });
            producer.startProducer();
            streams.put(clientId, producer);
            return producer;
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        log.debug("KafkaProducer - channelRead() called with " + "ctx = [" + ctx + "], msg = [" + msg + "]");

        if (msg instanceof GenericRecord) {
            GenericRecord genericRecord = (GenericRecord) msg;
            try {
                handle(genericRecord);
                log.debug("channelRead sending response");
                Charset charset = Charset.defaultCharset();
                ByteBuf response = Unpooled.copiedBuffer("Just a response", charset);
                ChannelFuture future = ctx.writeAndFlush(msg);
                future.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (channelFuture.isSuccess())
                            log.info("channelRead - future.operationComplete - Response has been delivered to all channels");
                        else
                            log.info("channelRead - future.operationComplete - Response has NOT been delivered to all channels");
                    }
                });
            } catch (Exception ex) {
                log.error("Something went wrong processing the generic record: " + msg + "\n ", ex);
            }
        } else {
            log.debug("KafkaProducer - msg not of Type Generic Record !!! " + msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        log.error("Something went wrong writing to Kafka: \n", cause);
        ctx.close();
    }

}

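【Comments】:

  • If ChannelFuture#isSuccess() returns false, I suggest logging ChannelFuture#cause().
  • What happens if you write the ByteBuf response back to the channel instead of the received msg?
  • Thanks @forty-2, that was indeed the problem: the message being serialized was not a ByteBuf but a GenericRecord. I solved it by creating a ByteBuf from the genericRecord (for now only for testing purposes, to see whether any response arrives): ByteBuf response = Unpooled.copiedBuffer(genericRecord.toString(), charset); ChannelFuture future = ctx.writeAndFlush(response); @Moh-Aw: the ChannelFuture now reports that the message was delivered successfully, but somehow I still do not see it arrive at my client (it is a very simple client, which may be the reason).
  • Thanks for the help, I finally got it working. It was indeed a poor implementation of the client. By using a SocketChannel instead of just a Socket, I now see the reply on the client side as well.
  • I suggest you write up the solution and accept it to close this question, so that others can benefit from it. :)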
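
One possible explanation for the client seeing nothing, offered as an assumption rather than a confirmed diagnosis: BufferedReader.readLine() returns only once a line terminator or the end of the stream arrives, and the response text quoted in the comments carries no trailing newline while the connection stays open. The sketch below uses a hypothetical stand-in server (a plain ServerSocket, not the Netty server from the question) to show the difference. The class and method names are made up for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

public class LineFramedReplySketch {

    // Hypothetical stand-in for the server: waits for a request, writes the
    // reply, then keeps the connection open until the client closes it.
    static ServerSocket startStandInServer(String reply) throws IOException {
        ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
        Thread t = new Thread(() -> {
            try (Socket s = server.accept()) {
                s.getInputStream().read(new byte[8192]); // wait for the request
                OutputStream os = s.getOutputStream();
                os.write(reply.getBytes(StandardCharsets.UTF_8));
                os.flush();
                while (s.getInputStream().read() != -1) {
                    // drain until the client closes the connection
                }
            } catch (IOException ignored) {
                // the client went away; nothing to clean up in this sketch
            }
        });
        t.setDaemon(true);
        t.start();
        return server;
    }

    // Sends a request and tries to read one line of reply.
    // Returns null if no complete line arrives within timeoutMillis.
    static String readLineReply(String reply, int timeoutMillis) throws IOException {
        try (ServerSocket server = startStandInServer(reply);
             Socket s = new Socket(server.getInetAddress(), server.getLocalPort())) {
            s.setSoTimeout(timeoutMillis); // avoid blocking forever in readLine()
            OutputStream out = s.getOutputStream();
            out.write("{\"id\": 1}".getBytes(StandardCharsets.UTF_8));
            out.flush();
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
            try {
                return in.readLine();
            } catch (SocketTimeoutException e) {
                return null; // bytes may have arrived, but no line terminator did
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("with newline:    " + readLineReply("Just a response\n", 2000));
        System.out.println("without newline: " + readLineReply("Just a response", 500));
    }
}
```

If this is indeed the cause, appending "\n" to the text passed to Unpooled.copiedBuffer on the server side should let a readLine()-based client return. That has not been tested against the server in the question.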

Tags: java netty


【Solution 1】:

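Using ChannelFuture#cause() I noticed that I was not serializing a ByteBuf object but a GenericRecord. Using

ByteBuf response = Unpooled.copiedBuffer(genericRecord.toString(), charset);
ChannelFuture future = ctx.writeAndFlush(response);

the GenericRecord is converted to a ByteBuf and the response is sent with the writeAndFlush method.

The test client implemented with a Socket somehow never actually received the response, but switching to a SocketChannel resolved that as well.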
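
A minimal sketch of what a SocketChannel-based client could look like. The working client was not posted, so everything here is an assumption: the class and method names are hypothetical, the stand-in server is a plain ServerSocket rather than the Netty server, and a single read is assumed to be enough to capture the whole (short) reply.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class SocketChannelClientSketch {

    // Writes the request, then returns the bytes of the first read as UTF-8 text.
    // The read does not wait for a newline or for the server to close.
    static String sendAndReceive(InetSocketAddress address, String request) throws IOException {
        try (SocketChannel channel = SocketChannel.open(address)) {
            ByteBuffer out = ByteBuffer.wrap(request.getBytes(StandardCharsets.UTF_8));
            while (out.hasRemaining()) {
                channel.write(out);
            }
            ByteBuffer in = ByteBuffer.allocate(8192);
            int n = channel.read(in); // blocks until some bytes arrive; -1 on end of stream
            if (n <= 0) {
                return "";
            }
            in.flip();
            // A longer reply could span several reads; a real protocol needs framing.
            return StandardCharsets.UTF_8.decode(in).toString();
        }
    }

    // Hypothetical stand-in for the server: waits for a request, replies
    // without a trailing newline, and keeps the connection open.
    static ServerSocket startStandInServer(String reply) throws IOException {
        ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
        Thread t = new Thread(() -> {
            try (Socket s = server.accept()) {
                s.getInputStream().read(new byte[8192]); // wait for the request
                OutputStream os = s.getOutputStream();
                os.write(reply.getBytes(StandardCharsets.UTF_8));
                os.flush();
                while (s.getInputStream().read() != -1) {
                    // drain until the client closes the connection
                }
            } catch (IOException ignored) {
                // the client went away; nothing to clean up in this sketch
            }
        });
        t.setDaemon(true);
        t.start();
        return server;
    }

    // Starts the stand-in server, sends one request, and returns the reply text.
    static String roundTrip(String reply) throws IOException {
        try (ServerSocket server = startStandInServer(reply)) {
            InetSocketAddress address =
                    new InetSocketAddress(server.getInetAddress(), server.getLocalPort());
            return sendAndReceive(address, "{\"id\": 1}");
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("server says: " + roundTrip("Just a response"));
    }
}
```

Against the real server, sendAndReceive would be called with new InetSocketAddress("localhost", 12000), the host and port used by the test client in the question.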
