【问题标题】:Kafka Custom DeserializerKafka 自定义反序列化器
【发布时间】:2021-07-28 04:44:17
【问题描述】:

到目前为止,我已经能够在主题的帮助下创建 KStream。

KStream<String, Object> testqa2 = builder.stream("testqa2", Consumed.with(Serdes.String(), Serdes.String()))
                .mapValues(value -> {
                    System.out.println(value);
                    return value;
                });

它不打印任何东西,所以在调试时 - 我意识到我只是在创建我的 KStream。里面没有数据。

我在为工人类创建序列化器/反序列化器时遇到了一点麻烦。

package com.copart.mwa.Avro;

public class Worker {

    private static String WorkerActivityName;
    private static String WorkerSid;
    private static String WorkerPreviousActivityName;
    private static String WorkerPreviousActivitySid;

    public String getWorkerActivityName() {
        return WorkerActivityName;
    }

    public void setWorkerActivityName(String workerActivityName) {
        WorkerActivityName = workerActivityName;
    }

    public static String getWorkerSid() {
        return WorkerSid;
    }

    public void setWorkerSid(String workerSid) {
        WorkerSid = workerSid;
    }

    public String getWorkerPreviousActivityName() {
        return WorkerPreviousActivityName;
    }

    public void setWorkerPreviousActivityName(String workerPreviousActivityName) {
        WorkerPreviousActivityName = workerPreviousActivityName;
    }

    public String getWorkerPreviousActivitySid() {
        return WorkerPreviousActivitySid;
    }

    public void setWorkerPreviousActivitySid(String workerPreviousActivitySid) {
        WorkerPreviousActivitySid = workerPreviousActivitySid;
    }

    @Override
    public String toString() {
        return "Worker(" + WorkerSid + ", " + WorkerActivityName + ")";
    } }

而生产者到消费者的消息是JSON

  {
"WorkerActivityName": "Available",
"EventType": "worker.activity.update",
"ResourceType": "worker",
"WorkerTimeInPreviousActivityMs": "237",
"Timestamp": "1626114642",
"WorkerActivitySid": "WAc9030ef021bc1786d3ae11544f4d9883",
"WorkerPreviousActivitySid": "WAf4feb231e97c1878fecc58b26fdb95f3",
"WorkerTimeInPreviousActivity": "0",
"AccountSid": "AC8c5cd8c9ba538090da104b26d68a12ec",
"WorkerName": "Dorothy.Finegan@Copart.Com",
"Sid": "EV284c8a8bc27480e40865263f0b42e5cf",
"TimestampMs": "1626114642204",
"P": "WKe638256376188fab2a98cccb3c803d67",
"WorkspaceSid": "WS38b10d521442ecb74fcc263d5a4d726e",
"WorkspaceName": "Copart-MiPhone",
"WorkerPreviousActivityName": "Unavailable(RNA)",
"EventDescription": "Worker Dorothy.Finegan@Copart.Com updated to Available Activity",
"ResourceSid": "WKe638256376188fab2a98cccb3c803d67",
"WorkerAttributes": "{\"miphone_dept\":[\"USA_YRD_OPS\"],\"languages\":[\"en\"],\"home_region\":\"GL\",\"roles\":[\"supervisor\"],\"miphone_yards\":[\"81\"],\"miphone_enabled\":true,\"miphone_states\":[\"IL\"],\"home_state\":\"IL\",\"skills\":[\"YD_SELLER\",\"YD_TITLE\"],\"home_division\":\"Northern\",\"miphone_divisions\":[\"Northern\"],\"miphone_functions\":[\"outbound_only\"],\"full_name\":\"Dorothy Finegan\",\"miphone_regions\":[\"GL\"],\"home_country\":\"USA\",\"copart_user_id\":\"USA3204\",\"home_yard\":\"81\",\"home_dept\":\"USA_YRD_OPS\",\"email\":\"dorothy.finegan@copart.com\",\"home_dept_category\":\"OPS\",\"contact_uri\":\"client:Dorothy_2EFinegan_40Copart_2ECom\",\"queue_activity\":\"Available\",\"teams\":[],\"remote_employee\":false,\"miphone_call_center_units\":[\"USA_YRD_OPS|81\"],\"miphone_call_center_teams\":[]}"
}

我想在哪里实现一个客户反序列化器

“WorkspaceSid”:“WS38b10d521442ecb74fcc263d5a4d726e”是键,其他属性的剩余值作为键值对的值。

谢谢, 安莫尔

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams


    【解决方案1】:

    它不打印任何东西

    如果testqa2主题中有数据,而你有auto.offset.reset=earliest,那么应该有。

    为工人类创建序列化器/反序列化器时遇到一点麻烦

    Kafka 具有内置的 JSON 序列化程序,您可以为其构建 Serde。您不需要自己制作。

    “WorkspaceSid”,是关键

    如果要修改密钥,请使用selectKey,或map,而不是mapValues

    Serializer<JsonNode> jsonNodeSerializer = new JsonSerializer();
    Deserializer<JsonNode> jsonNodeDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonNodeSerde = Serdes.serdeFrom(jsonNodeSerializer,jsonNodeDeserializer);
    
    KStream<String, JsonNode> testqa2 = builder.stream("testqa2", Consumed.with(Serdes.String(), jsonSerde))
        .selectKey((k, json) -> json.get("WorkspaceSid"))
        .print(Printed.toSysOut());
    

    或者,修复您的生产者代码以从值中获取 Sid,并在那里设置密钥...

    如果您想使用 Avro,您不会编写 Worker 类 - 您会从 Avro 架构生成它。

    【讨论】:

    • 嗨,谢谢。但我的主要目标是从 testqa2 主题中创建一个 Ktable。
    • 好的...不过,这不是您问题的一部分。正如我之前告诉你的,使用groupByKeyaggregate 来获取KTable
    • 聚合的问题是:我的数据(有效负载)在 string/json { "WorkerActivityName": "Available", "EventType": "worker.activity.update", "ResourceType": "worker", "WorkerTimeInPreviousActivityMs": "237", "Timestamp": "1626114642", "WorkerActivitySid": "WAc9030ef021bc1786d3ae11544f4d9883", "WorkerPreviousActivitySid": "WAf4feb231e97c1878fecc58b26fdb95f3", "WorkerTime所有数据,其中工人活动名称可用。 PK = W activity sid 如何由此形成ktable?
    • @anmol 我假设您已经完全遵循了我之前链接的tutorial?而且我相信我回答了您最初提出的问题,因此如果您在创建表格时遇到特定错误,我建议您创建一个新帖子(或编辑您以前的帖子)
    • 是的,我已经理解了教程。我有一个疑问。如果您从 builder.table(topic) 创建一个 Ktable,您如何访问它的数据?如果它是交互式查询,在我的用例中,你可以帮助构建它吗? @OneCricketeer
    猜你喜欢
    • 2018-02-19
    • 1970-01-01
    • 2019-05-07
    • 1970-01-01
    • 2018-01-07
    • 2017-09-17
    • 1970-01-01
    • 2014-07-08
    • 2022-01-02
    相关资源
    最近更新 更多