本文整理汇总了Python中tweepy.models.Status类的典型用法代码示例。如果您正苦于以下问题:Python Status类的具体用法?Python Status怎么用?Python Status使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Status类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: on_data
def on_data(self, raw_data):
    """Dispatch one raw stream message to the matching ``on_*`` handler.

    HTML entities in the payload are unescaped, the result is decoded as
    JSON, and the message is routed by the first recognised top-level key.
    Override this method if you wish to handle the stream data manually.

    :param raw_data: JSON text of a single stream message.
    :returns: ``False`` when the selected handler returned ``False`` (stop
        the stream and close the connection); ``None`` otherwise.
    """
    # HTMLParser.unescape() was deprecated in Python 3.4 and removed in 3.9;
    # html.unescape() is its replacement. Fall back to the old API on Python 2.
    try:
        from html import unescape
    except ImportError:
        from HTMLParser import HTMLParser
        unescape = HTMLParser().unescape

    data = json.loads(unescape(raw_data))

    if 'in_reply_to_status_id' in data:
        status = Status.parse(self.api, data)
        if self.on_status(status) is False:
            return False
    elif 'delete' in data:
        delete = data['delete']['status']
        if self.on_delete(delete['id'], delete['user_id']) is False:
            return False
    elif 'event' in data:
        status = Status.parse(self.api, data)
        if self.on_event(status) is False:
            return False
    elif 'direct_message' in data:
        status = Status.parse(self.api, data)
        if self.on_direct_message(status) is False:
            return False
    elif 'limit' in data:
        if self.on_limit(data['limit']['track']) is False:
            return False
    elif 'disconnect' in data:
        if self.on_disconnect(data['disconnect']) is False:
            return False
    else:
        # Lazy %-formatting: the message is only built if the record is emitted.
        logging.error("Unknown message type: %s", raw_data)
开发者ID:Ignalion,项目名称:tweepy,代码行数:32,代码来源:streaming.py
示例2: on_data
def on_data(self, data):
    """Route one raw stream message to the matching ``on_*`` handler.

    The message kind is detected by substring checks on the raw text, and
    the JSON is only decoded inside the branch that matched. Override this
    method if you wish to handle the stream data manually.

    :param data: JSON text of a single stream message.
    :returns: ``False`` when the selected handler returned ``False`` (stop
        the stream and close the connection); ``None`` otherwise.
    """
    if '{"delete"' in data:
        deleted = json.loads(data)['delete']
        # Choose the handler by key instead of a bare ``except``: the old
        # try/except also swallowed errors raised by on_delete() itself and
        # then failed with an unrelated KeyError.
        if 'status' in deleted:
            notice = deleted['status']
            if self.on_delete(notice['id'], notice['user_id']) is False:
                return False
        else:
            notice = deleted['direct_message']
            if self.on_direct_message_delete(notice['id'], notice['user_id']) is False:
                return False
    elif '{"direct_message"' in data:
        message = DirectMessage.parse(self.api, json.loads(data)['direct_message'])
        if self.on_direct_message(message) is False:
            return False
    elif '{"target"' in data:
        event = json.loads(data)
        if self.on_event(event) is False:
            return False
    elif '{"limit"' in data:
        if self.on_limit(json.loads(data)['limit']['track']) is False:
            return False
    elif '"in_reply_to_user_id_str"' in data:
        status = Status.parse(self.api, json.loads(data))
        if self.on_status(status) is False:
            return False
