【问题标题】:Query Ignite Cache based on Spark RDD elements基于 Spark RDD 元素查询 Ignite Cache
【发布时间】:2017-06-03 17:21:01
【问题描述】:

我试图检索 JavaPairRDD 中每个元素的缓存值。我使用LOCAL 缓存模式,因为我想最小化缓存数据的数据混洗。点燃节点在 spark 作业中以嵌入模式启动。如果我在单个节点上运行以下代码,它可以正常工作。但是,当我在 5 台机器的集群上运行它时,我得到了zero 结果。

我的第一次尝试是使用 IgniteRDD sql 方法:

 dataRDD.sql("SELECT v.id,v.sub,v.obj FROM VPRow v JOIN table(id bigint = ?) i ON v.id = i.id",new Object[] {objKeyEntries.toArray()});

其中objKeyEntries 是RDD 中收集的一组条目。第二次尝试是使用 AffinityRun:

JavaPairRDD<Long, VPRow> objEntries = objKeyEntries.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<Long, Boolean>>, Long, VPRow>() {
    @Override
    public Iterator<Tuple2<Long, VPRow>> call(Iterator<Tuple2<Long, Boolean>> tuple2Iterator) throws Exception {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("ignite-rdd.xml");
        IgniteConfiguration igniteConfiguration = (IgniteConfiguration) ctx.getBean("ignite.cfg");
        Ignite ignite = Ignition.getOrStart(igniteConfiguration);
        IgniteCache<Long, VPRow> cache = ignite.getOrCreateCache("dataRDD");

        ArrayList<Tuple2<Long,VPRow>> lst = new ArrayList<>();
        while(tuple2Iterator.hasNext()) {
            Tuple2<Long, Boolean> val = tuple2Iterator.next();
            ignite.compute().affinityRun("dataRDD", val._1(),()->{
                lst.add(new Tuple2<>(val._1(),cache.get(val._1())));
            });
        }
        return lst.iterator();
    }
});

以下是ignite-rdd.xml配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="memoryConfiguration">
            <bean class="org.apache.ignite.configuration.MemoryConfiguration">
                <property name="systemCacheInitialSize" value="#{100 * 1024 * 1024}"/>
                <property name="defaultMemoryPolicyName" value="default_mem_plc"/>
                <property name="memoryPolicies">
                    <list>
                        <bean class="org.apache.ignite.configuration.MemoryPolicyConfiguration">
                            <property name="name" value="default_mem_plc"/>
                            <property name="initialSize" value="#{5 * 1024 * 1024 * 1024}"/>
                        </bean>
                    </list>
                </property>
            </bean>
        </property>
        <property name="cacheConfiguration">
            <list>
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <!-- Set a cache name. -->
                    <property name="name" value="dataRDD"/>
                    <!-- Set a cache mode. -->
                    <property name="cacheMode" value="LOCAL"/>
                    <!-- Index Integer pairs used in the example. -->
                    <property name="indexedTypes">
                        <list>
                            <value>java.lang.Long</value>
                            <value>edu.code.VPRow</value>
                        </list>
                    </property>
                    <property name="affinity">
                        <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
                            <property name="partitions" value="50"/>
                        </bean>
                    </property>
                </bean>
            </list>
        </property>
        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <value>[IP5]</value>
                                <value>[IP4]</value>
                                <value>[IP3]</value>
                                <value>[IP2]</value>
                                <value>[IP1]</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

【问题讨论】:

  • 检查缓存中是否有数据:dataRDD.query(new ScanQuery())
  • @Evgenii :在集群上运行代码时,缓存似乎是空的。但是,使用相同的代码,缓存包含单个节点设置中的元素。
  • 正如这里所说的:apacheignite-fs.readme.io/docs/…IgniteRDD 利用 Ignite 缓存的分区特性并向 Spark 执行器提供分区信息。所以,你应该使用分区缓存模式

标签: ignite


【解决方案1】:

您确定需要使用 LOCAL 缓存模式吗?

很可能您只在一个节点上填充了缓存,而其他节点上的本地缓存仍然是空的。

affinityRun 不起作用,因为您有 LOCAL 缓存,而不是 PARTITIONED,因此无法使用 AffinityFunction 确定密钥的所有者节点。

【讨论】:

  • Iv 编辑了问题以传达围绕同一问题的另一个试验(即在本地缓存模式下获取值)。
猜你喜欢
  • 1970-01-01
  • 2016-06-12
  • 2015-12-27
  • 2015-08-08
  • 1970-01-01
  • 1970-01-01
  • 2021-05-20
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多