本文整理汇总了Python中thespian.system.transport.TransmitIntent类的典型用法代码示例。如果您正苦于以下问题:Python TransmitIntent类的具体用法?Python TransmitIntent怎么用?Python TransmitIntent使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了TransmitIntent类的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: testTransmitIntentSetResult
def testTransmitIntentSetResult(self):
    """Check that ``result`` starts unset and reflects each assigned status."""
    ti = TransmitIntent('addr', 'msg')
    # None is a singleton: compare by identity so a custom __eq__ on the
    # stored value cannot make this check pass by accident.
    assert ti.result is None
    ti.result = SendStatus.Sent
    assert ti.result == SendStatus.Sent
    ti.result = SendStatus.Failed
    assert ti.result == SendStatus.Failed
开发者ID:godaddy,项目名称:Thespian,代码行数:7,代码来源:test_transmitintent.py
示例2: testTransmitIntentRetryTimingExceedsLimit
def testTransmitIntentRetryTimingExceedsLimit(self):
    """Use up every allowed retry and check that one more is refused.

    All calls on the intent run inside ``update_elapsed_time`` contexts
    (helper defined elsewhere; presumably it pins the clock the intent
    reads -- confirm against its definition).
    """
    # NOTE(review): the block nesting below was reconstructed from a listing
    # whose indentation had been stripped -- confirm against upstream.
    max_period = timedelta(seconds=90)
    retry_period = timedelta(microseconds=1)
    start = 1.23
    pad = timedelta(microseconds=10)  # avoid float imprecision
    with update_elapsed_time(start, timedelta(0)):
        intent = TransmitIntent('addr', 'msg',
                                maxPeriod=max_period,
                                retryPeriod=retry_period)
        assert not intent.timeToRetry()

    elapsed = timedelta(0)
    for _attempt in range(MAX_TRANSMIT_RETRIES + 1):
        # Indicate "failure" and the need to retry
        with update_elapsed_time(start, elapsed + pad):
            assert intent.retry()

        # Wait for the indication that it is time to retry
        ready = False
        for _poll in range(90):
            with update_elapsed_time(start, elapsed + pad):
                # Only call timeToRetry once, because it auto-resets
                ready = intent.timeToRetry()
                if ready:
                    break
            # Advance by period * 1.5; written as a sum because python2
            # cannot multiply a timedelta by a fraction.
            elapsed += retry_period + (retry_period / 2)
        assert ready

    with update_elapsed_time(start, elapsed + pad):
        assert not intent.retry()
开发者ID:godaddy,项目名称:Thespian,代码行数:30,代码来源:test_transmitintent.py
示例3: testNormalTransmitResetMessage
def testNormalTransmitResetMessage(self):
    """Check that changeMessage() swaps the message but keeps the target."""
    intent = TransmitIntent('addr', 'msg')
    assert (intent.targetAddr, intent.message) == ('addr', 'msg')

    intent.changeMessage('message2')
    assert (intent.targetAddr, intent.message) == ('addr', 'message2')
开发者ID:godaddy,项目名称:Thespian,代码行数:7,代码来源:test_transmitintent.py
示例4: testTransmitIntentDelay
def testTransmitIntentDelay(self):
    """Check that delay() on a fresh intent lies strictly between 88ms and 91ms.

    The intent is built with maxPeriod=90ms and retryPeriod=30ms.
    """
    intent = TransmitIntent('addr', 'msg',
                            maxPeriod=timedelta(milliseconds=90),
                            retryPeriod=timedelta(milliseconds=30))
    remaining = intent.delay()
    lower_bound = timedelta(milliseconds=88)
    upper_bound = timedelta(milliseconds=91)
    self.assertGreater(remaining, lower_bound)
    self.assertLess(remaining, upper_bound)
开发者ID:liuzhijun,项目名称:Thespian,代码行数:7,代码来源:test_transmitintent.py
示例5: testNormalTransmitResetMessage
def testNormalTransmitResetMessage(self):
    """Check that changeMessage() swaps the message but keeps the target."""
    intent = TransmitIntent('addr', 'msg')

    def check(expected_message):
        # The target address must stay 'addr' regardless of the message.
        self.assertEqual(intent.targetAddr, 'addr')
        self.assertEqual(intent.message, expected_message)

    check('msg')
    intent.changeMessage('message2')
    check('message2')
开发者ID:liuzhijun,项目名称:Thespian,代码行数:7,代码来源:test_transmitintent.py
示例6: testTransmitIntentSetResult
def testTransmitIntentSetResult(self):
    """Check that ``result`` starts unset and reflects each assigned status."""
    ti = TransmitIntent('addr', 'msg')
    # assertIsNone checks identity and reports a clearer failure message
    # than assertEqual(None, ...).
    self.assertIsNone(ti.result)
    ti.result = SendStatus.Sent
    self.assertEqual(ti.result, SendStatus.Sent)
    ti.result = SendStatus.Failed
    self.assertEqual(ti.result, SendStatus.Failed)
开发者ID:liuzhijun,项目名称:Thespian,代码行数:7,代码来源:test_transmitintent.py
示例7: testTransmitIntentDelay
def testTransmitIntentDelay(self):
    """Check that delay() on a fresh intent lies strictly between 88ms and 91ms.

    The intent is built with maxPeriod=90ms and retryPeriod=30ms.
    """
    intent = TransmitIntent('addr', 'msg',
                            maxPeriod=timedelta(milliseconds=90),
                            retryPeriod=timedelta(milliseconds=30))
    remaining = intent.delay()
    lower_bound = timedelta(milliseconds=88)
    upper_bound = timedelta(milliseconds=91)
    assert remaining > lower_bound
    assert remaining < upper_bound
开发者ID:godaddy,项目名称:Thespian,代码行数:9,代码来源:test_transmitintent.py
示例8: testTransmitIntentRetryTimingExceedsLimit
def testTransmitIntentRetryTimingExceedsLimit(self):
    """Use up every allowed retry in real time and check one more is refused.

    After each retry() the test polls timeToRetry() up to 90 times,
    sleeping 1.5 retry periods between polls.
    """
    # NOTE(review): the block nesting below was reconstructed from a listing
    # whose indentation had been stripped -- confirm against upstream.
    max_period = timedelta(seconds=90)
    retry_period = timedelta(microseconds=1)
    intent = TransmitIntent('addr', 'msg',
                            maxPeriod=max_period,
                            retryPeriod=retry_period)
    self.assertFalse(intent.timeToRetry())

    for _attempt in range(MAX_TRANSMIT_RETRIES + 1):
        # Indicate "failure" and the need to retry
        self.assertTrue(intent.retry())

        # Wait for the indication that it is time to retry
        ready = False
        for _poll in range(90):
            # Only call timeToRetry once, because it auto-resets
            ready = intent.timeToRetry()
            if ready:
                break
            sleep(timePeriodSeconds(retry_period) * 1.5)
        self.assertTrue(ready)

    self.assertFalse(intent.retry())
开发者ID:liuzhijun,项目名称:Thespian,代码行数:19,代码来源:test_transmitintent.py
示例9: testTransmitIntentCallbackFailureFailed
def testTransmitIntentCallbackFailureFailed(self):
    """Call completionCallback() twice on a Failed intent with no callbacks.

    The test passes as long as neither call raises.
    """
    intent = TransmitIntent('addr', 'msg')
    intent.result = SendStatus.Failed
    # First call ensures no exception is thrown; second call repeats it.
    for _ in range(2):
        intent.completionCallback()
开发者ID:liuzhijun,项目名称:Thespian,代码行数:7,代码来源:test_transmitintent.py
示例10: testTransmitIntentCallbackFailureFailedWithChangedTargetsAdded
def testTransmitIntentCallbackFailureFailedWithChangedTargetsAdded(self):
    """Failed result: callbacks re-added via addCallback() fire again.

    Expects only ``self.failures`` to grow, by one (status, intent) entry
    per completionCallback() call. ``self._success`` / ``self._failed``
    are defined elsewhere; presumably they append to those lists.
    """
    self.successes = []
    self.failures = []
    intent = TransmitIntent('addr', 'msg',
                            onSuccess=self._success,
                            onError=self._failed)
    intent.result = SendStatus.Failed
    failure_entry = (SendStatus.Failed, intent)

    # Ensure no exception thrown
    intent.completionCallback()
    self.assertEqual(self.successes, [])
    self.assertEqual(self.failures, [failure_entry])

    # And again, after registering the callbacks a second time
    intent.addCallback(self._success, self._failed)
    intent.completionCallback()
    self.assertEqual(self.successes, [])
    self.assertEqual(self.failures, [failure_entry, failure_entry])
开发者ID:liuzhijun,项目名称:Thespian,代码行数:14,代码来源:test_transmitintent.py
示例11: testTransmitIntentCallbackSuccessWithChangedTargetsAdded
def testTransmitIntentCallbackSuccessWithChangedTargetsAdded(self):
    """Sent result: callbacks re-added via addCallback() fire again.

    Expects only ``self.successes`` to grow, by one (status, intent) entry
    per completionCallback() call. ``self._success`` / ``self._failed``
    are defined elsewhere; presumably they append to those lists.
    """
    self.successes = []
    self.failures = []
    intent = TransmitIntent('addr', 'msg',
                            onSuccess=self._success,
                            onError=self._failed)
    intent.result = SendStatus.Sent
    success_entry = (SendStatus.Sent, intent)

    # Ensure no exception thrown
    intent.completionCallback()
    assert self.successes == [success_entry]
    assert self.failures == []

    # And again, after registering the callbacks a second time
    intent.addCallback(self._success, self._failed)
    intent.completionCallback()
    assert self.successes == [success_entry, success_entry]
    assert self.failures == []
开发者ID:godaddy,项目名称:Thespian,代码行数:16,代码来源:test_transmitintent.py
示例12: testTransmitIntentCallbackFailureNotSentWithTarget
def testTransmitIntentCallbackFailureNotSentWithTarget(self):
    """NotSent result: the error callback is expected to fire exactly once.

    A second completionCallback() without re-adding callbacks must leave
    both recorded lists unchanged. ``self._success`` / ``self._failed``
    are defined elsewhere; presumably they append to those lists.
    """
    self.successes = []
    self.failures = []
    intent = TransmitIntent('addr', 'msg',
                            onSuccess=self._success,
                            onError=self._failed)
    intent.result = SendStatus.NotSent
    expected_failures = [(SendStatus.NotSent, intent)]

    # Ensure no exception thrown
    intent.completionCallback()
    self.assertEqual(self.successes, [])
    self.assertEqual(self.failures, expected_failures)

    # And again: nothing new should be recorded
    intent.completionCallback()
    self.assertEqual(self.successes, [])
    self.assertEqual(self.failures, expected_failures)
开发者ID:liuzhijun,项目名称:Thespian,代码行数:13,代码来源:test_transmitintent.py
示例13: testTransmitIntentRetryTimingExceedsLimit
def testTransmitIntentRetryTimingExceedsLimit(self):
    """Use up every allowed retry in real time and check one more is refused.

    After each retry() the test polls timeToRetry() up to 90 times,
    sleeping one retry period between polls, and requires that a poll
    reported it was time to retry.
    """
    # NOTE(review): the block nesting below was reconstructed from a listing
    # whose indentation had been stripped -- confirm against upstream.
    maxPeriod = timedelta(seconds=90)
    period = timedelta(microseconds=1)
    ti = TransmitIntent('addr', 'msg', maxPeriod=maxPeriod, retryPeriod=period)
    self.assertFalse(ti.timeToRetry())
    for _attempt in range(MAX_TRANSMIT_RETRIES + 1):
        self.assertTrue(ti.retry())
        # Capture the poll result instead of calling timeToRetry() a second
        # time after the loop: other variants of this test note that
        # timeToRetry auto-resets, so a repeated call could report False
        # even though the retry time was reached.
        time_to_retry = False
        for _poll in range(90):
            time_to_retry = ti.timeToRetry()
            if time_to_retry:
                break
            sleep(timePeriodSeconds(period))
        self.assertTrue(time_to_retry)
    self.assertFalse(ti.retry())
开发者ID:jfasenfest,项目名称:Thespian,代码行数:14,代码来源:test_transmitintent.py
示例14: testTransmitIntentCallbackFailureFailedWithTarget
def testTransmitIntentCallbackFailureFailedWithTarget(self):
    """Failed result: the error callback is expected to fire exactly once.

    A second completionCallback() without re-adding callbacks must leave
    both recorded lists unchanged. ``self._success`` / ``self._failed``
    are defined elsewhere; presumably they append to those lists.
    """
    self.successes = []
    self.failures = []
    intent = TransmitIntent('addr', 'msg',
                            onSuccess=self._success,
                            onError=self._failed)
    intent.result = SendStatus.Failed
    expected_failures = [(SendStatus.Failed, intent)]

    # Ensure no exception thrown
    intent.completionCallback()
    assert self.successes == []
    assert self.failures == expected_failures

    # And again: nothing new should be recorded
    intent.completionCallback()
    assert self.successes == []
    assert self.failures == expected_failures
