本文整理汇总了Python中tweepy.streaming.Stream类的典型用法代码示例。如果您正苦于以下问题：Python Stream类的具体用法？Python Stream怎么用？Python Stream使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Stream类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_track_encoding
def test_track_encoding(self):
    """Check that a non-ASCII ``track`` term ends up UTF-8 encoded in ``body``."""
    term = u'Caf\xe9'
    stream = Stream(None, None)
    # Stub out `_start` (presumably what opens the connection -- confirm) so
    # that filter() only has to build the request.
    stream._start = lambda is_async: None
    stream.filter(track=[term])
    # Should be UTF-8 encoded
    expected = term.encode('utf8')
    self.assertEqual(expected, stream.body['track'])
开发者ID:tweepy,项目名称:tweepy,代码行数:7,代码来源:test_streaming.py
示例2: filter_track
def filter_track(follow):
    """Start a filtered stream using ``follow`` as the ``track`` terms.

    Despite the parameter's name, the value is forwarded to
    ``Stream.filter`` as its ``track`` argument, not as ``follow``.

    Args:
        follow: Value passed unchanged to ``Stream.filter(track=...)``.
    """
    auth = OAuthHandler(consumer_key2, consumer_secret2)
    auth.set_access_token(access_token2, access_token_secret2)
    stream = Stream(auth, MyStreamListener())
    stream.filter(track=follow)
开发者ID:LookThisCode,项目名称:NextLevelLatAm,代码行数:7,代码来源:twitter_polling.py
示例3: test_follow_encoding
def test_follow_encoding(self):
    """Check that a non-ASCII ``follow`` value ends up UTF-8 encoded."""
    s = Stream(None, None)
    # `async` is a reserved keyword from Python 3.7 on, so it cannot be used
    # as a parameter name. The stub ignores its argument, so any name works
    # (assumes filter() passes the flag positionally -- confirm).
    s._start = lambda is_async: None
    s.filter(follow=[u'Caf\xe9'])
    # Should be UTF-8 encoded
    self.assertEqual(u'Caf\xe9'.encode('utf8'), s.session.params['follow'])
开发者ID:GoNexia,项目名称:tweepy,代码行数:7,代码来源:test_streaming.py
示例4: main
def main():
    """Start the bot: load Markov data, launch the periodic tweeter, watch the timeline.

    When the module-level ``load_csv`` flag is set, the Markov generator is
    rebuilt from ``./tweets.csv`` and saved; otherwise previously saved
    dictionaries are loaded.
    """
    logging.info("bot starting")
    bot = Bot()

    logging.info("markov data loading")
    # NOTE(review): `load_csv` is defined outside this block; the exact
    # `== True` comparison is kept because its type is not visible here.
    if load_csv == True:
        markov = MarkovGenerator()
        logging.info("load_csv == True")
        csv_tweets = Bot.load_csv_tweets("./tweets.csv")
        # Save newest_tweet_id (taken from the first row).
        # The built-in next() works on Python 2.6+ and Python 3, whereas the
        # `.next()` method only exists on Python 2 iterators.
        newest_tweet = next(csv_tweets)
        save_data(newest_tweet[0], "newest_tweet_id")
        markov.add_from_sentence(newest_tweet[7])
        # Load the remaining rows.
        for tweet in csv_tweets:
            markov.add_from_sentence(tweet)
        # NOTE(review): the original indentation was lost; the dictionary is
        # assumed to be saved once, after the loop -- confirm.
        markov.save_dictionary()
    else:
        markov = MarkovGenerator(
            markov_dictionary=load_data("markov_dictionary"),
            reversed_markov_dictionary=load_data("reversed_markov_dictionary"))

    # Regular tweet every 20 minutes.
    # (Variable for exchanging values between processes.)
    logging.info("regular tweet starting")
    p = Process(target=regular_tweet, args=(bot, markov))
    p.start()

    # Watch the timeline via userstream (replies, favourites, copied tweets, RTs).
    logging.info("start timeline watching")
    stream = Stream(bot.auth, AbstractedlyListener(), secure=True)
    user_stream = stream.userstream()
    user_stream.start()
