【问题标题】:Hazelcast Jet QueryHazelcast Jet 查询
【发布时间】:2018-04-10 17:25:34
【问题描述】:

我对 Hazelcast Jet 有以下疑问

用例如下

有一个应用程序(应用程序'A',部署在集群中)使用 Hazelcast IMDG 并将数百万条记录/事务放在 hazelcast IMap 中。

已为此 IMap 配置事件日志。

还有另一个应用程序(应用程序 B,部署在集群中)实例化 JetInstance 并在每个节点上单独运行作业以处理记录。

目前,该作业从事件日志中读取数据并添加到 IList(参考 - hazelcast-jet-0.5.1\code-samples\streaming\map-journal-source\src\main\java\RemoteMapJournalSource.java)

由于作业在多个节点上运行,事件日志中的记录由多个节点处理。这会导致 IList 中有多个条目。

是否可以确保一条记录只被“应用程序B”的一个节点处理而其他节点不处理以避免重复?

如果不是,这是否意味着作业将由“应用程序 B”集群的单个节点运行?

这是一个示例代码(应用程序 B)

        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.<Integer, Integer, Integer>remoteMapJournal(MAP_NAME, clientConfig,
                e -> e.getType() == EntryEventType.ADDED, EventJournalMapEvent::getNewValue, true))
         .peek()
         .drainTo(Sinks.list(SINK_NAME));

        JobConfig jc= new JobConfig();
        jc.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);

        localJet.newJob(p,jc);

这是完整的代码。

应用程序 A 源代码。

public class RemoteMapJournalSourceSrv1 {

private static final String MAP_NAME = "map";
private static final String SINK_NAME = "list";

public static void main(String[] args) throws Exception {
    System.setProperty("remoteHz.logging.type", "log4j");

    Config hzConfig = getConfig();
    HazelcastInstance remoteHz = startRemoteHzCluster(hzConfig);

    try {
        IMap<Integer, Integer> map = remoteHz.getMap(MAP_NAME);
        System.out.println("*************** Initial Map  address " +  map.size() );

        while(true) {
            System.out.println("***************map size "+map.size());  
            TimeUnit.SECONDS.sleep(20);
        }

    } finally {
        Hazelcast.shutdownAll();
    }
}

private static HazelcastInstance startRemoteHzCluster(Config config) {
    HazelcastInstance remoteHz = Hazelcast.newHazelcastInstance(config);
    return remoteHz;
}

private static Config getConfig() {
    Config config = new Config();
    // Add an event journal config for map which has custom capacity of 1000 (default 10_000)
    // and time to live seconds as 10 seconds (default 0 which means infinite)
    config.addEventJournalConfig(new EventJournalConfig().setEnabled(true)
                                                         .setMapName(MAP_NAME)
                                                         .setCapacity(10000)
                                                         .setTimeToLiveSeconds(100));
    return config;
}

这是应用程序 B - 节点 1 示例代码

public class RemoteMapJournalSourceCL1 {

private static final String MAP_NAME = "map";
private static final String SINK_NAME = "list";

public static void main(String[] args) throws Exception {
    System.setProperty("remoteHz.logging.type", "log4j");

      JetInstance localJet = startLocalJetCluster();

    try {
        ClientConfig clientConfig = new ClientConfig();
        GroupConfig groupConfig = new GroupConfig();

        clientConfig.getNetworkConfig().addAddress("localhost:5701");
        clientConfig.setGroupConfig(groupConfig);

        IList list1 = localJet.getList(SINK_NAME);

        int size1 = list1.size();

        System.out.println("***************List Initial size "+size1);

        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.<Integer, Integer, Integer>remoteMapJournal(MAP_NAME, clientConfig,
                e -> e.getType() == EntryEventType.ADDED, EventJournalMapEvent::getNewValue, false))
         .peek()
         .drainTo(Sinks.list(SINK_NAME));

        JobConfig jc= new JobConfig();
        jc.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
        localJet.newJob(p,jc);

        while(true){
            TimeUnit.SECONDS.sleep(10); 
            System.out.println("***************Read " + list1.size() + " entries from remote map journal.");
        }           

    } finally {
        Hazelcast.shutdownAll();
        Jet.shutdownAll();
    }

}

private static String getAddress(HazelcastInstance remoteHz) {
    Address address = remoteHz.getCluster().getLocalMember().getAddress();
    System.out.println("***************Remote address " + address.getHost() + ":" + address.getPort() );
    return address.getHost() + ":" + address.getPort();
}

private static JetInstance startLocalJetCluster() {
      JetInstance localJet = Jet.newJetInstance();
    return localJet;
}

这是应用程序 B - 节点 2 示例代码

public class RemoteMapJournalSourceCL2 {

private static final String MAP_NAME = "map";
private static final String SINK_NAME = "list";

public static void main(String[] args) throws Exception {
    System.setProperty("remoteHz.logging.type", "log4j");

      JetInstance localJet = startLocalJetCluster();

    try {
        ClientConfig clientConfig = new ClientConfig();
        GroupConfig groupConfig = new GroupConfig();

        clientConfig.getNetworkConfig().addAddress("localhost:5701");
        clientConfig.setGroupConfig(groupConfig);

        IList list1 = localJet.getList(SINK_NAME);
        int size1 = list1.size();
        System.out.println("***************List Initial size "+size1);


        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.<Integer, Integer, Integer>remoteMapJournal(MAP_NAME, clientConfig,
                e -> e.getType() == EntryEventType.ADDED, EventJournalMapEvent::getNewValue, true))
         .peek()
         .drainTo(Sinks.list(SINK_NAME));

        JobConfig jc= new JobConfig();
        jc.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);

        localJet.newJob(p,jc);

        while(true){
            TimeUnit.SECONDS.sleep(10); 
            System.out.println("***************Read " + list1.size() + " entries from remote map journal.");
        }           
    } finally {
        Hazelcast.shutdownAll();
        Jet.shutdownAll();
    }

}
private static JetInstance startLocalJetCluster() {
      JetInstance localJet = Jet.newJetInstance();
    return localJet;
}

Hazelcast 客户端 - 将条目放入 Hazelcast 地图(应用程序 A)

public class HZClient {

  public static void main(String[] args) {

    ClientConfig clientConfig = new ClientConfig();
    GroupConfig groupConfig = new GroupConfig();

    clientConfig.getNetworkConfig().addAddress("localhost:5701");
    clientConfig.setGroupConfig(groupConfig);

    HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
    IMap<Integer, Integer> map = client.getMap("map");
    Scanner in = new Scanner(System.in);
    int startIndex= 0;
    int endIndex= 0;

    while(true) {
        if(args !=null && args.length > 0 && args[0].equals("BATCH")) {

            System.out.println("Please input the batch size");
            int b = in.nextInt();
            startIndex= endIndex + 1;
            endIndex+= b;
            System.out.println("Batch starts from  "+ startIndex +"ends at"+endIndex);
            putBatch(map,startIndex,endIndex);

        }
        else {
            System.out.println("Please input the map entry");
            int a = in.nextInt();
            System.out.println("You entered integer "+a);
            put(map,a,a);
        }
    }
}

public static void putBatch(IMap map,int startIndex, int endIndex) {
    int index= startIndex;
    System.out.println("Start Index" + startIndex +"End Index"+endIndex );
    while(index<=endIndex){
        System.out.println("Map Values"+ index);
        put(map,index,index);
        index+=1;
    }

}

public static void put(IMap map,int key,int value) {
    map.set(key, value);
}

以下是执行此操作的步骤。

  1. 运行应用程序 A - Java 程序 RemoteMapJournalSourceSrv1

  2. 运行应用程序 B 节点 1 - Java 程序 RemoteMapJournalSourceCL1

  3. 运行应用程序 B 节点 2 - Java 程序 RemoteMapJournalSourceCL2

  4. 为应用程序 A 运行 Hazelcast 客户端 - Java 程序 HZClient

此客户端程序根据控制台输入将条目放入映射中。请提供整数输入。

观察

在执行时,.peek() 记录应用程序 B 的两个节点的值,并且在应用程序 A 映射中插入 1 个条目时,列表计数变为 2。

【问题讨论】:

  • Jet 不应该为多个成员处理相同的日志事件。您能否提供一个MCVE 来重现您的问题?
  • 查询已更新为示例代码。谢谢

标签: hazelcast-jet


【解决方案1】:

您似乎从两个 Jet 客户端提交了两个独立的作业。每个作业接收所有 IMap 事件日志项并将它们推送到同一个 IList,因此预期的结果是 IList 包含每个项的两个实例。

请记住,您仅提交来自 Jet 客户端的作业,但它实际上在 Jet 集群内同时在其所有成员上运行。如果您只需要接收器中的数据副本,请不要两次提交相同的作业。

【讨论】:

  • 感谢您的帮助。这行得通。我这里还有几个问题。 1. 这是否意味着,运行作业的节点与应用程序 B 集群中的其他节点具有不同的代码 2. Hazelcast jet 如何在 Jet 集群中的不同节点上分配负载?
  • 所有 Jet 节点同时运行所有作业。
猜你喜欢
  • 2023-03-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-11-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多