本文整理汇总了Python中tests.common.get_check函数的典型用法代码示例。如果您正苦于以下问题:Python get_check函数的具体用法?Python get_check怎么用?Python get_check使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_check函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_check
def test_check(self):
    """Integration test for supervisord check. Using a mocked supervisord.

    For every entry of ``self.TEST_CASES`` the check is built from the
    case's YAML and the parsed instances are compared with
    ``expected_instances``.  Each instance is then run: if the run raises,
    the exception text must equal the case's ``error_message`` (a case
    without one fails the test); if it succeeds, the collected metrics and
    service checks are compared with the expectations stored under the
    instance name.
    """
    for tc in self.TEST_CASES:
        check, instances = get_check('supervisord', tc['yaml'])
        self.assertTrue(check is not None, msg=check)
        self.assertEqual(tc['expected_instances'], instances)
        for instance in instances:
            name = instance['name']
            try:
                # Run the check
                check.check(instance)
            except Exception as e:  # "as" form parses on Python 2.6+ and 3
                if 'error_message' not in tc:
                    # Unexpected error: fail explicitly with its message
                    self.fail(msg=str(e))
                # Expected error: its text must match the test case
                self.assertEqual(str(e), tc['error_message'])
            else:
                # Assert that the check collected the right metrics
                expected_metrics = tc['expected_metrics'][name]
                self.assert_metrics(expected_metrics, check.get_metrics())
                # Assert that the check generated the right service checks
                expected_service_checks = tc['expected_service_checks'][name]
                self.assert_service_checks(expected_service_checks,
                                           check.get_service_checks())
开发者ID:Joeasaurus,项目名称:dd-agent,代码行数:26,代码来源:test_supervisord.py
示例2: test_zk_stat_parsing_lt_v344
def test_zk_stat_parsing_lt_v344(self):
    """Verify metrics and service checks produced from the ``lt_v344`` stat fixture.

    ``zk._send_command`` is replaced by ``send_command_lt_v344`` while the
    check runs; afterwards the emitted service checks and the
    (name, value) pairs of the emitted metrics are compared with the
    expected values below.
    """
    zk, instances = get_check('zk', CONFIG)
    # Plain int literals: the ``L`` suffix is a syntax error on Python 3,
    # and on Python 2 an int compares equal to the corresponding long.
    expected = [
        ('zookeeper.latency.min', -10),
        ('zookeeper.latency.avg', 0),
        ('zookeeper.latency.max', 20007),
        ('zookeeper.bytes_received', 101032173),
        ('zookeeper.bytes_sent', 0),
        ('zookeeper.connections', 6),
        ('zookeeper.bytes_outstanding', 0),
        ('zookeeper.outstanding_requests', 0),
        ('zookeeper.zxid.epoch', 1),
        ('zookeeper.zxid.count', 55024071),
        ('zookeeper.nodes', 487),
    ]

    with patch.object(zk, '_send_command', send_command_lt_v344):
        zk.check(instances[0])

    service_checks = zk.get_service_checks()
    # Check the count first so a short list fails with a clear assertion
    # instead of an IndexError on the lookups below.
    self.assertEqual(len(service_checks), 2)
    self.assertEqual(service_checks[0]['check'], 'zookeeper.ruok')
    self.assertEqual(service_checks[1]['check'], 'zookeeper.mode')
    self.assertEqual(service_checks[1]['status'], AgentCheck.CRITICAL)

    metrics = zk.get_metrics()
    self.assertEqual(sorted([(name, val) for name, _, val, _ in metrics]), sorted(expected))
    self.assertEqual(metrics[0][3]['tags'], ['mode:leader'])
开发者ID:AquaBindi,项目名称:dd-agent,代码行数:29,代码来源:test_zookeeper.py
示例3: test_travis_supervisord
def test_travis_supervisord(self):
    """Integration test for supervisord check. Using a supervisord on Travis.

    Reads the YAML configuration found under ``$VOLATILE_DIR``, builds the
    check from it and runs the single instance four times, asserting on the
    ``supervisord.process.count`` values tagged ``status:up`` and
    ``status:down`` after every run.
    """
    # Load yaml config; the context manager closes the file handle promptly
    with open(os.environ['VOLATILE_DIR'] + '/supervisor/supervisord.yaml', 'r') as config_file:
        config_str = config_file.read()
    self.assertTrue(config_str is not None and len(config_str) > 0, msg=config_str)

    # init the check and get the instances
    check, instances = get_check('supervisord', config_str)
    self.assertTrue(check is not None, msg=check)
    self.assertEqual(len(instances), 1)

    # Supervisord should run 3 programs for 30, 60 and 90 seconds
    # respectively. The tests below will ensure that the process count
    # metric is reported correctly after (roughly) 10, 40, 70 and 100 seconds
    # NOTE(review): the loop below only sleeps 10 seconds between runs;
    # confirm the timings quoted above against the supervisord fixture.
    for i in range(4):
        try:
            # Run the check
            check.check(instances[0])
        except Exception as e:  # "as" form parses on Python 2.6+ and 3
            # Make sure that it ran successfully
            self.fail(msg=str(e))
        else:
            up, down = 0, 0
            for name, timestamp, value, meta in check.get_metrics():
                if name == 'supervisord.process.count':
                    if 'status:up' in meta['tags']:
                        up = value
                    elif 'status:down' in meta['tags']:
                        down = value
            self.assertEqual(up, 3 - i)
            self.assertEqual(down, i)
            sleep(10)
开发者ID:Joeasaurus,项目名称:dd-agent,代码行数:33,代码来源:test_supervisord.py
示例4: testIIS
def testIIS(self):
    """Run the IIS check twice and verify metric names, tags and the service check."""
    check, instances = get_check('iis', CONFIG)
    instance = instances[0]

    # First run: results are fetched but not inspected
    check.check(instance)
    check.get_metrics()
    check.get_service_checks()
    time.sleep(1)

    # Second run to get the rates
    check.check(instance)
    collected = check.get_metrics()
    emitted_service_checks = check.get_service_checks()

    wanted_names = [definition[0] for definition in check.METRICS]
    reported_names = [point[0] for point in collected]
    reported_tags = [point[3]['tags'] for point in collected]

    # Make sure each metric was captured
    for wanted in wanted_names:
        self.assertTrue(wanted in reported_names, "not reporting %s" % wanted)

    # Make sure everything is tagged correctly
    for tag_list in reported_tags:
        self.assertEquals(['mytag1', 'mytag2', 'site:Default Web Site'], tag_list)

    # Make sure that we get a service check
    self.assertEquals(len(emitted_service_checks), 1)
    only_check = emitted_service_checks[0]
    self.assertEquals(check.SERVICE_CHECK, only_check['check'])
    self.assertEquals(['site:Default Web Site'], only_check['tags'])
开发者ID:AirbornePorcine,项目名称:dd-agent,代码行数:28,代码来源:test_iis.py
示例5: testNginxPlus
def testNginxPlus(self):
    """Parse the NGINX Plus JSON fixture and compare it with the stored expected output."""
    raw_json = read_data_from_file('nginx_plus_in.json')
    # NOTE(review): eval() executes the fixture file as Python code; this is
    # only acceptable because the fixture ships with the test suite.
    wanted = eval(read_data_from_file('nginx_plus_out.python'))

    nginx, instances = get_check('nginx', self.nginx_config)
    result = nginx.parse_json(raw_json)
    # Sort in place before comparing with the expected value
    result.sort()
    self.assertEquals(result, wanted)
