本文整理汇总了 Python 中 pylons.config.get 函数的典型用法代码示例。如果您正苦于以下问题：Python get 函数的具体用法？Python get 怎么用？Python get 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 get 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: by_name
def by_name(cls, name):
    """Look up a single Domain by its directory name.

    The domain is expected at ``<herder.po_dir>/<name>``; when that path
    does not exist a ``KeyError`` is raised.
    """
    domain_path = os.path.join(config.get('herder.po_dir'), name)
    if not os.path.exists(domain_path):
        raise KeyError("Unknown domain name.")
    return Domain(name, domain_path)
开发者ID:cc-archive,项目名称:herder,代码行数:7,代码来源:domain.py
示例2: __init__
def __init__(self, profiles=None, compatibility_mode=False):
    '''
    Set up a parser with its RDF profiles and an empty graph.

    ``profiles`` is an optional list of profile names. When it is empty
    the space separated value of the ``RDF_PROFILES_CONFIG_OPTION``
    setting is used, and ``DEFAULT_RDF_PROFILES`` if that is unset too.

    ``compatibility_mode`` keeps some fields compatible with previous
    versions of the ckanext-dcat parsers (eg adding the `dcat_` prefix or
    storing comma separated lists instead of JSON dumps). When it is
    falsy the ``COMPAT_MODE_CONFIG_OPTION`` setting decides.

    Raises ``RDFProfileException`` when no profile could be loaded.
    '''
    if not profiles:
        configured = config.get(RDF_PROFILES_CONFIG_OPTION, None)
        profiles = configured.split(' ') if configured else DEFAULT_RDF_PROFILES

    loaded = self._load_profiles(profiles)
    self._profiles = loaded
    if not loaded:
        raise RDFProfileException(
            'No suitable RDF profiles could be loaded')

    # An explicit truthy argument wins; otherwise fall back to the config.
    self.compatibility_mode = compatibility_mode or p.toolkit.asbool(
        config.get(COMPAT_MODE_CONFIG_OPTION, False))
    self.g = rdflib.Graph()
开发者ID:trickvi,项目名称:ckanext-dcat,代码行数:29,代码来源:parsers.py
示例3: send
def send(to_addrs, from_addr, subject, body):
    """A simple method to send a simple email.

    :param to_addrs: Comma separated list of email addresses to send to.
        A sequence of addresses is accepted as well.
    :type to_addrs: unicode
    :param from_addr: Email address to put in the 'from' field
    :type from_addr: unicode
    :param subject: Subject line of the email.
    :type subject: unicode
    :param body: Body text of the email, optionally marked up with HTML.
    :type body: unicode
    """
    if isinstance(to_addrs, basestring):
        to_addrs = parse_email_string(to_addrs)
    # Keep the recipients as a list: smtplib treats a bare string as ONE
    # address, so the comma-joined form is only suitable for the header.
    recipients = list(to_addrs)

    msg = ("To: %(to_addrs)s\n"
           "From: %(from_addr)s\n"
           "Subject: %(subject)s\n\n"
           "%(body)s\n") % {
        'to_addrs': ", ".join(recipients),
        'from_addr': from_addr,
        'subject': subject,
        'body': body,
    }

    server = smtplib.SMTP(config.get('smtp_server', 'localhost'))
    try:
        smtp_username = config.get('smtp_username')
        smtp_password = config.get('smtp_password')
        if smtp_username and smtp_password:
            server.login(smtp_username, smtp_password)
        server.sendmail(from_addr, recipients, msg.encode('utf-8'))
    finally:
        # Always release the connection, even when login/sendmail failed.
        try:
            server.quit()
        except smtplib.SMTPException:
            # The connection is already unusable; just drop the socket.
            server.close()
开发者ID:SmallsLIVE,项目名称:mediadrop,代码行数:34,代码来源:email.py
示例4: upload
def upload(self):
    """Store an uploaded ``datafile`` locally and copy it to the archive host.

    When the request carries a ``datafile`` parameter its contents are
    written below ``cw.cwdata_loc`` and then sent with scp to the current
    user's directory below ``cw.cwproj_rem`` on the ``cw.arch_host``
    resource. The upload page is rendered in every case.
    """
    if 'datafile' in request.params:
        myfile = request.params.get('datafile')
        # Store file in user directory on Longboard
        source_dir = config.get('cw.cwdata_loc')
        # The file name comes from the client: keep only its last path
        # component so it cannot point outside of source_dir.
        source = os.path.join(source_dir, os.path.basename(myfile.filename))
        target = os.path.sep.join([config.get('cw.cwproj_rem'), session.get('user', 'guest')])
        file_contents = ''.join(myfile.file)
        # Text uploads are written in text mode, everything else as binary.
        # The content type is read from the uploaded field itself (the
        # original consulted the unrelated builtin ``file``).
        mode = 'w' if myfile.type.find('text') > -1 else 'wb'
        try:
            with open(source, mode) as fh:
                fh.write(file_contents)
        except Exception:
            # Best effort, as before: report the failure and carry on.
            log.error('Cannot write file to disk.')
        host = config.get('cw.arch_host', 'longboard.acel')
        resource = app_globals.jodis.manager.getResource(host)
        resource.ssh.scp(None, None, source, resource.ssh.user, host, target)
    return render('/data/upload.mako')
开发者ID:sumukh210991,项目名称:Cyberweb,代码行数:30,代码来源:data.py
示例5: setSort
def setSort(spectra, config, package):
    """Store the sort value of a spectra record in ``spectra['ecosis']['sort']``.

    The sort definition (a mapping with the keys ``on`` and ``type``) is
    read from the JSON string in ``package['extras']['sort']`` when
    present, otherwise from ``config['sort']``. ``on`` names the spectra
    attribute to sort on; ``type`` selects the conversion: ``'datetime'``
    parses the value with dateutil, ``'numeric'`` converts it to float and
    anything else copies the value unchanged.

    Nothing is stored when no sort definition exists, when ``on`` is
    missing or not a key of ``spectra``, or when the conversion fails.
    """
    sort = None
    # backward compatibility. But moving away from config object
    # all 'advanced data' should be stored in package
    extras = package.get('extras')
    if extras is not None and extras.get('sort') is not None:
        sort = json.loads(extras.get('sort'))
    elif config.get("sort") is not None:
        sort = config.get("sort")
    if sort is None:
        return

    on = sort.get('on')
    sort_type = sort.get('type')
    if on is None or on not in spectra:
        return

    if sort_type == 'datetime':
        try:
            spectra['ecosis']['sort'] = dateutil.parser.parse(spectra[on])
        except Exception:
            # Deliberately best effort: an unparsable value gets no sort key.
            pass
    elif sort_type == 'numeric':
        try:
            spectra['ecosis']['sort'] = float(spectra[on])
        except Exception:
            # Deliberately best effort: a non-numeric value gets no sort key.
            pass
    else:
        spectra['ecosis']['sort'] = spectra[on]
开发者ID:CSTARS,项目名称:ckanext-ecosis,代码行数:35,代码来源:__init__.py
示例6: update_config
def update_config(self, config):
    """Register the plugin's template directory and read its SSO settings.

    ``ckan.simplesso.header_parameter`` (default ``user-id``) and
    ``ckan.simplesso.email_domain`` are copied from ``config`` onto the
    plugin instance.
    """
    toolkit.add_template_directory(config, "templates")

    header_parameter = config.get("ckan.simplesso.header_parameter", "user-id")
    email_domain = config.get("ckan.simplesso.email_domain")

    self.header_parameter = header_parameter
    self.email_domain = email_domain
