本文整理汇总了Python中pykolab.auth.Auth类的典型用法代码示例。如果您正苦于以下问题:Python Auth类的具体用法?Python Auth怎么用?Python Auth使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Auth类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_resource_invitationpolicy
def get_resource_invitationpolicy(resource):
"""
Get this resource's kolabinvitationpolicy configuration
"""
global auth
if not resource.has_key('kolabinvitationpolicy') or resource['kolabinvitationpolicy'] is None:
if not auth:
auth = Auth()
auth.connect()
# get kolabinvitationpolicy attribute from collection
collections = auth.search_entry_by_attribute('uniquemember', resource['dn'])
if not isinstance(collections, list):
collections = [ (collections['dn'],collections) ]
log.debug("Check collections %r for kolabinvitationpolicy attributes" % (collections), level=9)
for dn,collection in collections:
# ldap.search_entry_by_attribute() doesn't return the attributes lower-cased
if collection.has_key('kolabInvitationPolicy'):
collection['kolabinvitationpolicy'] = collection['kolabInvitationPolicy']
if collection.has_key('kolabinvitationpolicy'):
parse_kolabinvitationpolicy(collection)
resource['kolabinvitationpolicy'] = collection['kolabinvitationpolicy']
break
return resource['kolabinvitationpolicy'] if resource.has_key('kolabinvitationpolicy') else None
开发者ID:tpokorra,项目名称:pykolab,代码行数:29,代码来源:module_resources.py
示例2: execute
def execute(*args, **kw):
"""
List deleted mailboxes
"""
imap = IMAP()
imap.connect()
auth = Auth()
auth.connect()
domains = auth.list_domains()
folders = []
for domain in list(set(domains.keys())):
folders.extend(imap.lm("DELETED/*@%s" % (domain)))
folders.extend(imap.lm("DELETED/*"))
print "Deleted folders:"
for folder in folders:
mbox_parts = imap.parse_mailfolder(folder)
if not conf.raw:
print "%s (Deleted at %s)" % (imap_utf7.decode(folder).encode('utf-8'), datetime.datetime.fromtimestamp(int(mbox_parts['hex_timestamp'], 16)))
else:
print "%s (Deleted at %s)" % (folder, datetime.datetime.fromtimestamp(int(mbox_parts['hex_timestamp'], 16)))
开发者ID:tpokorra,项目名称:pykolab,代码行数:27,代码来源:cmd_list_deleted_mailboxes.py
示例3: test_002_resource_collection
def test_002_resource_collection(self):
auth = Auth()
auth.connect()
attrs = auth.get_entry_attributes(None, self.cars['dn'], ['*'])
self.assertIn('groupofuniquenames', attrs['objectclass'])
self.assertEqual(len(attrs['uniquemember']), 3)
self.assertEqual(attrs['kolabinvitationpolicy'], 'ACT_ACCEPT')
开发者ID:tpokorra,项目名称:pykolab,代码行数:7,代码来源:test_005_resource_add.py
示例4: execute
def execute(*args, **kw):
"""
List deleted mailboxes
"""
try:
domain = conf.cli_args.pop(0)
except:
domain = utils.ask_question(_("Domain"))
imap = IMAP()
imap.connect()
auth = Auth()
auth.connect()
domains = auth.list_domains()
folders = []
for primary,secondaries in domains:
if not domain == primary and not domain in secondaries:
continue
folders.extend(imap.lm("user/%%@%s" % (primary)))
for secondary in secondaries:
folders.extend(imap.lm("user/%%@%s" % (secondary)))
print "Deleted folders:"
for folder in folders:
if not conf.raw:
print imap_utf7.decode(folder)
else:
print folder
开发者ID:tpokorra,项目名称:pykolab,代码行数:34,代码来源:cmd_list_domain_mailboxes.py
示例5: test_002_user_recipient_policy_duplicate
def test_002_user_recipient_policy_duplicate(self):
from tests.functional.user_add import user_add
user = {
'local': 'jane.doe',
'domain': 'example.org'
}
user_add("Jane", "Doe")
time.sleep(3)
auth = Auth()
auth.connect()
recipient = auth.find_recipient("%(local)[email protected]%(domain)s" % (user))
if hasattr(self, 'assertIsInstance'):
self.assertIsInstance(recipient, str)
self.assertEqual(recipient, "uid=doe2,ou=People,dc=example,dc=org")
result = wap_client.user_info(recipient)
if not result.has_key('mailhost'):
from tests.functional.synchronize import synchronize_once
synchronize_once()
result = wap_client.user_info(recipient)
self.assertEqual(result['mail'], '[email protected]')
self.assertEqual(result['alias'], ['[email protected]', '[email protected]'])
开发者ID:tpokorra,项目名称:pykolab,代码行数:28,代码来源:test_001_user_sync.py
示例6: test_001_two_johns
def test_001_two_johns(self):
from tests.functional.user_add import user_add
user_add("John", "Doe")
user_add("John", "Doe")
time.sleep(3)
auth = Auth()
auth.connect()
max_tries = 20
while max_tries > 0:
recipient1 = auth.find_recipient('[email protected]')
recipient2 = auth.find_recipient('[email protected]')
if not recipient1 or not recipient2:
time.sleep(1)
max_tries -= 1
else:
break
imap = IMAP()
imap.connect()
folders = imap.lm('user/[email protected]')
self.assertEqual(len(folders), 1, "No INBOX found for first John")
folders = imap.lm('user/[email protected]')
self.assertEqual(len(folders), 1, "No INBOX found for second John")
开发者ID:tpokorra,项目名称:pykolab,代码行数:29,代码来源:test_003_two_johns.py
示例7: test_001_resource_created
def test_001_resource_created(self):
auth = Auth()
auth.connect()
resource = auth.find_resource(self.audi['mail'])
self.assertEqual(resource, self.audi['dn'])
collection = auth.find_resource(self.cars['mail'])
self.assertEqual(collection, self.cars['dn'])
开发者ID:tpokorra,项目名称:pykolab,代码行数:8,代码来源:test_008_resource_add.py
示例8: do_sync
def do_sync(self):
domain_auth = {}
pid = os.getpid()
primary_domain = conf.get('kolab', 'primary_domain')
while 1:
primary_auth = Auth(primary_domain)
log.debug(_("Listing domains..."), level=5)
start = time.time()
try:
domains = primary_auth.list_domains()
except:
time.sleep(60)
continue
# domains now is a list of tuples, we want the primary_domains
primary_domains = []
for primary_domain, secondary_domains in domains:
primary_domains.append(primary_domain)
# Now we can check if any changes happened.
added_domains = []
removed_domains = []
all_domains = set(primary_domains + domain_auth.keys())
for domain in all_domains:
if domain in domain_auth.keys() and domain in primary_domains:
if not domain_auth[domain].is_alive():
domain_auth[domain].terminate()
added_domains.append(domain)
else:
continue
elif domain in domain_auth.keys():
removed_domains.append(domain)
else:
added_domains.append(domain)
if len(removed_domains) == 0 and len(added_domains) == 0:
time.sleep(600)
log.debug(
_("added domains: %r, removed domains: %r") % (
added_domains,
removed_domains
),
level=8
)
for domain in added_domains:
domain_auth[domain] = Process(domain)
domain_auth[domain].start()
开发者ID:detrout,项目名称:pykolab,代码行数:57,代码来源:__init__.py
示例9: execute
def execute(*args, **kw):
"""
Synchronize or display changes
"""
imap = IMAP()
if not conf.connect_server == None:
imap.connect(server=conf.connect_server)
else:
imap.connect()
auth = Auth()
auth.connect()
domains = auth.list_domains()
folders = imap.lm()
imap_domains_not_domains = []
for folder in folders:
if len(folder.split('@')) > 1 and not folder.startswith('DELETED'):
_folder_domain = folder.split('@')[-1]
if not _folder_domain in list(set(domains.keys() + domains.values())):
imap_domains_not_domains.append(folder.split('@')[-1])
imap_domains_not_domains = list(set(imap_domains_not_domains))
log.debug(_("Domains in IMAP not in LDAP: %r") % (imap_domains_not_domains), level=8)
if len(imap_domains_not_domains) > 0:
for domain in imap_domains_not_domains:
folders = []
folders.extend(imap.lm('shared/%%@%s' % (domain)))
folders.extend(imap.lm('user/%%@%s' % (domain)))
for folder in folders:
if conf.delete:
if conf.dry_run:
if not folder.split('/')[0] == 'shared':
log.warning(_("No recipients for '%s' (would have deleted the mailbox if not for --dry-run)!") % ('/'.join(folder.split('/')[1:])))
else:
continue
else:
if not '/'.join(folder.split('/')[0]) == 'shared':
log.info(_("Deleting mailbox '%s' because it has no recipients") % (folder))
try:
imap.dm(folder)
except Exception, errmsg:
log.error(_("An error occurred removing mailbox %r: %r") % (folder, errmsg))
else:
log.info(_("Not automatically deleting shared folder '%s'") % (folder))
else:
log.warning(_("No recipients for '%s' (use --delete to delete)!") % ('/'.join(folder.split('/')[1:])))
开发者ID:tpokorra,项目名称:pykolab,代码行数:56,代码来源:cmd_sync_mailhost_attrs.py
示例10: execute
def execute(*args, **kw):
try:
address = conf.cli_args.pop(0)
except:
address = utils.ask_question(_("Email Address"))
script_to_put = conf.cli_args.pop(0)
script_put_name = conf.cli_args.pop(0)
auth = Auth()
auth.connect()
user = auth.find_recipient(address)
# Get the main, default backend
backend = conf.get('kolab', 'imap_backend')
if len(address.split('@')) > 1:
domain = address.split('@')[1]
else:
domain = conf.get('kolab', 'primary_domain')
if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'):
backend = conf.get(domain, 'imap_backend')
if conf.has_section(domain) and conf.has_option(domain, 'imap_uri'):
uri = conf.get(domain, 'imap_uri')
else:
uri = conf.get(backend, 'uri')
hostname = None
port = None
result = urlparse(uri)
if hasattr(result, 'hostname'):
hostname = result.hostname
else:
scheme = uri.split(':')[0]
(hostname, port) = uri.split('/')[2].split(':')
port = 4190
# Get the credentials
admin_login = conf.get(backend, 'admin_login')
admin_password = conf.get(backend, 'admin_password')
import sievelib.managesieve
sieveclient = sievelib.managesieve.Client(hostname, port, False)
sieveclient.connect(None, None, True)
sieveclient._plain_authentication(admin_login, admin_password, address)
sieveclient.authenticated = True
sieveclient.putscript(script_put_name, open(script_to_put, "r").read())
开发者ID:tpokorra,项目名称:pykolab,代码行数:56,代码来源:cmd_put.py
示例11: test_002_fr_FR_user_recipient_policy
def test_002_fr_FR_user_recipient_policy(self):
auth = Auth()
auth.connect()
recipient = auth.find_recipient("%(local)[email protected]%(domain)s" % (self.user))
if hasattr(self, 'assertIsInstance'):
self.assertIsInstance(recipient, str)
self.assertEqual(recipient, "uid=fuentes,ou=People,dc=example,dc=org")
result = wap_client.user_info(recipient)
self.assertEqual(result['mail'], '[email protected]')
self.assertEqual(sorted(result['alias']), ['[email protected]', '[email protected]'])
开发者ID:tpokorra,项目名称:pykolab,代码行数:13,代码来源:test_004_user_add_es_ES.py
示例12: test_001_user_recipient_policy
def test_001_user_recipient_policy(self):
auth = Auth()
auth.connect()
recipient = auth.find_recipient("%(local)[email protected]%(domain)s" % (self.user))
if hasattr(self, 'assertIsInstance'):
self.assertIsInstance(recipient, str)
self.assertEqual(recipient, "uid=doe,ou=People,dc=example,dc=org")
result = wap_client.user_info(recipient)
self.assertEqual(result['mail'], '[email protected]')
self.assertEqual(result['alias'], ['[email protected]', '[email protected]'])
开发者ID:tpokorra,项目名称:pykolab,代码行数:13,代码来源:test_001_user_sync.py
示例13: LDAPDataHandler
class LDAPDataHandler(object):
"""
Collector handler to provide user data from LDAP
"""
def __init__(self, *args, **kw):
# load pykolab conf
self.pykolab_conf = pykolab.getConf()
if not hasattr(self.pykolab_conf, 'defaults'):
self.pykolab_conf.finalize_conf(fatal=False)
self.ldap = Auth()
self.ldap.connect()
def register(self, callback):
interests = {
'GETUSERDATA': { 'callback': self.get_user_data }
}
callback(interests)
def get_user_data(self, notification):
notification = json.loads(notification)
log.debug("GETUSERDATA for %r" % (notification), level=9)
if notification.has_key('user'):
try:
user_dn = self.ldap.find_user_dn(notification['user'], True)
log.debug("User DN for %s: %r" % (notification['user'], user_dn), level=8)
except Exception, e:
log.error("LDAP connection error: %r", e)
user_dn = None
if user_dn:
unique_attr = self.pykolab_conf.get('ldap', 'unique_attribute', 'nsuniqueid')
user_rec = self.ldap.get_entry_attributes(None, user_dn, [unique_attr, 'cn'])
log.debug("User attributes: %r" % (user_rec), level=8)
if user_rec and user_rec.has_key(unique_attr):
user_rec['dn'] = user_dn
user_rec['id'] = user_rec[unique_attr]
del user_rec[unique_attr]
else:
user_rec = None
notification['user_data'] = user_rec
return json.dumps(notification)
开发者ID:kolab-groupware,项目名称:bonnie,代码行数:48,代码来源:ldapdata.py
示例14: test_001_default
def test_001_default(self):
from tests.functional.user_add import user_add
user_add("John", "Doe")
from tests.functional.synchronize import synchronize_once
synchronize_once()
auth = Auth()
auth.connect()
user = auth.find_recipient('[email protected]')
user_info = wap_client.user_info(user)
self.assertEqual(user_info['uid'], "doe")
from tests.functional.purge_users import purge_users
purge_users()
开发者ID:tpokorra,项目名称:pykolab,代码行数:17,代码来源:test_007_policy_uid.py
示例15: __init__
def __init__(self, *args, **kw):
# load pykolab conf
self.pykolab_conf = pykolab.getConf()
if not hasattr(self.pykolab_conf, 'defaults'):
self.pykolab_conf.finalize_conf(fatal=False)
self.ldap = Auth()
self.ldap.connect()
开发者ID:kolab-groupware,项目名称:bonnie,代码行数:8,代码来源:ldapdata.py
示例16: execute
def execute(*args, **kw):
global imap, pool
auth = Auth()
log.debug(_("Listing domains..."), level=5)
start_time = time.time()
domains = auth.list_domains()
end_time = time.time()
log.debug(
_("Found %d domains in %d seconds") % (
len(domains),
(end_time-start_time)
),
level=8
)
if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"):
pool = multiprocessing.Pool(conf.threads, worker_process, (), 1)
else:
pool = multiprocessing.Pool(conf.threads, worker_process, ())
for primary_domain in list(set(domains.values())):
log.debug(_("Running for domain %s") % (primary_domain), level=8)
auth = Auth(primary_domain)
auth.connect(primary_domain)
start_time = time.time()
auth.synchronize(mode='_paged_search', callback=queue_add)
end_time = time.time()
log.info(_("Synchronizing users for %s took %d seconds")
% (primary_domain, (end_time-start_time))
)
while not pool._taskqueue.empty():
time.sleep(1)
开发者ID:tpokorra,项目名称:pykolab,代码行数:35,代码来源:cmd_sync.py
示例17: expand_mydomains
def expand_mydomains():
"""
Return a list of my domains.
"""
auth = Auth()
auth.connect()
mydomains = []
_mydomains = auth.list_domains()
for primary, secondaries in _mydomains:
mydomains.append(primary)
for secondary in secondaries:
mydomains.append(secondary)
return mydomains
开发者ID:detrout,项目名称:pykolab,代码行数:18,代码来源:kolab_smtp_access_policy.py
示例18: test_003_givenname_fc_dot_surname
def test_003_givenname_fc_dot_surname(self):
self.set('example.org', 'policy_uid', "'%(givenname)s'[0:1].%(surname)s")
from tests.functional.user_add import user_add
user_add("John", "Doe")
from tests.functional.synchronize import synchronize_once
synchronize_once()
auth = Auth()
auth.connect()
user = auth.find_recipient('[email protected]')
user_info = wap_client.user_info(user)
self.assertEqual(user_info['uid'], "J.Doe")
from tests.functional.purge_users import purge_users
purge_users()
开发者ID:tpokorra,项目名称:pykolab,代码行数:19,代码来源:test_007_policy_uid.py
示例19: execute
def execute(*args, **kw):
"""
List deleted mailboxes
"""
imap = IMAP()
imap.connect()
auth = Auth()
auth.connect()
domains = auth.list_domains()
folders = []
for domain in domains.keys():
print "%s: %d" % (domain,len(imap.lm("user/%%@%s" % (domain))))
null_realm = len(imap.lm("user/%%"))
if null_realm > 0:
print "null: %d" % (null_realm)
开发者ID:tpokorra,项目名称:pykolab,代码行数:20,代码来源:cmd_count_domain_mailboxes.py
示例20: synchronize
def synchronize(self, domain):
sync_interval = conf.get('kolab', 'sync_interval')
if sync_interval == None or sync_interval == 0:
sync_interval = 300
else:
sync_interval = (int)(sync_interval)
while True:
try:
auth = Auth(domain)
auth.connect(domain)
auth.synchronize()
time.sleep(sync_interval)
except KeyboardInterrupt:
break
except Exception, errmsg:
log.error(_("Error in process %r, terminating: %r") % (self.name, errmsg))
import traceback
traceback.print_exc()
time.sleep(1)
开发者ID:detrout,项目名称:pykolab,代码行数:21,代码来源:process.py
注:本文中的pykolab.auth.Auth类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论