本文整理汇总了 Python 中 twitter.common.testing.clock.ThreadedClock 类的典型用法代码示例。如果您正苦于以下问题：Python ThreadedClock 类的具体用法？Python ThreadedClock 怎么用？Python ThreadedClock 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了 ThreadedClock 类的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: test_announcer_under_abnormal_circumstances
def test_announcer_under_abnormal_circumstances():
    """Check that the announcer recovers after its first join attempt raises.

    The mocked serverset raises ``KazooException`` on the first ``join`` call
    and returns ``'member0001'`` on the second one; the announcer is created
    with a two-second ``exception_wait``.
    """
    join_outcomes = [
        KazooException('Whoops the ensemble is down!'),
        'member0001',
    ]
    mock_serverset = create_autospec(spec=ServerSet, instance=True)
    mock_serverset.join = MagicMock(side_effect=join_outcomes)
    mock_serverset.cancel = MagicMock()

    clock = ThreadedClock(31337.0)
    announcer = Announcer(
        mock_serverset,
        Endpoint('localhost', 12345),
        clock=clock,
        exception_wait=Amount(2, Time.SECONDS))

    announcer.start()
    try:
        # One second in: the first join has failed, so we count as disconnected.
        clock.tick(1.0)
        assert announcer.disconnected_time() == 1.0

        # Two more seconds cover the exception_wait and the second join attempt.
        clock.tick(2.0)
        recovery_msg = 'Announcer should recover after an exception thrown internally.'
        assert announcer.disconnected_time() == 0.0, recovery_msg
        assert announcer._membership == 'member0001'
    finally:
        announcer.stop()
开发者ID:AltanAlpay,项目名称:aurora,代码行数:25,代码来源:test_announcer.py
示例2: test_caching
def test_caching(getmtime_mock):
    """Walk CachedWeb through a first fetch, a cached read and a TTL-forced refetch.

    ``getmtime_mock.return_value`` is set to 0 by this test.
    NOTE(review): the argument is presumably injected by a patch decorator that
    is not part of this excerpt -- confirm against the original test module.
    """
    url = 'http://www.google.com'
    payload = b'This is google.com!'

    getmtime_mock.return_value = 0
    clock = ThreadedClock()
    opener = MockOpener(payload)
    web = CachedWeb(clock=clock, opener=opener)

    def fetch(**open_kwargs):
        # Open the URL through the cache, read everything, always close.
        with contextlib.closing(web.open(url, **open_kwargs)) as fp:
            return fp.read()

    # First access: nothing cached yet, so the opener must be used.
    assert not os.path.exists(web.translate_url(url))
    assert fetch() == payload
    assert os.path.exists(web.translate_url(url))
    assert opener.opened.is_set()
    opener.clear()

    # The entry only counts as expired once the clock passes the TTL.
    assert web.expired(url, ttl=0.5) is False
    clock.tick(1)
    assert web.expired(url, ttl=0.5)

    # Opening without a ttl does not touch the opener again.
    assert fetch() == payload
    assert not opener.opened.is_set()

    # Opening with the (now exceeded) ttl goes back to the opener.
    assert fetch(ttl=0.5) == payload
    assert opener.opened.is_set(), 'expect expired url to cause http get'
开发者ID:alfss,项目名称:commons,代码行数:26,代码来源:test_http.py
示例3: test_ttl_decrement_works
def test_ttl_decrement_works():
    """Ping with ttl=1, advance one PING_DELAY, then call expect_calls() with no calls."""
    clock = ThreadedClock()
    server = TestPingPongServer('foo', 31337, clock=clock)

    server.ping('hello world', ttl=1)
    ping_delay_secs = TestPingPongServer.PING_DELAY.as_(Time.SECONDS)
    clock.tick(ping_delay_secs)

    server.expect_calls()
开发者ID:jayeye,项目名称:commons,代码行数:7,代码来源:test_pingpong_server.py
示例4: test_caching
def test_caching():
    """Walk CachedWeb through a first fetch, a cached read and a TTL-forced refetch.

    ``os.path.getmtime`` is stubbed with mox to return 0 for any argument.
    The stub is removed in a ``finally`` block so that a failing assertion
    cannot leave ``os.path.getmtime`` replaced for the tests that run later.
    """
    URL = 'http://www.google.com'
    DATA = b'This is google.com!'
    clock = ThreadedClock()

    m = mox.Mox()
    m.StubOutWithMock(os.path, 'getmtime')
    os.path.getmtime(mox.IgnoreArg()).MultipleTimes().AndReturn(0)
    m.ReplayAll()

    try:
        opener = MockOpener(DATA)
        web = CachedWeb(clock=clock, opener=opener)

        # First access: nothing cached yet, so the opener must be used.
        assert not os.path.exists(web.translate_url(URL))
        with contextlib.closing(web.open(URL)) as fp:
            assert fp.read() == DATA
        assert os.path.exists(web.translate_url(URL))
        assert opener.opened.is_set()
        opener.clear()

        # The entry only counts as expired once the clock passes the TTL.
        assert web.expired(URL, ttl=0.5) is False
        clock.tick(1)
        assert web.expired(URL, ttl=0.5)

        # Opening without a ttl does not touch the opener again.
        with contextlib.closing(web.open(URL)) as fp:
            assert fp.read() == DATA
        assert not opener.opened.is_set()

        # Opening with the (now exceeded) ttl goes back to the opener.
        with contextlib.closing(web.open(URL, ttl=0.5)) as fp:
            assert fp.read() == DATA
        assert opener.opened.is_set(), 'expect expired url to cause http get'
    finally:
        # Always restore the real os.path.getmtime, even on assertion failure.
        m.UnsetStubs()

    # Verification stays outside the finally block so that it cannot mask the
    # original assertion error.
    m.VerifyAll()
开发者ID:BabyDuncan,项目名称:commons,代码行数:32,代码来源:test_http.py
示例5: test_ping
def test_ping():
    """Ping with ttl 2 and then 3; after each PING_DELAY expect a ping with ttl - 1."""
    clock = ThreadedClock()
    server = TestPingPongServer('foo', 31337, clock=clock)

    for ttl in (2, 3):
        server.ping('hello world', ttl=ttl)
        clock.tick(TestPingPongServer.PING_DELAY.as_(Time.SECONDS))
        server.expect_calls(('ping', 'hello world', ttl - 1))
开发者ID:jayeye,项目名称:commons,代码行数:11,代码来源:test_pingpong_server.py
示例6: test_defer
def test_defer():
    """Defer a callback by 3 clock seconds, tick 4, and expect its result.

    The timer shares the same clock and must report at least the delay.
    """
    delay_secs = 3
    clock = ThreadedClock()
    outcomes = Queue(maxsize=1)

    def record_success():
        outcomes.put_nowait('success')

    defer(record_success, delay=delay_secs, clock=clock)

    with Timer(clock=clock) as timer:
        clock.tick(4)
        # Blocks until the deferred callback has published its result.
        assert outcomes.get() == 'success'

    assert timer.elapsed >= delay_secs
开发者ID:BabyDuncan,项目名称:commons,代码行数:11,代码来源:test_deferred.py
示例7: test_sleep_0
def test_sleep_0():
    """A worker calling clock.sleep(0) must reach event.set() without any tick."""
    clock = ThreadedClock(0)
    done = threading.Event()

    def sleep_then_signal():
        clock.sleep(0)
        done.set()

    worker = threading.Thread(target=sleep_then_signal)
    worker.daemon = True
    worker.start()

    assert clock.converge(threads=[worker])
    assert done.is_set()
开发者ID:EricCen,项目名称:commons,代码行数:14,代码来源:test_clock.py
示例8: setUp
def setUp(self):
    """Create a ThreadedClock at 0 and a mocked HttpSignaler fed from a FIFO list."""
    self.fake_health_checks = []
    self._clock = ThreadedClock(0)
    self._checker = mock.Mock(spec=HttpSignaler)

    def pop_next_health_check():
        # Hand back the queued results in insertion order.
        return self.fake_health_checks.pop(0)

    self._checker.health = mock.Mock(
        spec=self._checker.__call__,
        side_effect=pop_next_health_check)
开发者ID:KancerEzeroglu,项目名称:aurora,代码行数:9,代码来源:test_health_checker.py
示例9: test_defer
def test_defer():
    """Defer a callback by DELAY clock seconds; nothing is queued before the tick.

    After ticking DELAY + 1 the result is available and the timer, which
    shares the clock, reports exactly DELAY + 1.
    """
    delay_secs = 3
    tick_secs = delay_secs + 1

    clock = ThreadedClock()
    outcomes = Queue(maxsize=1)

    def record_success():
        outcomes.put_nowait('success')

    defer(record_success, delay=delay_secs, clock=clock)

    with Timer(clock=clock) as timer:
        # The clock has not moved yet, so the queue must still be empty.
        with pytest.raises(Empty):
            outcomes.get_nowait()
        clock.tick(tick_secs)
        assert outcomes.get() == 'success'

    assert timer.elapsed == tick_secs
开发者ID:EricCen,项目名称:commons,代码行数:18,代码来源:test_deferred.py
示例10: test_announcer_under_abnormal_circumstances
def test_announcer_under_abnormal_circumstances():
    """Check that the announcer recovers after its first join attempt raises.

    The mocked serverset raises ``KazooException`` on the first ``join`` call
    and returns ``"member0001"`` on the second one; the announcer is created
    with a two-second ``exception_wait``.
    """
    join_outcomes = [KazooException("Whoops the ensemble is down!"), "member0001"]

    mock_serverset = mock.MagicMock(spec=ServerSet)
    mock_serverset.join = mock.MagicMock(side_effect=join_outcomes)
    mock_serverset.cancel = mock.MagicMock()

    clock = ThreadedClock(31337.0)
    announcer = Announcer(
        mock_serverset,
        Endpoint("localhost", 12345),
        clock=clock,
        exception_wait=Amount(2, Time.SECONDS))

    announcer.start()
    try:
        # One second in: the first join has failed, so we count as disconnected.
        clock.tick(1.0)
        assert announcer.disconnected_time() == 1.0

        # Two more seconds cover the exception_wait and the second join attempt.
        clock.tick(2.0)
        recovery_msg = "Announcer should recover after an exception thrown internally."
        assert announcer.disconnected_time() == 0.0, recovery_msg
        assert announcer._membership == "member0001"
    finally:
        announcer.stop()
开发者ID:kevinburg,项目名称:incubator-aurora,代码行数:20,代码来源:test_announcer.py
示例11: test_sampler_base
def test_sampler_base():
    """Drive a counting SamplerBase subclass with a one-second period.

    The iteration counter is checked after each tick, and once more after
    ``stop()`` to confirm that further ticks no longer increase it.
    """
    class TestSampler(SamplerBase):
        """Sampler whose iterate() only counts how often it was invoked."""

        def __init__(self, period, clock):
            self.count = 0
            SamplerBase.__init__(self, period, clock)

        def iterate(self):
            self.count += 1

    clock = ThreadedClock()
    sampler = TestSampler(Amount(1, Time.SECONDS), clock=clock)
    sampler.start()

    # Half a period: no iteration yet.
    clock.tick(0.5)
    assert sampler.count == 0

    # The second half completes the first period.
    clock.tick(0.5)
    assert sampler.count == 1

    # Five more seconds, five more iterations.
    clock.tick(5)
    assert sampler.count == 6

    assert not sampler.is_stopped()
    sampler.stop()

    # make sure that stopping the sampler short circuits any sampling
    clock.tick(5)
    assert sampler.count == 6
开发者ID:BabyDuncan,项目名称:commons,代码行数:24,代码来源:test_sampling.py
示例12: test_announcer_on_expiration
def test_announcer_on_expiration():
    """Exercise the announcer across a session expiration.

    The mocked ``join`` succeeds on its 1st and 3rd invocation and raises
    ``KazooException`` otherwise, so the rejoin after ``on_expiration()``
    fails once before the announcer ends up with ``'membership 3'``.
    """
    joined = threading.Event()
    operations = []

    def joined_side_effect(*args, **kw):
        # A plain counter cannot be rebound from a nested function without
        # `nonlocal` (PEP 3104), so the number of attempts is tracked as the
        # length of a list instead.
        operations.append(1)
        attempt = len(operations)
        if attempt not in (1, 3):
            raise KazooException('Failed to reconnect')
        joined.set()
        return 'membership %d' % attempt

    mock_serverset = create_autospec(spec=ServerSet, instance=True)
    mock_serverset.join = MagicMock(side_effect=joined_side_effect)
    mock_serverset.cancel = MagicMock()

    clock = ThreadedClock(31337.0)
    announcer = Announcer(
        mock_serverset,
        Endpoint('localhost', 12345),
        clock=clock,
        exception_wait=Amount(2, Time.SECONDS))

    announcer.start()
    try:
        joined.wait(timeout=1.0)
        assert joined.is_set()
        assert announcer._membership == 'membership 1'

        # Connected: ticking the clock must not accumulate disconnected time.
        assert announcer.disconnected_time() == 0.0
        clock.tick(1.0)
        assert announcer.disconnected_time() == 0.0

        # The second join attempt is the one that raises (see side effect).
        announcer.on_expiration()
        clock.tick(1.0)
        expired_msg = 'Announcer should be disconnected on expiration.'
        assert announcer.disconnected_time() == 1.0, expired_msg

        clock.tick(10.0)
        connected_msg = 'Announcer should not advance disconnection time when connected.'
        assert announcer.disconnected_time() == 0.0, connected_msg
        assert announcer._membership == 'membership 3'
    finally:
        announcer.stop()
开发者ID:AltanAlpay,项目名称:aurora,代码行数:44,代码来源:test_announcer.py
示例13: test_announcer_under_normal_circumstances
def test_announcer_under_normal_circumstances():
    """Follow disconnected_time() before start, while connected and after stop.

    The mocked ``join`` always succeeds with ``'membership foo'``; the test
    also checks that ``stop()`` cancels that membership on the serverset.
    """
    joined = threading.Event()

    def joined_side_effect(*args, **kw):
        joined.set()
        return 'membership foo'

    mock_serverset = create_autospec(spec=ServerSet, instance=True)
    mock_serverset.join = MagicMock(side_effect=joined_side_effect)
    mock_serverset.cancel = MagicMock()

    clock = ThreadedClock(31337.0)
    announcer = Announcer(mock_serverset, Endpoint('localhost', 12345), clock=clock)

    # Before start(): time passing counts as disconnected time.
    assert announcer.disconnected_time() == 0.0
    clock.tick(1.0)
    not_started_msg = (
        'Announcer should advance disconnection time when not yet initially connected.')
    assert announcer.disconnected_time() == 1.0, not_started_msg

    announcer.start()
    try:
        joined.wait(timeout=1.0)
        assert joined.is_set()

        # While connected: the counter is reset and stays at zero.
        assert announcer.disconnected_time() == 0.0
        clock.tick(1.0)
        connected_msg = 'Announcer should not advance disconnection time when connected.'
        assert announcer.disconnected_time() == 0.0, connected_msg
        assert announcer._membership == 'membership foo'
    finally:
        announcer.stop()

    mock_serverset.cancel.assert_called_with('membership foo')

    # After stop(): the counter no longer advances either.
    assert announcer.disconnected_time() == 0.0
    clock.tick(1.0)
    stopped_msg = 'Announcer should not advance disconnection time when stopped.'
    assert announcer.disconnected_time() == 0.0, stopped_msg
开发者ID:AltanAlpay,项目名称:aurora,代码行数:42,代码来源:test_announcer.py
示例14: test_tracing_timed
def test_tracing_timed():
    """Time one outer and two sequential inner sections, then inspect the trace tree.

    The tracer subclass overrides ``print_trace`` to record
    ``self._local.parent``; with a predicate that is always False nothing may
    be written to the output stream.
    """
    sio = Compatibility.StringIO()
    clock = ThreadedClock()
    captured = []

    class PrintTraceInterceptor(Tracer):
        def print_trace(self, *args, **kw):
            captured.append(self._local.parent)

    tracer = PrintTraceInterceptor(output=sio, clock=clock, predicate=lambda v: False)
    assert not hasattr(tracer._local, 'parent')

    # 'hello' spans three ticks; each child spans exactly one of the last two.
    with tracer.timed('hello'):
        clock.tick(1.0)
        for label in ('world 1', 'world 2'):
            with tracer.timed(label):
                clock.tick(1.0)

    assert len(captured) == 1
    root = captured[0]
    assert root._start == 0
    assert root._stop == 3
    assert root.duration() == 3
    assert root.msg == 'hello'

    expected_children = (('world 1', 1, 2), ('world 2', 2, 3))
    assert len(root.children) == 2
    for index, (msg, start, stop) in enumerate(expected_children):
        child = root.children[index]
        assert child._start == start
        assert child._stop == stop
        assert child.parent is root
        assert child.msg == msg

    # should not log if verbosity low
    assert sio.getvalue() == ''
