本文整理汇总了Python中mongodb.MongoDBCollector类的典型用法代码示例。如果您正苦于以下问题:Python MongoDBCollector类的具体用法?Python MongoDBCollector怎么用?Python MongoDBCollector使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了MongoDBCollector类的9个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: TestMongoDBCollector
class TestMongoDBCollector(CollectorTestCase):
    """Run MongoDBCollector against a mocked ``pymongo.Connection``."""

    def setUp(self):
        """Create the collector under test and the mock connection it will use."""
        settings = {"host": "localhost:27017"}
        self.collector = MongoDBCollector(
            get_collector_config("MongoDBCollector", settings), None)
        self.connection = MagicMock()

    @patch("pymongo.Connection")
    @patch.object(Collector, "publish")
    def test_should_publish_nested_keys_for_server_stats(
            self, publish_mock, connector_mock):
        """Expect nested numeric serverStatus entries under dotted names."""
        server_status = {"more_keys": {"nested_key": 1}, "key": 2, "string": "str"}
        self._annotate_connection(connector_mock, server_status)

        self.collector.collect()

        self.connection.db.command.assert_called_once_with("serverStatus")
        expected = {"more_keys.nested_key": 1, "key": 2}
        self.assertPublishedMany(publish_mock, expected)

    @patch("pymongo.Connection")
    @patch.object(Collector, "publish")
    def test_should_publish_nested_keys_for_db_stats(
            self, publish_mock, connector_mock):
        """Expect nested numeric dbStats entries under dotted names."""
        db_stats = {"db_keys": {"db_nested_key": 1}, "dbkey": 2, "dbstring": "str"}
        self._annotate_connection(connector_mock, db_stats)

        self.collector.collect()

        self.connection["db1"].command.assert_called_once_with("dbStats")
        expected = {"db_keys.db_nested_key": 1, "dbkey": 2}
        # Record the metrics as the documentation example before asserting.
        self.setDocExample(self.collector.__class__.__name__, expected)
        self.assertPublishedMany(publish_mock, expected)

    def _annotate_connection(self, connector_mock, data):
        """Make the patched connection class return the mock and prime its replies."""
        conn = self.connection
        connector_mock.return_value = conn
        conn.db.command.return_value = data
        conn.database_names.return_value = ["db1"]
开发者ID:richardc,项目名称:Diamond,代码行数:35,代码来源:testmongodb.py
示例2: TestMongoDBCollectorWithReplica
class TestMongoDBCollectorWithReplica(CollectorTestCase):
    """Run MongoDBCollector with the ``replica`` option switched on."""

    def setUp(self):
        """Create a replica-enabled collector and the mock connection."""
        options = {
            'host': 'localhost:27017',
            'databases': '^db',
            'replica': True,
        }
        self.collector = MongoDBCollector(
            get_collector_config('MongoDBCollector', options), None)
        self.connection = MagicMock()

    def test_import(self):
        """The collector class must be importable (truthy)."""
        self.assertTrue(MongoDBCollector)

    @run_only_if_pymongo_is_available
    @patch('pymongo.MongoClient')
    @patch.object(Collector, 'publish')
    def test_should_publish_replset_status_if_enabled(
            self, publish_mock, connector_mock):
        """Collecting must issue exactly one ``replSetGetStatus`` admin command."""
        reply = {'more_keys': long(1), 'key': 2, 'string': 'str'}
        self._annotate_connection(connector_mock, reply)

        self.collector.collect()

        admin_command = self.connection.admin.command
        admin_command.assert_called_once_with('replSetGetStatus')

    def _annotate_connection(self, connector_mock, data):
        """Make the patched client class return the mock and prime its replies."""
        conn = self.connection
        connector_mock.return_value = conn
        conn.db.command.return_value = data
        conn.database_names.return_value = ['db1', 'baddb']
开发者ID:KlavsKlavsen,项目名称:Diamond,代码行数:32,代码来源:testmongodb.py
示例3: TestMongoDBCollector
class TestMongoDBCollector(CollectorTestCase):
    """Run MongoDBCollector against a mocked ``pymongo.Connection``."""

    def setUp(self):
        """Create the collector under test and the mock connection it will use."""
        options = {
            'host': 'localhost:27017',
        }
        self.collector = MongoDBCollector(
            get_collector_config('MongoDBCollector', options), None)
        self.connection = MagicMock()

    @patch('pymongo.Connection')
    @patch.object(Collector, 'publish')
    def test_should_publish_nested_keys_for_server_stats(
            self, publish_mock, connector_mock):
        """Expect nested numeric serverStatus entries under dotted names."""
        server_status = {'more_keys': {'nested_key': 1}, 'key': 2, 'string': 'str'}
        self._annotate_connection(connector_mock, server_status)

        self.collector.collect()

        self.connection.db.command.assert_called_once_with('serverStatus')
        expected = {
            'more_keys.nested_key': 1,
            'key': 2,
        }
        self.assertPublishedMany(publish_mock, expected)

    @patch('pymongo.Connection')
    @patch.object(Collector, 'publish')
    def test_should_publish_nested_keys_for_db_stats(
            self, publish_mock, connector_mock):
        """Expect nested numeric dbStats entries under dotted names."""
        db_stats = {'db_keys': {'db_nested_key': 1}, 'dbkey': 2, 'dbstring': 'str'}
        self._annotate_connection(connector_mock, db_stats)

        self.collector.collect()

        self.connection['db1'].command.assert_called_once_with('dbStats')
        expected = {
            'db_keys.db_nested_key': 1,
            'dbkey': 2,
        }
        # Record the metrics as the documentation example before asserting.
        self.setDocExample(
            collector=self.collector.__class__.__name__,
            metrics=expected,
            defaultpath=self.collector.config['path'],
        )
        self.assertPublishedMany(publish_mock, expected)

    def _annotate_connection(self, connector_mock, data):
        """Make the patched connection class return the mock and prime its replies."""
        conn = self.connection
        connector_mock.return_value = conn
        conn.db.command.return_value = data
        conn.database_names.return_value = ['db1']
开发者ID:patricksmith,项目名称:Diamond,代码行数:49,代码来源:testmongodb.py
示例4: setUp
def setUp(self):
    """Create a collector configured with host and '^db' databases, plus a mock connection."""
    options = {
        'host': 'localhost:27017',
        'databases': '^db',
    }
    self.collector = MongoDBCollector(
        get_collector_config('MongoDBCollector', options), None)
    self.connection = MagicMock()
开发者ID:KlavsKlavsen,项目名称:Diamond,代码行数:7,代码来源:testmongodb.py
示例5: TestMongoDBCollector
class TestMongoDBCollector(CollectorTestCase):
    """Run MongoDBCollector against a mocked ``pymongo.Connection``."""

    def setUp(self):
        """Create the collector under test for host 'localhost:27017'."""
        options = {
            'host': 'localhost:27017',
        }
        self.collector = MongoDBCollector(
            get_collector_config('MongoDBCollector', options), None)

    @patch('pymongo.Connection')
    @patch.object(Collector, 'publish')
    def test_should_publish_nested_keys(self, publish_mock, connector_mock):
        """Expect nested numeric serverStatus entries under dotted names."""
        fake_conn = Mock()
        connector_mock.return_value = fake_conn
        fake_conn.db.command.return_value = {
            'more_keys': {'nested_key': 1},
            'key': 2,
            'string': 'str',
        }

        self.collector.collect()

        fake_conn.db.command.assert_called_once_with('serverStatus')
        expected = {
            'more_keys.nested_key': 1,
            'key': 2,
        }
        self.assertPublishedMany(publish_mock, expected)
开发者ID:MechanisM,项目名称:Diamond,代码行数:22,代码来源:testmongodb.py
示例6: TestMongoDBCollector
# Tests for MongoDBCollector configured with a single host and a '^db'
# databases setting; pymongo.MongoClient and Collector.publish are patched.
# NOTE(review): this excerpt is truncated (see the omission marker at the end);
# the _annotate_connection helper called by the tests is not visible here.
class TestMongoDBCollector(CollectorTestCase):
# Build the collector under test and a MagicMock that stands in for the
# connection returned by the patched client class.
def setUp(self):
config = get_collector_config('MongoDBCollector', {
'host': 'localhost:27017',
'databases': '^db'
})
self.collector = MongoDBCollector(config, None)
self.connection = MagicMock()
# Smoke check: the collector class is importable (truthy).
def test_import(self):
self.assertTrue(MongoDBCollector)
# serverStatus reply with a nested dict: expects the nested numeric value to
# be published as 'more_keys.nested_key' alongside the flat 'key'.
@run_only_if_pymongo_is_available
@patch('pymongo.MongoClient')
@patch.object(Collector, 'publish')
def test_should_publish_nested_keys_for_server_stats(self,
publish_mock,
connector_mock):
data = {'more_keys': {'nested_key': 1}, 'key': 2, 'string': 'str'}
self._annotate_connection(connector_mock, data)
self.collector.collect()
self.connection.db.command.assert_called_once_with('serverStatus')
self.assertPublishedMany(publish_mock, {
'more_keys.nested_key': 1,
'key': 2
})
# dbStats reply with a nested dict: expects exactly one 'dbStats' command on
# 'db1' and the dotted metric names below; the metrics are also recorded as
# the documentation example via setDocExample.
@run_only_if_pymongo_is_available
@patch('pymongo.MongoClient')
@patch.object(Collector, 'publish')
def test_should_publish_nested_keys_for_db_stats(self,
publish_mock,
connector_mock):
data = {'db_keys': {'db_nested_key': 1}, 'dbkey': 2, 'dbstring': 'str'}
self._annotate_connection(connector_mock, data)
self.collector.collect()
self.connection['db1'].command.assert_called_once_with('dbStats')
metrics = {
'db_keys.db_nested_key': 1,
'dbkey': 2
}
self.setDocExample(collector=self.collector.__class__.__name__,
metrics=metrics,
defaultpath=self.collector.config['path'])
self.assertPublishedMany(publish_mock, metrics)
# A Python 2 long value in the reply is expected to be published as a
# plain numeric metric ('more_keys': 1).
@run_only_if_pymongo_is_available
@patch('pymongo.MongoClient')
@patch.object(Collector, 'publish')
def test_should_publish_stats_with_long_type(self,
publish_mock,
connector_mock):
data = {'more_keys': long(1), 'key': 2, 'string': 'str'}
self._annotate_connection(connector_mock, data)
self.collector.collect()
self.connection.db.command.assert_called_once_with('serverStatus')
self.assertPublishedMany(publish_mock, {
'more_keys': 1,
'key': 2
})
# Checks that 'baddb' was never looked up on the connection by item access.
# Presumably it is excluded by the '^db' databases setting; the database list
# comes from the helper that is not shown here - verify.
# NOTE(review): a bare assert is stripped under `python -O`; consider
# self.assertNotIn(...) for consistency with the other unittest assertions.
@run_only_if_pymongo_is_available
@patch('pymongo.MongoClient')
@patch.object(Collector, 'publish')
def test_should_ignore_unneeded_databases(self,
publish_mock,
connector_mock):
self._annotate_connection(connector_mock, {})
self.collector.collect()
assert call('baddb') not in self.connection.__getitem__.call_args_list
# Primes 'db1' with two collection names and a command reply, then checks
# that serverStatus ran once, collection_names() was called once with no
# arguments, and 'dbStats' was among the commands issued on 'db1'.
# NOTE(review): the rest of this test appears to be cut off by the omission
# marker below.
@run_only_if_pymongo_is_available
@patch('pymongo.MongoClient')
@patch.object(Collector, 'publish')
def test_should_ignore_unneeded_collections(self,
publish_mock,
connector_mock):
data = {'more_keys': long(1), 'key': 2, 'string': 'str'}
self._annotate_connection(connector_mock, data)
self.connection['db1'].collection_names.return_value = ['collection1',
'tmp.mr.tmp1']
self.connection['db1'].command.return_value = {'key': 2,
'string': 'str'}
self.collector.collect()
self.connection.db.command.assert_called_once_with('serverStatus')
self.connection['db1'].collection_names.assert_called_once_with()
self.connection['db1'].command.assert_any_call('dbStats')
#......... part of the code is omitted here .........
开发者ID:KlavsKlavsen,项目名称:Diamond,代码行数:101,代码来源:testmongodb.py
示例7: TestMongoMultiHostDBCollector
# Tests for MongoDBCollector configured with two hosts and a '^db' databases
# setting; pymongo.MongoClient and Collector.publish are patched. Expected
# metric names carry a per-host prefix ('localhost_27017.', 'localhost_27057.').
# NOTE(review): this excerpt is truncated (see the omission marker at the end);
# the _annotate_connection helper called by the tests is not visible here.
class TestMongoMultiHostDBCollector(CollectorTestCase):
# Build the multi-host collector under test and a MagicMock that stands in
# for the connection returned by the patched client class.
def setUp(self):
config = get_collector_config('MongoDBCollector', {
'hosts': ['localhost:27017', 'localhost:27057'],
'databases': '^db',
})
self.collector = MongoDBCollector(config, None)
self.connection = MagicMock()
# Smoke check: the collector class is importable (truthy).
def test_import(self):
self.assertTrue(MongoDBCollector)
# serverStatus reply with a nested dict: expects the dotted metric names to
# be published once per configured host. Uses assert_called_with (last call)
# rather than assert_called_once_with.
@run_only_if_pymongo_is_available
@patch('pymongo.MongoClient')
@patch.object(Collector, 'publish')
def test_should_publish_nested_keys_for_server_stats(self,
publish_mock,
connector_mock):
data = {'more_keys': {'nested_key': 1}, 'key': 2, 'string': 'str'}
self._annotate_connection(connector_mock, data)
self.collector.collect()
self.connection.db.command.assert_called_with('serverStatus')
self.assertPublishedMany(publish_mock, {
'localhost_27017.more_keys.nested_key': 1,
'localhost_27057.more_keys.nested_key': 1,
'localhost_27017.key': 2,
'localhost_27057.key': 2
})
# dbStats reply with a nested dict: expects host-prefixed dotted metric
# names; the metrics are also recorded as the documentation example.
@run_only_if_pymongo_is_available
@patch('pymongo.MongoClient')
@patch.object(Collector, 'publish')
def test_should_publish_nested_keys_for_db_stats(self,
publish_mock,
connector_mock):
data = {'db_keys': {'db_nested_key': 1}, 'dbkey': 2, 'dbstring': 'str'}
self._annotate_connection(connector_mock, data)
self.collector.collect()
self.connection['db1'].command.assert_called_with('dbStats')
metrics = {
'localhost_27017.db_keys.db_nested_key': 1,
'localhost_27057.db_keys.db_nested_key': 1,
'localhost_27017.dbkey': 2,
'localhost_27057.dbkey': 2
}
self.setDocExample(collector=self.collector.__class__.__name__,
metrics=metrics,
defaultpath=self.collector.config['path'])
self.assertPublishedMany(publish_mock, metrics)
# A Python 2 long value in the reply is expected to be published as a
# plain numeric metric for each host.
@run_only_if_pymongo_is_available
@patch('pymongo.MongoClient')
@patch.object(Collector, 'publish')
def test_should_publish_stats_with_long_type(self,
publish_mock,
connector_mock):
data = {'more_keys': long(1), 'key': 2, 'string': 'str'}
self._annotate_connection(connector_mock, data)
self.collector.collect()
self.connection.db.command.assert_called_with('serverStatus')
self.assertPublishedMany(publish_mock, {
'localhost_27017.more_keys': 1,
'localhost_27057.more_keys': 1,
'localhost_27017.key': 2,
'localhost_27057.key': 2
})
# Checks that 'baddb' was never looked up on the connection by item access.
# Presumably it is excluded by the '^db' databases setting; the database list
# comes from the helper that is not shown here - verify.
# NOTE(review): a bare assert is stripped under `python -O`; consider
# self.assertNotIn(...) for consistency with the other unittest assertions.
@run_only_if_pymongo_is_available
@patch('pymongo.MongoClient')
@patch.object(Collector, 'publish')
def test_should_ignore_unneeded_databases(self,
publish_mock,
connector_mock):
self._annotate_connection(connector_mock, {})
self.collector.collect()
assert call('baddb') not in self.connection.__getitem__.call_args_list
# Primes 'db1' with two collection names and a command reply.
# NOTE(review): the collect() call and all assertions of this test are cut
# off by the omission marker below.
@run_only_if_pymongo_is_available
@patch('pymongo.MongoClient')
@patch.object(Collector, 'publish')
def test_should_ignore_unneeded_collections(self,
publish_mock,
connector_mock):
data = {'more_keys': long(1), 'key': 2, 'string': 'str'}
self._annotate_connection(connector_mock, data)
self.connection['db1'].collection_names.return_value = ['collection1',
'tmp.mr.tmp1']
self.connection['db1'].command.return_value = {'key': 2,
'string': 'str'}
#......... part of the code is omitted here .........
开发者ID:KlavsKlavsen,项目名称:Diamond,代码行数:101,代码来源:testmongodb.py
示例8: setUp
def setUp(self):
    """Create the collector for host 'localhost:27017' and a mock connection."""
    settings = {"host": "localhost:27017"}
    self.collector = MongoDBCollector(
        get_collector_config("MongoDBCollector", settings), None)
    self.connection = MagicMock()
开发者ID:richardc,项目名称:Diamond,代码行数:4,代码来源:testmongodb.py
示例9: setUp
def setUp(self):
    """Create the collector under test for host 'localhost:27017'."""
    options = {
        'host': 'localhost:27017',
    }
    self.collector = MongoDBCollector(
        get_collector_config('MongoDBCollector', options), None)
开发者ID:MechanisM,项目名称:Diamond,代码行数:5,代码来源:testmongodb.py
注:本文中的mongodb.MongoDBCollector类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论