本文整理汇总了Python中tests.util.base.add_fake_thread函数的典型用法代码示例。如果您正苦于以下问题:Python add_fake_thread函数的具体用法?Python add_fake_thread怎么用?Python add_fake_thread使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了add_fake_thread函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_thread_count
def test_thread_count(db, api_client, default_account):
date1 = datetime.datetime(2015, 1, 1, 0, 0, 0)
date2 = datetime.datetime(2012, 1, 1, 0, 0, 0)
date3 = datetime.datetime(2010, 1, 1, 0, 0, 0)
date4 = datetime.datetime(2009, 1, 1, 0, 0, 0)
date5 = datetime.datetime(2008, 1, 1, 0, 0, 0)
thread1 = add_fake_thread(db.session, default_account.namespace.id)
thread2 = add_fake_thread(db.session, default_account.namespace.id)
test_subject = "test_thread_view_count_with_category"
for thread in [thread1, thread2]:
add_fake_message(db.session, default_account.namespace.id, thread,
subject=test_subject, received_date=date1)
add_fake_message(db.session, default_account.namespace.id, thread,
subject=test_subject, received_date=date2,
add_sent_category=True)
add_fake_message(db.session, default_account.namespace.id, thread,
subject=test_subject, received_date=date3)
add_fake_message(db.session, default_account.namespace.id, thread,
subject=test_subject, received_date=date4,
add_sent_category=True)
add_fake_message(db.session, default_account.namespace.id, thread,
subject=test_subject, received_date=date5)
resp = api_client.get_raw('/threads/?view=count&in=sent')
assert resp.status_code == 200
threads = json.loads(resp.data)
assert threads['count'] == 2
开发者ID:busseyl,项目名称:sync-engine,代码行数:30,代码来源:test_threads.py
示例2: test_update_metadata
def test_update_metadata(db, folder):
"""Check that threads are updated correctly when a label that we haven't
seen before is added to multiple threads -- previously, this would fail
with an IntegrityError because autoflush was disabled."""
first_thread = add_fake_thread(db.session, NAMESPACE_ID)
second_thread = add_fake_thread(db.session, NAMESPACE_ID)
uids = []
first_thread_uids = (22222, 22223)
for msg_uid in first_thread_uids:
message = add_fake_message(db.session, NAMESPACE_ID, first_thread)
uids.append(add_fake_imapuid(db.session, ACCOUNT_ID, message, folder, msg_uid))
second_thread_uids = (22224, 22226)
for msg_uid in second_thread_uids:
message = add_fake_message(db.session, NAMESPACE_ID, second_thread)
uids.append(add_fake_imapuid(db.session, ACCOUNT_ID, message, folder, msg_uid))
db.session.add_all(uids)
db.session.commit()
msg_uids = first_thread_uids + second_thread_uids
new_flags = {msg_uid: GmailFlags((), (u"\\some_new_label",)) for msg_uid in msg_uids}
update_metadata(ACCOUNT_ID, db.session, folder.name, folder.id, msg_uids, new_flags)
db.session.commit()
assert "some_new_label" in [tag.name for tag in first_thread.tags]
assert "some_new_label" in [tag.name for tag in second_thread.tags]
开发者ID:apolmig,项目名称:inbox,代码行数:27,代码来源:test_update_metadata.py
示例3: test_thread_received_recent_date
def test_thread_received_recent_date(db, api_client, default_account):
date1 = datetime.datetime(2015, 1, 1, 0, 0, 0)
date2 = datetime.datetime(2012, 1, 1, 0, 0, 0)
thread1 = add_fake_thread(db.session, default_account.namespace.id)
date_dict = dict()
add_fake_message(db.session, default_account.namespace.id, thread1,
subject="Test Thread 1", received_date=date1,
add_sent_category=True)
add_fake_message(db.session, default_account.namespace.id, thread1,
subject="Test Thread 1", received_date=date2)
date_dict["Test Thread 1"] = date2
thread2 = add_fake_thread(db.session, default_account.namespace.id)
add_fake_message(db.session, default_account.namespace.id, thread2,
subject="Test Thread 2", received_date=date1,
add_sent_category=True)
date_dict["Test Thread 2"] = date1
resp = api_client.client.get(api_client.full_path('/threads/'))
assert resp.status_code == 200
threads = json.loads(resp.data)
for thread in threads:
assert date_dict[thread['subject']] == \
datetime.datetime.fromtimestamp(
thread['last_message_received_timestamp'])
开发者ID:devwaker,项目名称:sync-engine,代码行数:31,代码来源:test_threads.py
示例4: test_data_deduplication
def test_data_deduplication(db, default_namespace, raw_message):
thread = add_fake_thread(db.session, default_namespace.id)
msg = new_message_from_synced(db, default_namespace.account, raw_message)
msg.thread = thread
db.session.add(msg)
db.session.commit()
account = add_fake_account(db.session)
thread = add_fake_thread(db.session, account.namespace.id)
duplicate_msg = new_message_from_synced(db, account, raw_message)
duplicate_msg.thread = thread
db.session.add(duplicate_msg)
db.session.commit()
assert len(msg.parts) == len(duplicate_msg.parts)
for i in range(len(msg.parts)):
msg_block = msg.parts[i].block
duplicate_msg_block = duplicate_msg.parts[i].block
assert msg_block.namespace_id == msg.namespace_id
assert duplicate_msg_block.namespace_id == duplicate_msg.namespace_id
assert msg_block.size == duplicate_msg_block.size
assert msg_block.data_sha256 == duplicate_msg_block.data_sha256
assert msg_block.data == duplicate_msg_block.data
开发者ID:PriviPK,项目名称:privipk-sync-engine,代码行数:26,代码来源:test_message_deduplication.py
示例5: test_add_remove_tags_by_name
def test_add_remove_tags_by_name(api_client, thread, db, default_namespace):
add_fake_thread(db.session, default_namespace.id)
assert 'foo' not in [tag['name'] for tag in api_client.get_data('/tags/')]
assert 'bar' not in [tag['name'] for tag in api_client.get_data('/tags/')]
api_client.post_data('/tags/', {'name': 'foo'})
api_client.post_data('/tags/', {'name': 'bar'})
thread_id = api_client.get_data('/threads/')[0]['id']
thread_path = '/threads/{}'.format(thread_id)
api_client.put_data(thread_path, {'add_tags': ['foo']})
api_client.put_data(thread_path, {'add_tags': ['bar']})
tag_names = [tag['name'] for tag in
api_client.get_data(thread_path)['tags']]
assert 'foo' in tag_names
assert 'bar' in tag_names
# Check that tag was only applied to this thread
another_thread_id = api_client.get_data('/threads/')[1]['id']
tag_names = get_tag_names(
api_client.get_data('/threads/{}'.format(another_thread_id)))
assert 'foo' not in tag_names
api_client.put_data(thread_path, {'remove_tags': ['foo']})
api_client.put_data(thread_path, {'remove_tags': ['bar']})
tag_names = get_tag_names(api_client.get_data(thread_path))
assert 'foo' not in tag_names
assert 'bar' not in tag_names
开发者ID:raghuveerkancherla,项目名称:sync-engine,代码行数:29,代码来源:test_tags.py
示例6: test_data_deduplication
def test_data_deduplication(s3_db, raw_message):
from inbox.models import Namespace
default_namespace = s3_db.session.query(Namespace).get(1)
thread = add_fake_thread(s3_db.session, default_namespace.id)
msg = new_message_from_synced(s3_db, default_namespace.account, raw_message)
msg.thread = thread
s3_db.session.add(msg)
s3_db.session.commit()
account = add_fake_account(s3_db.session)
thread = add_fake_thread(s3_db.session, account.namespace.id)
duplicate_msg = new_message_from_synced(s3_db, account, raw_message)
duplicate_msg.thread = thread
s3_db.session.add(duplicate_msg)
s3_db.session.commit()
assert len(msg.parts) == len(duplicate_msg.parts)
for i in range(len(msg.parts)):
msg_block = msg.parts[i].block
duplicate_msg_block = duplicate_msg.parts[i].block
assert msg_block.namespace_id == msg.namespace_id
assert duplicate_msg_block.namespace_id == duplicate_msg.namespace_id
assert msg_block.size == duplicate_msg_block.size
assert msg_block.data_sha256 == duplicate_msg_block.data_sha256
assert msg_block.data == duplicate_msg_block.data
开发者ID:PriviPK,项目名称:privipk-sync-engine,代码行数:29,代码来源:test_s3.py
示例7: test_received_before_after
def test_received_before_after(db, api_client, default_namespace):
thread = add_fake_thread(db.session, default_namespace.id)
message = add_fake_message(db.session, default_namespace.id, thread,
to_addr=[('Bob', '[email protected]')],
from_addr=[('Alice', '[email protected]')],
received_date=datetime.datetime(year=1999,
day=20,
month=03),
subject='some subject')
thread2 = add_fake_thread(db.session, default_namespace.id)
message2 = add_fake_message(db.session, default_namespace.id, thread,
to_addr=[('Bob', '[email protected]')],
from_addr=[('Alice', '[email protected]')],
received_date=datetime.datetime(year=2000,
day=20,
month=03),
subject='another subject')
inbox = Category(namespace_id=message.namespace_id, name='inbox',
display_name='Inbox', type_='label')
message.categories.add(inbox)
thread.subject = message.subject
message2.categories.add(inbox)
thread2.subject = message2.subject
db.session.commit()
received_date = message.received_date
t_epoch = dt_to_timestamp(datetime.datetime(year=1998, month=2, day=3))
t_firstmsg = dt_to_timestamp(received_date)
results = api_client.get_data('/messages?received_before={}'
.format(t_epoch))
assert len(results) == 0
# received_before should be inclusive (i.e: match <=, not just <).
results = api_client.get_data('/messages?received_before={}'
.format(t_firstmsg))
assert len(results) == 1
t1 = dt_to_timestamp(received_date + datetime.timedelta(days=1))
results = api_client.get_data('/messages?received_after={}'
.format(t1))
assert len(results) == 1
results = api_client.get_data(
'/messages?received_before={}&received_after={}'.format(t1,
t_firstmsg))
assert len(results) == 0
# bogus values
results = api_client.get_data(
'/messages?received_before={}&received_after={}'.format(t_epoch, t1))
assert len(results) == 0
开发者ID:nohobby,项目名称:sync-engine,代码行数:56,代码来源:test_filtering.py
示例8: test_namespace_deletion
def test_namespace_deletion(db, default_account):
from inbox.models import (Account, Thread, Message, Block,
Contact, Event, Transaction)
from inbox.models.util import delete_namespace
models = [Thread, Message]
namespace = default_account.namespace
namespace_id = namespace.id
account_id = default_account.id
account = db.session.query(Account).get(account_id)
assert account
thread = add_fake_thread(db.session, namespace_id)
message = add_fake_message(db.session, namespace_id, thread)
for m in models:
c = db.session.query(m).filter(
m.namespace_id == namespace_id).count()
print "count for", m, ":", c
assert c != 0
fake_account = add_fake_account(db.session)
fake_account_id = fake_account.id
assert fake_account_id != account.id and \
fake_account.namespace.id != namespace_id
thread = add_fake_thread(db.session, fake_account.namespace.id)
thread_id = thread.id
message = add_fake_message(db.session, fake_account.namespace.id, thread)
message_id = message.id
# Delete namespace, verify data corresponding to this namespace /only/
# is deleted
delete_namespace(account_id, namespace_id)
db.session.commit()
account = db.session.query(Account).get(account_id)
assert not account
for m in models:
assert db.session.query(m).filter(
m.namespace_id == namespace_id).count() == 0
fake_account = db.session.query(Account).get(fake_account_id)
assert fake_account
thread = db.session.query(Thread).get(thread_id)
message = db.session.query(Message).get(message_id)
assert thread and message
开发者ID:rskumar,项目名称:sync-engine,代码行数:54,代码来源:test_namespace.py
示例9: test_generic_grouping
def test_generic_grouping(db, generic_account):
thread = add_fake_thread(db.session, NAMESPACE_ID)
message = add_fake_message(db.session, NAMESPACE_ID, thread,
subject="Golden Gate Park next Sat")
imapuid = ImapUid(message=message, account_id=ACCOUNT_ID,
msg_uid=2222)
thread = add_fake_thread(db.session, generic_account.namespace.id)
message = add_fake_message(db.session, NAMESPACE_ID + 1, thread,
subject="Golden Gate Park next Sat")
thread = fetch_corresponding_thread(db.session, generic_account.namespace.id, message)
assert thread is None, ("fetch_similar_threads should "
"heed namespace boundaries")
开发者ID:Analect,项目名称:sync-engine,代码行数:14,代码来源:test_thread_creation.py
示例10: test_message_delete
def test_message_delete(db, gmail_account):
""" Ensure that all associated MessageCategories are deleted
when a Message is deleted """
api_client = new_api_client(db, gmail_account.namespace)
generic_thread = add_fake_thread(db.session, gmail_account.namespace.id)
gen_message = add_fake_message(db.session,
gmail_account.namespace.id,
generic_thread)
category_ids = []
for i in xrange(10):
po_data = api_client.post_data('/labels/',
{"display_name": str(i)})
assert po_data.status_code == 200
category_ids.append(json.loads(po_data.data)['id'])
data = {"label_ids": category_ids}
resp = api_client.put_data('/messages/{}'.
format(gen_message.public_id), data)
assert resp.status_code == 200
associated_mcs = db.session.query(MessageCategory). \
filter(MessageCategory.message_id == gen_message.id).all()
assert len(associated_mcs) == 10
db.session.delete(gen_message)
db.session.commit()
assert db.session.query(MessageCategory). \
filter(MessageCategory.message_id == gen_message.id).all() == []
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:33,代码来源:test_relationships.py
示例11: test_message_updates_create_transaction
def test_message_updates_create_transaction(db):
with session_scope() as db_session:
with db_session.no_autoflush:
thr = add_fake_thread(db_session, NAMESPACE_ID)
msg = add_fake_message(db_session, NAMESPACE_ID, thr)
msg.is_read = True
db_session.commit()
transaction = get_latest_transaction(db_session, 'message', msg.id,
NAMESPACE_ID)
assert transaction.record_id == msg.id
assert transaction.object_type == 'message'
assert transaction.command == 'update'
msg = add_fake_message(db_session, NAMESPACE_ID, thr)
msg.state = 'sent'
db_session.commit()
transaction = get_latest_transaction(db_session, 'message', msg.id,
NAMESPACE_ID)
assert transaction.record_id == msg.id
assert transaction.object_type == 'message'
assert transaction.command == 'update'
msg = add_fake_message(db_session, NAMESPACE_ID, thr)
msg.is_draft = True
db_session.commit()
transaction = get_latest_transaction(db_session, 'message', msg.id,
NAMESPACE_ID)
assert transaction.record_id == msg.id
assert transaction.object_type == 'message'
assert transaction.command == 'update'
开发者ID:GEverding,项目名称:inbox,代码行数:31,代码来源:test_transaction_creation.py
示例12: test_thread_sent_recent_date
def test_thread_sent_recent_date(db, api_client, default_account):
date1 = datetime.datetime(2015, 1, 1, 0, 0, 0)
date2 = datetime.datetime(2012, 1, 1, 0, 0, 0)
date3 = datetime.datetime(2010, 1, 1, 0, 0, 0)
date4 = datetime.datetime(2009, 1, 1, 0, 0, 0)
date5 = datetime.datetime(2008, 1, 1, 0, 0, 0)
thread1 = add_fake_thread(db.session, default_account.namespace.id)
test_subject = "test_thread_sent_recent_date"
add_fake_message(db.session, default_account.namespace.id, thread1,
subject=test_subject, received_date=date1)
add_fake_message(db.session, default_account.namespace.id, thread1,
subject=test_subject, received_date=date2,
add_sent_category=True)
add_fake_message(db.session, default_account.namespace.id, thread1,
subject=test_subject, received_date=date3)
add_fake_message(db.session, default_account.namespace.id, thread1,
subject=test_subject, received_date=date4,
add_sent_category=True)
add_fake_message(db.session, default_account.namespace.id, thread1,
subject=test_subject, received_date=date5)
resp = api_client.get_raw('/threads/')
assert resp.status_code == 200
threads = json.loads(resp.data)
for thread in threads: # should only be one
assert datetime.datetime.fromtimestamp(
thread['last_message_sent_timestamp']) == date2
开发者ID:busseyl,项目名称:sync-engine,代码行数:31,代码来源:test_threads.py
示例13: test_message_updates_create_thread_transaction
def test_message_updates_create_thread_transaction(db, default_namespace):
with db.session.no_autoflush:
thr = add_fake_thread(db.session, default_namespace.id)
msg = add_fake_message(db.session, default_namespace.id, thr)
transaction = get_latest_transaction(db.session, "thread", thr.id, default_namespace.id)
assert transaction.record_id == thr.id and transaction.object_type == "thread"
assert transaction.command == "update"
# An update to one of the message's propagated_attributes creates a
# revision for the thread
msg.is_read = True
db.session.commit()
new_transaction = get_latest_transaction(db.session, "thread", thr.id, default_namespace.id)
assert new_transaction.id != transaction.id
assert new_transaction.record_id == thr.id and new_transaction.object_type == "thread"
assert new_transaction.command == "update"
# An update to one of its other attributes does not
msg.subject = "Ice cubes and dogs"
db.session.commit()
same_transaction = get_latest_transaction(db.session, "thread", thr.id, default_namespace.id)
assert same_transaction.id == new_transaction.id
开发者ID:yodiyo,项目名称:sync-engine,代码行数:25,代码来源:test_transaction_creation.py
示例14: test_message_label_updates
def test_message_label_updates(db, api_client, default_account, api_version,
custom_label):
"""Check that you can update a message (optimistically or not),
and that the update is queued in the ActionLog."""
headers = dict()
headers['Api-Version'] = api_version
# Gmail threads, messages have a 'labels' field
gmail_thread = add_fake_thread(db.session, default_account.namespace.id)
gmail_message = add_fake_message(db.session,
default_account.namespace.id, gmail_thread)
resp_data = api_client.get_data(
'/messages/{}'.format(gmail_message.public_id), headers=headers)
assert resp_data['labels'] == []
category = custom_label.category
update = dict(labels=[category.public_id])
resp = api_client.put_data(
'/messages/{}'.format(gmail_message.public_id), update,
headers=headers)
resp_data = json.loads(resp.data)
if api_version == API_VERSIONS[0]:
assert len(resp_data['labels']) == 1
assert resp_data['labels'][0]['id'] == category.public_id
else:
assert resp_data['labels'] == []
开发者ID:busseyl,项目名称:sync-engine,代码行数:32,代码来源:test_messages.py
示例15: test_folder_delete
def test_folder_delete(db, generic_account, folder_client):
# Make a new message
generic_thread = add_fake_thread(db.session, generic_account.namespace.id)
gen_message = add_fake_message(db.session,
generic_account.namespace.id,
generic_thread)
g_data = folder_client.get_raw('/folders/')
# Add message to folder
generic_folder = json.loads(g_data.data)[0]
data = {"folder_id": generic_folder['id']}
folder_client.put_data('/messages/{}'.format(gen_message.public_id), data)
# Test that DELETE requests 403 on folders with items in them
d_data = folder_client.delete('/folders/{}'.format(generic_folder['id']))
assert d_data.status_code == 400
# Make an empty folder
resp = folder_client.post_data('/folders/',
{"display_name": "Empty_Folder"})
empty_folder = json.loads(resp.data)
# Test that DELETE requests delete empty folders
d_data = folder_client.delete('/folders/{}'.format(empty_folder['id']))
assert d_data.status_code == 200
category_id = empty_folder['id']
category = db.session.query(Category).filter(
Category.public_id == category_id).one()
assert category.deleted_at != EPOCH
assert category.is_deleted is True
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:29,代码来源:test_folders_labels.py
示例16: test_basic_message_grouping
def test_basic_message_grouping(db, default_namespace):
first_thread = add_fake_thread(db.session, default_namespace.id)
first_thread.subject = 'Some kind of test'
add_fake_message(db.session, default_namespace.id,
thread=first_thread,
subject='Some kind of test',
from_addr=[('Karim Hamidou', '[email protected]')],
to_addr=[('Eben Freeman', '[email protected]')],
bcc_addr=[('Some person', '[email protected]')])
msg2 = add_fake_message(db.session, default_namespace.id, thread=None,
subject='Re: Some kind of test',
from_addr=[('Some random dude',
'[email protected]')],
to_addr=[('Karim Hamidou', '[email protected]')])
matched_thread = fetch_corresponding_thread(db.session,
default_namespace.id, msg2)
assert matched_thread is None, "the algo shouldn't thread different convos"
msg3 = add_fake_message(db.session, default_namespace.id, thread=None)
msg3.subject = 'Re: Some kind of test'
msg3.from_addr = [('Eben Freeman', '[email protected]')]
msg3.to_addr = [('Karim Hamidou', '[email protected]')]
matched_thread = fetch_corresponding_thread(db.session, default_namespace.id, msg3)
assert matched_thread is first_thread, "Should match on participants"
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:28,代码来源:test_threading.py
示例17: test_category_delete
def test_category_delete(db, gmail_account):
""" Ensure that all associated MessageCategories are deleted
when a Category is deleted """
api_client = new_api_client(db, gmail_account.namespace)
po_data = api_client.post_data('/labels/',
{"display_name": "Test_Label"})
assert po_data.status_code == 200
category_public_id = json.loads(po_data.data)['id']
category = db.session.query(Category).filter(
Category.public_id == category_public_id).one()
category_id = category.id
for i in xrange(10):
generic_thread = add_fake_thread(db.session,
gmail_account.namespace.id)
gen_message = add_fake_message(db.session,
gmail_account.namespace.id,
generic_thread)
data = {"label_ids": [category_public_id]}
resp = api_client.put_data('/messages/{}'.
format(gen_message.public_id), data)
assert resp.status_code == 200
associated_mcs = db.session.query(MessageCategory). \
filter(MessageCategory.category_id == category_id).all()
assert len(associated_mcs) == 10
db.session.delete(category)
db.session.commit()
assert db.session.query(MessageCategory). \
filter(MessageCategory.category_id == category_id).all() == []
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:34,代码来源:test_relationships.py
示例18: test_file_transactions
def test_file_transactions(db, default_namespace):
from inbox.models.message import Message
account = default_namespace.account
thread = add_fake_thread(db.session, default_namespace.id)
mime_msg = mime.create.multipart("mixed")
mime_msg.append(
mime.create.text("plain", "This is a message with attachments"),
mime.create.attachment("image/png", "filler", "attached_image.png", "attachment"),
mime.create.attachment("application/pdf", "filler", "attached_file.pdf", "attachment"),
)
msg = Message.create_from_synced(account, 22, "[Gmail]/All Mail", datetime.utcnow(), mime_msg.to_string())
msg.thread = thread
db.session.add(msg)
db.session.commit()
assert len(msg.parts) == 2
assert all(part.content_disposition == "attachment" for part in msg.parts)
block_ids = [part.block.id for part in msg.parts]
with db.session.no_autoflush:
transaction = get_latest_transaction(db.session, "file", block_ids[0], default_namespace.id)
assert transaction.command == "insert"
transaction = get_latest_transaction(db.session, "file", block_ids[1], default_namespace.id)
assert transaction.command == "insert"
开发者ID:yodiyo,项目名称:sync-engine,代码行数:27,代码来源:test_transaction_creation.py
示例19: test_label_delete
def test_label_delete(db, gmail_account, label_client, api_version):
headers = dict()
headers['Api-Version'] = api_version
# Make a new message
gmail_thread = add_fake_thread(db.session, gmail_account.namespace.id)
gmail_message = add_fake_message(db.session,
gmail_account.namespace.id, gmail_thread)
g_data = label_client.get_raw('/labels/', headers=headers)
# Add label to message
gmail_label = json.loads(g_data.data)[0]
data = {"labels": [gmail_label['id']]}
label_client.put_data('/messages/{}'.format(gmail_message.public_id), data,
headers=headers)
# DELETE requests should work on labels whether or not messages have them
d_data = label_client.delete('/labels/{}'.format(gmail_label['id']),
headers=headers)
assert d_data.status_code == 200
if api_version == API_VERSIONS[0]:
# Optimistic update.
category_id = gmail_label['id']
category = db.session.query(Category).filter(
Category.public_id == category_id).one()
assert category.deleted_at != EPOCH
assert category.is_deleted is True
开发者ID:busseyl,项目名称:sync-engine,代码行数:27,代码来源:test_folders_labels.py
示例20: test_deleting_from_a_message_with_multiple_uids
def test_deleting_from_a_message_with_multiple_uids(db):
# Now check that deleting a imapuid from a message with
# multiple uids doesn't delete the message itself
ACCOUNT_ID = 1
NAMESPACE_ID = 1
account = db.session.query(Account).get(ACCOUNT_ID)
inbox_folder = account.inbox_folder
sent_folder = account.sent_folder
thread = add_fake_thread(db.session, NAMESPACE_ID,)
message = add_fake_message(db.session, NAMESPACE_ID, thread)
sent_uid = ImapUid(message=message, account=account, folder=sent_folder,
msg_uid=1337)
inbox_uid = ImapUid(message=message, account=account, folder=inbox_folder,
msg_uid=2222)
db.session.add(sent_uid)
db.session.add(inbox_uid)
db.session.commit()
remove_messages(ACCOUNT_ID, db.session, [2222], inbox_folder.name)
msg = db.session.query(Message).get(message.id)
assert msg is not None, "the associated message should not have been deleted"
assert len(msg.imapuids) == 1, "the message should have only one imapuid"
开发者ID:GEverding,项目名称:inbox,代码行数:27,代码来源:test_message_deletion.py
注:本文中的tests.util.base.add_fake_thread函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论