本文整理汇总了Python中pulp.common.util.encode_unicode函数的典型用法代码示例。如果您正苦于以下问题:Python encode_unicode函数的具体用法?Python encode_unicode怎么用?Python encode_unicode使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了encode_unicode函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: unit_key_str
def unit_key_str(unit_key_dict):
    """
    Flatten a unit key dict into one string usable as a dict lookup key.

    The result has the form "<name>-<version>-<author>"; each component is
    passed through encode_unicode before being formatted.

    :param unit_key_dict: unit key; must provide 'name', 'version' and 'author'
    :type  unit_key_dict: dict
    :return: the three key components joined by dashes
    """
    components = tuple(encode_unicode(unit_key_dict[field])
                       for field in ('name', 'version', 'author'))
    return '%s-%s-%s' % components
开发者ID:hjensas,项目名称:pulp_puppet,代码行数:9,代码来源:forge.py
示例2: test_create_i18n
def test_create_i18n(self):
    """
    A repo created with a non-ASCII display name and description can be
    read back from the collection with the same text.
    """
    # Setup
    expected = 'Brasília'

    # Test
    self.manager.create_repo('repo-i18n', display_name=expected, description=expected)

    # Verify
    stored = Repo.get_collection().find_one({'id': 'repo-i18n'})
    self.assertTrue(stored is not None)
    for field in ('display_name', 'description'):
        self.assertEqual(encode_unicode(stored[field]), expected)
开发者ID:beav,项目名称:pulp,代码行数:12,代码来源:test_cud.py
示例3: deserialize
def deserialize(cls, data):
    """
    Deserialize the data returned from a serialize call into a call request

    @param data: serialized call request
    @type data: dict
    @return: None if data is None or if any pickled field fails to load;
             see the review note at the end of the body for the success path
    @rtype: CallRequest or None
    """
    if data is None:
        return None

    constructor_kwargs = dict(data)
    constructor_kwargs.pop('callable_name')  # added for search

    # Re-key every entry through encode_unicode. Iterate over a snapshot of
    # the keys because the dict is modified inside the loop.
    for key in list(constructor_kwargs):
        constructor_kwargs[encode_unicode(key)] = constructor_kwargs.pop(key)

    try:
        for field in cls.pickled_fields:
            # NOTE(review): pickle.loads runs arbitrary code from its input;
            # confirm the serialized data only ever comes from a trusted store.
            constructor_kwargs[field] = pickle.loads(data[field].encode('ascii'))
    except Exception as e:
        # Any failure to restore a pickled field is logged and reported as None
        _LOG.exception(e)
        return None
    # NOTE(review): the visible listing stops here, so the success path falls
    # through without building or returning the call request; presumably the
    # remainder of the function was cut off -- confirm against upstream.
开发者ID:ryanschneider,项目名称:pulp,代码行数:25,代码来源:call.py
示例4: _download_file
def _download_file(self, url, destination):
    """
    Downloads the content at the given URL into the given destination.
    The object passed into destination must have a method called "update"
    that accepts a single parameter (the buffer that was read).

    :param url: location to download
    :type  url: str
    :param destination: object whose update() method is registered as the
                        curl write callback
    :raises exceptions.UnauthorizedException: if the HTTP status is 401
    :raises exceptions.FileNotFoundException: if the HTTP status is 404
    :raises exceptions.FileRetrievalException: for any other non-200 status
    """
    url = encode_unicode(url)  # because of how the config is stored in pulp

    curl = self._create_and_configure_curl()
    try:
        curl.setopt(pycurl.URL, url)
        curl.setopt(pycurl.WRITEFUNCTION, destination.update)
        curl.perform()
        # The status must be read before the handle is closed
        status = curl.getinfo(curl.HTTP_CODE)
    finally:
        # Release the handle even when configuring or performing the transfer raises
        curl.close()

    if status == 401:
        raise exceptions.UnauthorizedException(url)
    elif status == 404:
        raise exceptions.FileNotFoundException(url)
    elif status != 200:
        raise exceptions.FileRetrievalException(url)
开发者ID:abhaychrungoo,项目名称:pulp_puppet,代码行数:27,代码来源:web.py
示例5: __init__
def __init__(self, sync_conduit, config):
    """
    Record the sync settings found in config and build the downloader and
    progress report used by this sync run.

    :param sync_conduit: conduit stored on the instance and handed to SyncProgressReport
    :param config: configuration object queried through get() for every setting
    """
    self.sync_conduit = sync_conduit
    self._remove_missing_units = config.get(constants.CONFIG_REMOVE_MISSING_UNITS, default=False)
    self._repo_url = encode_unicode(config.get(constants.CONFIG_FEED_URL))
    self._validate_downloads = config.get(constants.CONFIG_VALIDATE_DOWNLOADS, default=True)

    # Numeric settings are cast explicitly before being given to the downloader
    speed_limit = config.get(constants.CONFIG_MAX_SPEED)
    if speed_limit is not None:
        speed_limit = float(speed_limit)

    thread_count = config.get(constants.CONFIG_NUM_THREADS)
    if thread_count is None:
        thread_count = constants.DEFAULT_NUM_THREADS
    else:
        thread_count = int(thread_count)

    dl_config = DownloaderConfig(
        protocol='https',
        max_speed=speed_limit,
        num_threads=thread_count,
        ssl_client_cert=config.get(constants.CONFIG_SSL_CLIENT_CERT),
        ssl_client_key=config.get(constants.CONFIG_SSL_CLIENT_KEY),
        ssl_ca_cert=config.get(constants.CONFIG_SSL_CA_CERT),
        ssl_verify_host=1,
        ssl_verify_peer=1,
        proxy_url=config.get(constants.CONFIG_PROXY_URL),
        proxy_port=config.get(constants.CONFIG_PROXY_PORT),
        proxy_user=config.get(constants.CONFIG_PROXY_USER),
        proxy_password=config.get(constants.CONFIG_PROXY_PASSWORD))

    # self is passed as the event listener so the callbacks arrive on this class
    self.downloader = factory.get_downloader(dl_config, self)
    self.progress_report = SyncProgressReport(sync_conduit)
开发者ID:jwmatthews,项目名称:pulp_rpm,代码行数:29,代码来源:sync.py
示例6: process_relative_url
def process_relative_url(self, repo_id, importer_config,
                         distributor_config):
    """
    During create (but not update), if the relative path isn't specified it
    is derived from the feed_url.

    Nothing is changed when the distributor config already carries a
    relative URL. Without a feed entry the repo id is used as is; with a
    feed, the URL's path is used unless it is empty or "/", in which case
    "/" + repo_id is used. A feed explicitly set to None renders a failure
    message and leaves the distributor config untouched.

    Ultimately, this belongs in the distributor itself. When we rewrite the
    yum distributor, we'll remove this entirely from the client.
    jdob, May 10, 2013
    """
    key = constants.PUBLISH_RELATIVE_URL_KEYWORD
    if key in distributor_config:
        return

    if importer_constants.KEY_FEED not in importer_config:
        distributor_config[key] = repo_id  # noqa
        return

    feed = importer_config[importer_constants.KEY_FEED]
    if feed is None:
        self.prompt.render_failure_message(
            _('Given repository feed URL is invalid.'))
        return

    # Index 2 of the parse result is the path component
    feed_path = urlparse(encode_unicode(feed))[2]
    if feed_path in ('', '/'):
        feed_path = '/' + repo_id
    distributor_config[key] = feed_path  # noqa
