本文整理汇总了Python中utils.providers.list_providers函数的典型用法代码示例。如果您正苦于以下问题：Python list_providers函数的具体用法？Python list_providers怎么用？Python list_providers使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了list_providers函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setup_one_or_skip
def setup_one_or_skip(request, filters=None, use_global_filters=True):
    """ Sets up one of matching providers or skips the test

    Args:
        filters: List of :py:class:`ProviderFilter` or None
        request: Needed for logging a potential skip correctly in artifactor
        use_global_filters: Will apply global filters as well if `True`, will not otherwise

    Returns:
        The first matching provider that already exists, otherwise the first matching
        provider for which ``_setup_provider_verbose`` returned a truthy value.
    """
    filters = filters or []
    providers = list_providers(filters=filters, use_global_filters=use_global_filters)

    # All providers filtered out?
    if not providers:
        # Ask again without the test-specific filters to tell which filter set is to blame
        global_providers = list_providers(filters=None, use_global_filters=use_global_filters)
        if not global_providers:
            # This can also mean that there simply are no providers in the yamls!
            pytest.skip("No provider matching global filters found")
        else:
            pytest.skip("No provider matching test-specific filters found")

    # Are all providers marked as problematic?
    if _problematic_providers.issuperset(providers):
        skip_msg = "All providers marked as problematic: {}".format([p.key for p in providers])
        # NOTE(review): presumably this skips the test (does not return) - confirm
        _artifactor_skip_providers(request, providers, skip_msg)

    # If there is a provider already set up matching the user's requirements, reuse it
    for provider in providers:
        if provider.exists:
            return provider

    # If we have more than one provider, we create two separate groups of providers, preferred
    # and not preferred, that we shuffle separately and then join together
    if len(providers) > 1:
        only_preferred_filter = ProviderFilter(required_fields=[("do_not_prefer", True)],
                                               inverted=True)
        preferred_providers = list_providers(
            filters=filters + [only_preferred_filter], use_global_filters=use_global_filters)
        not_preferred_providers = [p for p in providers if p not in preferred_providers]
        random.shuffle(preferred_providers)
        random.shuffle(not_preferred_providers)
        providers = preferred_providers + not_preferred_providers

    # Try to set up one of matching providers
    non_existing = [prov for prov in providers if not prov.exists]
    for provider in non_existing:
        if _setup_provider_verbose(request, provider):
            return provider

    # Format the message; the previous code built a (str, list) tuple by mistake, so the
    # skip reason was never a readable string.
    skip_msg = "Failed to set up any matching providers: {}".format([p.key for p in providers])
    _artifactor_skip_providers(request, non_existing, skip_msg)
开发者ID:dajohnso,项目名称:cfme_tests,代码行数:51,代码来源:provider.py
示例2: run
def run(**kwargs):
    """Start one template-upload thread per pingable virtualcenter provider and wait for all.

    Keyword Args:
        image_url: Forwarded unchanged to ``upload_template``.
        template_name: Forwarded unchanged to ``upload_template``.

    Returns:
        ``False`` when any exception is raised while preparing, starting or joining the
        upload threads (the exception is printed); ``None`` otherwise.
    """
    image_url = kwargs.get('image_url')
    template_name = kwargs.get('template_name')
    try:
        workers = []
        for provider_key in list_providers("virtualcenter"):
            provider_data = cfme_data['management_systems'][provider_key]
            provider_creds = credentials[provider_data['credentials']]
            hostname = provider_data['hostname']
            username = provider_creds['username']
            password = provider_creds['password']
            default_name = credentials['host_default']['username']
            default_password = credentials['host_default']['password']
            host_ip = provider_data['ipaddress']
            client = VMWareSystem(hostname, username, password)
            # Providers that do not answer ping are silently skipped
            if not net.is_pingable(host_ip):
                continue
            upload_args = (client, hostname, username, password, provider_key,
                           image_url, template_name, default_name, default_password)
            worker = Thread(target=upload_template, args=upload_args)
            worker.daemon = True
            workers.append(worker)
            worker.start()
        for worker in workers:
            worker.join()
    except Exception as e:
        print(e)
        return False
开发者ID:prachiyadav,项目名称:cfme_tests,代码行数:32,代码来源:template_upload_vsphere.py
示例3: run
def run(**kwargs):
    """Start one template-upload thread per reachable openstack provider and wait for all.

    A provider is skipped when its address is not pingable, or when the SSH port check
    fails (the latter is reported on stdout).

    Keyword Args:
        image_url: Forwarded unchanged to ``upload_template``.
        template_name: Forwarded unchanged to ``upload_template``.
    """
    image_url = kwargs.get('image_url')
    template_name = kwargs.get('template_name')
    workers = []
    for provider_key in list_providers("openstack"):
        provider_data = cfme_data['management_systems'][provider_key]
        api_creds = credentials[provider_data['credentials']]
        ssh_creds = credentials['host_default']
        username = api_creds['username']
        password = api_creds['password']
        auth_url = provider_data['auth_url']
        rhosip = provider_data['ipaddress']
        sshname = ssh_creds['username']
        sshpass = ssh_creds['password']
        if not net.is_pingable(rhosip):
            continue
        if not net.net_check(ports.SSH, rhosip):
            print("SSH connection to {}:{} failed, port unavailable".format(
                provider_key, ports.SSH))
            continue
        upload_args = (rhosip, sshname, sshpass, username, password, auth_url, provider_key,
                       image_url, template_name)
        worker = Thread(target=upload_template, args=upload_args)
        worker.daemon = True
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()
开发者ID:prachiyadav,项目名称:cfme_tests,代码行数:29,代码来源:template_upload_rhos.py
示例4: test_containers_overview_data_integrity
def test_containers_overview_data_integrity():
    """Test data integrity of status boxes in containers dashboard.

    Steps:
        * Go to Containers / Overview
        * All cells should contain the correct relevant information
            # of nodes
            # of providers
            # ...
    """
    navigate_to(ContainersOverview, 'All')
    # We should wait ~2 seconds for the StatusBox population
    # (until we find a better solution)
    time.sleep(2)

    # Values shown in the UI, keyed by the object each data set describes
    statusbox_values = {}
    for data_set in DATA_SETS:
        statusbox_values[data_set.object] = int(StatusBox(data_set.name).value())

    def as_provider(name):
        return ContainersProvider(name, key=name)

    # Values reported by the API for every openshift provider
    api_values = get_api_object_counts(map(as_provider, list_providers('openshift')))

    results = {
        data_set.object: api_values[data_set.object] == statusbox_values[data_set.object]
        for data_set in DATA_SETS
    }
    failed = [obj for obj, is_pass in results.items() if not is_pass]
    if failed:
        pytest.fail('There is a mismatch between API and UI values:\n{}'.format(
            '\n'.join(['{}: {} (API) != {} (UI)'.format(
                obj.__name__, api_values[obj], statusbox_values[obj])
                for obj in failed])))
开发者ID:ManageIQ,项目名称:integration_tests,代码行数:27,代码来源:test_containers_overview_data_integrity.py
示例5: run
def run(**kwargs):
    """Calls all the functions needed to upload new template to RHEVM.

    This is called either by template_upload_all script, or by main function.

    Providers are first verified (pingable, ovirt-engine running); only then is one upload
    thread started per verified provider, and all threads are joined before returning.

    Args:
        **kwargs: Kwargs generated from cfme_data['template_upload']['template_upload_rhevm'].
            ``provider_data`` is optional: when truthy, providers and their credentials are
            read from ``provider_data['management_systems']`` (entries whose ``type`` is not
            ``'rhevm'`` are ignored); otherwise they come from ``cfme_data`` and
            ``credentials``. ``image_url`` and ``template_name`` are forwarded to
            ``upload_template``.
    """
    # .get() so that a missing 'provider_data' key behaves like a falsy one instead of
    # raising KeyError (the other options are already read with .get()).
    provider_data = kwargs.get('provider_data')
    thread_queue = []
    # (provider, ip, ssh user, ssh password) for every provider that passed verification
    valid_providers = []

    providers = list_providers("rhevm")
    if provider_data:
        # Iterating the mapping yields the provider keys
        mgmt_sys = providers = provider_data['management_systems']
    else:
        # Resolved once up front so the upload loop below does not depend on a binding
        # made inside the verification loop.
        mgmt_sys = cfme_data['management_systems']

    for provider in providers:
        if provider_data:
            if mgmt_sys[provider]['type'] != 'rhevm':
                continue
            sshname = mgmt_sys[provider]['sshname']
            sshpass = mgmt_sys[provider]['sshpass']
        else:
            ssh_rhevm_creds = mgmt_sys[provider]['ssh_creds']
            sshname = credentials[ssh_rhevm_creds]['username']
            sshpass = credentials[ssh_rhevm_creds]['password']
        rhevip = mgmt_sys[provider]['ipaddress']

        print("RHEVM:{} verifying provider's state before template upload".format(provider))
        if not net.is_pingable(rhevip):
            continue
        elif not is_ovirt_engine_running(rhevip, sshname, sshpass):
            print('RHEVM:{} ovirt-engine service not running..'.format(provider))
            continue
        valid_providers.append((provider, rhevip, sshname, sshpass))

    for provider, rhevip, sshname, sshpass in valid_providers:
        if provider_data:
            username = mgmt_sys[provider]['username']
            password = mgmt_sys[provider]['password']
        else:
            rhevm_credentials = mgmt_sys[provider]['credentials']
            username = credentials[rhevm_credentials]['username']
            password = credentials[rhevm_credentials]['password']

        thread = Thread(target=upload_template,
                        args=(rhevip, sshname, sshpass, username, password, provider,
                              kwargs.get('image_url'), kwargs.get('template_name'),
                              provider_data))
        thread.daemon = True
        thread_queue.append(thread)
        thread.start()

    for thread in thread_queue:
        thread.join()
开发者ID:pavelzag,项目名称:cfme_tests,代码行数:59,代码来源:template_upload_rhevm.py
示例6: servers_in_mgmt
def servers_in_mgmt(cls, provider=None, server_group=None):
    """Return servers found via the management system.

    Args:
        provider: Provider to query; when ``None`` every 'hawkular' provider returned by
            ``list_providers`` is queried and the results are concatenated.
        server_group: Passed through to ``cls._servers_in_mgmt``.
    """
    if provider is not None:
        return cls._servers_in_mgmt(provider, server_group)
    return [
        server
        for hawkular_key in list_providers('hawkular')
        for server in cls._servers_in_mgmt(get_crud(hawkular_key), server_group)
    ]
开发者ID:RonnyPfannschmidt,项目名称:cfme_tests,代码行数:8,代码来源:server.py
示例7: datasources_in_mgmt
def datasources_in_mgmt(cls, provider=None, server=None):
    """Return datasources found via the management system.

    Args:
        provider: Provider to query; when ``None`` every 'hawkular' provider returned by
            ``list_providers`` is queried and the results are concatenated.
        server: Passed through to ``cls._datasources_in_mgmt``.
    """
    if provider is not None:
        return cls._datasources_in_mgmt(provider, server)
    return [
        datasource
        for hawkular_key in list_providers('hawkular')
        for datasource in cls._datasources_in_mgmt(get_crud(hawkular_key), server)
    ]
开发者ID:jteehan,项目名称:cfme_tests,代码行数:8,代码来源:datasource.py
示例8: deployments_in_mgmt
def deployments_in_mgmt(cls, provider=None, server=None):
    """Return deployments found via the management system.

    Args:
        provider: Provider to query; when ``None`` every 'hawkular' provider returned by
            ``list_providers`` is queried and the results are concatenated.
        server: Passed through to ``cls._deployments_in_mgmt``.
    """
    if provider is not None:
        return cls._deployments_in_mgmt(provider, server)
    return [
        deployment
        for hawkular_key in list_providers('hawkular')
        for deployment in cls._deployments_in_mgmt(get_crud(hawkular_key), server)
    ]
开发者ID:FilipB,项目名称:cfme_tests,代码行数:8,代码来源:deployment.py
示例9: templates_uploaded_on_providers
def templates_uploaded_on_providers(api, stream, template):
    """Check that the untested template of ``stream`` is on every expected provider.

    Exits the process (``sys.exit()``) when ``get_untested_templates`` returns a truthy
    value for the given arguments.

    Returns:
        ``False`` as soon as an uploaded image type has a provider type for which
        ``provider_in_the_list`` is falsy; ``True`` otherwise.
    """
    if get_untested_templates(api, stream, template):
        print('report will not be generated, proceed with the next untested provider')
        sys.exit()

    # (image name reported by images_uploaded, provider type passed to list_providers)
    image_to_provider_type = (
        ('template_rhevm', 'rhevm'),
        ('template_rhos', 'openstack'),
        ('template_vsphere', 'virtualcenter'),
        ('template_scvmm', 'scvmm'),
    )
    untested = api.template.get(limit=1, tested=False, group__name=stream).get('objects', [])
    for temp in untested:
        for image_name, provider_type in image_to_provider_type:
            if image_name not in images_uploaded(stream):
                continue
            if not provider_in_the_list(list_providers(provider_type), temp['providers']):
                return False
    return True
开发者ID:RonnyPfannschmidt,项目名称:cfme_tests,代码行数:19,代码来源:latest_template_tester_report.py
示例10: ec2cleanup
def ec2cleanup(texts, max_hours, exclude_instances, exclude_volumes, exclude_eips, output):
    """Run the instance, address and volume cleanup helpers for every 'ec2' provider.

    For each provider: delete old instances, wait 120 seconds, release disassociated
    addresses, then delete unattached volumes. Progress headers go to ``logger``.
    """
    for provider_key in list_providers('ec2'):
        mgmt = get_mgmt(provider_key)
        logger.info("\n" + provider_key + ":\n")

        logger.info("Deleted instances:")
        instance_cleanup_args = dict(
            texts=texts,
            ec2provider=mgmt,
            provider_key=provider_key,
            date=datetime.datetime.now(),
            maxhours=max_hours,
            excluded_instances=exclude_instances,
            output=output,
        )
        delete_old_instances(**instance_cleanup_args)
        time.sleep(120)

        logger.info("\nReleased addresses:")
        delete_disassociated_addresses(ec2provider=mgmt, excluded_eips=exclude_eips)

        logger.info("\nDeleted volumes:")
        delete_unattached_volumes(ec2provider=mgmt, excluded_volumes=exclude_volumes)
开发者ID:kzvyahin,项目名称:cfme_tests,代码行数:13,代码来源:ec2cleanup.py
示例11: ec2cleanup
def ec2cleanup(max_hours, exclude_instances, exclude_volumes, exclude_eips):
    """Run the instance, address and volume cleanup helpers for every 'ec2' provider.

    For each provider: delete old instances, wait 120 seconds, release disassociated
    addresses, then delete unattached volumes. Progress headers go to ``logger``.
    """
    for provider_key in list_providers("ec2"):
        mgmt = get_mgmt(provider_key)
        logger.info("\n" + provider_key + ":\n")

        logger.info("Deleted instances:")
        instance_cleanup_args = dict(
            ec2provider=mgmt,
            date=datetime.now(),
            maxhours=max_hours,
            excluded_instances=exclude_instances,
        )
        delete_old_instances(**instance_cleanup_args)
        sleep(120)

        logger.info("\nReleased addresses:")
        delete_disassociated_addresses(ec2provider=mgmt, excluded_ips=exclude_eips)

        logger.info("\nDeleted volumes:")
        delete_unattached_volumes(ec2provider=mgmt, excluded_volumes=exclude_volumes)
开发者ID:RonnyPfannschmidt,项目名称:cfme_tests,代码行数:13,代码来源:ec2cleanup.py
示例12: main
def main(*providers):
    """Delete floating IPs that have no fixed IP on every 'openstack' provider.

    A provider whose floating-IP query raises is reported on stdout and skipped.

    NOTE(review): ``providers`` is accepted but not used by this function - confirm
    whether it was meant to restrict the providers that get cleaned.
    """
    for key in list_providers('openstack'):
        print('Checking {}'.format(key))
        api = get_mgmt(key).api
        try:
            unassigned = api.floating_ips.findall(fixed_ip=None)
        except Exception:
            print('Unable to get fips for {}:'.format(key))
            # Only the last traceback line (the exception itself) is shown
            print(format_exc().splitlines()[-1])
        else:
            for fip in unassigned:
                print('Deleting {} on {}'.format(fip.ip, key))
                fip.delete()
                print('{} deleted'.format(fip.ip))
开发者ID:FilipB,项目名称:cfme_tests,代码行数:15,代码来源:cleanup_openstack_fips.py
示例13: run
def run(**kwargs):
    """Calls all the functions needed to import new template to EC2.

    This is called either by template_upload_all script, or by main function.
    Providers are processed one after another (no threads).

    Args:
        **kwargs: Kwargs are passed by template_upload_all; ``image_url`` and
            ``template_name`` are forwarded to ``upload_template``.
    """
    image_url = kwargs.get('image_url')
    template_name = kwargs.get('template_name')
    all_systems = cfme_data['management_systems']
    for provider_key in list_providers('ec2'):
        provider_data = all_systems[provider_key]
        provider_creds = credentials[provider_data['credentials']]
        upload_template(
            provider_key,
            provider_creds['username'],
            provider_creds['password'],
            provider_data['upload_bucket_name'],
            image_url,
            template_name,
        )
开发者ID:RonnyPfannschmidt,项目名称:cfme_tests,代码行数:15,代码来源:template_upload_ec2.py
示例14: pytest_configure
def pytest_configure(config):
    """ Filters the list of providers as part of pytest configuration

    The 'use_provider' config value (or ``["default"]`` when it is empty) is used both as
    provider keys and as required tags of a non-conjunctive :py:class:`ProviderFilter`.

    Note:
        Additional filter is added to the global_filters dict of active filters here.
    """
    selected = config.getvalueorskip('use_provider') or ["default"]
    global_filters['use_provider'] = ProviderFilter(
        keys=selected, required_tags=selected, conjunctive=False)
    remaining_keys = [prov.key for prov in list_providers()]
    logger.debug('Filtering providers with {}, leaves {}'.format(selected, remaining_keys))
开发者ID:dajohnso,项目名称:cfme_tests,代码行数:16,代码来源:prov_filter.py
示例15: clear_providers_by_class
def clear_providers_by_class(prov_class, validate=True):
    """ Removes all providers that are an instance of given class or one of its subclasses

    Args:
        prov_class: Provider class used to build the :py:class:`ProviderFilter`
        validate: When truthy, wait for the deletion of every provider that existed
    """
    from utils.providers import ProviderFilter, list_providers
    class_filter = ProviderFilter(classes=[prov_class])
    candidates = list_providers(filters=[class_filter], use_global_filters=False)

    # First, delete all; remember only those that actually existed
    deleted = [prov for prov in candidates if prov.delete_if_exists(cancel=False)]

    # Then, check that all were deleted
    if not validate:
        return
    for prov in deleted:
        prov.wait_for_delete()
开发者ID:rananda,项目名称:cfme_tests,代码行数:17,代码来源:provider.py
示例16: run
def run(**kwargs):
    """Start one template-upload thread per 'gce' provider and wait for all of them.

    Keyword Args:
        image_url: Forwarded unchanged to ``upload_template``.
        template_name: Forwarded unchanged to ``upload_template``.
        bucket_name: Forwarded unchanged to ``upload_template``.
    """
    workers = []
    for provider_key in list_providers("gce"):
        provider_data = cfme_data['management_systems'][provider_key]
        service_account = credentials[provider_data['credentials']]['service_account']
        upload_args = (
            provider_data['project'],
            provider_data['zone'],
            service_account,
            kwargs.get('image_url'),
            kwargs.get('template_name'),
            kwargs.get('bucket_name'),
            provider_key,
        )
        worker = Thread(target=upload_template, args=upload_args)
        worker.daemon = True
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()
开发者ID:MattLombana,项目名称:cfme_tests,代码行数:18,代码来源:template_upload_gce.py
示例17: providers
def providers(metafunc, filters=None):
    """ Gets providers based on given (+ global) filters

    Note:
        Using the default 'function' scope, each test will be run individually for each provider
        before moving on to the next test. To group all tests related to single provider together,
        parametrize tests in the 'module' scope.

    Note:
        testgen for providers now requires the usage of test_flags for collection to work.
        Please visit http://cfme-tests.readthedocs.org/guides/documenting.html#documenting-tests
        for more details.

    Returns:
        Tuple ``(argnames, argvalues, idlist)``; ``idlist`` holds the provider keys.
    """
    filters = filters or []
    argnames, argvalues, idlist = [], [], []

    # Obtains the test's flags in form of a ProviderFilter
    meta = getattr(metafunc.function, 'meta', None)
    from_docs = getattr(meta, 'kwargs', {}).get('from_docs', {})
    test_flag_str = from_docs.get('test_flag')
    if test_flag_str:
        flags_filter = ProviderFilter(required_flags=test_flag_str.split(','))
        filters = filters + [flags_filter]

    for prov in list_providers(filters):
        argvalues.append([prov])
        # Use the provider key for idlist, helps with readable parametrized test output
        idlist.append(prov.key)
        # Add provider to argnames if missing
        if 'provider' in metafunc.fixturenames and 'provider' not in argnames:
            mark_uses_testgen = pytest.mark.uses_testgen()
            metafunc.function = mark_uses_testgen(metafunc.function)
            argnames.append('provider')
        # With the 'sauce' option set, stop after the first provider
        if metafunc.config.getoption('sauce'):
            break

    return argnames, argvalues, idlist
开发者ID:dajohnso,项目名称:cfme_tests,代码行数:38,代码来源:testgen.py
示例18: get_orphaned_vmware_files
def get_orphaned_vmware_files(provider=None):
    """Call ``get_datastores_per_host`` for one provider or for all 'virtualcenter' ones.

    Args:
        provider: Provider key to process; when falsy, every provider returned by
            ``list_providers("virtualcenter")`` is processed.
    """
    if provider:
        provider_keys = [provider]
    else:
        provider_keys = list_providers("virtualcenter")
    # we can add thread here
    for key in provider_keys:
        get_datastores_per_host(key)
开发者ID:RonnyPfannschmidt,项目名称:cfme_tests,代码行数:6,代码来源:get_unregistered_vmware_files.py
示例19: main
def main(trackerbot_url, mark_usable=None):
    """Synchronize the template tracker with the templates found on the providers.

    Steps, in order:
        * collect templates from every provider in parallel threads
        * register provider/template pairs that are not tracked yet
        * delete tracked pairs that are no longer present on a known, responsive provider
        * delete templates that are left without any provider

    Args:
        trackerbot_url: URL passed to ``trackerbot.api``
        mark_usable: When not ``None``, passed as ``usable`` to
            ``trackerbot.mark_provider_template`` for newly added pairs
    """
    api = trackerbot.api(trackerbot_url)
    thread_q = []
    thread_lock = Lock()
    # Both containers are handed to the worker threads together with the lock
    # NOTE(review): presumably maps template name -> provider keys - confirm in
    # get_provider_templates
    template_providers = defaultdict(list)
    all_providers = set(list_providers())
    unresponsive_providers = set()
    # Queue up list_template calls
    for provider_key in all_providers:
        thread = Thread(target=get_provider_templates,
                        args=(provider_key, template_providers, unresponsive_providers, thread_lock))
        thread_q.append(thread)
        thread.start()
    # Join the queued calls
    for thread in thread_q:
        thread.join()
    # NOTE(review): seen_templates is filled below but never read in this function
    seen_templates = set()
    # Only pass 'usable' to the tracker when the caller asked for it explicitly
    if mark_usable is None:
        usable = {}
    else:
        usable = {'usable': mark_usable}
    existing_provider_templates = [
        pt['id']
        for pt
        in trackerbot.depaginate(api, api.providertemplate.get())['objects']]
    # Find some templates and update the API
    for template_name, providers in template_providers.items():
        template_name = str(template_name)
        group_name, datestamp, stream = trackerbot.parse_template(template_name)
        # Don't want sprout templates
        if group_name in ('sprout', 'rhevm-internal'):
            print('Ignoring {} from group {}'.format(template_name, group_name))
            continue
        seen_templates.add(template_name)
        group = trackerbot.Group(group_name, stream=stream)
        template = trackerbot.Template(template_name, group, datestamp)
        for provider_key in providers:
            provider = trackerbot.Provider(provider_key)
            # Tracked ids are compared against "<template name>_<provider key>"
            if '{}_{}'.format(template_name, provider_key) in existing_provider_templates:
                print('Template {} already tracked for provider {}'.format(
                    template_name, provider_key))
                continue
            try:
                trackerbot.mark_provider_template(api, provider, template, **usable)
                print('Added {} template {} on provider {} (datestamp: {})'.format(
                    group_name, template_name, provider_key, datestamp))
            except SlumberHttpBaseException as ex:
                # Report the HTTP failure and carry on with the next provider
                print("{}\t{}".format(ex.response.status_code, ex.content))
    # Remove provider relationships where they no longer exist, skipping unresponsive providers,
    # and providers not known to this environment
    for pt in trackerbot.depaginate(api, api.providertemplate.get())['objects']:
        provider_key, template_name = pt['provider']['key'], pt['template']['name']
        # Indexing the defaultdict adds an empty list for template names not seen above
        if provider_key not in template_providers[template_name] \
                and provider_key not in unresponsive_providers:
            if provider_key in all_providers:
                print("Cleaning up template {} on {}".format(template_name, provider_key))
                trackerbot.delete_provider_template(api, provider_key, template_name)
            else:
                print("Skipping template cleanup {} on unknown provider {}".format(
                    template_name, provider_key))
    # Remove templates that aren't on any providers anymore
    for template in trackerbot.depaginate(api, api.template.get())['objects']:
        if not template['providers']:
            print("Deleting template {} (no providers)".format(template['name']))
            api.template(template['name']).delete()
开发者ID:FilipB,项目名称:cfme_tests,代码行数:79,代码来源:sync_template_tracker.py
示例20: run
def run(**kwargs):
    """Upload a VHD and/or create a template from it on every 'scvmm' provider.

    Providers are processed one after another. For each provider the template name, library,
    network, OS type, cores and RAM are resolved from ``kwargs`` and the provider's
    ``template_upload`` section of ``cfme_data``; nothing is uploaded when a template with
    the resolved name already exists.

    Args:
        **kwargs: Options read in this function are ``image_url``, ``template_name``,
            ``library``, ``upload`` and ``template``.

    NOTE(review): the nesting of the ``try``/``wait_for`` part relative to the
    ``if kwargs.get('template')`` branch is a reconstruction - verify against upstream.
    """
    for provider in list_providers("scvmm"):
        # NOTE(review): ``kwargs`` is rebound here, so from the second provider on the
        # values come from the previous provider's make_kwargs_scvmm result - confirm intended
        kwargs = make_kwargs_scvmm(cfme_data, provider,
                                   kwargs.get('image_url'), kwargs.get('template_name'))
        check_kwargs(**kwargs)
        mgmt_sys = cfme_data['management_systems'][provider]
        hostname_fqdn = mgmt_sys['hostname']
        creds = credentials[mgmt_sys['credentials']]
        # For powershell to work, we need to extract the User Name from the Domain
        user = creds['username'].split('\\')
        if len(user) == 2:
            username_powershell = user[1]
        else:
            username_powershell = user[0]
        # NOTE(review): uses the unsplit username, so a username that already carries a
        # domain prefix ends up with two - confirm
        username_scvmm = creds['domain'] + "\\" + creds['username']
        scvmm_args = {
            "hostname": mgmt_sys['ipaddress'],
            "username": username_powershell,
            "password": creds['password'],
            "domain": creds['domain'],
        }
        client = SCVMMSystem(**scvmm_args)
        url = kwargs.get('image_url', None)
        # Template name equals either user input or we extract the name from the url
        new_template_name = kwargs.get('template_name', None)
        if new_template_name is None:
            # Drops the last four characters of the file name (e.g. a ".vhd" suffix)
            new_template_name = os.path.basename(url)[:-4]
        print("SCVMM:{} started template {} upload".format(provider, new_template_name))
        print("SCVMM:{} Make Template out of the VHD {}".format(provider, new_template_name))
        # use_library is either user input or we use the cfme_data value
        use_library = kwargs.get('library', None)
        if use_library is None:
            # NOTE(review): raises TypeError when 'library' is missing from template_upload
            use_library = mgmt_sys['template_upload'].get('library', None) + "\\VHDS\\"
        print("SCVMM:{} Template Library: {}".format(provider, use_library))
        # The VHD name changed, match the template_name.
        new_vhd_name = new_template_name + '.vhd'
        use_network = mgmt_sys['template_upload'].get('network', None)
        use_os_type = mgmt_sys['template_upload'].get('os_type', None)
        cores = mgmt_sys['template_upload'].get('cores', None)
        ram = mgmt_sys['template_upload'].get('ram', None)
        # Uses PowerShell Get-SCVMTemplate to return a list of templates and aborts if exists.
        if not check_template_exists(client, new_template_name):
            if kwargs.get('upload'):
                upload_vhd(client, url, use_library, new_vhd_name)
            if kwargs.get('template'):
                print("SCVMM:{} Make Template out of the VHD {}".format(
                    provider, new_template_name))
                make_template(
                    client,
                    hostname_fqdn,
                    new_template_name,
                    use_library,
                    use_network,
                    use_os_type,
                    username_scvmm,
                    cores,
                    ram
                )
            # Poll until the template shows up; a failure is reported, not raised
            try:
                wait_for(check_template_exists,
                         [client, new_template_name], fail_condition=False, delay=5)
                print("SCVMM:{} template {} uploaded successfully".format(
                    provider, new_template_name))
            except Exception as e:
                print(e)
                print("SCVMM:{} Exception occured while verifying the template {} upload".format(
                    provider, new_template_name))
        else:
            print("SCVMM: A Template with that name already exists in the SCVMMLibrary")
开发者ID:MattLombana,项目名称:cfme_tests,代码行数:83,代码来源:template_upload_scvmm.py
注：本文中的utils.providers.list_providers函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论