本文整理汇总了 Python 中 twisted.names.client.Resolver 类的典型用法代码示例。如果您正苦于以下问题：Python Resolver 类的具体用法？Python Resolver 怎么用？Python Resolver 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了 Resolver 类的 15 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: __init__
def __init__(self, servers, reactor):
    """
    Record the hostname of every server and initialise the base resolver.

    @param servers: iterable of C{(host, ip, port)} triples.
    @param reactor: passed through unchanged to
        C{NonrecursiveResolver.__init__}.
    """
    # Walk the input exactly once so one-shot iterables keep working.
    entries = [((ip, port), host) for host, ip, port in servers]
    # Lookup table from an (ip, port) address back to its hostname.
    self.serverHosts = dict(entries)
    # The base resolver only receives the bare (ip, port) pairs.
    addresses = [address for address, _ in entries]
    NonrecursiveResolver.__init__(
        self, servers=addresses, reactor=reactor)
开发者ID:habnabit,项目名称:spiral,代码行数:7,代码来源:dnscurve.py
示例2: test_no_answer
def test_no_answer(self):
    """
    When a lookup yields a L{dns.NS} response but the server it names
    cannot be connected to, the lookup fails with the connection error.
    """
    # Canned responses handed out by the fake C{_query}, in order.
    messages = []

    def fakeQuery(self, *args):
        # pop(0) raises if more queries are issued than were prepared,
        # which makes an unexpected extra query fail loudly.
        return succeed(messages.pop(0))

    def failingProtocolQuery(self, *args, **kwargs):
        return defer.fail(socket.gaierror("Couldn't connect"))

    resolver = Resolver(servers=[('0.0.0.0', 0)])
    resolver._query = fakeQuery
    # dns.DNSDatagramProtocol.query offers no simple customisation hook,
    # so it is patched directly for the duration of the test.
    self.patch(dns.DNSDatagramProtocol, "query", failingProtocolQuery)

    nsPayload = dns.Record_NS(name='ns.twistedmatrix.com', ttl=700)
    nsHeader = dns.RRHeader(
        name='fooba.com', type=dns.NS, cls=dns.IN, ttl=700, auth=False,
        payload=nsPayload)
    m = dns.Message(
        id=999, answer=1, opCode=0, recDes=0, recAv=1, auth=1, rCode=0,
        trunc=0, maxSize=0)
    m.answers = [nsHeader]
    messages.append(m)

    lookup = resolver.getHostByName("fooby.com")
    return self.assertFailure(lookup, socket.gaierror)
开发者ID:ali-hallaji,项目名称:twisted,代码行数:32,代码来源:test_names.py
示例3: performALookup
def performALookup(self, hostname, dns_server):
    """
    Performs an A lookup and returns a list containing all the dotted quad
    IP addresses in the response.

    :hostname: is the hostname to perform the A lookup on

    :dns_server: is the dns_server that should be used for the lookup as a
                 tuple of ip port (ex. ("127.0.0.1", 53))

    Returns a Deferred that fires with the list of addresses. Failures that
    are gaierror or TimeoutError are added to the report and then
    propagated; any other failure propagates without being reported.
    """
    query = [dns.Query(hostname, dns.A, dns.IN)]

    def gotResponse(message):
        addrs = []
        answers = []
        for answer in message.answers:
            # Compare record types by value: ``is`` on integers only works
            # by accident of the interpreter's small-int caching.
            if answer.type == dns.A:
                addrs.append(answer.payload.dottedQuad())
            # Every answer is reported, not only the A records.
            answers.append(representAnswer(answer))

        DNSTest.addToReport(self, query, resolver=dns_server,
                            query_type='A', answers=answers, addrs=addrs)
        return addrs

    def gotError(failure):
        # trap() re-raises anything that is not one of the expected errors.
        failure.trap(gaierror, TimeoutError)
        DNSTest.addToReport(self, query, resolver=dns_server,
                            query_type='A', failure=failure)
        return failure

    resolver = Resolver(servers=[dns_server])
    d = resolver.queryUDP(query, timeout=self.queryTimeout)
    d.addCallback(gotResponse)
    d.addErrback(gotError)
    return d
开发者ID:aagbsn,项目名称:ooni-probe,代码行数:35,代码来源:dnst.py
示例4: test_zoneTransferConnectionFails
def test_zoneTransferConnectionFails(self):
    """
    The L{Deferred} returned by L{Resolver.lookupZone} errbacks with
    L{error.DNSLookupError} when the AXFR TCP connection fails.
    """
    unreachable = ("nameserver.invalid", 53)
    resolver = Resolver(servers=[unreachable])
    transfer = resolver.lookupZone("impossible.invalid")
    return self.assertFailure(transfer, error.DNSLookupError)
开发者ID:JohnDoes95,项目名称:project_parser,代码行数:8,代码来源:test_names.py
示例5: __init__
def __init__(self, flag=False, servers=None, timeout=(2, 3, 5, 10)):
    """
    Initialise the resolver and optionally load the TLD nameserver table.

    :flag: when true, the pickled TLD nameserver data is loaded from
           /usr/local/share/tld_nameservers.pkl into self.tld_servers.

    :servers: list of (ip, port) tuples to query; defaults to a fresh
              [('8.8.8.8', 53)] for every instance.

    :timeout: passed through unchanged to Resolver.__init__.
    """
    # A list literal as the default would be shared by every instance
    # (it is stored on self.servers below), so build a new one per call.
    if servers is None:
        servers = [('8.8.8.8', 53)]
    Resolver.__init__(self, servers=servers, timeout=timeout)
    self.flag = flag
    self.servers = servers
    # load TLD nameservers
    if self.flag:
        from pickle import load
        # NOTE(review): unpickling runs arbitrary code from the file; this
        # is only safe while the path is writable by trusted users only.
        # Binary mode is what pickle expects, and the context manager
        # closes the handle even if load() raises.
        with open('/usr/local/share/tld_nameservers.pkl', 'rb') as pkl:
            self.tld_servers = load(pkl)
开发者ID:0x90CODE,项目名称:hydra_resolver,代码行数:9,代码来源:hydra_resolver.py
示例6: test_a_lookup_ooni_query
def test_a_lookup_ooni_query(self):
    """
    Query 8.8.8.8 over UDP for the A record of ooni.nu and check that the
    first answer in the response has record type 1.
    """
    def check_first_answer(message, *arg):
        self.assertEqual(message.answers[0].type, 1)

    google_dns = ('8.8.8.8', 53)
    resolver = Resolver(servers=[google_dns])
    pending = resolver.queryUDP([dns.Query('ooni.nu', type=dns.A)])
    pending.addCallback(check_first_answer)
    return pending
开发者ID:0xPoly,项目名称:ooni-probe,代码行数:10,代码来源:disable_test_dns.py
示例7: dnsLookup
def dnsLookup(self, hostname, dns_type, dns_server=None):
    """
    Performs a DNS lookup and returns the response.

    :hostname: is the hostname to perform the DNS lookup on

    :dns_type: type of lookup 'NS'/'A'/'SOA'/'PTR'; any other value raises
               KeyError

    :dns_server: is the dns_server that should be used for the lookup as a
                 tuple of ip port (ex. ("127.0.0.1", 53)). When omitted the
                 module-level twisted.names.client lookup functions are
                 used instead of a dedicated Resolver.

    Returns a Deferred that fires with the list of extracted values:
    (name, serial) tuples for SOA, names for NS/PTR, dotted quads for A.
    """
    types = {'NS': dns.NS, 'A': dns.A, 'SOA': dns.SOA, 'PTR': dns.PTR}
    dnsType = types[dns_type]
    query = [dns.Query(hostname, dnsType, dns.IN)]

    def gotResponse(message):
        log.debug(dns_type + " Lookup successful")
        log.debug(str(message))
        addrs = []
        answers = []
        if dns_server:
            # queryUDP fires with a message object carrying .answers.
            msg = message.answers
        else:
            # NOTE(review): the client.lookup* helpers appear to fire with
            # a sequence whose first item is the answer list - confirm.
            msg = message[0]
        for answer in msg:
            # Record types are plain integers; compare them by value
            # rather than by identity.
            if answer.type == dnsType:
                if dnsType == dns.SOA:
                    addr = (answer.name.name, answer.payload.serial)
                elif dnsType in (dns.NS, dns.PTR):
                    addr = answer.payload.name.name
                elif dnsType == dns.A:
                    addr = answer.payload.dottedQuad()
                else:
                    addr = None
                addrs.append(addr)
            # Every answer is reported, whatever its record type.
            answers.append(representAnswer(answer))
        DNSTest.addToReport(self, query, resolver=dns_server,
                            query_type=dns_type, answers=answers,
                            addrs=addrs)
        return addrs

    def gotError(failure):
        # trap() re-raises anything that is not one of the expected errors.
        failure.trap(gaierror, TimeoutError)
        DNSTest.addToReport(self, query, resolver=dns_server,
                            query_type=dns_type, failure=failure)
        return failure

    if dns_server:
        resolver = Resolver(servers=[dns_server])
        d = resolver.queryUDP(query, timeout=self.queryTimeout)
    else:
        lookupFunction = {
            'NS': client.lookupNameservers,
            'SOA': client.lookupAuthority,
            'A': client.lookupAddress,
            'PTR': client.lookupPointer,
        }
        d = lookupFunction[dns_type](hostname)
    d.addCallback(gotResponse)
    d.addErrback(gotError)
    return d
开发者ID:GarysRefererence2014,项目名称:ooni-probe,代码行数:54,代码来源:dnst.py
示例8: __init__
def __init__(self, flag=False, servers=None, timeout=(2, 3, 5, 10)):
    """
    Initialise the resolver and optionally load the TLD nameserver table.

    :flag: when true, the pickled TLD nameserver data bundled with the
           hydra_resolver package is loaded into self.tld_servers.

    :servers: list of (ip, port) tuples to query; defaults to a fresh
              [('8.8.8.8', 53)] for every instance.

    :timeout: passed through unchanged to Resolver.__init__.

    Raises EnvironmentError when flag is set but the packaged data file
    does not exist.
    """
    # A list literal as the default would be shared by every instance
    # (it is stored on self.servers below), so build a new one per call.
    if servers is None:
        servers = [('8.8.8.8', 53)]
    Resolver.__init__(self, servers=servers, timeout=timeout)
    self.flag = flag
    self.servers = servers
    # load TLD nameservers
    if self.flag:
        if not pkg_resources.resource_exists(
                'hydra_resolver', 'data/tld_nameservers.pkl'):
            raise EnvironmentError('Unable to load tld_nameservers.pkl')
        data = pkg_resources.resource_stream(
            'hydra_resolver', 'data/tld_nameservers.pkl')
        # Close the stream even if unpickling fails.
        try:
            self.tld_servers = load(data)
        finally:
            data.close()
开发者ID:nms84,项目名称:hydra_resolver,代码行数:12,代码来源:hydra_resolver.py
示例9: test_lookup
def test_lookup(self):
    """
    We perform an A lookup on the DNS test servers for the domains to be
    tested and an A lookup on the known good DNS server.

    We then compare the results from test_resolvers and that from
    control_resolver and see if they match up.
    If they match up then no censorship is happening (tampering: false).

    If they do not we do a reverse lookup (PTR) on the test_resolvers and
    the control resolver for every IP address we got back and check to see
    if any one of them matches the control ones.

    If they do then we take note of the fact that censorship is probably
    not happening (tampering: reverse-match).

    If they do not match then censorship is probably going on (tampering:
    true).
    """
    log.msg("Doing the test lookups on %s" % self.input)
    list_of_ds = []
    hostname = self.input

    # dns.Query takes (name, type, cls). The original passed dns.IN and
    # dns.A the other way round, which only worked because both are 1.
    dns_query = [dns.Query(hostname, dns.A, dns.IN)]
    dns_server = [(self.localOptions['backend'],
                   self.localOptions['backendport'])]

    # Control lookup against the known-good backend resolver.
    resolver = Resolver(servers=dns_server)
    control_d = resolver.queryUDP(dns_query, timeout=self.lookupTimeout)
    control_d.addCallback(self.process_a_answers, 'control')
    control_d.addErrback(self.a_lookup_error, 'control')

    # One lookup per resolver under test, each on port 53.
    for test_resolver in self.test_resolvers:
        log.msg("Going for %s" % test_resolver)
        dns_server = [(test_resolver, 53)]

        resolver = Resolver(servers=dns_server)

        d = resolver.queryUDP(dns_query, timeout=self.lookupTimeout)
        d.addCallback(self.process_a_answers, test_resolver)
        d.addErrback(self.a_lookup_error, test_resolver)

        # This is required to cancel the delayed calls of the
        # twisted.names.client resolver
        list_of_ds.append(d)

    list_of_ds.append(control_d)
    dl = defer.DeferredList(list_of_ds)
    dl.addCallback(self.do_reverse_lookups)
    dl.addBoth(self.compare_results)
    return dl
开发者ID:jonmtoz,项目名称:ooni-probe,代码行数:51,代码来源:dnstamper.py
示例10: performALookup
def performALookup(self, hostname, dns_server):
    """
    Performs an A lookup and returns a list containing all the dotted quad
    IP addresses in the response.

    :hostname: is the hostname to perform the A lookup on

    :dns_server: is the dns_server that should be used for the lookup as a
                 tuple of ip port (ex. ("127.0.0.1", 53))

    Returns a Deferred that fires with the list of addresses, or with None
    when the lookup failed. Either way an entry describing the query is
    appended to self.report['queries'].
    """
    # dns.Query takes (name, type, cls); the original had the last two
    # swapped, which only worked because dns.A and dns.IN are both 1.
    query = [dns.Query(hostname, dns.A, dns.IN)]

    def gotResponse(message):
        addrs = []
        answers = []
        for answer in message.answers:
            # Compare record types by value, not by object identity.
            if answer.type == dns.A:
                addrs.append(answer.payload.dottedQuad())
            # We store the resource record and the answer payload in a
            # tuple
            answers.append((repr(answer), repr(answer.payload)))
        result = {
            'resolver': dns_server,
            'query_type': 'A',
            'query': repr(query),
            'answers': answers,
            'addrs': addrs,
        }
        self.report['queries'].append(result)
        return addrs

    def gotError(failure):
        log.exception(failure)
        result = {
            'resolver': dns_server,
            'query_type': 'A',
            'query': repr(query),
            'error': str(failure),
        }
        # The original built this entry and then dropped it, so failed
        # lookups never appeared in the report.
        self.report['queries'].append(result)
        # Returning None swallows the failure.
        return None

    resolver = Resolver(servers=[dns_server])
    d = resolver.queryUDP(query, timeout=self.queryTimeout)
    d.addCallback(gotResponse)
    d.addErrback(gotError)
    return d
开发者ID:Shadiesna,项目名称:ooni-probe,代码行数:45,代码来源:dnst.py
示例11: performPTRLookup
def performPTRLookup(self, address, dns_server):
    """
    Does a reverse DNS lookup on the input ip address

    :address: the IP Address as a dotted quad to do a reverse lookup on.

    :dns_server: is the dns_server that should be used for the lookup as a
                 tuple of ip port (ex. ("127.0.0.1", 53))

    Returns a Deferred that fires with the payload name of the last PTR
    record in the response, or with None when there was none or the lookup
    failed. Either way an entry describing the query is appended to
    self.report['queries'].
    """
    # Reverse the octets: 1.2.3.4 -> 4.3.2.1.in-addr.arpa
    ptr = '.'.join(address.split('.')[::-1]) + '.in-addr.arpa'
    # dns.Query takes (name, type, cls). The original passed
    # (ptr, dns.IN, dns.PTR), which asks for type 1 (A) in class 12.
    query = [dns.Query(ptr, dns.PTR, dns.IN)]

    def gotResponse(message):
        answers = []
        name = None
        for answer in message.answers:
            # Compare record types by value, not by object identity.
            if answer.type == dns.PTR:
                name = answer.payload.name
            # The original never filled this list, so 'answers' was always
            # reported empty; record each answer as a (record, payload)
            # repr pair.
            answers.append((repr(answer), repr(answer.payload)))
        result = {
            'resolver': dns_server,
            'query_type': 'PTR',
            'query': repr(query),
            'answers': answers,
            'name': name,
        }
        self.report['queries'].append(result)
        return name

    def gotError(failure):
        log.exception(failure)
        result = {
            'resolver': dns_server,
            'query_type': 'PTR',
            'query': repr(query),
            'error': str(failure),
        }
        # The original built this entry and then dropped it, so failed
        # lookups never appeared in the report.
        self.report['queries'].append(result)
        # Returning None swallows the failure.
        return None

    resolver = Resolver(servers=[dns_server])
    d = resolver.queryUDP(query, timeout=self.queryTimeout)
    d.addCallback(gotResponse)
    d.addErrback(gotError)
    return d
开发者ID:Shadiesna,项目名称:ooni-probe,代码行数:41,代码来源:dnst.py
示例12: performPTRLookup
def performPTRLookup(self, address, dns_server):
    """
    Does a reverse DNS lookup on the input ip address

    :address: the IP Address as a dotted quad to do a reverse lookup on.

    :dns_server: is the dns_server that should be used for the lookup as a
                 tuple of ip port (ex. ("127.0.0.1", 53))

    Returns a Deferred that fires with the name from the last PTR record
    as a string ('' when the response has none), or with None when the
    lookup failed with gaierror or TimeoutError. Other failures propagate.
    """
    # Reverse the octets: 1.2.3.4 -> 4.3.2.1.in-addr.arpa
    ptr = '.'.join(address.split('.')[::-1]) + '.in-addr.arpa'
    query = [dns.Query(ptr, dns.PTR, dns.IN)]

    def gotResponse(message):
        log.debug("Lookup successful")
        log.debug(message)
        answers = []
        name = ''
        for answer in message.answers:
            # Compare record types by value: ``is`` on integers only works
            # by accident of the interpreter's small-int caching.
            if answer.type == dns.PTR:
                name = str(answer.payload.name)
            # Every answer is reported, not only the PTR records.
            answers.append(representAnswer(answer))
        DNSTest.addToReport(self, query, resolver=dns_server,
                            query_type='PTR', answers=answers, name=name)
        return name

    def gotError(failure):
        log.err("Failed to perform lookup")
        log.exception(failure)
        # trap() re-raises unexpected failures; they have already been
        # logged above but are not added to the report.
        failure.trap(gaierror, TimeoutError)
        DNSTest.addToReport(self, query, resolver=dns_server,
                            query_type='PTR', failure=failure)
        # Returning None swallows the expected failure.
        return None

    resolver = Resolver(servers=[dns_server])
    d = resolver.queryUDP(query, timeout=self.queryTimeout)
    d.addCallback(gotResponse)
    d.addErrback(gotError)
    return d
开发者ID:aagbsn,项目名称:ooni-probe,代码行数:38,代码来源:dnst.py
示例13: setUp
def setUp(self):
    """
    Build the resolver used by the tests.
    """
    # None of these tests touch the network, so the resolver is pointed
    # at a deliberately invalid server address.
    invalidServer = ('0.0.0.0', 0)
    self.resolver = Resolver(servers=[invalidServer])
开发者ID:Almad,项目名称:twisted,代码行数:4,代码来源:test_names.py
示例14: FilterAnswersTests
class FilterAnswersTests(unittest.TestCase):
    """
    Tests for how L{twisted.names.client.Resolver.filterAnswers} deals with
    the various error conditions it can be handed.
    """

    def setUp(self):
        # None of these tests touch the network, so the resolver is pointed
        # at a deliberately invalid server address.
        invalidServer = ('0.0.0.0', 0)
        self.resolver = Resolver(servers=[invalidServer])

    def test_truncatedMessage(self):
        """
        A truncated message causes the same queries to be issued again over
        TCP, and the TCP response's sections become the result.
        """
        truncated = Message(trunc=True)
        truncated.addQuery('example.com')

        def fakeQueryTCP(queries):
            # The retry must carry exactly the queries of the truncated
            # message.
            self.assertEqual(queries, truncated.queries)
            reply = Message()
            reply.answers = ['answer']
            reply.authority = ['authority']
            reply.additional = ['additional']
            return succeed(reply)

        self.resolver.queryTCP = fakeQueryTCP
        expected = (['answer'], ['authority'], ['additional'])
        d = self.resolver.filterAnswers(truncated)
        d.addCallback(self.assertEqual, expected)
        return d

    def _rcodeTest(self, rcode, exc):
        """
        Pass a message with result code C{rcode} to C{filterAnswers} and
        call C{trap(exc)} on what it returns.
        """
        self.resolver.filterAnswers(Message(rCode=rcode)).trap(exc)

    def test_formatError(self):
        """
        A message whose result code is C{EFORMAT} produces a failure
        wrapping L{DNSFormatError}.
        """
        return self._rcodeTest(EFORMAT, DNSFormatError)

    def test_serverError(self):
        """
        Same as L{test_formatError}, for C{ESERVER}/L{DNSServerError}.
        """
        return self._rcodeTest(ESERVER, DNSServerError)

    def test_nameError(self):
        """
        Same as L{test_formatError}, for C{ENAME}/L{DNSNameError}.
        """
        return self._rcodeTest(ENAME, DNSNameError)

    def test_notImplementedError(self):
        """
        Same as L{test_formatError}, for
        C{ENOTIMP}/L{DNSNotImplementedError}.
        """
        return self._rcodeTest(ENOTIMP, DNSNotImplementedError)

    def test_refusedError(self):
        """
        Same as L{test_formatError}, for
        C{EREFUSED}/L{DNSQueryRefusedError}.
        """
        return self._rcodeTest(EREFUSED, DNSQueryRefusedError)

    def test_refusedErrorUnknown(self):
        """
        Same as L{test_formatError}, for an unrecognized result code and
        L{DNSUnknownError}.
        """
        unknownCode = EREFUSED + 1
        return self._rcodeTest(unknownCode, DNSUnknownError)
开发者ID:Almad,项目名称:twisted,代码行数:81,代码来源:test_names.py
示例15: __init__
def __init__(self, *args, **kwargs):
    """
    Initialise the base resolver, then call C{self.clear_cache()}.

    All positional and keyword arguments are forwarded unchanged to
    C{Resolver.__init__}.
    """
    # The base class is set up first; clear_cache() only runs once
    # that call has returned.
    Resolver.__init__(self, *args, **kwargs)
    self.clear_cache()
开发者ID:seraphlnWu,项目名称:creditor,代码行数:3,代码来源:http.py
注：本文中的 twisted.names.client.Resolver 类示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论