开发者ID:BabyDuncan,项目名称:commons,代码行数:39,代码来源:test_tracer.py
示例15: TestHealthChecker
class TestHealthChecker(unittest.TestCase):
def setUp(self):
    """Create the clock, the interval settings and a mocked HttpSignaler.

    The mocked ``health`` callable returns queued results from
    ``self.fake_health_checks`` in FIFO order.
    """
    self.initial_interval_secs = 15
    self.interval_secs = 10
    self.fake_health_checks = []

    self._clock = ThreadedClock(0)
    self._checker = mock.Mock(spec=HttpSignaler)

    def pop_next_health_check():
        return self.fake_health_checks.pop(0)

    self._checker.health = mock.Mock(
        spec=self._checker.__call__,
        side_effect=pop_next_health_check)
def append_health_checks(self, status, num_calls=1):
    """Queue ``num_calls`` results of the form ``(status, 'reason')``."""
    self.fake_health_checks.extend(
        (status, 'reason') for _ in range(num_calls))
def test_grace_period_2x_success(self):
    """Grace period is 2 x interval and health checks succeed."""
    self.append_health_checks(True, num_calls=2)

    checker_kwargs = dict(interval_secs=self.interval_secs, clock=self._clock)
    hct = HealthChecker(self._checker.health, **checker_kwargs)
    hct.start()

    # Let the checker thread run until it is parked waiting on the clock.
    assert self._clock.converge(threads=[hct.threaded_health_checker])
    self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs)

    assert hct.status == StatusResult('Task is healthy.', TaskState.Value('TASK_RUNNING'))
    assert hct.threaded_health_checker.running is True

    hct.stop()
    assert self._checker.health.call_count == 1
def test_grace_period_2x_failure_then_success(self):
    """Grace period is 2 x interval and health checks fail then succeed."""
    for outcome in (False, True):
        self.append_health_checks(outcome)

    checker_kwargs = dict(interval_secs=self.interval_secs, clock=self._clock)
    hct = HealthChecker(self._checker.health, **checker_kwargs)
    hct.start()

    def settle():
        # Run the checker thread until it waits one interval on the clock.
        assert self._clock.converge(threads=[hct.threaded_health_checker])
        self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs)

    # First check fails: the task is still reported as starting.
    settle()
    assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING'))
    assert hct.threaded_health_checker.running is False

    # Second check succeeds after one more interval.
    self._clock.tick(self.interval_secs)
    settle()
    assert hct.status == StatusResult('Task is healthy.', TaskState.Value('TASK_RUNNING'))
    assert hct.threaded_health_checker.running is True

    hct.stop()
    assert self._checker.health.call_count == 2
def test_grace_period_2x_failure(self):
    """Grace period is 2 x interval and all health checks fail.

    Failures are ignored when in grace period.
    """
    self.append_health_checks(False, num_calls=3)

    checker_kwargs = dict(interval_secs=self.interval_secs, clock=self._clock)
    hct = HealthChecker(self._checker.health, **checker_kwargs)
    hct.start()

    def settle():
        # Run the checker thread until it waits one interval on the clock.
        assert self._clock.converge(threads=[hct.threaded_health_checker])
        self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs)

    # The first two failures still leave the task in TASK_STARTING.
    for _ in range(2):
        settle()
        assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING'))
        assert hct.threaded_health_checker.running is False
        self._clock.tick(self.interval_secs)

    # The third failure is reported as a failed task.
    settle()
    assert hct.status == StatusResult('Failed health check! reason', TaskState.Value('TASK_FAILED'))
    assert hct.threaded_health_checker.running is False

    hct.stop()
    assert self._checker.health.call_count == 3
def test_success_outside_grace_period(self):
'''
Health checks fail inside grace period, but pass outside and leads to success
'''
self.append_health_checks(False, num_calls=2)
self.append_health_checks(True)
hct = HealthChecker(
self._checker.health,
interval_secs=self.interval_secs,
clock=self._clock)
hct.start()
assert self._clock.converge(threads=[hct.threaded_health_checker])
self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs)
assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING'))
assert hct.threaded_health_checker.running is False
self._clock.tick(self.interval_secs)
#.........这里部分代码省略.........
开发者ID:kasisnu,项目名称:aurora,代码行数:101,代码来源:test_health_checker.py
示例16: TestHealthChecker
class TestHealthChecker(unittest.TestCase):
    """HealthCheckerThread tests using a ThreadedClock and a mox-recorded checker."""

    def setUp(self):
        """Create a fresh clock, mox controller and mock checker for each test."""
        self._clock = ThreadedClock()
        self._mox = mox.Mox()
        self._checker = self._mox.CreateMockAnything()

    def expect_health_check(self, status, num_calls=1):
        """Record ``int(num_calls)`` checker calls, each returning ``(status, 'reason')``."""
        outstanding = int(num_calls)
        while outstanding > 0:
            self._checker().AndReturn((status, 'reason'))
            outstanding -= 1

    def replay(self):
        """Call ``ReplayAll()`` on the mox controller."""
        self._mox.ReplayAll()

    def verify(self):
        """Call ``VerifyAll()`` on the mox controller."""
        self._mox.VerifyAll()

    def test_initial_interval_2x(self):
        """With only interval_secs=5 given, no status appears before 14s of ticks."""
        self.expect_health_check(False)
        self.replay()

        hct = HealthCheckerThread(self._checker, interval_secs=5, clock=self._clock)
        hct.start()
        thread_yield()
        assert hct.status is None

        # 6s and then 9s of clock time: still no status.
        for tick_secs in (6, 3):
            self._clock.tick(tick_secs)
            assert hct.status is None

        self._clock.tick(5)
        thread_yield()
        assert hct.status is not None

        hct.stop()
        self.verify()

    def test_initial_interval_whatev(self):
        """With initial_interval_secs=0 a status is present right after start()."""
        self.expect_health_check(False)
        self.replay()

        hct = HealthCheckerThread(
            self._checker,
            interval_secs=5,
            initial_interval_secs=0,
            clock=self._clock)
        hct.start()
        assert hct.status is not None

        hct.stop()
        self.verify()

    def test_consecutive_failures(self):
        """Verify that a task is unhealthy only after max_consecutive_failures is exceeded"""
        initial_interval_secs = 2
        interval_secs = 1

        self.expect_health_check(False, num_calls=2)
        self.expect_health_check(True)
        self.expect_health_check(False, num_calls=3)
        self.replay()

        hct = HealthCheckerThread(
            self._checker,
            interval_secs=interval_secs,
            initial_interval_secs=initial_interval_secs,
            max_consecutive_failures=2,
            clock=self._clock)
        hct.start()

        # 2 consecutive health check failures followed by a successful health check.
        for tick_secs in (initial_interval_secs, interval_secs, interval_secs):
            self._clock.tick(tick_secs)
            assert hct.status is None

        # 3 consecutive health check failures.
        for _ in range(2):
            self._clock.tick(interval_secs)
            assert hct.status is None
        self._clock.tick(interval_secs)
        thread_yield()
        assert hct.status is not None

        hct.stop()
        self.verify()
开发者ID:MustafaOrkunAcar,项目名称:incubator-aurora,代码行数:80,代码来源:test_health_checker.py
示例17: setUp
def setUp(self):
    """Create a fresh clock, mox controller and mock checker for each test."""
    controller = mox.Mox()
    self._clock = ThreadedClock()
    self._mox = controller
    self._checker = controller.CreateMockAnything()
开发者ID:MustafaOrkunAcar,项目名称:incubator-aurora,代码行数:4,代码来源:test_health_checker.py
示例18: TestHealthChecker
class TestHealthChecker(unittest.TestCase):
    """HealthChecker tests using a ThreadedClock and a mocked HttpSignaler."""

    def setUp(self):
        """Create the clock and a mocked checker fed from a FIFO result list."""
        self.fake_health_checks = []
        self._clock = ThreadedClock()
        self._checker = mock.Mock(spec=HttpSignaler)

        def pop_next_health_check():
            return self.fake_health_checks.pop(0)

        self._checker.health = mock.Mock(
            spec=self._checker.health,
            side_effect=pop_next_health_check)

    def append_health_checks(self, status, num_calls=1):
        """Queue ``num_calls`` results of the form ``(status, 'reason')``."""
        self.fake_health_checks.extend(
            (status, 'reason') for _ in range(num_calls))

    def test_initial_interval_2x(self):
        """With only interval_secs=5 given, no status appears before 14s of ticks."""
        self.append_health_checks(False)

        hct = HealthChecker(self._checker.health, interval_secs=5, clock=self._clock)
        hct.start()
        thread_yield()
        assert hct.status is None

        # 6s and then 9s of clock time: still no status.
        for tick_secs in (6, 3):
            self._clock.tick(tick_secs)
            assert hct.status is None

        self._clock.tick(5)
        thread_yield()
        assert hct.status.status == TaskState.Value('TASK_FAILED')

        hct.stop()
        assert self._checker.health.call_count == 1

    def test_initial_interval_whatev(self):
        """With initial_interval_secs=0 the failure is visible right after start()."""
        self.append_health_checks(False)

        hct = HealthChecker(
            self._checker.health,
            interval_secs=5,
            initial_interval_secs=0,
            clock=self._clock)
        hct.start()
        assert hct.status.status == TaskState.Value('TASK_FAILED')

        hct.stop()
        assert self._checker.health.call_count == 1

    def test_consecutive_failures(self):
        """Verify that a task is unhealthy only after max_consecutive_failures is exceeded"""
        initial_interval_secs = 2
        interval_secs = 1

        self.append_health_checks(False, num_calls=2)
        self.append_health_checks(True)
        self.append_health_checks(False, num_calls=3)

        hct = HealthChecker(
            self._checker.health,
            interval_secs=interval_secs,
            initial_interval_secs=initial_interval_secs,
            max_consecutive_failures=2,
            clock=self._clock)
        hct.start()

        # 2 consecutive health check failures followed by a successful health check.
        for tick_secs in (initial_interval_secs, interval_secs, interval_secs):
            self._clock.tick(tick_secs)
            assert hct.status is None

        # 3 consecutive health check failures.
        for _ in range(2):
            self._clock.tick(interval_secs)
            assert hct.status is None
        self._clock.tick(interval_secs)
        thread_yield()
        assert hct.status.status == TaskState.Value('TASK_FAILED')

        hct.stop()
        assert self._checker.health.call_count == 6
开发者ID:kevints,项目名称:aurora,代码行数:76,代码来源:test_health_checker.py
示例19: TestHealthChecker
class TestHealthChecker(unittest.TestCase):
def setUp(self):
    """Create a ThreadedClock at 0 and a mocked HttpSignaler fed from a FIFO list."""
    self.fake_health_checks = []
    self._clock = ThreadedClock(0)
    self._checker = mock.Mock(spec=HttpSignaler)

    def pop_next_health_check():
        # Hand back the queued results in insertion order.
        return self.fake_health_checks.pop(0)

    self._checker.health = mock.Mock(
        spec=self._checker.__call__,
        side_effect=pop_next_health_check)
def append_health_checks(self, status, num_calls=1):
    """Queue ``num_calls`` results of the form ``(status, 'reason')``."""
    self.fake_health_checks.extend(
        (status, 'reason') for _ in range(num_calls))
def test_initial_interval_2x(self):
    """With only interval_secs=5 given, the checker first waits 10 clock seconds.

    No status is reported after 6s and 9s of ticks; after 14s the single
    queued failure is reported as TASK_FAILED.
    """
    self.append_health_checks(False)

    hct = HealthChecker(self._checker.health, interval_secs=5, clock=self._clock)
    hct.start()

    def converged():
        return self._clock.converge(threads=[hct.threaded_health_checker])

    assert converged()
    self._clock.assert_waiting(hct.threaded_health_checker, 10)
    assert hct.status is None

    for tick_secs in (6, 3):
        self._clock.tick(tick_secs)
        assert converged()
        assert hct.status is None

    self._clock.tick(5)
    assert converged()
    assert hct.status.status == TaskState.Value('TASK_FAILED')

    hct.stop()
    assert self._checker.health.call_count == 1
