• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python auth.Auth类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pykolab.auth.Auth的典型用法代码示例。如果您正苦于以下问题：Python Auth类的具体用法？Python Auth怎么用？Python Auth使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Auth类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: get_resource_invitationpolicy

def get_resource_invitationpolicy(resource):
    """
        Get this resource's kolabinvitationpolicy configuration.

        If ``resource`` has no 'kolabinvitationpolicy' key, or its value is
        None, the entries found by searching for 'uniquemember' equal to
        resource['dn'] are inspected in order. The first one carrying the
        attribute is parsed with parse_kolabinvitationpolicy() and its
        policy is stored back into ``resource``.

        Returns resource['kolabinvitationpolicy'] when that key is present
        afterwards, otherwise None.
    """
    global auth

    if 'kolabinvitationpolicy' not in resource or resource['kolabinvitationpolicy'] is None:
        # Connect lazily, and only once, through the module-level handle.
        if not auth:
            auth = Auth()
            auth.connect()

        # get kolabinvitationpolicy attribute from collection
        collections = auth.search_entry_by_attribute('uniquemember', resource['dn'])
        if collections is None:
            # Guard in case the search yields nothing as None; indexing it
            # below would otherwise raise a TypeError.
            collections = []
        elif not isinstance(collections, list):
            # A single entry: normalise it to the (dn, entry) pair shape that
            # the loop below unpacks.
            collections = [(collections['dn'], collections)]

        log.debug("Check collections %r for kolabinvitationpolicy attributes" % (collections,), level=9)

        for dn, collection in collections:
            # ldap.search_entry_by_attribute() doesn't return the attributes lower-cased
            if 'kolabInvitationPolicy' in collection:
                collection['kolabinvitationpolicy'] = collection['kolabInvitationPolicy']

            if 'kolabinvitationpolicy' in collection:
                parse_kolabinvitationpolicy(collection)
                resource['kolabinvitationpolicy'] = collection['kolabinvitationpolicy']
                break

    return resource['kolabinvitationpolicy'] if 'kolabinvitationpolicy' in resource else None
开发者ID:tpokorra,项目名称:pykolab,代码行数:29,代码来源:module_resources.py


示例2: execute

def execute(*args, **kw):
    """
        Print the folders found below DELETED/ and when each was deleted.

        Folders are listed once per domain key returned by
        Auth.list_domains() and once more without a domain part. The
        deletion time is decoded from the 'hex_timestamp' part of the parsed
        folder name. With conf.raw set the folder name is printed as listed;
        otherwise it is decoded from IMAP UTF-7 and encoded as UTF-8.
    """
    imap = IMAP()
    imap.connect()

    auth = Auth()
    auth.connect()

    realms = auth.list_domains()

    deleted = []
    for realm in set(realms.keys()):
        deleted.extend(imap.lm("DELETED/*@%s" % (realm)))

    deleted.extend(imap.lm("DELETED/*"))

    print("Deleted folders:")

    for name in deleted:
        parts = imap.parse_mailfolder(name)

        if conf.raw:
            shown = name
        else:
            shown = imap_utf7.decode(name).encode('utf-8')

        # The timestamp is stored as a hexadecimal number of seconds.
        deleted_at = datetime.datetime.fromtimestamp(int(parts['hex_timestamp'], 16))

        print("%s (Deleted at %s)" % (shown, deleted_at))
开发者ID:tpokorra,项目名称:pykolab,代码行数:27,代码来源:cmd_list_deleted_mailboxes.py


示例3: test_002_resource_collection

 def test_002_resource_collection(self):
     """
         The entry at self.cars['dn'] has the 'groupofuniquenames' object
         class, three 'uniquemember' values and the 'ACT_ACCEPT'
         invitation policy.
     """
     ldap = Auth()
     ldap.connect()

     entry = ldap.get_entry_attributes(None, self.cars['dn'], ['*'])

     object_classes = entry['objectclass']
     self.assertIn('groupofuniquenames', object_classes)

     member_count = len(entry['uniquemember'])
     self.assertEqual(member_count, 3)

     self.assertEqual(entry['kolabinvitationpolicy'], 'ACT_ACCEPT')
开发者ID:tpokorra,项目名称:pykolab,代码行数:7,代码来源:test_005_resource_add.py


示例4: execute

def execute(*args, **kw):
    """
        List the user mailboxes of one domain.

        The domain is the first remaining command-line argument, or is asked
        for interactively when there is none. Mailboxes are listed for the
        matching primary domain and for each of its secondary domains. With
        conf.raw set, folder names are printed as listed; otherwise they are
        decoded from IMAP UTF-7.
    """

    try:
        domain = conf.cli_args.pop(0)
    except IndexError:
        # No argument left on the command line. Only this case falls back to
        # the prompt; a bare except would also swallow KeyboardInterrupt.
        domain = utils.ask_question(_("Domain"))

    imap = IMAP()
    imap.connect()

    auth = Auth()
    auth.connect()

    domains = auth.list_domains()

    folders = []
    for primary, secondaries in domains:
        # Skip every domain group the requested name does not belong to.
        if domain != primary and domain not in secondaries:
            continue

        folders.extend(imap.lm("user/%%@%s" % (primary)))
        for secondary in secondaries:
            folders.extend(imap.lm("user/%%@%s" % (secondary)))

    # NOTE(review): this header looks copied from the "deleted mailboxes"
    # command; it is left unchanged in case callers parse the output.
    print("Deleted folders:")

    for folder in folders:
        if not conf.raw:
            print(imap_utf7.decode(folder))
        else:
            print(folder)
开发者ID:tpokorra,项目名称:pykolab,代码行数:34,代码来源:cmd_list_domain_mailboxes.py


示例5: test_002_user_recipient_policy_duplicate

    def test_002_user_recipient_policy_duplicate(self):
        """
            Add a user "Jane Doe", look up the recipient for
            jane.doe@example.org and verify its DN and its 'mail' and 'alias'
            attributes, synchronizing once first if 'mailhost' is not set yet.
        """
        from tests.functional.user_add import user_add
        user = {
                'local': 'jane.doe',
                'domain': 'example.org'
            }
        user_add("Jane", "Doe")

        time.sleep(3)

        auth = Auth()
        auth.connect()
        # The address is built as <local>@<domain>. The previous format
        # string was mangled and raised ValueError ("unsupported format
        # character") before any lookup happened.
        recipient = auth.find_recipient("%(local)s@%(domain)s" % (user))
        if hasattr(self, 'assertIsInstance'):
            self.assertIsInstance(recipient, str)

        self.assertEqual(recipient, "uid=doe2,ou=People,dc=example,dc=org")

        result = wap_client.user_info(recipient)

        if 'mailhost' not in result:
            from tests.functional.synchronize import synchronize_once
            synchronize_once()

        result = wap_client.user_info(recipient)

        # NOTE(review): the '[email protected]' literals below look like
        # e-mail obfuscation placeholders, not the real expected addresses;
        # restore them from the upstream test before relying on these checks.
        self.assertEqual(result['mail'], '[email protected]')
        self.assertEqual(result['alias'], ['[email protected]', '[email protected]'])
开发者ID:tpokorra,项目名称:pykolab,代码行数:28,代码来源:test_001_user_sync.py


示例6: test_001_two_johns

    def test_001_two_johns(self):
        """
            Add two users named "John Doe", wait until both recipients can
            be found (at most 20 attempts, one second apart), then check
            that each of the two listed mailboxes exists exactly once.
        """
        from tests.functional.user_add import user_add
        user_add("John", "Doe")
        user_add("John", "Doe")

        time.sleep(3)

        ldap = Auth()
        ldap.connect()

        # NOTE(review): the '[email protected]' literals in this test look
        # like e-mail obfuscation placeholders; verify against upstream.
        for _attempt in range(20):
            recipient1 = ldap.find_recipient('[email protected]')
            recipient2 = ldap.find_recipient('[email protected]')

            if recipient1 and recipient2:
                break

            time.sleep(1)

        mailstore = IMAP()
        mailstore.connect()

        first_inbox = mailstore.lm('user/[email protected]')
        self.assertEqual(len(first_inbox), 1, "No INBOX found for first John")

        second_inbox = mailstore.lm('user/[email protected]')
        self.assertEqual(len(second_inbox), 1, "No INBOX found for second John")