开发者ID:jorabra,项目名称:ckanext-simplesso,代码行数:7,代码来源:plugin.py
示例7: before_create
def before_create(self, context, resource):
    """Reject uploaded files whose MIME type is not permitted.

    The extensions listed in ``ckanext.romania_theme.allowed_extensions``
    form a whitelist when that option is set; otherwise the extensions in
    ``ckanext.romania_theme.disallowed_extensions`` form a blacklist.
    Extensions are translated to MIME types through ``mimetypes.types_map``
    and compared with the ``type`` attribute of the upload.

    Resources without an upload, or whose ``upload`` value is a plain
    unicode string, are not checked.

    Raises ``toolkit.ValidationError`` (after clearing ``resource['url']``)
    when the uploaded file is not permitted.
    """
    # Only real file uploads carry a MIME type that can be validated;
    # checking this first avoids KeyError/AttributeError on other resources.
    if 'upload' not in resource or isinstance(resource['upload'], unicode):
        return
    upload = resource['upload']

    disallowed_extensions = toolkit.aslist(config.get('ckanext.romania_theme.disallowed_extensions',[]))
    disallowed_mimetypes = [mimetypes.types_map["." + x] for x in disallowed_extensions]
    allowed_extensions = toolkit.aslist(config.get('ckanext.romania_theme.allowed_extensions',[]))
    allowed_mimetypes = [mimetypes.types_map["." + x] for x in allowed_extensions]

    if allowed_mimetypes:
        if upload.type in allowed_mimetypes:
            return
        error_message = "Doar urmatoarele extensii sunt permise: " + ", ".join(allowed_extensions) + "."
    else:
        if upload.type not in disallowed_mimetypes:
            return
        error_message = "Urmatoarele extensii sunt nepermise: " + ", ".join(disallowed_extensions) + "."

    # If we did not do this, the URL field would contain the filename
    # and people can press finalize afterwards.
    resource['url'] = ''
    raise toolkit.ValidationError(['Fisierul are o extensie nepermisa! ' + error_message])
开发者ID:govro,项目名称:ckanext-romania_theme,代码行数:27,代码来源:plugin.py
示例8: _get_allowed_sender_options
def _get_allowed_sender_options(cls, user):
    """Describe the sender addresses ``user`` may pick for a mass message.

    Returns a dict with the keys ``user``, ``system`` and ``support``.
    Each value holds the ``email`` address, whether the option is
    ``enabled``, the ``reason`` to display when it is not, and whether it
    is ``checked``. The user's own address is checked when enabled,
    otherwise the system address when that one is enabled.
    """
    user_option = {
        'email': user.email,
        'checked': False,
        'enabled': user.is_email_activated(),
        'reason': _("Email isn't activated"),
    }
    system_option = {
        'email': config.get('adhocracy.email.from'),
        'checked': False,
        'enabled': asbool(config.get(
            'allow_system_email_in_mass_messages', 'true')),
        'reason': _("Not permitted in system settings"),
    }
    support_email = config.get('adhocracy.registration_support_email')
    support_option = {
        'email': support_email,
        'checked': False,
        'enabled': support_email is not None,
        'reason': _("adhocracy.registration_support_email not set"),
    }

    # Pre-select the first usable option; "support" is never pre-selected.
    for option in (user_option, system_option):
        if option['enabled']:
            option['checked'] = True
            break

    return {
        'user': user_option,
        'system': system_option,
        'support': support_option,
    }
开发者ID:rowanthorpe,项目名称:adhocracy,代码行数:30,代码来源:massmessage.py
示例9: _mail_recipient
def _mail_recipient(recipient_name, recipient_email,
                    sender_name, sender_url, subject,
                    body, headers=None):
    """Build a plain-text UTF-8 email and send it to one recipient via SMTP.

    :param recipient_name: display name used in the ``To`` header
    :param recipient_email: address the message is delivered to
    :param sender_name: display name used in the ``From`` header
    :param sender_url: passed on to ``add_msg_niceties``
    :param subject: subject line
    :param body: message text, wrapped by ``add_msg_niceties``
    :param headers: optional mapping of extra headers to set on the message

    The sender address comes from ``smtp.mail_from``. When
    ``smtp.test_server`` is configured it is used without STARTTLS or
    login; otherwise ``smtp.server``, ``smtp.starttls``, ``smtp.user`` and
    ``smtp.password`` apply.

    :raises MailerException: when STARTTLS is requested but unsupported,
        or when smtplib reports an error.
    """
    mail_from = config.get('smtp.mail_from')
    body = add_msg_niceties(recipient_name, body, sender_name, sender_url)
    msg = MIMEText(body.encode('utf-8'), 'plain', 'utf-8')
    for k, v in (headers or {}).items():
        msg[k] = v
    subject = Header(subject.encode('utf-8'), 'utf-8')
    msg['Subject'] = subject
    msg['From'] = _("%s <%s>") % (sender_name, mail_from)
    recipient = u"%s <%s>" % (recipient_name, recipient_email)
    msg['To'] = Header(recipient, 'utf-8')
    msg['Date'] = Utils.formatdate(time())
    msg['X-Mailer'] = "CKAN %s" % ckan.__version__

    # Send the email using Python's smtplib.
    smtp_connection = smtplib.SMTP()
    if 'smtp.test_server' in config:
        # If 'smtp.test_server' is configured we assume we're running tests,
        # and don't use the smtp.server, starttls, user, password etc. options.
        smtp_server = config['smtp.test_server']
        smtp_starttls = False
        smtp_user = None
        smtp_password = None
    else:
        smtp_server = config.get('smtp.server', 'localhost')
        smtp_starttls = paste.deploy.converters.asbool(
            config.get('smtp.starttls'))
        smtp_user = config.get('smtp.user')
        smtp_password = config.get('smtp.password')
    smtp_connection.connect(smtp_server)
    try:
        # Identify ourselves and prompt the server for supported features.
        smtp_connection.ehlo()

        # If 'smtp.starttls' is on in CKAN config, try to put the SMTP
        # connection into TLS mode.
        if smtp_starttls:
            if smtp_connection.has_extn('STARTTLS'):
                smtp_connection.starttls()
                # Re-identify ourselves over TLS connection.
                smtp_connection.ehlo()
            else:
                raise MailerException("SMTP server does not support STARTTLS")

        # If 'smtp.user' is in CKAN config, try to login to SMTP server.
        if smtp_user:
            assert smtp_password, ("If smtp.user is configured then "
                                   "smtp.password must be configured as well.")
            smtp_connection.login(smtp_user, smtp_password)

        smtp_connection.sendmail(mail_from, [recipient_email], msg.as_string())
        log.info("Sent email to {0}".format(recipient_email))
    except smtplib.SMTPException as e:
        msg = '%r' % e
        log.exception(msg)
        raise MailerException(msg)
    finally:
        # Release the connection on success and on failure alike.
        try:
            smtp_connection.quit()
        except smtplib.SMTPException:
            # The connection is already unusable; just drop the socket.
            smtp_connection.close()
开发者ID:GianlucaGLC,项目名称:ckan,代码行数:60,代码来源:mailer.py
示例10: __init__
def __init__(self):
    """Create the SQLAlchemy engine, the tables and a scoped session.

    The database URL is taken from ``linotpOpenID.sql.url`` and falls back
    to ``sqlalchemy.url`` when that option is missing.
    """
    self.engine = None
    implicit_returning = config.get("linotpSQL.implicit_returning", True)
    connect_string = config.get("linotpOpenID.sql.url")

    if connect_string is None:
        log.info("[__init__] Missing linotpOpenID.sql.url parameter in "
                 "config file! Using the sqlalchemy.url")
        connect_string = config.get("sqlalchemy.url")

    ########################## SESSION ##################################
    # Create an engine and create all the tables we need.
    # If implicit_returning is explicitly set to True, we get lots of
    # mysql errors:
    # AttributeError: 'MySQLCompiler_mysqldb' object has no attribute
    # 'returning_clause'
    # So the keyword is only passed when it has to be switched off.
    # NOTE(review): a value read from an ini file is presumably a string,
    # in which case "false" would still be truthy here - confirm.
    engine_options = {}
    if not implicit_returning:
        engine_options['implicit_returning'] = False
    self.engine = create_engine(connect_string, **engine_options)

    metadata.bind = self.engine
    metadata.create_all()

    # Set up the session
    self.sm = orm.sessionmaker(bind=self.engine,
                               autoflush=True,
                               autocommit=False,
                               expire_on_commit=True)
    self.session = orm.scoped_session(self.sm)
开发者ID:alexxtasi,项目名称:LinOTP,代码行数:34,代码来源:openid.py
示例11: shorten_link
def shorten_link(long_url):
    """Return the bit.ly short URL for ``long_url``.

    The credentials are read from the ``bitly.userid`` and ``bitly.key``
    settings. The ``url`` entry of the ``data`` object in the JSON
    response is returned.
    """
    if not isinstance(long_url, str):
        # urlencode needs byte strings for non-ASCII text on Python 2.
        long_url = long_url.encode('utf-8')
    # URL-encode every parameter: HTML escaping (cgi.escape) leaves
    # characters such as '&', '?' and '#' in place, which corrupts the
    # query string for most real-world URLs.
    query = urllib.urlencode([
        ('login', config.get('bitly.userid')),
        ('apiKey', config.get('bitly.key')),
        ('longUrl', long_url),
        ('format', 'json'),
    ])
    bitly_result = urllib.urlopen("http://api.bit.ly/v3/shorten?" + query).read()
    bitly_data = json.loads(bitly_result)['data']
    return bitly_data["url"]
开发者ID:anantn,项目名称:f1,代码行数:7,代码来源:shortener.py
示例12: checkTarget
def checkTarget(environ, result):
    """Tell whether the current request is a target (``True``) or not.

    Returns ``False`` when ``ckan.gov_theme.is_back`` is enabled or when
    the last segment of ``PATH_INFO`` occurs in the ``ckan.api.ignore``
    setting; ``True`` otherwise. ``result`` is not used.
    """
    back = parseBoolString(config.get('ckan.gov_theme.is_back', False))
    ignore_list = config.get('ckan.api.ignore', 'NotInList')
    path = environ['PATH_INFO'].split("/")[-1]
    # ``find(path) > 0`` missed an entry at the very start of the setting.
    # An empty segment (path ending in "/") is still never ignored.
    ignored = bool(path) and path in ignore_list
    if back or ignored:
        return False
    return True
开发者ID:CIOIL,项目名称:DataGovIL,代码行数:7,代码来源:plugin.py
示例13: user_language
def user_language(user, fallbacks=[]):
    """Activate the translations for ``user`` and return the locale used.

    The user's own locale is preferred. Without one, the locale is
    negotiated between ``fallbacks`` and ``LOCALES``, falling back to the
    default locale. Translations are loaded from the module named by the
    ``adhocracy.translations`` setting (default ``adhocracy``).

    Raises ``ValueError`` when that module cannot be found.
    """
    # find out the locale
    locale = user.locale if (user and user.locale) else None
    if locale is None:
        available = map(str, LOCALES)
        negotiated = Locale.negotiate(fallbacks, available)
        locale = Locale.parse(negotiated) or get_default_locale()

    # determinate from which path we load the translations
    module_name = config.get('adhocracy.translations', 'adhocracy')
    loader = pkgutil.get_loader(module_name)
    if loader is None:
        raise ValueError(('Cannot import the module "%s" configured for '
                          '"adhocracy.translations". Make sure it is an '
                          'importable module (and contains the '
                          'translation files in a subdirectory '
                          '"i18n"') % module_name)

    translations_config = {
        'pylons.paths': {'root': loader.filename},
        'pylons.package': config.get('pylons.package'),
    }

    # set language and fallback
    set_lang(locale.language, pylons_config=translations_config)
    add_fallback(get_default_locale().language,
                 pylons_config=translations_config)
    formencode.api.set_stdtranslation(domain="FormEncode",
                                      languages=[locale.language])
    return locale
开发者ID:JonnyWalker,项目名称:adhocracy,代码行数:32,代码来源:__init__.py
示例14: all
def all(cls):
    """Return a list with one Domain per sub-directory of ``herder.po_dir``.

    Entries named in ``cls._IGNORE_DIRS`` and entries that are not
    directories are left out.
    """
    po_dir = config.get('herder.po_dir')
    domains = []
    for entry in os.listdir(po_dir):
        if entry in cls._IGNORE_DIRS:
            continue
        entry_path = os.path.join(po_dir, entry)
        if os.path.isdir(entry_path):
            domains.append(Domain(entry, entry_path))
    return domains
开发者ID:cc-archive,项目名称:herder,代码行数:7,代码来源:domain.py
示例15: send_mail
def send_mail(self, to, subject, _body):
    """Record a pending registration and mail its activation link.

    A fresh GUID is generated; the mail body is the activation URL
    ``<ckan.site_url>/user/register?guid=<guid>&activate``. The GUID is
    stored in the ``mailnotifikation`` table together with the ``name``,
    ``fullname``, ``email`` and encoded ``password1`` request parameters,
    rows older than ``dk.aarhuskommune.odaa_days`` days are purged, and
    the message is sent through the SMTP server on localhost.

    :param to: comma separated recipient addresses
    :param subject: subject line of the mail
    :param _body: not used for the message.
        NOTE(review): the original discarded this text as well - confirm
        that the activation link is meant to be the whole body.

    Failures are logged and not propagated.
    """
    guid = str(uuid.uuid4())
    body = config.get('ckan.site_url') + "/user/register?guid=" + guid + "&activate"
    sender = config.get('dk.aarhuskommune.odaa_from')
    # Prepare actual message; the blank line separates headers from body.
    message = "From: %s\nTo: %s\nSubject: %s\n\n%s\n" % (
        sender, str(to), subject, body)
    try:
        result = self.connection()
        conn = psycopg2.connect(dbname=result["database"],
                                user=result["user"],
                                host=result["host"],
                                password=result["password"])
        try:
            c = conn.cursor()
            password = self._encode(result["password"],
                                    request.params["password1"])
            now = datetime.datetime.now()
            # Request parameters are untrusted: let the driver quote them
            # instead of concatenating them into the statement.
            c.execute(
                "INSERT INTO mailnotifikation VALUES (%s, %s, %s, %s, %s, %s, %s)",
                (guid, str(now), request.params["name"],
                 request.params["fullname"], request.params["email"],
                 password, 'False'))
            conn.commit()

            # Drop registrations that were never activated in time.
            nowlm = now - datetime.timedelta(
                days=int(config.get('dk.aarhuskommune.odaa_days')))
            c.execute("delete from mailnotifikation where _date<%s",
                      (nowlm.strftime('%Y-%m-%d'),))
            conn.commit()
        finally:
            conn.close()

        smtpObj = smtplib.SMTP('localhost')
        try:
            smtpObj.sendmail(sender, to.split(','), message)
        finally:
            try:
                smtpObj.quit()
            except smtplib.SMTPException:
                # The connection is already unusable; just drop the socket.
                smtpObj.close()
    except Exception as e:
        logging.error('Error: unable to send email. %s ', e)
开发者ID:OpenDataAarhus,项目名称:mailNotifikation,代码行数:35,代码来源:user.py
示例16: _mail_recipient
def _mail_recipient(recipient_name, recipient_email,
                    sender_name, sender_url, subject,
                    body, headers=None):
    """Build a plain-text UTF-8 email and send it to one recipient via SMTP.

    :param recipient_name: display name used in the ``To`` header
    :param recipient_email: address the message is delivered to
    :param sender_name: display name used in the ``From`` header
    :param sender_url: passed on to ``add_msg_niceties``
    :param subject: subject line
    :param body: message text, wrapped by ``add_msg_niceties``
    :param headers: optional mapping of extra headers to set on the message

    The sender address comes from ``ckan.mail_from``; the server is
    ``test_smtp_server`` when configured, else ``smtp_server``, else
    localhost.

    :raises MailerException: when connecting or sending fails.
    """
    mail_from = config.get('ckan.mail_from')
    body = add_msg_niceties(recipient_name, body, sender_name, sender_url)
    msg = MIMEText(body.encode('utf-8'), 'plain', 'utf-8')
    for k, v in (headers or {}).items():
        msg[k] = v
    subject = Header(subject.encode('utf-8'), 'utf-8')
    msg['Subject'] = subject
    msg['From'] = _("%s <%s>") % (sender_name, mail_from)
    recipient = u"%s <%s>" % (recipient_name, recipient_email)
    msg['To'] = Header(recipient, 'utf-8')
    msg['Date'] = Utils.formatdate(time())
    msg['X-Mailer'] = "CKAN %s" % __version__
    try:
        server = smtplib.SMTP(
            config.get('test_smtp_server',
                       config.get('smtp_server', 'localhost')))
        try:
            server.sendmail(mail_from, [recipient_email], msg.as_string())
        finally:
            # Release the connection even when sendmail failed.
            try:
                server.quit()
            except smtplib.SMTPException:
                # The connection is already unusable; just drop the socket.
                server.close()
    except Exception as e:
        msg = '%r' % e
        log.exception(msg)
        raise MailerException(msg)
开发者ID:Big-Data,项目名称:ckan,代码行数:25,代码来源:mailer.py
示例17: ssl_check
def ssl_check(ssl_required=[], ssl_allowed=[], ssl_required_all=False, ssl_allowed_all=False):
    """Redirect the current request to the protocol its action calls for.

    Does nothing unless ``enable_ssl_requirement`` is switched on. Actions
    in ``ssl_allowed`` (or all actions with ``ssl_allowed_all``) may use
    either protocol. Actions in ``ssl_required`` (or all actions with
    ``ssl_required_all``) must use https; every other action must use
    http. A request on the wrong protocol is redirected, to ``ssl_host``
    when that is configured, unless it is a POST, which is answered with
    405.
    """
    if not asbool(config.get('enable_ssl_requirement', False)):
        return

    action = request.environ['pylons.routes_dict']['action']
    if action in ssl_allowed or ssl_allowed_all:
        # We don't care if they use http or https
        return

    needs_ssl = action in ssl_required or ssl_required_all
    protocol = 'https' if needs_ssl else 'http'
    if current_protocol() == protocol:
        return

    if request.method.upper() == 'POST':
        abort(405, headers=[('Allow', 'GET')])  # don't allow POSTs.
        return

    log.debug('Redirecting to %s, request: %s', protocol, request.path_info)
    host = config.get('ssl_host')
    if host:
        redirect_to(protocol=protocol, host=host)
    else:
        redirect_to(protocol=protocol)
开发者ID:Ivoz,项目名称:zookeepr,代码行数:26,代码来源:ssl_requirement.py
示例18: harvest_source_index_clear
def harvest_source_index_clear(context, data_dict):
    '''
    Clears all datasets, jobs and objects related to a harvest source, but
    keeps the source itself. This is useful to clean history of long running
    harvest sources to start again fresh.

    The function body removes the search index entries of the source for
    the configured ``ckan.site_id`` and commits the index unless
    ``ckan.search.solr_commit`` is disabled.

    :param id: the id of the harvest source to clear
    :type id: string

    :raises NotFound: if the harvest source does not exist
    :raises SearchIndexError: if the search index cannot be updated
    '''
    check_access('harvest_source_clear', context, data_dict)
    harvest_source_id = data_dict.get('id')

    source = HarvestSource.get(harvest_source_id)
    if not source:
        log.error('Harvest source %s does not exist', harvest_source_id)
        raise NotFound('Harvest source %s does not exist' % harvest_source_id)

    # Use the id of the stored object from here on.
    harvest_source_id = source.id

    conn = make_connection()
    query = ''' +%s:"%s" +site_id:"%s" ''' % (
        'harvest_source_id', harvest_source_id, config.get('ckan.site_id'))
    try:
        conn.delete_query(query)
        if asbool(config.get('ckan.search.solr_commit', 'true')):
            conn.commit()
    except Exception as e:
        log.exception(e)
        raise SearchIndexError(e)
开发者ID:AQUACROSS,项目名称:ckanext-harvest,代码行数:30,代码来源:update.py
示例19: _call
def _call(self, **kwargs):
    """Send an authenticated request to the API endpoint and return the response.

    The URL is the result of ``get_endpoint()`` joined with ``self.path``
    and, when given, the ``path_extra`` keyword. The ``method`` keyword
    names the ``requests`` function to call (default ``'get'``). All
    remaining keywords are handed to that function, together with the
    ``ckanext.doi.account_name`` / ``ckanext.doi.account_password``
    credentials as ``auth``.

    Raises whatever ``raise_for_status()`` raises for an error response.
    """
    account_name = config.get("ckanext.doi.account_name")
    account_password = config.get("ckanext.doi.account_password")
    # NOTE(review): os.path.join uses the platform separator; this builds
    # a valid URL on POSIX systems only - confirm the deployment target.
    endpoint = os.path.join(get_endpoint(), self.path)
    if 'path_extra' in kwargs:
        endpoint = os.path.join(endpoint, kwargs.pop('path_extra'))
    method = kwargs.pop('method', 'get')

    # Log before the credentials are added so that the account password
    # never ends up in the log output.
    log.info("Calling %s:%s - %s", endpoint, method, kwargs)

    # Add authorisation to request
    kwargs['auth'] = (account_name, account_password)
    r = getattr(requests, method)(endpoint, **kwargs)
    r.raise_for_status()
    # Return the result
    return r
开发者ID:okfn,项目名称:ckanext-doi,代码行数:27,代码来源:datacite_api.py
示例20: getPosts
def getPosts(self, ident):
    """Return the Disqus posts for the thread identified by ``ident``.

    The thread is created when looking it up does not answer with code 0.
    If no thread is available afterwards, the failed thread response is
    returned; otherwise the decoded answer of the ``posts/list`` call is
    returned, with an emptied ``response`` for a thread that was just
    created.
    """
    thread = self.getThread(ident)
    fresh_thread = thread['code'] != 0
    if fresh_thread:
        thread = self.createThreadFromIdent(ident)

    if thread['code'] != 0:
        # Neither lookup nor creation worked: pass the error on.
        return thread

    thread_id = thread['response']['id']
    api_key = config.get('edcdisqus.api_key')
    forum_name = config.get('edcdisqus.forum_name')
    data_string = urllib.urlencode({'forum': forum_name, 'thread': thread_id, 'api_key': api_key, 'limit': 100 })
    request_json = self.request('https://disqus.com/api/3.0/posts/list.json', data_string, 'get')
    posts = json.loads(request_json)

    # For some reason, the disqus API likes to return all of the comments
    # when a thread id doesn't exist. We're just emptying the result set
    # when a new thread is created. (There shouldn't be any comments
    # anyway, the thread was just made!)
    if fresh_thread:
        posts['response'] = []
    return posts
开发者ID:HighwayThree,项目名称:ckanext-bcgov,代码行数:27,代码来源:disqus.py
注：本文中的 pylons.config.get 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论