• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python config.get函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp.server.config.config.get函数的典型用法代码示例。如果您正苦于以下问题:Python get函数的具体用法?Python get怎么用?Python get使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: start

 def start(cls):
     url = config.get('messaging', 'url')
     transport = config.get('messaging', 'transport')
     # asynchronous reply
     cls.reply_handler = ReplyHandler(url, transport)
     cls.reply_handler.start()
     log.info(_('AMQP reply handler started'))
开发者ID:VuokkoVuorinnen,项目名称:pulp,代码行数:7,代码来源:services.py


示例2: create_consumer_payload

 def create_consumer_payload(self, repo, config):
     payload = {}
     ##TODO for jdob: load the pulp.conf and make it accessible to distributor
     payload["repo_name"] = repo.display_name
     payload["server_name"] = pulp_server_config.get("server", "server_name")
     ssl_ca_path = pulp_server_config.get("security", "ssl_ca_certificate")
     if os.path.exists(ssl_ca_path):
         payload["ca_cert"] = open(pulp_server_config.get("security", "ssl_ca_certificate")).read()
     else:
         payload["ca_cert"] = config.get("https_ca")
     payload["relative_path"] = "/".join((RELATIVE_URL, self.get_repo_relative_path(repo, config)))
     payload["protocols"] = []
     if config.get("http"):
         payload["protocols"].append("http")
     if config.get("https"):
         payload["protocols"].append("https")
     payload["gpg_keys"] = []
     if config.get("gpgkey") is not None:
         payload["gpg_keys"] = config.get("gpgkey")
     payload["client_cert"] = None
     if config.get("auth_cert") and config.get("auth_ca"):
         payload["client_cert"] = config.get("auth_cert")
     else:
         # load the global auth if enabled
         repo_auth_config = load_config()
         global_cert_dir = repo_auth_config.get("repos", "global_cert_location")
         global_auth_cert = os.path.join(global_cert_dir, "pulp-global-repo.cert")
         global_auth_key = os.path.join(global_cert_dir, "pulp-global-repo.key")
         global_auth_ca = os.path.join(global_cert_dir, "pulp-global-repo.ca")
         if os.path.exists(global_auth_ca) and os.path.exists(global_auth_cert):
             payload["global_auth_cert"] = open(global_auth_cert).read()
             payload["global_auth_key"] = open(global_auth_key).read()
             payload["global_auth_ca"] = open(global_auth_ca).read()
     return payload
开发者ID:lzap,项目名称:pulp_rpm,代码行数:34,代码来源:distributor.py


示例3: init

 def init():
     url = config.get('messaging', 'url')
     transport = config.get('messaging', 'transport')
     broker = Broker(url, transport=transport)
     broker.cacert = config.get('messaging', 'cacert')
     broker.clientcert = config.get('messaging', 'clientcert')
     log.info(_('AMQP broker configured: %(b)s'), {'b': broker})
开发者ID:VuokkoVuorinnen,项目名称:pulp,代码行数:7,代码来源:services.py


示例4: _check_username_password_ldap

    def _check_username_password_ldap(self, username, password=None):
        """
        Check a username and password against the ldap server.
        Return None if the username and password are not valid

        :type username: str
        :param username: the login of the user

        :type password: str or None
        :param password: password of the user, None => do not validate the password

        :rtype: L{pulp.server.db.model.auth.User} instance or None
        :return: user corresponding to the credentials
        """

        ldap_uri = config.get('ldap', 'uri')
        ldap_base = config.get('ldap', 'base')
        ldap_tls = config.getboolean('ldap', 'tls')

        ldap_filter = None
        if config.has_option('ldap', 'filter'):
            ldap_filter = config.get('ldap', 'filter')

        ldap_server = ldap_connection.LDAPConnection(server=ldap_uri, tls=ldap_tls)
        ldap_server.connect()
        user = ldap_server.authenticate_user(ldap_base, username, password,
                                             filter=ldap_filter)
        return user
开发者ID:beav,项目名称:pulp,代码行数:28,代码来源:authentication.py


示例5: _send_email

def _send_email(subject, body, to_address):
    """
    Send a text email to one recipient

    :param subject: email subject
    :type  subject: basestring
    :param body:    text body of the email
    :type  body:    basestring
    :param to_address:  email address to send to
    :type  to_address:  basestring

    :return: None
    """
    host = config.get('email', 'host')
    port = config.getint('email', 'port')
    from_address = config.get('email', 'from')

    message = MIMEText(body)
    message['Subject'] = subject
    message['From'] = from_address
    message['To'] = to_address

    try:
        connection = smtplib.SMTP(host=host, port=port)
    except smtplib.SMTPConnectError:
        logger.error('SMTP connection failed to %s on %s' % (host, port))
        return

    try:
        connection.sendmail(from_address, to_address, message.as_string())
    except smtplib.SMTPException, e:
        try:
            logger.error('Error sending mail: %s' % e.message)
        except AttributeError:
            logger.error('SMTP error while sending mail')
开发者ID:ehelms,项目名称:pulp,代码行数:35,代码来源:mail.py


示例6: init

 def init():
     url = Services.get_url()
     broker = Broker(url)
     broker.ssl.ca_certificate = config.get('messaging', 'cacert')
     broker.ssl.client_certificate = config.get('messaging', 'clientcert')
     Domain.broker.add(broker)
     _logger.info(_('AMQP broker configured: %(b)s'), {'b': broker})
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:7,代码来源:services.py


示例7: _get_streamer_url

def _get_streamer_url(catalog_entry, signing_key):
    """
    Build a URL that can be used to retrieve the file in the catalog entry from
    the lazy streamer.

    :param catalog_entry: The catalog entry to get the URL for.
    :type  catalog_entry: pulp.server.db.model.LazyCatalogEntry
    :param signing_key: The server private RSA key to sign the url with.
    :type  signing_key: M2Crypto.RSA.RSA

    :return: The signed streamer URL which corresponds to the catalog entry.
    :rtype:  str
    """
    try:
        https_retrieval = parse_bool(pulp_conf.get('lazy', 'https_retrieval'))
    except Unparsable:
        raise PulpCodedTaskException(error_codes.PLP1014, section='lazy', key='https_retrieval',
                                     reason=_('The value is not boolean'))
    retrieval_scheme = 'https' if https_retrieval else 'http'
    host = pulp_conf.get('lazy', 'redirect_host')
    port = pulp_conf.get('lazy', 'redirect_port')
    path_prefix = pulp_conf.get('lazy', 'redirect_path')
    netloc = (host + ':' + port) if port else host
    path = os.path.join(path_prefix, catalog_entry.path.lstrip('/'))
    unsigned_url = urlunsplit((retrieval_scheme, netloc, path, None, None))
    # Sign the URL for a year to avoid the URL expiring before the task completes
    return str(URL(unsigned_url).sign(signing_key, expiration=31536000))
开发者ID:maxamillion,项目名称:pulp,代码行数:27,代码来源:content.py


示例8: redirect

    def redirect(request, key):
        """
        Redirected GET request.

        :param request: The WSGI request object.
        :type request: django.core.handlers.wsgi.WSGIRequest
        :param key: A private RSA key.
        :type key: RSA.RSA
        :return: A redirect or not-found reply.
        :rtype: django.http.HttpResponse
        """
        path = os.path.realpath(request.path_info)
        scheme = request.environ['REQUEST_SCHEME']
        host = request.environ['SERVER_NAME']
        port = request.environ['SERVER_PORT']
        query = request.environ['QUERY_STRING']
        remote_ip = request.environ['REMOTE_ADDR']

        redirect_host = pulp_conf.get('lazy', 'redirect_host')
        redirect_port = pulp_conf.get('lazy', 'redirect_port')
        redirect_path = pulp_conf.get('lazy', 'redirect_path')

        redirect = ContentView.urljoin(
            scheme,
            redirect_host or host,
            redirect_port or port,
            redirect_path,
            path,
            query)

        url = URL(redirect)
        signed = url.sign(key, remote_ip=remote_ip)
        return HttpResponseRedirect(str(signed))
开发者ID:grizax,项目名称:pulp,代码行数:33,代码来源:views.py


示例9: pulp_bindings

def pulp_bindings():
    """
    Get a pulp bindings object for this node.
    Properties defined in the pulp server configuration are used
    when not defined in the node configuration.
    :return: A pulp bindings object.
    :rtype: Bindings
    """
    node_conf = read_config()
    oauth = node_conf.oauth
    verify_ssl = parse_bool(node_conf.main.verify_ssl)
    ca_path = node_conf.main.ca_path
    host = pulp_conf.get('server', 'server_name')
    key = pulp_conf.get('oauth', 'oauth_key')
    secret = pulp_conf.get('oauth', 'oauth_secret')
    connection = PulpConnection(
        host=host,
        port=443,
        oauth_key=key,
        oauth_secret=secret,
        oauth_user=oauth.user_id,
        verify_ssl=verify_ssl,
        ca_path=ca_path)
    bindings = Bindings(connection)
    return bindings
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:25,代码来源:resources.py


示例10: create_consumer_payload

    def create_consumer_payload(self, repo, config, binding_config):
        """
        Called when a consumer binds to a repository using this distributor.
        This call should return a dictionary describing all data the consumer
        will need to access the repository.

        :param repo: metadata describing the repository
        :type  repo: pulp.plugins.model.Repository

        :param config: plugin configuration
        :type  config: pulp.plugins.config.PluginCallConfiguration

        :param binding_config: configuration applicable only for the specific
               consumer the payload is generated for; this will be None
               if there are no specific options for the consumer in question
        :type  binding_config: object or None

        :return: dictionary of relevant data
        :rtype:  dict
        """
        payload = dict()
        payload['repo_name'] = repo.display_name
        payload['server_name'] = pulp_server_config.get('server', 'server_name')
        ssl_ca_path = pulp_server_config.get('security', 'ssl_ca_certificate')
        try:
            payload['ca_cert'] = open(ssl_ca_path).read()
        except (OSError, IOError):
            payload['ca_cert'] = config.get('https_ca')

        payload['relative_path'] = \
            '/'.join([RELATIVE_URL, configuration.get_repo_relative_path(repo, config)])
        payload['protocols'] = []
        if config.get('http'):
            payload['protocols'].append('http')
        if config.get('https'):
            payload['protocols'].append('https')
        payload['gpg_keys'] = []
        if config.get('gpgkey') is not None:
            payload['gpg_keys'] = {'pulp.key': config.get('gpgkey')}
        payload['client_cert'] = None
        if config.get('auth_cert') and config.get('auth_ca'):
            payload['client_cert'] = config.get('auth_cert')
        else:
            # load the global auth if enabled
            repo_auth_config = configuration.load_config(constants.REPO_AUTH_CONFIG_FILE)
            global_cert_dir = repo_auth_config.get('repos', 'global_cert_location')
            global_auth_cert = os.path.join(global_cert_dir, 'pulp-global-repo.cert')
            global_auth_key = os.path.join(global_cert_dir, 'pulp-global-repo.key')
            global_auth_ca = os.path.join(global_cert_dir, 'pulp-global-repo.ca')
            if os.path.exists(global_auth_ca) and \
                    os.path.exists(global_auth_cert) and \
                    os.path.exists(global_auth_key):
                payload['global_auth_cert'] = open(global_auth_cert).read()
                payload['global_auth_key'] = open(global_auth_key).read()
                payload['global_auth_ca'] = open(global_auth_ca).read()

        return payload
开发者ID:AndreaGiardini,项目名称:pulp_rpm,代码行数:57,代码来源:distributor.py


示例11: test_init

 def test_init(self):
     Services.init()
     url = pulp_conf.get('messaging', 'url')
     ca_cert = pulp_conf.get('messaging', 'cacert')
     client_cert = pulp_conf.get('messaging', 'clientcert')
     broker = Broker(url)
     self.assertEqual(broker.url, URL(url))
     self.assertEqual(broker.cacert, ca_cert)
     self.assertEqual(broker.clientcert, client_cert)
开发者ID:preethit,项目名称:pulp-1,代码行数:9,代码来源:test_services.py


示例12: add_connector

def add_connector():
    """
    Configure and add the gofer connector used to connect
    to the message broker.  This call is idempotent.
    """
    url = get_url()
    connector = Connector(url)
    connector.ssl.ca_certificate = config.get('messaging', 'cacert')
    connector.ssl.client_certificate = config.get('messaging', 'clientcert')
    connector.add()
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:10,代码来源:connector.py


示例13: test_init

 def test_init(self, mock_broker):
     Services.init()
     url = pulp_conf.get('messaging', 'url')
     transport = pulp_conf.get('messaging', 'transport')
     ca_cert = pulp_conf.get('messaging', 'cacert')
     client_cert = pulp_conf.get('messaging', 'clientcert')
     mock_broker.assert_called_with(url, transport=transport)
     broker = mock_broker()
     self.assertEqual(broker.cacert, ca_cert)
     self.assertEqual(broker.clientcert, client_cert)
开发者ID:VuokkoVuorinnen,项目名称:pulp,代码行数:10,代码来源:test_services.py


示例14: on_success

    def on_success(self, retval, task_id, args, kwargs):
        """
        This overrides the success handler run by the worker when the task
        executes successfully. It updates state, finish_time and traceback
        of the relevant task status for asynchronous tasks. Skip updating status
        for synchronous tasks.

        :param retval:  The return value of the task.
        :param task_id: Unique id of the executed task.
        :param args:    Original arguments for the executed task.
        :param kwargs:  Original keyword arguments for the executed task.
        """
        _logger.debug("Task successful : [%s]" % task_id)
        if kwargs.get('scheduled_call_id') is not None:
            if not isinstance(retval, AsyncResult):
                _logger.info(_('resetting consecutive failure count for schedule %(id)s')
                             % {'id': kwargs['scheduled_call_id']})
                utils.reset_failure_count(kwargs['scheduled_call_id'])
        if not self.request.called_directly:
            now = datetime.now(dateutils.utc_tz())
            finish_time = dateutils.format_iso8601_datetime(now)
            task_status = TaskStatus.objects.get(task_id=task_id)
            task_status['finish_time'] = finish_time
            task_status['result'] = retval

            # Only set the state to finished if it's not already in a complete state. This is
            # important for when the task has been canceled, so we don't move the task from canceled
            # to finished.
            if task_status['state'] not in constants.CALL_COMPLETE_STATES:
                task_status['state'] = constants.CALL_FINISHED_STATE
            if isinstance(retval, TaskResult):
                task_status['result'] = retval.return_value
                if retval.error:
                    task_status['error'] = retval.error.to_dict()
                if retval.spawned_tasks:
                    task_list = []
                    for spawned_task in retval.spawned_tasks:
                        if isinstance(spawned_task, AsyncResult):
                            task_list.append(spawned_task.task_id)
                        elif isinstance(spawned_task, dict):
                            task_list.append(spawned_task['task_id'])
                    task_status['spawned_tasks'] = task_list
            if isinstance(retval, AsyncResult):
                task_status['spawned_tasks'] = [retval.task_id, ]
                task_status['result'] = None

            task_status.save()

            if config.get('profiling', 'enabled') is True:
                profile_directory = config.get('profiling', 'directory')
                self.pr.disable()
                self.pr.dump_stats("%s/%s" % (profile_directory, task_id))

            common_utils.delete_working_directory()
开发者ID:pulp,项目名称:pulp,代码行数:54,代码来源:tasks.py


示例15: get_url

    def get_url():
        """
        This constructs a gofer 2.x URL and is intended to maintain
        configuration file backwards compatibility until pulp 3.0

        :return: A gofer 2.x broker URL.
        :rtype: str
        """
        url = config.get('messaging', 'url')
        adapter = config.get('messaging', 'transport')
        return '+'.join((adapter, url))
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:11,代码来源:services.py


示例16: __init__

 def __init__(self, **kwargs):
     super(ContentView, self).__init__(**kwargs)
     self.key = Key.load(pulp_conf.get('authentication', 'rsa_key'))
     # Make sure all requested paths fall under these sub-directories, otherwise
     # we might find ourselves serving private keys to all and sundry.
     local_storage = pulp_conf.get('server', 'storage_dir')
     self.safe_serving_paths = [os.path.realpath(os.path.join(local_storage, subdir))
                                for subdir in SAFE_STORAGE_SUBDIRS]
     logger.debug(_("Serving Pulp content from {paths}; ensure "
                    "Apache's mod_xsendfile is configured to serve from "
                    "these paths as well").format(paths=str(self.safe_serving_paths)))
开发者ID:alanoe,项目名称:pulp,代码行数:11,代码来源:views.py


示例17: configure_SSL

def configure_SSL():
    """
    Configures the celery object with BROKER_USE_SSL options
    """
    if config.getboolean("tasks", "celery_require_ssl"):
        BROKER_USE_SSL = {
            "ca_certs": config.get("tasks", "cacert"),
            "keyfile": config.get("tasks", "keyfile"),
            "certfile": config.get("tasks", "certfile"),
            "cert_reqs": ssl.CERT_REQUIRED,
        }
        celery.conf.update(BROKER_USE_SSL=BROKER_USE_SSL)
开发者ID:pcreech,项目名称:pulp,代码行数:12,代码来源:celery_instance.py


示例18: check_oauth

    def check_oauth(self, username, method, url, auth, query):
        """
        Check OAuth header credentials.
        Return None if the credentials are invalid

        :type username: str
        :param username: username corresponding to credentials

        :type method: str
        :param method: http method

        :type url: str
        :param url: request url

        :type auth: str
        :param auth: http authorization header value

        :type query: str
        :param query: http request query string

        :rtype: str or None
        :return: user login corresponding to the credentials
        """
        is_consumer = False
        headers = {'Authorization': auth}
        req = oauth2.Request.from_request(method, url, headers, query_string=query)

        if not req:
            return None, is_consumer

        if not (config.has_option('oauth', 'oauth_key') and
                config.has_option('oauth', 'oauth_secret')):
            _logger.error(_("Attempting OAuth authentication and you do not have oauth_key and "
                            "oauth_secret in pulp.conf"))
            return None, is_consumer

        key = config.get('oauth', 'oauth_key')
        secret = config.get('oauth', 'oauth_secret')

        consumer = oauth2.Consumer(key=key, secret=secret)
        server = oauth2.Server()
        server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())

        try:
            # this call has a return value, but failures are noted by the exception
            server.verify_request(req, consumer, None)
        except oauth2.Error, e:
            _logger.error('error verifying OAuth signature: %s' % e)
            return None, is_consumer
开发者ID:beav,项目名称:pulp,代码行数:49,代码来源:authentication.py


示例19: _missing_units

 def _missing_units(self, unit_inventory):
     """
     Determine the list of units defined parent inventory that are
     not in the child inventory.
     :param unit_inventory: The inventory of both parent and child content units.
     :type unit_inventory: UnitInventory
     :return: The list of units to be added.
         Each item: (parent_unit, unit_to_be_added)
     :rtype: list
     """
     new_units = []
     storage_dir = pulp_conf.get('server', 'storage_dir')
     for unit in unit_inventory.parent_only():
         unit['metadata'].pop('_id')
         unit['metadata'].pop('_ns')
         type_id = unit['type_id']
         unit_key = unit['unit_key']
         metadata = unit['metadata']
         storage_path = unit.get('storage_path')
         if storage_path:
             relative_path = unit['_relative_path']
             storage_path = '/'.join((storage_dir, relative_path))
         unit_in = Unit(type_id, unit_key, metadata, storage_path)
         new_units.append((unit, unit_in))
     return new_units
开发者ID:domcleal,项目名称:pulp,代码行数:25,代码来源:strategies.py


示例20: _missing_units

 def _missing_units(self, unit_inventory):
     """
     Determine the list of units defined upstream inventory that are
     not in the local inventory.
     :param unit_inventory: The inventory of both upstream and local content units.
     :type unit_inventory: UnitInventory
     :return: The list of units to be added.
         Each item: (unit_upstream, unit_to_be_added)
     :rtype: list
     """
     new_units = []
     storage_dir = pulp_conf.get("server", "storage_dir")
     for unit in unit_inventory.upstream_only():
         unit["metadata"].pop("_id")
         unit["metadata"].pop("_ns")
         type_id = unit["type_id"]
         unit_key = unit["unit_key"]
         metadata = unit["metadata"]
         storage_path = unit.get("storage_path")
         if storage_path:
             relative_path = unit["_relative_path"]
             storage_path = "/".join((storage_dir, relative_path))
         unit_in = Unit(type_id, unit_key, metadata, storage_path)
         new_units.append((unit, unit_in))
     return new_units
开发者ID:pieska,项目名称:pulp,代码行数:25,代码来源:strategies.py



注:本文中的pulp.server.config.config.get函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python config.set函数代码示例发布时间:2022-05-25
下一篇:
Python tasks.cancel函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap