• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python utils.header函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中utils.header函数的典型用法代码示例。如果您正苦于以下问题：Python header函数的具体用法？Python header怎么用？Python header使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了header函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: sfi_configure

 def sfi_configure (self,dir_name):
     """Write the SFI key files and the ``sfi_config`` file into dir_name.

     For both the 'pi_spec' and 'user_spec' entries of self.auth_sfa_spec,
     the private and public key contents are written to
     ``<dir_name>/<hrn>.pkey`` and ``<dir_name>/<hrn>.pub``.  Then
     ``<dir_name>/sfi_config`` is (over)written with the SFI_AUTH, SFI_USER,
     SFI_REGISTRY and SFI_SM settings.

     Every file is written through a ``with`` block so that the handle is
     closed even when a lookup or a write raises.
     """
     # cheat a bit: retrieve the global SFA spec from the plc obj
     sfa_spec=self.test_plc.plc_spec['sfa']
     # fetch keys in config spec and expose to sfi
     for spec_name in ['pi_spec','user_spec']:
         user_spec=self.auth_sfa_spec[spec_name]
         user_leaf=user_spec['name']
         key_name=user_spec['key_name']
         key_spec = self.test_plc.locate_key (key_name)
         for (kind,ext) in [ ('private', 'pkey'), ('public', 'pub') ] :
             contents=key_spec[kind]
             file_name=os.path.join(dir_name,self.obj_hrn(user_leaf))+"."+ext
             with open(file_name,'w') as fileconf:
                 fileconf.write (contents)
             utils.header ("(Over)wrote {}".format(file_name))
     #
     file_name=dir_name + os.sep + 'sfi_config'
     with open(file_name,'w') as fileconf:
         SFI_AUTH=self.auth_hrn()
         fileconf.write ("SFI_AUTH='{}'".format(SFI_AUTH))
         fileconf.write('\n')
         # default is to run as a PI
         SFI_USER=self.obj_hrn(self.auth_sfa_spec['pi_spec']['name'])
         fileconf.write("SFI_USER='{}'".format(SFI_USER))
         fileconf.write('\n')
         # NOTE(review): the registry port is hard-wired to 12345 here - confirm
         SFI_REGISTRY='http://{}:{}/'.format(sfa_spec['settings']['SFA_REGISTRY_HOST'], 12345)
         fileconf.write("SFI_REGISTRY='{}'".format(SFI_REGISTRY))
         fileconf.write('\n')
         SFI_SM='http://{}:{}/'.format(sfa_spec['settings']['SFA_AGGREGATE_HOST'], sfa_spec['sfi-connects-to-port'])
         fileconf.write("SFI_SM='{}'".format(SFI_SM))
         fileconf.write('\n')
     utils.header ("(Over)wrote {}".format(file_name))
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:35,代码来源:TestAuthSfa.py


示例2: scp_to_webroot

    def scp_to_webroot(self, localfiles, recursive = False):
        """Copy localfiles into /var/www/html/ via self.scp_to and return their URL.

        The returned URL is ``http://<self['ip']>/<localfiles>``.  When
        self.config.verbose is set, the copy is announced through
        utils.header first.

        NOTE(review): `recursive` is accepted but never forwarded to
        self.scp_to - confirm whether that is intended.
        """
        # indentation uses spaces only: the original tab/space mix is a
        # TabError under Python 3
        if self.config.verbose:
            utils.header("Copying %s to %s webroot" % (localfiles, self['name']), logfile = self.config.logfile)
        self.scp_to("%s" % (localfiles,), "/var/www/html/")
        url = 'http://%s/%s' % (self['ip'], localfiles)

        return url
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:7,代码来源:PLCs.py


示例3: ssh_tasks

    def ssh_tasks(self,options, expected=True, command=None):
        """Build one CompleterTaskSliceSsh per node name listed in the slice spec.

        Returns the list of tasks, or False when no private key can be
        located for this slice.  `command` defaults to a short identification
        script; `expected` says whether ssh access should be enabled or
        disabled, and is both reported and handed to each task.
        """
        if not command:
            command = "echo hostname ; hostname; echo id; id; echo uname -a ; uname -a"
        # locate a key
        private_key = self.locate_private_key()
        if not private_key :
            utils.header("WARNING: Cannot find a valid key for slice {}".format(self.name()))
            return False

        # announce what is about to be checked
        msg = "ssh slice access enabled" if expected else "ssh slice access disabled"
        utils.header("checking for {} -- slice {}".format(msg, self.name()))

        tasks = []
        slicename = self.name()
        dry_run = getattr(options, 'dry_run', False)
        # convert nodenames to real hostnames
        for nodename in self.slice_spec['nodenames']:
            _site_spec, node_spec = self.test_plc.locate_node(nodename)
            hostname = node_spec['node_fields']['hostname']
            task = CompleterTaskSliceSsh(self.test_plc, hostname,
                                         slicename, private_key, command, expected, dry_run)
            tasks.append(task)
        return tasks
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:26,代码来源:TestSlice.py


