本文整理汇总了Python中utils.ssh.SSHClient类的典型用法代码示例。如果您正苦于以下问题:Python SSHClient类的具体用法?Python SSHClient怎么用?Python SSHClient使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SSHClient类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setup_external_auth_ipa
def setup_external_auth_ipa(**data):
"""Sets up the appliance for an external authentication with IPA.
Keywords:
get_groups: Get User Groups from External Authentication (httpd).
ipaserver: IPA server address.
iparealm: Realm.
credentials: Key of the credential in credentials.yaml
"""
ssh = SSHClient()
ensure_browser_open()
login_admin()
if data["ipaserver"] not in get_ntp_servers():
set_ntp_servers(data["ipaserver"])
sleep(120)
auth = ExternalAuthSetting(get_groups=data.pop("get_groups", False))
auth.setup()
logout()
creds = credentials.get(data.pop("credentials"), {})
data.update(**creds)
rc, out = ssh.run_command(
"appliance_console_cli --ipaserver {ipaserver} --iparealm {iparealm} "
"--ipaprincipal {principal} --ipapassword {password}".format(**data)
)
assert rc == 0, out
assert "failed" not in out.lower(), "External auth setup failed:\n{}".format(out)
login_admin()
开发者ID:MattLombana,项目名称:cfme_tests,代码行数:27,代码来源:ext_auth.py
示例2: get_appliance
def get_appliance(provider):
'''Fixture to provision appliance to the provider being tested if necessary'''
global appliance_list
global appliance_vm_name
if provider not in appliance_list:
if ('appliances_provider' not in cfme_data['basic_info'].keys() or
provider != cfme_data['basic_info']['appliances_provider']):
appliance_list[provider] = provision_appliance(provider)
else:
appliance_list[provider] = re.findall(r'[0-9]+(?:\.[0-9]+){3}', conf.env['base_url'])[0]
prov_data = cfme_data['management_systems'][provider]
if prov_data['type'] == 'virtualcenter':
# ssh in and see if vddk already present, if not, install
ssh_kwargs = {
'username': conf.credentials['ssh']['username'],
'password': conf.credentials['ssh']['password'],
'hostname': appliance_list[provider]
}
# Init SSH client
client = SSHClient(**ssh_kwargs)
if int(client.run_command("ldconfig -p | grep vix | wc -l")[1]) < 1:
install_vddk(appliance_list[provider])
client.close()
elif prov_data['type'] == 'rhevm':
add_rhev_direct_lun_disk(provider, appliance_vm_name)
return appliance_list[provider]
开发者ID:jkrocil,项目名称:cfme_tests,代码行数:27,代码来源:test_vm_analysis.py
示例3: net_check_remote
def net_check_remote(port, addr=None, machine_addr=None, ssh_creds=None, force=False):
"""Checks the availability of a port from outside using another machine (over SSH)"""
from utils.ssh import SSHClient
port = int(port)
if not addr:
addr = my_ip_address()
if port not in _ports[addr] or force:
if not machine_addr:
machine_addr = urlparse.urlparse(store.base_url).hostname
if not ssh_creds:
ssh = store.current_appliance.ssh_client
else:
ssh = SSHClient(
hostname=machine_addr,
username=ssh_creds['username'],
password=ssh_creds['password']
)
with ssh:
# on exception => fails with return code 1
cmd = '''python -c "
import sys, socket
addr = socket.gethostbyname('%s')
socket.create_connection((addr, %d), timeout=10)
sys.exit(0)
"''' % (addr, port)
ret, out = ssh.run_command(cmd)
if ret == 0:
_ports[addr][port] = True
else:
_ports[addr][port] = False
return _ports[addr][port]
开发者ID:FilipB,项目名称:cfme_tests,代码行数:31,代码来源:net.py
示例4: main
def main():
parser = argparse.ArgumentParser(epilog=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("address", help="hostname or ip address of target appliance")
parser.add_argument("sdk_url", help="url to download sdk pkg")
parser.add_argument(
"--restart",
help="restart evmserverd after installation " + "(required for proper operation)",
action="store_true",
)
args = parser.parse_args()
ssh_kwargs = {
"username": credentials["ssh"]["username"],
"password": credentials["ssh"]["password"],
"hostname": args.address,
}
# Init SSH client
client = SSHClient(**ssh_kwargs)
# start
filename = args.sdk_url.split("/")[-1]
foldername = os.path.splitext(filename)[0]
# download
print "Downloading sdk"
status, out = client.run_command(
"curl %(url)s -o %(file)s > /root/unzip.out 2>&1" % {"url": args.sdk_url, "file": filename}
)
# extract
print "Extracting sdk (" + filename + ")"
status, out = client.run_command("unzip -o -f -d /var/www/miq/vmdb/lib/ %s" % filename)
if status != 0:
print out
sys.exit(1)
# install
print "Installing sdk (" + foldername + ")"
status, out = client.run_command(
'echo "export LD_LIBRARY_PATH=\$LD_LIBRARY_PATH:'
+ "/var/www/miq/vmdb/lib/"
+ foldername
+ '/lib/linux-64" >> /etc/default/evm'
)
if status != 0:
print "SDK installation failure (rc:" + out + ")"
print out
sys.exit(1)
# service evmserverd restart
if args.restart:
print "Appliance restart"
status, out = client.run_command("service evmserverd restart")
print "evmserverd restarted, the UI should start shortly."
else:
print "evmserverd must be restarted before netapp sdk can be used"
开发者ID:rlandy,项目名称:cfme_tests,代码行数:58,代码来源:install_netapp_lib.py
示例5: disable_external_auth_ipa
def disable_external_auth_ipa():
"""Unconfigure external auth."""
ssh = SSHClient()
ensure_browser_open()
login_admin()
auth = DatabaseAuthSetting()
auth.update()
rc, out = ssh.run_command("appliance_console_cli --uninstall-ipa")
assert rc == 0, out
开发者ID:MattLombana,项目名称:cfme_tests,代码行数:9,代码来源:ext_auth.py
示例6: disable_external_auth_ipa
def disable_external_auth_ipa():
"""Unconfigure external auth."""
ssh = SSHClient()
ensure_browser_open()
login_admin()
auth = DatabaseAuthSetting()
auth.update()
assert ssh.run_command("appliance_console_cli --uninstall-ipa")
appliance.IPAppliance().wait_for_web_ui()
logout()
开发者ID:rananda,项目名称:cfme_tests,代码行数:10,代码来源:ext_auth.py
示例7: use_storage
def use_storage(uses_ssh):
ssh_client = SSHClient()
if ssh_client.appliance_has_netapp():
return
if not current_version().is_in_series("5.2"):
pytest.skip("Storage tests run only on .2 so far")
subprocess.call("python ./scripts/install_netapp_lib.py --restart", shell=True)
subprocess.call("python ./scripts/wait_for_appliance_ui.py", shell=True)
if not ssh_client.appliance_has_netapp():
pytest.fail("Could not setup the netapp for storage testing")
开发者ID:petrblaho,项目名称:cfme_tests,代码行数:10,代码来源:storage.py
示例8: set_default_domain
def set_default_domain():
if current_version() < "5.3":
return # Domains are not in 5.2.x and lower
ssh_client = SSHClient()
# The command ignores the case when the Default domain is not present (: true)
result = ssh_client.run_rails_command(
"\"d = MiqAeDomain.where :name => 'Default'; puts (d) ? d.first.enabled : true\"")
if result.output.lower().strip() != "true":
# Re-enable the domain
ssh_client.run_rails_command(
"\"d = MiqAeDomain.where :name => 'Default'; d = d.first; d.enabled = true; d.save!\"")
开发者ID:seandst,项目名称:cfme_tests,代码行数:11,代码来源:conftest.py
示例9: get_worker_pid
def get_worker_pid(worker_type):
"""Obtains the pid of the first worker with the worker_type specified"""
ssh_client = SSHClient()
exit_status, out = ssh_client.run_command('service evmserverd status 2> /dev/null | grep -m 1 '
'\'{}\' | awk \'{{print $7}}\''.format(worker_type))
worker_pid = str(out).strip()
if out:
logger.info('Obtained {} PID: {}'.format(worker_type, worker_pid))
else:
logger.error('Could not obtain {} PID, check evmserverd running or if specific role is'
' enabled...'.format(worker_type))
assert out
return worker_pid
开发者ID:MattLombana,项目名称:cfme_tests,代码行数:13,代码来源:perf.py
示例10: disable_external_auth_openldap
def disable_external_auth_openldap():
auth = DatabaseAuthSetting()
auth.update()
sssd_conf = '/etc/sssd/sssd.conf'
httpd_auth = '/etc/pam.d/httpd-auth'
manageiq_remoteuser = '/etc/httpd/conf.d/manageiq-remote-user.conf'
manageiq_ext_auth = '/etc/httpd/conf.d/manageiq-external-auth.conf'
command = 'rm -rf {} && rm -rf {} && rm -rf {} && rm -rf {}'.format(
sssd_conf, httpd_auth, manageiq_ext_auth, manageiq_remoteuser)
ssh = SSHClient()
assert ssh.run_command(command)
ssh.run_command('systemctl restart evmserverd')
appliance.IPAppliance().wait_for_web_ui()
logout()
开发者ID:rananda,项目名称:cfme_tests,代码行数:14,代码来源:ext_auth.py
示例11: set_yaml_config
def set_yaml_config(config_name, data_dict, hostname=None):
"""Given a yaml name, dictionary and hostname, set the configuration yaml on the server
The configuration yamls must be inserted into the DB using the ruby console, so this function
uses SSH, not the database. It makes sense to be included here as a counterpart to
:py:func:`get_yaml_config`
Args:
config_name: Name of the yaml configuration file
data_dict: Dictionary with data to set/change
hostname: Hostname/address of the server that we want to set up (default ``None``)
Note:
If hostname is set to ``None``, the default server set up for this session will be
used. See :py:class:``utils.ssh.SSHClient`` for details of the default setup.
Warning:
Manually editing the config yamls is potentially dangerous. Furthermore,
the rails runner doesn't return useful information on the outcome of the
set request, so errors that arise from the newly loading config file
will go unreported.
Usage:
# Update the appliance name, for example
vmbd_yaml = get_yaml_config('vmdb')
vmdb_yaml['server']['name'] = 'EVM IS AWESOME'
set_yaml_config('vmdb', vmdb_yaml, '1.2.3.4')
"""
# CFME does a lot of things when loading a configfile, so
# let their native conf loader handle the job
# If hostname is defined, connect to the specified server
if hostname is not None:
_ssh_client = SSHClient(hostname=hostname)
# Else, connect to the default one set up for this session
else:
_ssh_client = SSHClient()
# Build & send new config
temp_yaml = NamedTemporaryFile()
dest_yaml = '/tmp/conf.yaml'
yaml.dump(data_dict, temp_yaml, default_flow_style=False)
_ssh_client.put_file(temp_yaml.name, dest_yaml)
# Build and send ruby script
dest_ruby = '/tmp/load_conf.rb'
ruby_template = data_path.join('utils', 'cfmedb_load_config.rbt')
ruby_replacements = {
'config_name': config_name,
'config_file': dest_yaml
}
temp_ruby = load_data_file(ruby_template.strpath, ruby_replacements)
_ssh_client.put_file(temp_ruby.name, dest_ruby)
# Run it
_ssh_client.run_rails_command(dest_ruby)
开发者ID:jkrocil,项目名称:cfme_tests,代码行数:56,代码来源:db.py
示例12: setup_external_auth_openldap
def setup_external_auth_openldap(**data):
"""Sets up the appliance for an external authentication with OpenLdap.
Keywords:
get_groups: Get User Groups from External Authentication (httpd).
ipaserver: IPA server address.
iparealm: Realm.
credentials: Key of the credential in credentials.yaml
"""
connect_kwargs = {
'username': credentials['host_default']['username'],
'password': credentials['host_default']['password'],
'hostname': data['ipaddress'],
}
appliance_obj = appliance.IPAppliance()
appliance_name = 'cfmeappliance{}'.format(fauxfactory.gen_alpha(7).lower())
appliance_address = appliance_obj.address
appliance_fqdn = '{}.{}'.format(appliance_name, data['domain_name'])
ldapserver_ssh = SSHClient(**connect_kwargs)
# updating the /etc/hosts is a workaround due to the
# https://bugzilla.redhat.com/show_bug.cgi?id=1360928
command = 'echo "{}\t{}" >> /etc/hosts'.format(appliance_address, appliance_fqdn)
ldapserver_ssh.run_command(command)
ldapserver_ssh.get_file(remote_file=data['cert_filepath'],
local_path=conf_path.strpath)
ldapserver_ssh.close()
ensure_browser_open()
login_admin()
auth = ExternalAuthSetting(get_groups=data.pop("get_groups", True))
auth.setup()
appliance_obj.configure_appliance_for_openldap_ext_auth(appliance_fqdn)
logout()
开发者ID:rananda,项目名称:cfme_tests,代码行数:32,代码来源:ext_auth.py
示例13: setup_external_auth_ipa
def setup_external_auth_ipa(**data):
"""Sets up the appliance for an external authentication with IPA.
Keywords:
get_groups: Get User Groups from External Authentication (httpd).
ipaserver: IPA server address.
iparealm: Realm.
credentials: Key of the credential in credentials.yaml
"""
connect_kwargs = {
'username': credentials['host_default']['username'],
'password': credentials['host_default']['password'],
'hostname': data['ipaserver'],
}
import fauxfactory
appliance_name = 'cfmeappliance'.format(fauxfactory.gen_alpha(7).lower())
appliance_address = appliance.IPAppliance().address
appliance_fqdn = '{}.{}'.format(appliance_name, data['iparealm'].lower())
ipaserver_ssh = SSHClient(**connect_kwargs)
# updating the /etc/hosts is a workaround due to the
# https://bugzilla.redhat.com/show_bug.cgi?id=1360928
command = 'echo "{}\t{}" >> /etc/hosts'.format(appliance_address, appliance_fqdn)
ipaserver_ssh.run_command(command)
ipaserver_ssh.close()
ssh = SSHClient()
rc, out = ssh.run_command('appliance_console_cli --host {}'.format(appliance_fqdn))
assert rc == 0, out
ssh.run_command('echo "127.0.0.1\t{}" > /etc/hosts'.format(appliance_fqdn))
ensure_browser_open()
login_admin()
if data["ipaserver"] not in get_ntp_servers():
set_ntp_servers(data["ipaserver"])
sleep(120)
auth = ExternalAuthSetting(get_groups=data.pop("get_groups", False))
auth.setup()
logout()
creds = credentials.get(data.pop("credentials"), {})
data.update(**creds)
rc, out = ssh.run_command(
"appliance_console_cli --ipaserver {ipaserver} --iparealm {iparealm} "
"--ipaprincipal {principal} --ipapassword {password}".format(**data)
)
assert rc == 0, out
assert "failed" not in out.lower(), "External auth setup failed:\n{}".format(out)
login_admin()
开发者ID:FilipB,项目名称:cfme_tests,代码行数:45,代码来源:ext_auth.py
示例14: main
def main():
parser = argparse.ArgumentParser(epilog=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('address', help='hostname or ip address of target appliance',
nargs='?', default=None)
args = parser.parse_args()
ssh_kwargs = {
'username': credentials['ssh']['username'],
'password': credentials['ssh']['password'],
}
if args.address:
ssh_kwargs['hostname'] = args.address
# Init SSH client
ssh_client = SSHClient(**ssh_kwargs)
# compile assets if required (not required on 5.2)
if not ssh_client.get_version().startswith("5.2"):
if ssh_client.run_command("ls /var/www/miq/vmdb/public/assets")[0] != 0:
ssh_client.run_rake_command("assets:precompile")
ssh_client.run_rake_command("evm:restart")
print "CFME UI worker restarted, UI should be available shortly"
开发者ID:slouderm,项目名称:cfme_tests,代码行数:25,代码来源:precompile_assets.py
示例15: main
def main():
parser = argparse.ArgumentParser(epilog=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('address', help='hostname or ip address of target appliance')
parser.add_argument('-R', '--reverse', help='flag to indicate the patch should be undone',
action='store_true', default=False, dest='reverse')
args = parser.parse_args()
# Find the patch file
patch_file_name = data_path_for_filename('ajax_wait.diff', scripts_path.strpath)
# Set up temp dir
tmpdir = mkdtemp()
atexit.register(shutil.rmtree, tmpdir)
source = '/var/www/miq/vmdb/public/javascripts/application.js'
target = os.path.join(tmpdir, 'application.js')
# Init SSH client
ssh_kwargs = {
'username': credentials['ssh']['username'],
'password': credentials['ssh']['password'],
'hostname': args.address
}
client = SSHClient(**ssh_kwargs)
print 'retriving appliance.js from appliance'
client.get_file(source, target)
os.chdir(tmpdir)
# patch, level 4, patch direction (default forward), ignore whitespace, don't output rejects
direction = '-N -R' if args.reverse else '-N'
exitcode = subprocess.call('patch -p4 %s -l -r- < %s' % (direction, patch_file_name),
shell=True)
if exitcode == 0:
# Put it back after successful patching.
print 'replacing appliance.js on appliance'
client.put_file(target, source)
else:
print 'not changing appliance'
return exitcode
开发者ID:sshveta,项目名称:cfme_tests,代码行数:43,代码来源:patch_ajax_wait.py
示例16: main
def main():
parser = argparse.ArgumentParser(epilog=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('address', help='hostname or ip address of target appliance')
parser.add_argument('repo_url', help='updates base url')
parser.add_argument('--reboot', help='reboot after installation ' +
'(required for proper operation)', action="store_true")
args = parser.parse_args()
ssh_kwargs = {
'username': credentials['ssh']['username'],
'password': credentials['ssh']['password'],
'hostname': args.address
}
# Init SSH client
client = SSHClient(**ssh_kwargs)
# create repo file
repo_file = "[rhel-updates]\nname=rhel6-updates\nbaseurl=" + args.repo_url + "\nenabled=1\ngpgcheck=0"
# create repo file on appliance
print 'Create update repo file'
status, out = client.run_command('echo "%s" >/etc/yum.repos.d/rhel_updates.repo' % repo_file)
# update
print 'Running rhel updates...'
status, out = client.run_command('yum update -y --nogpgcheck')
print "\n" + out + "\n"
if status != 0:
print "ERROR during update"
sys.exit(1)
# reboot
if args.reboot:
print 'Appliance reboot'
status, out = client.run_command('reboot')
else:
print 'A reboot is recommended.'
开发者ID:sshveta,项目名称:cfme_tests,代码行数:41,代码来源:update_rhel.py
示例17: main
def main():
parser = argparse.ArgumentParser(epilog=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('address',
help='hostname or ip address of target appliance')
parser.add_argument('--region', default=0, type=int,
help='region to assign to the new DB')
args = parser.parse_args()
ssh_kwargs = {
'username': credentials['ssh']['username'],
'password': credentials['ssh']['password'],
'hostname': args.address
}
client = SSHClient(**ssh_kwargs)
print 'Initializing Appliance Internal DB'
if client.run_command('ls -l /bin/appliance_console_cli')[0] == 0:
status, out = client.run_command('appliance_console_cli --ca --region 1 --internal')
if status != 0:
print 'Enabling DB failed with error:'
print out
sys.exit(1)
else:
print 'DB Enabled, evm watchdog should start the UI shortly.'
else:
rbt_repl = {
'miq_lib': '/var/www/miq/lib',
'region': args.region
}
# Find and load our rb template with replacements
base_path = os.path.dirname(__file__)
rbt = datafile.data_path_for_filename(
'enable-internal-db.rbt', base_path)
rb = datafile.load_data_file(rbt, rbt_repl)
# sent rb file over to /tmp
remote_file = '/tmp/%s' % generate_random_string()
client.put_file(rb.name, remote_file)
# Run the rb script, clean it up when done
status, out = client.run_command('ruby %s' % remote_file)
client.run_command('rm %s' % remote_file)
if status != 0:
print 'Enabling DB failed with error:'
print out
sys.exit(1)
else:
print 'DB Enabled, evm watchdog should start the UI shortly.'
开发者ID:slouderm,项目名称:cfme_tests,代码行数:51,代码来源:enable_internal_db.py
示例18: main
def main():
parser = argparse.ArgumentParser(epilog=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("address", help="hostname or ip address of target appliance")
parser.add_argument("repo_url", help="updates base url")
parser.add_argument(
"--reboot", help="reboot after installation " + "(required for proper operation)", action="store_true"
)
args = parser.parse_args()
ssh_kwargs = {
"username": credentials["ssh"]["username"],
"password": credentials["ssh"]["password"],
"hostname": args.address,
}
# Init SSH client
client = SSHClient(**ssh_kwargs)
# create repo file
repo_file = "[rhel-updates]\nname=rhel6-updates\nbaseurl=" + args.repo_url + "\nenabled=1\ngpgcheck=0"
# create repo file on appliance
print "Create update repo file"
status, out = client.run_command('echo "%s" >/etc/yum.repos.d/rhel_updates.repo' % repo_file)
# update
print "Running rhel updates..."
status, out = client.run_command("yum update -y --nogpgcheck")
print "\n" + out + "\n"
if status != 0:
print "ERROR during update"
sys.exit(1)
# reboot
if args.reboot:
print "Appliance reboot"
status, out = client.run_command("reboot")
else:
print "A reboot is recommended."
开发者ID:jkrocil,项目名称:cfme_tests,代码行数:40,代码来源:update_rhel.py
示例19: generate_version_files
def generate_version_files():
yield
starttime = time.time()
ssh_client = SSHClient()
relative_path = os.path.relpath(str(results_path), str(os.getcwd()))
relative_string = relative_path + '/{}*'.format(test_ts)
directory_list = glob.glob(relative_string)
for directory in directory_list:
module_path = os.path.join(directory, 'version_info')
if os.path.exists(str(module_path)):
return
else:
os.mkdir(str(module_path))
generate_system_file(ssh_client, module_path)
generate_processes_file(ssh_client, module_path)
generate_gems_file(ssh_client, module_path)
generate_rpms_file(ssh_client, module_path)
timediff = time.time() - starttime
logger.info('Generated all version files in {}'.format(timediff))
ssh_client.close()
开发者ID:MattLombana,项目名称:cfme-performance,代码行数:22,代码来源:version_info.py
示例20: fix_merkyl_workaround
def fix_merkyl_workaround():
"""Workaround around merkyl not opening an iptables port for communication"""
ssh_client = SSHClient()
if ssh_client.run_command('test -f /etc/init.d/merkyl').rc == 0:
logger.info('Rudely overwriting merkyl init.d on appliance;')
local_file = data_path.join("bundles").join("merkyl").join("merkyl")
remote_file = "/etc/init.d/merkyl"
ssh_client.put_file(local_file.strpath, remote_file)
ssh_client.run_command("service merkyl restart")
开发者ID:petrblaho,项目名称:cfme_tests,代码行数:9,代码来源:conftest.py
注:本文中的utils.ssh.SSHClient类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论