开发者ID:godaddy,项目名称:Thespian,代码行数:15,代码来源:test_transmitintent.py
示例15: testTransmitIntentRetry
def testTransmitIntentRetry(self):
    """Check retry() succeeds MAX_TRANSMIT_RETRIES + 1 times, then fails."""
    intent = TransmitIntent('addr', 'msg')
    allowed_retries = MAX_TRANSMIT_RETRIES + 1
    for _ in range(allowed_retries):
        assert intent.retry()
    # One more than the allowance must be refused.
    assert not intent.retry()
开发者ID:godaddy,项目名称:Thespian,代码行数:5,代码来源:test_transmitintent.py
示例16: testNormalTransmitIdentification
def testNormalTransmitIdentification(self):
    """Check identify() runs without raising and returns a truthy value."""
    intent = TransmitIntent('addr', 'msg')
    # Just ensure no exceptions are thrown
    identification = intent.identify()
    self.assertTrue(identification)
开发者ID:liuzhijun,项目名称:Thespian,代码行数:4,代码来源:test_transmitintent.py
示例17: testTransmitIntentRetryTiming
def testTransmitIntentRetryTiming(self):
    """Step through the retry schedule for a 30ms period and 90ms maximum.

    All calls on the intent run inside ``update_elapsed_time`` contexts
    (helper defined elsewhere; presumably it pins the clock the intent
    reads -- confirm against its definition).
    """
    # NOTE(review): the block nesting below was reconstructed from a listing
    # whose indentation had been stripped -- confirm against upstream.
    max_period = timedelta(milliseconds=90)
    retry_period = timedelta(milliseconds=30)
    start = 0.01
    pad = timedelta(microseconds=10)  # avoid float imprecision

    with update_elapsed_time(start, timedelta(0)):
        intent = TransmitIntent('addr', 'msg',
                                maxPeriod=max_period,
                                retryPeriod=retry_period)
        assert not intent.timeToRetry()

    # One period in: no retry has been requested yet.
    with update_elapsed_time(start, retry_period + pad):
        assert not intent.timeToRetry()
        assert intent.retry()
        assert not intent.timeToRetry()

    # Two periods in: the first retry is due; request a second one.
    with update_elapsed_time(start, retry_period * 2 + pad):
        assert intent.timeToRetry()
        assert intent.retry()
        assert not intent.timeToRetry()

    # Three periods in: still waiting, because each retry increases the wait.
    with update_elapsed_time(start, retry_period * 3 + pad):
        assert not intent.timeToRetry()

    # Four periods in: the second retry is due, but a third exceeds the
    # maximum time.
    with update_elapsed_time(start, retry_period * 4 + pad):
        assert intent.timeToRetry()
        assert not intent.retry()
开发者ID:godaddy,项目名称:Thespian,代码行数:28,代码来源:test_transmitintent.py
示例18: testTransmitIntentRetryTiming
def testTransmitIntentRetryTiming(self):
    """Step through the retry schedule in real time (30ms period, 90ms max)."""
    max_period = timedelta(milliseconds=90)
    retry_period = timedelta(milliseconds=30)
    intent = TransmitIntent('addr', 'msg',
                            maxPeriod=max_period,
                            retryPeriod=retry_period)

    def wait_one_period():
        # Sleep for one retry period of wall-clock time.
        sleep(timePeriodSeconds(retry_period))

    self.assertFalse(intent.timeToRetry())

    wait_one_period()
    self.assertFalse(intent.timeToRetry())
    self.assertTrue(intent.retry())
    self.assertFalse(intent.timeToRetry())

    wait_one_period()
    self.assertTrue(intent.timeToRetry())
    self.assertTrue(intent.retry())
    self.assertFalse(intent.timeToRetry())

    wait_one_period()
    self.assertFalse(intent.timeToRetry())  # Each retry increases

    wait_one_period()
    self.assertTrue(intent.timeToRetry())
    self.assertFalse(intent.retry())  # Exceeds maximum time
开发者ID:liuzhijun,项目名称:Thespian,代码行数:21,代码来源:test_transmitintent.py
示例19: testTransmitIntentRetry
def testTransmitIntentRetry(self):
    """Check retry() succeeds MAX_TRANSMIT_RETRIES + 1 times, then fails."""
    intent = TransmitIntent('addr', 'msg')
    allowed_retries = MAX_TRANSMIT_RETRIES + 1
    for _ in range(allowed_retries):
        self.assertTrue(intent.retry())
    # One more than the allowance must be refused.
    self.assertFalse(intent.retry())
开发者ID:liuzhijun,项目名称:Thespian,代码行数:5,代码来源:test_transmitintent.py
注:本文中的thespian.system.transport.TransmitIntent类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论