【问题标题】:Sending events transactionally in logstash在 logstash 中以事务方式发送事件
【发布时间】:2019-05-26 06:25:47
【问题描述】:

我正在尝试使用 logstash 从 TCP 套接字接收事件,并将它们输出到 Kafka 主题。我当前的配置能够完美地做到这一点,但我希望能够以事务方式向 Kafka 进行事件。我的意思是,在收到提交消息之前,系统不应该将事件发送到 kafka:

START TXN 123         --No message sent to Kafka
123 - Event1 Message  --No message sent to Kafka
123 - Event2 Message  --No message sent to Kafka
123 - Event3 Message  --No message sent to Kafka
COMMIT TXN 123           --Event1, Event2, Event3 messages sent to Kafka

是否有可能仅使用 logstash 来实现这一点,或者我应该在源和 logstash 之间引入任何其他事务协调器?这是我当前的配置:

input {
  tcp {
    port => 9000
  }
}

output {
  kafka { 
    bootstrap_servers => "localhost:9092"
    topic_id =>  "alpayk"
  }
}

为此,我尝试使用 logstash 的聚合过滤器,但最终无法得到有效的结果。

非常感谢您

【问题讨论】:

  • 虽然可以在内部启用 Kafka 客户端进行事务性写入,但我认为 Logstash 尚未实现此功能。也不,我认为您可以在 Logstash 本身中执行这种类型的“条件刷新”操作
  • @cricket_007 感谢您的评论。事实上,我正在尝试从头开始设计这个系统,所以我不一定会使用 logstash 将事件从 socket 传送到 kafka,我可以在两者之间使用任何其他技术。我的意图是建立一个支持有条件刷新事件的系统,如您所指出的。
  • 那么您可能需要自己编写该生产者并根据您的事件数据手动放入条件语句
  • aggregate filter 可能满足您的需求。这个answer 可能是一个好的开始。我从来没有使用过这个过滤器,所以我无法写出完整的答案。

标签: tcp apache-kafka logstash


【解决方案1】:

我最终决定为此使用 Apache Flume。我修改了它的 netcat 源,以便未提交的消息驻留在 Flume 的堆中,一旦收到事务的提交消息,所有消息都会发送到 kafka sink。

我要将消息存储位置从水槽堆更改为外部缓存,以便在事务异常终止或回滚时使存储的消息过期。

以下是该事务逻辑的一段代码:

String eventMessage = new String(body);
int indexOfTrxIdSeparator = eventMessage.indexOf("-");
if (indexOfTrxIdSeparator != -1) {
    String txnId = eventMessage.substring(0, indexOfTrxIdSeparator).trim();
    String message = eventMessage.substring(indexOfTrxIdSeparator + 1).trim();
    ArrayList<Event> events = cachedEvents.get(txnId);

    if (message.equals("COMMIT")) {

        System.out.println("@@@@@ COMMIT RECEIVED");

        if (events != null) {
            for (Event eventItem : events) {
                ChannelException ex = null;
                try {
                    source.getChannelProcessor().processEvent(eventItem);
                } catch (ChannelException chEx) {
                    ex = chEx;
                }

                if (ex == null) {
                    counterGroup.incrementAndGet("events.processed");
                } else {
                    counterGroup.incrementAndGet("events.failed");
                    logger.warn("Error processing event. Exception follows.", ex);
                }
            }

            cachedEvents.remove(txnId);
        }
    } else {
        System.out.println("@@@@@ MESSAGE RECEIVED: " + message);
        if (events == null) {
            events = new ArrayList<Event>();
        }
        events.add(EventBuilder.withBody(message.getBytes()));
        cachedEvents.put(txnId, events);
    }
}

我将此代码添加到 Flume 的 netcat 源的processEvents 方法中。我不想使用 Ruby 代码,这就是我决定改用 Flume 的原因。然而,同样的事情也可以在logstash中完成。

谢谢

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2011-10-25
    • 1970-01-01
    • 1970-01-01
    • 2020-06-20
    • 2021-12-28
    • 1970-01-01
    • 2021-06-16
    • 1970-01-01
    相关资源
    最近更新 更多