【问题标题】:stop the Twitter stream and return List of status with twitter4j停止 Twitter 流并使用 twitter4j 返回状态列表
【发布时间】:2017-09-20 15:33:19
【问题描述】:

使用 Twitter4j 提供的代码示例,我想在收集到 1,000 个状态的列表后停止流,并返回此列表。我该怎么做?

public class Stream {
    public List<Status> execute throws TwitterException {

    List<Status> statuses = new ArrayList();

    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setDebugEnabled(true);
    cb.setOAuthConsumerKey("bbb");
    cb.setOAuthConsumerSecret("bbb");
    cb.setOAuthAccessToken("bbb");
    cb.setOAuthAccessTokenSecret("bbb");

    TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();

    StatusListener listener = new StatusListener() {

        public void onStatus(Status status) {
            statuses.add(status);
            if (statuses.size>1000){
              //return statuses. Obviously that's not the correct place for a return statement...
            }
        }

        public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
            System.out.println("Got a status deletion notice id:" + statusDeletionNotice.getStatusId());
        }

        public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            System.out.println("Got track limitation notice:" + numberOfLimitedStatuses);
        }

        public void onScrubGeo(long userId, long upToStatusId) {
            System.out.println("Got scrub_geo event userId:" + userId + " upToStatusId:" + upToStatusId);
        }

        public void onException(Exception ex) {
            ex.printStackTrace();
        }
    };

    FilterQuery fq = new FilterQuery();
    String keywords[] = {"Keyword 1", "Keyword 2"};

    fq.track(keywords);

    twitterStream.addListener(listener);
    twitterStream.filter(fq);

}

【问题讨论】:

  • 你有解决办法吗?
  • 不...仍然对答案感兴趣!

标签: java twitter twitter4j


【解决方案1】:

考虑使用BlockingQueue 作为中介,使用监听器将Status 对象添加到其中。

流开始后,您可以开始从队列中获取Statuses,直到获得所需的一千个。

作为一个起点,它看起来像下面这样:

public class Stream {
    private static final int TOTAL_TWEETS = 1000;

    public List<Status> execute() throws TwitterException {
        // skipped for brevity...

        // TODO: You may have to tweak the capacity of the queue, depends on the filter query
        final BlockingQueue<Status> statuses = new LinkedBlockingQueue<Status>(10000); 
        final StatusListener listener = new StatusListener() {

            public void onStatus(Status status) {
                statuses.offer(status); // Add received status to the queue
            }

            // etc...
        };

        final FilterQuery fq = new FilterQuery();
        final String keywords[] = {"Keyword 1", "Keyword 2"};
        fq.track(keywords);

        twitterStream.addListener(listener);
        twitterStream.filter(fq);

        // Collect the 1000 statues
        final List<Status> collected = new ArrayList<Status>(TOTAL_TWEETS);
        while (collected.size() < TOTAL_TWEETS) {
            // TODO: Handle InterruptedException
            final Status status = statuses.poll(10, TimeUnit.SECONDS); 

            if (status == null) {
                // TODO: Consider hitting this too often could indicate no further Tweets
                continue;
            }
            collected.add(status);
        }
        twitterStream.shutdown();

        return collected;
    }
} 

【讨论】:

  • 您好,我正在尝试关闭 Twitter 流,但我总​​是收到一堆警告/错误。我有一种方法可以调用清理和关闭。我得到:(请参阅我的“下面的答案”)。你知道为什么会这样吗?
【解决方案2】:

强制异步代码段进入同步模式不是一个好主意。请看https://stackoverflow.com/a/5934656/276263 看看你是否可以修改你的逻辑。

但是,以下代码可以根据您的要求工作。

public class Stream {

  public static void main(String[] args) throws TwitterException {
    Stream stream = new Stream();
    stream.execute();
  }

  private final Object lock = new Object();
  public List<Status> execute() throws TwitterException {

    final List<Status> statuses = new ArrayList();

    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setDebugEnabled(true);
    cb.setOAuthConsumerKey("bbb");
    cb.setOAuthConsumerSecret("bbb");
    cb.setOAuthAccessToken("bbb");
    cb.setOAuthAccessTokenSecret("bbb");

    TwitterStream twitterStream = new TwitterStreamFactory(cb.build())
        .getInstance();

    StatusListener listener = new StatusListener() {

      public void onStatus(Status status) {
        statuses.add(status);
        System.out.println(statuses.size() + ":" + status.getText());
        if (statuses.size() > 100) {
          synchronized (lock) {
            lock.notify();
          }
          System.out.println("unlocked");
        }
      }

      public void onDeletionNotice(
          StatusDeletionNotice statusDeletionNotice) {
        System.out.println("Got a status deletion notice id:"
            + statusDeletionNotice.getStatusId());
      }

      public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
        System.out.println("Got track limitation notice:"
            + numberOfLimitedStatuses);
      }

      public void onScrubGeo(long userId, long upToStatusId) {
        System.out.println("Got scrub_geo event userId:" + userId
            + " upToStatusId:" + upToStatusId);
      }

      public void onException(Exception ex) {
        ex.printStackTrace();
      }

      @Override
      public void onStallWarning(StallWarning sw) {
        System.out.println(sw.getMessage());

      }
    };

    FilterQuery fq = new FilterQuery();
    String keywords[] = { "federer", "nadal", "#Salute" };

    fq.track(keywords);


    twitterStream.addListener(listener);
    twitterStream.filter(fq);

    try {
      synchronized (lock) {
        lock.wait();
      }
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    System.out.println("returning statuses");
    twitterStream.shutdown();
    return statuses;
  }
}

【讨论】:

  • 我最终接受了这个答案,因为这是我实施的答案,它工作正常。谢谢。
【解决方案3】:

有两种方式:

如果您只需要完成使用流的线程(这是调用您的侦听器的线程),您可以使用twitterStream.cleanUp();。这将优雅地停止线程。您可能希望在状态侦听器中使用 boolean stopped 变量,以便忽略此事件后您将收到的任何调用。

您还可以通过调用twitterStream.shutdown(); 来关闭流消费线程及其调度程序线程(即守护线程),但是这是结束与 twitter API 通信的更残酷的方法。虽然如果您从侦听器内部调用它会起作用,但我更喜欢@krishnakumarp 建议的方法

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2010-12-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-17
    • 2013-08-31
    • 2014-10-22
    • 2013-05-29
    • 1970-01-01
    相关资源
    最近更新 更多