开发者ID:lsjostro,项目名称:pulp_win,代码行数:26,代码来源:repo_create_update.py
示例7: __init__
def __init__(self, sync_conduit, config):
    """
    Initialize an ISOSyncRun.

    :param sync_conduit: the sync conduit to use for this sync run.
    :type  sync_conduit: pulp.plugins.conduits.repo_sync.RepoSyncConduit
    :param config:       plugin configuration
    :type  config:       pulp.plugins.config.PluginCallConfiguration
    """
    self.sync_conduit = sync_conduit
    self.config = config
    self._remove_missing_units = config.get(
        importer_constants.KEY_UNITS_REMOVE_MISSING,
        default=constants.CONFIG_UNITS_REMOVE_MISSING_DEFAULT)
    self._validate_downloads = config.get(
        importer_constants.KEY_VALIDATE, default=constants.CONFIG_VALIDATE_DEFAULT)

    # urljoin is used later to work out the path to PULP_MANIFEST, so the
    # feed URL has to end with a slash
    self._repo_url = encode_unicode(config.get(importer_constants.KEY_FEED))
    if self._repo_url[-1] != '/':
        self._repo_url += '/'

    # Numeric settings are cast explicitly before being given to the downloader
    speed_limit = config.get(importer_constants.KEY_MAX_SPEED)
    if speed_limit is not None:
        speed_limit = float(speed_limit)

    concurrent_downloads = config.get(importer_constants.KEY_MAX_DOWNLOADS)
    if concurrent_downloads is None:
        concurrent_downloads = constants.CONFIG_MAX_DOWNLOADS_DEFAULT
    else:
        concurrent_downloads = int(concurrent_downloads)

    verify_ssl = config.get_boolean(importer_constants.KEY_SSL_VALIDATION)
    if verify_ssl is None:
        verify_ssl = constants.CONFIG_VALIDATE_DEFAULT

    dl_config = DownloaderConfig(
        max_speed=speed_limit,
        max_concurrent=concurrent_downloads,
        ssl_client_cert=config.get(importer_constants.KEY_SSL_CLIENT_CERT),
        ssl_client_key=config.get(importer_constants.KEY_SSL_CLIENT_KEY),
        ssl_ca_cert=config.get(importer_constants.KEY_SSL_CA_CERT),
        ssl_validation=verify_ssl,
        proxy_url=config.get(importer_constants.KEY_PROXY_HOST),
        proxy_port=config.get(importer_constants.KEY_PROXY_PORT),
        proxy_username=config.get(importer_constants.KEY_PROXY_USER),
        proxy_password=config.get(importer_constants.KEY_PROXY_PASS),
        basic_auth_username=config.get(importer_constants.KEY_BASIC_AUTH_USER),
        basic_auth_password=config.get(importer_constants.KEY_BASIC_AUTH_PASS),
        working_dir=common_utils.get_working_directory())

    # self is passed as the event listener so the callbacks arrive on this class
    if self._repo_url.lower().startswith('file'):
        downloader_class = LocalFileDownloader
    else:
        downloader_class = HTTPThreadedDownloader
    self.downloader = downloader_class(dl_config, self)

    self.progress_report = SyncProgressReport(sync_conduit)
    self.repo_units = []
开发者ID:ATIX-AG,项目名称:pulp_rpm,代码行数:60,代码来源:sync.py
示例8: make_cert
def make_cert(self, cn, expiration, uid=None):
    """
    Generate an x509 certificate with the Subject set to the cn passed into this method:
    Subject: CN=someconsumer.example.com

    @param cn: ID to be embedded in the certificate
    @type  cn: string
    @param expiration: number of days, passed to openssl's -days option
    @type  expiration: int
    @param uid: The optional userid.  In pulp, this is the DB document _id
                for both users and consumers.
    @type  uid: str
    @return: tuple of PEM encoded private key and certificate
    @rtype:  (str, str)
    @raise Exception: if the openssl signing command exits with a non-zero status
    """
    # Ensure we are dealing with a string and not unicode
    try:
        cn = str(cn)
    except UnicodeEncodeError:
        cn = encode_unicode(cn)
    log.debug("make_cert: [%s]" % cn)

    # Make a private key
    # Don't use M2Crypto directly as it leads to segfaults when trying to convert
    # the key to a PEM string.  Instead create the key with openssl and return the PEM string
    # Sorta hacky but necessary.
    private_key_pem = _make_priv_key()
    rsa = RSA.load_key_string(private_key_pem,
                              callback=util.no_passphrase_callback)

    # Make the Cert Request
    req, _pub_key = _make_cert_request(cn, rsa, uid=uid)

    # Sign it with the Pulp server CA
    # We can't do this in m2crypto either so we have to shell out
    ca_cert = config.config.get('security', 'cacert')
    ca_key = config.config.get('security', 'cakey')
    serial = SerialNumber().next()

    # Pass an argument list instead of a shell string so that paths containing
    # whitespace or shell metacharacters reach openssl unchanged.
    cmd = ['openssl', 'x509', '-req', '-sha1',
           '-CA', ca_cert,
           '-CAkey', ca_key,
           '-set_serial', '%s' % serial,
           '-days', '%d' % expiration]
    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # communicate() feeds the request on stdin and waits for the process to exit;
    # stderr is merged into stdout so it ends up in the error message below
    output = p.communicate(input=req.as_pem())[0]
    if p.returncode != 0:
        raise Exception("error signing cert request: %s" % output)

    # Drop anything openssl printed ahead of the certificate itself
    cert_pem_string = output[output.index("-----BEGIN CERTIFICATE-----"):]
    return private_key_pem, cert_pem_string
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:55,代码来源:cert_generator.py
示例9: perform_sync
def perform_sync(self, repo, sync_conduit, config):
    """
    Perform the sync operation according to the config for the given repo, and return a report.
    The sync progress will be reported through the sync_conduit.

    :param repo:         Metadata describing the repository
    :type  repo:         pulp.server.plugins.model.Repository
    :param sync_conduit: The sync_conduit that gives us access to the local repository
    :type  sync_conduit: pulp.server.conduits.repo_sync.RepoSyncConduit
    :param config:       The configuration for the importer
    :type  config:       pulp.server.plugins.config.PluginCallConfiguration
    :return:             The sync report
    :rtype:              pulp.plugins.model.SyncReport
    """
    progress = SyncProgressReport(sync_conduit)

    # Numeric settings are cast explicitly before being given to the ISOBumper
    speed_limit = config.get(constants.CONFIG_MAX_SPEED)
    if speed_limit is not None:
        speed_limit = float(speed_limit)

    thread_count = config.get(constants.CONFIG_NUM_THREADS)
    if thread_count is None:
        thread_count = constants.DEFAULT_NUM_THREADS
    else:
        thread_count = int(thread_count)

    # Metadata phase starts
    progress.metadata_state = STATE_RUNNING
    progress.update_progress()

    self.bumper = ISOBumper(
        repo_url=encode_unicode(config.get(constants.CONFIG_FEED_URL)),
        working_path=repo.working_dir,
        max_speed=speed_limit,
        num_threads=thread_count,
        ssl_client_cert=config.get(constants.CONFIG_SSL_CLIENT_CERT),
        ssl_client_key=config.get(constants.CONFIG_SSL_CLIENT_KEY),
        ssl_ca_cert=config.get(constants.CONFIG_SSL_CA_CERT),
        proxy_url=config.get(constants.CONFIG_PROXY_URL),
        proxy_port=config.get(constants.CONFIG_PROXY_PORT),
        proxy_user=config.get(constants.CONFIG_PROXY_USER),
        proxy_password=config.get(constants.CONFIG_PROXY_PASSWORD))

    # Fetch the manifest, then download only the ISOs that are missing
    manifest = self.bumper.get_manifest()
    progress.metadata_state = STATE_COMPLETE
    progress.modules_state = STATE_RUNNING
    progress.update_progress()

    to_download = self._filter_missing_isos(sync_conduit, manifest)
    downloaded = self.bumper.download_resources(to_download)

    # Turn the downloaded files into units
    self._create_units(sync_conduit, downloaded)

    # Report that we are finished
    progress.modules_state = STATE_COMPLETE
    return progress.build_final_report()
开发者ID:lzap,项目名称:pulp_rpm,代码行数:55,代码来源:sync.py
示例10: __init__
def __init__(self, id):
    """
    Build a new instance whose items start out at the defaults listed in
    PROPERTIES.

    @param id: unique identifier for the repo
    @type  id: string
    """
    self.id = encode_unicode(id)

    # PROPERTIES is iterated as (name, default value) pairs
    for prop_name, default_value in self.PROPERTIES:
        self[prop_name] = default_value
开发者ID:pgustafs,项目名称:pulp_rpm,代码行数:11,代码来源:repo_file.py
示例11: request
def request(self, method, url, body):
    """
    Send a single HTTPS request to the server described by self.pulp_connection.

    Authentication is added from the connection settings: basic auth when a
    username and password are set, otherwise the client certificate when one
    is configured, plus an OAuth header when OAuth credentials are present and
    the oauth module is available.

    :param method: HTTP method name
    :param url: request path on the server
    :param body: request body handed to the connection unchanged
    :raises exceptions.ClientSSLException: if the SSL error text reports an
            expired certificate
    :raises exceptions.ConnectionException: for any other SSL error
    """
    headers = dict(self.pulp_connection.headers)  # copy so we don't affect the calling method

    # Create a new connection each time since HTTPSConnection has problems
    # reusing a connection for multiple calls (lame).

    # Create SSL.Context and set it to verify peer SSL certificate against system CA certs.
    # NOTE(review): 'sslv3' pins the long-deprecated SSLv3 protocol; left as is
    # because changing it alters which servers can be reached -- confirm.
    ssl_context = SSL.Context('sslv3')
    if self.pulp_connection.validate_ssl_ca:
        ssl_context.set_verify(SSL.verify_peer, 1)
        ssl_context.load_verify_locations(capath=self.pulp_connection.system_ca_dir)
    ssl_context.set_session_timeout(self.pulp_connection.timeout)

    if self.pulp_connection.username and self.pulp_connection.password:
        raw = ':'.join((self.pulp_connection.username, self.pulp_connection.password))
        # b64encode never inserts line breaks; encodestring() wraps its output
        # every 76 characters, which put newlines into the header value for
        # long credentials.
        encoded = base64.b64encode(raw)
        headers['Authorization'] = 'Basic ' + encoded
    elif self.pulp_connection.cert_filename:
        ssl_context.load_cert(self.pulp_connection.cert_filename)

    # oauth configuration. This block is only True if oauth is not None, so it won't run on RHEL
    # 5.
    if self.pulp_connection.oauth_key and self.pulp_connection.oauth_secret and oauth:
        oauth_consumer = oauth.Consumer(
            self.pulp_connection.oauth_key,
            self.pulp_connection.oauth_secret)
        oauth_request = oauth.Request.from_consumer_and_token(
            oauth_consumer,
            http_method=method,
            http_url='https://%s:%d%s' % (self.pulp_connection.host, self.pulp_connection.port, url))
        oauth_request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), oauth_consumer, None)
        oauth_header = oauth_request.to_header()
        # unicode header values causes m2crypto to do odd things.
        for k, v in oauth_header.items():
            oauth_header[k] = encode_unicode(v)
        headers.update(oauth_header)
        headers['pulp-user'] = self.pulp_connection.oauth_user

    connection = httpslib.HTTPSConnection(
        self.pulp_connection.host, self.pulp_connection.port, ssl_context=ssl_context)

    try:
        # Request against the server
        connection.request(method, url, body=body, headers=headers)
        response = connection.getresponse()
    except SSL.SSLError as err:
        # Translate stale login certificate to an auth exception
        if 'sslv3 alert certificate expired' == str(err):
            raise exceptions.ClientSSLException(self.pulp_connection.cert_filename)
        else:
            raise exceptions.ConnectionException(None, str(err), None)
    # NOTE(review): the visible listing stops here, so `response` is never read
    # or returned; presumably the response handling was cut off -- confirm
    # against upstream.
