【问题标题】:Netty request timeoutNetty 请求超时
【发布时间】:2018-08-07 03:58:42
【问题描述】:

我正在尝试编写一个 HTTP 服务,该服务将从 HTTP 获取数据并使用 Netty 将其放入 Kafka。我需要在 m5.large EC2 实例上处理 20K RPS,这似乎很可行。

代码很简单:

Server.java

public class Server {
    public static void main(final String[] args) throws Exception {
        final EventLoopGroup bossGroup = new EpollEventLoopGroup();
        final EventLoopGroup workerGroup = new EpollEventLoopGroup();

        try {
            final ServerBootstrap bootstrap = new ServerBootstrap();

            bootstrap
                .group(bossGroup, workerGroup)
                .channel(EpollServerSocketChannel.class)
                .childHandler(new RequestChannelInitializer(createProducer()))
                .childOption(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.bind(PORT).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private static Producer<String, ByteBuffer> createProducer() {
        final Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaBidRequestProducer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class.getName());
        properties.put(ProducerConfig.RETRIES_CONFIG, 0);
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
        properties.put(ProducerConfig.SEND_BUFFER_CONFIG, 33554432);

        return new KafkaProducer<>(properties);
    }
}

RequestChannelInitializer.java

public class RequestChannelInitializer extends io.netty.channel.ChannelInitializer<SocketChannel> {
    private final Producer<String, ByteBuffer> producer;

    public BidRequestChannelInitializer(final Producer<String, ByteBuffer> producer) {
        this.producer = producer;
    }

    @Override
    public void initChannel(final SocketChannel ch) {
        ch.pipeline().addLast(new HttpServerCodec());
        ch.pipeline().addLast(new HttpObjectAggregator(1048576));
        ch.pipeline().addLast(new RequestHandler(producer));
    }
}

RequestHandler.java

public class RequestHandler extends SimpleChannelInboundHandler<FullHttpMessage> {
    private final Producer<String, ByteBuffer> producer;

    public BidRequestHandler(final Producer<String, ByteBuffer> producer) {
        this.producer = producer;
    }

    @Override
    public void channelReadComplete(final ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpMessage msg) {
        final DefaultFullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
        final ProducerRecord<String, ByteBuffer> record = new ProducerRecord<>(
            "test",
            UUID.randomUUID().toString(),
            msg.content().nioBuffer()
        );

        producer.send(record);

        if (HttpUtil.isKeepAlive(msg)) {
            response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }

        ctx.write(response).addListener(ChannelFutureListener.CLOSE);
    }
}

代码取自官方文档。但是,有时我会在负载测试中遇到Request 'Post BidRequest' failed: j.u.c.TimeoutException: Request timeout after 60000 ms 异常。

据我了解,这意味着在我的负载测试实例和服务实例之间建立了连接,但完成的时间超过了 60 秒。这个简单程序的哪一部分可以阻塞这么久?

我已经调整了 Kafka 生产者:减少了超时。我知道send 可能会阻塞,所以我增加了发送缓冲区,但没有帮助。 我还为服务用户增加了ulimits。 我在 OpenJDK 版本 1.8.0_171 上运行,securerandom.source 设置为file:/dev/urandom,因此对randomUUID 的调用不应阻塞。

【问题讨论】:

    标签: java apache-kafka netty kafka-producer-api


    【解决方案1】:

    你是对的,那里没有任何东西可以阻挡。发送到 Kafka 的调用是异步的。我查看了您的代码,从我所看到的一切看起来都很好。

    我会检查几件事:

    • 确保 AWS 中的安全组定义允许 Kafka 服务器和应用程序与 Zookeeper 通信。如果这是一个测试/POC,您应该只允许所有三个实例/集群之间的所有流量。 60 秒的超时让我怀疑网络超时,这可能意味着某些服务无法访问。
    • 您是否尝试过更简单的测试,尝试在不依赖 Netty 的情况下生产到 Kafka?也许这有助于缩小问题的范围。

    【讨论】:

    • Kafka 和 ZK 都托管在另一台机器上。安全组允许我的应用程序与 Kafka 通信。我还使用 JVisualVM 跟踪了该应用程序,并且所有工作线程几乎 100% 的时间都在工作,没有任何阻塞(绿色实线)。 CPU 和 RAM 为最大值的 25%。我开始认为响应只是在电线上的某个地方丢失了,但我不知道如何追踪它。虚拟机网络为 10 Gigabit。
    猜你喜欢
    • 1970-01-01
    • 2011-02-26
    • 2021-05-16
    • 1970-01-01
    • 1970-01-01
    • 2016-01-25
    • 1970-01-01
    • 2016-05-27
    • 2020-05-31
    相关资源
    最近更新 更多