开发者ID:airtoxin,项目名称:twitterbot,代码行数:28,代码来源:bot.py
示例5: IRCPlugin
class IRCPlugin(BasePlugin):
    """Plugin that runs a Twitter stream filtered on the configured accounts."""

    name = "twitter_stream"
    enabled = True
    # Set by run(). The class-level default lets stop() be called safely
    # before the stream has been created.
    _stream = None

    def _validate(self, event):
        """Always return False, whatever ``event`` is."""
        return False

    def stop(self):
        """Disconnect the stream if one has been created."""
        if self._stream:
            self._stream.disconnect()

    def run(self):
        """Authenticate, resolve the configured accounts to IDs and start filtering.

        Entries of ``config.TWITTER_FOLLOW_IDS`` that are strings are looked
        up through the API to get their numeric ID; anything else is used
        as-is. Entries that cannot be resolved are skipped.
        """
        try:
            auth = tweepy.OAuthHandler(config.TWITTER_CONSUMER_KEY, config.TWITTER_CONSUMER_SECRET)
            auth.set_access_token(config.TWITTER_ACCESS_KEY, config.TWITTER_ACCESS_SECRET)
            self._stream = Stream(auth, StreamerToIrc(self))
            LOG.info("Twitter stream successfully initializing")
        except Exception as e:
            LOG.error("Twitter stream authorization failed: %s" % e)
            return

        followers = []
        api = API(auth)
        for f in config.TWITTER_FOLLOW_IDS:
            if isinstance(f, (str, unicode)):
                try:
                    user_id = api.get_user(f).id
                    followers.append(str(user_id))
                except Exception:
                    # Log the entry that failed. `user_id` is unbound (or
                    # left over from an earlier entry) when get_user() raises.
                    LOG.debug("Can't get ID for %s" % f)
                    continue
            else:
                followers.append(str(f))
        self._stream.filter(followers)
开发者ID:gigimon,项目名称:thpybotter,代码行数:33,代码来源:irc_twi_stream.py
示例6: get_tweets
def get_tweets(keyword):
    """Start a filtered stream tracking a single keyword.

    Args:
        keyword: Term passed to ``Stream.filter`` as the only ``track`` entry.
    """
    init_tweepy()
    # NOTE(security): API credentials are hard-coded in source. They are kept
    # here to preserve behavior, but should be revoked and loaded from
    # configuration or the environment instead.
    auth = tweepy.OAuthHandler('QxpECDyg9eM6inD0jo7Q','a0A8OE2ZN7imCg6WR5ygkrGvKG6tNtoZIChQXQ8NIf4')
    auth.set_access_token('18752311-FXmc9zaPGcszH1bdDNJQa0MY2XRYfYzT3nBRnMqgB','tzXURgYPAbsD1VgmchkoKH9QOJ80qGgSSL13K5A3rY')
    listener = Listener()
    stream = Stream(auth, listener)
    stream.filter(track=[keyword])
开发者ID:mguarascio,项目名称:candidatefeed,代码行数:8,代码来源:listener.py
示例7: main_logger
def main_logger(basename=None):
    """Stream tweets from a world-wide bounding box into a RotatingLogListener.

    Args:
        basename: Forwarded to ``RotatingLogListener(basename=...)``.
    """
    log_listener = RotatingLogListener(basename=basename)

    credentials = OAuthHandler(consumer_key, consumer_secret)
    credentials.set_access_token(access_token, access_token_secret)

    # This fetches ANY geotagged tweet:
    # https://dev.twitter.com/docs/streaming-apis/parameters#locations
    whole_world = [-180, -90, 180, 90]
    Stream(credentials, log_listener).filter(locations=whole_world)
开发者ID:chebee7i,项目名称:twitter,代码行数:9,代码来源:stream.py
示例8: filter_track
def filter_track():
    """Start a filtered stream following the ``uid`` of every queried scraper."""
    q = Scrapers.all().filter('','')
    follow = [s.uid for s in q]
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    stream = Stream(auth, MyStreamListener())
    stream.filter(follow=follow)