开发者ID:Mezgrman,项目名称:tweepy,代码行数:31,代码来源:streaming.py
示例3: test_end_to_end
def test_end_to_end(filename, connections, expected, tmpdir):
    """Replay one recorded status through LessListener and check its output.

    The same status is delivered three times (the third time with a bumped
    id); at most one reply equal to ``expected`` may be posted, and the
    follow state recorded on the mock API is compared with ``connections``.
    """
    api = MockAPI(connections=connections)
    with open(filename, 'r') as fixture:
        status = Status.parse(api, json.load(fp=fixture))

    listener = LessListener(api=api, post_replies=True, gather='tweets',
                            state_dir=str(tmpdir))

    # 100% festivity for all of December
    listener.december_greetings = ('It is cold outside.',)
    listener.festive_probability = 1.
    assert listener.get_festive_probability(dt.date(2016, 12, 5)) == 1.

    listener.on_status(status)
    # Never reply to the same toot twice
    listener.on_status(status)
    # Rate-limit replies for same word
    status.id = status.id + 1
    listener.on_status(status)

    if expected is not None:
        assert len(api._updates) == 1
        update = api._updates[0]
        assert update['status'] == expected
    else:
        assert api._updates == []

    # NOTE(review): assumed to run for every case, not only when a reply is
    # expected - confirm against the upstream test.
    for key, before in connections.items():
        after = api._connections[key]
        assert ('following' in after) == ('followed_by' in before), \
            (key, before, after)
开发者ID:wjt,项目名称:fewerror,代码行数:33,代码来源:test_twitter.py
示例4: on_data
def on_data(self, data):
    """Sort one raw tweet into a language bucket, or finish the sample.

    Once ``self.duration`` seconds have passed since ``self.started``, a
    statistics report is written to ``<started>-sample.stats`` and ``False``
    is returned. Before that, messages containing ``in_reply_to_status_id``
    are classified with ``langid`` and appended to the above-threshold,
    below-threshold or excluded output, updating ``self.counter``.

    :param data: JSON text of a single stream message.
    :returns: ``False`` once the sampling window has elapsed, else ``True``.
    """
    if time.time() >= self.started + self.duration:
        stamp = '%Y-%m-%d %H:%M:%S'
        report = [
            "================= STATISTICS =================",
            "Start time: " + time.strftime(stamp, time.localtime(self.started)),
            "End time: " + time.strftime(stamp, time.localtime(time.time())),
            "First Tweet ID: " + self.first_tweet_id,
            "Last Tweet ID: " + self.last_tweet_id,
            "Language: " + self.lang,
            "Language classification threshold: " + str(self.lang_threshold),
            "Above threshold: " + str(self.counter[self.lang + '-above']),
            "Below threshold: " + str(self.counter[self.lang + '-below']),
            "Exluded: " + str(self.counter['excluded']),
        ]
        # ``with`` closes (and flushes) the report; the handle used to be
        # left open for the garbage collector.
        with open('{0}-sample.stats'.format(int(self.started)), 'w+') as stats:
            stats.writelines(line + "\n" for line in report)
        return False
    elif 'in_reply_to_status_id' in data:
        status = Status.parse(self.api, json.loads(data))
        langclass = langid.classify(status.text)
        # All counters still at zero means this is the first tweet seen.
        if self.counter == {self.lang + '-above': 0, self.lang + '-below': 0, 'excluded': 0}:
            self.first_tweet_id = str(status.id)
        self.last_tweet_id = str(status.id)
        if langclass[0] == self.lang:
            if langclass[1] >= self.lang_threshold:
                self.above_output.write(data)
                self.counter[self.lang + '-above'] += 1
            else:
                self.below_output.write(data)
                self.counter[self.lang + '-below'] += 1
        else:
            self.excl_output.write(data)
            self.counter['excluded'] += 1
    # NOTE(review): assumed to sit at function level (keep streaming for any
    # other message type) - confirm against the upstream file.
    return True
开发者ID:StijnPieper,项目名称:wse,代码行数:34,代码来源:SampleDownloader.py
示例5: process
def process(self, tweet):
    """Print a one-line summary of a raw tweet.

    The tweet text is printed with every separator listed in
    ``UNICODE_LINES`` replaced by a space, prefixed by the author's screen
    name, language, status count, friend count and follower count.

    :param tweet: JSON text of a single tweet.
    """
    status = Status.parse(api, json.loads(tweet))
    # Replace cumulatively. Restarting from status.text on each pass kept
    # only the last replacement and left ``text`` undefined when
    # UNICODE_LINES was empty.
    text = status.text
    for lf in UNICODE_LINES:
        text = text.replace(lf, ' ')
    # Single-argument print() behaves the same on Python 2 and Python 3.
    print("@%s (%s, %s, %s, %s): %s" % (
        status.user.screen_name,
        status.user.lang,
        status.user.statuses_count,
        status.user.friends_count,
        status.user.followers_count,
        text,
    ))
开发者ID:edwardabraham,项目名称:drink_the_hose,代码行数:7,代码来源:drink_the_hose.py
示例6: on_data
def on_data(self, raw_data):
    """Route one raw stream message, flagging that processing is under way.

    This mirrors tweepy's ``StreamListener.on_data()`` except that unknown
    message types are logged at debug level rather than as errors.
    ``self.processing_data`` is ``True`` while the message is being handled
    and ``False`` again once this method exits.

    :param raw_data: JSON text of a single stream message.
    :returns: ``False`` when the selected handler returned ``False`` (stop
        the stream and close the connection); ``None`` otherwise.
    """
    self.processing_data = True
    try:
        data = json.loads(raw_data)

        if 'in_reply_to_status_id' in data:
            status = Status.parse(self.api, data)
            if self.on_status(status) is False:
                return False
        elif 'delete' in data:
            delete = data['delete']['status']
            if self.on_delete(delete['id'], delete['user_id']) is False:
                return False
        elif 'event' in data:
            status = Status.parse(self.api, data)
            if self.on_event(status) is False:
                return False
        elif 'direct_message' in data:
            status = Status.parse(self.api, data)
            if self.on_direct_message(status) is False:
                return False
        elif 'limit' in data:
            if self.on_limit(data['limit']['track']) is False:
                return False
        elif 'disconnect' in data:
            if self.on_disconnect(data['disconnect']) is False:
                return False
        else:
            log.debug('TwitterBotStreamListener::on_data(): got event/stream data of'
                      ' unknown type. Raw data follows:\n%s', data)
    finally:
        # Clear the flag on every exit path. It used to stay True after any
        # early ``return False`` or after an exception in a handler.
        self.processing_data = False
开发者ID:wfn,项目名称:twidibot,代码行数:44,代码来源:twitter_bot.py
示例7: test_sanitize
def test_sanitize(filename, expected):
    """Check the sanitized text of a recorded status under ``tests/``.

    The sanitized text must contain neither ``&`` nor ``http`` and must
    equal ``expected``.
    """
    api = NonCallableMock()
    fixture_path = os.path.join('tests', filename)
    with open(fixture_path, 'r') as fixture:
        status = Status.parse(api, json.load(fixture))

    text = get_sanitized_text(status)

    for forbidden in ('&', 'http'):
        assert forbidden not in text
    assert text == expected
开发者ID:wjt,项目名称:fewerror,代码行数:10,代码来源:test_twitter.py
示例8: on_data
def on_data(self, raw_data):
    """Decode a raw stream message and pass it to the matching handler.

    Handled kinds are statuses, deletions, events and direct messages;
    anything else is ignored.

    :param raw_data: JSON text of a single stream message.
    :returns: ``False`` when the selected handler returned ``False``;
        ``None`` otherwise.
    """
    payload = json.loads(raw_data)

    # The first matching key decides which handler runs.
    if 'in_reply_to_status_id' in payload:
        status = Status.parse(self.api, payload)
        verdict = self.on_status(status)
    elif 'delete' in payload:
        removed = payload['delete']['status']
        verdict = self.on_delete(removed['id'], removed['user_id'])
    elif 'event' in payload:
        status = Status.parse(self.api, payload)
        verdict = self.on_event(status)
    elif 'direct_message' in payload:
        status = Status.parse(self.api, payload)
        verdict = self.on_direct_message(status)
    else:
        verdict = None

    # Only an explicit False is passed on; every other result yields None.
    if verdict is False:
        return False
开发者ID:u-ahmed,项目名称:HookedUp,代码行数:21,代码来源:Twitter.py
示例9: _read_from_table
def _read_from_table(self):
    """Replay every row of ``self.table_name`` to the listener as a Status.

    Sets ``self.running`` to ``True``, reflects the table through
    ``StatusSource.engine``, selects all rows and passes each one, parsed
    with ``Status.parse``, to ``self.listener.on_status``. Iteration stops
    early once ``self.running`` has been cleared.
    """
    self.running = True
    conn = StatusSource.engine.connect()
    try:
        meta = MetaData()
        table = Table(self.table_name, meta, autoload=True,
                      autoload_with=StatusSource.engine)
        results = conn.execute(select([table]))
        for result in results:
            status = Status.parse(None, result)
            self.listener.on_status(status)
            if not self.running:
                break
    finally:
        # Release the connection even when stopping early or on error;
        # it was never closed before.
        conn.close()
开发者ID:bh0085,项目名称:tweeql,代码行数:12,代码来源:stream_from_stream.py
示例10: test_patched_status
def test_patched_status(self):
    """Check ``Status.parse`` after ``crawler.tweepy_patch.patch()`` ran.

    The parsed object must expose the JSON text as ``_raw`` and each key of
    the input dict as an attribute.
    """
    from tweepy.models import Status
    from crawler.tweepy_patch import patch

    patch()
    parsed = Status.parse('test_api', {'a': 1, 'b': 2})

    # pylint: disable=E1101,W0212
    self.assertEqual(parsed._raw, '{"a": 1, "b": 2}')
    for attribute, wanted in (('a', 1), ('b', 2)):
        self.assertEqual(getattr(parsed, attribute), wanted)
开发者ID:spacelis,项目名称:crawler.kka,代码行数:13,代码来源:test_tweepy_patch.py
示例11: test_save_tweet
def test_save_tweet(tmpdir, id_, expected_filename):
    """Check that ``save_tweet`` creates ``expected_filename`` under ``foo``."""
    api = MockAPI(connections={})
    gather_dir = tmpdir.join('foo')
    listener = LessListener(api=api, gather=str(gather_dir), state_dir=str(tmpdir))

    payload = {
        'id': int(id_),
        'id_str': id_,
    }
    status = Status.parse(api=api, json=payload)
    listener.save_tweet(status)

    saved = tmpdir.join('foo', expected_filename)
    assert saved.check()
开发者ID:wjt,项目名称:fewerror,代码行数:13,代码来源:test_twitter.py
示例12: on_data
def on_data(self, raw_data):
    """Decode a raw stream message and pass statuses and events to handlers.

    With ``self.verbose`` set, the decoded payload and a 60-dash separator
    are printed first. ``friends``, ``delete`` and ``user_suspend``
    messages are ignored; any other unrecognised message is logged as an
    error.

    :param raw_data: JSON text of a single stream message.
    :returns: ``False`` when the selected handler returned ``False``;
        ``None`` otherwise.
    """
    data = json.loads(raw_data)
    if self.verbose:
        # Single-argument print() works on Python 2 and 3; the old print
        # statements were a SyntaxError on Python 3.
        print(data)
        print('-' * 60)

    if 'in_reply_to_status_id' in data:
        status = Status.parse(self.api, data)
        if self.on_status(status) is False:
            return False
    elif 'event' in data:
        status = Status.parse(self.api, data)
        if self.on_event(status) is False:
            return False
    elif any(key in data for key in ('friends', 'delete', 'user_suspend')):
        pass  # ignore
    else:
        # Lazy %-formatting: the message is only built if the record is emitted.
        logging.error("Unknown message type: %s", raw_data)
开发者ID:aomoriringo,项目名称:number_bot,代码行数:22,代码来源:tweet.py
示例13: on_data
def on_data(self, data):
    """Route a raw stream message by substring checks on its text.

    Statuses go to ``on_status`` (with the parsed status and the raw text),
    deletions to ``on_delete`` and limit notices to ``on_limit``.

    :param data: JSON text of a single stream message.
    :returns: ``False`` when the selected handler returned ``False``;
        ``None`` otherwise.
    """
    if 'in_reply_to_status_id' in data:
        tweet = Status.parse(self.api, json.loads(data))
        return False if self.on_status(tweet, data) is False else None

    if 'delete' in data:
        removed = json.loads(data)['delete']['status']
        return False if self.on_delete(removed['id'], removed['user_id']) is False else None

    if 'limit' in data:
        track = json.loads(data)['limit']['track']
        return False if self.on_limit(track) is False else None

    return None
开发者ID:jaredmichaelsmith,项目名称:PyMiner,代码行数:13,代码来源:Streamer.py
示例14: save_status
def save_status(self, data):
    """Store a geotagged tweet as a ``Post``, creating its ``Author`` if needed.

    Nothing is stored when the tweet has no ``geo`` value or when an
    ``Author`` linked to an owner with the same Twitter id already exists.
    A ``Post`` with the same external id is left untouched.

    :param data: JSON text of a single tweet; it is also saved verbatim as
        the post's ``external_data``.
    """
    status = Status.parse(self.api, json.loads(data))

    if not status.geo:
        # _datafile.write(data+'\n')
        return

    twitter_uid = status.user.id_str
    if Author.objects.filter(owner__userprofile__twitter_id=twitter_uid).exists():
        # this tweet's author is on stargazer
        return

    try:
        author = Author.objects.filter(
            source=Author.T_TWITTER, external_id=twitter_uid).get()
    except Author.DoesNotExist:
        author = Author(
            name=status.user.screen_name,
            avatar_uri=status.user.profile_image_url,
            source=Author.T_TWITTER,
            external_id=twitter_uid,
        )
        author.save()

    try:
        post = Post.objects.filter(
            source=Post.T_TWITTER, external_id=status.id_str).get()
    except Post.DoesNotExist:
        coordinates = status.geo["coordinates"]
        lat = float(coordinates[0])
        lng = float(coordinates[1])

        # A failed reverse-geocoding lookup leaves the address empty.
        try:
            addr = self._latlng2addr.get(lat, lng)
        except (LatLng2Addr.ConnectionFailed, LatLng2Addr.GeocodingFailed):
            addr = ""

        # twitter api response in UTC
        # NOTE(review): the fixed +8h shift presumably converts to the
        # deployment's local time - confirm.
        created = status.created_at + timedelta(hours=8)

        post = Post(
            content=status.text,
            author=author,
            latitude=lat,
            longitude=lng,
            address=addr,
            source=Post.T_TWITTER,
            external_id=status.id_str,
            external_data=data,
            created=created,
        )
        post.save()
开发者ID:snow,项目名称:stargazer,代码行数:51,代码来源:streaming.py
示例15: on_data
def on_data(self, raw_data):
    """Decode a raw stream message and pass it to the matching handler.

    Routes are tried in a fixed order and the first top-level key found in
    the payload wins. Messages matching no route are logged as errors.
    Override this method if you wish to handle the stream data manually.

    :param raw_data: JSON text of a single stream message.
    :returns: ``False`` when the selected handler returned ``False`` (stop
        the stream and close the connection); ``None`` otherwise.
    """
    data = json.loads(raw_data)

    # Ordered (key, action) pairs; each action calls the handler for that key.
    routes = (
        ("in_reply_to_status_id",
         lambda d: self.on_status(Status.parse(self.api, d))),
        ("delete",
         lambda d: self.on_delete(d["delete"]["status"]["id"],
                                  d["delete"]["status"]["user_id"])),
        ("event",
         lambda d: self.on_event(Status.parse(self.api, d))),
        ("direct_message",
         lambda d: self.on_direct_message(Status.parse(self.api, d))),
        ("friends", lambda d: self.on_friends(d["friends"])),
        ("limit", lambda d: self.on_limit(d["limit"]["track"])),
        ("disconnect", lambda d: self.on_disconnect(d["disconnect"])),
        ("warning", lambda d: self.on_warning(d["warning"])),
    )

    for key, act in routes:
        if key in data:
            # Only an explicit False is passed on to the caller.
            return False if act(data) is False else None

    logging.error("Unknown message type: " + str(raw_data))
开发者ID:jrgrafton,项目名称:tweet-debate,代码行数:38,代码来源:streaming.py
示例16: on_data
def on_data(self, data):
    """Route a raw stream message by substring checks on its text.

    Statuses, deletions and limit notices are recognised; the JSON is only
    decoded inside the branch that matched. Override this method if you
    wish to handle the stream data manually.

    :param data: JSON text of a single stream message.
    :returns: ``False`` when the selected handler returned ``False`` (stop
        the stream and close the connection); ``None`` otherwise.
    """
    verdict = None

    if 'in_reply_to_status_id' in data:
        status = Status.parse(self.api, json.loads(data))
        verdict = self.on_status(status)
    elif 'delete' in data:
        removed = json.loads(data)['delete']['status']
        verdict = self.on_delete(removed['id'], removed['user_id'])
    elif 'limit' in data:
        track = json.loads(data)['limit']['track']
        verdict = self.on_limit(track)

    # Only an explicit False is passed on; every other result yields None.
    if verdict is False:
        return False
开发者ID:dtran320,项目名称:tweepy,代码行数:17,代码来源:streaming.py
示例17: on_data
def on_data(self, data):
    """Route a raw stream message by substring checks on its text.

    Statuses, direct messages and follow events return whatever their
    handler returns. Deletions and limit notices return ``False`` only when
    their handler did; anything else returns ``None``.

    :param data: JSON text of a single stream message.
    """
    if 'in_reply_to_status_id' in data:
        status = Status.parse(self.api, json.loads(data))
        return self.on_status(status)

    if 'delete' in data:
        removed = json.loads(data)['delete']['status']
        if self.on_delete(removed['id'], removed['user_id']) is False:
            return False
        return None

    if 'limit' in data:
        track = json.loads(data)['limit']['track']
        if self.on_limit(track) is False:
            return False
        return None

    if 'sender_id' in data and 'recipient_id' in data:
        dm = DirectMessage.parse(self.api, json.loads(data))
        return self.on_dm(dm)

    if 'event' in data and 'follow' in data:
        content = json.loads(data)
        # The substring test is only a pre-filter; check the decoded payload too.
        if 'event' in content and content['event'] == 'follow':
            return self.on_follow(content)

    return None
开发者ID:vinodkone,项目名称:NextRoute,代码行数:24,代码来源:nextroute.py
示例18: test_skip_check
def test_skip_check():
    """The filter built by ``skip_check([])`` returns True for a sample tweet."""
    accepts = skip_check([])

    sample = Status()
    sample.text = 'This is a test #nowplaying'

    assert accepts(sample) is True
开发者ID:JNRowe-retired,项目名称:bleeter,代码行数:5,代码来源:test_bleeter.py
示例19:
hashtag = 0
url = 0
question = 0
exclamation = 0
pos_term = 0
neg_term = 0
pos_emoticon = 0
neg_emoticon = 0
reply = 0
moment_morning = 0
moment_afternoon = 0
moment_evening = 0
moment_night = 0
retweeted = 0
status = Status.parse(api, tweet)
if tweet['id'] in error_list_tweet_ids:
tweets_discarded_error += 1
elif tweet['text'].startswith("RT @"):
tweets_discarded_retweet += 1
else:
tweets_considered += 1
if regex_username.search(tweet['text']) != None:
tweets_username += 1
username = 1
if regex_hashtag.search(tweet['text']) != None:
tweets_hashtag += 1
hashtag = 1
if regex_url.search(tweet['text']) != None:
tweets_url += 1
开发者ID:StijnPieper,项目名称:wse,代码行数:31,代码来源:SampleReformatter.py
示例20: update_tweets
def update_tweets(self):
    """Drain queued stream items, store the tweets and build a broadcast.

    Every queued item containing ``in_reply_to_status_id`` is parsed into a
    ``Status``; other items are discarded. Each status is inserted into
    ``tweets`` and each of its hashtags (lower-cased) into ``hashtags`` /
    ``hashtags_tweets``. For hashtags listed in ``campboard['sessions']``
    the tweet is attached to that channel, and a ``+1`` or ``-1`` in the
    text is counted as a vote in ``session_votes``.

    :returns: ``{'general': {'recent_tweets': [...]}, 'channels': {tag:
        {'recent_tweets': [...]}}}`` holding one summary dict per tweet.
    """
    # Single-argument print() behaves the same on Python 2 and Python 3.
    print("Updating tweets")

    # ``unicode`` only exists on Python 2; ``str`` is the text type on 3.
    try:
        text_type = unicode
    except NameError:
        text_type = str

    def summarize(status):
        """Return a fresh broadcast dict describing one status."""
        return {
            'text': status.text,
            'created_at': text_type(status.created_at),
            'id': status.id,
            'user': {
                'id': status.user.id,
                'screen_name': status.user.screen_name,
                'profile_image_url': status.user.profile_image_url,
            },
        }

    statuses = []
    while True:
        # Guard only the pop, so an IndexError raised while parsing is not
        # mistaken for "queue drained".
        try:
            item = self.incoming.pop()
        except IndexError:
            break
        # Ignore anything other than status updates for now
        if "in_reply_to_status_id" in item:
            statuses.append(Status.parse(self.stream.api, json.loads(item)))

    broadcast = {'general': {}, 'channels': {}}

    for s in statuses:
        # Flags are passed as an argument: inline global flags that are not
        # at the start of the pattern are an error on Python 3.11+.
        tags = re.findall(r"#(\w+)", s.text, re.IGNORECASE | re.UNICODE)
        print("Tags: ")
        print(tags)
        self.db.execute(
            "INSERT INTO tweets (id, user_id, screen_name, profile_image_url, created_at, text) VALUES (%s,%s,%s,%s,%s,%s)",
            s.id, s.user.id, s.user.screen_name, s.user.profile_image_url, s.created_at, s.text)

        # Establish HABTM relationships, tweets with tags
        for t in tags:
            t = t.lower()  # Force all to lowercase
            print("Inserting tag: %s" % t)
            self.db.execute(
                "INSERT INTO hashtags (tag) VALUES (%s) ON DUPLICATE KEY UPDATE id=LAST_INSERT_ID(id), tag=%s;\n"
                "INSERT INTO hashtags_tweets (hash_id, tweet_id) VALUES (LAST_INSERT_ID(), %s)",
                t, t, s.id)

            # Count the votes while we're at it
            if t in campboard['sessions']:
                # Attach the tweet to the broadcast channel
                channel = broadcast['channels'].setdefault(t, {'recent_tweets': []})
                channel['recent_tweets'].append(summarize(s))

                # Plain substring tests replace the non-raw '\+1' / '\-1'
                # regexes; they match exactly the same texts.
                if '+1' in s.text:
                    self.db.execute("INSERT INTO session_votes (`session`, positive) VALUES (%s, 1) ON DUPLICATE KEY UPDATE positive=positive+1", t)
                elif '-1' in s.text:
                    self.db.execute("INSERT INTO session_votes (`session`, negative) VALUES (%s, 1) ON DUPLICATE KEY UPDATE negative=negative+1", t)

    broadcast['general']['recent_tweets'] = [summarize(s) for s in statuses]
    return broadcast
开发者ID:ruiwen,项目名称:CampBoard,代码行数:73,代码来源:campboard.py
注:本文中的tweepy.models.Status类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论