开发者ID:nareshbatthula,项目名称:pulp,代码行数:52,代码来源:server.py
示例12: _ensure_input_encoding
def _ensure_input_encoding(self, input):
"""
Recursively traverse any input structures and ensure any strings are
encoded as utf-8
@param input: input data
@return: input data with strings encoded as utf-8
"""
if isinstance(input, (list, set, tuple)):
return [self._ensure_input_encoding(i) for i in input]
if isinstance(input, dict):
return dict((self._ensure_input_encoding(k), self._ensure_input_encoding(v)) for k, v in input.items())
try:
return encode_unicode(decode_unicode(input))
except (UnicodeDecodeError, UnicodeEncodeError):
raise InputEncodingError(input), None, sys.exc_info()[2]
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:15,代码来源:base.py
示例13: _backup_existing_repodata
def _backup_existing_repodata(self):
    """
    Takes a backup of any existing repodata files. This is used in the final
    step where other file types in repomd.xml such as presto, updateinfo, comps
    are copied back to the repodata.

    When <repodir>/repodata exists it is copied to <repodir>/repodata.old
    (replacing any previous copy), the copy is made user-writable, and its
    path is stored in self.backup_repodata_dir. Nothing happens otherwise.
    """
    # Imported locally so the block does not depend on a module-level import
    import subprocess

    current_repo_dir = encode_unicode(os.path.join(self.repodir, "repodata"))
    if not os.path.exists(current_repo_dir):
        return

    # Note: backup_repo_dir is used to store presto metadata and possibly other custom
    # metadata types; they will be copied back into new 'repodata' if needed.
    _LOG.info("existing metadata found; taking backup.")
    self.backup_repodata_dir = os.path.join(self.repodir, "repodata.old")
    if os.path.exists(self.backup_repodata_dir):
        _LOG.debug("clean up any stale dirs")
        shutil.rmtree(self.backup_repodata_dir)
    shutil.copytree(current_repo_dir, self.backup_repodata_dir)

    # Argument list instead of a shell string, so a path containing spaces or
    # shell metacharacters is passed to chmod as one literal argument. As
    # before, the exit status is not checked.
    subprocess.call(["chmod", "-R", "u+wX", self.backup_repodata_dir])
开发者ID:preethit,项目名称:pulp_rpm,代码行数:18,代码来源:metadata.py
示例14: __get_groups_xml_info
def __get_groups_xml_info(repo_dir):
    """
    Locate the "group" metadata file listed in the repo's repomd.xml and
    rename it to comps.<ext>.

    :param repo_dir: directory that contains the "repodata" directory
    :return: path of the renamed groups file, or None when repomd.xml does
             not exist or lists no "group" file type
    """
    repomd_path = encode_unicode(os.path.join(repo_dir, "repodata", "repomd.xml"))
    if not os.path.isfile(repomd_path):
        return None

    ftypes = util.get_repomd_filetypes(repomd_path)
    _LOG.debug("repodata has filetypes of %s" % (ftypes))
    if "group" not in ftypes:
        return None

    comps_ftype = util.get_repomd_filetype_path(repomd_path, "group")
    original_path = os.path.join(repo_dir, comps_ftype)

    # createrepo uses filename as mdtype, rename to type.<ext>
    # to avoid filename too long errors
    extension = '.'.join(os.path.basename(comps_ftype).split('.')[1:])
    renamed_relative = os.path.join(os.path.dirname(comps_ftype), "comps" + '.' + extension)
    groups_xml_path = os.path.join(repo_dir, renamed_relative)

    os.rename(original_path, groups_xml_path)
    return groups_xml_path
开发者ID:preethit,项目名称:pulp_rpm,代码行数:18,代码来源:metadata.py
示例15: _create_repo
def _create_repo(dir, groups=None, checksum_type=DEFAULT_CHECKSUM):
    """
    Start a createrepo process that updates the metadata of a repo directory.

    The process is started but not waited for; the caller owns the handle.

    :param dir: directory of the repository to run createrepo against
    :param groups: optional groups file passed to createrepo with -g
    :param checksum_type: value given to createrepo's --checksum option
    :return: handle of the started process, with stdout and stderr piped
    :rtype: subprocess.Popen
    """
    # Build the argument list directly rather than formatting one string and
    # re-splitting it, so that paths containing whitespace or quotes stay a
    # single argument.
    cmd = ["createrepo", "--database", "--checksum", checksum_type]
    if groups:
        cmd.extend(["-g", groups])
    cmd.extend(["--update", dir])

    # Encode each argument on its own so str and unicode values can be mixed
    cmd = [encode_unicode(arg) for arg in cmd]

    _LOG.info("started repo metadata update: %s" % (cmd,))
    handle = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return handle
开发者ID:preethit,项目名称:pulp_rpm,代码行数:19,代码来源:metadata.py
示例16: subject
def subject(self):
    """
    Get the certificate subject.
    note: Missing NID mapping for UID added to patch openssl.

    For every name/NID pair known to the subject, the first matching entry
    (if any) is converted to a string and stored under that name.

    @return: A dictionary of subject fields.
    @rtype: dict
    """
    x509_subject = self.x509.get_subject()
    x509_subject.nid['UID'] = 458

    fields = {}
    for name, nid in x509_subject.nid.items():
        entries = x509_subject.get_entries_by_nid(nid)
        if not len(entries):
            continue
        data = entries[0].get_data()
        try:
            value = str(data)
        except UnicodeEncodeError:
            # str() could not represent the value; go through encode_unicode instead
            value = encode_unicode(data)
        fields[name] = value
    return fields
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:20,代码来源:certificate.py
示例17: initialize
def initialize(self):
    """
    Set up the nectar downloader

    Reads the validation flag and the feed URL from the step's config, makes
    sure the URL ends with a slash, and picks a local-file or HTTP downloader
    depending on whether the URL starts with "file".

    Originally based on the ISO sync setup
    """
    config = self.get_config()
    self._validate_downloads = config.get(importer_constants.KEY_VALIDATE, default=True)

    # urljoin is used later to determine paths, so the feed URL has to end
    # with a slash
    self._repo_url = encode_unicode(config.get(importer_constants.KEY_FEED))
    if self._repo_url[-1] != '/':
        self._repo_url += '/'

    dl_config = importer_config_to_nectar_config(config.flatten())

    # self is passed as the event listener so the callbacks arrive on this class
    if self._repo_url.lower().startswith('file'):
        downloader_class = LocalFileDownloader
    else:
        downloader_class = HTTPThreadedDownloader
    self.downloader = downloader_class(dl_config, self)
开发者ID:jeremycline,项目名称:pulp,代码行数:22,代码来源:publish_step.py
示例18: process_relative_url
def process_relative_url(self, repo_id, importer_config, yum_distributor_config):
    """
    During create (but not update), if the relative path isn't specified it is derived
    from the feed_url.

    An existing "relative_url" is left alone. Without a feed entry the repo id is
    used as is; with a feed, the URL's path is used unless it is empty or "/", in
    which case "/" + repo_id is used. A feed explicitly set to None renders a failure
    message and leaves the distributor config untouched.

    Ultimately, this belongs in the distributor itself. When we rewrite the yum distributor,
    we'll remove this entirely from the client. jdob, May 10, 2013
    """
    if "relative_url" in yum_distributor_config:
        return

    if importer_constants.KEY_FEED not in importer_config:
        yum_distributor_config["relative_url"] = repo_id
        return

    feed = importer_config[importer_constants.KEY_FEED]
    if feed is None:
        self.prompt.render_failure_message(_("Given repository feed URL is invalid."))
        return

    # Index 2 of the parse result is the path component
    feed_path = urlparse(encode_unicode(feed))[2]
    use_repo_id = feed_path in ("", "/")
    yum_distributor_config["relative_url"] = "/" + repo_id if use_repo_id else feed_path
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:22,代码来源:repo_create_update.py
示例19: __init__
def __init__(self, msg=None):
    """
    Store the exception message, passed through encode_unicode.

    :param msg: message to store; when None, a generic message naming the
                concrete exception class is used instead
    """
    if msg is None:
        msg = _('Pulp auth exception occurred: %(c)s') % {'c': self.__class__.__name__}
    self.msg = encode_unicode(msg)