开发者ID:LookThisCode,项目名称:NextLevelLatAm,代码行数:9,代码来源:twitter_polling.py
示例9: TweepyStreamTests
class TweepyStreamTests(unittest.TestCase):
    """Tests for ``Stream``: userstream, sample, filter and parameter encoding."""

    def setUp(self):
        self.auth = create_auth()
        self.listener = MockStreamListener(self)
        self.stream = Stream(self.auth, self.listener, timeout=3.0)

    def tearDown(self):
        self.stream.disconnect()

    def test_userstream(self):
        # Generate random tweet which should show up in the stream.
        def on_connect():
            API(self.auth).update_status(mock_tweet())
        self.listener.connect_cb = on_connect
        self.listener.status_stop_count = 1
        self.stream.userstream()
        self.assertEqual(self.listener.status_count, 1)

    def test_userstream_with_params(self):
        # Generate random tweet which should show up in the stream.
        def on_connect():
            API(self.auth).update_status(mock_tweet())
        self.listener.connect_cb = on_connect
        self.listener.status_stop_count = 1
        self.stream.userstream(_with='user', replies='all', stall_warnings=True)
        self.assertEqual(self.listener.status_count, 1)

    def test_sample(self):
        self.listener.status_stop_count = 10
        self.stream.sample()
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed in Python 3.12.
        self.assertEqual(self.listener.status_count,
                         self.listener.status_stop_count)

    def test_filter_track(self):
        self.listener.status_stop_count = 5
        phrases = ['twitter']
        self.stream.filter(track=phrases)
        self.assertEqual(self.listener.status_count,
                         self.listener.status_stop_count)

    def test_track_encoding(self):
        s = Stream(None, None)
        # `async` is a reserved keyword from Python 3.7 on; the stub ignores
        # its argument, so the parameter is simply renamed (assumes filter()
        # passes the flag positionally -- confirm).
        s._start = lambda is_async: None
        s.filter(track=[u'Caf\xe9'])
        # Should be UTF-8 encoded
        self.assertEqual(u'Caf\xe9'.encode('utf8'), s.parameters['track'])

    def test_follow_encoding(self):
        s = Stream(None, None)
        # Same rename as in test_track_encoding: `async` is reserved in 3.7+.
        s._start = lambda is_async: None
        s.filter(follow=[u'Caf\xe9'])
        # Should be UTF-8 encoded
        self.assertEqual(u'Caf\xe9'.encode('utf8'), s.parameters['follow'])
开发者ID:bakagh,项目名称:tweepy,代码行数:57,代码来源:test_streaming.py
示例10: train_tweets
def train_tweets():
    """Stream tweets located inside three fixed fences to a ``Trainer``.

    The fences (Tokyo, Hokkaido, Kyusyu) are handed to the ``Trainer``
    listener, and their concatenated coordinates are used as the stream's
    ``locations`` filter.
    """
    locations = [
        LocationFence("Tokyo", 138.946381, 35.523285, 139.953232, 35.906849),
        LocationFence("Hokkaido", 139.546509, 41.393294, 145.742798, 45.729191),
        LocationFence("Kyusyu", 129.538879, 31.147006, 131.856995, 33.934245),
    ]

    # Flatten every fence's coordinates into the single list filter() expects.
    request_coordinates = []
    for fence in locations:
        request_coordinates.extend(fence.get_coordinates())

    stream = Stream(oauth(), Trainer(locations), secure=True)
    stream.filter(locations=request_coordinates)
开发者ID:isaac-gritz,项目名称:jubatus-example,代码行数:12,代码来源:train.py
示例11: TweepyStreamTests
class TweepyStreamTests(unittest.TestCase):
    """Tests for ``Stream``: userstream, sample and filter."""

    def setUp(self):
        self.auth = create_auth()
        self.listener = MockStreamListener(self)
        self.stream = Stream(self.auth, self.listener, timeout=3.0)

    def tearDown(self):
        self.stream.disconnect()

    def test_userstream(self):
        # Generate random tweet which should show up in the stream.
        def on_connect():
            API(self.auth).update_status(mock_tweet())
        self.listener.connect_cb = on_connect
        self.listener.status_stop_count = 1
        self.stream.userstream()
        self.assertEqual(self.listener.status_count, 1)

    def test_sample(self):
        self.listener.status_stop_count = 10
        self.stream.sample()
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed in Python 3.12.
        self.assertEqual(self.listener.status_count,
                         self.listener.status_stop_count)

    def test_filter_track(self):
        self.listener.status_stop_count = 5
        phrases = ['twitter']
        self.stream.filter(track=phrases)
        self.assertEqual(self.listener.status_count,
                         self.listener.status_stop_count)
