本文整理汇总了Python中tests.helper.dict_compare函数的典型用法代码示例。如果您正苦于以下问题:Python dict_compare函数的具体用法?Python dict_compare怎么用?Python dict_compare使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了dict_compare函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_validate_schema_for_successful_validation
def test_validate_schema_for_successful_validation(m_lru_cache, m_open,
                                                   m_validate):
    """
    Should hand back the validated config when the validator succeeds.
    """
    # Given: A schema document served through the patched open()
    schema_file = m_open.return_value.__enter__()
    schema_file.read.return_value = '''{
"title": "Schema for Job Config",
"id": "#generic-hook-v1",
"properties": {
"mock": {
"$ref": "${base_url}/link/config#/properties/mock"
}
}
}'''
    # And: A validator that reports no error
    m_validate.return_value = None
    # And: The config to be checked against that schema
    config = {'mock-obj': 'mock-value'}

    # When: The config is validated
    ret_value = service.validate_schema(config)

    # Then: The returned value matches the input config
    dict_compare(ret_value, config)
    # And: The validator received the config as first positional argument
    validated_instance = m_validate.call_args[0][0]
    dict_compare(validated_instance, config)
开发者ID:totem,项目名称:cluster-orchestrator,代码行数:27,代码来源:test_config.py
示例2: test_massage_config
def test_massage_config():
    """
    Should flatten value wrappers and blank out encrypted entries.
    """
    # Given: A config mixing plain, encrypted and nested wrapped values
    raw_config = {
        'key1': 'value1',
        'key2': {'value': 'value2', 'encrypted': True},
        'key3': [
            {'key3.1': {'value': 'value3.1'}},
        ],
    }

    # When: The config is massaged
    massaged = massage_config(raw_config)

    # Then: Wrapped values are unwrapped and the encrypted one is emptied
    expected = {
        'key1': 'value1',
        'key2': '',
        'key3': [
            {'key3.1': 'value3.1'},
        ],
    }
    dict_compare(massaged, expected)
开发者ID:totem,项目名称:cluster-deployer,代码行数:34,代码来源:test_util.py
示例3: test_init_deployment
def test_init_deployment(mock_time):
    """
    Should populate the derived attributes of a new Deployment.
    """
    # Given: A fixed reading for the patched time module
    mock_time.time.return_value = 0.12
    # And: Caller-supplied template arguments (one key uses a hyphen)
    supplied_args = {
        'arg-1': 'value1',
        'arg_2': 'value2',
    }

    # When: A deployment is constructed
    deployment = Deployment(
        Mock(spec=Provider), Mock(spec=Environment), 'mock-app',
        template_args=supplied_args)

    # Then: Node count and version carry the expected values
    eq_(deployment.nodes, 1)
    eq_(deployment.version, '120')
    # And: Template args include name/version/service_type, with the
    # hyphenated key exposed as 'arg_1'
    expected_args = {
        'name': 'mock-app',
        'version': '120',
        'service_type': 'app',
        'arg_1': 'value1',
        'arg_2': 'value2',
    }
    dict_compare(deployment.template_args, expected_args)
    eq_(deployment.service_name_prefix, 'mock-app-120-app')
    # NOTE(review): the literal below looks like an e-mail obfuscation
    # artifact of the page this snippet was copied from - confirm the real
    # template name against the upstream repository.
    eq_(deployment.template_name, '[email protected]')
开发者ID:chrismcguire,项目名称:fleet-py,代码行数:25,代码来源:test_deployer.py
示例4: test_create_new_job
def test_create_new_job(self):
    """
    Should store a new job through update_job with NOW time stamps.
    """
    # Given: A job document to be written
    git_info = {
        'owner': 'find-create-owner',
        'repo': 'find-create-repo',
        'ref': 'find-create-ref',
        'commit': 'find-create-commit',
    }
    job = {
        'meta-info': {
            'git': git_info,
            'job-id': 'find-create-job-id',
        },
        'state': JOB_STATE_NEW,
    }

    # When: The job is written to the store
    self.store.update_job(job)

    # Then: The raw stored document is the job plus modified/_expiry
    created_job = self._get_raw_document_without_internal_id(
        'find-create-job-id')
    time_stamps = {'modified': NOW, '_expiry': NOW}
    expected_job = dict_merge(time_stamps, job)
    dict_compare(created_job, expected_job)
开发者ID:totem,项目名称:cluster-orchestrator,代码行数:26,代码来源:test_mongo.py
示例5: test_evaluate_config_with_no_deployers
def test_evaluate_config_with_no_deployers():
    """
    Should render templated values and add an empty 'deployers' section.
    :return: None
    """
    # Given: A config with variables and a templated key
    variables = {
        'var1': 'value1',
        'var2': {
            'value': '{{var1}}-var2value',
            'template': True,
            'priority': 2,
        },
    }
    config = {
        'variables': variables,
        'key1': {
            'value': 'test-{{var1}}-{{var2}}-{{var3}}',
            'template': True
        }
    }
    # And: Default variables passed alongside the config
    default_variables = {
        'var1': 'default1',
        'var2': 'default2',
        'var3': 'default3',
    }

    # When: The config is evaluated
    evaluated = service.evaluate_config(config, default_variables)

    # Then: The template is rendered and 'deployers' is an empty dict
    expected = {
        'key1': 'test-value1-value1-var2value-default3',
        'deployers': {},
    }
    dict_compare(evaluated, expected)
开发者ID:totem,项目名称:cluster-orchestrator,代码行数:34,代码来源:test_config.py
示例6: test_add_event
def test_add_event(self):
    """
    Should write an event document with details and search params.
    """
    # Given: Event details and search parameters
    details = {'mock': 'details'}
    search_params = {'meta-info': {'mock': 'search'}}

    # When: The event is added to the store
    self.store.add_event(
        'MOCK_EVENT', details=details, search_params=search_params)

    # Then: The stored document (minus its '_id') matches the expectation
    stored_event = self.store._events.find_one({'type': 'MOCK_EVENT'})
    del stored_event['_id']
    expected_event = {
        'component': 'orchestrator',
        'type': 'MOCK_EVENT',
        'date': NOW,
        'meta-info': {'mock': 'search'},
        'details': {'mock': 'details'},
    }
    dict_compare(stored_event, expected_event)
开发者ID:totem,项目名称:cluster-orchestrator,代码行数:26,代码来源:test_mongo.py
示例7: test_fetch_units_matching_with_multiple_match
def test_fetch_units_matching_with_multiple_match(mock_run):
    """
    Should return every well-formed unit row and skip the malformed one.
    """
    # Given: Fleet provider
    provider = _get_fleet_provider()
    # And: Command output with two unit rows followed by one invalid row
    # NOTE(review): the '[email protected]' tokens look like an e-mail
    # obfuscation artifact of the page this snippet was copied from -
    # confirm the real unit names against the upstream repository.
    mock_run.return_value = '''[email protected] 442337f12da14ad7830cda843079730b/10.249.0.235 active running
[email protected] 0a5239ec591e4981905c792e99341f03/10.229.23.106 activating start-pre
invalidrow
''' # noqa

    # When: Units are fetched for the service prefix
    units = list(provider.fetch_units_matching('cluster-deployer-develop-'))

    # Then: Exactly two units are returned
    unit_count = len(units)
    eq_(unit_count, 2,
        'Expecting 2 units to be returned. Found: %d' % unit_count)
    # And: Each row is split into unit / machine / active / sub
    first_expected = {
        'unit': '[email protected]',
        'machine': '442337f12da14ad7830cda843079730b/10.249.0.235',
        'active': 'active',
        'sub': 'running',
    }
    dict_compare(units[0], first_expected)
    second_expected = {
        'unit': '[email protected]',
        'machine': '0a5239ec591e4981905c792e99341f03/10.229.23.106',
        'active': 'activating',
        'sub': 'start-pre',
    }
    dict_compare(units[1], second_expected)
开发者ID:totem,项目名称:fleet-py,代码行数:31,代码来源:test_fleet_fabric.py
示例8: test_clone_deployment
def test_clone_deployment(m_uuid):
    """
    Should clone an existing deployment, dropping its version and
    assigning the new job id.
    """
    # Given: The id that the patched uuid helper will hand out
    m_uuid.return_value = 'new-job-id'
    # And: An existing deployment carrying a version and an old job id
    existing = {
        'deployment': {'name': 'mock', 'version': 'v1'},
        'meta-info': {'job-id': 'old-job-id'},
    }

    # When: The deployment is cloned
    cloned = clone_deployment(existing)

    # Then: The clone has no version and carries the new job id
    expected = {
        'deployment': {'name': 'mock'},
        'meta-info': {'job-id': 'new-job-id'},
    }
    dict_compare(cloned, expected)
开发者ID:totem,项目名称:cluster-deployer,代码行数:28,代码来源:test_deployment.py
示例9: test_sync_units
def test_sync_units(m_filter_units, m_get_store):
    """
    Should report success together with the filtered units.
    """
    # Given: A store that returns an existing blue-green deployment
    store = m_get_store.return_value
    store.get_deployment.return_value = {
        'deployment': {
            'name': 'test',
            'version': 'v1',
            'mode': DEPLOYMENT_MODE_BLUEGREEN,
        }
    }
    # And: Units returned by the patched unit filter
    m_filter_units.return_value = [{'name': 'app-unit'}]

    # When: Units are synchronized for the deployment
    ret_value = sync_units('mock')

    # Then: The result reports success and echoes the filtered units
    expected = {
        'deployment_id': 'mock',
        'state': 'success',
        'units': m_filter_units.return_value,
    }
    dict_compare(ret_value, expected)
开发者ID:totem,项目名称:cluster-deployer,代码行数:25,代码来源:test_deployment.py
示例10: test_sync_upstreams_with_error_fetching_nodes
def test_sync_upstreams_with_error_fetching_nodes(
        m_get_discovered_nodes, m_get_store):
    """
    Should report a failed state when node discovery raises.
    """
    # Given: A store that returns an existing deployment with one
    # proxied location
    proxy = {
        'hosts': {
            'mock-host': {
                'locations': {
                    'home': {'port': 8090},
                }
            }
        }
    }
    store = m_get_store.return_value
    store.get_deployment.return_value = {
        'deployment': {
            'name': 'test',
            'version': 'v1',
            'mode': DEPLOYMENT_MODE_BLUEGREEN,
        },
        'proxy': proxy,
    }
    # And: Node discovery that raises
    m_get_discovered_nodes.side_effect = Exception('Mock')

    # When: Upstreams are synchronized for the deployment
    ret_value = sync_upstreams('mock')

    # Then: The failure and its message are reported
    expected = {
        'deployment_id': 'mock',
        'state': 'failed',
        'error': 'Mock',
    }
    dict_compare(ret_value, expected)
开发者ID:totem,项目名称:cluster-deployer,代码行数:35,代码来源:test_deployment.py
示例11: test_update_runtime_units
def test_update_runtime_units(self):
    """
    Should record runtime units on the stored deployment.
    """
    # Given: Runtime units to be written
    units = [{
        'name': 'unit1',
        'machine': 'machine1',
        'active': 'active',
        'sub': 'dead',
    }]

    # When: Runtime units are updated for an existing deployment
    self.store.update_runtime_units('test-deployment1-v1', units)

    # Then: The raw stored document carries the units and a modified stamp
    deployment = self._get_raw_document_without_internal_id(
        'test-deployment1-v1')
    # NOTE(review): the expectation is built from the fetched document
    # itself; confirm dict_merge precedence makes this a meaningful check.
    runtime_overlay = {
        'runtime': {'units': units},
        'modified': NOW,
    }
    expected_deployment = dict_merge(deployment, runtime_overlay)
    dict_compare(deployment, expected_deployment)
开发者ID:totem,项目名称:cluster-deployer,代码行数:25,代码来源:test_mongo.py
示例12: test_get_health_when_celery_is_disabled
def test_get_health_when_celery_is_disabled(get_store, client, ping):
    """
    Should report etcd and store health when the celery check is skipped.
    """
    # Given: An etcd client exposing one machine
    etcd_info_type = namedtuple('Info', ('machines',))
    client.Client.return_value = etcd_info_type(['machine1'])
    # And: A store reporting its health details
    store = get_store.return_value
    store.health.return_value = {'type': 'mock'}

    # When: Health is requested without the celery check
    health_status = health.get_health(check_celery=False)

    # Then: Only the etcd and store sections are reported, both healthy
    expected_status = {
        'etcd': {
            'status': HEALTH_OK,
            'details': {'machines': ['machine1']},
        },
        'store': {
            'status': HEALTH_OK,
            'details': {'type': 'mock'},
        },
    }
    dict_compare(health_status, expected_status)
开发者ID:totem,项目名称:cluster-orchestrator,代码行数:28,代码来源:test_health.py
示例13: test_filter_deployments_with_state
def test_filter_deployments_with_state(self):
    """
    Should return only the deployment in the requested state.
    """
    # When: Deployments are filtered by state
    matches = self.store.filter_deployments(state=DEPLOYMENT_STATE_NEW)

    # Then: A single deployment matches
    eq_(len(matches), 1)
    expected = EXISTING_DEPLOYMENTS['test-deployment1-v2']
    dict_compare(matches[0], expected)
开发者ID:totem,项目名称:cluster-deployer,代码行数:8,代码来源:test_mongo.py
示例14: test_filter_all_jobs
def test_filter_all_jobs(self):
    """
    Should return every stored job when no filter is given.
    """
    # When: Jobs are fetched without any filter
    jobs = self.store.filter_jobs()

    # Then: Both fixture jobs come back, in fixture order
    eq_(len(jobs), 2)
    for position, job_id in enumerate(('job-1', 'job-2')):
        dict_compare(jobs[position], EXISTING_JOBS[job_id])
开发者ID:totem,项目名称:cluster-orchestrator,代码行数:8,代码来源:test_mongo.py
示例15: test_transform_string_values
def test_transform_string_values():
    """
    Should convert numeric and boolean strings in a config to real types.
    :return:
    """
    # Given: A config holding string-encoded numbers and booleans at the
    # top level, in a nested dict and inside a list
    nested_section = {
        'port': u'2321',
        'nodes': u'12',
        'min-nodes': '13',
        'enabled': 'False',
        'force-ssl': 'true',
    }
    list_entry = {
        'port': '123',
        'nodes': '13',
        'min-nodes': '14',
        'attempts': '10',
        'enabled': False,
    }
    config = {
        'key1': 'value1',
        'port': 1212,
        'enabled': 'True',
        'nested-port-key': nested_section,
        'array-config': [list_entry, 'testval'],
        'null-key': None,
    }

    # When: String values are transformed
    transformed = service.transform_string_values(config)

    # Then: Known keys are typed; other strings and None are untouched
    expected = {
        'key1': 'value1',
        'port': 1212,
        'enabled': True,
        'nested-port-key': {
            'port': 2321,
            'nodes': 12,
            'min-nodes': 13,
            'enabled': False,
            'force-ssl': True,
        },
        'array-config': [
            {
                'port': 123,
                'nodes': 13,
                'min-nodes': 14,
                'attempts': 10,
                'enabled': False,
            },
            'testval',
        ],
        'null-key': None,
    }
    dict_compare(transformed, expected)
开发者ID:totem,项目名称:cluster-orchestrator,代码行数:58,代码来源:test_config.py
示例16: test_filter_deployments_with_excluded_names
def test_filter_deployments_with_excluded_names(self):
    """
    Should leave out deployments whose name is excluded.
    """
    # When: Deployments are filtered with one name excluded
    excluded = ('test-deployment1',)
    matches = self.store.filter_deployments(exclude_names=excluded)

    # Then: Only the deployment with the other name remains
    eq_(len(matches), 1)
    expected = EXISTING_DEPLOYMENTS['test-deployment2-v2']
    dict_compare(matches[0], expected)
开发者ID:totem,项目名称:cluster-deployer,代码行数:9,代码来源:test_mongo.py
示例17: test_filter_deployments_with_version
def test_filter_deployments_with_version(self):
    """
    Should return only the deployment matching name and version.
    """
    # When: Deployments are filtered by name and version
    matches = self.store.filter_deployments(
        'test-deployment1', version='v1')

    # Then: A single deployment matches
    eq_(len(matches), 1)
    expected = EXISTING_DEPLOYMENTS['test-deployment1-v1']
    dict_compare(matches[0], expected)
开发者ID:totem,项目名称:cluster-deployer,代码行数:9,代码来源:test_mongo.py
示例18: test_get_job
def test_get_job(self):
    """
    Should return the stored job without its '_expiry' field.
    """
    # When: An existing job is fetched
    fetched_job = self.store.get_job('job-1')

    # Then: It equals the fixture job with '_expiry' removed
    expected_job = copy.deepcopy(EXISTING_JOBS['job-1'])
    del expected_job['_expiry']
    dict_compare(fetched_job, expected_job)
开发者ID:totem,项目名称:cluster-orchestrator,代码行数:9,代码来源:test_mongo.py
示例19: test_filter_deployment_ids
def test_filter_deployment_ids(self):
    """
    Should return id-only documents when only_ids is requested.
    """
    # When: Deployments are filtered by name, asking for ids only
    matches = self.store.filter_deployments('test-deployment1',
                                            only_ids=True)

    # Then: Both versions are returned as bare id documents, in order
    eq_(len(matches), 2)
    expected_ids = ('test-deployment1-v1', 'test-deployment1-v2')
    for position, deployment_id in enumerate(expected_ids):
        dict_compare(matches[position], {'id': deployment_id})
开发者ID:totem,项目名称:cluster-deployer,代码行数:9,代码来源:test_mongo.py
示例20: test_get_nodes__with_meta_for_non_existing_upstream
def test_get_nodes__with_meta_for_non_existing_upstream(self):
    """
    Should return an empty dict when the etcd read raises KeyError.
    """
    # Given: An etcd client whose read raises KeyError
    self.etcd_cl.read.side_effect = KeyError

    # When: Nodes with meta are requested for the upstream
    found_nodes = self.client.get_nodes_with_meta('test')

    # Then: An empty dictionary is returned
    dict_compare(found_nodes, {})
开发者ID:totem,项目名称:yoda-py,代码行数:10,代码来源:test_client.py
注:本文中的tests.helper.dict_compare函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论