【Title】: Can't get a stream of Tweets using Twitter Streaming API on Spark
【Posted】: 2016-07-06 11:33:09
【Question】:

I'm trying to do some analysis on Tweets using the Twitter Streaming API.

As a first step, I want to print the status messages from the stream and build from there.

My code looks like this:

public static void main(String[] args) {
  SparkConf conf = new SparkConf().setAppName("TwitterStreamPrinter").setMaster("local");
  JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));

  Configuration twitterConf = new ConfigurationBuilder()
      .setOAuthConsumerKey(consumerKey)
      .setOAuthConsumerSecret(consumerSecret)
      .setOAuthAccessToken(accessToken)
      .setOAuthAccessTokenSecret(accessTokenSecret).build();
  OAuth2Authorization auth = new OAuth2Authorization(twitterConf);
  JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(ssc, auth);

  JavaDStream<String> statuses = twitterStream.map(new Function<Status, String>() {
    public String call(Status status) throws Exception {
      return status.getText();
    }
  });
  statuses.print();
}

It doesn't print anything except the Spark logs. At first I thought it was an authorization problem, so I tried passing the credentials in various different ways, but it doesn't seem to be authorization.

I've looked at every example I could find on the web (there aren't many), and this code looks like the standard way to get Twitter statuses, so why doesn't it print anything? I also tried System.out.println, to no avail.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/03/19 12:02:23 INFO SparkContext: Running Spark version 1.6.1
16/03/19 12:02:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/03/19 12:02:24 INFO SecurityManager: Changing view acls to: abcd
16/03/19 12:02:24 INFO SecurityManager: Changing modify acls to: abcd
16/03/19 12:02:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(abcd); users with modify permissions: Set(abcd)
16/03/19 12:02:24 INFO Utils: Successfully started service 'sparkDriver' on port 50995.
16/03/19 12:02:24 INFO Slf4jLogger: Slf4jLogger started
16/03/19 12:02:25 INFO Remoting: Starting remoting
16/03/19 12:02:25 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@10.0.0.12:51003]
16/03/19 12:02:25 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 51003.
16/03/19 12:02:25 INFO SparkEnv: Registering MapOutputTracker
16/03/19 12:02:25 INFO SparkEnv: Registering BlockManagerMaster
16/03/19 12:02:25 INFO DiskBlockManager: Created local directory at /private/var/folders/3b/wzflbsn146qgwdglbm_6ms3m0000hl/T/blockmgr-e3de07a6-0c62-47cf-9940-da18382c9241
16/03/19 12:02:25 INFO MemoryStore: MemoryStore started with capacity 2.4 GB
16/03/19 12:02:25 INFO SparkEnv: Registering OutputCommitCoordinator
16/03/19 12:02:25 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/03/19 12:02:25 INFO SparkUI: Started SparkUI at http://10.0.0.12:4040
16/03/19 12:02:25 INFO Executor: Starting executor ID driver on host localhost
16/03/19 12:02:25 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51016.
16/03/19 12:02:25 INFO NettyBlockTransferService: Server created on 51016
16/03/19 12:02:25 INFO BlockManagerMaster: Trying to register BlockManager
16/03/19 12:02:25 INFO BlockManagerMasterEndpoint: Registering block manager localhost:51016 with 2.4 GB RAM, BlockManagerId(driver, localhost, 51016)
16/03/19 12:02:25 INFO BlockManagerMaster: Registered BlockManager
16/03/19 12:02:25 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
16/03/19 12:02:26 INFO SparkContext: Invoking stop() from shutdown hook
16/03/19 12:02:26 INFO SparkUI: Stopped Spark web UI at http://10.0.0.12:4040
16/03/19 12:02:26 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/03/19 12:02:26 INFO MemoryStore: MemoryStore cleared
16/03/19 12:02:26 INFO BlockManager: BlockManager stopped
16/03/19 12:02:26 INFO BlockManagerMaster: BlockManagerMaster stopped
16/03/19 12:02:26 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/03/19 12:02:26 INFO SparkContext: Successfully stopped SparkContext
16/03/19 12:02:26 INFO ShutdownHookManager: Shutdown hook called
16/03/19 12:02:26 INFO ShutdownHookManager: Deleting directory /private/var/folders/3b/.....

【Comments】:

    Tags: java twitter apache-spark


    【Solution 1】:

    It's all in your log:

    16/03/19 12:02:25 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.

    So the answer is to set the master to local[*].

    Also, did you forget to start it?

    jssc.start(); // Start the computation

    jssc.awaitTermination();
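    Putting both fixes together, a minimal corrected driver might look like the sketch below. This is an illustration, not code from the thread: the batch interval and the credential variables are placeholders, and it assumes twitter4j plus the spark-streaming-twitter integration are on the classpath. Note it also uses twitter4j's user-context OAuthAuthorization rather than OAuth2Authorization, since Twitter's streaming endpoints require user (OAuth 1.0a) auth:

    ```java
    // Sketch of a corrected driver (assumes spark-streaming-twitter and
    // twitter4j on the classpath; consumerKey etc. are placeholders).
    SparkConf conf = new SparkConf()
        .setAppName("TwitterStreamPrinter")
        .setMaster("local[*]");  // more than one core, so the receiver
                                 // does not starve the processing tasks
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));

    Configuration twitterConf = new ConfigurationBuilder()
        .setOAuthConsumerKey(consumerKey)
        .setOAuthConsumerSecret(consumerSecret)
        .setOAuthAccessToken(accessToken)
        .setOAuthAccessTokenSecret(accessTokenSecret)
        .build();
    JavaReceiverInputDStream<Status> twitterStream =
        TwitterUtils.createStream(jssc, new OAuthAuthorization(twitterConf));

    twitterStream.map(new Function<Status, String>() {
      public String call(Status status) {
        return status.getText();
      }
    }).print();

    jssc.start();             // nothing runs until start() is called
    jssc.awaitTermination();  // keep the driver alive so batches keep arriving
    ```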

    【Comments】:

    • Changing to local[*] removes the WARN message, but it still doesn't print anything.
    • Can you update the code and the log? How many cores do you have?
    • Exactly the same log, minus that WARN message. As far as I know, the output shouldn't depend on the number of cores.
    • Added to the answer - did you start your streaming context? And await termination?
    • Thanks. I hadn't added awaitTermination, and never suspected that was the problem.