示例4: bootcd

    def bootcd(self):
        "all nodes: invoke GetBootMedium and store result locally"
        # Returns True after writing <nodedir>/<name>.iso, and also on a dry
        # run (where nothing is written).  Raises Exception when the API
        # returns an empty string.
        utils.header("Calling GetBootMedium for {}".format(self.name()))
        # this would clearly belong in the config but, well ..
        options = self.node_spec['bootmedium_options'] if 'bootmedium_options' in self.node_spec else []
        encoded = self.test_plc.apiserver.GetBootMedium(
            self.test_plc.auth_root(), self.name(), 'node-iso', '', options)
        if encoded == '':
            raise Exception('GetBootmedium failed')

        filename = "{}/{}.iso".format(self.nodedir(), self.name())
        utils.header('Storing boot medium into {}'.format(filename))

        # xxx discovered with python3, but a long standing issue:
        # encoded at this point is a str instead of a bytes
        # Quick & dirty : we convert this explicitly to bytes
        # Longer run : clearly it seems like the plcapi server side should
        # tag its result with <base64></base64> rather than as a string
        bencoded = str.encode(encoded)
        if self.dry_run():
            print("Dry_run: skipped writing of iso image")
            return True

        # base64.decodestring was removed in Python 3.9;
        # base64.decodebytes is its drop-in replacement
        with open(filename,'wb') as storage:
            storage.write(base64.decodebytes(bencoded))
        return True
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:27,代码来源:TestNode.py


示例5: run

 def run(self, command, message=None, background=False, dry_run=False):
     """Run `command` after translating it through self.actual_command.

     On a dry run the translated command is only reported and 0 is returned;
     otherwise `message` is passed to self.header and the result of
     utils.system is returned.
     """
     local_command = self.actual_command(command, dry_run=dry_run)
     if not dry_run:
         self.header(message)
         return utils.system(local_command, background)
     utils.header("DRY RUN " + local_command)
     return 0
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:8,代码来源:TestSsh.py


示例6: is_ready

    def is_ready(self, timeout=30):
        """Poll until every readiness check passes or `timeout` minutes elapse.

        Node is considered ready when Node Manager has started and the
        vuseradd processes have stopped.  The checks run in key order and a
        round stops at the first failure; between failed rounds the method
        sleeps 30 seconds.  Returns True when a round passed entirely, False
        when the deadline was reached first.
        """
        log = self.config.logfile

        class test:
            """One readiness check: run `cmd` via `system`, look for `check` in its output."""

            def __init__(self, name, description, system, cmd, check, inverse = False, logfile = log):
                self.system = system
                self.cmd = cmd
                self.check = check
                self.name = name
                self.description = description
                # when inverse is set the check passes if `check` is ABSENT
                self.inverse = inverse
                self.logfile = logfile

            def run(self, logfile, verbose = True):
                # `logfile` is kept for signature compatibility; reporting
                # goes to the logfile given at construction time
                if verbose:
                    utils.header(self.description, logfile =  self.logfile)
                (status, output) = self.system(self.cmd)
                if self.inverse and output.find(self.check) == -1:
                    if verbose: utils.header("%s Passed Test" % self.name, logfile = self.logfile)
                    return True
                # the comparison operator was missing here, turning the test
                # into the subtraction `find(...) - 1`
                elif not self.inverse and output and output.find(self.check) != -1:
                    if verbose: utils.header("%s Passed Test" % self.name, logfile = self.logfile)
                    return True

                if verbose: utils.header("%s Failed Test" % self.name, logfile = self.logfile)
                return False

        ready = False
        start_time = time.time()
        end_time = start_time + 60 * timeout
        vcheck_cmd = "ps -elfy | grep vuseradd | grep -v grep"
        # NOTE(review): this reads self.logfile while everything else uses
        # self.config.logfile - confirm which one is meant
        grep_cmd = "grep 'Starting Node Manager' %s" % self.logfile.filename
        tests = {
            '1':  test("NodeManager", "Checking if NodeManager has started", utils.commands, grep_cmd, "OK", logfile = self.config.logfile),
            '2':  test("vuseradd", "Checking if vuseradd is done", self.commands, vcheck_cmd, "vuseradd", True, logfile = self.config.logfile)
        }

        while time.time() < end_time and not ready:
            # Run tests in order
            results = {}
            for step in sorted(tests):
                check = tests[step]
                # pass verbose by keyword: it used to land in the `logfile`
                # slot, which left verbose stuck at its default of True
                results[step] = result = check.run(self.config.logfile, verbose = self.config.verbose)
                if not result: break

            # Check results. We are ready if all passed
            if all(results.values()):
                utils.header("%s is ready" % (self['hostname']))
                ready = True
            else:
                if self.config.verbose:
                    utils.header("%s not ready. Waiting 30 seconds. %s seconds left" % \
                                 (self['hostname'], int(end_time - time.time())) )
                time.sleep(30)

        return ready
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:58,代码来源:Nodes.py


示例7: delete_nodes

 def delete_nodes (self):
     """Detach from this slice every node currently attached to it.

     The slice record is fetched with GetSlices (first match is used) and
     its 'node_ids' are handed to DeleteSliceFromNodes.
     """
     auth = self.owner_auth()
     slice_name = self.slice_name()
     print('retrieving slice {}'.format(slice_name))
     slice_record = self.test_plc.apiserver.GetSlices(auth, slice_name)[0]
     node_ids = slice_record['node_ids']
     message = "Deleting {} nodes from slice {}".format(len(node_ids), slice_name)
     utils.header (message)
     self.test_plc.apiserver.DeleteSliceFromNodes (auth, slice_name, node_ids)
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:9,代码来源:TestSlice.py


示例8: host_box

 def host_box(self):
     """Return the name of the box hosting this node.

     Real nodes always answer 'localhost'.  Other nodes answer the
     'host_box' entry of self.node_spec; when that entry is missing a
     warning is issued and 'localhost' is used as a fallback.
     """
     if self.is_real():
         return 'localhost'
     try:
         return self.node_spec['host_box']
     except KeyError:
         # only a missing key triggers the fallback; anything else
         # (including KeyboardInterrupt) now propagates
         utils.header("WARNING : qemu nodes need a host box")
         return 'localhost'
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:9,代码来源:TestNode.py


示例9: kill_qemu

 def kill_qemu(self):
     """Run the node's qemu-kill-node script on its test box; always returns True.

     The command is executed through run_in_buildname and honours
     self.dry_run().
     """
     # fetch the box once and reuse it, instead of calling self.test_box()
     # again for every use
     test_box = self.test_box()
     # kill the right processes
     utils.header("Stopping qemu for node {} on box {}"\
                  .format(self.name(), test_box.hostname()))
     command = "{}/qemu-kill-node {}".format(self.nodedir(),self.name())
     test_box.run_in_buildname(command, dry_run=self.dry_run())
     return True
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:9,代码来源:TestNode.py


示例10: start_qemu

    def start_qemu(self):
        """Initialise the qemu bridge and launch the qemu node on its test box.

        Both commands are run through run_in_buildname with their output
        appended to <nodedir>/log.txt, and both honour self.dry_run().
        """
        test_box = self.test_box()
        utils.header("Starting qemu node {} on {}".format(self.name(), test_box.hostname()))

        test_box.run_in_buildname("{}/qemu-bridge-init start >> {}/log.txt"\
                                  .format(self.nodedir(), self.nodedir()),
                                  dry_run=self.dry_run())
        # kick it off in background, as it would otherwise hang
        # NOTE(review): nothing here requests background execution -
        # presumably qemu-start-node detaches by itself; confirm
        # dry_run is forwarded here too, so that a dry run does not
        # actually start the node
        test_box.run_in_buildname("{}/qemu-start-node 2>&1 >> {}/log.txt"\
                                  .format(self.nodedir(), self.nodedir()),
                                  dry_run=self.dry_run())
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:10,代码来源:TestNode.py


示例11: qemu_start

 def qemu_start(self):
     "all nodes: start the qemu instance (also runs qemu-bridge-init start)"
     # Always returns True; nodes that are not qemu nodes are only reported.
     node_model = self.node_spec['node_fields']['model']
     if not self.is_qemu():
         utils.header("TestNode.qemu_start : {} model {} taken as real node"\
                      .format(self.name(), node_model))
         return True
     # starting the Qemu node
     self.start_qemu()
     return True
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:10,代码来源:TestNode.py


示例12: add_nodes

 def add_nodes (self):
     """Attach to this slice every node named in self.slice_spec['nodenames'].

     Each node name is resolved to a node spec through the test site,
     wrapped in a TestNode to obtain its name, and the collected names are
     passed to AddSliceToNodes in a single call.
     """
     auth = self.owner_auth()
     slice_name = self.slice_name()
     hostnames = [
         TestNode(self.test_plc, self.test_site,
                  self.test_site.locate_node(nodename)).name()
         for nodename in self.slice_spec['nodenames']
     ]
     utils.header("Adding {} in {}".format(hostnames, slice_name))
     self.test_plc.apiserver.AddSliceToNodes(auth, slice_name, hostnames)
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:10,代码来源:TestSlice.py


示例13: __run_tests

    def __run_tests(self, tests):
        """Run every test in `tests`; return 0 if all succeeded, 1 otherwise.

        On Windows each test runs in-process.  Elsewhere each test runs in a
        child Process that is given self.run_test_timeout_seconds to finish
        and is terminated after that.  The name of the test being run is
        written to self.current_benchmark, and that file is removed once the
        whole run is over.  An empty `tests` returns 0 immediately.
        """
        if len(tests) == 0:
            return 0

        logging.debug("Start __run_tests.")
        logging.debug("__name__ = %s",__name__)

        error_happened = False
        if self.os.lower() == 'windows':
            logging.debug("Executing __run_tests on Windows")
            for test in tests:
                with open(self.current_benchmark, 'w') as benchmark_resume_file:
                    benchmark_resume_file.write(test.name)
                with self.quiet_out.enable():
                    if self.__run_test(test) != 0:
                        error_happened = True
        else:
            logging.debug("Executing __run_tests on Linux")

            # Setup a nice progressbar and ETA indicator
            widgets = [self.mode, ': ',  progressbar.Percentage(),
                       ' ', progressbar.Bar(),
                       ' Rough ', progressbar.ETA()]
            pbar = progressbar.ProgressBar(widgets=widgets, maxval=len(tests)).start()
            pbar_test = 0

            # These features do not work on Windows
            for test in tests:
                pbar.update(pbar_test)
                pbar_test = pbar_test + 1
                # NOTE(review): the whole per-test body is skipped unless this
                # module was imported as benchmark.benchmarker - confirm
                if __name__ == 'benchmark.benchmarker':
                    # call form with a single argument: prints the same text
                    # under Python 2 and is valid syntax under Python 3
                    print(header("Running Test: %s" % test.name))
                    with open(self.current_benchmark, 'w') as benchmark_resume_file:
                        benchmark_resume_file.write(test.name)
                    with self.quiet_out.enable():
                        test_process = Process(target=self.__run_test, name="Test Runner (%s)" % test.name, args=(test,))
                        test_process.start()
                        test_process.join(self.run_test_timeout_seconds)
                    self.__load_results()  # Load intermediate result from child process
                    if(test_process.is_alive()):
                        # the join above timed out: record it and kill the child
                        logging.debug("Child process for {name} is still alive. Terminating.".format(name=test.name))
                        self.__write_intermediate_results(test.name,"__run_test timeout (="+ str(self.run_test_timeout_seconds) + " seconds)")
                        test_process.terminate()
                        test_process.join()
                    if test_process.exitcode != 0:
                        error_happened = True
            pbar.finish()

        if os.path.isfile(self.current_benchmark):
            os.remove(self.current_benchmark)
        logging.debug("End __run_tests.")

        if error_happened:
            return 1
        return 0
开发者ID:Jesterovskiy,项目名称:FrameworkBenchmarks,代码行数:55,代码来源:benchmarker.py


示例14: is_local_hostname

 def is_local_hostname(hostname):
     """Tell whether `hostname` resolves to the same address as this machine.

     The literal "localhost" is accepted without any lookup.  Otherwise the
     address of socket.gethostname() is compared with the address of
     `hostname`; if either lookup fails a warning is issued and False is
     returned.
     """
     if hostname == "localhost":
         return True
     import socket
     try:
         local_ip = socket.gethostbyname(socket.gethostname())
         remote_ip = socket.gethostbyname(hostname)
         return local_ip == remote_ip
     except Exception:
         # still best-effort for any lookup failure, but no longer swallows
         # KeyboardInterrupt / SystemExit the way a bare except did
         utils.header("WARNING : something wrong in is_local_hostname with hostname={}".format(hostname))
         return False
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:11,代码来源:TestSsh.py


示例15: qemu_export

 def qemu_export(self):
     "all nodes: push local node-dep directory on the qemu box"
     # Nothing to push when the test box is local; otherwise the node
     # directory is copied recursively and success means a 0 return code.
     if self.test_box().is_local():
         return True
     skip_actions = self.dry_run()
     cleaning_msg = "Cleaning any former sequel of {} on {}"\
                    .format(self.name(), self.host_box())
     utils.header(cleaning_msg)
     transfer_msg = "Transferring configuration files for node {} onto {}"\
                    .format(self.name(), self.host_box())
     utils.header(transfer_msg)
     retcod = self.test_box().copy(self.nodedir(), recursive=True, dry_run=skip_actions)
     return retcod == 0
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:11,代码来源:TestNode.py


示例16: update_api

    def update_api(self):
        """Point self.config.api at this PLC's XML-RPC endpoint.

        After self.update_ip(), the endpoint
        ``https://<ip>:<port>/<api_path>`` is built from this object's own
        fields, an xmlrpclib.Server for it is stored in self.config.api, and
        self.config.api_type is set to 'xmlrpc'.
        """
        # Set up API access
        # If plc is specified, find its configuration
        # and use its API
        # (indentation uses spaces only: the original tab/space mix is a
        # TabError under Python 3)
        self.update_ip()
        name, ip, port, path = self['name'], self['ip'], self['port'], self['api_path']
        # build the URL once and reuse it for both the message and the proxy
        api_server = "https://%s:%s/%s" % (ip, port, path)
        if self.config.verbose:
            utils.header("Updating %s's api to %s" % (name, api_server), logfile = self.config.logfile)
        self.config.api = xmlrpclib.Server(api_server, allow_none = 1)
        self.config.api_type = 'xmlrpc'
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:11,代码来源:PLCs.py


示例17: create_user

 def create_user (self):
     """Create this user at the PLC, enable it, grant its roles, attach it to the site.

     Everything is done with the root credentials: AddPerson with the
     'user_fields' of the spec, UpdatePerson to enable the account, one
     AddRoleToPerson per entry in 'roles', then AddPersonToSite.
     """
     spec = self.user_spec
     person_fields = spec['user_fields']
     root_auth = self.test_plc.auth_root()
     announcement = 'Adding user {} - roles {}'.format(person_fields['email'], spec['roles'])
     utils.header(announcement)
     self.test_plc.apiserver.AddPerson(root_auth, person_fields)
     self.test_plc.apiserver.UpdatePerson(root_auth, person_fields['email'], {'enabled': True})
     for granted_role in spec['roles']:
         self.test_plc.apiserver.AddRoleToPerson(root_auth, granted_role, person_fields['email'])
     self.test_plc.apiserver.AddPersonToSite(root_auth, self.name(), self.test_site.name())
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:12,代码来源:TestUser.py


示例18: store_key

 def store_key(self):
     """Write this key pair to disk and restrict the file permissions.

     The public key (key_spec['key_fields']['key']) goes to
     self.publicpath() with mode 0o444, the private key
     (key_spec['private']) to self.privatepath() with mode 0o400.  The
     directory holding the public key is created when missing, including
     any missing parents.
     """
     pub = self.publicpath()
     priv = self.privatepath()
     utils.header("Storing key {} in {}".format(self.name(), pub))
     key_dir = os.path.dirname(pub)
     # an empty dirname means the current directory: nothing to create.
     # exist_ok avoids the check-then-create race of isdir() + mkdir()
     if key_dir:
         os.makedirs(key_dir, exist_ok=True)
     with open(pub,"w") as f:
         f.write(self.key_spec['key_fields']['key'])
     with open(priv,"w") as f:
         f.write(self.key_spec['private'])
     os.chmod(priv,0o400)
     os.chmod(pub,0o444)
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:13,代码来源:TestKey.py


