【Question Title】:How to connect to Neo4j in Spark worker nodes?
【Posted】:2018-01-03 09:54:45
【Problem Description】:

I need to fetch a small subgraph inside a Spark map function. I have tried both AnormCypher and the neo4j-spark-connector, but neither works. AnormCypher raises a java IOException (I open the connection inside a mapPartitions function, tested against a localhost server). The neo4j-spark-connector throws the Task not serializable exception below.

Is there a good way to fetch a subgraph (or to connect to a graph database such as Neo4j) from Spark worker nodes?

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
    at ....

My code snippet, using neo4j-spark-connector 2.0.0-M2:

val neo = Neo4j(sc) // this runs on the driver

// this runs inside a map function
def someFunctionToBeMapped(p: List[Long]) = {
  val metaGraph = neo.cypher("match p = (a:TourPlace) -[r:could_go_to] -> (b:TourPlace) " +
    "return a.id, r.distance, b.id").loadRowRdd
    .map(row => ((row(0).asInstanceOf[Long], row(2).asInstanceOf[Long]), row(1).asInstanceOf[Double]))
    .collect().toList
}
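The Task not serializable error comes from the closure itself: `Neo4j(sc)` wraps the SparkContext, so referencing `neo` inside a function that Spark ships to executors drags the non-serializable context along. One common workaround (a minimal sketch, not the connector's own API) is to run the Cypher query once on the driver, collect the result into a plain Scala collection, and let worker-side code close over that collection only, since ordinary collections serialize fine. The `edges` list below is hypothetical stand-in data for what `neo.cypher(...).loadRowRdd` would produce after `collect()`:

```scala
// Hypothetical sketch: collect the subgraph on the driver, then close
// over plain serializable data instead of the Neo4j(sc) wrapper.

// Driver side: a stand-in for neo.cypher(...).loadRowRdd.map(...).collect().toList
val edges: List[((Long, Long), Double)] =
  List(((1L, 2L), 3.5), ((2L, 3L), 1.2))

// This function captures only `edges`, a plain List, so Spark could
// serialize it and ship it to worker nodes without error.
val totalDistanceFrom: Long => Double =
  src => edges.collect { case ((`src`, _), d) => d }.sum
```

Here `totalDistanceFrom(1L)` yields 3.5; in the real job, `edges` would come from the driver-side Cypher query, and the closure would then be safe to use inside map or mapPartitions.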

The AnormCypher code is:

def partitionMap(partition: Iterator[List[Long]]) = {
  import org.anormcypher._
  import play.api.libs.ws._
  // Provide an instance of WSClient
  val wsclient = ning.NingWSClient()
  // Set up the REST client.
  // The Neo4jConnection type annotation is needed so that the default
  // Neo4jConnection -> Neo4jTransaction conversion is in implicit scope.
  implicit val connection: Neo4jConnection = Neo4jREST("127.0.0.1", 7474, "neo4j", "000000")(wsclient)

  // Provide an ExecutionContext
  implicit val ec = scala.concurrent.ExecutionContext.global

  // Note: the filter predicate must return a Boolean, and the lazy
  // iterator must be materialized (toList) before wsclient.close();
  // otherwise the rows are fetched over an already-closed client.
  val res = partition.filter { placeList =>
    val startPlace = Cypher("match p = (a:TourPlace) -[r:could_go_to] -> (b:TourPlace) " +
      "return p")().flatMap(row => row.data)
    startPlace.nonEmpty
  }.toList
  wsclient.close()
  res.iterator
}
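If the query really must run on the workers, the usual shape is a per-partition connection: create the client inside mapPartitions, materialize the results, and only then close it. The snippet below is a minimal sketch of that lifecycle using a hypothetical `StubClient` in place of a real Neo4j driver, so only the pattern is shown. It illustrates two points: the client is constructed on the worker (so nothing non-serializable is captured from the driver), and the lazy partition Iterator is forced with `toList` before `close()`, since consuming it afterwards would hit a dead connection, which may be what produced the IOException above.

```scala
// Hypothetical stand-in for a real Neo4j client (AnormCypher, Bolt driver, ...).
final class StubClient {
  private var closed = false
  def query(id: Long): Long = {
    require(!closed, "client already closed")
    id * 2 // stand-in for a Cypher round trip
  }
  def close(): Unit = closed = true
}

// Per-partition pattern: open on the worker, force evaluation, then close.
def partitionMapSketch(partition: Iterator[Long]): Iterator[Long] = {
  val client = new StubClient() // created inside the function, never serialized
  try partition.map(client.query).toList.iterator // toList forces evaluation now
  finally client.close()
}
```

Usage: `partitionMapSketch(Iterator(1L, 2L, 3L)).toList` gives `List(2L, 4L, 6L)`; swapping `StubClient` for a real driver keeps the same open/materialize/close shape.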

【Question Comments】:

  • Which version of the Spark connector are you using, and what does your code look like?
  • @Michael Hunger
  • Are you trying to run Spark on a cluster (YARN)?

标签: scala apache-spark neo4j anormcypher


【Solution 1】:

I used Spark standalone mode and was able to connect to the Neo4j database.

Versions used:

Spark 2.1.0

neo4j-spark-connector 2.1.0-M2

My code:

val sparkConf = new SparkConf().setAppName("Neo$j").setMaster("local")
val sc = new SparkContext(sparkConf)
println("***Getting Started ****")
val neo = Neo4j(sc)
val df = neo.cypher("MATCH (n) RETURN id(n) as id").loadDataFrame // returns a DataFrame
println(df.count)

Spark submit:

spark-submit --class package.classname --jars pathOfNeo4jSparkConnectorJAR --conf spark.neo4j.bolt.password=***** targetJarFile.jar

【Discussion】:
