本文整理汇总了Python中tests.imap.data.mock_imapclient.add_folder_data函数的典型用法代码示例。如果您正苦于以下问题:Python add_folder_data函数的具体用法?Python add_folder_data怎么用?Python add_folder_data使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了add_folder_data函数的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_gmail_message_deduplication
def test_gmail_message_deduplication(db, default_account, all_mail_folder,
trash_folder, mock_imapclient):
uid = 22
uid_values = uid_data.example()
mock_imapclient.list_folders = lambda: [(('\\All', '\\HasNoChildren',),
'/', u'[Gmail]/All Mail'),
(('\\Trash', '\\HasNoChildren',),
'/', u'[Gmail]/Trash')]
mock_imapclient.idle = lambda: None
mock_imapclient.add_folder_data(all_mail_folder.name, {uid: uid_values})
mock_imapclient.add_folder_data(trash_folder.name, {uid: uid_values})
all_folder_sync_engine = GmailFolderSyncEngine(
default_account.id, default_account.namespace.id, all_mail_folder.name,
default_account.email_address, 'gmail',
BoundedSemaphore(1))
all_folder_sync_engine.initial_sync()
trash_folder_sync_engine = GmailFolderSyncEngine(
default_account.id, default_account.namespace.id, trash_folder.name,
default_account.email_address, 'gmail',
BoundedSemaphore(1))
trash_folder_sync_engine.initial_sync()
# Check that we have two uids, but just one message.
assert [(uid,)] == db.session.query(ImapUid.msg_uid).filter(
ImapUid.folder_id == all_mail_folder.id).all()
assert [(uid,)] == db.session.query(ImapUid.msg_uid).filter(
ImapUid.folder_id == trash_folder.id).all()
assert db.session.query(Message).filter(
Message.namespace_id == default_account.namespace.id,
Message.g_msgid == uid_values['X-GM-MSGID']).count() == 1
开发者ID:GordonYip,项目名称:sync-engine,代码行数:35,代码来源:test_folder_sync.py
示例2: test_condstore_flags_refresh
def test_condstore_flags_refresh(db, default_account, all_mail_folder,
mock_imapclient, monkeypatch):
monkeypatch.setattr(
'inbox.mailsync.backends.imap.generic.CONDSTORE_FLAGS_REFRESH_BATCH_SIZE',
10)
uid_dict = uids.example()
mock_imapclient.add_folder_data(all_mail_folder.name, uid_dict)
mock_imapclient.capabilities = lambda: ['CONDSTORE']
folder_sync_engine = FolderSyncEngine(default_account.id,
default_account.namespace.id,
all_mail_folder.name,
default_account.email_address,
'gmail',
BoundedSemaphore(1))
folder_sync_engine.initial_sync()
# Change the labels provided by the mock IMAP server
for k, v in mock_imapclient._data[all_mail_folder.name].items():
v['X-GM-LABELS'] = ('newlabel',)
v['MODSEQ'] = (k,)
folder_sync_engine.highestmodseq = 0
folder_sync_engine.poll_impl()
imapuids = db.session.query(ImapUid). \
filter_by(folder_id=all_mail_folder.id).all()
for imapuid in imapuids:
assert 'newlabel' in [l.name for l in imapuid.labels]
assert folder_sync_engine.highestmodseq == mock_imapclient.folder_status(
all_mail_folder.name, ['HIGHESTMODSEQ'])['HIGHESTMODSEQ']
开发者ID:GordonYip,项目名称:sync-engine,代码行数:31,代码来源:test_folder_sync.py
示例3: test_renamed_label_refresh
def test_renamed_label_refresh(db, default_account, thread, message,
imapuid, folder, mock_imapclient, monkeypatch):
# Check that imapuids see their labels refreshed after running
# the LabelRenameHandler.
msg_uid = imapuid.msg_uid
uid_dict = {msg_uid: GmailFlags((), ('stale label',))}
update_metadata(default_account.id, folder.id, uid_dict, db.session)
new_flags = {msg_uid: {'FLAGS': ('\\Seen',), 'X-GM-LABELS': ('new label',)}}
mock_imapclient._data['[Gmail]/All mail'] = new_flags
mock_imapclient.add_folder_data(folder.name, new_flags)
monkeypatch.setattr(MockIMAPClient, 'search',
lambda x, y: [msg_uid])
rename_handler = LabelRenameHandler(default_account.id,
default_account.namespace.id,
'new label')
rename_handler.start()
rename_handler.join()
labels = list(imapuid.labels)
assert len(labels) == 1
assert labels[0].name == 'new label'
开发者ID:yodiyo,项目名称:sync-engine,代码行数:26,代码来源:test_delete_handling.py
示例4: test_generic_flags_refresh_expunges_transient_uids
def test_generic_flags_refresh_expunges_transient_uids(
db, generic_account, inbox_folder, mock_imapclient, monkeypatch):
# Check that we delete UIDs which are synced but quickly deleted, so never
# show up in flags refresh.
uid_dict = uids.example()
mock_imapclient.add_folder_data(inbox_folder.name, uid_dict)
inbox_folder.imapfolderinfo = ImapFolderInfo(account=generic_account,
uidvalidity=1,
uidnext=1)
db.session.commit()
folder_sync_engine = FolderSyncEngine(generic_account.id,
generic_account.namespace.id,
inbox_folder.name,
generic_account.email_address,
'custom',
BoundedSemaphore(1))
folder_sync_engine.initial_sync()
folder_sync_engine.poll_impl()
msg = db.session.query(Message).filter_by(
namespace_id=generic_account.namespace.id).first()
transient_uid = ImapUid(folder=inbox_folder, account=generic_account,
message=msg, msg_uid=max(uid_dict) + 1)
db.session.add(transient_uid)
db.session.commit()
folder_sync_engine.last_slow_refresh = None
folder_sync_engine.poll_impl()
with pytest.raises(ObjectDeletedError):
transient_uid.id
开发者ID:GordonYip,项目名称:sync-engine,代码行数:28,代码来源:test_folder_sync.py
示例5: test_change_labels
def test_change_labels(db, default_account, message, folder, mock_imapclient):
mock_imapclient.add_folder_data(folder.name, {})
mock_imapclient.add_gmail_labels = mock.Mock()
mock_imapclient.remove_gmail_labels = mock.Mock()
add_fake_imapuid(db.session, default_account.id, message, folder, 22)
change_labels(default_account.id, message.id,
{'removed_labels': ['\\Inbox'],
'added_labels': [u'motörhead', u'μετάνοια']})
mock_imapclient.add_gmail_labels.assert_called_with(
[22], ['mot&APY-rhead', '&A7wDtQPEA6wDvQO,A7kDsQ-'])
mock_imapclient.remove_gmail_labels.assert_called_with([22], ['\\Inbox'])
开发者ID:nagyistge,项目名称:nylas-sync-engine,代码行数:12,代码来源:test_actions.py
示例6: test_change_labels
def test_change_labels(db, default_account, message, folder, mock_imapclient):
mock_imapclient.add_folder_data(folder.name, {})
mock_imapclient.add_gmail_labels = mock.Mock()
mock_imapclient.remove_gmail_labels = mock.Mock()
add_fake_imapuid(db.session, default_account.id, message, folder, 22)
with writable_connection_pool(default_account.id).get() as crispin_client:
change_labels(crispin_client, default_account.id, message.id,
{'removed_labels': ['\\Inbox'],
'added_labels': [u'motörhead', u'μετάνοια']})
mock_imapclient.add_gmail_labels.assert_called_with(
[22], ['mot&APY-rhead', '&A7wDtQPEA6wDvQO,A7kDsQ-'], silent=True)
mock_imapclient.remove_gmail_labels.assert_called_with([22], ['\\Inbox'],
silent=True)
开发者ID:brandoncc,项目名称:sync-engine,代码行数:14,代码来源:test_actions.py
示例7: test_change_flags
def test_change_flags(db, default_account, message, folder, mock_imapclient):
mock_imapclient.add_folder_data(folder.name, {})
mock_imapclient.add_flags = mock.Mock()
mock_imapclient.remove_flags = mock.Mock()
add_fake_imapuid(db.session, default_account.id, message, folder, 22)
mark_unread(default_account.id, message.id, {'unread': False})
mock_imapclient.add_flags.assert_called_with([22], ['\\Seen'])
mark_unread(default_account.id, message.id, {'unread': True})
mock_imapclient.remove_flags.assert_called_with([22], ['\\Seen'])
mark_starred(default_account.id, message.id, {'starred': True})
mock_imapclient.add_flags.assert_called_with([22], ['\\Flagged'])
mark_starred(default_account.id, message.id, {'starred': False})
mock_imapclient.remove_flags.assert_called_with([22], ['\\Flagged'])
开发者ID:nagyistge,项目名称:nylas-sync-engine,代码行数:16,代码来源:test_actions.py
示例8: test_imap_message_deduplication
def test_imap_message_deduplication(db, generic_account, inbox_folder,
generic_trash_folder, mock_imapclient):
uid = 22
uid_values = uid_data.example()
mock_imapclient.list_folders = lambda: [(('\\All', '\\HasNoChildren',),
'/', u'/Inbox'),
(('\\Trash', '\\HasNoChildren',),
'/', u'/Trash')]
mock_imapclient.idle = lambda: None
mock_imapclient.add_folder_data(inbox_folder.name, {uid: uid_values})
mock_imapclient.add_folder_data(generic_trash_folder.name,
{uid: uid_values})
folder_sync_engine = FolderSyncEngine(
generic_account.id,
generic_account.namespace.id,
inbox_folder.name,
generic_account.email_address,
'custom',
BoundedSemaphore(1),
mock.Mock())
folder_sync_engine.initial_sync()
trash_folder_sync_engine = FolderSyncEngine(
generic_account.id,
generic_account.namespace.id,
generic_trash_folder.name,
generic_account.email_address,
'custom',
BoundedSemaphore(1),
mock.Mock())
trash_folder_sync_engine.initial_sync()
# Check that we have two uids, but just one message.
assert [(uid,)] == db.session.query(ImapUid.msg_uid).filter(
ImapUid.folder_id == inbox_folder.id).all()
assert [(uid,)] == db.session.query(ImapUid.msg_uid).filter(
ImapUid.folder_id == generic_trash_folder.id).all()
# used to uniquely ID messages
body_sha = sha256(uid_values['BODY[]']).hexdigest()
assert db.session.query(Message).filter(
Message.namespace_id == generic_account.namespace.id,
Message.data_sha256 == body_sha).count() == 1
开发者ID:tahajahangir,项目名称:sync-engine,代码行数:47,代码来源:test_folder_sync.py
示例9: test_renamed_label_refresh
def test_renamed_label_refresh(db, default_account, thread, message,
imapuid, folder, mock_imapclient, monkeypatch):
# Check that imapuids see their labels refreshed after running
# the LabelRenameHandler.
msg_uid = imapuid.msg_uid
uid_dict = {msg_uid: GmailFlags((), ('stale label',), ('23',))}
update_metadata(default_account.id, folder.id, folder.canonical_name,
uid_dict, db.session)
new_flags = {msg_uid: {'FLAGS': ('\\Seen',), 'X-GM-LABELS': ('new label',),
'MODSEQ': ('23',)}}
mock_imapclient._data['[Gmail]/All mail'] = new_flags
mock_imapclient.add_folder_data(folder.name, new_flags)
monkeypatch.setattr(MockIMAPClient, 'search',
lambda x, y: [msg_uid])
semaphore = Semaphore(value=1)
rename_handler = LabelRenameHandler(default_account.id,
default_account.namespace.id,
'new label', semaphore)
# Acquire the semaphore to check that LabelRenameHandlers block if
# the semaphore is in-use.
semaphore.acquire()
rename_handler.start()
# Wait 10 secs and check that the data hasn't changed.
gevent.sleep(10)
labels = list(imapuid.labels)
assert len(labels) == 1
assert labels[0].name == 'stale label'
semaphore.release()
rename_handler.join()
db.session.refresh(imapuid)
# Now check that the label got updated.
labels = list(imapuid.labels)
assert len(labels) == 1
assert labels[0].name == 'new label'
开发者ID:bdimcheff,项目名称:sync-engine,代码行数:44,代码来源:test_delete_handling.py
示例10: test_gmail_initial_sync
def test_gmail_initial_sync(db, default_account, all_mail_folder,
mock_imapclient):
uid_dict = uids.example()
mock_imapclient.add_folder_data(all_mail_folder.name, uid_dict)
mock_imapclient.list_folders = lambda: [(('\\All', '\\HasNoChildren',),
'/', u'[Gmail]/All Mail')]
mock_imapclient.idle = lambda: None
folder_sync_engine = GmailFolderSyncEngine(default_account.id,
default_account.namespace.id,
all_mail_folder.name,
default_account.email_address,
'gmail',
BoundedSemaphore(1))
folder_sync_engine.initial_sync()
saved_uids = db.session.query(ImapUid).filter(
ImapUid.folder_id == all_mail_folder.id)
assert {u.msg_uid for u in saved_uids} == set(uid_dict)
开发者ID:GordonYip,项目名称:sync-engine,代码行数:19,代码来源:test_folder_sync.py
示例11: test_new_uids_synced_when_polling
def test_new_uids_synced_when_polling(db, generic_account, inbox_folder,
mock_imapclient):
uid_dict = uids.example()
mock_imapclient.add_folder_data(inbox_folder.name, uid_dict)
inbox_folder.imapfolderinfo = ImapFolderInfo(account=generic_account,
uidvalidity=1,
uidnext=1)
db.session.commit()
folder_sync_engine = FolderSyncEngine(generic_account.id,
generic_account.namespace.id,
inbox_folder.name,
generic_account.email_address,
'custom',
BoundedSemaphore(1))
folder_sync_engine.poll_frequency = 0
folder_sync_engine.poll_impl()
saved_uids = db.session.query(ImapUid).filter(
ImapUid.folder_id == inbox_folder.id)
assert {u.msg_uid for u in saved_uids} == set(uid_dict)
开发者ID:GordonYip,项目名称:sync-engine,代码行数:20,代码来源:test_folder_sync.py
示例12: test_change_flags
def test_change_flags(db, default_account, message, folder, mock_imapclient):
mock_imapclient.add_folder_data(folder.name, {})
mock_imapclient.add_flags = mock.Mock()
mock_imapclient.remove_flags = mock.Mock()
add_fake_imapuid(db.session, default_account.id, message, folder, 22)
with writable_connection_pool(default_account.id).get() as crispin_client:
mark_unread(crispin_client, default_account.id, message.id,
{'unread': False})
mock_imapclient.add_flags.assert_called_with([22], ['\\Seen'], silent=True)
mark_unread(crispin_client, default_account.id, message.id,
{'unread': True})
mock_imapclient.remove_flags.assert_called_with([22], ['\\Seen'], silent=True)
mark_starred(crispin_client, default_account.id, message.id,
{'starred': True})
mock_imapclient.add_flags.assert_called_with([22], ['\\Flagged'], silent=True)
mark_starred(crispin_client, default_account.id, message.id,
{'starred': False})
mock_imapclient.remove_flags.assert_called_with([22], ['\\Flagged'], silent=True)
开发者ID:brandoncc,项目名称:sync-engine,代码行数:21,代码来源:test_actions.py
示例13: test_handle_uidinvalid_loops
def test_handle_uidinvalid_loops(db, generic_account, inbox_folder,
mock_imapclient, monkeypatch):
import inbox.mailsync.backends.imap.generic as generic_import
mock_imapclient.uidvalidity = 1
# We're using a list here because of weird monkeypatching shenanigans.
uidinvalid_count = []
def fake_poll_function(self):
uidinvalid_count.append(1)
raise UidInvalid
monkeypatch.setattr("inbox.mailsync.backends.imap.generic.FolderSyncEngine.poll",
fake_poll_function)
uid_dict = uids.example()
mock_imapclient.add_folder_data(inbox_folder.name, uid_dict)
inbox_folder.imapfolderinfo = ImapFolderInfo(account=generic_account,
uidvalidity=1,
uidnext=1)
db.session.commit()
folder_sync_engine = generic_import.FolderSyncEngine(generic_account.id,
generic_account.namespace.id,
inbox_folder.name,
generic_account.email_address,
'custom',
BoundedSemaphore(1),
mock.Mock())
folder_sync_engine.state = 'poll'
db.session.expunge(inbox_folder.imapsyncstatus)
with pytest.raises(MailsyncDone):
folder_sync_engine._run_impl()
assert len(uidinvalid_count) == MAX_UIDINVALID_RESYNCS + 1
开发者ID:tahajahangir,项目名称:sync-engine,代码行数:38,代码来源:test_folder_sync.py
示例14: test_initial_sync
def test_initial_sync(db, generic_account, inbox_folder, mock_imapclient):
# We should really be using hypothesis.given() to generate lots of
# different uid sets, but it's not trivial to ensure that no state is
# carried over between runs. This will have to suffice for now as a way to
# at least establish coverage.
uid_dict = uids.example()
mock_imapclient.add_folder_data(inbox_folder.name, uid_dict)
folder_sync_engine = FolderSyncEngine(generic_account.id,
generic_account.namespace.id,
inbox_folder.name,
generic_account.email_address,
'custom',
BoundedSemaphore(1))
folder_sync_engine.initial_sync()
saved_uids = db.session.query(ImapUid).filter(
ImapUid.folder_id == inbox_folder.id)
assert {u.msg_uid for u in saved_uids} == set(uid_dict)
saved_message_hashes = {u.message.data_sha256 for u in saved_uids}
assert saved_message_hashes == {sha256(v['BODY[]']).hexdigest() for v in
uid_dict.values()}
开发者ID:GordonYip,项目名称:sync-engine,代码行数:23,代码来源:test_folder_sync.py
示例15: test_handle_uidinvalid
def test_handle_uidinvalid(db, generic_account, inbox_folder, mock_imapclient):
uid_dict = uids.example()
mock_imapclient.add_folder_data(inbox_folder.name, uid_dict)
inbox_folder.imapfolderinfo = ImapFolderInfo(account=generic_account,
uidvalidity=1,
uidnext=1)
db.session.commit()
folder_sync_engine = FolderSyncEngine(generic_account.id,
generic_account.namespace.id,
inbox_folder.name,
generic_account.email_address,
'custom',
BoundedSemaphore(1))
folder_sync_engine.initial_sync()
mock_imapclient.uidvalidity = 2
with pytest.raises(UidInvalid):
folder_sync_engine.poll_impl()
new_state = folder_sync_engine.resync_uids()
assert new_state == 'initial'
assert db.session.query(ImapUid).filter(
ImapUid.folder_id == inbox_folder.id).all() == []
开发者ID:GordonYip,项目名称:sync-engine,代码行数:23,代码来源:test_folder_sync.py
注:本文中的tests.imap.data.mock_imapclient.add_folder_data函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论