【问题标题】:RestAPI service call from Spark Streaming来自 Spark Streaming 的 Rest API 服务调用
【发布时间】:2017-01-23 05:09:42
【问题描述】:

我有一个用例,我需要在从 Kafka 读取消息后从 spark 流中调用 RESTAPI 以执行一些计算并将结果保存回 HDFS 和第三方应用程序。

我对此没有什么疑问:

  • 我们如何直接从 spark 流中调用 RESTAPI。
  • 如何使用流式批处理时间管理 RESTAPI 超时。

【问题讨论】:

    标签: scala rest apache-spark spark-streaming


    【解决方案1】:

    此代码不会按原样编译。但这是给定用例的方法。

    val conf = new SparkConf().setAppName("App name").setMaster("yarn")
    val ssc = new StreamingContext(conf, Seconds(1))
    
    val dstream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    
    dstream.foreachRDD { rdd =>
    
      //Write the rdd to HDFS directly
      rdd.saveAsTextFile("hdfs/location/to/save")
    
      //loop through each parttion in rdd
      rdd.foreachPartition { partitionOfRecords =>
    
        //1. Create HttpClient object here
        //2.a POST data to API
    
        //Use it if you want record level control in rdd or partion
        partitionOfRecords.foreach { record =>
          //2.b Post the the date to API
          record.toString
        }
      }
      //Use 2.a or 2.b to POST data as per your req
    }
    
    ssc.start()
    ssc.awaitTermination()
    

    大部分 HttpClients(用于 REST 调用)支持请求超时。

    使用 Apache HttpClient 超时的 Http POST 调用示例

    val CONNECTION_TIMEOUT_MS = 20000; // Timeout in millis (20 sec).
    
    val requestConfig = RequestConfig.custom()
      .setConnectionRequestTimeout(CONNECTION_TIMEOUT_MS)
      .setConnectTimeout(CONNECTION_TIMEOUT_MS)
      .setSocketTimeout(CONNECTION_TIMEOUT_MS)
      .build();
    
    HttpClientBuilder.create().build();
    
    val client: CloseableHttpClient = HttpClientBuilder.create().build();
    
    val url = "https://selfsolve.apple.com/wcResults.do"
    val post = new HttpPost(url);
    
    //Set config to post
    post.setConfig(requestConfig)
    
    post.setEntity(EntityBuilder.create.setText("some text to post to API").build())
    
    val response: HttpResponse = client.execute(post)
    

    【讨论】:

    • 现在的问题是集群启用了 kerberos,所以当我在任何节点上 curl restapi URL 时,我会收到预期的输出,但相同的 URL 在 spark 程序中失败,出现 HTTP 503/504 错误。跨度>
    • 我真的很高兴,这对你有帮助。
    • 嗨,Nilesh,我也在尝试执行类似的案例,但是这将使用 Spark Java。在您的情况下,只是想了解您在 Spark Streaming 应用程序中调用 HttpRest 调用的位置。此外,Spark 是否提供任何类,例如 Receiver 类,您将从 URL 接收事件流。就我而言,我想将事件发布到 URL。
    • @Avinash 嘿 Avinash,我在 foreachPartition 上调用 RestAPI 以分布式方式执行 RestAPI,因为这是流式应用程序,请确保您的 HTTP 超时为
    猜你喜欢
    • 2017-07-10
    • 2021-11-17
    • 2016-11-03
    • 1970-01-01
    • 2017-07-30
    • 2018-02-03
    • 2017-08-25
    • 2021-05-11
    • 1970-01-01
    相关资源
    最近更新 更多