def test_initial_interval_whatev(self):
    """With initial_interval_secs=0 the failure is reported without any tick."""
    self.append_health_checks(False, 2)

    checker_kwargs = dict(
        interval_secs=5,
        initial_interval_secs=0,
        clock=self._clock)
    hct = HealthChecker(self._checker.health, **checker_kwargs)
    hct.start()

    self._clock.converge(threads=[hct.threaded_health_checker])
    self._clock.assert_waiting(hct.threaded_health_checker, amount=5)
    assert hct.status.status == TaskState.Value('TASK_FAILED')

    hct.stop()

    # Implementation detail: the health check runs once in the initializer
    # and once in the run loop. Expect this count to break if the
    # implementation ever changes.
    assert self._checker.health.call_count == 2
def test_consecutive_failures(self):
'''Verify that a task is unhealthy only after max_consecutive_failures is exceeded'''
initial_interval_secs = 2
interval_secs = 1
self.append_health_checks(False, num_calls=2)
self.append_health_checks(True)
self.append_health_checks(False, num_calls=3)
hct = HealthChecker(
self._checker.health,
interval_secs=interval_secs,
initial_interval_secs=initial_interval_secs,
max_consecutive_failures=2,
clock=self._clock)
hct.start()
self._clock.converge(threads=[hct.threaded_health_checker])
# 2 consecutive health check failures followed by a successful health check.
epsilon = 0.001
self._clock.tick(initial_interval_secs + epsilon)
self._clock.converge(threads=[hct.threaded_health_checker])
self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
assert hct.status is None
assert hct.metrics.sample()['consecutive_failures'] == 1
self._clock.tick(interval_secs + epsilon)
self._clock.converge(threads=[hct.threaded_health_checker])
self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
assert hct.status is None
assert hct.metrics.sample()['consecutive_failures'] == 2
self._clock.tick(interval_secs + epsilon)
self._clock.converge(threads=[hct.threaded_health_checker])
self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
assert hct.status is None
assert hct.metrics.sample()['consecutive_failures'] == 0
# 3 consecutive health check failures.
self._clock.tick(interval_secs + epsilon)
self._clock.converge(threads=[hct.threaded_health_checker])
self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
assert hct.status is None
assert hct.metrics.sample()['consecutive_failures'] == 1
self._clock.tick(interval_secs + epsilon)
self._clock.converge(threads=[hct.threaded_health_checker])
self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
assert hct.status is None
assert hct.metrics.sample()['consecutive_failures'] == 2
self._clock.tick(interval_secs + epsilon)
self._clock.converge(threads=[hct.threaded_health_checker])
self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
assert hct.status.status == TaskState.Value('TASK_FAILED')
#.........这里部分代码省略.........
开发者ID:KancerEzeroglu,项目名称:aurora,代码行数:101,代码来源:test_health_checker.py
示例20: test_with_events
def test_with_events(num_threads):
    """Check ThreadedClock bookkeeping for ``num_threads`` sleeping threads.

    Every worker sleeps 0.1 clock seconds and then records a hit. The test
    ticks the clock in two 0.05 steps and verifies that no worker finishes
    after the first step and that all of them finish after the second.

    NOTE(review): ``num_threads`` is presumably supplied by a pytest
    parametrize decorator that is not part of this excerpt -- confirm.
    """
    hits = []
    clock = ThreadedClock(0)

    def hit_me():
        clock.sleep(0.1)
        hits.append(True)

    threads = []
    for _ in range(num_threads):
        th = threading.Thread(target=hit_me)
        th.daemon = True
        th.start()
        threads.append(th)

    # All workers must be parked in clock.sleep(0.1).
    clock.converge(threads=threads)
    for th in threads:
        clock.assert_waiting(th, 0.1)

    # Half-way through the sleep nobody may have woken up yet.
    clock.tick(0.05)
    clock.converge(threads=threads)
    hits_before = len(hits)

    # assert_waiting must fail when asked about a wrong amount.
    with pytest.raises(AssertionError):
        clock.assert_waiting(threads[0], 234)

    # The second half of the sleep releases every worker.
    clock.tick(0.05)
    clock.converge(threads=threads)
    hits_after = len(hits)

    for th in threads:
        clock.assert_not_waiting(th)
        with pytest.raises(AssertionError):
            clock.assert_waiting(th, 0.1)

    assert hits_before == 0
    assert hits_after == num_threads
开发者ID:EricCen,项目名称:commons,代码行数:41,代码来源:test_clock.py
注：本文中的 twitter.common.testing.clock.ThreadedClock 类示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论