【发布时间】:2021-08-29 04:30:26
【问题描述】:
我正在尝试将基本的流式 Twitter 机器人转换为 Tweepy 4.0 alpha,以便利用一些较新的功能。但是,我什至无法开始使用描述新流协议的文档。
我已将我的程序简化为一个测试用例,但运行它仍然会给出错误AttributeError: 'MyStreamListener' object has no attribute 'running'。有人能指出我的问题在哪里吗?关于这个特定错误似乎没有任何其他问题,我试图密切关注文档。由于缺少示例,我不得不做出一些猜测——例如,连接到流需要一组不同的参数,而不是连接到常规 api,因此有两个 get_auth 调用——但目前我是更感兴趣的是让它工作而不是让事情变得超级干净。
#!/usr/bin/env python3
import tweepy
import json
import configparser
import os
def _read_config():
config = configparser.ConfigParser()
script_dir = os.path.dirname(__file__)
config_file = os.path.join(script_dir, '../bot-config.ini')
config.read(config_file)
return config
def _get_api_auth(config):
auth = tweepy.OAuthHandler(
config['auth']['consumer_key'], config['auth']['consumer_secret'])
auth.set_access_token(config['auth']['token'],
config['auth']['token_secret'])
return auth
def _get_stream_auth(config):
return (config['auth']['consumer_key'], config['auth']['consumer_secret'],
config['auth']['token'], config['auth']['token_secret'])
class MyStreamListener(tweepy.Stream):
def __init__(self, stream_auth, api):
super(tweepy.Stream, self).__init__()
self._api = api
self._tweet_count = 0
def on_data(self, data):
tweet = json.loads(data)
if 'text' not in tweet:
return
self._tweet_count += 1
if tweet['truncated']:
text = tweet['extended_tweet']['full_text']
else:
text = tweet['text']
if 'Rumplestiltskin' in text:
try:
self._api.retweet(tweet['id'])
print('Tweeted id %d!', tweet['id'])
except tweepy.TweepError as err:
print('Unable to tweet %d:', tweet['id'])
print(err.reason)
if (self._tweet_count % 10000) == 0:
print('Processed %s tweets', self._tweet_count)
def run():
config = _read_config()
api_auth = _get_api_auth(config)
api = tweepy.API(api_auth)
stream_auth = _get_stream_auth(config)
twitterStream = MyStreamListener(stream_auth, api)
twitterStream.sample()
if __name__ == '__main__':
run()
【问题讨论】: