• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python _utils.is_reachable函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tests._utils.is_reachable函数的典型用法代码示例。如果您正苦于以下问题:Python is_reachable函数的具体用法?Python is_reachable怎么用?Python is_reachable使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了is_reachable函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_get

 def test_get(self):
     name = "scheduler"
     c = _utils.create_component_status(name=name)
     if _utils.is_reachable(c.config):
         from_get = c.get()
         self.assertIsInstance(from_get, K8sComponentStatus)
         self.assertEqual(c, from_get)
开发者ID:froch,项目名称:kubernetes-py,代码行数:7,代码来源:test_k8s_component_status.py


示例2: test_delete_nonexistent

 def test_delete_nonexistent(self):
     name = "yo-{0}".format(str(uuid.uuid4().hex[:16]))
     svc = _utils.create_service(name=name)
     svc.add_port(name="redis", port=5432, target_port=5432, protocol="tcp")
     if _utils.is_reachable(svc.config):
         with self.assertRaises(NotFoundException):
             svc.delete()
开发者ID:froch,项目名称:kubernetes-py,代码行数:7,代码来源:test_k8s_service.py


示例3: test_warnings

    def test_warnings(self):
        cfg = _utils.create_config()

        if _utils.is_reachable(cfg):
            objs = K8sEvent(config=cfg, name="yo").warnings()
            for x in objs:
                self.assertIsInstance(x, K8sEvent)
开发者ID:froch,项目名称:kubernetes-py,代码行数:7,代码来源:test_k8s_event.py


示例4: test_create_without_containers

    def test_create_without_containers(self):
        name = "yopod-{0}".format(str(uuid.uuid4()))
        pod = _utils.create_pod(name=name)

        if _utils.is_reachable(pod.config):
            with self.assertRaises(UnprocessableEntityException):
                pod.create()
开发者ID:froch,项目名称:kubernetes-py,代码行数:7,代码来源:test_k8s_pod.py


示例5: test_update_container_image_keep_env_vars

 def test_update_container_image_keep_env_vars(self):
     name = "yodep-{0}".format(str(uuid.uuid4()))
     dep = _utils.create_deployment(name=name)
     cont_name = "nginx"
     cont_image = "nginx:1.7.9"
     new_image = "nginx:1.10.3"
     env_var_name = "YoVariable"
     cont = _utils.create_container(name=cont_name, image=cont_image)
     cont.add_env(name=env_var_name, value=name)
     dep.add_container(container=cont)
     dep.desired_replicas = 1
     
     if _utils.is_reachable(dep.config):
         dep.create()
         with self.assertRaises(AlreadyExistsException):
             dep.create()
         # Change the container image
         dep.container_image = (cont_name, new_image)
         # Update the deployment
         dep.update()
         # Refresh
         dep.get()
         for c in dep.containers:
             self.assertIsInstance(c, K8sContainer)
             if c.name == cont_name:
                 self.assertEqual(c.image, new_image)
                 self.assertEqual(c.env[0].name, env_var_name)
                 self.assertEqual(c.env[0].value, name)
开发者ID:froch,项目名称:kubernetes-py,代码行数:28,代码来源:test_k8s_deployment.py


示例6: test_api_create_hostpath_minikube

    def test_api_create_hostpath_minikube(self):
        cfg = _utils.create_config()

        if _utils.is_reachable(cfg):
            pv = K8sPersistentVolume(
                name="pv-mysql",
                type="hostPath")
            try:
                pv.get()
            except NotFoundException:
                pv.capacity = {"storage": "512Mi"}
                pv.access_modes = ["ReadWriteOnce"]
                pv.reclaim_policy = "Delete"
                pv.path = "/tmp/mysql/data"
                pv.storage_class_name = "manual"
                pv.add_label('type', 'local')
                print("** creating mysql persistent volume...")
                pv.create()

            pvc = K8sPersistentVolumeClaim(
                config=cfg,
                name="pvc-mysql")
            try:
                pvc.get()
            except NotFoundException:
                pvc.storage_class_name = "manual"
                pvc.access_modes = ['ReadWriteOnce']
                pvc.resources = {'requests': {'storage': '512Mi'}}
                print("** creating mysql persistent volume claim...")
                pvc.create()

            self.assertIsInstance(pv, K8sPersistentVolume)
            self.assertIsInstance(pvc, K8sPersistentVolumeClaim)
开发者ID:froch,项目名称:kubernetes-py,代码行数:33,代码来源:test_k8s_persistent_volume_claim.py


示例7: test_create

 def test_create(self):
     name = "yo-{0}".format(str(uuid.uuid4().hex[:16]))
     node = _utils.create_node(name=name)
     if _utils.is_reachable(node.config):
         node.create()
         from_get = node.get()
         self.assertEqual(node, from_get)
开发者ID:froch,项目名称:kubernetes-py,代码行数:7,代码来源:test_k8s_node.py


示例8: test_node_affinity_with_required_and_preferred

    def test_node_affinity_with_required_and_preferred(self):
        model = _constants.pod_with_node_affinity()
        pod = Pod(model)
        config = _utils.create_config()
        k8s = K8sPod(config=config, name="yo")
        k8s.model = pod

        if _utils.is_reachable(config):
            # ensure we have enough nodes
            nodes = K8sNode(config=config, name="yo").list()
            self.assertGreaterEqual(len(nodes), 3)
            # tag the nodes
            for i in range(len(nodes)):
                node = nodes[i]
                if i == 0:
                    node.labels.update({'kubernetes.io/e2e-az-name': 'e2e-az1'})
                    node.labels.update({'another-node-label-key': 'another-node-label-value'})
                if i == 1:
                    node.labels.update({'kubernetes.io/e2e-az-name': 'e2e-az2'})
                if i == 2:
                    node.labels.update({'kubernetes.io/e2e-az-name': 'e2e-az3'})
                node.update()
            nodes = K8sNode(config=config, name="yo").list()
            for node in nodes:
                self.assertIn('kubernetes.io/e2e-az-name', node.labels)
            # create the pod
            k8s.create()
            # untag the nodes
            for node in nodes:
                node.labels.pop('kubernetes.io/e2e-az-name', None)
                node.labels.pop('another-node-label-key', None)
                node.update()
        pass
开发者ID:froch,项目名称:kubernetes-py,代码行数:33,代码来源:test_k8s_pod.py


示例9: test_object_delete_not_found

 def test_object_delete_not_found(self):
     ot = "Pod"
     name = "yomama-{}".format(str(uuid.uuid4()))
     obj = _utils.create_object(name=name, obj_type=ot)
     if _utils.is_reachable(obj.config):
         with self.assertRaises(NotFoundException):
             obj.delete()
开发者ID:froch,项目名称:kubernetes-py,代码行数:7,代码来源:test_k8s_object.py


示例10: test_api_create_aws_ebs

    def test_api_create_aws_ebs(self):
        pvname = "yopv"
        pvcname = "yopvc"
        volname = "yovolume"
        podname = "yopod"
        contname = "yocontainer"

        pvtype = "awsElasticBlockStore"
        pv = _utils.create_pv(name=pvname, type=pvtype)
        pv.volume_id = "vol-0e3056a2"
        pv.fs_type = "xfs"

        pvc = _utils.create_pvc(name=pvcname)

        vol = _utils.create_volume(name=volname, type='persistentVolumeClaim')
        vol.claim_name = pvcname

        container = _utils.create_container(name=contname, image="nginx:latest")
        volmount = _utils.create_volume_mount(name=volname, mount_path='/test-persistent')
        container.add_volume_mount(volmount)

        pod = _utils.create_pod(name=podname)
        pod.add_volume(vol)
        pod.add_container(container)

        if _utils.is_reachable(pvc.config):
            try:
                pv.create()
                pvc.create()
                pod.create()
                self.assertIsInstance(pv, K8sPersistentVolume)
                self.assertIsInstance(pvc, K8sPersistentVolumeClaim)
            except Exception as err:
                self.assertIsInstance(err, TimedOutException)
开发者ID:froch,项目名称:kubernetes-py,代码行数:34,代码来源:test_k8s_persistent_volume_claim.py


示例11: test_create_already_exists

 def test_create_already_exists(self):
     name = "yo-{0}".format(str(uuid.uuid4().hex[:16]))
     node = _utils.create_node(name=name)
     if _utils.is_reachable(node.config):
         node.create()
         with self.assertRaises(AlreadyExistsException):
             node.create()
开发者ID:froch,项目名称:kubernetes-py,代码行数:7,代码来源:test_k8s_node.py


示例12: test_create

 def test_create(self):
     name = "mnubo.com-sa-{0}".format(str(uuid.uuid4().hex[:5]))
     acct = _utils.create_service_account(name=name)
     if _utils.is_reachable(acct.config):
         acct.create()
         from_get = acct.get()
         self.assertEqual(acct, from_get)
开发者ID:froch,项目名称:kubernetes-py,代码行数:7,代码来源:test_k8s_service_account.py


示例13: test_create

 def test_create(self):
     name = "yosecret-{0}".format(str(uuid.uuid4()))
     secret = _utils.create_secret(name=name)
     if _utils.is_reachable(secret.config):
         secret.create()
         _list = secret.list()
         self.assertEqual(2, len(_list))  # service-account-token + 1
开发者ID:froch,项目名称:kubernetes-py,代码行数:7,代码来源:test_k8s_secret.py


示例14: test_api_create_nfs

    def test_api_create_nfs(self):
        pvname = "yopv"
        pvcname = "yopvc"
        volname = "yovolume"
        podname = "yopod"
        contname = "yocontainer"

        pvtype = "nfs"
        pv = _utils.create_pv(name=pvname, type=pvtype)
        pv.nfs_server = "nfs.company.com"
        pv.nfs_path = "/fs1/test-nfs"

        pvc = _utils.create_pvc(name=pvcname)

        vol = _utils.create_volume(name=volname, type='persistentVolumeClaim')
        vol.claim_name = pvcname

        container = _utils.create_container(name=contname, image="nginx:latest")
        volmount = _utils.create_volume_mount(name=volname, mount_path='/test-persistent')
        container.add_volume_mount(volmount)

        pod = _utils.create_pod(name=podname)
        pod.add_volume(vol)
        pod.add_container(container)

        if _utils.is_reachable(pvc.config):
            try:
                pv.create()
                pvc.create()
                pod.create()
                self.assertIsInstance(pv, K8sPersistentVolume)
                self.assertIsInstance(pvc, K8sPersistentVolumeClaim)
            except Exception as err:
                self.assertIsInstance(err, TimedOutException)
开发者ID:froch,项目名称:kubernetes-py,代码行数:34,代码来源:test_k8s_persistent_volume_claim.py


示例15: test_rollback_no_args

    def test_rollback_no_args(self):
        name = "nginx"
        image1 = "nginx:1.7.9"
        image2 = "nginx:1.9.1"
        container = _utils.create_container(name=name, image=image1)
        dep_name = "yodep-{0}".format(str(uuid.uuid4()))
        dep = _utils.create_deployment(name=dep_name)
        dep.add_container(container)
        dep.desired_replicas = 3

        if _utils.is_reachable(dep.config):
            dep.create()
            self.assertEqual(image1, dep.containers[0].image)
            dep.container_image = (name, image2)
            dep.update()
            self.assertIn('deployment.kubernetes.io/revision', dep.annotations)
            rev_before = dep.get_annotation('deployment.kubernetes.io/revision')
            self.assertNotEqual(image1, dep.containers[0].image)
            self.assertEqual(image2, dep.containers[0].image)
            dep.rollback()
            self.assertIn('deployment.kubernetes.io/revision', dep.annotations)
            rev_after = dep.get_annotation('deployment.kubernetes.io/revision')
            self.assertNotEqual(rev_before, rev_after)
            self.assertGreater(rev_after, rev_before)
            self.assertEqual(image1, dep.containers[0].image)
            self.assertNotEqual(image2, dep.containers[0].image)
开发者ID:froch,项目名称:kubernetes-py,代码行数:26,代码来源:test_k8s_deployment.py


示例16: test_create_no_args

    def test_create_no_args(self):
        name = "yodep-{0}".format(str(uuid.uuid4()))
        dep = _utils.create_deployment(name=name)

        if _utils.is_reachable(dep.config):
            with self.assertRaises(UnprocessableEntityException):
                dep.create()
开发者ID:froch,项目名称:kubernetes-py,代码行数:7,代码来源:test_k8s_deployment.py


示例17: test_get_pod_status_nonexistent

    def test_get_pod_status_nonexistent(self):
        name = "yopod-{0}".format(str(uuid.uuid4()))
        pod = _utils.create_pod(name=name)

        if _utils.is_reachable(pod.config):
            with self.assertRaises(NotFoundException):
                s = pod.status
开发者ID:froch,项目名称:kubernetes-py,代码行数:7,代码来源:test_k8s_pod.py


示例18: test_delete_nonexistent

    def test_delete_nonexistent(self):
        name = "yorc-{0}".format(str(uuid.uuid4()))
        dep = _utils.create_deployment(name=name)

        if _utils.is_reachable(dep.config):
            with self.assertRaises(NotFoundException):
                dep.delete()
开发者ID:froch,项目名称:kubernetes-py,代码行数:7,代码来源:test_k8s_deployment.py


示例19: test_add_api_token

 def test_add_api_token(self):
     name = "mnubo.com-sa-{0}".format(str(uuid.uuid4().hex[:5]))
     acct = _utils.create_service_account(name=name)
     if _utils.is_reachable(acct.config):
         acct.create()
         acct.add_api_token()
         secrets = K8sSecret.api_tokens_for_service_account(config=acct.config, name=acct.name)
         self.assertEqual(2, len(secrets))
开发者ID:froch,项目名称:kubernetes-py,代码行数:8,代码来源:test_k8s_service_account.py


示例20: test_list

 def test_list(self):
     name = "mnubo.com-sa-{0}".format(str(uuid.uuid4().hex[:5]))
     acct = _utils.create_service_account(name=name)
     if _utils.is_reachable(acct.config):
         acct.create()
         _list = acct.list()
         for x in _list:
             self.assertIsInstance(x, K8sServiceAccount)
开发者ID:froch,项目名称:kubernetes-py,代码行数:8,代码来源:test_k8s_service_account.py



注:本文中的tests._utils.is_reachable函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python _utils.validates_against_xml_schema函数代码示例发布时间:2022-05-27
下一篇:
Python _utils.create_service函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap