【问题标题】:Kafka/PubSub connector: Example pipeline: ERROR Task Converting byte[], Unrecognized token, was expecting ('true', 'false' or 'null')Kafka/PubSub 连接器:示例管道:错误任务转换字节 [],无法识别的令牌,期待('true'、'false' 或 'null')
【发布时间】:2017-05-04 16:48:15
【问题描述】:

我正在使用 kafka_2.11-0.10.2.1 和 google here 提供的 pubsub 连接器。我要做的就是使用独立连接器将数据从 Kafka 主题推送到 PubSub 主题。我按照我应该做的所有步骤进行操作:

  1. 产生了cps-kafka-connector.jar
  2. 在 kafka 的 config 目录中添加了 cps-sink-connector.properties 文件。该文件如下所示:
name=CPSConnector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=10
topics=kafka_topic
cps.topic=pubsub_topic
cps.project=my_gcp_project_12345
  1. 我确定我没有在connect-standalone.properties 中启用任何转换器:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
  1. 我创建了一个主题kafka_topic,并发送了一些消息如下:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_topic
$ hello streams
$ kafka streams rock
  1. 我按如下方式运行连接器:
$ bin/connect-standalone.sh config/connect-standalone.properties config/cps-sink-connector.properties

目的是奔跑:

$ gcloud beta pubsub subscriptions pull subscription_to_pubsub_topic

收集这些消息。但是,会发生以下错误,我无法理解它们。有什么想法吗?我使用了错误的输入吗?正确的样本输入是什么?

    [2017-05-04 17:34:40,898] INFO Discovered coordinator 10.33.19.146:9092 (id: 2147483647 rack: null) for group connect-CPSConnector. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:586)
  [2017-05-04 17:34:40,899] INFO Revoking previously assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:397)
  [2017-05-04 17:34:40,900] INFO (Re-)joining group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:420)
  [2017-05-04 17:34:40,936] ERROR Task CPSConnector-4 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
  org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
  Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
   at [Source: [B@3c06c37d; line: 1, column: 11]
  Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
   at [Source: [B@3c06c37d; line: 1, column: 11]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
  [2017-05-04 17:34:40,941] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
  [2017-05-04 17:34:43,837] INFO Revoking previously assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:397)
  [2017-05-04 17:34:43,838] INFO (Re-)joining group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:420)
  [2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,847] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,847] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,848] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,850] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,851] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,853] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,846] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [test8-0] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,851] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,850] INFO Successfully joined group connect-CPSConnector with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:388)
  [2017-05-04 17:34:43,856] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,846] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,854] INFO Setting newly assigned partitions [] for group connect-CPSConnector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
  [2017-05-04 17:34:43,862] ERROR Task CPSConnector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
  org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:401)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:249)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
  Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
   at [Source: [B@32a6e3e6; line: 1, column: 11]
  Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null')
   at [Source: [B@32a6e3e6; line: 1, column: 11]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524)
    ...

【问题讨论】:

    标签: command-line apache-kafka google-cloud-pubsub


    【解决方案1】:

    connect-standalone.properties 中的这些行不会禁用转换器:

    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    

    他们禁止在某些转换器(例如 JSON 转换器)中包含架构。您感兴趣的线路是:

    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    

    key.convertervalue.converter字段分别表示Kafka消息的key和value中数据的格式。由于您发布的消息不是有效的 JSON,因此您会看到此错误。您需要将这些转换器设置为 StringConverter:

    key.converter=org.apache.kafka.connect.storage.StringConverter 
    value.converter=org.apache.kafka.connect.storage.StringConverter
    

    【讨论】:

    • 感谢@Kamal Aboul-Hosn。这似乎可行,但是现在我遇到了一个新错误,因此我无法检查您的建议是否解决了所有问题。我得到的错误是Caused by: java.lang.IllegalArgumentException: Jetty ALPN/NPN has not been properly configured. 关于如何解决这个问题的任何想法?如果没有必要,想避免发布一个新问题。谢谢。
    • 我最后发布了另一个问题。非常感谢您的意见:stackoverflow.com/questions/43793486/… 在此先感谢。
    • 解决了我的redis sink连接器问题
    猜你喜欢
    • 2018-08-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-12-08
    • 1970-01-01
    • 2021-12-31
    • 1970-01-01
    相关资源
    最近更新 更多