本文整理汇总了Python中twisted.trial.util.wait函数的典型用法代码示例。如果您正苦于以下问题:Python wait函数的具体用法?Python wait怎么用?Python wait使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了wait函数的16个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_upnp_mapper
def test_upnp_mapper(self):
from shtoom.upnp import UPnPMapper
ae = self.assertEquals
ar = self.assertRaises
checkUPnP()
mapper = UPnPMapper()
uprot = DatagramProtocol()
uport = reactor.listenUDP(random.randint(10000,20000), uprot)
class tfactory(Factory):
protocol = Protocol
tport = reactor.listenTCP(0, tfactory())
for port in uport, tport:
ar(ValueError, mapper.unmap, port)
ar(ValueError, mapper.info, port)
t = TestMapper(mapper, port)
d = t.go()
util.wait(d, timeout=16)
ae(len(t.map_res), 2)
ae(t.map_res, t.info_res)
ae(t.unmap_res, None)
# Can't unmap a port that's not mapped
ar(ValueError, mapper.unmap, port)
d = port.stopListening()
util.wait(d)
# Can't map a closed port
ar(ValueError, mapper.map, port)
# Can't get info on a closed port
ar(ValueError, mapper.info, port)
开发者ID:braams,项目名称:shtoom,代码行数:28,代码来源:test_upnp.py
示例2: eoc_create
def eoc_create(name, *owners):
"""Create list with given name"""
assert '@' in name
assert not name.startswith('.')
site = getSite()
d = site.create(name, owners)
wait(d)
开发者ID:tv42,项目名称:eocmanage,代码行数:7,代码来源:listutil.py
示例3: tearDown
def tearDown(self):
# Wait until all the protocols on the server-side of this test have
# been disconnected, to avoid leaving junk in the reactor.
for d in self.serverConns:
util.wait(d)
PortCleanerUpper.tearDown(self)
开发者ID:pwarren,项目名称:AGDeviceControl,代码行数:7,代码来源:test_tcp.py
示例4: test_nullmapper
def test_nullmapper(self):
from shtoom.nat import getNullMapper
mapper = getNullMapper()
ae = self.assertEquals
ar = self.assertRaises
uprot = DatagramProtocol()
uport = reactor.listenUDP(0, uprot)
class tfactory(Factory):
protocol = Protocol
tport = reactor.listenTCP(0, tfactory())
for port in uport, tport:
ar(ValueError, mapper.unmap, port)
ar(ValueError, mapper.info, port)
t = TestMapper(mapper, port)
d = t.go()
util.wait(d)
ae(len(t.map_res), 2)
ae(t.map_res, t.info_res)
ae(t.unmap_res, None)
# Can't unmap a port that's not mapped
ar(ValueError, mapper.unmap, port)
d = port.stopListening()
util.wait(d)
# Can't map a closed port
ar(ValueError, mapper.map, port)
# Can't get info on a closed port
ar(ValueError, mapper.info, port)
开发者ID:braams,项目名称:shtoom,代码行数:27,代码来源:test_natmappers.py
示例5: testStopTrying
def testStopTrying(self):
f = Factory()
f.protocol = In
f.connections = 0
f.allMessages = []
c = ReconnectingClientFactory()
c.initialDelay = c.delay = 0.2
c.protocol = Out
c.howManyTimes = 2
port = reactor.listenTCP(0, f)
PORT = port.getHost().port
reactor.connectTCP('127.0.0.1', PORT, c)
now = time.time()
while len(f.allMessages) != 2 and (time.time() < now + 10):
reactor.iterate(0.1)
util.wait(defer.maybeDeferred(port.stopListening))
self.assertEquals(len(f.allMessages), 2,
"not enough messages -- %s" % f.allMessages)
self.assertEquals(f.connections, 2,
"Number of successful connections incorrect %d" %
f.connections)
self.assertEquals(f.allMessages, [Out.msgs] * 2)
self.failIf(c.continueTrying, "stopTrying never called or ineffective")
开发者ID:pwarren,项目名称:AGDeviceControl,代码行数:27,代码来源:test_factories.py
示例6: testStateMachine
def testStateMachine(self):
d = defer.Deferred()
A = StateMachineOne(d)
reactor.callLater(0, A._start)
s = Saver()
d.addCallback(s.save)
util.wait(d)
self.assertEquals(s.res, [0,1,2,3])
开发者ID:braams,项目名称:shtoom,代码行数:8,代码来源:test_dougstatemachine.py
示例7: checkUPnP
def checkUPnP():
from shtoom.upnp import getUPnP
d = getUPnP()
s = Saver()
d.addCallback(s.save)
util.wait(d, timeout=8)
if s.val is None:
raise unittest.SkipTest('no UPnP available')
开发者ID:braams,项目名称:shtoom,代码行数:8,代码来源:test_upnp.py
示例8: setUp
def setUp(self):
# import here not to get twisted.internet.reactor imported too soon
from solipsis.services.profile.network.manager import NetworkManager
self.network = NetworkManager()
util.wait(self.network.start(), timeout=10)
self.network.on_new_peer(FakePeer("boby"))
self.network.on_service_data(
"boby", "HELLO 127.0.0.1:%s"% str(self.network.server.local_port))
开发者ID:BackupTheBerlios,项目名称:solipsis-svn,代码行数:8,代码来源:unittest_protocols.py
示例9: _reentrantWait
def _reentrantWait(self):
def threadedOperation(n):
time.sleep(n)
return n
d1 = threads.deferToThread(threadedOperation, 0.125)
d2 = threads.deferToThread(threadedOperation, 0.250)
d1.addCallback(lambda ignored: util.wait(d2))
util.wait(d1)
开发者ID:KatiaBorges,项目名称:exeLearning,代码行数:8,代码来源:test_util.py
示例10: test_stundiscovery
def test_stundiscovery(self):
from shtoom.stun import getSTUN, _NatType
ae = self.assertEquals
a_ = self.assert_
d = getSTUN()
s = Saver()
d.addCallback(s.save)
util.wait(d, timeout=16)
a_(isinstance(s.arg, _NatType), "%s, %s :: %s" % (s, s.arg, type(s.arg),))
开发者ID:braams,项目名称:shtoom,代码行数:9,代码来源:test_stun.py
示例11: tearDown
def tearDown(self):
# disconnect first one
util.wait(self.assertResponse("disconnect", "Disconnected"))
NetworkTest.tearDown(self)
# second one
util.wait(self.assertOtherResponse("disconnect", "Disconnected"))
self.other_factory.stopTrying()
self.other_factory.stopFactory()
self.other_connector.disconnect()
开发者ID:BackupTheBerlios,项目名称:solipsis-svn,代码行数:9,代码来源:unittest_network.py
示例12: testBrokenStateMachine
def testBrokenStateMachine(self):
d = defer.Deferred()
A = StateMachineTwo(d)
reactor.callLater(0, A._start)
s = Saver()
d.addCallback(s.save)
d.addErrback(s.error)
util.wait(d)
self.assertEquals(s.res, None)
self.assert_(isinstance(s.err.value, EventNotSpecifiedError))
开发者ID:braams,项目名称:shtoom,代码行数:10,代码来源:test_dougstatemachine.py
示例13: _bail
def _bail(self):
from twisted.internet import reactor
d = defer.Deferred()
reactor.addSystemEventTrigger('after', 'shutdown', lambda: d.callback(None))
reactor.fireSystemEvent('shutdown') # radix's suggestion
treactor = interfaces.IReactorThreads(reactor, None)
if treactor is not None:
treactor.suggestThreadPoolSize(0)
util.wait(d) # so that the shutdown event completes
开发者ID:pwarren,项目名称:AGDeviceControl,代码行数:11,代码来源:runner.py
示例14: tearDown
def tearDown(self):
# Clean up sockets
self.client.transport.loseConnection()
d = self.port.stopListening()
if d is not None:
wait(d)
del self.serverProtocol
# Clean up temporary directory
shutil.rmtree(self.directory)
开发者ID:pwarren,项目名称:AGDeviceControl,代码行数:11,代码来源:test_ftp.py
示例15: testStateMachineWithDeferreds
def testStateMachineWithDeferreds(self):
d = defer.Deferred()
A = StateMachineThree(d)
reactor.callLater(0, A._start)
class Saver:
res = None
def save(self, res):
self.res = res
s = Saver()
d.addCallback(s.save)
util.wait(d)
self.assertEquals(s.res, [0,1,2,3])
开发者ID:braams,项目名称:shtoom,代码行数:12,代码来源:test_dougstatemachine.py
示例16: test_interruptDoesntShutdown
def test_interruptDoesntShutdown(self):
reactor.addSystemEventTrigger('after', 'shutdown',
self._shutdownCalled)
d = defer.Deferred()
d.addCallback(self.raiseKeyInt)
reactor.callLater(0, d.callback, None)
try:
util.wait(d, useWaitError=False)
except KeyboardInterrupt:
self.failIf(self.shutdownCalled,
"System shutdown triggered")
else:
self.fail("KeyboardInterrupt wasn't raised")
开发者ID:KatiaBorges,项目名称:exeLearning,代码行数:13,代码来源:test_util.py
注:本文中的twisted.trial.util.wait函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论