开发者ID:dowens,项目名称:tweepy,代码行数:31,代码来源:test_streaming.py
示例12: TweepyStreamBackoffTests
class TweepyStreamBackoffTests(unittest.TestCase):
    """Retry/backoff tests for ``Stream`` run with deliberately invalid credentials."""

    def setUp(self):
        # bad auth causes twitter to return 401 errors
        bad_auth = OAuthHandler("bad-key", "bad-secret")
        bad_auth.set_access_token("bad-token", "bad-token-secret")
        self.auth = bad_auth
        self.listener = MockStreamListener(self)
        self.stream = Stream(self.auth, self.listener)

    def tearDown(self):
        self.stream.disconnect()

    def _sample_with(self, **options):
        """Replace ``self.stream`` with one built from ``options`` and sample it."""
        self.stream = Stream(self.auth, self.listener, timeout=3.0, **options)
        self.stream.sample()

    def test_exp_backoff(self):
        self._sample_with(retry_count=1, retry_time=1.0, retry_time_cap=100.0)
        # 1 retry, should be 4x the retry_time
        self.assertEqual(self.stream.retry_time, 4.0)

    def test_exp_backoff_cap(self):
        self._sample_with(retry_count=1, retry_time=1.0, retry_time_cap=3.0)
        # 1 retry, but 4x the retry_time exceeds the cap, so should be capped
        self.assertEqual(self.stream.retry_time, 3.0)

    # Response mock whose return value reports HTTP status 420.
    mock_resp = MagicMock()
    mock_resp.return_value.status = 420

    @patch(getresponse_location, mock_resp)
    def test_420(self):
        self._sample_with(retry_count=0, retry_time=1.0, retry_420=1.5,
                          retry_time_cap=20.0)
        # no retries, but error 420, should be double the retry_420, not double the retry_time
        self.assertEqual(self.stream.retry_time, 3.0)
开发者ID:GoNexia,项目名称:tweepy,代码行数:35,代码来源:test_streaming.py
示例13: run
def run():
    """Authenticate, attach a printing handler to a listener and sample the stream.

    Progress is reported on stdout before each of the three stages.
    """
    # Authenticates with Twitter
    print('Auth')
    credentials = authenticate()
    client = connect_to_api(credentials)

    print('Handling')
    tweet_listener = TweepyListener(client)
    tweet_listener.add_handler(handler=PrintTweetHandler())

    print('Streaming')
    Stream(auth=credentials, listener=tweet_listener).sample()
开发者ID:anhhuy1605,项目名称:test_rtd,代码行数:14,代码来源:tweepy_integrations.py
示例14: main
def main():
    """Stream tweets matching ``KEYWORDS`` until interrupted from the keyboard.

    Credentials are read with ``get_twitter_config`` for the configured
    screen name. A ``KeyboardInterrupt`` ends the run with a goodbye message
    instead of a traceback.
    """
    try:
        config = get_twitter_config(CONFIG_FILE, TWITTER_SCREEN_NAME)
        auth = tweepy.OAuthHandler(config['consumer_key'], config['consumer_secret'])
        auth.set_access_token(config['access_key'], config['access_secret'])
        listener = Listener()
        stream = Stream(auth, listener, timeout=None, secure=1)
        # sample returns all tweets
        stream.filter(track=KEYWORDS)
    except KeyboardInterrupt:
        # A print() call with one string argument prints the same text on
        # Python 2 and 3; the print statement is a SyntaxError on Python 3.
        print('\nGoodbye!')
开发者ID:neilkod,项目名称:raindrop,代码行数:15,代码来源:raindrop.py
示例15: _start_stream
def _start_stream(self):
    """Create the listener, authenticate, then open the user stream."""
    listener = Streamer(self)
    self.listener = listener
    # _auth() presumably populates self.auth, which Stream() reads next.
    self._auth()
    stream = Stream(self.auth, listener)
    # Store the stream on the instance before starting it.
    self.stream = stream
    stream.userstream()
开发者ID:Swizz,项目名称:UnnamedGame,代码行数:7,代码来源:crawler.py
示例16: init_stream
def init_stream(self):
    """Create the queue, keyword list, listener and stream; nothing is started."""
    # The listener is given the same keyword list and queue objects that are
    # stored on the instance.
    self.q, self.keywords = Queue(), []
    self.listener = TwitterStreamListener(self.keywords, self.q)

    credentials = OAuthHandler(config.con_secret, config.con_secret_key)
    credentials.set_access_token(config.token, config.token_key)
    self.stream = Stream(credentials, self.listener)
开发者ID:theddnc,项目名称:sentiment-app,代码行数:7,代码来源:twitter_crawler.py
示例17: __listen
def __listen(self):
    """Run the tracking stream; on any error record 'stream down' and disconnect."""
    listener = TweetStreamListener(self.api, self.sentiment, self.error)
    stream = Stream(self.auth, listener)
    # print() calls with a single string argument print the same text on
    # Python 2 and 3; the print statement is a SyntaxError on Python 3.
    print('Starting stream...')
    try:
        stream.filter(track=self.track)
    except BaseException:
        # Same catch-all as the original bare `except:` (KeyboardInterrupt
        # included), so the down status is recorded however the stream ends.
        # NOTE(review): the original indentation was lost; everything below
        # is assumed to belong to this handler -- confirm.
        print('Encountered error!')
        print('Exiting application')
        item = {
            'status' : 'stream down',
            'timestamp' : datetime.utcnow()
        }
        self.error.save(item)
        stream.disconnect()
开发者ID:bedouins,项目名称:TweetIndex,代码行数:16,代码来源:main.py
示例18: stream
def stream(self):
    """Build the handlers and listener, then start a stream tracking the bot's name.

    NOTE(review): assigning ``self.stream`` below replaces this method on the
    instance, so it can presumably only be called once per object -- confirm
    that this is intended.
    """
    self.botname = self.api.me().screen_name
    # A print() call with one string argument prints the same text on
    # Python 2 and 3; the print statement is a SyntaxError on Python 3.
    print("Bot started: @" + self.botname)
    self.taghandler = TagHandler(self.api, self.botname)
    self.customhandler = CustomHandler(self.api)
    self.listener = Listener(self.botname, self.taghandler, self.customhandler)
    self.stream = Stream(self.auth, self.listener)
    self.stream.filter(track=(self.botname, ))
开发者ID:felixonmars,项目名称:tagbot,代码行数:8,代码来源:tagbot.py
示例19: _stream_init
def _stream_init(self):
    """Create a basic-auth ``Stream`` with a test listener and store it on ``self._stream``."""
    listener_api = API()
    request_headers = {"Accept-Encoding": "deflate, gzip"}
    credentials = BasicAuthHandler(social_keys.TWITTER_HTTP_AUTH_U,
                                   social_keys.TWITTER_HTTP_AUTH_P)
    stream_listener = TestStreamListener(api=listener_api)
    self._stream = Stream(
        auth=credentials,
        listener=stream_listener,
        secure=True,
        headers=request_headers)
开发者ID:twixmit,项目名称:twixmit,代码行数:9,代码来源:streamstest.py
示例20: Crawler
class Crawler:
    """The global crawler manager."""

    def __init__(self, name):
        """Store the bot name, compile the mention pattern and start streaming.

        Args:
            name: Screen name, interpolated into the ``@name`` pattern that
                on_request() strips from incoming text.
        """
        self.name = name
        self.regexp = re.compile(r'@%s\s+' % (self.name),
                                 re.IGNORECASE)
        self._start_stream()

    def _auth(self):
        """Create the OAuth handler and the API client from module-level keys."""
        self.auth = OAuthHandler(Ukey, Usecret)
        self.auth.set_access_token(Akey, Asecret)
        self.api = API(self.auth)

    def _start_stream(self):
        """Create the listener, authenticate, then open the user stream."""
        self.listener = Streamer(self)
        self._auth()
        self.stream = Stream(self.auth, self.listener)
        self.stream.userstream()

    def on_request(self, text, author):
        """Strip the mention from ``text`` and answer if a language is detected.

        Args:
            text: Incoming message text.
            author: Passed to send_message() as the reply target.
        """
        text = re.sub(self.regexp, '', text)
        text = re.sub(r'\s+$', '', text)
        lang = get_language(text, key=u'startword')
        if lang is not None:
            core = Core(lang)
            self.send_message(core.send_message(), author)

    def send_message(self, text, target=None):
        """Print ``text`` and its length, then post it as a status update.

        Args:
            text: Message body.
            target: Optional screen name; when given, the message is
                prefixed with ``@target``.
        """
        if target is not None:
            text = u'@%s %s' % (target, text)
        # NOTE(review): the original indentation was lost; the prints and the
        # status update are assumed to run whether or not a target is given.
        print(text)
        # `print '>>>', n` is a SyntaxError on Python 3, and print('>>>', n)
        # would print a tuple on Python 2. One formatted string prints
        # ">>> n" on both.
        print('>>> %d' % len(text))
        self.api.update_status(text)
开发者ID:Swizz,项目名称:UnnamedGame,代码行数:43,代码来源:crawler.py
注：本文中的tweepy.streaming.Stream类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论