【问题标题】:Custom Sink for Flume-ng null eventFlume-ng null 事件的自定义接收器
【发布时间】:2013-08-30 16:17:10
【问题描述】:

我正在尝试为 flume-ng 编写一个自定义接收器。我查看了现有的接收器和文档并将其编码。但是,应该接收事件的 'process()' 方法总是以 null 结束。 我在做 Event event = channel.take();但该事件为空。我在日志中看到该方法被重复调用,因为事件仍在通道中。

有人能指出正确的方向吗?

【问题讨论】:

    标签: flume


    【解决方案1】:

    这是一个流程函数的骨架...如果您未能获得您回滚的事件,请将状态更改为 BACKOFF 。如果不是,您 commit 并将状态设置为 READY 。无论如何,您总是关闭交易。

        Status status = null;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            Event event = channel.take();
    
            if (event != null && validEvent(event.getBody()) >= 0) {
               # make some printing
            }
            transaction.commit();
            status = Status.READY;
        } catch (Throwable ex) {
            transaction.rollback();
            status = Status.BACKOFF;
            logger.error("Failed to deliver event. Exception follows.", ex);
            throw new EventDeliveryException("Failed to deliver event: " + ex);
        } finally {
            transaction.close();
        }
        return status;
    

    我相信这会奏效:)。

    【讨论】:

    【解决方案2】:

    这是设计使然。 sink runner 将使用null events 轮询 sink,以便确保 sink 处于活动状态并准备好接受未来的事件。当您收到null 事件时,请确保您返回Status.BACKOFF,接收器处理器会稍等片刻再重试。

    【讨论】:

    • 奇怪的是documentation 对此只字未提。
    • 我同意。 Flume 文档非常少,应该更详细一些。
    • 回退持续时间是多少?以及如何控制? AbstractSink 类没有像 Sources 那样实现方法,例如public long getBackOffSleepIncrement() public long getMaxBackOffSleepInterval(
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-18
    • 1970-01-01
    • 2012-11-20
    • 1970-01-01
    • 1970-01-01
    • 2015-02-10
    相关资源
    最近更新 更多