【问题标题】:Spark Streaming Twitter createStream Issue(Spark Streaming Twitter createStream 问题)
【发布时间】:2017-09-04 14:14:08
【问题描述】:

我正在尝试使用 Spark Streaming 从 Twitter 流式传输数据,但是遇到了
下面的问题。

import org.apache.spark.streaming.twitter._
import twitter4j.auth._
import twitter4j.conf._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Streaming context on top of the shell-provided SparkContext `sc`, with 10-second batches.
val ssc = new StreamingContext(sc, Seconds(10))
// Build the twitter4j configuration; the OAuth credentials are left blank here.
val cb = new ConfigurationBuilder
cb.setDebugEnabled(true)
cb.setOAuthConsumerKey("")
cb.setOAuthConsumerSecret("")
cb.setOAuthAccessToken("")
cb.setOAuthAccessTokenSecret("")
val auth = new OAuthAuthorization(cb.build)
// This is the call that fails to compile (see the error output below): the overload taking a
// Scala StreamingContext expects twitterAuth as Option[Authorization], not a bare Authorization.
val tweets = TwitterUtils.createStream(ssc,auth)

错误屏幕:

val tweets = TwitterUtils.createStream(ssc,auth)
<console>:49: error: overloaded method value createStream with alternatives:
  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,twitterAuth: twitter4j.auth.Authorization)org.apache.spark.streaming.api.java.JavaReceiverInputDStream[twitter4j.Status] <and>
  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,filters: Array[String])org.apache.spark.streaming.api.java.JavaReceiverInputDStream[twitter4j.Status] <and>
  (ssc: org.apache.spark.streaming.StreamingContext,twitterAuth: Option[twitter4j.auth.Authorization],filters: Seq[String],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status]
 cannot be applied to (org.apache.spark.streaming.StreamingContext, twitter4j.auth.OAuthAuthorization)
       val tweets = TwitterUtils.createStream(ssc,auth)

【问题讨论】:

标签: scala spark-streaming


【解决方案1】:

问题中的方法有这个签名:

// Signature excerpt of the createStream overload that accepts a Scala StreamingContext
// (result type and body are not shown here).
def createStream(
  // Required: the Scala streaming context.
  ssc: StreamingContext,
  // Required: an Option, so callers must pass Some(authorization) or None.
  twitterAuth: Option[Authorization],
  // Optional: defaults to Nil.
  filters: Seq[String] = Nil,
  // Optional: defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
)

我们可以看到 ssc: StreamingContext 和 twitterAuth: Option[Authorization] 是强制性的。另外两个是可选的。

在您的情况下,twitterAuth 类型不正确。这是一个Option[Authorization]。在这种情况下,调用应该如下所示:

// Wrap auth in Some(...) so the argument matches the Option[Authorization] parameter.
val tweets = TwitterUtils.createStream(ssc, Some(auth))

【讨论】:

    【解决方案2】:
    import org.apache.spark._
    import org.apache.spark.SparkContext._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.twitter._
    import org.apache.spark.streaming.StreamingContext._
    
    
    /** Streams tweets from Twitter and counts hashtags over a sliding window. */
    object TwitterStream {

      /** Sets the root log4j logger to ERROR so that only errors are logged. */
      def setupLogging(): Unit = {
        import org.apache.log4j.{Level, Logger}
        Logger.getRootLogger().setLevel(Level.ERROR)
      }

      /** Configures Twitter service credentials using twitter.txt.
        *
        * Every line that splits on a single space into exactly two fields, `key value`,
        * is exported as the system property `twitter4j.oauth.<key>`. Other lines are ignored.
        */
      def setupTwitter(): Unit = {
        import scala.io.Source

        val source = Source.fromFile("/Users/sampy/twitter.txt")
        // Release the file handle even if reading fails (previously it was never closed).
        try {
          for (line <- source.getLines()) {
            val fields = line.split(" ")
            if (fields.length == 2) {
              System.setProperty("twitter4j.oauth." + fields(0), fields(1))
            }
          }
        } finally {
          source.close()
        }
      }

      /** Our main function where the action happens.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        // Credentials must be exported as system properties before the stream is created.
        setupTwitter()

        // Local streaming context using all cores, with 5-second batches.
        val ssc = new StreamingContext("local[*]", "PopularHashtags", Seconds(5))

        setupLogging()

        // No explicit Authorization is passed; presumably twitter4j then falls back to the
        // twitter4j.oauth.* system properties set by setupTwitter() -- confirm against the API docs.
        val tweets = TwitterUtils.createStream(ssc, None)
        val engTweets = tweets.filter(status => status.getLang() == "en")

        // Tweet text -> words -> words starting with '#'.
        val statuses = engTweets.map(status => status.getText)
        val tweetwords = statuses.flatMap(tweetText => tweetText.split(" "))
        val hashtags = tweetwords.filter(word => word.startsWith("#"))

        // Pair every hashtag with 1 so occurrences can be summed per key.
        val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1))

        // NOTE(review): the arguments are (reduceFunc, windowDuration, slideDuration), so this is a
        // 5-second window evaluated every 20 seconds -- confirm the two durations are not swapped.
        val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow(
          (x: Int, y: Int) => x + y,
          Seconds(5),
          Seconds(20)
        )

        // Most frequent hashtags first.
        val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy(_._2, ascending = false))
        sortedResults.saveAsTextFiles("/Users/sampy/tweetsTwitter", "txt")

        sortedResults.print()

        ssc.checkpoint("/Users/sampy/checkpointTwitter")
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-07-02
      • 1970-01-01
      • 2017-04-20
      • 2017-06-22
      • 2015-03-10
      • 1970-01-01
      相关资源
      最近更新 更多