【问题标题】:Does Hazelcast Jet support rolling number as IMap key, with Kafka as a source?Hazelcast Jet 是否支持滚动数字作为 IMap 键,以 Kafka 作为源?
【发布时间】:2019-05-06 15:10:09
【问题描述】:

我前段时间使用过 Hazelcast,而且我是第一次使用 Hazelcast Jet,似乎对处理一些实时流、探索很感兴趣。

这里有一个情况,我将Kafka topic 拉到IMap 使用:

private static Pipeline buildPipelineForClientDataa() {
        Pipeline p = Pipeline.create();
        p.drawFrom(KafkaSources.kafka(
                props("bootstrap.servers", BOOTSTRAP_SERVERS, 
                        "key.deserializer", StringDeserializer.class.getCanonicalName(), 
                        "value.deserializer", StringDeserializer.class.getCanonicalName(), 
                        "auto.offset.reset", AUTO_OFFSET_RESET), 
                KAFKA_TOPIC))
        .withoutTimestamps()
        .drainTo(Sinks.map(SINK_CLINET_DATA));
        return p;
    }

嗯,我没有这个话题的关键。我应该可以选择将滚动号码分配为键吗?如果是这样,请帮助我使用该技术。谢谢。

【问题讨论】:

    标签: hazelcast hazelcast-imap hazelcast-jet


    【解决方案1】:

    使用递增数字不适合 Jet,因为它是一个分布式系统。它适用于分区流,每个流分区应该是独立的。您需要通过非并行处理器路由所有项目。

    您可以使用 UUID 或 Hazelcast 的 FlakeIdGenerator 作为键,但如果作业重新启动并从快照偏移量重新处理 Kafka 主题,相同的项目将分配不同的键,并且将出现两次目标地图。

    如果想拥有map中的每一项,可以使用Kafka的topic+partitionId+offset组合作为key:

    p.drawFrom(KafkaSources.kafka(
        props(...),
        record -> Util.entry(
            Tuple3.tuple3(record.topic(), record.partition(), record.offset()),
            record.value()),
        KAFKA_TOPIC))
    

    如果只有一个主题,可以省略主题。

    【讨论】:

    • 这是你在代码中放的 org.apache.kafka.common.utils.Utils 吗?如果是这样,则不支持方法条目。
    • 这是com.hazelcast.jet.Util.entry(),不是Utils,我修复了响应。 Map.EntrySinks.map() 所必需的。
    • 我很困惑。请帮我在 Sinks.map() 中嵌入 Map.Entry。插入地图是否需要一些迭代?只需一个 Sinks.map(map_name);报错是从 Tulip 到 String 的 ClassCastException。
    • @srikanth 提出一个新问题并分享您的代码。 Sinks.map 期望 Map.Entry 在其输入中使用实例,但如果不这样做,您应该会收到编译时错误。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-04-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多