开发者ID:AirbornePorcine,项目名称:dd-agent,代码行数:7,代码来源:test_web.py
示例6: testSqlServer
def testSqlServer(self):
    """Run the SQL Server check once and verify its metrics and service checks.

    Covers the base metrics listed in ``check.METRICS``, three custom
    metrics, the tags of ``sqlserver.db.commit_table_entries`` and the
    host/db tags on every service check.
    """
    check, instances = get_check('sqlserver', CONFIG)
    check.check(instances[0])
    metrics = check.get_metrics()

    # Make sure the base metrics loaded
    base_metrics = [m[0] for m in check.METRICS]
    ret_metrics = [m[0] for m in metrics]
    for metric in base_metrics:
        # unittest assertions are not stripped under ``python -O`` and
        # report the offending value on failure.
        self.assertIn(metric, ret_metrics)

    # Check our custom metrics
    self.assertIn('sqlserver.clr.execution', ret_metrics)
    self.assertIn('sqlserver.exec.in_progress', ret_metrics)
    self.assertIn('sqlserver.db.commit_table_entries', ret_metrics)

    # Make sure the ALL custom metric is tagged
    tagged_metrics = [m for m in metrics
                      if m[0] == 'sqlserver.db.commit_table_entries']
    for metric in tagged_metrics:
        for tag in metric[3]['tags']:
            self.assertTrue(tag.startswith('db'), tag)

    # Service checks
    service_checks = check.get_service_checks()
    service_checks_count = len(service_checks)
    # isinstance-based check instead of comparing type objects
    self.assertIsInstance(service_checks, list)
    self.assertTrue(service_checks_count > 0)
    self.assertEqual(len([sc for sc in service_checks if sc['check'] == check.SERVICE_CHECK_NAME]), 1, service_checks)
    # Assert that all service checks have the proper tags: host and port
    self.assertEqual(len([sc for sc in service_checks if "host:127.0.0.1,1433" in sc['tags']]), service_checks_count, service_checks)
    self.assertEqual(len([sc for sc in service_checks if "db:master" in sc['tags']]), service_checks_count, service_checks)
开发者ID:AirbornePorcine,项目名称:dd-agent,代码行数:32,代码来源:test_sqlserver.py
示例7: test_process_zpool
def test_process_zpool(self):
    """Feed fixed zpool stats to ``_process_zpool`` and verify the emitted metrics.

    Every emitted metric must be named ``zpool.<key>`` for a key of the
    input dict and carry that key's value unchanged; any other metric name
    fails the test.
    """
    zpool_metrics = {
        'capacity': '64',
        'size': '15942918602752',
        'dedupratio': '1.00',
        'free': '5585519069102',
        'allocated': '10357399533649'
    }
    zpool_checks = {
        'health': 'ONLINE'
    }
    check, instances = get_check('zfs', self.config)
    zpool = 'tank'
    check._process_zpool(zpool=zpool, zpool_metrics=zpool_metrics, zpool_checks=zpool_checks)
    metrics = check.get_metrics()

    # Expected value per metric name, derived from the input so the two
    # cannot drift apart.
    expected = dict(('zpool.' + key, value) for key, value in zpool_metrics.items())
    # NOTE(review): an empty metrics list still passes; confirm whether all
    # five metrics are guaranteed before asserting on completeness.
    for metric in metrics:
        assert metric[0] in expected, "Unexpected metric " + metric[0]
        assert metric[2] == expected[metric[0]]
开发者ID:jonathonwiebe,项目名称:dd-agent,代码行数:28,代码来源:test_zfs.py
示例8: test_get_zpool_stats
def test_get_zpool_stats(self):
    """Verify ``_get_zpool_stats`` against canned ``zpool get`` output.

    ``Popen`` on ``check.subprocess`` is mocked so that ``communicate()``
    returns the canned text; the test then checks the command line that was
    issued and every value of the returned mapping.
    """
    zpool_get_data = """NAME PROPERTY VALUE SOURCE
tank capacity 64% -
tank size 14.5T -
tank dedupratio 1.00x -
tank free 5.08T -
tank allocated 9.42T -"""
    expected = {
        'capacity': '64',
        'size': '15942918602752',
        'dedupratio': '1.00',
        'free': '5585519069102',
        'allocated': '10357399533649'
    }
    check, instances = get_check('zfs', self.config)
    zpool_name = 'tank'

    # patch.object puts the original Popen back when the block exits, so
    # the mock cannot leak into other tests.
    with mock.patch.object(check.subprocess, 'Popen') as popen:
        popen.return_value.communicate.return_value = (zpool_get_data, None)
        actual = check._get_zpool_stats(zpool_name)

    assert popen.call_count == 1
    assert popen.call_args == mock.call(
        'sudo zpool get capacity,size,dedupratio,free,allocated tank'.split(),
        stdout=check.subprocess.PIPE
    )
    for result in actual:
        assert actual[result] == expected[result]
开发者ID:jonathonwiebe,项目名称:dd-agent,代码行数:28,代码来源:test_zfs.py
示例9: test_convert_human_to_bytes
def test_convert_human_to_bytes(self):
    """Exercise ``_convert_human_to_bytes`` on valid sizes and on both error paths."""
    check, instances = get_check('zfs', self.config)

    # (human-readable input, expected byte count)
    conversions = (
        ('300', 300),                # bytes
        ('300K', 307200),            # kilobytes
        ('300M', 314572800),         # megabytes
        ('300G', 322122547200),      # gigabytes
        ('300T', 329853488332800),   # terabytes
    )
    for human_size, byte_count in conversions:
        assert check._convert_human_to_bytes(human_size) == byte_count

    # Test invalid input
    with self.assertRaises(ValueError):
        check._convert_human_to_bytes('Pfffffft')

    # Test non-implemented units
    with self.assertRaises(NotImplementedError):
        check._convert_human_to_bytes('300J')
开发者ID:jonathonwiebe,项目名称:dd-agent,代码行数:30,代码来源:test_zfs.py
示例10: test_check
def test_check(self):
    """Run the activemq_xml check against mocked HTTP responses with empty listings."""
    check, instances = get_check('activemq_xml', self.config)
    check.requests = mock.Mock()

    # Canned XML bodies, probed in this order against the requested URL.
    canned_bodies = (
        ('/admin/xml/topics.jsp', '<topics></topics>'),
        ('/admin/xml/queues.jsp', '<queues></queues>'),
        ('/admin/xml/subscribers.jsp', '<subscribers></subscribers>'),
    )

    def response_side_effect(*args, **kwargs):
        requested_url = args[0]
        for url_fragment, body in canned_bodies:
            if url_fragment in requested_url:
                return mock.Mock(text=body)
        # Unknown URL: an empty body leads to an XML parsing error,
        # which is what we want if we were called with a URL we don't know.
        return mock.Mock(text='')

    check.requests.get.side_effect = response_side_effect
    check.check(instances[0])

    zero_gauge = (0, 'gauge')
    expected = {
        'url:http://localhost:8161': {
            'activemq.queue.count': zero_gauge,
            'activemq.topic.count': zero_gauge,
            'activemq.subscriber.count': zero_gauge,
        }
    }
    self._assert_expected_metrics(expected, check.get_metrics())
开发者ID:AquaBindi,项目名称:dd-agent,代码行数:26,代码来源:test_activemq_xml.py
示例11: run_check
def run_check(name, path=None):
    """Run a check on every configured instance and pretty-print its output.

    :param name: check name; also used to build the default configuration
        file name ``<name>.yaml`` inside the conf.d directory.
    :param path: optional explicit path of the YAML configuration file.
    :raises Exception: if the configuration file cannot be opened or if it
        yields no instances.
    """
    from tests.common import get_check

    # Read the config file
    confd_path = path or os.path.join(get_confd_path(get_os()), '%s.yaml' % name)
    try:
        f = open(confd_path)
    except IOError:
        raise Exception('Unable to open configuration at %s' % confd_path)
    # Close the handle even if read() fails
    with f:
        config_str = f.read()

    # Run the check
    check, instances = get_check(name, config_str)
    if not instances:
        raise Exception('YAML configuration returned no instances.')
    for instance in instances:
        check.check(instance)
        # print(<single string>) behaves the same on Python 2 and 3
        if check.has_events():
            print("Events:\n")
            pprint(check.get_events(), indent=4)
        print("Metrics:\n")
        pprint(check.get_metrics(), indent=4)
开发者ID:alekstorm,项目名称:dd-agent,代码行数:25,代码来源:__init__.py
示例12: testApacheOldConfig
def testApacheOldConfig(self):
    """A legacy agent config must be converted into an instance that keeps its status URL."""
    apache_check, _ = get_check('apache', self.apache_config)
    legacy_config = {'apache_status_url': 'http://example.com/server-status?auto'}

    parsed = apache_check.parse_agent_config(legacy_config)
    first_instance = parsed['instances'][0]
    assert first_instance['apache_status_url'] == legacy_config['apache_status_url']
开发者ID:openstack,项目名称:monasca-agent,代码行数:7,代码来源:test_web.py
示例13: test_build_message
def test_build_message(self):
    """Unit test for the service check message built by the supervisord check."""
    # Build the check first; it only needs the YAML of the first test case
    check, _ = get_check('supervisord', self.TEST_CASES[0]['yaml'])

    # Process description passed to ``_build_message``
    process_info = {
        'now': 1414815513,
        'group': 'mysql',
        'description': 'pid 787, uptime 0:02:05',
        'pid': 787,
        'stderr_logfile': '/var/log/supervisor/mysql-stderr---supervisor-3ATI82.log',
        'stop': 0,
        'statename': 'RUNNING',
        'start': 1414815388,
        'state': 20,
        'stdout_logfile': '/var/log/mysql/mysql.log',
        'logfile': '/var/log/mysql/mysql.log',
        'exitstatus': 0,
        'spawnerr': '',
        'name': 'mysql'
    }
    # The explicit "\n" keeps the space after "Stop time:" inside the literal
    wanted_message = """Current time: 2014-11-01 04:18:33
Process name: mysql
Process group: mysql
Description: pid 787, uptime 0:02:05
Error log file: /var/log/supervisor/mysql-stderr---supervisor-3ATI82.log
Stdout log file: /var/log/mysql/mysql.log
Log file: /var/log/mysql/mysql.log
State: RUNNING
Start time: 2014-11-01 04:16:28
Stop time: \nExit Status: 0"""

    built_message = check._build_message(process_info)
    self.assertEquals(wanted_message, built_message)
开发者ID:Joeasaurus,项目名称:dd-agent,代码行数:32,代码来源:test_supervisord.py
示例14: testNginx
def testNginx(self):
    """Run the nginx check on the first two instances and inspect the reported metrics."""
    nginx, instances = get_check('nginx', self.nginx_config)

    # First instance: exactly one connections metric is expected
    nginx.check(instances[0])
    first_run = nginx.get_metrics()
    connection_points = [point for point in first_run if point[0] == "nginx.net.connections"]
    self.assertEquals(len(connection_points), 1, first_run)

    # Second instance: check the tags of the first reported metric
    nginx.check(instances[1])
    second_run = nginx.get_metrics()
    leading_meta = second_run[0][3]
    self.assertEquals(leading_meta.get('tags'), ['first_one'])
开发者ID:Tokutek,项目名称:dd-agent,代码行数:9,代码来源:test_web.py
示例15: testLighttpd
def testLighttpd(self):
    """Each lighttpd instance must tag its first reported metric with its own instance tag."""
    lighttpd, instances = get_check('lighttpd', self.lighttpd_config)

    # Instances are run in order; metrics are fetched after every run
    wanted_tags = (['instance:first'], ['instance:second'])
    for position, tags in enumerate(wanted_tags):
        lighttpd.check(instances[position])
        reported = lighttpd.get_metrics()
        self.assertEquals(reported[0][3].get('tags'), tags)
开发者ID:Tokutek,项目名称:dd-agent,代码行数:10,代码来源:test_web.py
示例16: testApache
def testApache(self):
    """Each apache instance must tag its first reported metric with its own instance tag."""
    apache, instances = get_check('apache', self.apache_config)

    # Instances are run in order; metrics are fetched after every run
    wanted_tags = (['instance:first'], ['instance:second'])
    for position, tags in enumerate(wanted_tags):
        apache.check(instances[position])
        reported = apache.get_metrics()
        self.assertEquals(reported[0][3].get('tags'), tags)
开发者ID:Tokutek,项目名称:dd-agent,代码行数:10,代码来源:test_web.py
示例17: test_check
def test_check(self):
    """Smoke-run the varnish check three times, printing the metrics of every run.

    The run is best effort: an error does not fail the test, but its
    traceback is printed so that the failure is no longer silent.
    """
    import pprint
    import traceback

    v, instances = get_check('varnish', self.config)
    try:
        # Resolve the binary once and close the pipe, instead of leaving
        # one unclosed pipe per iteration.
        with os.popen("which varnishstat") as pipe:
            varnishstat = pipe.read()[:-1]  # drop the trailing newline
        for _ in range(3):
            v.check({"varnishstat": varnishstat})
            pprint.pprint(v.get_metrics())
            time.sleep(1)
    except Exception:
        # Deliberately non-fatal, but report what went wrong
        traceback.print_exc()
开发者ID:AquaBindi,项目名称:dd-agent,代码行数:10,代码来源:test_varnish.py
示例18: testNginx
def testNginx(self):
    """Run the nginx check on two instances and inspect metrics and dimensions.

    Currently always skipped; the statements after the ``raise`` are kept
    so the test can be re-enabled by removing the skip.
    """
    # The skip reason previously named Lighthttpd, but this test drives
    # the nginx check.
    raise SkipTest("Requires running nginx")
    nginx, instances = get_check('nginx', self.nginx_config)
    nginx.check(instances[0])
    r = nginx.get_metrics()
    self.assertEqual(len([t for t in r if t[0] == "nginx.net.connections"]), 1, r)
    nginx.check(instances[1])
    r = nginx.get_metrics()
    self.assertEqual(r[0][3].get('dimensions'), {'test': 'first_one'})
开发者ID:openstack,项目名称:monasca-agent,代码行数:10,代码来源:test_web.py
示例19: testNginxOldConfig
def testNginxOldConfig(self):
    """Legacy ``nginx_status_url_N`` entries must each become one instance.

    Every legacy value has the form ``<url>:<tag>``; the resulting
    instances must carry exactly the URLs with the trailing tag removed.
    """
    nginx, _ = get_check('nginx', self.nginx_config)
    config = {
        'nginx_status_url_1': 'http://www.example.com/nginx_status:first_tag',
        'nginx_status_url_2': 'http://www.example2.com/nginx_status:8080:second_tag',
        'nginx_status_url_3': 'http://www.example3.com/nginx_status:third_tag'
    }
    instances = nginx.parse_agent_config(config)['instances']
    self.assertEqual(len(instances), 3)

    # Compare order-independently: indexing config.values() fails on
    # Python 3 and made the result depend on dict iteration order.
    expected_urls = sorted(value.rsplit(':', 1)[0] for value in config.values())
    actual_urls = sorted(instance['nginx_status_url'] for instance in instances)
    self.assertEqual(expected_urls, actual_urls)
开发者ID:openstack,项目名称:monasca-agent,代码行数:11,代码来源:test_web.py
示例20: testApache
def testApache(self):
    """Each apache instance must report its own ``instance`` dimension (currently skipped)."""
    raise SkipTest("Requires running apache")
    # Unreachable while the skip above is in place
    apache, instances = get_check('apache', self.apache_config)
    wanted_dimensions = ({'instance': 'first'}, {'instance': 'second'})
    for position, dimensions in enumerate(wanted_dimensions):
        apache.check(instances[position])
        reported = apache.get_metrics()
        self.assertEqual(reported[0][3].get('dimensions'), dimensions)
开发者ID:openstack,项目名称:monasca-agent,代码行数:11,代码来源:test_web.py
注:本文中的tests.common.get_check函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论