• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python config.get函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 pylons.config.get 函数的典型用法代码示例。如果您正苦于以下问题:Python get 函数的具体用法?Python get 怎么用?Python get 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 get 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: by_name

    def by_name(cls, name):
        """Look up a domain by the name of its entry below ``herder.po_dir``.

        :param name: name of the domain; it is joined onto the configured
            ``herder.po_dir`` to form the path that is checked.
        :returns: a ``Domain`` built from the name and that path.
        :raises KeyError: if the path does not exist.
        """
        # Resolve the candidate location once and reuse it below.
        domain_path = os.path.join(config.get('herder.po_dir'), name)

        if not os.path.exists(domain_path):
            raise KeyError("Unknown domain name.")

        return Domain(name, domain_path)
开发者ID:cc-archive,项目名称:herder,代码行数:7,代码来源:domain.py


示例2: __init__

    def __init__(self, profiles=None, compatibility_mode=False):
        '''
        Creates a parser instance

        You can optionally pass a list of profiles to be used. When none
        are passed, the profile names are read from the config option named
        by `RDF_PROFILES_CONFIG_OPTION` (a whitespace separated string),
        falling back to `DEFAULT_RDF_PROFILES` if that option is not set.

        In compatibility mode, some fields are modified to maintain
        compatibility with previous versions of the ckanext-dcat parsers
        (eg adding the `dcat_` prefix or storing comma separated lists instead
        of JSON dumps). When the argument is falsy, the value of the config
        option named by `COMPAT_MODE_CONFIG_OPTION` is used instead.

        Raises `RDFProfileException` if no profile could be loaded.

        '''
        if not profiles:
            configured = config.get(RDF_PROFILES_CONFIG_OPTION, None)
            # split() without a separator tolerates repeated spaces, tabs and
            # newlines; split(' ') would produce empty profile names for them.
            profiles = configured.split() if configured else None
            if not profiles:
                profiles = DEFAULT_RDF_PROFILES
        self._profiles = self._load_profiles(profiles)
        if not self._profiles:
            raise RDFProfileException(
                'No suitable RDF profiles could be loaded')

        if not compatibility_mode:
            compatibility_mode = p.toolkit.asbool(
                config.get(COMPAT_MODE_CONFIG_OPTION, False))
        self.compatibility_mode = compatibility_mode

        self.g = rdflib.Graph()
开发者ID:trickvi,项目名称:ckanext-dcat,代码行数:29,代码来源:parsers.py


示例3: send

def send(to_addrs, from_addr, subject, body):
    """A simple method to send a simple email.

    The SMTP host is read from the ``smtp_server`` config option (default
    ``localhost``). If both ``smtp_username`` and ``smtp_password`` are
    configured, the connection logs in before sending.

    :param to_addrs: Comma separated list of email addresses to send to.
        A string is passed through ``parse_email_string``; any other value
        is used as an iterable of addresses.
    :type to_addrs: unicode

    :param from_addr: Email address to put in the 'from' field
    :type from_addr: unicode

    :param subject: Subject line of the email.
    :type subject: unicode

    :param body: Body text of the email, optionally marked up with HTML.
    :type body: unicode
    """
    if isinstance(to_addrs, basestring):
        to_addrs = parse_email_string(to_addrs)

    # Keep the recipients as a list for the SMTP envelope: smtplib treats a
    # plain string as ONE address, so passing the joined header value would
    # break delivery to multiple recipients.
    recipients = list(to_addrs)

    msg = ("To: %(to_addrs)s\n"
           "From: %(from_addr)s\n"
           "Subject: %(subject)s\n\n"
           "%(body)s\n") % {
        'to_addrs': ", ".join(recipients),
        'from_addr': from_addr,
        'subject': subject,
        'body': body,
    }

    smtp_username = config.get('smtp_username')
    smtp_password = config.get('smtp_password')

    # Connect only once the message is ready, and always release the
    # connection, even when login or delivery fails.
    server = smtplib.SMTP(config.get('smtp_server', 'localhost'))
    try:
        if smtp_username and smtp_password:
            server.login(smtp_username, smtp_password)
        server.sendmail(from_addr, recipients, msg.encode('utf-8'))
    finally:
        server.quit()
开发者ID:SmallsLIVE,项目名称:mediadrop,代码行数:34,代码来源:email.py


示例4: upload

    def upload(self):
        """Store an uploaded file locally, copy it to the remote host, render the page.

        When the request carries a ``datafile`` parameter, its contents are
        written below the ``cw.cwdata_loc`` directory and then copied with scp
        to ``<cw.cwproj_rem>/<user>`` on the host named by ``cw.arch_host``.
        A failed local write is logged and the remote copy is skipped.

        :returns: the rendered ``/data/upload.mako`` template.
        """
        if 'datafile' in request.params:
            myfile = request.params.get('datafile')

            # Store file in user directory on Longboard.
            # Only the final path component of the client-supplied name is
            # used, so a crafted filename cannot escape the data directory.
            source_dir = config.get('cw.cwdata_loc')
            source = os.path.join(source_dir, os.path.basename(myfile.filename))
            target = os.path.sep.join([config.get('cw.cwproj_rem'), session.get('user', 'guest')])

            file_contents = myfile.file.read()
            # The original tested `file.type`, i.e. the builtin `file`, which
            # has no `type` attribute; the uploaded field was clearly meant.
            mode = 'w' if 'text' in myfile.type else 'wb'
            try:
                with open(source, mode) as fh:
                    fh.write(file_contents)
            except Exception:
                log.exception('Cannot write file to disk.')
            else:
                # Only transfer a file that was actually written.
                host = config.get('cw.arch_host', 'longboard.acel')
                resource = app_globals.jodis.manager.getResource(host)
                resource.ssh.scp(None, None, source, resource.ssh.user, host, target)
        return render('/data/upload.mako')
开发者ID:sumukh210991,项目名称:Cyberweb,代码行数:30,代码来源:data.py


示例5: setSort

def setSort(spectra, config, package):
    """Set ``spectra['ecosis']['sort']`` from the configured sort field.

    The sort definition is a mapping with the keys ``on`` (name of the
    spectra field to sort on) and ``type`` (``'datetime'``, ``'numeric'`` or
    anything else for "use the raw value"). It is taken from the JSON string
    in ``package['extras']['sort']`` when present, otherwise from
    ``config['sort']``.

    Nothing is set when there is no sort definition, when ``on`` is missing
    or not a key of ``spectra``, or when the value cannot be converted to
    the requested type.

    :param spectra: mapping that is updated in place.
    :param config: mapping that may carry a ``sort`` definition.
    :param package: mapping whose ``extras`` may carry a JSON ``sort`` string.
    """
    sort = None

    # Backward compatibility, but moving away from the config object:
    # all 'advanced data' should be stored in the package.
    extras = package.get('extras')
    if extras is not None and extras.get('sort') is not None:
        sort = json.loads(extras.get('sort'))
    elif config.get("sort") is not None:
        sort = config.get("sort")

    if sort is None:
        return

    on = sort.get('on')
    sort_type = sort.get('type')

    if on is None or on not in spectra:
        return

    if sort_type == 'datetime':
        try:
            spectra['ecosis']['sort'] = dateutil.parser.parse(spectra[on])
        except Exception:
            # Best effort: an unparseable value leaves the sort key unset.
            pass
    elif sort_type == 'numeric':
        try:
            spectra['ecosis']['sort'] = float(spectra[on])
        except Exception:
            # Best effort: a non-numeric value leaves the sort key unset.
            pass
    else:
        spectra['ecosis']['sort'] = spectra[on]
开发者ID:CSTARS,项目名称:ckanext-ecosis,代码行数:35,代码来源:__init__.py


示例6: update_config

    def update_config(self, config):
        """Register this plugin's templates and cache its SSO settings.

        Adds the ``templates`` directory through the toolkit, then stores the
        ``ckan.simplesso.header_parameter`` option (default ``user-id``) and
        the ``ckan.simplesso.email_domain`` option on the plugin instance.

        """
        toolkit.add_template_directory(config, "templates")

        header_option = "ckan.simplesso.header_parameter"
        domain_option = "ckan.simplesso.email_domain"
        self.header_parameter = config.get(header_option, "user-id")
        self.email_domain = config.get(domain_option)
开发者ID:jorabra,项目名称:ckanext-simplesso,代码行数:7,代码来源:plugin.py


示例7: before_create

    def before_create(self, context, resource):
        """Reject an uploaded file whose MIME type is not permitted.

        The permitted types come from two config options holding file
        extensions: ``ckanext.romania_theme.allowed_extensions`` and
        ``ckanext.romania_theme.disallowed_extensions``. When the allowed
        list is non-empty the upload's type must be in it; otherwise the
        type must not be in the disallowed list.

        Resources without an ``upload`` entry, or whose ``upload`` is a
        plain unicode value, are not checked.

        :raises toolkit.ValidationError: if the upload's type is not permitted.
        """
        # Nothing to validate without an uploaded file object. This check has
        # to come first: reading `.type` from a missing or unicode `upload`
        # would fail before the validation decision is ever reached.
        if 'upload' not in resource or isinstance(resource['upload'], unicode):
            return

        disallowed_extensions = toolkit.aslist(config.get('ckanext.romania_theme.disallowed_extensions', []))
        disallowed_mimetypes = [mimetypes.types_map["." + x] for x in disallowed_extensions]

        allowed_extensions = toolkit.aslist(config.get('ckanext.romania_theme.allowed_extensions', []))
        allowed_mimetypes = [mimetypes.types_map["." + x] for x in allowed_extensions]

        upload_type = resource['upload'].type
        if allowed_mimetypes:
            if upload_type in allowed_mimetypes:
                return
            error_message = "Doar urmatoarele extensii sunt permise: " + ", ".join(allowed_extensions) + "."
        else:
            if upload_type not in disallowed_mimetypes:
                return
            error_message = "Urmatoarele extensii sunt nepermise: " + ", ".join(disallowed_extensions) + "."

        # If we did not do this, the URL field would contain the filename
        # and people can press finalize afterwards.
        resource['url'] = ''

        raise toolkit.ValidationError(['Fisierul are o extensie nepermisa! ' + error_message])
开发者ID:govro,项目名称:ckanext-romania_theme,代码行数:27,代码来源:plugin.py


示例8: _get_allowed_sender_options

    def _get_allowed_sender_options(cls, user):
        """Build the sender choices for a mass message.

        Returns a dict with the keys ``user``, ``system`` and ``support``.
        Each value holds the sender ``email``, whether the option is
        ``enabled``, whether it is pre-``checked`` and the ``reason`` text
        that belongs to the option.

        The user option is pre-checked when it is enabled; otherwise the
        system option is pre-checked when that one is enabled.
        """
        support_email = config.get('adhocracy.registration_support_email')
        user_enabled = user.is_email_activated()
        system_enabled = asbool(config.get(
            'allow_system_email_in_mass_messages', 'true'))

        user_option = {
            'email': user.email,
            'checked': False,
            'enabled': user_enabled,
            'reason': _("Email isn't activated"),
        }
        system_option = {
            'email': config.get('adhocracy.email.from'),
            'checked': False,
            'enabled': system_enabled,
            'reason': _("Not permitted in system settings"),
        }
        support_option = {
            'email': support_email,
            'checked': False,
            'enabled': support_email is not None,
            'reason': _("adhocracy.registration_support_email not set"),
        }

        # At most one option is pre-selected, the user's own address first.
        if user_enabled:
            user_option['checked'] = True
        elif system_enabled:
            system_option['checked'] = True

        return {
            'user': user_option,
            'system': system_option,
            'support': support_option,
        }
开发者ID:rowanthorpe,项目名称:adhocracy,代码行数:30,代码来源:massmessage.py


示例9: _mail_recipient

def _mail_recipient(recipient_name, recipient_email,
        sender_name, sender_url, subject,
        body, headers=None):
    """Build a plain-text UTF-8 email and deliver it over SMTP.

    The sender address comes from the ``smtp.mail_from`` config option. The
    server is ``smtp.test_server`` when that option is configured (STARTTLS
    and login are then skipped); otherwise ``smtp.server``,
    ``smtp.starttls``, ``smtp.user`` and ``smtp.password`` are used.

    :param recipient_name: display name used in the ``To`` header.
    :param recipient_email: address the message is delivered to.
    :param sender_name: display name used in the ``From`` header.
    :param sender_url: passed on to ``add_msg_niceties``.
    :param subject: subject line.
    :param body: message text, passed through ``add_msg_niceties``.
    :param headers: optional mapping of extra headers to set on the message.
    :raises MailerException: if the server lacks STARTTLS although it is
        required, if ``smtp.user`` is set without ``smtp.password``, or if
        smtplib reports an error while sending.
    """
    headers = headers or {}
    mail_from = config.get('smtp.mail_from')
    body = add_msg_niceties(recipient_name, body, sender_name, sender_url)
    msg = MIMEText(body.encode('utf-8'), 'plain', 'utf-8')
    for k, v in headers.items():
        msg[k] = v
    subject = Header(subject.encode('utf-8'), 'utf-8')
    msg['Subject'] = subject
    msg['From'] = _("%s <%s>") % (sender_name, mail_from)
    recipient = u"%s <%s>" % (recipient_name, recipient_email)
    msg['To'] = Header(recipient, 'utf-8')
    msg['Date'] = Utils.formatdate(time())
    msg['X-Mailer'] = "CKAN %s" % ckan.__version__

    # Send the email using Python's smtplib.
    smtp_connection = smtplib.SMTP()
    if 'smtp.test_server' in config:
        # If 'smtp.test_server' is configured we assume we're running tests,
        # and don't use the smtp.server, starttls, user, password etc. options.
        smtp_server = config['smtp.test_server']
        smtp_starttls = False
        smtp_user = None
        smtp_password = None
    else:
        smtp_server = config.get('smtp.server', 'localhost')
        smtp_starttls = paste.deploy.converters.asbool(
                config.get('smtp.starttls'))
        smtp_user = config.get('smtp.user')
        smtp_password = config.get('smtp.password')
    smtp_connection.connect(smtp_server)
    try:
        # Identify ourselves and prompt the server for supported features.
        smtp_connection.ehlo()

        # If 'smtp.starttls' is on in CKAN config, try to put the SMTP
        # connection into TLS mode.
        if smtp_starttls:
            if smtp_connection.has_extn('STARTTLS'):
                smtp_connection.starttls()
                # Re-identify ourselves over TLS connection.
                smtp_connection.ehlo()
            else:
                raise MailerException("SMTP server does not support STARTTLS")

        # If 'smtp.user' is in CKAN config, try to login to SMTP server.
        if smtp_user:
            # Raised explicitly rather than asserted, so the check also
            # holds when Python runs with assertions disabled (-O).
            if not smtp_password:
                raise MailerException("If smtp.user is configured then "
                        "smtp.password must be configured as well.")
            smtp_connection.login(smtp_user, smtp_password)

        smtp_connection.sendmail(mail_from, [recipient_email], msg.as_string())
        log.info("Sent email to {0}".format(recipient_email))

    except smtplib.SMTPException as e:
        error_msg = '%r' % e
        log.exception(error_msg)
        raise MailerException(error_msg)
    finally:
        # Always release the connection, whether or not sending worked.
        smtp_connection.quit()
开发者ID:GianlucaGLC,项目名称:ckan,代码行数:60,代码来源:mailer.py


示例10: __init__

    def __init__(self):
        """Create the SQL engine, the tables and a scoped session.

        The database URL is read from ``linotpOpenID.sql.url`` and falls
        back to ``sqlalchemy.url`` when that option is missing. The option
        ``linotpSQL.implicit_returning`` (default: enabled) controls whether
        the engine is created with ``implicit_returning=False``.
        """
        connect_string = config.get("linotpOpenID.sql.url")

        implicit_returning = config.get("linotpSQL.implicit_returning", True)
        # A value read from a config file may be a string, and any non-empty
        # string (including "False") is truthy. Interpret the usual spellings
        # of "off" explicitly so the option can actually be disabled.
        implicit_returning = (
            ('%s' % (implicit_returning,)).strip().lower()
            not in ('false', '0', 'no', 'off', 'n', 'f', 'none', ''))
        self.engine = None

        if connect_string is None:
            log.info("[__init__] Missing linotpOpenID.sql.url parameter in "
                     "config file! Using the sqlalchemy.url")
            connect_string = config.get("sqlalchemy.url")

        # Create an engine and create all the tables we need
        if implicit_returning:
            # If implicit_returning is explicitly set to True, we
            # get lots of mysql errors:
            # AttributeError: 'MySQLCompiler_mysqldb' object has no attribute
            # 'returning_clause' So we do not mention explicit_returning at all
            self.engine = create_engine(connect_string)
        else:
            self.engine = create_engine(connect_string,
                                        implicit_returning=False)

        metadata.bind = self.engine
        metadata.create_all()

        # Set up the session
        self.sm = orm.sessionmaker(bind=self.engine, autoflush=True,
                                   autocommit=False,
                                   expire_on_commit=True)
        self.session = orm.scoped_session(self.sm)
开发者ID:alexxtasi,项目名称:LinOTP,代码行数:34,代码来源:openid.py


示例11: shorten_link

def shorten_link(long_url):
    """Return the bit.ly short URL for ``long_url``.

    Calls the bit.ly v3 ``shorten`` endpoint with the credentials from the
    ``bitly.userid`` and ``bitly.key`` config options and returns the
    ``url`` entry of the ``data`` object in the JSON response. Errors from
    the request or from an unexpected response are not handled here.

    :param long_url: the URL to shorten.
    """
    # URL-encode every parameter. HTML escaping (cgi.escape) is the wrong
    # tool here: it turns "&" into "&amp;" and leaves "?", "#", "=" and
    # spaces untouched, which corrupts the query string.
    query = urllib.urlencode([
        ('login', config.get('bitly.userid')),
        ('apiKey', config.get('bitly.key')),
        ('longUrl', long_url),
        ('format', 'json'),
    ])
    response = urllib.urlopen("http://api.bit.ly/v3/shorten?" + query)
    try:
        bitly_result = response.read()
    finally:
        response.close()
    bitly_data = json.loads(bitly_result)['data']
    return bitly_data["url"]
开发者ID:anantn,项目名称:f1,代码行数:7,代码来源:shortener.py


示例12: checkTarget

 def checkTarget(environ, result):
     """Return False when the request should be skipped, True otherwise.

     The request is skipped when the ``ckan.gov_theme.is_back`` option
     parses as true, or when the last segment of ``PATH_INFO`` occurs as a
     substring of the ``ckan.api.ignore`` option.

     :param environ: WSGI environment; only ``PATH_INFO`` is read.
     :param result: not used by this function.
     """
     back = parseBoolString(config.get('ckan.gov_theme.is_back', False))
     ignoreList = config.get('ckan.api.ignore', 'NotInList')
     path = environ['PATH_INFO'].split("/")[-1]
     # The previous test `find(path) > 0` missed a match at index 0, i.e.
     # the first entry of the ignore list. An empty segment (path ending
     # in "/") is excluded explicitly, since "" is contained in any string.
     if back or (path and path in ignoreList):
         return False
     return True
开发者ID:CIOIL,项目名称:DataGovIL,代码行数:7,代码来源:plugin.py


示例13: user_language

def user_language(user, fallbacks=[]):
    """Activate the translation language for ``user`` and return the locale.

    The user's own locale wins. Without one, a locale is negotiated between
    ``fallbacks`` and ``LOCALES``, ending at the default locale. The
    translations are loaded from the module named by the
    ``adhocracy.translations`` config option (default ``adhocracy``).

    :param user: object with a ``locale`` attribute, or a falsy value.
    :param fallbacks: preferred locales handed to ``Locale.negotiate``.
    :returns: the locale that was selected.
    :raises ValueError: if the configured translations module has no loader.
    """
    # Step 1: settle on a locale.
    locale = user.locale if (user and user.locale) else None
    if locale is None:
        available = [str(candidate) for candidate in LOCALES]
        negotiated = Locale.negotiate(fallbacks, available)
        locale = Locale.parse(negotiated) or get_default_locale()

    # Step 2: find the package the translation files are loaded from.
    package_name = config.get('adhocracy.translations', 'adhocracy')
    loader = pkgutil.get_loader(package_name)
    if loader is None:
        raise ValueError(('Cannot import the module "%s" configured for '
                          '"adhocracy.translations". Make sure it is an '
                          'importable module (and contains the '
                          'translation files in a subdirectory '
                          '"i18n"') % package_name)

    pylons_config = {
        'pylons.paths': {'root': loader.filename},
        'pylons.package': config.get('pylons.package'),
    }

    # Step 3: switch language, with the default locale as fallback.
    language = locale.language
    set_lang(language, pylons_config=pylons_config)
    add_fallback(get_default_locale().language, pylons_config=pylons_config)
    formencode.api.set_stdtranslation(domain="FormEncode",
                                      languages=[language])
    return locale
开发者ID:JonnyWalker,项目名称:adhocracy,代码行数:32,代码来源:__init__.py


示例14: all

    def all(cls):
        """Return a list with one ``Domain`` per directory below ``herder.po_dir``.

        Entries named in ``cls._IGNORE_DIRS`` and entries that are not
        directories are left out.
        """
        po_dir = config.get('herder.po_dir')

        domains = []
        for entry in os.listdir(po_dir):
            if entry in cls._IGNORE_DIRS:
                continue
            entry_path = os.path.join(po_dir, entry)
            if os.path.isdir(entry_path):
                domains.append(Domain(entry, entry_path))
        return domains
开发者ID:cc-archive,项目名称:herder,代码行数:7,代码来源:domain.py


示例15: send_mail

    def send_mail(self, to, subject, _body):
        """Record a pending registration and email its activation link.

        Inserts a row for the current request (``name``, ``fullname``,
        ``email`` and the encoded ``password1`` from ``request.params``)
        into ``mailnotifikation``, deletes rows older than the number of
        days in the ``dk.aarhuskommune.odaa_days`` option, and sends the
        activation link through the SMTP server on ``localhost``.

        Any failure is logged and swallowed; nothing is returned.

        :param to: comma separated recipient addresses.
        :param subject: subject line of the mail.
        :param _body: not used; see the note below.
        """
        guid = str(uuid.uuid4())
        # NOTE(review): the mail body has always been just the activation
        # link; the caller-supplied _body was overwritten before use.
        # Confirm whether it was meant to be included.
        body = config.get('ckan.site_url') + "/user/register?guid=" + guid + "&activate"
        sender = config.get('dk.aarhuskommune.odaa_from')

        # Headers must start in column 0: an indented line is a continuation
        # of the previous header, so the old indented template folded "To"
        # and "Subject" into the "From" header.
        message = "From: %s\nTo: %s\nSubject: %s\n\n%s\n" % (
            sender, str(to), subject, body)

        try:
            result = self.connection()
            connectString = """dbname='%s' user='%s' host='%s' password='%s'""" % (result["database"], result["user"], result["host"], result["password"])
            conn = psycopg2.connect(connectString)
            try:
                c = conn.cursor()
                password = self._encode(result["password"], request.params["password1"])
                now = datetime.datetime.now()
                # Request values are untrusted: let the driver quote them
                # instead of concatenating them into the statement.
                c.execute(
                    "INSERT INTO mailnotifikation VALUES (%s, %s, %s, %s, %s, %s, %s)",
                    (guid, str(now), request.params["name"],
                     request.params["fullname"], request.params["email"],
                     password, 'False'))
                conn.commit()

                # Purge notifications older than the configured number of days.
                nowlm = now - datetime.timedelta(days=int(config.get('dk.aarhuskommune.odaa_days')))
                c.execute("delete from mailnotifikation where _date<%s",
                          (nowlm.strftime('%Y-%m-%d'),))
                conn.commit()
            finally:
                conn.close()

            smtpObj = smtplib.SMTP('localhost')
            try:
                smtpObj.sendmail(sender, to.split(','), message)
            finally:
                smtpObj.quit()
        except Exception as e:
            logging.error('Error: unable to send email. %s ', e)
开发者ID:OpenDataAarhus,项目名称:mailNotifikation,代码行数:35,代码来源:user.py


示例16: _mail_recipient

def _mail_recipient(recipient_name, recipient_email,
        sender_name, sender_url, subject,
        body, headers=None):
    """Build a plain-text UTF-8 email and send it over SMTP.

    The sender address comes from the ``ckan.mail_from`` config option. The
    SMTP host is ``test_smtp_server`` if configured, else ``smtp_server``,
    else ``localhost``.

    :param recipient_name: display name used in the ``To`` header.
    :param recipient_email: address the message is delivered to.
    :param sender_name: display name used in the ``From`` header.
    :param sender_url: passed on to ``add_msg_niceties``.
    :param subject: subject line.
    :param body: message text, passed through ``add_msg_niceties``.
    :param headers: optional mapping of extra headers to set on the message.
    :raises MailerException: if connecting or sending fails for any reason.
    """
    headers = headers or {}
    mail_from = config.get('ckan.mail_from')
    body = add_msg_niceties(recipient_name, body, sender_name, sender_url)
    msg = MIMEText(body.encode('utf-8'), 'plain', 'utf-8')
    for k, v in headers.items():
        msg[k] = v
    subject = Header(subject.encode('utf-8'), 'utf-8')
    msg['Subject'] = subject
    msg['From'] = _("%s <%s>") % (sender_name, mail_from)
    recipient = u"%s <%s>" % (recipient_name, recipient_email)
    msg['To'] = Header(recipient, 'utf-8')
    msg['Date'] = Utils.formatdate(time())
    msg['X-Mailer'] = "CKAN %s" % __version__
    try:
        server = smtplib.SMTP(
            config.get('test_smtp_server',
                       config.get('smtp_server', 'localhost')))
        try:
            server.sendmail(mail_from, [recipient_email], msg.as_string())
        finally:
            # Release the connection even when delivery fails.
            server.quit()
    except Exception as e:
        error_msg = '%r' % e
        log.exception(error_msg)
        raise MailerException(error_msg)
开发者ID:Big-Data,项目名称:ckan,代码行数:25,代码来源:mailer.py


示例17: ssl_check

def ssl_check(ssl_required=[], ssl_allowed=[], ssl_required_all=False, ssl_allowed_all=False):
    """Redirect the current request to the protocol its action requires.

    Does nothing unless the ``enable_ssl_requirement`` config option is on.
    Actions in ``ssl_allowed`` may use either protocol; actions in
    ``ssl_required`` must use https; every other action must use http. A
    request on the wrong protocol is redirected, to the ``ssl_host`` config
    option if set, except for POST requests, which are aborted with 405.

    :param ssl_required: action names that must be served over https.
    :param ssl_allowed: action names that may use http or https.
    :param ssl_required_all: treat every action as requiring https.
    :param ssl_allowed_all: treat every action as allowed on both protocols.
    """
    if not asbool(config.get('enable_ssl_requirement', False)):
        return

    action = request.environ['pylons.routes_dict']['action']

    # We don't care if they use http or https.
    if action in ssl_allowed or ssl_allowed_all:
        return

    must_use_ssl = action in ssl_required or ssl_required_all
    protocol = 'https' if must_use_ssl else 'http'

    if current_protocol() == protocol:
        return

    if request.method.upper() == 'POST':
        # Don't allow POSTs.
        abort(405, headers=[('Allow', 'GET')])
    else:
        log.debug('Redirecting to %s, request: %s', protocol, request.path_info)
        redirect_args = {'protocol': protocol}
        host = config.get('ssl_host')
        if host:
            redirect_args['host'] = host
        redirect_to(**redirect_args)
开发者ID:Ivoz,项目名称:zookeepr,代码行数:26,代码来源:ssl_requirement.py


示例18: harvest_source_index_clear

def harvest_source_index_clear(context, data_dict):
    '''
    Clears all datasets, jobs and objects related to a harvest source, but
    keeps the source itself.  This is useful to clean history of long running
    harvest sources to start again fresh.

    The deletion is issued as a search index query restricted to the harvest
    source and to the ``ckan.site_id`` config option; it is committed unless
    ``ckan.search.solr_commit`` is switched off.

    :param id: the id of the harvest source to clear
    :type id: string

    :raises NotFound: if the harvest source does not exist
    :raises SearchIndexError: if deleting from or committing to the index fails
    '''

    check_access('harvest_source_clear', context, data_dict)
    harvest_source_id = data_dict.get('id')

    source = HarvestSource.get(harvest_source_id)
    if not source:
        log.error('Harvest source %s does not exist', harvest_source_id)
        raise NotFound('Harvest source %s does not exist' % harvest_source_id)

    # Continue with the id stored on the source that was found.
    harvest_source_id = source.id

    conn = make_connection()
    query = ''' +%s:"%s" +site_id:"%s" ''' % (
        'harvest_source_id', harvest_source_id, config.get('ckan.site_id'))
    try:
        conn.delete_query(query)
        if asbool(config.get('ckan.search.solr_commit', 'true')):
            conn.commit()
    except Exception as e:
        log.exception(e)
        raise SearchIndexError(e)
开发者ID:AQUACROSS,项目名称:ckanext-harvest,代码行数:30,代码来源:update.py


示例19: _call

    def _call(self, **kwargs):
        """Send an authenticated request to this object's endpoint.

        The target is ``get_endpoint()`` joined with ``self.path`` and, if
        given, ``path_extra``. Credentials come from the config options
        ``ckanext.doi.account_name`` and ``ckanext.doi.account_password``.

        :param path_extra: optional extra path appended to the endpoint.
        :param method: name of the ``requests`` function to call
            (default ``'get'``).
        :param kwargs: everything else is passed through to ``requests``.
        :returns: the response object.
        :raises: whatever ``raise_for_status()`` raises for an error status.
        """
        account_name = config.get("ckanext.doi.account_name")
        account_password = config.get("ckanext.doi.account_password")

        # NOTE(review): os.path.join builds a URL here, which yields
        # backslashes on Windows - confirm the deployment platform.
        endpoint = os.path.join(get_endpoint(), self.path)
        if 'path_extra' in kwargs:
            endpoint = os.path.join(endpoint, kwargs.pop('path_extra'))

        method = kwargs.pop('method', 'get')

        # Log before the credentials are attached so the account password
        # never ends up in the log output.
        log.info("Calling %s:%s - %s", endpoint, method, kwargs)

        # Add authorisation to request
        kwargs['auth'] = (account_name, account_password)

        r = getattr(requests, method)(endpoint, **kwargs)
        r.raise_for_status()
        # Return the result
        return r
开发者ID:okfn,项目名称:ckanext-doi,代码行数:27,代码来源:datacite_api.py


示例20: getPosts

    def getPosts(self, ident):
        """Return the Disqus posts of the thread identified by ``ident``.

        The thread is looked up first and created when the lookup reports a
        non-zero ``code``. If no usable thread is available afterwards, that
        thread response is returned as it is. Otherwise the decoded response
        of the ``posts/list`` API call is returned, with an emptied
        ``response`` list when the thread was only just created.
        """
        thread = self.getThread(ident)
        created = False

        if thread['code'] != 0:
            thread = self.createThreadFromIdent(ident)
            created = True

        # Still no usable thread: hand the failing response to the caller.
        if thread['code'] != 0:
            return thread

        query = urllib.urlencode({
            'forum': config.get('edcdisqus.forum_name'),
            'thread': thread['response']['id'],
            'api_key': config.get('edcdisqus.api_key'),
            'limit': 100,
        })
        raw_posts = self.request('https://disqus.com/api/3.0/posts/list.json', query, 'get')
        posts = json.loads(raw_posts)

        # For some reason, the disqus API likes to return all of the comments
        # when a thread id doesn't exist, so the result set is emptied when a
        # new thread was created. (There shouldn't be any comments anyway,
        # the thread was just made!)
        if created:
            posts['response'] = []

        return posts
开发者ID:HighwayThree,项目名称:ckanext-bcgov,代码行数:27,代码来源:disqus.py



注:本文中的 pylons.config.get 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python config.init_app函数代码示例发布时间:2022-05-25
下一篇:
Python config.copy函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap