【问题标题】:Is Kafka streaming to Ignite running in Transactional mode ACID?Kafka 流式传输到 Ignite 是否以事务模式 ACID 运行?
【发布时间】:2019-04-08 12:12:37
【问题描述】:

我在 Apache Ignite 中有一个分布式数据库和一个 Apache Kafka 流服务,可将数据流式传输到 Ignite 集群。 Kafka 流媒体的工作原理如下

  1. 创建 ignite 节点以查找集群
  2. 在集群中将 kafka streamer 单例作为服务启动
  3. 关闭 ignite 节点

Ignite 集群处于事务模式,但是我不确定这是否保证 ACID 或仅启用它。这个流媒体服务可以被认为是 ACID 吗?

这是 kafka 流媒体的代码:

public class IgniteKafkaStreamerService implements Service {

private static final long serialVersionUID = 1L;

@IgniteInstanceResource
private Ignite ignite;
private KafkaStreamer<String, JSONObject> kafkaStreamer = new KafkaStreamer<>();
private IgniteLogger logger;

public static void main(String[] args) throws InterruptedException {
    TcpDiscoverySpi spi = new TcpDiscoverySpi();

    TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();

    // Set Multicast group.
    //ipFinder.setMulticastGroup("228.10.10.157");

    // Set initial IP addresses.
    // Note that you can optionally specify a port or a port range.
    ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47509"));

    spi.setIpFinder(ipFinder);

    IgniteConfiguration cfg = new IgniteConfiguration();

    // Override default discovery SPI.
    cfg.setDiscoverySpi(spi);
    Ignite ignite = Ignition.getOrStart(cfg);

    // Deploy data streamer service on the server nodes.
    ClusterGroup forServers = ignite.cluster().forServers();
    IgniteKafkaStreamerService streamer = new IgniteKafkaStreamerService();
    ignite.services(forServers).deployClusterSingleton("KafkaService", streamer);
    ignite.close();
}


@Override
public void init(ServiceContext ctx) {
    logger = ignite.log();
    IgniteDataStreamer<String, JSONObject> stmr = ignite.dataStreamer("my_cache");
    stmr.allowOverwrite(true);
    stmr.autoFlushFrequency(1000);
    List<String> topics = new ArrayList<>();
    topics.add(0,"IoTData");

    kafkaStreamer.setIgnite(ignite);
    kafkaStreamer.setStreamer(stmr);
    kafkaStreamer.setThreads(4);
    kafkaStreamer.setTopic(topics);
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "NiFi-consumer");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.242:9092");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("group.id", "hello");
    kafkaStreamer.setConsumerConfig(props);
    kafkaStreamer.setSingleTupleExtractor(msg -> {
        JSONObject jsonObj = new JSONObject(msg.value().toString());
        String key = jsonObj.getString("id") + "," + new Date(msg.timestamp());
        JSONObject value = jsonObj.accumulate("date", new Date(msg.timestamp()));

        return new AbstractMap.SimpleEntry<>(key, value);

    });
}

@Override
public void execute(ServiceContext ctx) {
    kafkaStreamer.start();
    logger.info("KafkaStreamer started.");
}

@Override
public void cancel(ServiceContext ctx) {
    kafkaStreamer.stop();
    logger.info("KafkaStreamer stopped.");
}

}

【问题讨论】:

    标签: apache-kafka ignite acid


    【解决方案1】:

    KafkaStreamer 在后台使用 IgniteDataStreamer 实现。 IgniteDataStreamer 本质上不是事务性的,因此没有任何事务性保证。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-10-05
      • 2016-04-27
      • 2020-01-10
      • 1970-01-01
      • 2016-10-03
      • 2016-06-21
      • 2019-05-19
      相关资源
      最近更新 更多