【问题标题】:Kafka Connect: Multiple DB2 JDBC Source Connectors failKafka Connect:多个 DB2 JDBC 源连接器失败
【发布时间】:2019-05-17 23:35:11
【问题描述】:

我正在尝试在本地 Docker 容器中使用 Kafka Connect(使用官方 Confluent 映像),以便将 DB2 数据推送到 Openshift 上的 Kafka 集群(在 AWS 上)。我将 Confluent JDBC 连接器与我的 DB2 JDBC-Jar 一起使用。 我有不同的连接器配置,因为我将 SMT 与“transforms.createKey”(创建我的密钥)一起使用,并且我的表中的密钥列具有不同的名称。

这是我的步骤:

  • 为 Kafka Connect 创建配置、偏移和状态的主题
  • 启动/创建 Kafka Connect 容器(环境变量见下文)
  • 通过对我的 Connect 容器的后调用创建第一个 JDBC 连接器(配置见下文)

到目前为止一切正常,我可以看到我的数据被推送到集群。但是,一旦我通过 post call 添加了第二个 JDBC 连接器,第一个连接器就会停止将数据推送到集群,而第二个连接器会启动并继续加载和推送数据。似乎两个连接器都将数据推送到集群的时间很短,但我假设这可能是来自连接器 1 的仍被刷新的数据。 问题是 a) 甚至跟踪日志也没有显示有意义的错误(至少对我而言),并且 b) 显示的错误在尝试之间有所不同(我总是删除所有主题和容器)。

我假设这不是错误,而是需要适当设置的配置组合和/或我对一些基本的 Kafka Connect 核心功能缺乏了解。我已经尝试添加和更改各种配置,但不幸的是,到目前为止还没有任何结果。我已经尝试了很多次,但没有运气。我附上了我最近两次尝试的日志以及配置。

有没有人知道我可以调整哪个配置或者要研究什么来解决这个问题? 任何帮助表示赞赏 - 谢谢!


Kafka: 2.0.0
Docker image: confluentinc/cp-kafka-connect:5.0.0
DB2: 10.5
JDBC Jar: db2jcc4.jar with version 4.19.76

记录第一次尝试:

[2018-12-17 13:09:15,683] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter)
[2018-12-17 13:09:15,684] ERROR WorkerSourceTask{id=db2-jdbc-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:110)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:409)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:238)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-12-17 13:09:15,686] ERROR WorkerSourceTask{id=db2-jdbc-source-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2018-12-17 13:09:15,686] INFO [Producer clientId=producer-4] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2018-12-17 13:09:20,682] ERROR Graceful stop of task db2-jdbc-source-0 failed. (org.apache.kafka.connect.runtime.Worker)
[2018-12-17 13:09:20,682] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

记录第二次尝试:

[2018-12-17 14:01:31,658] INFO Stopping task db2-jdbc-source-0 (org.apache.kafka.connect.runtime.Worker)
[2018-12-17 14:01:31,689] INFO Stopped connector db2-jdbc-source (org.apache.kafka.connect.runtime.Worker)
[2018-12-17 14:01:31,784] INFO WorkerSourceTask{id=db2-jdbc-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-12-17 14:01:31,784] INFO WorkerSourceTask{id=db2-jdbc-source-0} flushing 20450 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-12-17 14:01:36,733] ERROR Graceful stop of task db2-jdbc-source-0 failed. (org.apache.kafka.connect.runtime.Worker)
[2018-12-17 14:01:36,733] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

screenshot of incoming messages per second in the Kafka cluster

Kafka Connect Docker 环境变量:

-e CONNECT_BOOTSTRAP_SERVERS=my_kafka_cluster:443 \
  -e CONNECT_PRODUCER_BOOTSTRAP_SERVERS="my_kafka_cluster:443" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="kafka-connect" \
  -e CONNECT_REST_PORT=8083 \
  -e CONNECT_GROUP_ID="kafka-connect-group" \
  -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3 \
  -e CONNECT_CONFIG_STORAGE_TOPIC="kafka-connect-config" \
  -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3 \
  -e CONNECT_OFFSET_STORAGE_TOPIC="kafka-connect-offset" \
  -e CONNECT_OFFSET_FLUSH_INTERVAL_MS=15000 \
  -e CONNECT_OFFSET_FLUSH_TIMEOUT_MS=60000 \
  -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3 \
  -e CONNECT_STATUS_STORAGE_TOPIC="kafka-connect-status" \
  -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://url_to_schemaregistry \
  -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://url_to_schemaregistry \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE="false" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE="false" \
  -e CONNECT_PLUGIN_PATH=/usr/share/java \
  -e CONNECT_PRODUCER_BUFFER_MEMORY="8388608" \
  -e CONNECT_SECURITY_PROTOCOL="SSL" \
  -e CONNECT_PRODUCER_SECURITY_PROTOCOL="SSL" \
  -e CONNECT_SSL_TRUSTSTORE_LOCATION="/usr/share/kafka.client.truststore.jks" \
  -e CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION="/usr/share/kafka.client.truststore.jks" \
  -e CONNECT_SSL_TRUSTSTORE_PASSWORD="my_ts_pw" \
  -e CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD="my_ts_pw" \
  -e CONNECT_LOG4J_LOGGERS=org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR \
  -e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO \
  -e HOSTNAME=kafka-connect \

JDBC 连接器(仅表和键列不同):

{
    "name": "db2-jdbc-source",
    "config": 
    {
        "mode":"timestamp",
        "debug":"true",
        "batch.max.rows":"50",
        "poll.interval.ms":"10000",
        "timestamp.delay.interval.ms":"60000",
        "timestamp.column.name":"IBMSNAP_LOGMARKER",
        "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector" ,
        "connection.url":"jdbc:db2://myip:myport/mydb:currentSchema=myschema;",
        "connection.password":"mypw",
        "connection.user":"myuser",
        "connection.backoff.ms":"60000",
        "dialect.name": "Db2DatabaseDialect",
        "table.types": "TABLE",
        "table.poll.interval.ms":"60000",
        "table.whitelist":"MYTABLE1",
        "tasks.max":"1",
        "topic.prefix":"db2_",
        "key.converter":"io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url":"http://url_to_schemaregistry",
        "value.converter":"io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url":"http://url_to_schemaregistry",
        "transforms":"createKey",
        "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields":"MYKEY1"
    }
}

【问题讨论】:

  • 当您发布新配置时,它会重新平衡连接集群,但不应“停止”任何工作人员
  • @cricket_007 是的,我就是这么想的。我确实收到了“Rebelance 开始”的日志条目,然后是第一个连接器已停止的条目。查看日志,第一个连接器似乎也再次启动,但似乎“相信”没有更多工作要做。

标签: docker jdbc apache-kafka apache-kafka-connect confluent-platform


【解决方案1】:

我最终发现了问题: 我在时间戳模式下使用 JDBC 连接器,而不是时间戳+递增,因为我不能(总是)指定递增列。我知道这可能会导致问题,当有多个具有相同时间戳的条目时,Connect 无法知道哪些条目已被读取。

我的大部分数据行具有相同的时间戳。当我添加第二个连接器时,第一个连接器的当前时间戳被存储并且 Connect 开始重新平衡,因此丢失了已经读取了该时间戳的哪些行的信息。当连接器重新启动并运行时,第一个连接器继续使用“下一个时间戳”,因此只加载最新的行(这只是一小部分)。

我的错误是假设在这种情况下,第一个连接器将使用前一个时间戳重新开始工作,而不是继续使用“下一个时间戳”。对我来说,宁愿冒着重复的风险而不是可能丢失数据更有意义。

【讨论】:

  • 对于其他有同样问题的人:我通过首先启动所有 kafka-connect 容器解决了这个问题,然后应用配置。这样在重新平衡时不会提交偏移量,因此不会丢失任何数据:)
猜你喜欢
  • 2020-01-15
  • 2018-11-30
  • 2019-01-21
  • 2019-06-17
  • 2017-10-12
  • 2020-08-05
  • 2019-07-20
  • 2018-09-01
  • 2020-08-28
相关资源
最近更新 更多