本文整理汇总了Python中transaction.TransactionManager类的典型用法代码示例。如果您正苦于以下问题:Python TransactionManager类的具体用法?Python TransactionManager怎么用?Python TransactionManager使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了TransactionManager类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_proxy
def test_proxy(self):
    """Flush one transaction through the configured proxy and verify it.

    The application is configured with ``proxy_settings`` pointing at
    ``localhost:PROXY_PORT``.  After the flush, the squid access log read
    from the ``CONTAINER_NAME`` container must contain a ``CONNECT`` entry,
    and the transaction manager must have recorded exactly one endpoint
    error (the API key used here is bogus).
    """
    config = {
        "endpoints": {"https://app.datadoghq.com": ["foo"]},
        "proxy_settings": {
            "host": "localhost",
            "port": PROXY_PORT,
            "user": None,
            "password": None
        }
    }

    app = Application()
    app.skip_ssl_validation = True
    app._agentConfig = config

    trManager = TransactionManager(MAX_WAIT_FOR_REPLAY, MAX_QUEUE_SIZE, THROTTLING_DELAY)
    trManager._flush_without_ioloop = True  # Use blocking API to emulate tornado ioloop
    CustomAgentTransaction.set_tr_manager(trManager)
    app.use_simple_http_client = False  # We need proxy capabilities
    app.agent_dns_caching = False

    # _test is the instance of this class. It is needed to call the method
    # stop() and deal with the asynchronous calls as described here:
    # http://www.tornadoweb.org/en/stable/testing.html
    CustomAgentTransaction._test = self
    try:
        CustomAgentTransaction.set_application(app)
        CustomAgentTransaction.set_endpoints(config['endpoints'])

        CustomAgentTransaction('body', {}, "")  # Create and flush the transaction
        self.wait()
    finally:
        # Always drop the class-level hook, even when wait() or the setup
        # raises, so this test instance does not leak into later tests.
        del CustomAgentTransaction._test

    access_log = self.docker_client.exec_start(
        self.docker_client.exec_create(CONTAINER_NAME, 'cat /var/log/squid/access.log')['Id'])

    # There should be an entry in the proxy access log
    self.assertIn("CONNECT", access_log)
    # There should be an error since we gave a bogus api_key
    self.assertEqual(len(trManager._endpoints_errors), 1)
开发者ID:alq666,项目名称:dd-agent,代码行数:33,代码来源:test_proxy.py
示例2: testCustomEndpoint
def testCustomEndpoint(self):
    """Build the URLs of a metric transaction when only ``sd_url`` is set.

    Direct metric submission is not being enabled, so the list of endpoint
    URLs is expected to be empty.
    """
    MetricTransaction._endpoints = []

    agent_config = {
        "sd_url": "https://foo.bar.com",
        "agent_key": "foo",
        "use_dd": True
    }

    application = Application()
    application.skip_ssl_validation = False
    application._agentConfig = agent_config
    application.use_simple_http_client = True

    manager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE, THROTTLING_DELAY)
    # Use blocking API to emulate tornado ioloop
    manager._flush_without_ioloop = True
    MetricTransaction._trManager = manager
    MetricTransaction.set_application(application)
    MetricTransaction.set_endpoints()

    transaction = MetricTransaction(None, {}, "msgtype")
    endpoints = [transaction.get_url(endpoint) for endpoint in transaction._endpoints]

    # Previously expected value, kept for reference while direct metric
    # submission stays disabled:
    #   ['https://foo.bar.com/intake/msgtype?agent_key=foo']
    expected = []
    self.assertEqual(endpoints, expected, (endpoints, expected))
开发者ID:clementtrebuchet,项目名称:sd-agent,代码行数:26,代码来源:test_transaction.py
示例3: checkResolve
def checkResolve(self, resolvable=True):
    """Commit conflicting increments from two transaction managers.

    ``resolvable`` selects the counter class stored under ``'p'``:
    ``PCounter`` when true, ``PCounter2`` otherwise.  When the second commit
    of the first manager raises ``ConflictError`` the counter must read 3;
    when it succeeds, ``resolvable`` must be true and the counter must
    read 6.  In both cases the second connection must see the same value
    after starting a new transaction.
    """
    db = DB(self._storage)

    # First connection: create the counter and commit one increment.
    tm_a = TransactionManager()
    conn_a = db.open(tm_a)
    counter_cls = PCounter if resolvable else PCounter2
    counter_a = counter_cls()
    conn_a.root()['p'] = counter_a
    counter_a.inc()
    tm_a.commit()

    # Second connection: increment the same object and commit.
    tm_b = TransactionManager()
    conn_b = db.open(tm_b)
    counter_b = conn_b.root()['p']
    counter_b.inc(2)
    tm_b.commit()

    # Back on the first connection: this commit conflicts with the one above.
    counter_a.inc(3)
    try:
        tm_a.commit()
    except ConflictError as err:
        self.assertIn(".PCounter2,", str(err))
        self.assertEqual(counter_a._value, 3)
    else:
        self.assertTrue(resolvable, "Expected ConflictError")
        self.assertEqual(counter_a._value, 6)

    # New transaction on the second manager before comparing both views.
    tm_b.begin()
    self.assertEqual(counter_b._value, counter_a._value)

    db.close()
开发者ID:navytux,项目名称:ZODB,代码行数:29,代码来源:ConflictResolution.py
示例4: test_multiple_endpoints
def test_multiple_endpoints(self):
    """Skipped unconditionally; the multi-endpoint scenario is kept below.

    The ``raise`` on the first statement means nothing after it executes.
    The remaining statements describe the scenario: two configured
    endpoints should yield two queued transactions, one per endpoint.
    """
    raise SkipTest("This test doesn't apply to Server Density.")

    agent_config = {
        "endpoints": {
            "https://app.datadoghq.com": ['api_key'],
            "https://app.example.com": ['api_key']
        },
        "dd_url": "https://app.datadoghq.com",
        "api_key": 'api_key',
        "use_sd": True
    }

    application = Application()
    application.skip_ssl_validation = False
    application._agentConfig = agent_config
    application.use_simple_http_client = True

    manager = TransactionManager(
        timedelta(seconds=0),
        MAX_QUEUE_SIZE,
        THROTTLING_DELAY,
        max_endpoint_errors=100,
    )
    # Use blocking API to emulate tornado ioloop
    manager._flush_without_ioloop = True
    MetricTransaction._trManager = manager
    MetricTransaction.set_application(application)
    MetricTransaction.set_endpoints(agent_config['endpoints'])

    MetricTransaction({}, {})

    # 2 endpoints = 2 transactions
    queued = manager._transactions
    self.assertEqual(len(queued), 2)
    self.assertEqual(queued[0]._endpoint, 'https://app.datadoghq.com')
    self.assertEqual(queued[1]._endpoint, 'https://app.example.com')
开发者ID:serverdensity,项目名称:sd-agent,代码行数:27,代码来源:test_transaction.py
示例5: testCustomEndpoint
def testCustomEndpoint(self):
    """Check the intake URL built for a custom endpoint and API key.

    One endpoint with one API key is configured, so exactly one URL is
    expected: the endpoint's ``/intake/msgtype`` path with the key as the
    ``api_key`` query parameter.
    """
    MetricTransaction._endpoints = []

    agent_config = {
        "endpoints": {"https://foo.bar.com": ["foo"]},
        "dd_url": "https://foo.bar.com",
        "api_key": "foo",
        "use_dd": True
    }

    application = Application()
    application.skip_ssl_validation = False
    application._agentConfig = agent_config
    application.use_simple_http_client = True

    manager = TransactionManager(
        timedelta(seconds=0),
        MAX_QUEUE_SIZE,
        THROTTLING_DELAY,
        max_endpoint_errors=100,
    )
    # Use blocking API to emulate tornado ioloop
    manager._flush_without_ioloop = True
    MetricTransaction._trManager = manager
    MetricTransaction.set_application(application)
    MetricTransaction.set_endpoints(agent_config['endpoints'])

    transaction = MetricTransaction(None, {}, "msgtype")

    # One URL per (endpoint, api_key) pair, in iteration order.
    endpoints = [
        transaction.get_url(endpoint, api_key)
        for endpoint in transaction._endpoints
        for api_key in transaction._endpoints[endpoint]
    ]

    expected = ['https://foo.bar.com/intake/msgtype?api_key=foo']
    self.assertEqual(endpoints, expected, (endpoints, expected))
开发者ID:SupersonicAds,项目名称:dd-agent,代码行数:29,代码来源:test_transaction.py
示例6: test_endpoint_error
def test_endpoint_error(self):
    """Flush with ``max_endpoint_errors=2`` and check how many tries happen.

    Each flush is expected to attempt two transactions while they are not
    flushable; once every transaction is marked flushable, a flush must
    empty the queue.
    """
    manager = TransactionManager(
        timedelta(seconds=0),
        MAX_QUEUE_SIZE,
        timedelta(seconds=0),
        max_endpoint_errors=2,
    )

    tr_count = 10
    tr_size = (MAX_QUEUE_SIZE / tr_count) - 1
    for _ in xrange(tr_count):
        manager.append(memTransaction(tr_size, manager))

    manager.flush()

    # There should be exactly tr_count transactions in the list,
    # and only 2 of them with a flush count of 1
    self.assertEqual(len(manager._transactions), tr_count)
    total_flushes = sum(tr._flush_count for tr in manager._transactions)
    self.assertEqual(total_flushes, 2)

    # If we retry to flush, two OTHER transactions should be tried
    manager.flush()

    self.assertEqual(len(manager._transactions), tr_count)
    flush_counts = [tr._flush_count for tr in manager._transactions]
    for count in flush_counts:
        self.assertIn(count, [0, 1])
    self.assertEqual(sum(flush_counts), 4)

    # Finally when it's possible to flush, everything should go smoothly
    for tr in manager._transactions:
        tr.is_flushable = True

    manager.flush()
    self.assertEqual(len(manager._transactions), 0)
开发者ID:SupersonicAds,项目名称:dd-agent,代码行数:35,代码来源:test_transaction.py
示例7: test_zbigarray_vs_cache_invalidation
def test_zbigarray_vs_cache_invalidation():
    """Check that ZBigArray data committed in one connection reaches another.

    Two connections with separate transaction managers share one database.
    Data written and committed through the first must stay invisible to the
    second until the second crosses a transaction boundary, and must be
    visible afterwards.
    """
    # Open the test database once only to get hold of the DB object.
    boot_root = testdb.dbopen()
    boot_conn = boot_root._p_jar
    db = boot_conn.db()
    boot_conn.close()
    del boot_root, boot_conn

    tm_w = TransactionManager()
    tm_r = TransactionManager()

    conn_w = db.open(transaction_manager=tm_w)
    root_w = conn_w.root()

    # create the array in the writer connection
    root_w['zarray3'] = arr_w = ZBigArray((10,), uint8)
    tm_w.commit()

    # give the array its initial data
    arr_w[0:1] = [1]    # XXX -> [0] = 1  after BigArray can
    tm_w.commit()

    # load the array through the reader connection
    conn_r = db.open(transaction_manager=tm_r)
    root_r = conn_r.root()
    arr_r = root_r['zarray3']
    # read data in the reader connection + make sure it is read correctly
    assert arr_r[0:1] == [1]    # XXX -> [0] == 1  after BigArray can

    # At this point the array content is both in the ZODB.Connection cache
    # and in the _ZBigFileH cache of each connection.  Modify data through
    # the writer and make sure it fully propagates to the reader.
    arr_w[0:1] = [2]    # XXX -> [0] = 2  after BigArray can
    tm_w.commit()

    # the reader has not crossed a transaction boundary: old value expected
    assert arr_r[0:1] == [1]

    # ... and that must hold even after virtmem pages are reclaimed
    # ( verifies that _p_invalidate() in ZBlk.loadblkdata() does not lead to
    #   reloading data as updated )
    ram_reclaim_all()
    assert arr_r[0:1] == [1]

    tm_r.commit()   # transaction boundary for the reader

    # writer data should propagate -> ZODB -> ram pages for _ZBigFileH
    # in the reader connection
    assert arr_r[0] == 2

    conn_r.close()
    del conn_r, root_r

    dbclose(root_w)
开发者ID:Nexedi,项目名称:wendelin.core,代码行数:53,代码来源:test_arrayzodb.py
示例8: test_no_parallelism
def test_no_parallelism(self):
    """With ``max_parallelism=1`` the queued transactions flush one by one.

    Two sleeping transactions (``delay=1``) are queued; at each one-second
    checkpoint exactly one flush must be running, and the finished and
    pending counts must advance by one.
    """
    tr_count = 2
    manager = TransactionManager(
        timedelta(seconds=0),
        MAX_QUEUE_SIZE,
        timedelta(seconds=0),
        max_parallelism=1,
        max_endpoint_errors=100,
    )
    for _ in xrange(tr_count):
        manager.append(SleepingTransaction(manager, delay=1))

    manager.flush()

    # Flushes should be sequential
    for finished in xrange(tr_count):
        pending = tr_count - (finished + 1)
        self.assertEqual(manager._running_flushes, 1)
        self.assertEqual(manager._finished_flushes, finished)
        self.assertEqual(len(manager._trs_to_flush), pending)
        time.sleep(1)
开发者ID:SupersonicAds,项目名称:dd-agent,代码行数:14,代码来源:test_transaction.py
示例9: __init__
def __init__(self, port, agentConfig, watchdog=True,
             skip_ssl_validation=False, use_simple_http_client=False):
    """Set up the transaction manager, SSL/HTTP options and the watchdog.

    ``port`` is coerced to ``int``.  ``agentConfig`` must provide the
    ``'endpoints'`` and ``'forwarder_timeout'`` keys.  Parallel flushing is
    enabled only when more than one endpoint is configured.  No watchdog is
    created when ``watchdog`` is false.
    """
    self._port = int(port)
    self._agentConfig = agentConfig
    self._metrics = {}

    endpoints = agentConfig['endpoints']
    AgentTransaction.set_application(self)
    AgentTransaction.set_endpoints(endpoints)
    AgentTransaction.set_request_timeout(agentConfig['forwarder_timeout'])

    # Multiple endpoints => enable parallelism
    parallelism = self.NO_PARALLELISM
    if len(endpoints) > 1:
        parallelism = self.DEFAULT_PARALLELISM

    self._tr_manager = TransactionManager(
        MAX_WAIT_FOR_REPLAY,
        MAX_QUEUE_SIZE,
        THROTTLING_DELAY,
        max_parallelism=parallelism,
    )
    AgentTransaction.set_tr_manager(self._tr_manager)

    self._watchdog = None
    # The constructor argument wins; otherwise fall back to the config value.
    self.skip_ssl_validation = (
        skip_ssl_validation or agentConfig.get('skip_ssl_validation', False)
    )
    self.use_simple_http_client = use_simple_http_client
    if self.skip_ssl_validation:
        log.info("Skipping SSL hostname validation, useful when using a transparent proxy")

    # Monitor activity
    if watchdog:
        # NOTE(review): the / 1000 presumably converts milliseconds to
        # seconds -- confirm against the constants' definitions.
        timeout = TRANSACTION_FLUSH_INTERVAL * WATCHDOG_INTERVAL_MULTIPLIER / 1000
        memory_limit = agentConfig.get('limit_memory_consumption', None)
        self._watchdog = Watchdog(
            timeout,
            max_mem_mb=memory_limit,
            max_resets=WATCHDOG_HIGH_ACTIVITY_THRESHOLD
        )
开发者ID:DylanFrese,项目名称:dd-agent,代码行数:33,代码来源:ddagent.py
示例10: test_zbigarray_invalidate_shape
def test_zbigarray_invalidate_shape():
    """Check that an ``append`` committed in one connection reaches another.

    Appending to the array through the first connection changes both its
    data and its ``shape``.  After both transaction managers commit, the
    second connection must observe the new shape and the appended element.
    """
    # Open the test database once only to get hold of the DB object.
    root = testdb.dbopen()
    conn = root._p_jar
    db = conn.db()
    conn.close()
    del root, conn

    tm1 = TransactionManager()
    tm2 = TransactionManager()

    conn1 = db.open(transaction_manager=tm1)
    root1 = conn1.root()

    # setup zarray
    root1['zarray4'] = a1 = ZBigArray((10,), uint8)
    tm1.commit()

    # set zarray initial data
    a1[0:1] = [1]   # XXX -> [0] = 1  after BigArray can
    tm1.commit()

    # read zarray in conn2
    conn2 = db.open(transaction_manager=tm2)
    root2 = conn2.root()
    a2 = root2['zarray4']
    assert a2[0:1] == [1]   # read data in conn2 + make sure read correctly
                            # XXX -> [0] == 1  after BigArray can

    # append to a1 which changes both RAM pages and a1.shape
    assert a1.shape == (10,)
    a1.append([123])
    assert a1.shape == (11,)
    assert a1[10:11] == [123]   # XXX -> [10] = 123  after BigArray can
    tm1.commit()
    tm2.commit()    # just transaction boundary for t2

    # data from tm1 should propagate to tm2
    assert a2.shape == (11,)
    assert a2[10:11] == [123]   # XXX -> [10] = 123  after BigArray can

    conn2.close()
    del conn2, root2, a2

    dbclose(root1)
开发者ID:Nexedi,项目名称:wendelin.core,代码行数:46,代码来源:test_arrayzodb.py
示例11: __init__
def __init__(self, port, agentConfig):
    """Store the settings, then create the watchdog and transaction manager.

    ``port`` is stored as given.  The new transaction manager is registered
    on ``MetricTransaction`` after the application itself.
    """
    self._port = port
    self._agentConfig = agentConfig
    self._metrics = {}

    watchdog_timeout = TRANSACTION_FLUSH_INTERVAL * WATCHDOG_INTERVAL_MULTIPLIER
    self._watchdog = Watchdog(watchdog_timeout)

    MetricTransaction.set_application(self)

    manager = TransactionManager(MAX_WAIT_FOR_REPLAY, MAX_QUEUE_SIZE, THROTTLING_DELAY)
    self._tr_manager = manager
    MetricTransaction.set_tr_manager(manager)
开发者ID:SafPlusPlus,项目名称:dd-agent,代码行数:9,代码来源:ddagent.py
示例12: test_parallelism
def test_parallelism(self):
    """With ``max_parallelism`` equal to the queue length, all flush at once.

    Right after ``flush()`` every transaction must be running and none
    finished; one second later all must be finished.
    """
    tr_count = 4
    manager = TransactionManager(
        timedelta(seconds=0),
        MAX_QUEUE_SIZE,
        timedelta(seconds=0),
        max_parallelism=tr_count,
        max_endpoint_errors=100,
    )
    for _ in xrange(tr_count):
        manager.append(SleepingTransaction(manager))

    manager.flush()
    self.assertEqual(manager._running_flushes, tr_count)
    self.assertEqual(manager._finished_flushes, 0)
    # If _trs_to_flush != None, it means that it's still running as it should be
    self.assertEqual(manager._trs_to_flush, [])

    time.sleep(1)

    # It should be finished
    self.assertEqual(manager._running_flushes, 0)
    self.assertEqual(manager._finished_flushes, tr_count)
    self.assertIs(manager._trs_to_flush, None)
开发者ID:SupersonicAds,项目名称:dd-agent,代码行数:19,代码来源:test_transaction.py
示例13: __init__
def __init__(self):
    """Initialise the context and run the configured create callbacks.

    Raises:
        Exception: if the instance has no ``_conf`` attribute.
    """
    configured = hasattr(self, '_conf')
    if not configured:
        raise Exception('Unconfigured Context')

    self.log = self._conf.log
    self.log.debug('Initializing')

    self._constructed_attrs = OrderedDict()
    self._active = True
    self.tx_manager = TransactionManager()

    # Each callback receives the freshly initialised context.
    for create_callback in self._conf._create_callbacks:
        create_callback(self)
开发者ID:score-framework,项目名称:py.ctx,代码行数:10,代码来源:_init.py
示例14: __init__
def __init__(self, port, agentConfig, watchdog=True):
    """Store the settings, then create the transaction manager and watchdog.

    ``port`` is coerced to ``int``.  ``self._watchdog`` stays ``None`` when
    ``watchdog`` is false.
    """
    self._port = int(port)
    self._agentConfig = agentConfig
    self._metrics = {}

    MetricTransaction.set_application(self)
    MetricTransaction.set_endpoints()

    manager = TransactionManager(MAX_WAIT_FOR_REPLAY, MAX_QUEUE_SIZE, THROTTLING_DELAY)
    self._tr_manager = manager
    MetricTransaction.set_tr_manager(manager)

    self._watchdog = None
    if watchdog:
        self._watchdog = Watchdog(
            TRANSACTION_FLUSH_INTERVAL * WATCHDOG_INTERVAL_MULTIPLIER
        )
开发者ID:DevOpsDave,项目名称:dd-agent,代码行数:13,代码来源:ddagent.py
示例15: __init__
def __init__(self, cache_backend=None, keyhandler=None, keygen=None):
    """Bind this instance to the shared state and fill in missing parts.

    Every instance uses the class-level ``__shared_state`` dict as its
    ``__dict__``, so attributes set by one instance are seen by all.
    Arguments that are given override the shared values; defaults are
    installed only for attributes the shared state does not have yet.
    """
    self.__dict__ = self.__shared_state
    self.prefix = settings.MIDDLEWARE_KEY_PREFIX

    # Explicit arguments replace whatever the shared state holds.
    if keyhandler:
        self.kh_class = keyhandler
    if keygen:
        self.kg_class = keygen

    # Fall back to defaults only for attributes not yet in the shared state.
    if not cache_backend and not hasattr(self, 'cache_backend'):
        cache_backend = settings._get_backend()
    if not keygen and not hasattr(self, 'kg_class'):
        self.kg_class = KeyGen
    if keyhandler is None and not hasattr(self, 'kh_class'):
        self.kh_class = KeyHandler

    # NOTE(review): the key handler is rebuilt together with the backend
    # wrapper; this nesting was reconstructed from flattened source --
    # confirm against the upstream file.
    if cache_backend:
        wrapped_backend = TransactionManager(cache_backend, self.kg_class)
        self.cache_backend = wrapped_backend
        self.keyhandler = self.kh_class(self.cache_backend, self.kg_class, self.prefix)

    self._patched = getattr(self, '_patched', False)
开发者ID:GoodCloud,项目名称:johnny-cache,代码行数:17,代码来源:cache.py
示例16: __init__
def __init__(self, port, agentConfig, watchdog=True, skip_ssl_validation=False):
    """Store the settings, then create the transaction manager and watchdog.

    ``port`` is coerced to ``int``.  SSL validation is skipped when either
    the argument or the ``'skip_ssl_validation'`` config entry is truthy.
    ``self._watchdog`` stays ``None`` when ``watchdog`` is false.
    """
    self._port = int(port)
    self._agentConfig = agentConfig
    self._metrics = {}

    MetricTransaction.set_application(self)
    MetricTransaction.set_endpoints()

    manager = TransactionManager(MAX_WAIT_FOR_REPLAY, MAX_QUEUE_SIZE, THROTTLING_DELAY)
    self._tr_manager = manager
    MetricTransaction.set_tr_manager(manager)

    self._watchdog = None
    self.skip_ssl_validation = (
        skip_ssl_validation or agentConfig.get('skip_ssl_validation', False)
    )
    if self.skip_ssl_validation:
        log.info("Skipping SSL hostname validation, useful when using a transparent proxy")

    if watchdog:
        timeout = TRANSACTION_FLUSH_INTERVAL * WATCHDOG_INTERVAL_MULTIPLIER
        memory_limit = agentConfig.get('limit_memory_consumption', None)
        self._watchdog = Watchdog(timeout, max_mem_mb=memory_limit)
开发者ID:steinnes,项目名称:dd-agent,代码行数:19,代码来源:ddagent.py
示例17: testThrottling
def testThrottling(self):
    """Flushing three transactions must take longer than three throttling delays."""
    # Replay delay is zero, so only throttling can slow the flush down.
    manager = TransactionManager(timedelta(seconds=0), MAX_QUEUE_SIZE, THROTTLING_DELAY)
    # Use blocking API to emulate tornado ioloop
    manager._flush_without_ioloop = True

    # Queue 3 transactions, each sized so the memory limit is not in the way
    tr_size = MAX_QUEUE_SIZE / 10
    for _ in xrange(3):
        manager.append(memTransaction(tr_size, manager))

    # Time the flush
    started = datetime.now()
    manager.flush()
    finished = datetime.now()

    elapsed = finished - started
    self.assertTrue(elapsed > 3 * THROTTLING_DELAY)
开发者ID:DevOpsDave,项目名称:dd-agent,代码行数:18,代码来源:test_transaction.py
示例18: __init__
def __init__(self, port, agentConfig, watchdog=True,
             skip_ssl_validation=False, use_simple_http_client=False):
    """Set up transactions, DNS caching, SSL/HTTP options and the watchdog.

    ``port`` is coerced to ``int``.  ``agentConfig`` must provide the
    ``'endpoints'`` and ``'forwarder_timeout'`` keys; a warning is logged
    when ``'endpoints'`` equals an empty dict.  Parallel flushing is enabled
    only when more than one endpoint is configured.  A ``DNSCache`` is
    created only when the ``'dns_caching'`` entry is affirmative.
    """
    self._port = int(port)
    self._agentConfig = agentConfig
    self._metrics = {}
    self._dns_cache = None

    endpoints = agentConfig['endpoints']
    AgentTransaction.set_application(self)
    AgentTransaction.set_endpoints(endpoints)
    if endpoints == {}:
        log.warning(u"No valid endpoint found. Forwarder will drop all incoming payloads.")
    AgentTransaction.set_request_timeout(agentConfig['forwarder_timeout'])

    # Multiple endpoints => enable parallelism
    parallelism = self.NO_PARALLELISM
    if len(endpoints) > 1:
        parallelism = self.DEFAULT_PARALLELISM

    self._tr_manager = TransactionManager(
        MAX_WAIT_FOR_REPLAY,
        MAX_QUEUE_SIZE,
        THROTTLING_DELAY,
        max_parallelism=parallelism,
    )
    AgentTransaction.set_tr_manager(self._tr_manager)

    self._watchdog = None
    ssl_flag_from_config = agentConfig.get('skip_ssl_validation')
    self.skip_ssl_validation = skip_ssl_validation or _is_affirmative(ssl_flag_from_config)

    self.agent_dns_caching = _is_affirmative(agentConfig.get('dns_caching'))
    self.agent_dns_ttl = int(agentConfig.get('dns_ttl', DEFAULT_DNS_TTL))
    if self.agent_dns_caching:
        self._dns_cache = DNSCache(ttl=self.agent_dns_ttl)

    self.use_simple_http_client = use_simple_http_client
    if self.skip_ssl_validation:
        log.info("Skipping SSL hostname validation, useful when using a transparent proxy")

    # Monitor activity
    if watchdog:
        # NOTE(review): the / 1000 presumably converts milliseconds to
        # seconds -- confirm against the constants' definitions.
        timeout = TRANSACTION_FLUSH_INTERVAL * WATCHDOG_INTERVAL_MULTIPLIER / 1000
        self._watchdog = Watchdog.create(
            timeout,
            max_resets=WATCHDOG_HIGH_ACTIVITY_THRESHOLD,
        )
开发者ID:DataDog,项目名称:dd-agent,代码行数:37,代码来源:ddagent.py
示例19: test_notify_transaction_late_comers
def test_notify_transaction_late_comers(self):
    """A synchronizer registered mid-transaction still gets ``newTransaction``."""
    # If a data manager registers for synchronization after a transaction
    # has started, newTransaction should be called on it so that it can do
    # the necessary setup.
    import mock
    from .. import TransactionManager

    manager = TransactionManager()

    # Registered before any transaction exists: nothing to notify yet.
    early_sync = mock.MagicMock()
    manager.registerSynch(early_sync)
    early_sync.newTransaction.assert_not_called()

    txn = manager.begin()
    early_sync.newTransaction.assert_called_with(txn)

    # Registered after the transaction began: notified immediately.
    late_sync = mock.MagicMock()
    manager.registerSynch(late_sync)
    late_sync.newTransaction.assert_called_with(txn)

    # For completeness, both synchronizers see the completion hooks.
    txn.commit()
    for synchronizer in (early_sync, late_sync):
        synchronizer.beforeCompletion.assert_called_with(txn)
        synchronizer.afterCompletion.assert_called_with(txn)
开发者ID:pieterproigia,项目名称:transaction,代码行数:21,代码来源:test__manager.py
示例20: Forwarder
class Forwarder(tornado.web.Application):
def __init__(self, port, agent_config, watchdog=True, skip_ssl_validation=False,
             use_simple_http_client=False):
    """Store the settings, then create the transaction manager and watchdog.

    ``port`` is coerced to ``int``.  The transaction endpoints are set to a
    ``MonAPI`` built from ``agent_config['Api']``.  SSL validation is
    skipped when either the argument or the ``'skip_ssl_validation'``
    config entry is truthy.  ``self._watchdog`` stays ``None`` when
    ``watchdog`` is false.
    """
    self._port = int(port)
    self._agentConfig = agent_config
    self._metrics = {}

    MetricTransaction.set_application(self)
    api_endpoint = MonAPI(agent_config['Api'])
    MetricTransaction.set_endpoints(api_endpoint)

    manager = TransactionManager(MAX_WAIT_FOR_REPLAY, MAX_QUEUE_SIZE, THROTTLING_DELAY)
    self._tr_manager = manager
    MetricTransaction.set_tr_manager(manager)

    self._watchdog = None
    self.skip_ssl_validation = (
        skip_ssl_validation or agent_config.get('skip_ssl_validation', False)
    )
    self.use_simple_http_client = use_simple_http_client
    if self.skip_ssl_validation:
        log.info("Skipping SSL hostname validation, useful when using a transparent proxy")

    if watchdog:
        timeout = TRANSACTION_FLUSH_INTERVAL * WATCHDOG_INTERVAL_MULTIPLIER
        memory_limit = agent_config.get('limit_memory_consumption', None)
        self._watchdog = Watchdog(timeout, max_mem_mb=memory_limit)
def _post_metrics(self):
    """Wrap the collected metrics in a transaction and reset the buffer.

    Does nothing when no metrics have been collected.
    """
    if len(self._metrics) == 0:
        return

    json_headers = {'Content-Type': 'application/json'}
    MetricTransaction(self._metrics, headers=json_headers)
    # Rebind to a new dict rather than clearing the one just handed over.
    self._metrics = {}
# todo why is the tornado logging method overridden? Perhaps ditch this.
def log_request(self, handler):
    """ Override the tornado logging method.

    The log level depends on the response status: DEBUG below 400,
    WARNING below 500, ERROR otherwise.  The logged line contains the
    status, the handler's request summary and the request time in
    milliseconds.
    """
    status = handler.get_status()
    if status >= 500:
        emit = log.error
    elif status >= 400:
        emit = log.warning
    else:
        emit = log.debug

    # request_time() is scaled by 1000 and reported with an "ms" suffix.
    elapsed_ms = 1000.0 * handler.request.request_time()
    emit("%d %s %.2fms", status, handler._request_summary(), elapsed_ms)
def run(self):
handlers = [
(r"/intake/?", AgentInputHandler),
(r"/api/v1/series/?", AgentInputHandler),
(r"/status/?", StatusHandler),
]
settings = dict(
cookie_secret="12oETzKXQAGaYdkL5gEmGeJJFuYh7EQnp2XdTP1o/Vo=",
xsrf_cookies=False,
debug=False,
log_function=self.log_request
)
non_local_traffic = self._agentConfig.get("non_local_traffic", False)
tornado.web.Application.__init__(self, handlers, **settings)
http_server = tornado.httpserver.HTTPServer(self)
try:
# non_local_traffic must be == True to match, not just some non-false value
if non_local_traffic is True:
http_server.listen(self._port)
else:
# localhost in lieu of 127.0.0.1 to support IPv6
try:
http_server.listen(self._port, address="localhost")
except gaierror:
log.warning(
"localhost seems undefined in your host file, using 127.0.0.1 instead")
http_server.listen(self._port, address="127.0.0.1")
except socket_error as e:
if "Errno 99" in str(e):
log.warning("IPv6 doesn't seem to be fully supported. Falling back to IPv4")
http_server.listen(self._port, address="127.0.0.1")
else:
raise
except socket_error as e:
log.exception(
"Socket error %s. Is another application listening on the same port ? Exiting", e)
sys.exit(1)
except Exception:
log.exception("Uncaught exception. Forwarder is exiting.")
sys.exit(1)
log.info("Listening on port %d" % self._port)
# Register callbacks
self.mloop = get_tornado_ioloop()
logging.getLogger().setLevel(get_logging_config()['log_level'] or logging.INFO)
def flush_trs():
if self._watchdog:
self._watchdog.reset()
#.........这里部分代码省略.........
开发者ID:bluejayKR,项目名称:monasca-agent,代码行数:101,代码来源:daemon.py
注:本文中的transaction.TransactionManager类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论