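[Posted]: 2018-09-23 18:38:30
[Question]:
I have a Spark application (version 2.1) that writes data to an Ignite server cache (version 2.2). This is the code I use to create the cache from the IgniteContext in the Spark job: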
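import java.util
import scala.annotation.meta.field
import org.apache.ignite.cache.query.annotations.QuerySqlField
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
import org.apache.spark.rdd.RDD

object Spark_Streaming_Processing {
  // Value type stored in the cache; every field is indexed for SQL queries.
  case class Custom_Class(
    @(QuerySqlField @field)(index = true) a: String,
    @(QuerySqlField @field)(index = true) b: String,
    @(QuerySqlField @field)(index = true) c: String,
    @(QuerySqlField @field)(index = true) d: String,
    @(QuerySqlField @field)(index = true) e: String,
    @(QuerySqlField @field)(index = true) f: String,
    @(QuerySqlField @field)(index = true) g: String,
    @(QuerySqlField @field)(index = true) h: String
  )

  // START IGNITE CONTEXT
  // Discovery addresses the client node probes to find the server.
  val addresses = new util.ArrayList[String]()
  addresses.add("127.0.0.1:48500..48520")

  // sqlContext is defined elsewhere in the job (not shown in the question).
  // The last argument (standalone = true) means the Ignite servers run outside Spark.
  val igniteContext: IgniteContext = new IgniteContext(sqlContext.sparkContext, () =>
    new IgniteConfiguration()
      .setDiscoverySpi(new TcpDiscoverySpi()
        .setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(addresses)))
      .setCacheConfiguration(new CacheConfiguration[String, Custom_Class]()
        .setName("Spark_Ignite").setBackups(1)
        .setIndexedTypes(classOf[String], classOf[Custom_Class])),
    true)
  println(igniteContext.ignite().cacheNames())

  val ignite_cache_rdd: IgniteRDD[String, Custom_Class] =
    igniteContext.fromCache[String, Custom_Class]("Spark_Ignite")
  val processed_Pair: RDD[(String, Custom_Class)] = (...) // rdd with data, which as you can see has the correct datatypes as parameters
  ignite_cache_rdd.savePairs(processed_Pair)
}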
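Everything worked fine, but yesterday I decided to destroy the Spark_Ignite cache and restart the Ignite server. Now, when I run the Spark application again, I get the following error: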
javax.cache.CacheException: Failed to find data nodes for cache: Spark_Ignite
at org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.stableDataNodes(GridReduceQueryExecutor.java:447)
at org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.query(GridReduceQueryExecutor.java:591)
at org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing$8.iterator(IgniteH2Indexing.java:1160)
If I go to Ignite Visor, I can see that the cache has been created on the Ignite server side, and that the Ignite server detects the Ignite client of the Spark application:
[12:17:01] Topology snapshot [ver=4, servers=1, clients=1, CPUs=12, heap=1.9GB]
However, the Ignite client in the Spark application does not seem to detect the server node at startup, even though the cache is created:
18/04/13 12:17:01 INFO GridDiscoveryManager: Topology snapshot [ver=4, servers=0, clients=1, CPUs=12, heap=0.89GB]
18/04/13 12:17:07 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
javax.cache.CacheException: Failed to find data nodes for cache: Spark_Ignite
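I should add that if the Ignite server is not running, the Ignite client does not start: it looks for the server at the given addresses and hangs. Once I start the server, the cache is created, and only then does this error appear.
What could be the problem? And why did it work before?
Thanks.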
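The hang described in the question (the client waiting indefinitely when no server answers at the configured addresses) can be bounded with the discovery join timeout. The following is a minimal sketch, not the asker's code: the helper name `clientConfig` and the 30-second value are assumptions chosen for illustration, and the cache configuration from the question would still need to be added to the returned object.

```scala
import java.util
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder

// Hypothetical factory for the client-side configuration.
// joinTimeoutMs = 30000 is an assumed value, not taken from the question.
def clientConfig(addresses: util.Collection[String],
                 joinTimeoutMs: Long = 30000L): IgniteConfiguration = {
  val discovery = new TcpDiscoverySpi()
    .setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(addresses))
    // The default join timeout is 0, which means "keep trying to join forever".
    .setJoinTimeout(joinTimeoutMs)
  new IgniteConfiguration().setDiscoverySpi(discovery)
}
```

With a non-zero timeout, a client that cannot reach any address in the list should fail with an exception instead of blocking the Spark job, which makes a discovery problem visible sooner.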
Update
This is the XML configuration file I use for the single Ignite server:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util-2.0.xsd">
    <bean id="ignite_cluster.cfg"
          class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="igniteInstanceName" value="ignite_node1" />
        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <!-- Initial local port to listen to. -->
                <property name="localPort" value="48510"/>
                <!-- Changing local port range. This is an optional action. -->
                <property name="localPortRange" value="20"/>
                <!-- Setting up IP finder for this cluster -->
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <!-- Addresses and port range of the nodes from the first cluster.
                                     127.0.0.1 can be replaced with actual IP addresses or host names.
                                     Port range is optional. -->
                                <value>127.0.0.1:48500..48520</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
        <property name="communicationSpi">
            <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
                <property name="localPort" value="48100"/>
            </bean>
        </property>
    </bean>
    <bean class="org.apache.ignite.configuration.FileSystemConfiguration">
        <property name="secondaryFileSystem">
            <bean class="org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem">
                <property name="fileSystemFactory">
                    <bean class="org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory">
                        <property name="uri" value="hdfs://localhost:9000/"/>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
    <bean class="org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory">
        <property name="uri" value="hdfs://localhost:9000/"/>
        <property name="configPaths">
            <list>
                <value>/etc/hadoop-2.9.0/etc/hadoop/core-site.xml</value>
            </list>
        </property>
    </bean>
</beans>
[Comments]:
-
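From the topology snapshot [ver=4, servers=0, clients=1] it looks like your client cannot connect to the server node, so this is most likely a discovery problem. Can you share the full logs from all nodes? Have you tried restarting the client node?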
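The discovery diagnosis in this comment can also be checked from code, by asking the client node which servers and which data nodes for the cache it currently sees. This is a minimal sketch under assumptions: the helper name `describeTopology` is hypothetical, and it only reports the view of the node it is called on (in a distributed Spark run, each executor starts its own client).

```scala
import org.apache.ignite.Ignite
import scala.collection.JavaConverters._

// Hypothetical helper: summarizes what the local (client) node sees in the cluster.
def describeTopology(ignite: Ignite, cacheName: String): String = {
  val cluster = ignite.cluster()
  // Server nodes visible to this node; 0 matches "servers=0" in the client log.
  val servers = cluster.forServers().nodes().asScala
  // Nodes that hold data for the cache; an empty group corresponds to
  // "Failed to find data nodes for cache".
  val dataNodes = cluster.forDataNodes(cacheName).nodes().asScala
  s"topVer=${cluster.topologyVersion()}, servers=${servers.size}, " +
    s"dataNodes($cacheName)=${dataNodes.size}"
}
```

For example, `println(describeTopology(igniteContext.ignite(), "Spark_Ignite"))` just before `savePairs` would show whether the client's view agrees with the server-side snapshot.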
-
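@EvgeniiZhuravlev, I will add the XML configuration file for the Ignite server I am running (I only run one server, locally) and the full logs. Yes, the client node is recreated every time the Spark application starts.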
-
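@EvgeniiZhuravlev, I have added the configuration file. You can see that I set the server's local port to 48510, and my Spark application then discovers it using the address range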
-
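Strangely, after restarting my computer everything seems to work fine... I had checked earlier whether it was a port problem, and everything looked fine... Well, hopefully it won't happen again :)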
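A port check like the one mentioned in this comment can be scripted with plain JDK sockets. The sketch below is an illustration only: the function names `parseAddressRange` and `openPorts` and the 200 ms timeout are assumptions, and an open port only shows that some process is listening there, not that it is an Ignite node.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Parses an IP-finder style address such as "127.0.0.1:48500..48520"
// into (host, first port, last port). A single port ("host:48510") is also accepted.
def parseAddressRange(address: String): (String, Int, Int) = {
  val Array(host, ports) = address.split(":", 2)
  ports.split("\\.\\.") match {
    case Array(single)      => (host, single.toInt, single.toInt)
    case Array(first, last) => (host, first.toInt, last.toInt)
  }
}

// Returns the ports in the range that accept a TCP connection within timeoutMs.
def openPorts(address: String, timeoutMs: Int = 200): Seq[Int] = {
  val (host, first, last) = parseAddressRange(address)
  (first to last).filter { port =>
    Try {
      val socket = new Socket()
      try socket.connect(new InetSocketAddress(host, port), timeoutMs)
      finally socket.close()
    }.isSuccess
  }
}
```

Calling `openPorts("127.0.0.1:48500..48520")` while the server is running should list at least the discovery port it bound to. More listening ports than expected in that range may point to leftover processes from earlier runs, which would be consistent with a reboot clearing the problem, although the thread does not confirm that cause.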
Tags: scala apache-spark caching h2 ignite