开发者ID:tpokorra,项目名称:pykolab,代码行数:29,代码来源:test_003_two_johns.py


示例7: test_001_resource_created

    def test_001_resource_created(self):
        """
            Looking up self.audi and self.cars by their 'mail' value
            returns their respective 'dn' values.
        """
        ldap = Auth()
        ldap.connect()

        # Single resource.
        self.assertEqual(ldap.find_resource(self.audi['mail']), self.audi['dn'])

        # Resource collection.
        self.assertEqual(ldap.find_resource(self.cars['mail']), self.cars['dn'])
开发者ID:tpokorra,项目名称:pykolab,代码行数:8,代码来源:test_008_resource_add.py


示例8: do_sync

    def do_sync(self):
        """
            Keep one Process running per primary domain, forever.

            Each pass lists the domains through an Auth bound to the
            configured primary domain, compares the result with the
            processes started so far, and starts a process for every domain
            that is new or whose process is no longer alive. When nothing
            changed, the pass sleeps for 600 seconds before continuing. When
            listing the domains fails, it is retried after 60 seconds.
        """
        domain_auth = {}

        primary_domain = conf.get('kolab', 'primary_domain')

        while 1:
            primary_auth = Auth(primary_domain)

            log.debug(_("Listing domains..."), level=5)

            try:
                domains = primary_auth.list_domains()
            except Exception:
                # Best-effort retry. "except Exception" lets
                # KeyboardInterrupt and SystemExit through so the daemon can
                # still be stopped.
                time.sleep(60)
                continue

            # domains now is a list of tuples, we want the primary_domains.
            # The loop variable must not be called primary_domain: rebinding
            # that name would make the next pass build Auth() for whichever
            # domain happened to be listed last.
            primary_domains = [primary for primary, _secondaries in domains]

            # Now we can check if any changes happened.
            added_domains = []
            removed_domains = []

            all_domains = set(primary_domains + list(domain_auth.keys()))

            for domain in all_domains:
                if domain in domain_auth and domain in primary_domains:
                    # Known domain: restart its process only if it died.
                    if not domain_auth[domain].is_alive():
                        domain_auth[domain].terminate()
                        added_domains.append(domain)
                elif domain in domain_auth:
                    # NOTE(review): removed domains are only reported; their
                    # process is neither stopped nor dropped from
                    # domain_auth, so they are reported on every pass and
                    # the 600 second sleep below is skipped from then on.
                    removed_domains.append(domain)
                else:
                    added_domains.append(domain)

            if len(removed_domains) == 0 and len(added_domains) == 0:
                time.sleep(600)

            log.debug(
                    _("added domains: %r, removed domains: %r") % (
                            added_domains,
                            removed_domains
                        ),
                    level=8
                )

            for domain in added_domains:
                domain_auth[domain] = Process(domain)
                domain_auth[domain].start()
开发者ID:detrout,项目名称:pykolab,代码行数:57,代码来源:__init__.py


示例9: execute

def execute(*args, **kw):
    """
        Report, and optionally delete, mailboxes whose domain is not listed.

        Every IMAP folder with a domain part (outside DELETED) whose domain
        is neither a key nor a value of Auth.list_domains() marks that
        domain as unknown. For each unknown domain the 'shared/' and 'user/'
        folders are listed, then:

          - without conf.delete, a warning is logged for each folder;
          - with conf.delete and conf.dry_run, a warning is logged for each
            non-shared folder;
          - with conf.delete only, non-shared folders are deleted and shared
            folders are reported but kept.
    """

    imap = IMAP()

    if conf.connect_server is not None:
        imap.connect(server=conf.connect_server)
    else:
        imap.connect()

    auth = Auth()
    auth.connect()

    domains = auth.list_domains()

    folders = imap.lm()

    # Build the lookup set once instead of once per folder.
    known_domains = set(list(domains.keys()) + list(domains.values()))

    imap_domains_not_domains = []

    for folder in folders:
        if len(folder.split('@')) > 1 and not folder.startswith('DELETED'):
            _folder_domain = folder.split('@')[-1]
            if _folder_domain not in known_domains:
                imap_domains_not_domains.append(_folder_domain)

    imap_domains_not_domains = list(set(imap_domains_not_domains))

    log.debug(_("Domains in IMAP not in LDAP: %r") % (imap_domains_not_domains), level=8)

    for domain in imap_domains_not_domains:
        folders = []

        folders.extend(imap.lm('shared/%%@%s' % (domain)))
        folders.extend(imap.lm('user/%%@%s' % (domain)))

        for folder in folders:
            # Compare the first path component itself. Joining its
            # characters with '/' can never equal 'shared', which made
            # every shared folder eligible for deletion.
            is_shared = folder.split('/')[0] == 'shared'

            if conf.delete:
                if conf.dry_run:
                    if not is_shared:
                        log.warning(_("No recipients for '%s' (would have deleted the mailbox if not for --dry-run)!") % ('/'.join(folder.split('/')[1:])))
                else:
                    if not is_shared:
                        log.info(_("Deleting mailbox '%s' because it has no recipients") % (folder))
                        try:
                            imap.dm(folder)
                        except Exception as errmsg:
                            # Keep going; one failed deletion must not stop
                            # the remaining folders from being processed.
                            log.error(_("An error occurred removing mailbox %r: %r") % (folder, errmsg))
                    else:
                        log.info(_("Not automatically deleting shared folder '%s'") % (folder))
            else:
                log.warning(_("No recipients for '%s' (use --delete to delete)!") % ('/'.join(folder.split('/')[1:])))
开发者ID:tpokorra,项目名称:pykolab,代码行数:56,代码来源:cmd_sync_mailhost_attrs.py


示例10: execute

def execute(*args, **kw):
    """
        Upload a sieve script for a user.

        Command-line arguments, in order: the user's email address (asked
        for interactively when missing), the path of the script file to
        upload, and the name to store the script under.

        The IMAP backend and URI come from the section of the address's
        domain when that section defines them, otherwise from the default
        backend. The connection is always made to port 4190, authenticating
        with the backend's admin credentials on behalf of the address.
    """
    try:
        address = conf.cli_args.pop(0)
    except IndexError:
        # Only a missing argument falls back to the prompt.
        address = utils.ask_question(_("Email Address"))

    script_to_put = conf.cli_args.pop(0)

    script_put_name = conf.cli_args.pop(0)

    auth = Auth()
    auth.connect()

    # NOTE(review): the result is unused; the lookup is kept in case callers
    # rely on its side effects.
    user = auth.find_recipient(address)

    # Get the main, default backend
    backend = conf.get('kolab', 'imap_backend')

    if len(address.split('@')) > 1:
        domain = address.split('@')[1]
    else:
        domain = conf.get('kolab', 'primary_domain')

    if conf.has_section(domain) and conf.has_option(domain, 'imap_backend'):
        backend = conf.get(domain, 'imap_backend')

    if conf.has_section(domain) and conf.has_option(domain, 'imap_uri'):
        uri = conf.get(domain, 'imap_uri')
    else:
        uri = conf.get(backend, 'uri')

    result = urlparse(uri)

    if hasattr(result, 'hostname'):
        hostname = result.hostname
    else:
        # Manual fallback: take the host from "scheme://host[:port]/...".
        # Only the host is needed, so a URI without an explicit port no
        # longer fails on tuple unpacking.
        hostname = uri.split('/')[2].split(':')[0]

    # Any port given in the URI is ignored; 4190 is always used.
    port = 4190

    # Get the credentials
    admin_login = conf.get(backend, 'admin_login')
    admin_password = conf.get(backend, 'admin_password')

    import sievelib.managesieve

    sieveclient = sievelib.managesieve.Client(hostname, port, False)
    sieveclient.connect(None, None, True)
    sieveclient._plain_authentication(admin_login, admin_password, address)
    sieveclient.authenticated = True

    # Read the script inside a context manager so the file handle is closed
    # even when the upload raises.
    with open(script_to_put, "r") as script_file:
        script_content = script_file.read()

    sieveclient.putscript(script_put_name, script_content)
开发者ID:tpokorra,项目名称:pykolab,代码行数:56,代码来源:cmd_put.py


示例11: test_002_fr_FR_user_recipient_policy

    def test_002_fr_FR_user_recipient_policy(self):
        """
            Look up the recipient for self.user's <local>@<domain> address
            and verify its DN and its 'mail' and 'alias' attributes.
        """
        auth = Auth()
        auth.connect()
        # The address is built as <local>@<domain>. The previous format
        # string was mangled and raised ValueError ("unsupported format
        # character") before any lookup happened.
        recipient = auth.find_recipient("%(local)s@%(domain)s" % (self.user))
        if hasattr(self, 'assertIsInstance'):
            self.assertIsInstance(recipient, str)

        self.assertEqual(recipient, "uid=fuentes,ou=People,dc=example,dc=org")

        result = wap_client.user_info(recipient)

        # NOTE(review): the '[email protected]' literals below look like
        # e-mail obfuscation placeholders, not the real expected addresses;
        # restore them from the upstream test before relying on these checks.
        self.assertEqual(result['mail'], '[email protected]')
        self.assertEqual(sorted(result['alias']), ['[email protected]', '[email protected]'])
开发者ID:tpokorra,项目名称:pykolab,代码行数:13,代码来源:test_004_user_add_es_ES.py


示例12: test_001_user_recipient_policy

    def test_001_user_recipient_policy(self):
        """
            Look up the recipient for self.user's <local>@<domain> address
            and verify its DN and its 'mail' and 'alias' attributes.
        """
        auth = Auth()
        auth.connect()
        # The address is built as <local>@<domain>. The previous format
        # string was mangled and raised ValueError ("unsupported format
        # character") before any lookup happened.
        recipient = auth.find_recipient("%(local)s@%(domain)s" % (self.user))
        if hasattr(self, 'assertIsInstance'):
            self.assertIsInstance(recipient, str)

        self.assertEqual(recipient, "uid=doe,ou=People,dc=example,dc=org")

        result = wap_client.user_info(recipient)

        # NOTE(review): the '[email protected]' literals below look like
        # e-mail obfuscation placeholders, not the real expected addresses;
        # restore them from the upstream test before relying on these checks.
        self.assertEqual(result['mail'], '[email protected]')
        self.assertEqual(result['alias'], ['[email protected]', '[email protected]'])
开发者ID:tpokorra,项目名称:pykolab,代码行数:13,代码来源:test_001_user_sync.py


示例13: LDAPDataHandler

class LDAPDataHandler(object):
    """
        Collector handler to provide user data from LDAP
    """

    def __init__(self, *args, **kw):
        """
            Load the pykolab configuration (finalizing it when it has no
            'defaults' attribute yet) and open the LDAP connection.
        """
        # load pykolab conf
        self.pykolab_conf = pykolab.getConf()
        if not hasattr(self.pykolab_conf, 'defaults'):
            self.pykolab_conf.finalize_conf(fatal=False)

        self.ldap = Auth()
        self.ldap.connect()

    def register(self, callback):
        """
            Announce this handler's interests by calling ``callback`` with a
            dict mapping 'GETUSERDATA' to the get_user_data() callback.
        """
        interests = {
                'GETUSERDATA': { 'callback': self.get_user_data }
            }

        callback(interests)

    def get_user_data(self, notification):
        """
            Add LDAP user data to a JSON-encoded notification.

            When the decoded notification has a 'user' key, its
            'user_data' key is set to the user's LDAP attributes (the
            configured unique attribute and 'cn'), or to None when no DN
            could be found for the user. When the unique attribute is
            present, the record also gets 'dn' and 'id' keys and the unique
            attribute itself is removed.

            Returns the notification re-encoded as a JSON string.
        """
        notification = json.loads(notification)
        log.debug("GETUSERDATA for %r" % (notification), level=9)

        if 'user' in notification:
            try:
                user_dn = self.ldap.find_user_dn(notification['user'], True)
                log.debug("User DN for %s: %r" % (notification['user'], user_dn), level=8)
            except Exception as e:
                # A failed lookup is logged and treated as "user not found".
                log.error("LDAP connection error: %r", e)
                user_dn = None

            if user_dn:
                unique_attr = self.pykolab_conf.get('ldap', 'unique_attribute', 'nsuniqueid')
                user_rec = self.ldap.get_entry_attributes(None, user_dn, [unique_attr, 'cn'])
                log.debug("User attributes: %r" % (user_rec), level=8)

                if user_rec and unique_attr in user_rec:
                    # Expose the unique attribute under the neutral key 'id'.
                    user_rec['dn'] = user_dn
                    user_rec['id'] = user_rec[unique_attr]
                    del user_rec[unique_attr]
            else:
                user_rec = None

            notification['user_data'] = user_rec

        return json.dumps(notification)
开发者ID:kolab-groupware,项目名称:bonnie,代码行数:48,代码来源:ldapdata.py


示例14: test_001_default

    def test_001_default(self):
        """
            Add "John Doe", synchronize once, and check that the uid of the
            recipient found is "doe". The users are purged afterwards.
        """
        from tests.functional.user_add import user_add
        user_add("John", "Doe")
        from tests.functional.synchronize import synchronize_once
        synchronize_once()

        ldap = Auth()
        ldap.connect()

        # NOTE(review): '[email protected]' looks like an e-mail obfuscation
        # placeholder; verify the address against upstream.
        recipient_dn = ldap.find_recipient('[email protected]')

        details = wap_client.user_info(recipient_dn)
        self.assertEqual(details['uid'], "doe")

        from tests.functional.purge_users import purge_users
        purge_users()
开发者ID:tpokorra,项目名称:pykolab,代码行数:17,代码来源:test_007_policy_uid.py


示例15: __init__

    def __init__(self, *args, **kw):
        """
            Load the pykolab configuration (finalizing it when it has no
            'defaults' attribute yet) and open the LDAP connection.
        """
        # load pykolab conf
        settings = pykolab.getConf()
        self.pykolab_conf = settings
        if not hasattr(settings, 'defaults'):
            settings.finalize_conf(fatal=False)

        # Store the handle before connecting, so it is set even if
        # connect() raises.
        directory = Auth()
        self.ldap = directory
        directory.connect()
开发者ID:kolab-groupware,项目名称:bonnie,代码行数:8,代码来源:ldapdata.py


示例16: execute

def execute(*args, **kw):
    """
        Synchronize the users of every primary domain.

        Lists the domains, creates the module-level worker pool with
        conf.threads processes running worker_process, then runs a paged
        synchronization for each distinct primary domain with queue_add as
        callback. Finally waits until the pool's task queue is empty.
    """
    global imap, pool

    auth = Auth()
    log.debug(_("Listing domains..."), level=5)
    start_time = time.time()
    domains = auth.list_domains()
    end_time = time.time()
    log.debug(
            _("Found %d domains in %d seconds") % (
                    len(domains),
                    (end_time-start_time)
                ),
            level=8
        )

    # Compare the version as a tuple rather than slicing the version string,
    # which breaks once a version component has two digits.
    if sys.version_info >= (2, 7):
        # The fourth positional argument (maxtasksperchild) is only accepted
        # from Python 2.7 on.
        pool = multiprocessing.Pool(conf.threads, worker_process, (), 1)
    else:
        pool = multiprocessing.Pool(conf.threads, worker_process, ())

    # The values of the mapping are the primary domains; visit each once.
    for primary_domain in set(domains.values()):
        log.debug(_("Running for domain %s") % (primary_domain), level=8)
        auth = Auth(primary_domain)
        auth.connect(primary_domain)
        start_time = time.time()
        auth.synchronize(mode='_paged_search', callback=queue_add)
        end_time = time.time()

        log.info(_("Synchronizing users for %s took %d seconds")
                % (primary_domain, (end_time-start_time))
            )

    # NOTE(review): polls a private attribute of multiprocessing.Pool.
    while not pool._taskqueue.empty():
        time.sleep(1)
开发者ID:tpokorra,项目名称:pykolab,代码行数:35,代码来源:cmd_sync.py


示例17: expand_mydomains

def expand_mydomains():
    """
        Return a flat list of all domains: each primary domain followed by
        its secondary domains, in the order Auth.list_domains() yields them.
    """
    directory = Auth()
    directory.connect()

    flattened = []

    for primary, secondaries in directory.list_domains():
        flattened.append(primary)
        flattened.extend(secondaries)

    return flattened
开发者ID:detrout,项目名称:pykolab,代码行数:18,代码来源:kolab_smtp_access_policy.py


示例18: test_003_givenname_fc_dot_surname

    def test_003_givenname_fc_dot_surname(self):
        """
            With the example.org 'policy_uid' set to the first letter of the
            given name, a dot and the surname, adding "John Doe" and
            synchronizing once yields the uid "J.Doe". The users are purged
            afterwards.
        """
        self.set('example.org', 'policy_uid', "'%(givenname)s'[0:1].%(surname)s")

        from tests.functional.user_add import user_add
        user_add("John", "Doe")
        from tests.functional.synchronize import synchronize_once
        synchronize_once()

        ldap = Auth()
        ldap.connect()

        # NOTE(review): '[email protected]' looks like an e-mail obfuscation
        # placeholder; verify the address against upstream.
        recipient_dn = ldap.find_recipient('[email protected]')

        details = wap_client.user_info(recipient_dn)
        self.assertEqual(details['uid'], "J.Doe")

        from tests.functional.purge_users import purge_users
        purge_users()
开发者ID:tpokorra,项目名称:pykolab,代码行数:19,代码来源:test_007_policy_uid.py


示例19: execute

def execute(*args, **kw):
    """
        Print the number of user mailboxes per domain.

        One "<domain>: <count>" line is printed for every domain key
        returned by Auth.list_domains(), followed by a "null: <count>" line
        when mailboxes without a domain part exist.
    """
    imap = IMAP()
    imap.connect()

    auth = Auth()
    auth.connect()

    realms = auth.list_domains()

    for realm in realms.keys():
        mailboxes = imap.lm("user/%%@%s" % (realm))
        print("%s: %d" % (realm, len(mailboxes)))

    # Mailboxes listed without any domain part.
    unscoped = len(imap.lm("user/%%"))

    if unscoped > 0:
        print("null: %d" % (unscoped))
开发者ID:tpokorra,项目名称:pykolab,代码行数:20,代码来源:cmd_count_domain_mailboxes.py


示例20: synchronize

    def synchronize(self, domain):
        """
            Synchronize ``domain`` repeatedly until interrupted.

            The pause between runs is the 'sync_interval' setting of the
            'kolab' section in seconds, or 300 when it is unset or zero.
            Each run creates a fresh Auth for the domain, connects and
            synchronizes. A KeyboardInterrupt ends the loop; any other
            exception is logged with its traceback and the loop retries
            after one second.

            Raises ValueError when the configured interval is not an
            integer.
        """
        sync_interval = conf.get('kolab', 'sync_interval')

        if sync_interval is None:
            sync_interval = 300
        else:
            # Convert before testing for zero: a value read as the string
            # "0" would otherwise slip past the check and give a zero sleep.
            sync_interval = int(sync_interval)
            if sync_interval == 0:
                sync_interval = 300

        while True:
            try:
                auth = Auth(domain)
                auth.connect(domain)
                auth.synchronize()
                time.sleep(sync_interval)
            except KeyboardInterrupt:
                break
            except Exception as errmsg:
                # NOTE(review): the message says "terminating", but the loop
                # retries after a one second pause.
                log.error(_("Error in process %r, terminating: %r") % (self.name, errmsg))
                import traceback
                traceback.print_exc()
                time.sleep(1)
开发者ID:detrout,项目名称:pykolab,代码行数:21,代码来源:process.py



注:本文中的pykolab.auth.Auth类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python imap.IMAP类代码示例发布时间:2022-05-25
下一篇:
Python pykolab.getLogger函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap