本文整理汇总了Python中twisted.internet.base.ThreadedResolver类的典型用法代码示例。如果您正苦于以下问题：Python ThreadedResolver类的具体用法？Python ThreadedResolver怎么用？Python ThreadedResolver使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ThreadedResolver类的9个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_failure
def test_failure(self):
    """
    When the call to L{socket.gethostbyname} raises an exception, the
    L{Deferred} returned by L{ThreadedResolver.getHostByName} fails with
    L{DNSLookupError}.
    """
    def explodingLookup(hostname):
        # Replacement for socket.gethostbyname that always raises.
        raise IOError("ENOBUFS (this is a funny joke)")

    lookupTimeout = 30
    failures = []

    fakeReactor = FakeReactor()
    self.addCleanup(fakeReactor._stop)
    self.patch(socket, 'gethostbyname', explodingLookup)

    lookup = ThreadedResolver(fakeReactor).getHostByName(
        "some.name", (lookupTimeout,))
    # assertFailure turns the expected DNSLookupError into a callback
    # result, so the append callback below records exactly one entry.
    self.assertFailure(lookup, DNSLookupError)
    lookup.addCallback(failures.append)

    fakeReactor._runThreadCalls()
    self.assertEqual(len(failures), 1)

    # After the timeout has passed, no delayed calls may remain scheduled
    # on the fake clock.
    fakeReactor._clock.advance(lookupTimeout + 1)
    self.assertEqual(fakeReactor._clock.calls, [])
开发者ID:Almad,项目名称:twisted,代码行数:28,代码来源:test_base.py
示例2: test_timeout
def test_timeout(self):
    """
    The L{Deferred} returned by L{ThreadedResolver.getHostByName} fails
    with L{DNSLookupError} when L{socket.gethostbyname} has not completed
    by the time the specified timeout elapses.
    """
    pending = Queue()

    def blockingLookup(hostname):
        # Waits on the queue, then raises whatever the test puts there.
        raise pending.get()

    lookupTimeout = 10
    failures = []

    fakeReactor = FakeReactor()
    self.addCleanup(fakeReactor._stop)
    self.patch(socket, 'gethostbyname', blockingLookup)

    lookup = ThreadedResolver(fakeReactor).getHostByName(
        "some.name", (lookupTimeout,))
    self.assertFailure(lookup, DNSLookupError)
    lookup.addCallback(failures.append)

    # One second short of the timeout: nothing has been reported yet.
    fakeReactor._clock.advance(lookupTimeout - 1)
    self.assertEqual(failures, [])

    # The final second reaches the timeout and the failure is recorded.
    fakeReactor._clock.advance(1)
    self.assertEqual(len(failures), 1)

    # Finally let the blocked socket.gethostbyname replacement finish (by
    # raising).  The outcome has already been decided by the timeout.
    pending.put(IOError("The I/O was errorful"))
开发者ID:Almad,项目名称:twisted,代码行数:31,代码来源:test_base.py
示例3: test_success
def test_success(self):
    """
    The L{Deferred} returned by L{ThreadedResolver.getHostByName} fires
    with whatever L{socket.gethostbyname} returns, and the lookup is made
    with the name that was requested.
    """
    expectedAddress = "10.0.0.17"
    hostname = "foo.bar.example.com"
    lookupTimeout = 30

    queried = []
    results = []

    fakeReactor = FakeReactor()
    self.addCleanup(fakeReactor._stop)

    def recordingLookup(requested):
        # Replacement for socket.gethostbyname: note the name asked for
        # and answer with the canned address.
        queried.append(requested)
        return expectedAddress

    self.patch(socket, 'gethostbyname', recordingLookup)

    lookup = ThreadedResolver(fakeReactor).getHostByName(
        hostname, (lookupTimeout,))
    lookup.addCallback(results.append)

    fakeReactor._runThreadCalls()
    self.assertEqual(queried, [hostname])
    self.assertEqual(results, [expectedAddress])

    # After the timeout has passed, no delayed calls may remain scheduled
    # on the fake clock.
    fakeReactor._clock.advance(lookupTimeout + 1)
    self.assertEqual(fakeReactor._clock.calls, [])
开发者ID:Almad,项目名称:twisted,代码行数:33,代码来源:test_base.py
示例4: test_success
def test_success(self):
    """
    The L{Deferred} returned by L{ThreadedResolver.getHostByName} fires
    with the address taken from the result of L{socket.getaddrinfo}, and
    the lookup is made with the name that was requested.
    """
    expectedAddress = "213.180.204.3"
    hostname = "ya.ru"
    lookupTimeout = 30

    queried = []
    results = []

    fakeReactor = FakeReactor()
    self.addCleanup(fakeReactor._stop)

    def recordingGetAddrInfo(requested, service):
        # Replacement for socket.getaddrinfo: note the name asked for and
        # return three canned entries that share one socket address.
        queried.append(requested)
        sockaddr = ('213.180.204.3', 0)
        return (
            (2, 1, 6, '', sockaddr),
            (2, 2, 17, '', sockaddr),
            (2, 3, 0, '', sockaddr),
        )

    self.patch(socket, 'getaddrinfo', recordingGetAddrInfo)

    lookup = ThreadedResolver(fakeReactor).getHostByName(
        hostname, (lookupTimeout,))
    lookup.addCallback(results.append)

    fakeReactor._runThreadCalls()
    self.assertEqual(queried, [hostname])
    self.assertEqual(results, [expectedAddress])

    # After the timeout has passed, no delayed calls may remain scheduled
    # on the fake clock.
    fakeReactor._clock.advance(lookupTimeout + 1)
    self.assertEqual(fakeReactor._clock.calls, [])
开发者ID:smira,项目名称:twisted,代码行数:33,代码来源:test_base.py
示例5: __init__
def __init__(self, reactor=None):
    """
    Initialize the underlying resolver implementation and emit a
    L{DeprecationWarning} pointing at the replacement class.

    @param reactor: The reactor handed to the underlying implementation.
        When C{None}, C{twisted.internet.reactor} is imported and used.
    """
    if reactor is None:
        # Imported here rather than at module level, so the global reactor
        # is only touched when the caller did not supply one.
        from twisted.internet import reactor
    _ThreadedResolverImpl.__init__(self, reactor)

    message = (
        "twisted.names.client.ThreadedResolver is deprecated since "
        "Twisted 9.0, use twisted.internet.base.ThreadedResolver "
        "instead.")
    # stacklevel=2 attributes the warning to the code instantiating this
    # class instead of to this method.
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
开发者ID:JohnDoes95,项目名称:project_parser,代码行数:9,代码来源:client.py
示例6: test_real
def test_real(self):
    """
    Resolve C{"ya.ru"} through L{ThreadedResolver.getHostByName} without
    patching the L{socket} module, and check that the first result starts
    with one of the expected address prefixes.
    """
    # NOTE(review): nothing is patched here, so this presumably performs a
    # real DNS lookup and depends on network access -- confirm.
    hostname = "ya.ru"
    lookupTimeout = 30
    acceptedPrefixes = ('77.', '87.', '93.', '213.')

    results = []
    fakeReactor = FakeReactor()
    self.addCleanup(fakeReactor._stop)

    lookup = ThreadedResolver(fakeReactor).getHostByName(
        hostname, (lookupTimeout,))
    lookup.addCallback(results.append)

    fakeReactor._runThreadCalls()
    self.assertTrue(results[0].startswith(acceptedPrefixes))

    # After the timeout has passed, no delayed calls may remain scheduled
    # on the fake clock.
    fakeReactor._clock.advance(lookupTimeout + 1)
    self.assertEqual(fakeReactor._clock.calls, [])
开发者ID:smira,项目名称:twisted,代码行数:22,代码来源:test_base.py
示例7: getHostByName
def getHostByName(self, name, timeout=(1, 3, 11, 45)):
    """
    Resolve C{name}, answering from C{dnscache} when an entry exists.

    @param name: The host name to look up.
    @param timeout: Timeout values passed through unchanged to
        L{ThreadedResolver.getHostByName}.

    @return: A L{Deferred}: already fired with the cached value on a cache
        hit, otherwise the one from L{ThreadedResolver.getHostByName}
        with C{self._cache_result} added as a callback.
    """
    if name not in dnscache:
        # Cache miss: delegate to the base resolver and let
        # _cache_result see the outcome along with the name.
        lookup = ThreadedResolver.getHostByName(self, name, timeout)
        lookup.addCallback(self._cache_result, name)
        return lookup
    return defer.succeed(dnscache[name])
开发者ID:dreamfrog,项目名称:jophiel,代码行数:6,代码来源:resolver.py
示例8: __init__
def __init__(self, *args, **kwargs):
    """
    Initialize the base L{ThreadedResolver} with the given arguments, then
    start with an empty C{_cache} mapping.
    """
    ThreadedResolver.__init__(self, *args, **kwargs)
    self._cache = dict()
开发者ID:kenzouyeh,项目名称:scrapy,代码行数:3,代码来源:resolver.py
示例9: __init__
def __init__(self, reactor=None):
    """
    Initialize the underlying resolver implementation.

    @param reactor: The reactor handed to the underlying implementation.
        When C{None}, C{twisted.internet.reactor} is imported and used.
    """
    if reactor is None:
        # Imported here rather than at module level, so the global reactor
        # is only touched when the caller did not supply one.
        from twisted.internet import reactor
    _ThreadedResolverImpl.__init__(self, reactor)
开发者ID:galaxysd,项目名称:BitTorrent,代码行数:4,代码来源:client.py
注：本文中的twisted.internet.base.ThreadedResolver类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论