本文整理汇总了Python中testtools.content.text_content函数的典型用法代码示例。如果您正苦于以下问题:Python text_content函数的具体用法?Python text_content怎么用?Python text_content使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了text_content函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_bug_import_script_in_testing_mode
def test_bug_import_script_in_testing_mode(self):
# Test that the bug import script works with --testing.
script = os.path.join(config.root, 'scripts', 'bug-import.py')
cache_filename = os.path.join(self.tmpdir, 'bug-map.pickle')
stdout, stderr, returncode = run_process(
(script, '--testing', '--cache', cache_filename,
self.write_example_xml()))
self.addDetail("stdout", text_content(stdout))
self.addDetail("stderr", text_content(stderr))
self.assertEqual(0, returncode)
# Find the product that was created:
match = re.search(r'Product ([^ ]+) created', stderr)
self.assertIsNotNone(match)
product_name = match.group(1)
# Find the imported bug number:
match = re.search(r'Creating Launchpad bug #(\d+)', stderr)
self.assertIsNotNone(match)
bug_id = int(match.group(1))
# Abort transaction so we can see the result:
transaction.abort()
bug = getUtility(IBugSet).get(bug_id)
self.assertEqual('A test bug', bug.title)
self.assertEqual(product_name, bug.bugtasks[0].product.name)
开发者ID:pombreda,项目名称:UnnaturalCodeFork,代码行数:26,代码来源:test_bugimport.py
示例2: run_apt_autoremove
def run_apt_autoremove(self):
if sys.platform == "win32":
return
deb_env = os.environ.copy()
deb_env.update(
{"DEBIAN_FRONTEND": "noninteractive", "DEBCONF_NONINTERACTIVE_SEEN": "true"}
)
try:
autoremove_output = subprocess.check_output(
"sudo apt-get autoremove -y".split(),
stderr=subprocess.STDOUT,
env=deb_env,
)
self.addDetail(
"apt-get autoremove output",
content.text_content(autoremove_output.decode("utf-8")),
)
except FileNotFoundError as e:
self.addDetail("apt-get autoremove error", content.text_content(str(e)))
except subprocess.CalledProcessError as e:
self.addDetail("apt-get autoremove error", content.text_content(str(e)))
self.addDetail(
"apt-get autoremove output",
content.text_content(e.output.decode("utf-8")),
)
if os.getenv("SNAPCRAFT_APT_AUTOREMOVE_CHECK_FAIL", False):
raise
开发者ID:mvo5,项目名称:snapcraft,代码行数:30,代码来源:__init__.py
示例3: test_custom_rpm_version_py_command
def test_custom_rpm_version_py_command(self):
"""Test custom rpm_version command."""
stdout, stderr, return_code = self.run_setup('rpm_version')
self.addDetail('stdout', content.text_content(stdout))
self.addDetail('stderr', content.text_content(stderr))
self.assertIn('Extracting rpm version', stdout)
self.assertEqual(return_code, 0)
开发者ID:kxepal,项目名称:pbr,代码行数:7,代码来源:test_commands.py
示例4: test_show_redundancy_summary
def test_show_redundancy_summary(self):
ret, session = self.login()
prompt = accelerated_upgrade.get_prompt(session)
search_window = len(prompt) + 1
self.addDetail('prompt', text_content(prompt))
session.sendline()
status = session.expect_exact(
[prompt], timeout=3, searchwindowsize=search_window)
self.assertEqual(0, status)
self.addDetail('received-prompt', text_content(session.after))
cmd = 'show redundancy summary'
session.sendline(cmd)
status = session.expect_exact(
[prompt, INVALID_INPUT, MORE, EOF],
timeout=5,
searchwindowsize=search_window
)
#check if get prompt after the command
self.assertEqual(0, status)
# replace LF with CR/LF (maybe different on Windows)
output = asr_9k.commands[cmd][0].replace('\n', '\r\n')
found = session.before.find(output)
self.addDetail('found', text_content(str(found)))
self.addDetail('received', text_content(session.before))
self.addDetail('expected', text_content(output))
self.assertNotEqual(-1, found)
开发者ID:ommaurya,项目名称:csm,代码行数:35,代码来源:test_ssh.py
示例5: test_pep8
def test_pep8(self):
# NOTE(jecarey): Add tests marked as off_by_default to enable testing
turn_on = set(['H106'])
if self.options.select:
turn_on.update(self.options.select)
self.options.select = tuple(turn_on)
report = pep8.BaseReport(self.options)
checker = pep8.Checker(filename=self.filename, lines=self.lines,
options=self.options, report=report)
checker.check_all()
self.addDetail('doctest', content.text_content(self.raw))
if self.code == 'Okay':
self.assertThat(
len(report.counters),
matchers.Not(matchers.GreaterThan(
len(self.options.benchmark_keys))),
"incorrectly found %s" % ', '.join(
[key for key in report.counters
if key not in self.options.benchmark_keys]))
else:
self.addDetail('reason',
content.text_content("Failed to trigger rule %s" %
self.code))
self.assertIn(self.code, report.counters)
开发者ID:HoratiusTang,项目名称:neutron,代码行数:26,代码来源:test_checks.py
示例6: _run_core
def _run_core(self):
# Add an observer to trap all logged errors.
self.case.reactor = self._reactor
error_observer = _log_observer
full_log = StringIO()
full_observer = log.FileLogObserver(full_log)
spinner = self._make_spinner()
successful, unhandled = run_with_log_observers(
[error_observer.gotEvent, full_observer.emit], self._blocking_run_deferred, spinner
)
self.case.addDetail("twisted-log", text_content(full_log.getvalue()))
logged_errors = error_observer.flushErrors()
for logged_error in logged_errors:
successful = False
self._got_user_failure(logged_error, tb_label="logged-error")
if unhandled:
successful = False
for debug_info in unhandled:
f = debug_info.failResult
info = debug_info._getDebugTracebacks()
if info:
self.case.addDetail("unhandled-error-in-deferred-debug", text_content(info))
self._got_user_failure(f, "unhandled-error-in-deferred")
junk = spinner.clear_junk()
if junk:
successful = False
self._log_user_exception(UncleanReactorError(junk))
if successful:
self.result.addSuccess(self.case, details=self.case.getDetails())
开发者ID:ClusterHQ,项目名称:testtools,代码行数:34,代码来源:deferredruntest.py
示例7: _setUp
def _setUp(self):
super(NeutronNetworkFixture, self)._setUp()
self.neutron = get_neutron_client(
project_name=self.project_fixture.name,
user_name=self.project_fixture.admin_user.name,
password=self.project_fixture.admin_user_fixture.password)
self.subnet_id = self.get_subnet_id()
cidr = CONF.network['cidr'].format(subnet=self.subnet_id)
# TODO: handle clashes and retry.
self.net_name = factory.make_obj_name("network")
self.sub_name = factory.make_obj_name("subnet")
self.network = self.neutron.create_network(
{"network": dict(name=self.net_name)})
network_id = self.network["network"]["id"]
self.subnet = self.neutron.create_subnet(
{"subnet": dict(
name=self.sub_name, network_id=network_id, cidr=cidr,
ip_version=4)})
self.addCleanup(self.delete_network)
self.addDetail(
'NeutronNetworkFixture-network',
text_content('Network %s created' % self.net_name))
self.addDetail(
'NeutronNetworkFixture-subnet',
text_content('Subnet %s created (cidr=%s)' % (
self.sub_name, cidr)))
开发者ID:testiny,项目名称:testiny,代码行数:26,代码来源:neutron.py
示例8: test_recordsets_with_zone_id
def test_recordsets_with_zone_id(self):
'''Test DNS recordsets functionality'''
sub = ''.join(random.choice(string.ascii_lowercase) for _ in range(6))
zone = '%s.example2.net.' % sub
email = '[email protected]'
name = 'www.%s' % zone
type_ = 'a'
description = 'Test recordset'
ttl = 3600
records = ['192.168.1.1']
self.addDetail('zone', content.text_content(zone))
self.addDetail('recordset', content.text_content(name))
# Create a zone to hold the tested recordset
zone_obj = self.user_cloud.create_zone(name=zone, email=email)
# Test we can create a recordset and we get it returned
created_recordset = self.user_cloud.create_recordset(zone_obj['id'],
name,
type_,
records,
description, ttl)
self.addCleanup(self.cleanup, zone, created_recordset['id'])
self.assertEqual(created_recordset['zone_id'], zone_obj['id'])
self.assertEqual(created_recordset['name'], name)
self.assertEqual(created_recordset['type'], type_.upper())
self.assertEqual(created_recordset['records'], records)
self.assertEqual(created_recordset['description'], description)
self.assertEqual(created_recordset['ttl'], ttl)
# Test that we can list recordsets
recordsets = self.user_cloud.list_recordsets(zone_obj['id'],)
self.assertIsNotNone(recordsets)
# Test we get the same recordset with the get_recordset method
get_recordset = self.user_cloud.get_recordset(zone_obj['id'],
created_recordset['id'])
self.assertEqual(get_recordset['id'], created_recordset['id'])
# Test we can update a field on the recordset and only that field
# is updated
updated_recordset = self.user_cloud.update_recordset(
zone_obj['id'],
created_recordset['id'],
ttl=7200)
self.assertEqual(updated_recordset['id'], created_recordset['id'])
self.assertEqual(updated_recordset['name'], name)
self.assertEqual(updated_recordset['type'], type_.upper())
self.assertEqual(updated_recordset['records'], records)
self.assertEqual(updated_recordset['description'], description)
self.assertEqual(updated_recordset['ttl'], 7200)
# Test we can delete and get True returned
deleted_recordset = self.user_cloud.delete_recordset(
zone, created_recordset['id'])
self.assertTrue(deleted_recordset)
开发者ID:ctrlaltdel,项目名称:neutrinator,代码行数:59,代码来源:test_recordset.py
示例9: test_logfile_contains_finish
def test_logfile_contains_finish(self):
"""Confirm that logger calls in 'finish' phase recorded
Repository layout being checked
B---C local/master
/
A---D---E upstream/master
"""
tree = [
('A', []),
('B', ['A']),
('C', ['B']),
('D', ['A']),
('E', ['D']),
]
branches = {
'head': ('master', 'C'),
'upstream': ('upstream/master', 'E'),
}
self.gittree = base.BuildTree(self.testrepo, tree, branches.values())
cmdline = self.parser.get_default('script_cmdline')
tfile = None
try:
tfile = tempfile.NamedTemporaryFile(delete=False)
# need to close to allow reopen to write
tfile.close()
cmdline.extend(['-v', '--log-file', tfile.name, '--log-level',
'debug', 'import', '--into', 'master',
'upstream/master'])
try:
output = subprocess.check_output(cmdline,
stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as cpe:
self.addDetail(
'subprocess-output',
content.text_content(cpe.output.decode('utf-8')))
raise
self.addDetail(
'subprocess-output',
content.text_content(output.decode('utf-8')))
logfile_contents = open(tfile.name, 'r').read()
finally:
if tfile and os.path.exists(tfile.name):
os.remove(tfile.name)
self.assertThat(
logfile_contents,
matchers.Contains("Merging by inverting the 'ours' strategy"))
self.assertThat(
logfile_contents,
matchers.Contains("Replacing tree contents with those from"))
开发者ID:openstack,项目名称:git-upstream,代码行数:59,代码来源:test_commands.py
示例10: detailed_refcounts
def detailed_refcounts(self, track, rc, prev):
"""Report a change in reference counts, with extra detail."""
details = {
'sys-refcounts': text_content(str(rc)),
'changes': text_content(str(rc - prev)),
'track': text_content(str(track.delta)),
}
self._emit_fake_test(self.TAG_REFCOUNTS, self.TAG_REFCOUNTS, details)
开发者ID:zopefoundation,项目名称:zope.testrunner,代码行数:8,代码来源:formatter.py
示例11: refcounts
def refcounts(self, rc, prev):
"""Report a change in reference counts."""
details = {
'sys-refcounts': text_content(str(rc)),
'changes': text_content(str(rc - prev)),
}
# XXX: Emit the details dict as JSON?
self._emit_fake_test(self.TAG_REFCOUNTS, self.TAG_REFCOUNTS, details)
开发者ID:zopefoundation,项目名称:zope.testrunner,代码行数:8,代码来源:formatter.py
示例12: test_update_cluster
def test_update_cluster(self):
profile_name = "test_profile"
spec = {
"properties": {
"flavor": "m1.tiny",
"image": "cirros-0.4.0-x86_64-disk",
"networks": [
{
"network": "private"
}
],
"security_groups": [
"default"
]
},
"type": "os.nova.server",
"version": 1.0
}
self.addDetail('profile', content.text_content(profile_name))
# Test that we can create a profile and we get it returned
profile = self.user_cloud.create_cluster_profile(name=profile_name,
spec=spec)
self.addCleanup(self.cleanup_profile, profile['id'])
cluster_name = 'example_cluster'
desired_capacity = 0
self.addDetail('cluster', content.text_content(cluster_name))
# Test that we can create a cluster and we get it returned
cluster = self.user_cloud.create_cluster(
name=cluster_name, profile=profile,
desired_capacity=desired_capacity)
self.addCleanup(self.cleanup_cluster, cluster['cluster']['id'])
# Test that we can update a field on the cluster and only that field
# is updated
self.user_cloud.update_cluster(cluster['cluster']['id'],
new_name='new_cluster_name')
wait = wait_for_status(
self.user_cloud.get_cluster_by_id,
{'name_or_id': cluster['cluster']['id']}, 'status', 'ACTIVE')
self.assertTrue(wait)
cluster_update = self.user_cloud.get_cluster_by_id(
cluster['cluster']['id'])
self.assertEqual(cluster_update['id'], cluster['cluster']['id'])
self.assertEqual(cluster_update['name'], 'new_cluster_name')
self.assertEqual(cluster_update['profile_id'],
cluster['cluster']['profile_id'])
self.assertEqual(cluster_update['desired_capacity'],
cluster['cluster']['desired_capacity'])
开发者ID:ctrlaltdel,项目名称:neutrinator,代码行数:58,代码来源:test_clustering.py
示例13: test_get_cluster_receiver_by_id
def test_get_cluster_receiver_by_id(self):
profile_name = "test_profile"
spec = {
"properties": {
"flavor": "m1.tiny",
"image": "cirros-0.4.0-x86_64-disk",
"networks": [
{
"network": "private"
}
],
"security_groups": [
"default"
]
},
"type": "os.nova.server",
"version": 1.0
}
self.addDetail('profile', content.text_content(profile_name))
# Test that we can create a profile and we get it returned
profile = self.user_cloud.create_cluster_profile(name=profile_name,
spec=spec)
self.addCleanup(self.cleanup_profile, profile['id'])
cluster_name = 'example_cluster'
desired_capacity = 0
self.addDetail('cluster', content.text_content(cluster_name))
# Test that we can create a cluster and we get it returned
cluster = self.user_cloud.create_cluster(
name=cluster_name, profile=profile,
desired_capacity=desired_capacity)
self.addCleanup(self.cleanup_cluster, cluster['cluster']['id'])
receiver_name = "example_receiver"
receiver_type = "webhook"
self.addDetail('receiver', content.text_content(receiver_name))
# Test that we can create a receiver and we get it returned
receiver = self.user_cloud.create_cluster_receiver(
name=receiver_name, receiver_type=receiver_type,
cluster_name_or_id=cluster['cluster']['id'],
action='CLUSTER_SCALE_OUT')
self.addCleanup(self.cleanup_receiver, receiver['id'])
# Test that we get the same receiver with the get_receiver method
receiver_get = self.user_cloud.get_cluster_receiver_by_id(
receiver['id'])
self.assertEqual(receiver_get['id'], receiver["id"])
开发者ID:ctrlaltdel,项目名称:neutrinator,代码行数:58,代码来源:test_clustering.py
示例14: test_yaml_snippet
def test_yaml_snippet(self):
if not self.in_filename:
return
if self.conf_filename is not None:
config = configparser.ConfigParser()
config.readfp(open(self.conf_filename))
else:
config = {}
expected_xml = self._read_utf8_content()
yaml_content = self._read_yaml_content(self.in_filename)
if isinstance(yaml_content, list):
yaml_content = yaml_content[0]
project = None
if ('project-type' in yaml_content):
yaml_content['project-type']
if (yaml_content['project-type'] == "maven"):
project = project_maven.Maven(None)
elif (yaml_content['project-type'] == "matrix"):
project = project_matrix.Matrix(None)
elif (yaml_content['project-type'] == "flow"):
project = project_flow.Flow(None)
elif (yaml_content['project-type'] == "multijob"):
project = project_multijob.MultiJob(None)
elif (yaml_content['project-type'] == "folder"):
project = folders.Folder(None)
if project:
xml_project = project.root_xml(yaml_content)
else:
xml_project = XML.Element('project')
print yaml_content
plugins_info = None
if self.plugins_info_filename is not None:
plugins_info = self._read_yaml_content(self.plugins_info_filename)
self.addDetail("plugins-info-filename",
text_content(self.plugins_info_filename))
self.addDetail("plugins-info",
text_content(str(plugins_info)))
parser = YamlParser(config, plugins_info)
pub = self.klass(parser.registry)
# Generate the XML tree directly with modules/general
pub.gen_xml(parser, xml_project, yaml_content)
# Prettify generated XML
pretty_xml = XmlJob(xml_project, 'fixturejob').output().decode('utf-8')
self.assertThat(
pretty_xml,
testtools.matchers.DocTestMatches(expected_xml,
doctest.ELLIPSIS |
doctest.NORMALIZE_WHITESPACE |
doctest.REPORT_NDIFF)
)
开发者ID:dominicscimeca,项目名称:jenkins-job-builder-addons,代码行数:58,代码来源:base.py
示例15: test_recordsets
def test_recordsets(self):
'''Test DNS recordsets functionality'''
zone = 'example2.net.'
email = '[email protected]'
name = 'www'
type_ = 'a'
description = 'Test recordset'
ttl = 3600
records = ['192.168.1.1']
self.addDetail('zone', content.text_content(zone))
self.addDetail('recordset', content.text_content(name))
self.addCleanup(self.cleanup, zone, name)
# Create a zone to hold the tested recordset
zone_obj = self.user_cloud.create_zone(name=zone, email=email)
# Test we can create a recordset and we get it returned
created_recordset = self.user_cloud.create_recordset(zone, name, type_,
records,
description, ttl)
self.assertEqual(created_recordset['zone_id'], zone_obj['id'])
self.assertEqual(created_recordset['name'], name + '.' + zone)
self.assertEqual(created_recordset['type'], type_.upper())
self.assertEqual(created_recordset['records'], records)
self.assertEqual(created_recordset['description'], description)
self.assertEqual(created_recordset['ttl'], ttl)
# Test that we can list recordsets
recordsets = self.user_cloud.list_recordsets(zone)
self.assertIsNotNone(recordsets)
# Test we get the same recordset with the get_recordset method
get_recordset = self.user_cloud.get_recordset(zone,
created_recordset['id'])
self.assertEqual(get_recordset['id'], created_recordset['id'])
# Test the get method also works by name
get_recordset = self.user_cloud.get_recordset(zone, name + '.' + zone)
self.assertEqual(get_recordset['id'], created_recordset['id'])
# Test we can update a field on the recordset and only that field
# is updated
updated_recordset = self.user_cloud.update_recordset(zone_obj['id'],
name + '.' + zone,
ttl=7200)
self.assertEqual(updated_recordset['id'], created_recordset['id'])
self.assertEqual(updated_recordset['name'], name + '.' + zone)
self.assertEqual(updated_recordset['type'], type_.upper())
self.assertEqual(updated_recordset['records'], records)
self.assertEqual(updated_recordset['description'], description)
self.assertEqual(updated_recordset['ttl'], 7200)
# Test we can delete and get True returned
deleted_recordset = self.user_cloud.delete_recordset(
zone, name + '.' + zone)
self.assertTrue(deleted_recordset)
开发者ID:dtroyer,项目名称:python-openstacksdk,代码行数:57,代码来源:test_recordset.py
示例16: addSkip
def addSkip(self, test, reason=None, details=None):
super(TestByTestResult, self).addSkip(test, reason, details)
self._status = 'skip'
if details is None:
details = {'reason': text_content(reason)}
elif reason:
# XXX: What if details already has 'reason' key?
details['reason'] = text_content(reason)
self._details = details
开发者ID:AIdrifter,项目名称:samba,代码行数:9,代码来源:real.py
示例17: test_yaml_snippet
def test_yaml_snippet(self):
if not self.in_filename:
return
jjb_config = self._get_config()
expected_xml = self._read_utf8_content()
yaml_content = self._read_yaml_content(self.in_filename)
plugins_info = None
if self.plugins_info_filename is not None:
plugins_info = self._read_yaml_content(self.plugins_info_filename)
self.addDetail("plugins-info-filename", text_content(self.plugins_info_filename))
self.addDetail("plugins-info", text_content(str(plugins_info)))
parser = YamlParser(jjb_config)
registry = ModuleRegistry(jjb_config, plugins_info)
registry.set_parser_data(parser.data)
pub = self.klass(registry)
project = None
if "project-type" in yaml_content:
if yaml_content["project-type"] == "maven":
project = project_maven.Maven(registry)
elif yaml_content["project-type"] == "matrix":
project = project_matrix.Matrix(registry)
elif yaml_content["project-type"] == "flow":
project = project_flow.Flow(registry)
elif yaml_content["project-type"] == "multijob":
project = project_multijob.MultiJob(registry)
elif yaml_content["project-type"] == "externaljob":
project = project_externaljob.ExternalJob(registry)
if "view-type" in yaml_content:
if yaml_content["view-type"] == "list":
project = view_list.List(None)
elif yaml_content["view-type"] == "pipeline":
project = view_pipeline.Pipeline(None)
else:
raise InvalidAttributeError("view-type", yaml_content["view-type"])
if project:
xml_project = project.root_xml(yaml_content)
else:
xml_project = XML.Element("project")
# Generate the XML tree directly with modules/general
pub.gen_xml(xml_project, yaml_content)
# Prettify generated XML
pretty_xml = XmlJob(xml_project, "fixturejob").output().decode("utf-8")
self.assertThat(
pretty_xml, testtools.matchers.DocTestMatches(expected_xml, doctest.ELLIPSIS | doctest.REPORT_NDIFF)
)
开发者ID:openstack-infra,项目名称:jenkins-job-builder,代码行数:56,代码来源:base.py
示例18: test_json_renderer
def test_json_renderer(self):
context = json.loads(TEST_JSON)
x = renderers.JsonRenderer()
result = x.render('{{a.b}}', context)
self.addDetail('result', content.text_content(result))
result_structure = json.loads(result)
desire_structure = json.loads('[1,2,3,"foo"]')
self.assertEqual(desire_structure, result_structure)
result = x.render('{{a.c}}', context)
self.addDetail('result', content.text_content(result))
self.assertEqual(u'the quick brown fox', result)
开发者ID:AsherBond,项目名称:os-apply-config,代码行数:11,代码来源:test_json_renderer.py
示例19: test_smoke
def test_smoke(self):
diff = create_proper_job(self.factory)
transaction.commit()
out, err, exit_code = run_script(
"LP_DEBUG_SQL=1 cronscripts/process-job-source.py -vv %s" % (IPackageDiffJobSource.getName())
)
self.addDetail("stdout", text_content(out))
self.addDetail("stderr", text_content(err))
self.assertEqual(0, exit_code)
self.assertEqual(PackageDiffStatus.COMPLETED, diff.status)
self.assertIsNot(None, diff.diff_content)
开发者ID:vitaminmoo,项目名称:unnaturalcode,代码行数:12,代码来源:test_packagediffjob.py
示例20: setUp
def setUp(self):
# add description in case parent setup fails.
self.addDetail('description', text_content(self.desc))
# builds the tree to be tested
super(TestUpstreamMergeBaseSearcher, self).setUp()
# need the tree built at this point
self.addDetail('expected-changes',
text_content(pformat(
list((c, self.gittree.graph[c].hexsha)
for c in self.expected_changes))))
开发者ID:openstack,项目名称:git-upstream,代码行数:12,代码来源:test_searchers.py
注:本文中的testtools.content.text_content函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论