示例19: sfa_check_slice_plc

 def sfa_check_slice_plc (self, options):
     "check the slice has been created at the plc - all local nodes should be in slice"
     # Returns True only when every node returned by GetNodes with
     # peer_id None is listed in the slice's node_ids; every node is
     # reported either way.
     api = self.test_plc.apiserver
     slice_record = api.GetSlices(self.test_plc.auth_root(), self.plc_name())[0]
     local_nodes = api.GetNodes(self.test_plc.auth_root(), {'peer_id':None})
     all_found = True
     for node in local_nodes:
         if node['node_id'] not in slice_record['node_ids']:
             utils.header("ERROR - local node {} NOT FOUND in slice {}"\
                          .format(node['hostname'], slice_record['name']))
             all_found = False
             continue
         utils.header("local node {} found in slice {}".format(node['hostname'], slice_record['name']))
     return all_found
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:13,代码来源:TestSliceSfa.py


示例20: create_node

    def create_node(self):
        """Create this node at the PLC together with its tags and interfaces.

        The node is added under the credentials of its owner (the user named
        in node_spec['owner']), set to boot state 'reinstall', and tagged
        with every entry of node_spec['tags'].  Its primary interface is then
        declared through either the old single-call interface API or the
        newer API with separate IP address and route, depending on
        self.test_plc.has_addresses_api().  Finally each entry of
        node_spec['extra_interfaces'] is added, and its optional 'settings'
        are attached as interface tags.
        """
        ownername = self.node_spec['owner']
        user_spec = self.test_site.locate_user(ownername)
        test_user = TestUser(self.test_plc,self.test_site,user_spec)
        userauth = test_user.auth()
        utils.header("node {} created by user {}".format(self.name(), test_user.name()))
        rootauth = self.test_plc.auth_root()
        server  =  self.test_plc.apiserver
        node_id = server.AddNode(userauth,
                                 self.test_site.site_spec['site_fields']['login_base'],
                                 self.node_spec['node_fields'])
        # create as reinstall to avoid user confirmation
        server.UpdateNode(userauth, self.name(), { 'boot_state' : 'reinstall' })

        # you are supposed to make sure the tags exist
        for tagname, tagvalue in self.node_spec['tags'].items():
            server.AddNodeTag(userauth, node_id, tagname, tagvalue)

        if not self.test_plc.has_addresses_api():
            # old interface: populate network interfaces - primary
            server.AddInterface(userauth, self.name(),
                                self.node_spec['interface_fields'])
        else:
            # new interface with separate ip addresses
            # this is for setting the 'dns' stuff that now goes with the node
            server.UpdateNode(userauth, self.name(), self.node_spec['node_fields_nint'])
            interface_id = server.AddInterface(userauth, self.name(),self.node_spec['interface_fields_nint'])
            server.AddIpAddress(userauth, interface_id, self.node_spec['ipaddress_fields'])
            # the route spec is completed in place with the new interface id
            route_fields = self.node_spec['route_fields']
            route_fields['interface_id'] = interface_id
            server.AddRoute(userauth, node_id, route_fields)
        # populate network interfaces - others
        if 'extra_interfaces' in self.node_spec:
            for interface in self.node_spec['extra_interfaces']:
                server.AddInterface(userauth, self.name(), interface['interface_fields'])
                if 'settings' in interface and interface['settings']:
                    # locate node network - once per interface, and under its
                    # own name: rebinding the loop variable `interface` to the
                    # API record broke the spec lookups for every setting
                    # after the first one
                    interface_record = server.GetInterfaces(
                        userauth, {'ip' : interface['interface_fields']['ip']})[0]
                    interface_id = interface_record['interface_id']
                    for attribute, value in interface['settings'].items():
                        # locate or create node network attribute type
                        # NOTE(review): lookup filters on 'name' while creation
                        # uses 'tagname' - confirm against the API
                        try:
                            server.GetTagTypes(userauth, {'name' : attribute})[0]
                        except Exception:
                            server.AddTagType(rootauth,{'category' : 'test',
                                                        'tagname' : attribute})
                        # attach value
                        server.AddInterfaceTag(userauth, interface_id, attribute, value)
开发者ID:dreibh,项目名称:planetlab-lxc-tests,代码行数:51,代码来源:TestNode.py



注:本文中的utils.header函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.hex_to_int函数代码示例发布时间:2022-05-26
下一篇:
Python utils.hash_to_string函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap