【问题标题】:Salesforce Camel Kafka Connector converting message to weird formatSalesforce Camel Kafka 连接器将消息转换为奇怪的格式
【发布时间】:2021-11-29 15:34:12
【问题描述】:

我们正在通过 SF 中称为平台事件的东西生成从 Salesforce 到 Kafka 的事件。为了让这些事件进入 Kafka,我们使用了 Camel 的 Salesforce Kafka Source Connector。可以在此处找到此 Kafka 连接器的文档:

https://camel.apache.org/camel-kafka-connector/latest/reference/connectors/camel-salesforce-kafka-source-connector.html

从 Salesforce 生成的事件采用 JSON 格式。格式如下所示:

{
 "data": {
    "schema": "NhgeDyLTvEyVPQ9uOzDqeQ",
    "payload": {
        "AccountId__c": "00119000013q2g3AAA",
        "AccountUUID__c": "4654fefb-e3d1-4b08-a4e2-5dabaa504abb",
        "GUID__c": null,
        "CreatedById": "0056g000005YeBAAA0",
        "CreatedDate": "2021-10-10T15:24:43.819Z"
    },
    "event": {
        "replayId": "1256093"
    }
},
 "channel": "/event/Order_Completed__e"
}

这就引出了我们的问题。我们为源连接器使用以下配置:

{
"name": "sf_order_p_event_connector",
"config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "connector.class": "org.apache.camel.kafkaconnector.salesforce.CamelSalesforceSourceConnector",
    "camel.component.salesforce.loginUrl": "<redacted>",
    "camel.component.salesforce.instanceUrl": "<redacted>",
    "topics": "<redacted>",
    "camel.source.endpoint.rawPayload": "true",
    "camel.source.path.topicName": "/event/Order_Completed__e",
    "camel.source.endpoint.replayId": "-1",
    "camel.component.salesforce.authenticationType": "USERNAME_PASSWORD",
    "camel.component.salesforce.clientId": "<redacted",
    "camel.component.salesforce.clientSecret": "<redacted>",
    "camel.component.salesforce.password": "<redacted5",
    "camel.component.salesforce.userName": "<redacted>",
    "camel.source.endpoint.apiVersion": "52.0"
}
}

当使用上面配置的这个 value.converter 时,我们得到以下异常:

ERROR [sf_account_change_connector|task-0] WorkerSourceTask{id=sf_account_change_connector-0} 任务引发未捕获且不可恢复的异常。任务被杀死并且在手动重新启动之前不会恢复(org.apache.kafka.connect.runtime.WorkerTask:184) org.apache.kafka.connect.errors.ConnectException:错误处理程序中超出容限 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132) 在 org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:298) 在 org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:324) 在 org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248) 在 org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182) 在 org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748) 引起:java.lang.NullPointerException 在 org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:677) 在 org.apache.kafka.connect.json.JsonConverter.convertToJsonWithEnvelope(JsonConverter.java:592) 在 org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:346) 在 org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63) 在 org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:298) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190) ... 11 更多

它似乎无法将传入 Kafka 的任何值转换为 JSON。我将 value.converter 更改为“org.apache.kafka.connect.storage.StringConverter”,以查看我在该主题中实际得到的内容。一旦我这样做了,这就是我看到的从 Salesforce 传递给 Kafka 的消息:

{data={schema=NhgeDyLTvEyVPQ9uOzDqeQ, payload={AccountId__c=00119000013q2g3AAA, 
AccountUUID__c=4654fefb-e3d1-4b08-a4e2-5dabaa504abb, GUID__c=null, 
CreatedById=0056g000005YeBAAA0, CreatedDate=2021-10-10T15:24:43.819Z}, event= 
{replayId=1256093}}, channel=/event/Order_Completed__e}

似乎 Kafka 并未将其作为 JSON 处理,而是作为 key=value(无论是什么)类型的值处理。我的问题是,为什么我看到的是这种类型的有效负载而不是 JSON?另外,我可以使用源连接器的什么配置(如果有)来克服这个问题,并可能将值转换为 JSON?我需要 JSON 中的这个值,以便我的浮士德代理正确处理它。我为源连接器尝试了多种不同的配置,但似乎没有任何效果。

任何帮助将不胜感激。提前谢谢!

【问题讨论】:

标签: apache-kafka apache-camel salesforce apache-kafka-connect cometd


【解决方案1】:

请在您的连接器配置中使用 camel.source.endpoint.rawPayload=true

【讨论】:

    猜你喜欢
    • 2021-01-11
    • 1970-01-01
    • 2022-08-23
    • 1970-01-01
    • 2017-11-09
    • 2022-01-14
    • 1970-01-01
    • 2012-05-06
    • 1970-01-01
    相关资源
    最近更新 更多