本文整理汇总了 Python 中 txtorcon.TCPHiddenServiceEndpoint 类的典型用法代码示例。如果您正苦于以下问题：Python TCPHiddenServiceEndpoint 类的具体用法？Python TCPHiddenServiceEndpoint 怎么用？Python TCPHiddenServiceEndpoint 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了 TCPHiddenServiceEndpoint 类的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: setupHSEndpoint
def setupHSEndpoint(self, tor_process_protocol, torconfig, endpoint):
    """Expose ``endpoint`` as a Tor hidden service on public port 80.

    The hidden-service directory is ``torconfig.DataDirectory`` joined with
    the name found in ``endpoint.settings['name']``.  For txtorcon 0.10.0
    and later the directory is passed as ``hidden_service_dir`` and the
    onion address is read from ``port.address``; for older releases it is
    passed as ``data_dir`` and the address is read from ``port`` itself.

    ``tor_process_protocol`` is accepted but not used in this function.

    Returns the object produced by ``listen()``, after attaching the
    success printer as a callback and ``self.txSetupFailed`` as an errback.
    """
    endpointName = endpoint.settings['name']

    def uses_new_api():
        # Compared on demand, at the same two points the version was
        # originally checked (when listening starts and when it completes).
        return LooseVersion(txtorcon_version) >= LooseVersion('0.10.0')

    def setup_complete(port):
        onion_uri = port.address.onion_uri if uses_new_api() else port.onion_uri
        print("Exposed %s Tor hidden service "
              "on httpo://%s" % (endpointName, onion_uri))

    public_port = 80
    data_dir = os.path.join(torconfig.DataDirectory, endpointName)
    # Only the keyword naming the directory differs between API versions.
    dir_keyword = 'hidden_service_dir' if uses_new_api() else 'data_dir'
    hs_endpoint = TCPHiddenServiceEndpoint(reactor,
                                           torconfig,
                                           public_port,
                                           **{dir_keyword: data_dir})

    d = hs_endpoint.listen(endpoint)
    d.addCallback(setup_complete)
    d.addErrback(self.txSetupFailed)
    return d
开发者ID:isislovecruft,项目名称:ooni-backend,代码行数:27,代码来源:runner.py
示例2: test_basic
def test_basic(self):
    """Start listening on a hidden-service endpoint, then bootstrap the config.

    Two canned answers are queued on ``self.protocol`` before
    ``self.config.bootstrap()`` is called.  The object returned by
    ``listen()`` is handed back to the test runner.
    """
    endpoint = TCPHiddenServiceEndpoint(self.reactor, self.config, 123)
    listening = endpoint.listen(FakeProtocolFactory())

    canned_answers = (
        'config/names=\nHiddenServiceOptions Virtual\nOK',
        'HiddenServiceOptions',
    )
    for answer in canned_answers:
        self.protocol.answers.append(answer)

    self.config.bootstrap()
    return listening
开发者ID:glamrock,项目名称:txtorcon,代码行数:7,代码来源:test_torconfig.py
示例3: test_multiple_listen
def test_multiple_listen(self):
    """Listen, stop listening, then listen again on the same endpoint.

    The final callback on the first ``listen()`` result checks that the
    endpoint's TCP interface is 127.0.0.1 and that exactly one hidden
    service ended up in ``self.config.HiddenServices``.  Any failure is
    routed to ``self.fail``.
    """
    ep = TCPHiddenServiceEndpoint(self.reactor, self.config, 123)
    d0 = ep.listen(NoOpProtocolFactory())

    @defer.inlineCallbacks
    def more_listen(arg):
        yield arg.stopListening()
        d1 = ep.listen(NoOpProtocolFactory())

        def foo(arg):
            return arg
        d1.addBoth(foo)
        # returnValue() leaves the generator by raising, so this must be the
        # last statement; the bare `return` that used to follow it could
        # never execute and has been removed.
        defer.returnValue(arg)

    d0.addBoth(more_listen)
    self.config.bootstrap()

    def check(arg):
        self.assertEqual("127.0.0.1", ep.tcp_endpoint._interface)
        self.assertEqual(len(self.config.HiddenServices), 1)

    d0.addCallback(check).addErrback(self.fail)
    return d0
开发者ID:isislovecruft,项目名称:txtorcon,代码行数:25,代码来源:test_endpoints.py
示例4: test_bad_listener
def test_bad_listener(self):
    """Check that listening on a non-loopback interface produces an error.

    A custom ``endpoint_generator`` forces the TCP endpoint onto 0.0.0.0;
    the test then expects the errback on the ``listen()`` result to have
    received a ``RuntimeError``.
    """
    def test_gen(*args, **kw):
        kw['interface'] = '0.0.0.0'
        return TCP4ServerEndpoint(*args, **kw)

    ep = TCPHiddenServiceEndpoint(self.reactor, self.config, 123,
                                  endpoint_generator=test_gen)
    d = ep.listen(FakeProtocolFactory())

    # Holds the exception carried by the failure passed to the errback;
    # stays None if the errback never runs.
    outcome = {'error': None}

    def remember_error(err, *args, **kw):
        outcome['error'] = err.value

    d.addErrback(remember_error)

    ## enough answers so the config bootstraps properly
    for answer in ('config/names=\nHiddenServiceOptions Virtual',
                   'HiddenServiceOptions'):
        self.protocol.answers.append(answer)
    self.config.bootstrap()

    ## now we should have attempted to listen on the endpoint our
    ## test_gen() is generating - which should be the "wrong"
    ## answer of anything (0.0.0.0)
    self.assertEqual('0.0.0.0', ep.tcp_endpoint._interface)

    ## ...and the point of this test; ensure we got an error
    ## trying to listen on not-127.*
    self.assertTrue(outcome['error'] is not None)
    self.assertTrue(isinstance(outcome['error'], RuntimeError))
    return d
开发者ID:biddyweb,项目名称:txtorcon,代码行数:32,代码来源:test_torconfig.py
示例5: test_failure
def test_failure(self):
    """Listen with ``self.reactor.failures`` set to 1 and inspect the error.

    NOTE(review): ``failures`` presumably tells the test reactor how many
    listen attempts to fail -- confirm against the fixture.  Errors on the
    ``listen()`` result are passed to ``self.check_error``.
    """
    self.reactor.failures = 1
    endpoint = TCPHiddenServiceEndpoint(self.reactor, self.config, 123)
    listening = endpoint.listen(NoOpProtocolFactory())
    self.config.bootstrap()
    listening.addErrback(self.check_error)
    return listening
开发者ID:coffeemakr,项目名称:txtorcon,代码行数:7,代码来源:test_endpoints.py
示例6: setupCollector
def setupCollector(tor_process_protocol):
    """Expose ``reportingBackend`` as a Tor hidden service on public port 80.

    Points ``tempfile.tempdir`` at ``<_repo_dir>/tmp`` (creating it when
    missing) and always creates a fresh temporary directory there.  That
    directory is used as the Tor data directory only when
    ``config.main.tor_datadir`` is ``None``; otherwise the configured value
    is used.

    Nothing is returned; success is printed by a callback and failures go
    to ``txSetupFailed``.
    """
    def setup_complete(port):
        print("Exposed collector Tor hidden service on httpo://%s"
              % port.onion_uri)

    tempfile.tempdir = os.path.join(_repo_dir, 'tmp')
    tmp_root = tempfile.gettempdir()
    if not os.path.isdir(tmp_root):
        os.makedirs(tmp_root)
    _temp_dir = tempfile.mkdtemp()

    if config.main.tor_datadir is not None:
        datadir = config.main.tor_datadir
    else:
        log.warn("Option 'tor_datadir' in oonib.conf is unspecified!")
        log.msg("Creating tmp directory in current directory for datadir.")
        log.debug("Using %s" % _temp_dir)
        datadir = _temp_dir

    torconfig = TorConfig(tor_process_protocol.tor_protocol)
    public_port = 80
    # XXX there is currently a bug in txtorcon that prevents data_dir from
    # being passed properly. Details on the bug can be found here:
    # https://github.com/meejah/txtorcon/pull/22
    hs_endpoint = TCPHiddenServiceEndpoint(reactor, torconfig, public_port,
                                           data_dir=datadir)

    hidden_service = hs_endpoint.listen(reportingBackend)
    hidden_service.addCallback(setup_complete)
    hidden_service.addErrback(txSetupFailed)
开发者ID:stephen-soltesz,项目名称:ooni-backend,代码行数:28,代码来源:runner.py
示例7: test_stealth_auth
def test_stealth_auth(self, ftb):
    '''
    Make sure a HiddenService instance with stealth-auth lines is
    produced when authentication was specified at construction time.

    ``ftb`` is accepted but not used in the body.
    '''
    hs_dir = '/dev/null'
    clients = ['alice', 'bob']
    config = TorConfig(self.protocol)
    ep = TCPHiddenServiceEndpoint(self.reactor, config, 123, hs_dir,
                                  stealth_auth=clients)

    # make sure listen() correctly configures our hidden-service
    # with the explicit directory we passed in above
    listening = ep.listen(NoOpProtocolFactory())

    def report_failure(fail):
        print("ERROR", fail)

    listening.addErrback(report_failure)
    yield listening  # returns 'port'

    self.assertEqual(1, len(config.HiddenServices))
    self.assertEqual(config.HiddenServices[0].dir, '/dev/null')
    self.assertEqual(
        config.HiddenServices[0].authorize_client[0],
        'stealth alice,bob'
    )
    self.assertEqual(None, ep.onion_uri)

    # XXX cheating; private API
    config.HiddenServices[0].hostname = 'oh my'
    self.assertEqual('oh my', ep.onion_uri)
开发者ID:felipedau,项目名称:txtorcon,代码行数:28,代码来源:test_endpoints.py
示例8: test_explicit_data_dir
def test_explicit_data_dir(self):
    """Listen on an endpoint that was given an explicit directory argument.

    ``'/mumble/mumble'`` is passed as the fourth positional argument; the
    config is then bootstrapped with two canned protocol answers and the
    ``listen()`` result is returned.
    """
    endpoint = TCPHiddenServiceEndpoint(
        self.reactor, self.config, 123, '/mumble/mumble')
    listening = endpoint.listen(FakeProtocolFactory())

    for answer in ('config/names=\nHiddenServiceOptions Virtual',
                   'HiddenServiceOptions'):
        self.protocol.answers.append(answer)

    self.config.bootstrap()
    return listening
开发者ID:biddyweb,项目名称:txtorcon,代码行数:10,代码来源:test_torconfig.py
示例9: test_already_bootstrapped
def test_already_bootstrapped(self):
    """Bootstrap the config first, and only then create and listen.

    The two canned answers are queued and ``bootstrap()`` is called before
    the endpoint exists; the ``listen()`` result is returned.
    """
    names_answer = '''config/names=
HiddenServiceOptions Virtual'''
    for answer in (names_answer, 'HiddenServiceOptions'):
        self.protocol.answers.append(answer)
    self.config.bootstrap()

    endpoint = TCPHiddenServiceEndpoint(self.reactor, self.config, 123)
    return endpoint.listen(FakeProtocolFactory())
开发者ID:biddyweb,项目名称:txtorcon,代码行数:10,代码来源:test_torconfig.py
示例10: test_basic
def test_basic(self):
    """Listen, bootstrap, and check the endpoint binds to 127.0.0.1.

    Also calls ``repr()`` on ``self.config.HiddenServices`` purely to make
    sure doing so does not raise.
    """
    endpoint = TCPHiddenServiceEndpoint(self.reactor, self.config, 123)
    listening = endpoint.listen(FakeProtocolFactory())

    for answer in ('config/names=\nHiddenServiceOptions Virtual',
                   'HiddenServiceOptions'):
        self.protocol.answers.append(answer)
    self.config.bootstrap()

    self.assertEqual('127.0.0.1', endpoint.tcp_endpoint._interface)

    ## make sure _ListWrapper's __repr__ doesn't explode
    repr(self.config.HiddenServices)
    return listening
开发者ID:biddyweb,项目名称:txtorcon,代码行数:10,代码来源:test_torconfig.py
示例11: test_too_many_failures
def test_too_many_failures(self):
    """Listen with ``self.reactor.failures`` set to 12 and inspect the error.

    NOTE(review): 12 is presumably more failures than the endpoint will
    retry through -- confirm against the endpoint and reactor fixture.
    Errors on the ``listen()`` result are passed to ``self.check_error``.
    """
    self.reactor.failures = 12
    endpoint = TCPHiddenServiceEndpoint(self.reactor, self.config, 123)
    listening = endpoint.listen(FakeProtocolFactory())

    for answer in ('config/names=\nHiddenServiceOptions Virtual',
                   'HiddenServiceOptions'):
        self.protocol.answers.append(answer)
    self.config.bootstrap()

    listening.addErrback(self.check_error)
    return listening
开发者ID:biddyweb,项目名称:txtorcon,代码行数:10,代码来源:test_torconfig.py
示例12: test_progress_updates
def test_progress_updates(self):
    """Check that a registered progress listener is invoked on an update.

    The endpoint must provide ``IProgressProvider``; a ``Mock`` listener is
    registered through that interface and the endpoint's private
    ``_tor_progress_update`` hook is driven directly.
    """
    config = TorConfig()
    ep = TCPHiddenServiceEndpoint(self.reactor, config, 123)

    self.assertTrue(IProgressProvider.providedBy(ep))
    prog = IProgressProvider(ep)
    ding = Mock()
    prog.add_progress_listener(ding)
    args = (50, "blarg", "Doing that thing we talked about.")
    # kind-of cheating, test-wise?
    ep._tor_progress_update(*args)
    # `called_with` is not part of the Mock API: accessing it auto-creates
    # a child mock whose call result is always truthy, so the previous
    # assertTrue(ding.called_with(*args)) could never fail.  `called` is
    # the real "was this mock invoked" flag.
    self.assertTrue(ding.called)
    # NOTE(review): the exact arguments are deliberately not pinned here,
    # because this test cannot see whether the endpoint forwards them
    # unchanged.  If it does, tighten this to ding.assert_called_with(*args).
开发者ID:coffeemakr,项目名称:txtorcon,代码行数:12,代码来源:test_endpoints.py
示例13: setupHSEndpoint
def setupHSEndpoint(self, tor_process_protocol, torconfig, endpoint):
    """Expose ``endpoint`` as a Tor hidden service on public port 80.

    The service's ``data_dir`` is ``torconfig.DataDirectory`` joined with
    the name found in ``endpoint.settings['name']``.
    ``tor_process_protocol`` is accepted but not used in this function.

    Returns the object produced by ``listen()``, after attaching the
    success printer as a callback and ``self.txSetupFailed`` as an errback.
    """
    endpointName = endpoint.settings['name']

    def setup_complete(port):
        print("Exposed %s Tor hidden service on httpo://%s" % (endpointName,
                                                               port.onion_uri))

    public_port = 80
    service_dir = os.path.join(torconfig.DataDirectory, endpointName)
    hs_endpoint = TCPHiddenServiceEndpoint(reactor, torconfig, public_port,
                                           data_dir=service_dir)

    listening = hs_endpoint.listen(endpoint)
    listening.addCallback(setup_complete)
    listening.addErrback(self.txSetupFailed)
    return listening
开发者ID:ThisIsNotOfficialCodeItsJustForks,项目名称:ooni-backend,代码行数:13,代码来源:runner.py
示例14: test_explicit_data_dir
def test_explicit_data_dir(self):
    """Check that an explicit directory ends up on the configured service.

    The body is a generator (it yields the ``listen()`` result), so it is
    presumably wrapped by ``defer.inlineCallbacks`` outside this snippet --
    confirm against the full test module.
    """
    config = TorConfig(self.protocol)
    ep = TCPHiddenServiceEndpoint(self.reactor, config, 123, "/dev/null")

    # make sure listen() correctly configures our hidden-service
    # with the explicit directory we passed in above
    d = ep.listen(NoOpProtocolFactory())

    def foo(fail):
        # Call form of print: the old `print "ERROR", fail` statement is a
        # syntax error on Python 3 and differed from the other tests here.
        # NOTE(review): on Python 2 without
        # `from __future__ import print_function` this prints a tuple.
        print("ERROR", fail)

    d.addErrback(foo)
    # The yielded value was only bound to an unused local before.
    yield d
    self.assertEqual(1, len(config.HiddenServices))
    self.assertEqual(config.HiddenServices[0].dir, "/dev/null")
开发者ID:isislovecruft,项目名称:txtorcon,代码行数:15,代码来源:test_endpoints.py
示例15: setupBouncer
def setupBouncer(tor_process_protocol, datadir):
    """Expose ``ooniBouncer`` as a Tor hidden service on public port 80.

    A ``TorConfig`` is built from ``tor_process_protocol.tor_protocol`` and
    the service's ``data_dir`` is the ``bouncer`` subdirectory of
    ``datadir``.

    Nothing is returned; success is printed by a callback and failures go
    to ``txSetupFailed``.
    """
    def setup_complete(port):
        #XXX: drop some other noise about what API are available on this machine
        print("Exposed bouncer Tor hidden service on httpo://%s"
              % port.onion_uri)

    torconfig = TorConfig(tor_process_protocol.tor_protocol)
    public_port = 80
    bouncer_dir = os.path.join(datadir, 'bouncer')
    hs_endpoint = TCPHiddenServiceEndpoint(reactor, torconfig, public_port,
                                           data_dir=bouncer_dir)

    listening = hs_endpoint.listen(ooniBouncer)
    listening.addCallback(setup_complete)
    listening.addErrback(txSetupFailed)
开发者ID:waaaaargh,项目名称:ooni-backend,代码行数:15,代码来源:runner.py
示例16: test_progress_updates_system_tor
def test_progress_updates_system_tor(self, ftb):
    """Drive a progress update on an endpoint built via ``system_tor()``.

    The control endpoint is a ``Mock`` declared to provide
    ``IStreamClientEndpoint``, whose ``connect`` returns an already-fired
    Deferred.  ``ftb`` is accepted but not used in the body.  The object
    returned by ``system_tor()`` is handed back to the test runner.
    """
    already_connected = defer.succeed(None)
    control_ep = Mock()
    control_ep.connect = Mock(return_value=already_connected)
    directlyProvides(control_ep, IStreamClientEndpoint)

    endpoint = TCPHiddenServiceEndpoint.system_tor(
        self.reactor, control_ep, 1234)
    endpoint._tor_progress_update(40, "FOO", "foo to bar")
    return endpoint
开发者ID:felipedau,项目名称:txtorcon,代码行数:7,代码来源:test_endpoints.py
示例17: test_private_tor_no_control_port
def test_private_tor_no_control_port(self):
    """Check that ``private_tor()`` goes through ``endpoints.launch_tor``.

    ``txtorcon.endpoints.launch_tor`` is swapped for a ``Mock`` while the
    endpoint is created, and the mock is then asserted to have been called.
    The body is a generator, so it is presumably wrapped by
    ``defer.inlineCallbacks`` outside this snippet -- confirm against the
    full test module.
    """
    m = Mock()
    from txtorcon import endpoints
    # Remember the real function so the module attribute can be put back;
    # previously the mock stayed installed and leaked into later tests.
    original_launch_tor = endpoints.launch_tor
    endpoints.launch_tor = m
    try:
        # The created endpoint itself is not inspected by this test.
        yield TCPHiddenServiceEndpoint.private_tor(Mock(), 80)
        m.assert_called()
    finally:
        endpoints.launch_tor = original_launch_tor
开发者ID:isislovecruft,项目名称:txtorcon,代码行数:7,代码来源:test_endpoints.py
示例18: test_endpoint_properties
def test_endpoint_properties(self):
    """Check ``onion_private_key`` / ``onion_uri`` before and after a service exists.

    Both attributes are expected to be ``None`` on a fresh endpoint from
    ``private_tor()``.  Once ``hiddenservice`` is replaced by a ``Mock``
    with a ``private_key``, ``onion_private_key`` must reflect that key.
    """
    endpoint = yield TCPHiddenServiceEndpoint.private_tor(Mock(), 80)

    for attribute in ('onion_private_key', 'onion_uri'):
        self.assertEqual(None, getattr(endpoint, attribute))

    endpoint.hiddenservice = Mock()
    endpoint.hiddenservice.private_key = 'mumble'
    self.assertEqual('mumble', endpoint.onion_private_key)
开发者ID:coffeemakr,项目名称:txtorcon,代码行数:7,代码来源:test_endpoints.py
示例19: test_explicit_data_dir
def test_explicit_data_dir(self, ftb):
    """Check that an explicit directory and its hostname file are picked up.

    A temporary directory containing a ``hostname`` file with the text
    ``public`` is passed to the endpoint.  After listening, the single
    configured hidden service must report that directory and hostname.
    ``ftb`` is accepted but not used in the body.
    """
    with util.TempDir() as tmp:
        d = str(tmp)
        hostname_path = os.path.join(d, 'hostname')
        with open(hostname_path, 'w') as hostname_file:
            hostname_file.write('public')

        config = TorConfig(self.protocol)
        endpoint = TCPHiddenServiceEndpoint(self.reactor, config, 123, d)

        # make sure listen() correctly configures our hidden-service
        # with the explicit directory we passed in above
        yield endpoint.listen(NoOpProtocolFactory())

        self.assertEqual(1, len(config.HiddenServices))
        self.assertEqual(config.HiddenServices[0].dir, d)
        self.assertEqual(config.HiddenServices[0].hostname, 'public')
开发者ID:felipedau,项目名称:txtorcon,代码行数:16,代码来源:test_endpoints.py
示例20: test_system_tor
def test_system_tor(self):
    """Check that ``system_tor()`` connects instead of launching a new Tor.

    ``launch_tor`` and ``build_tor_connection`` in ``txtorcon.endpoints``
    are both patched; the latter is replaced by a callable returning
    ``self.protocol``.  The port obtained from ``listen()`` must expose an
    address with ``onion_uri`` and ``onion_port``, and the ``launch_tor``
    mock must not have been called.
    """
    from test_torconfig import FakeControlProtocol

    def boom(*args):
        # why does the new_callable thing need a callable that
        # returns a callable? Feels like I must be doing something
        # wrong somewhere...
        def bam(*args, **kw):
            return self.protocol
        return bam

    with patch('txtorcon.endpoints.launch_tor') as launch_mock, \
            patch('txtorcon.endpoints.build_tor_connection',
                  new_callable=boom):
        client = clientFromString(
            self.reactor,
            "tcp:host=localhost:port=9050"
        )
        ep = yield TCPHiddenServiceEndpoint.system_tor(self.reactor,
                                                       client, 80)
        port = yield ep.listen(NoOpProtocolFactory())

        address = port.getHost()
        for attribute in ('onion_uri', 'onion_port'):
            self.assertTrue(hasattr(address, attribute))

        # These three are exercised only to show that they do not raise.
        port.startListening()
        str(port)
        port.tor_config

        # system_tor should be connecting to a running one,
        # *not* launching a new one.
        self.assertFalse(launch_mock.called)
开发者ID:coffeemakr,项目名称:txtorcon,代码行数:28,代码来源:test_endpoints.py
注：本文中的 txtorcon.TCPHiddenServiceEndpoint 类示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论