开发者ID:bartwo,项目名称:pulp,代码行数:5,代码来源:decorators.py
示例20: render_document_list
#.........这里部分代码省略.........
if order is None:
ordered_keys = sorted(filters)
else:
# Remove any keys from the order that weren't in the filter
filtered_order = [o for o in order if o in filters]
# The order may only be a subset of filtered keys, so figure out
# which ones are missing and tack them onto the end
not_ordered = [k for k in filters if k not in filtered_order]
# Assemble the pieces: ordered keys + not ordered keys
ordered_keys = order + sorted(not_ordered)
# Generate a list of tuples of key to pretty-formatted key
ordered_formatted_keys = []
for k in ordered_keys:
formatted_key = None
# Don't apply the fancy _ stripping logic to values that start with _
# These values probably shouldn't be in the returned document, but
# let's not rely on that.
if k.startswith('_'):
if omit_hidden:
continue
else:
formatted_key = k
else:
for part in k.split('_'):
part = str(part)
if formatted_key is None:
formatted_part = part.capitalize()
if formatted_part in CAPITALIZE_WORD_EXCEPTIONS:
formatted_part = CAPITALIZE_WORD_EXCEPTIONS[formatted_part]
formatted_key = formatted_part
else:
formatted_part = part.capitalize()
if formatted_part in CAPITALIZE_WORD_EXCEPTIONS:
formatted_part = CAPITALIZE_WORD_EXCEPTIONS[formatted_part]
formatted_key += ' '
formatted_key += formatted_part
ordered_formatted_keys.append((k, formatted_key))
# Generate template using the formatted key values for proper length checking
max_key_length = reduce(lambda x, y: max(x, len(y)), [o[1] for o in ordered_formatted_keys], 0) + 1 # +1 for the : appended later
line_template = (' ' * indent) + '%-' + str(max_key_length) + 's' + (' ' * spaces_between_cols) + '%s'
# Print each item
for i in filtered_items:
if not i:
continue
if header_func is not None:
h = header_func(i)
self.write(h)
for k, formatted_k in ordered_formatted_keys:
# If a filter was specified for a value that's not there, that's
# ok, just skip it
if k not in i:
continue
v = i[k]
if isinstance(v, dict):
self.write(line_template % (formatted_k + ':', ''))
self.render_document_list([v], indent=indent+step)
continue
# If the value is a list, pretty it up
if isinstance(v, (tuple, list)):
if len(v) > 0 and isinstance(v[0], dict):
self.write(line_template % (formatted_k + ':', ''))
self.render_document_list(v, indent=indent+step)
continue
else:
try:
v = ', '.join(v)
except TypeError:
# This is ugly, but it's the quickest way to get around
# lists of other lists.
pass
else:
if isinstance(v, (str, unicode)):
v = v.replace('\n', ' ')
line = line_template % (formatted_k + ':', encode_unicode(v))
long_value_indent = max_key_length + spaces_between_cols + indent
line = self.wrap(line, remaining_line_indent=long_value_indent)
self.write(line, tag=TAG_DOCUMENT, skip_wrap=True)
# Only add a space if we're at the highest level of the rendering
if indent is 0:
self.render_spacer(lines=num_separator_spaces)
# Only add a space if we're at the highest level of the rendering
if indent is 0:
self.render_spacer()
开发者ID:domcleal,项目名称:pulp,代码行数:101,代码来源:core.py
注:本文中的pulp.common.util.encode_unicode函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论