【Question Title】: Get empty rows when sending logs from Kafka to ClickHouse
【Posted】: 2021-04-08 13:37:32
【Question】:

I'm trying to push data from Kafka into ClickHouse using Filebeat. My configuration looks like this.

Filebeat config:

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log

output.kafka:
  # telling filebeat to send only the timestamp and message fields; otherwise
  # it publishes each line to Kafka as a full JSON document
  codec.format:
    string: '%{[@timestamp]} %{[message]}'

  # Kafka output settings
  # publishing to the 'myfirst' topic
  hosts: ["kafka:9092"]
  topic: 'myfirst'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
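
Note that this codec prepends the timestamp to the log line, so each Kafka message becomes `<timestamp> <json>` rather than pure JSON. If the ClickHouse Kafka table parses messages with a JSON row format (e.g. `JSONEachRow`), that leading timestamp would make every row fail to parse. A minimal alternative sketch (assuming the nginx access-log line itself is already JSON, as in the sample message below) is to ship the message field alone:

```yaml
output.kafka:
  # ship only the raw log line, which nginx already writes as JSON,
  # so ClickHouse can parse each Kafka message as one JSON row
  codec.format:
    string: '%{[message]}'
```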

In Kafka, my logs are arriving in the topic and everything looks fine; a message inserted into the Kafka topic looks like this:

2021-01-01T21:51:25.225Z {"remote_addr": "192.168.222.1","remote_user": "-","time_local":  "01/Jan/2021:21:51:17 +0000","request":     "GET / HTTP/1.1","status":      "304","body_bytes_sent": "0","http_referer": "-","http_user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"}

I created the ClickHouse table and a MATERIALIZED VIEW:

CREATE TABLE accesslog (
...
    ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka:9092',


But the query output in ClickHouse looks like this, with no data. Why?

┌─remote_addr─┬─remote_user─┬─time_local─┬───────date─┬─request─┬─status─┬─body_bytes_sent─┬─http_referer─┬─http_user_agent─┐
│             │             │            │ 0000-00-00 │         │      0 │               0 │              │                 │
│             │             │            │ 0000-00-00 │         │      0 │               0 │              │                 │
│             │             │            │ 0000-00-00 │         │      0 │               0 │              │                 │
└─────────────┴─────────────┴────────────┴────────────┴─────────┴────────┴─────────────────┴──────────────┴─────────────────┘

【Comments】:

  • Check the log /var/log/clickhouse-server/clickhouse-server.log
  • Are you sure any events are reaching accesslog at all? To check it: 1) stop the MV with "detach table log_consumer", 2) add some events to the topic, 3) run this query to check it: "select * from accesslog".
  • Thanks for your answer. Yes, accesslog is updated by events coming through Kafka. I did those 3 steps, but `select * from accesslog` has a row for every event and, as I said, no data in them.
  • And the ClickHouse server log: 2021.01.01 23:20:03.466759 [ 51 ] {} StorageKafka (accesslog): Assigned to: [ myfirst[0:#] ] 2021.01.01 23:20:03.466942 [ 47 ] {} StorageKafka (accesslog): Assigned to: [ ] 2021.01.01 23:20:03.980754 [ 48 ] {} StorageKafka (accesslog): Polled batch of 1 messages. Offset position: [ myfirst[0:1] ] 2021.01.01 23:20:03.981755 [ 48 ] {} IRowInputFormat: Skipping 1 rows with errors while reading the input stream 2021.01.01 23:20:04.489136 [ 48 ] {} StorageKafka (accesslog): Stalled
  • Which version of ClickHouse do you use? I cannot reproduce this error on version 20.12.3.3. Or better, provide your docker-compose.yml.
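
The check suggested in the comments above can be sketched as (the view name `log_consumer` is taken from the comments; whether it matches the asker's actual MV is an assumption):

```sql
-- stop the materialized view so it does not drain the Kafka table
DETACH TABLE log_consumer;

-- ... produce a few events to the topic ...

-- read directly from the Kafka engine table
SELECT * FROM accesslog;

-- re-attach the view when done
ATTACH TABLE log_consumer;
```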

Tags: apache-kafka clickhouse


【Solution 1】:

The problem appears to be a wrong Kafka broker address: the external address kafka:9092 should not be used; use the internal kafka:19092 instead.

CREATE TABLE accesslog (
..
  ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka:19092', ..
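
The elided parts of the DDL might look like the following sketch. The column set is inferred from the sample message and the query output below; the storage-table name, consumer-group name, and `ORDER BY` key are hypothetical:

```sql
-- Kafka engine table: consumes raw messages from the topic
CREATE TABLE accesslog (
    remote_addr String,
    remote_user String,
    time_local String,
    request String,
    status UInt16,
    body_bytes_sent UInt64,
    http_referer String,
    http_user_agent String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:19092',
         kafka_topic_list = 'myfirst',
         kafka_group_name = 'accesslog_group',  -- hypothetical group name
         kafka_format = 'JSONEachRow';

-- storage table (hypothetical name) that actually keeps the rows
CREATE TABLE accesslog_storage (
    remote_addr String,
    remote_user String,
    time_local String,
    request String,
    status UInt16,
    body_bytes_sent UInt64,
    http_referer String,
    http_user_agent String
) ENGINE = MergeTree
ORDER BY time_local;

-- materialized view moving rows from the Kafka table to storage
CREATE MATERIALIZED VIEW log_consumer TO accesslog_storage
AS SELECT * FROM accesslog;
```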


Steps to reproduce:

Kafka side:

# run shell in Kafka container
docker exec -it kafka bash

# create topic
kafka-topics --create --topic myfirst --partitions 1 --replication-factor 1 --bootstrap-server kafka:19092

# check topic
# kafka-topics --describe --topic myfirst  --bootstrap-server kafka:19092

# add events to the topic
kafka-console-producer --topic myfirst --broker-list kafka:19092
# event body: {"remote_addr": "192.168.222.1","remote_user": "-","time_local":  "01/Jan/2021:21:51:17 +0000","request":     "GET / HTTP/1.1","status":      "304","body_bytes_sent": "0","http_referer": "-","http_user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"}
..
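
To confirm the events actually landed in the topic, they can be read back with the standard console consumer (run inside the same Kafka container):

```shell
# read all events back from the topic to verify they arrived
kafka-console-consumer --topic myfirst --from-beginning --bootstrap-server kafka:19092
```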

ClickHouse side:

SELECT *
FROM accesslog

/*
┌─remote_addr───┬─remote_user─┬─time_local─────────────────┬─request────────┬─status─┬─body_bytes_sent─┬─http_referer─┬─http_user_agent────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ 192.168.222.1 │ -           │ 01/Jan/2021:21:51:17 +0000 │ GET / HTTP/1.1 │    304 │               0 │ -            │ Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36 │
..
*/

Excerpt from docker-compose.yml:

..
  kafka:
    image: confluentinc/cp-kafka:5.2.2
    container_name: kafka
    restart: unless-stopped
    hostname: kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-x.x.x.x}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 9092:9092
    networks:
      - net1
..

【Discussion】:
