• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python transport.TransmitIntent类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中thespian.system.transport.TransmitIntent的典型用法代码示例。如果您正苦于以下问题:Python TransmitIntent类的具体用法?Python TransmitIntent怎么用?Python TransmitIntent使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了TransmitIntent类的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: testTransmitIntentSetResult

 def testTransmitIntentSetResult(self):
     ti = TransmitIntent('addr', 'msg')
     assert None == ti.result
     ti.result = SendStatus.Sent
     assert ti.result == SendStatus.Sent
     ti.result = SendStatus.Failed
     assert ti.result == SendStatus.Failed
开发者ID:godaddy,项目名称:Thespian,代码行数:7,代码来源:test_transmitintent.py


示例2: testTransmitIntentRetryTimingExceedsLimit

    def testTransmitIntentRetryTimingExceedsLimit(self):
        maxPeriod = timedelta(seconds=90)
        period = timedelta(microseconds=1)
        now = 1.23
        timepad = timedelta(microseconds=10) # avoid float imprecision
        with update_elapsed_time(now, timedelta(0)):
            ti = TransmitIntent('addr', 'msg',
                                maxPeriod=maxPeriod,
                                retryPeriod=period)
            assert not ti.timeToRetry()

        timeoffset = timedelta(0)
        for N in range(MAX_TRANSMIT_RETRIES+1):
            # Indicate "failure" and the need to retry
            with update_elapsed_time(now, timeoffset + timepad):
                assert ti.retry()
            # Wait for the indication that it is time to retry
            time_to_retry = False
            for x in range(90):
                with update_elapsed_time(now, timeoffset + timepad):
                    # Only call timeToRetry once, because it auto-resets
                    time_to_retry = ti.timeToRetry()
                    if time_to_retry: break
                timeoffset += (period + (period / 2))
                # = period * 1.5, but python2 cannot multiply
                # timedelta by fractions.
            assert time_to_retry

        with update_elapsed_time(now, timeoffset + timepad):
            assert not ti.retry()
开发者ID:godaddy,项目名称:Thespian,代码行数:30,代码来源:test_transmitintent.py


示例3: testNormalTransmitResetMessage

 def testNormalTransmitResetMessage(self):
     ti = TransmitIntent('addr', 'msg')
     assert ti.targetAddr == 'addr'
     assert ti.message == 'msg'
     ti.changeMessage('message2')
     assert ti.targetAddr == 'addr'
     assert ti.message == 'message2'
开发者ID:godaddy,项目名称:Thespian,代码行数:7,代码来源:test_transmitintent.py


示例4: testTransmitIntentDelay

 def testTransmitIntentDelay(self):
     maxPeriod = timedelta(milliseconds=90)
     period = timedelta(milliseconds=30)
     ti = TransmitIntent('addr', 'msg', maxPeriod=maxPeriod, retryPeriod=period)
     delay = ti.delay()
     self.assertGreater(delay, timedelta(milliseconds=88))
     self.assertLess(delay, timedelta(milliseconds=91))
开发者ID:liuzhijun,项目名称:Thespian,代码行数:7,代码来源:test_transmitintent.py


示例5: testNormalTransmitResetMessage

 def testNormalTransmitResetMessage(self):
     ti = TransmitIntent('addr', 'msg')
     self.assertEqual(ti.targetAddr, 'addr')
     self.assertEqual(ti.message, 'msg')
     ti.changeMessage('message2')
     self.assertEqual(ti.targetAddr, 'addr')
     self.assertEqual(ti.message, 'message2')
开发者ID:liuzhijun,项目名称:Thespian,代码行数:7,代码来源:test_transmitintent.py


示例6: testTransmitIntentSetResult

 def testTransmitIntentSetResult(self):
     ti = TransmitIntent('addr', 'msg')
     self.assertEqual(None, ti.result)
     ti.result = SendStatus.Sent
     self.assertEqual(ti.result, SendStatus.Sent)
     ti.result = SendStatus.Failed
     self.assertEqual(ti.result, SendStatus.Failed)
开发者ID:liuzhijun,项目名称:Thespian,代码行数:7,代码来源:test_transmitintent.py


示例7: testTransmitIntentDelay

 def testTransmitIntentDelay(self):
     maxPeriod = timedelta(milliseconds=90)
     period = timedelta(milliseconds=30)
     ti = TransmitIntent('addr', 'msg',
                         maxPeriod=maxPeriod,
                         retryPeriod=period)
     delay = ti.delay()
     assert delay > timedelta(milliseconds=88)
     assert delay < timedelta(milliseconds=91)
开发者ID:godaddy,项目名称:Thespian,代码行数:9,代码来源:test_transmitintent.py


示例8: testTransmitIntentRetryTimingExceedsLimit

    def testTransmitIntentRetryTimingExceedsLimit(self):
        maxPeriod = timedelta(seconds=90)
        period = timedelta(microseconds=1)
        ti = TransmitIntent('addr', 'msg', maxPeriod=maxPeriod, retryPeriod=period)
        self.assertFalse(ti.timeToRetry())

        for N in range(MAX_TRANSMIT_RETRIES+1):
            # Indicate "failure" and the need to retry
            self.assertTrue(ti.retry())
            # Wait for the indication that it is time to retry
            time_to_retry = False
            for x in range(90):
                # Only call timeToRetry once, because it auto-resets
                time_to_retry = ti.timeToRetry()
                if time_to_retry: break
                sleep(timePeriodSeconds(period) * 1.5)
            self.assertTrue(time_to_retry)

        self.assertFalse(ti.retry())
开发者ID:liuzhijun,项目名称:Thespian,代码行数:19,代码来源:test_transmitintent.py


示例9: testTransmitIntentCallbackFailureFailed

 def testTransmitIntentCallbackFailureFailed(self):
     ti = TransmitIntent('addr', 'msg')
     ti.result = SendStatus.Failed
     # Ensure no exception thrown
     ti.completionCallback()
     # And again
     ti.completionCallback()
开发者ID:liuzhijun,项目名称:Thespian,代码行数:7,代码来源:test_transmitintent.py


示例10: testTransmitIntentCallbackFailureFailedWithChangedTargetsAdded

 def testTransmitIntentCallbackFailureFailedWithChangedTargetsAdded(self):
     self.successes = []
     self.failures = []
     ti = TransmitIntent('addr', 'msg', onSuccess = self._success, onError = self._failed)
     ti.result = SendStatus.Failed
     # Ensure no exception thrown
     ti.completionCallback()
     self.assertEqual(self.successes, [])
     self.assertEqual(self.failures, [(SendStatus.Failed, ti)])
     # And again
     ti.addCallback(self._success, self._failed)
     ti.completionCallback()
     self.assertEqual(self.successes, [])
     self.assertEqual(self.failures, [(SendStatus.Failed, ti), (SendStatus.Failed, ti)])
开发者ID:liuzhijun,项目名称:Thespian,代码行数:14,代码来源:test_transmitintent.py


示例11: testTransmitIntentCallbackSuccessWithChangedTargetsAdded

 def testTransmitIntentCallbackSuccessWithChangedTargetsAdded(self):
     self.successes = []
     self.failures = []
     ti = TransmitIntent('addr', 'msg',
                         onSuccess = self._success,
                         onError = self._failed)
     ti.result = SendStatus.Sent
     # Ensure no exception thrown
     ti.completionCallback()
     assert self.successes == [(SendStatus.Sent, ti)]
     assert self.failures == []
     # And again
     ti.addCallback(self._success, self._failed)
     ti.completionCallback()
     assert self.successes == [(SendStatus.Sent, ti), (SendStatus.Sent, ti)]
     assert self.failures == []
开发者ID:godaddy,项目名称:Thespian,代码行数:16,代码来源:test_transmitintent.py


示例12: testTransmitIntentCallbackFailureNotSentWithTarget

 def testTransmitIntentCallbackFailureNotSentWithTarget(self):
     self.successes = []
     self.failures = []
     ti = TransmitIntent('addr', 'msg', onSuccess = self._success, onError = self._failed)
     ti.result = SendStatus.NotSent
     # Ensure no exception thrown
     ti.completionCallback()
     self.assertEqual(self.successes, [])
     self.assertEqual(self.failures, [(SendStatus.NotSent, ti)])
     # And again
     ti.completionCallback()
     self.assertEqual(self.successes, [])
     self.assertEqual(self.failures, [(SendStatus.NotSent, ti)])
开发者ID:liuzhijun,项目名称:Thespian,代码行数:13,代码来源:test_transmitintent.py


示例13: testTransmitIntentRetryTimingExceedsLimit

    def testTransmitIntentRetryTimingExceedsLimit(self):
        maxPeriod = timedelta(seconds=90)
        period = timedelta(microseconds=1)
        ti = TransmitIntent('addr', 'msg', maxPeriod=maxPeriod, retryPeriod=period)
        self.assertFalse(ti.timeToRetry())

        for N in range(MAX_TRANSMIT_RETRIES+1):
            self.assertTrue(ti.retry())
            for x in range(90):
                if ti.timeToRetry(): break
                sleep(timePeriodSeconds(period))
            self.assertTrue(ti.timeToRetry())

        self.assertFalse(ti.retry())
开发者ID:jfasenfest,项目名称:Thespian,代码行数:14,代码来源:test_transmitintent.py


示例14: testTransmitIntentCallbackFailureFailedWithTarget

 def testTransmitIntentCallbackFailureFailedWithTarget(self):
     self.successes = []
     self.failures = []
     ti = TransmitIntent('addr', 'msg',
                         onSuccess = self._success,
                         onError = self._failed)
     ti.result = SendStatus.Failed
     # Ensure no exception thrown
     ti.completionCallback()
     assert self.successes == []
     assert self.failures == [(SendStatus.Failed, ti)]
     # And again
     ti.completionCallback()
     assert self.successes == []
     assert self.failures == [(SendStatus.Failed, ti)]
开发者ID:godaddy,项目名称:Thespian,代码行数:15,代码来源:test_transmitintent.py


示例15: testTransmitIntentRetry

 def testTransmitIntentRetry(self):
     ti = TransmitIntent('addr', 'msg')
     for x in range(MAX_TRANSMIT_RETRIES+1):
         assert ti.retry()
     assert not ti.retry()
开发者ID:godaddy,项目名称:Thespian,代码行数:5,代码来源:test_transmitintent.py


示例16: testNormalTransmitIdentification

 def testNormalTransmitIdentification(self):
     ti = TransmitIntent('addr', 'msg')
     # Just ensure no exceptions are thrown
     self.assertTrue(ti.identify())
开发者ID:liuzhijun,项目名称:Thespian,代码行数:4,代码来源:test_transmitintent.py


示例17: testTransmitIntentRetryTiming

    def testTransmitIntentRetryTiming(self):
        maxPeriod = timedelta(milliseconds=90)
        period = timedelta(milliseconds=30)
        now = 0.01
        timepad = timedelta(microseconds=10) # avoid float imprecision
        with update_elapsed_time(now, timedelta(0)):
            ti = TransmitIntent('addr', 'msg',
                                maxPeriod=maxPeriod,
                                retryPeriod=period)
            assert not ti.timeToRetry()

        with update_elapsed_time(now, period + timepad):
            assert not ti.timeToRetry()

            assert ti.retry()
            assert not ti.timeToRetry()

        with update_elapsed_time(now, period + period + timepad):
            assert ti.timeToRetry()
            assert ti.retry()
            assert not ti.timeToRetry()

        with update_elapsed_time(now, period * 3 + timepad):
            assert not ti.timeToRetry()  # Each retry increases

        with update_elapsed_time(now, period * 4 + timepad):
            assert ti.timeToRetry()
            assert not ti.retry()  # Exceeds maximum time
开发者ID:godaddy,项目名称:Thespian,代码行数:28,代码来源:test_transmitintent.py


示例18: testTransmitIntentRetryTiming

    def testTransmitIntentRetryTiming(self):
        maxPeriod = timedelta(milliseconds=90)
        period = timedelta(milliseconds=30)
        ti = TransmitIntent('addr', 'msg', maxPeriod=maxPeriod, retryPeriod=period)
        self.assertFalse(ti.timeToRetry())
        sleep(timePeriodSeconds(period))
        self.assertFalse(ti.timeToRetry())

        self.assertTrue(ti.retry())
        self.assertFalse(ti.timeToRetry())
        sleep(timePeriodSeconds(period))
        self.assertTrue(ti.timeToRetry())

        self.assertTrue(ti.retry())
        self.assertFalse(ti.timeToRetry())
        sleep(timePeriodSeconds(period))
        self.assertFalse(ti.timeToRetry())  # Each retry increases
        sleep(timePeriodSeconds(period))
        self.assertTrue(ti.timeToRetry())

        self.assertFalse(ti.retry())  # Exceeds maximum time
开发者ID:liuzhijun,项目名称:Thespian,代码行数:21,代码来源:test_transmitintent.py


示例19: testTransmitIntentRetry

 def testTransmitIntentRetry(self):
     ti = TransmitIntent('addr', 'msg')
     for x in range(MAX_TRANSMIT_RETRIES+1):
         self.assertTrue(ti.retry())
     self.assertFalse(ti.retry())
开发者ID:liuzhijun,项目名称:Thespian,代码行数:5,代码来源:test_transmitintent.py



注:本文中的thespian.system.transport.TransmitIntent类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utilis.thesplog函数代码示例发布时间:2022-05-27
下一篇:
Python identifiers.checkCAS函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap