• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python ServiceGroup.ServiceGroup类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中MilkCheck.Engine.ServiceGroup.ServiceGroup的典型用法代码示例。如果您正苦于以下问题：Python ServiceGroup类的具体用法？Python ServiceGroup怎么用？Python ServiceGroup使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ServiceGroup类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_create_service_imbrications

 def test_create_service_imbrications(self):
     '''Test creating a group with multiple levels of subservices.

     The group declares a plain service (svcA) and a nested group
     (subgroup) which itself declares svcB and svcC. Each entry must be
     registered as a subservice of its direct parent.
     '''
     sergrp = ServiceGroup('groupinit')
     sergrp.fromdict(
         {'services':
             {'svcA':
                 {'require': ['subgroup'],
                  'actions':
                      {'start': {'cmd': '/bin/True'},
                       'stop': {'cmd': '/bin/False'}},
                  'desc': 'I am the subservice $NAME'},
              'subgroup':
                 {'services':
                     {'svcB':
                         {'require_weak': ['svcC'],
                          'actions':
                              {'start': {'cmd': '/bin/True'},
                               # This key used to be spelled '   stop'
                               # (blanks inside the literal), which
                               # declared a misnamed action.
                               'stop': {'cmd': '/bin/False'}},
                          'desc': 'I am the subservice $NAME'},
                      'svcC':
                         {'actions':
                              {'start': {'cmd': '/bin/True'},
                               'stop': {'cmd': '/bin/False'}},
                          'desc': 'I am the subservice $NAME'}},
                  'target': '127.0.0.1',
                  'desc': "I'm the service $NAME"}},
          'desc': 'I am a group',
          'target': 'localhost',
         })

     # First level: both entries belong to the top group
     for name in ('svcA', 'subgroup'):
         self.assertTrue(sergrp.has_subservice(name))

     # Second level: check the type explicitly so that the nested
     # assertions cannot be skipped silently.
     subgroup = sergrp._subservices['subgroup']
     self.assertTrue(isinstance(subgroup, ServiceGroup))
     for name in ('svcB', 'svcC'):
         self.assertTrue(subgroup.has_subservice(name))
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:35,代码来源:ServiceGroupTest.py


示例2: test_has_subservice

 def test_has_subservice(self):
     '''has_subservice() is false before and true after add_inter_dep()'''
     grp = ServiceGroup('group')
     inner = Service('intern_service')

     # Not yet attached to the group
     self.assertFalse(grp.has_subservice(inner.name))

     grp.add_inter_dep(target=inner)

     # Now registered as an internal dependency
     self.assertTrue(grp.has_subservice(inner.name))
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:7,代码来源:ServiceGroupTest.py


示例3: setUp

    def setUp(self):
        '''
        Build the service graph registered in the manager for these tests.

        Graph
                              _ start
           group --> service /
                             `- stop

        A third action named 'timeout' is also added to the service and is
        kept on the test case as self.timeout_action.
        '''
        CLICommon.setUp(self)

        # Entities: the group first, then the single service it holds
        grp = ServiceGroup('ServiceGroup')
        svc = Service('service')
        self.service = svc
        svc.desc = 'I am the service'

        # Actions
        start = Action('start', command='/bin/true')
        stop = Action('stop', command='/bin/false')
        self.timeout_action = Action('timeout', command='sleep 1', timeout=0.1)

        # inherits_from() is only called for start and stop, not timeout
        for act in (start, stop):
            act.inherits_from(svc)
        for act in (start, stop, self.timeout_action):
            svc.add_action(act)

        # Wire the service inside the group
        grp.add_inter_dep(target=svc)

        # Both entities are handed over to the manager
        self.manager.register_services(grp, svc)
开发者ID:mdlx,项目名称:milkcheck,代码行数:31,代码来源:CliTest.py


示例4: test_missing_group

 def test_missing_group(self):
     """A group whose services are all MISSING must itself be MISSING"""
     group = ServiceGroup('group')
     service = Service('svc1')

     # Only a 'stop' action is declared while the group is run with 'start'
     stop = Action('stop', command='/bin/true')
     service.add_action(stop)

     group.add_inter_dep(target=service)
     group.run('start')

     self.assertEqual(group.status, MISSING)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:8,代码来源:ServiceGroupTest.py


示例5: test_update_target

 def test_update_target(self):
     '''Test that update_target() on a group also updates its subservice.'''
     grp = ServiceGroup('G')
     srva = Service('A')
     grp.add_inter_dep(target=srva)
     grp.update_target('fortoy[5-10]')
     # assertEqual reports both node sets on failure, which
     # assertTrue(a == b) does not.
     self.assertEqual(grp.target, NodeSet('fortoy[5-10]'))
     self.assertEqual(srva.target, NodeSet('fortoy[5-10]'))
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:8,代码来源:ServiceGroupTest.py


示例6: test_prepare_group_subservice

 def test_prepare_group_subservice(self):
     '''Run a group holding one internal service; both must end up DONE.'''
     grp = ServiceGroup('GROUP')
     inner = Service('SUB1')

     start = Action('start', command='/bin/true')
     inner.add_action(start)
     grp.add_inter_dep(target=inner)

     grp.run('start')

     self.assertEqual(grp.status, DONE)
     self.assertEqual(inner.status, DONE)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:9,代码来源:ServiceGroupTest.py


示例7: setUp

    def setUp(self):
        '''
        Populate the service manager with the dependency graph used below.

        Graph
                __ S2                    __ I1
            S1 /         -- G1 -- (src) /    ^  -- (sink)
               `-- S3 --/               `-- I2

        Every service gets a 'start' and a 'stop' action.
        '''
        CLICommon.setUp(self)

        # Top-level services
        s1 = Service('S1')
        s1.desc = 'I am the service S1'
        s2 = Service('S2')
        self.svc2 = s2
        s2.desc = 'I am the service S2'
        s3 = Service('S3')
        s3.desc = 'I am the service S3'

        # Group and the services living inside it
        grp = ServiceGroup('G1')
        i1 = Service('I1')
        i1.desc = 'I am the service I1'
        i2 = Service('I2')
        i2.desc = 'I am the service I2'

        # S1: both actions get a delay of 1
        # NOTE(review): this start target is spelled ', BADNODE' (with a
        # blank) unlike every other target here; kept verbatim, confirm
        # whether that is intentional.
        s1_start = Action('start', HOSTNAME + ', BADNODE', '/bin/true')
        s1_start.delay = 1
        s1_stop = Action('stop', HOSTNAME + ',BADNODE', '/bin/true')
        s1_stop.delay = 1
        s1.add_actions(s1_start, s1_stop)

        # S2: /bin/true on HOSTNAME and BADNODE
        for name in ('start', 'stop'):
            s2.add_action(Action(name, HOSTNAME + ',BADNODE', '/bin/true'))
        # S3: /bin/false on HOSTNAME and BADNODE
        for name in ('start', 'stop'):
            s3.add_action(Action(name, HOSTNAME + ',BADNODE', '/bin/false'))
        # I1: echo on HOSTNAME only
        for name in ('start', 'stop'):
            i1.add_action(Action(name, HOSTNAME, 'echo ok'))
        # I2: /bin/true on HOSTNAME and BADNODE
        for name in ('start', 'stop'):
            i2.add_action(Action(name, HOSTNAME + ',BADNODE', '/bin/true'))

        # Dependencies between entities, then inside the group
        s1.add_dep(target=s2)
        s1.add_dep(target=s3)
        s3.add_dep(target=grp)
        i2.add_dep(i1)
        grp.add_inter_dep(target=i1)
        grp.add_inter_dep(target=i2)

        # Only the top-level entities are added to the manager
        for entity in (s1, s2, s3, grp):
            self.manager.add_service(entity)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:57,代码来源:CliTest.py


示例8: test_prepare_empty_group_external_deps

 def test_prepare_empty_group_external_deps(self):
     '''Run a group with no subservice and one external dependency.'''
     grp = ServiceGroup('GROUP')
     external = Service('EXT_SERV')

     start = Action('start', command='/bin/true')
     external.add_action(start)

     # The service is a dependency of the group, not a member of it
     grp.add_dep(external)
     grp.run('start')

     self.assertEqual(external.status, DONE)
     self.assertEqual(grp.status, MISSING)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:10,代码来源:ServiceGroupTest.py


示例9: test_set_algo_reversed

 def test_set_algo_reversed(self):
     '''Test that flipping algo_reversed updates source/sink dependencies'''
     grp = ServiceGroup('group')

     def check_forward():
         # Source has a child dep named 'group', sink has no such parent
         self.assertTrue(grp._source.has_child_dep('group'))
         self.assertFalse(grp._sink.has_parent_dep('group'))

     def check_reversed():
         # Mirror situation once the flag is set
         self.assertFalse(grp._source.has_child_dep('group'))
         self.assertTrue(grp._sink.has_parent_dep('group'))

     check_forward()

     grp.algo_reversed = True
     check_reversed()

     grp.algo_reversed = False
     check_forward()
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:11,代码来源:ServiceGroupTest.py


示例10: test_skipped_group

 def test_skipped_group(self):
     """A group whose services are all SKIPPED must itself be SKIPPED"""
     group = ServiceGroup('group')

     # Two services whose only action targets "@NOTHING"
     members = []
     for name in ('svc1', 'svc2'):
         member = Service(name)
         member.add_action(Action('start', target="@NOTHING", command=':'))
         members.append(member)

     for member in members:
         group.add_inter_dep(target=member)

     group.run('start')
     self.assertEqual(group.status, SKIPPED)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:11,代码来源:ServiceGroupTest.py


示例11: test_non_existing_target

 def test_non_existing_target(self):
     '''A service declared with target '@none' gets an empty NodeSet.'''
     layout = {
         'services': {
             'svc1': {
                 'actions': {
                     'start': {'cmd': '/bin/True'},
                 },
                 'target': '@none',
             },
         },
     }
     group = ServiceGroup('group1')
     group.fromdict(layout)

     loaded = group._subservices['svc1']
     self.assertEqual(loaded.target, NodeSet())
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:12,代码来源:ServiceGroupTest.py


示例12: test_graph

    def test_graph(self):
        '''Test DOT graph output'''
        manager = ServiceManager()
        sergrp = ServiceGroup('S1')
        sergrp.fromdict(
           {'services':
                {'srv1':
                     {'target': 'localhost',
                      'actions':
                        {'start': {'cmd':'/bin/True'},
                         'stop': {'cmd': '/bin/False'}},
                      'desc': "I'm the service srv1"
                    },
                'subgroup':
                    {'services':
                        {'svcB':
                            {'require_weak':['svcC'],
                            'actions':
                                {'start': {'cmd': '/bin/True'},
                            '   stop': {'cmd': '/bin/False'}},
                            'desc': 'I am the subservice $NAME'},
                        'svcC':
                            {'actions':
                                {'start': {'cmd': '/bin/True'},
                                'stop': {'cmd': '/bin/False'}},
                                'desc': 'I am the subservice $NAME'}},
                        'target': '127.0.0.1',
                        'desc': "I'm the service $NAME"}},
                })
        manager.add_service(sergrp)
        self.assertEqual(manager.output_graph(), 
"""digraph dependency {
compound=true;
node [style=filled];
subgraph "cluster_S1" {
label="S1";
style=rounded;
node [style=filled];
"S1.__hook" [style=invis];
"S1.srv1";
subgraph "cluster_S1.subgroup" {
label="S1.subgroup";
style=rounded;
node [style=filled];
"S1.subgroup.__hook" [style=invis];
"S1.subgroup.svcB" -> "S1.subgroup.svcC" [style=dashed];
"S1.subgroup.svcC";
}
}
}
""")
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:51,代码来源:ServiceManagerTest.py


示例13: test_require_no_list

 def test_require_no_list(self):
     """'require' given as a bare string (not a list) must be accepted"""
     layout = {
         'services': {
             'svc1': {
                 'actions': {'start': {'cmd': '/bin/true'}}
             },
             'svc2': {
                 # Plain string instead of a one-element list
                 'require': 'svc1',
                 'actions': {'start': {'cmd': '/bin/true'}}
             }
         }
     }
     group = ServiceGroup('grp')
     group.fromdict(layout)

     dep = group._subservices['svc2'].parents['svc1']
     self.assertEqual(dep.dep_type, REQUIRE)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:14,代码来源:ServiceGroupTest.py


示例14: test_filter

 def test_filter(self):
     """A 'filter' entry must produce a dependency of type FILTER"""
     layout = {
         'services': {
             'svc1': {
                 'actions': {'start': {'cmd': '/bin/true'}}
             },
             'svc2': {
                 'filter': ['svc1'],
                 'actions': {'start': {'cmd': '/bin/true'}}
             }
         }
     }
     group = ServiceGroup('grp')
     group.fromdict(layout)

     dep = group._subservices['svc2'].parents['svc1']
     self.assertEqual(dep.dep_type, FILTER)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:16,代码来源:ServiceGroupTest.py


示例15: test_before

 def test_before(self):
     """'before' is accepted like 'after' (kept only for v1.0 compat)"""
     layout = {
         'services': {
             'svc1': {
                 'actions': {'start': {'cmd': '/bin/true'}}
             },
             'svc2': {
                 'before': ['svc1'],
                 'actions': {'start': {'cmd': '/bin/true'}}
             }
         }
     }
     group = ServiceGroup('grp')
     group.fromdict(layout)

     # The resulting dependency is expected to be a weak requirement
     dep = group._subservices['svc2'].parents['svc1']
     self.assertEqual(dep.dep_type, REQUIRE_WEAK)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:16,代码来源:ServiceGroupTest.py


示例16: test_fromdict_after

 def test_fromdict_after(self):
     """An 'after' entry must behave as an alias of 'require_weak'"""
     layout = {
         'services': {
             'svc1': {
                 'actions': {'start': {'cmd': '/bin/true'}}
             },
             'svc2': {
                 'after': ['svc1'],
                 'actions': {'start': {'cmd': '/bin/true'}}
             }
         }
     }
     group = ServiceGroup('grp')
     group.fromdict(layout)

     dep = group._subservices['svc2'].parents['svc1']
     self.assertEqual(dep.dep_type, REQUIRE_WEAK)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:16,代码来源:ServiceGroupTest.py


示例17: test_servicegroup_with_nodeset_like_actions_with_one_decl

    def test_servicegroup_with_nodeset_like_actions_with_one_decl(self):
        '''A nodeset-like service name expands into one service per name.'''
        group = ServiceGroup('group1')
        group.fromdict({
            'services': {
                # Single declaration covering da1, da2 and da3
                'da[1-3]': {
                    'actions': {'start': {'cmd': '/bin/True'}}
                },
            }})

        expanded = ('da1', 'da2', 'da3')

        self.assertEqual(len(group._subservices), 3)
        for name in expanded:
            self.assertTrue(group.has_subservice(name))
        # Each expanded service carries exactly one action
        for name in expanded:
            self.assertEqual(len(group._subservices[name]._actions), 1)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:17,代码来源:ServiceGroupTest.py


示例18: test_variable_with_escaping_pattern

 def test_variable_with_escaping_pattern(self):
     """fromdict() must leave variable patterns in commands untouched"""
     raw_cmd = 'shine config -O %%host %foo'
     layout = {
         'services': {
             'svc': {
                 'variables': {
                     'foo': 'nice'
                 },
                 'actions': {
                     'start': {'cmd': raw_cmd}
                 }
             }
         }
     }
     group = ServiceGroup('grp')
     group.fromdict(layout)

     # Both the escaped '%%host' and the '%foo' reference stay verbatim
     start = group._subservices['svc']._actions['start']
     self.assertEqual(start.command, 'shine config -O %%host %foo')
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:17,代码来源:ServiceGroupTest.py


示例19: test_prepare_group_subservices

    def test_prepare_group_subservices(self):
        '''Run a group holding several internal services; all end up DONE.'''
        grp = ServiceGroup('GROUP')

        # One 'start' action per service, created before the services
        starts = [Action('start', command='/bin/true') for _ in range(3)]
        sub1, sub2, sub3 = [Service(name) for name in ('SUB1', 'SUB2', 'SUB3')]
        for svc, act in zip((sub1, sub2, sub3), starts):
            svc.add_action(act)

        # SUB1 and SUB2 are attached without a base
        for svc in (sub1, sub2):
            grp.add_inter_dep(target=svc)
        # SUB3 is attached twice, with SUB1 then SUB2 as base
        for base in (sub1, sub2):
            grp.add_inter_dep(base=base, target=sub3)

        grp.run('start')

        self.assertEqual(grp.status, DONE)
        for svc in (sub1, sub2, sub3):
            self.assertEqual(svc.status, DONE)
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:25,代码来源:ServiceGroupTest.py


示例20: test_resolve_all_local_variable

 def test_resolve_all_local_variable(self):
     """A 'require' given as a local variable reference gets resolved."""
     layout = {
         'services': {
             'dep1': {
                 'actions': {'start': {'cmd': '/bin/True'}}
             },
             'dep2': {
                 # 'require' points at the local variable 'foo'
                 'variables': {'foo': ['dep1']},
                 'require': "%foo",
                 'actions': {'start': {'cmd': '/bin/True'}}
             },
         }
     }
     group = ServiceGroup('group')
     group.fromdict(layout)
     group.resolve_all()

     dependent = group._subservices['dep2']
     self.assertTrue('dep1' in dependent.deps())
开发者ID:cea-hpc,项目名称:milkcheck,代码行数:18,代码来源:ServiceGroupTest.py



注:本文中的MilkCheck.Engine.ServiceGroup.ServiceGroup类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python MiscLib.TestUtils类代码示例发布时间:2022-05-24
下一篇:
Python Service.Service类代码示例发布时间:2022-05-24
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap