【发布时间】:2021-11-29 15:34:12
【问题描述】:
我们正在通过 SF 中称为平台事件的东西生成从 Salesforce 到 Kafka 的事件。为了让这些事件进入 Kafka,我们使用了 Camel 的 Salesforce Kafka Source Connector。可以在此处找到此 Kafka 连接器的文档:
从 Salesforce 生成的事件采用 JSON 格式。格式如下所示:
{
"data": {
"schema": "NhgeDyLTvEyVPQ9uOzDqeQ",
"payload": {
"AccountId__c": "00119000013q2g3AAA",
"AccountUUID__c": "4654fefb-e3d1-4b08-a4e2-5dabaa504abb",
"GUID__c": null,
"CreatedById": "0056g000005YeBAAA0",
"CreatedDate": "2021-10-10T15:24:43.819Z"
},
"event": {
"replayId": "1256093"
}
},
"channel": "/event/Order_Completed__e"
}
这就引出了我们的问题。我们为源连接器使用以下配置:
{
"name": "sf_order_p_event_connector",
"config": {
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"connector.class": "org.apache.camel.kafkaconnector.salesforce.CamelSalesforceSourceConnector",
"camel.component.salesforce.loginUrl": "<redacted>",
"camel.component.salesforce.instanceUrl": "<redacted>",
"topics": "<redacted>",
"camel.source.endpoint.rawPayload": "true",
"camel.source.path.topicName": "/event/Order_Completed__e",
"camel.source.endpoint.replayId": "-1",
"camel.component.salesforce.authenticationType": "USERNAME_PASSWORD",
"camel.component.salesforce.clientId": "<redacted",
"camel.component.salesforce.clientSecret": "<redacted>",
"camel.component.salesforce.password": "<redacted5",
"camel.component.salesforce.userName": "<redacted>",
"camel.source.endpoint.apiVersion": "52.0"
}
}
当使用上面配置的这个 value.converter 时,我们得到以下异常:
ERROR [sf_account_change_connector|task-0] WorkerSourceTask{id=sf_account_change_connector-0} 任务引发未捕获且不可恢复的异常。任务被杀死并且在手动重新启动之前不会恢复(org.apache.kafka.connect.runtime.WorkerTask:184) org.apache.kafka.connect.errors.ConnectException:错误处理程序中超出容限 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132) 在 org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:298) 在 org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:324) 在 org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248) 在 org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182) 在 org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748) 引起:java.lang.NullPointerException 在 org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:677) 在 org.apache.kafka.connect.json.JsonConverter.convertToJsonWithEnvelope(JsonConverter.java:592) 在 org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:346) 在 org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63) 在 org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:298) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190) ... 11 更多
它似乎无法将传入 Kafka 的任何值转换为 JSON。我将 value.converter 更改为“org.apache.kafka.connect.storage.StringConverter”,以查看我在该主题中实际得到的内容。一旦我这样做了,这就是我看到的从 Salesforce 传递给 Kafka 的消息:
{data={schema=NhgeDyLTvEyVPQ9uOzDqeQ, payload={AccountId__c=00119000013q2g3AAA,
AccountUUID__c=4654fefb-e3d1-4b08-a4e2-5dabaa504abb, GUID__c=null,
CreatedById=0056g000005YeBAAA0, CreatedDate=2021-10-10T15:24:43.819Z}, event=
{replayId=1256093}}, channel=/event/Order_Completed__e}
似乎 Kafka 并未将其作为 JSON 处理,而是作为 key=value(无论是什么)类型的值处理。我的问题是,为什么我看到的是这种类型的有效负载而不是 JSON?另外,我可以使用源连接器的什么配置(如果有)来克服这个问题,并可能将值转换为 JSON?我需要 JSON 中的这个值,以便我的浮士德代理正确处理它。我为源连接器尝试了多种不同的配置,但似乎没有任何效果。
任何帮助将不胜感激。提前谢谢!
【问题讨论】:
标签: apache-kafka apache-camel salesforce apache-kafka-connect cometd