本文整理汇总了Python中util.UpnpPunch类的典型用法代码示例。如果您正苦于以下问题:Python UpnpPunch类的具体用法?Python UpnpPunch怎么用?Python UpnpPunch使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了UpnpPunch类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_send_requests_success
def test_send_requests_success(self):
with mock.patch(
'util.UpnpPunch._send_soap_request') as mock_send_request:
mock_send_request.return_value = mock.MagicMock(status=200)
upnp._send_requests(['msg'], None, None, None)
assert mock_send_request.called
开发者ID:0-vortex,项目名称:ZeroNet,代码行数:7,代码来源:TestUpnpPunch.py
示例2: openport
def openport(self, port=None, check=True):
if not port:
port = self.port
if self.port_opened:
return True # Port already opened
if check: # Check first if its already opened
time.sleep(1) # Wait for port open
if self.testOpenport(port, use_alternative=False)["result"] is True:
return True # Port already opened
if config.tor == "always": # Port opening won't work in Tor mode
return False
self.log.info("Trying to open port using UpnpPunch...")
try:
UpnpPunch.ask_to_open_port(self.port, 'ZeroNet', retries=3, protos=["TCP"])
except (UpnpPunch.UpnpError, UpnpPunch.IGDError, socket.error) as err:
self.log.error("UpnpPunch run error: %s" %
Debug.formatException(err))
return False
if self.testOpenport(port)["result"] is True:
self.upnp_port_opened = True
return True
self.log.info("Upnp mapping failed :( Please forward port %s on your router to your ipaddress" % port)
return False
开发者ID:Emeraude,项目名称:ZeroNet,代码行数:27,代码来源:FileServer.py
示例3: test_perform_m_search_socket_error
def test_perform_m_search_socket_error(self, mock_socket):
mock_socket.recv.side_effect = socket.error('Timeout error')
with mock.patch('util.UpnpPunch.socket.socket',
return_value=mock_socket):
with pytest.raises(upnp.UpnpError):
upnp.perform_m_search('127.0.0.1')
开发者ID:0-vortex,项目名称:ZeroNet,代码行数:7,代码来源:TestUpnpPunch.py
示例4: test_send_requests_failed
def test_send_requests_failed(self):
with mock.patch(
'util.UpnpPunch._send_soap_request') as mock_send_request:
mock_send_request.return_value = mock.MagicMock(status=500)
with pytest.raises(upnp.UpnpError):
upnp._send_requests(['msg'], None, None, None)
assert mock_send_request.called
开发者ID:0-vortex,项目名称:ZeroNet,代码行数:8,代码来源:TestUpnpPunch.py
示例5: stop
def stop(self):
if self.running and self.upnp_port_opened:
self.log.debug('Closing port %d' % self.port)
try:
UpnpPunch.ask_to_close_port(self.port, protos=["TCP"])
self.log.info('Closed port via upnp.')
except (UpnpPunch.UpnpError, UpnpPunch.IGDError), err:
self.log.info("Failed at attempt to use upnp to close port: %s" % err)
开发者ID:Emeraude,项目名称:ZeroNet,代码行数:8,代码来源:FileServer.py
示例6: test_perform_m_search
def test_perform_m_search(self, mock_socket):
local_ip = '127.0.0.1'
with mock.patch('util.UpnpPunch.socket.socket',
return_value=mock_socket):
result = upnp.perform_m_search(local_ip)
assert result == 'Hello'
assert local_ip == mock_socket.bind.call_args_list[0][0][0][0]
assert ('239.255.255.250',
1900) == mock_socket.sendto.call_args_list[0][0][1]
开发者ID:0-vortex,项目名称:ZeroNet,代码行数:10,代码来源:TestUpnpPunch.py
示例7: openport
def openport(self, port=None, check=True):
if not port: port = self.port
if self.port_opened: return True # Port already opened
if check: # Check first if its already opened
if self.testOpenport(port)["result"] == True:
return True # Port already opened
self.log.info("Trying to open port using UpnpPunch...")
try:
upnp_punch = UpnpPunch.open_port(self.port, 'ZeroNet')
upnp_punch = True
except Exception, err:
self.log.error("UpnpPunch run error: %s" % Debug.formatException(err))
upnp_punch = False
开发者ID:EdenSG,项目名称:ZeroNet,代码行数:14,代码来源:FileServer.py
示例8: openport
def openport(self, port=None, check=True):
if not port:
port = self.port
if self.port_opened:
return True # Port already opened
if check: # Check first if its already opened
time.sleep(1) # Wait for port open
if self.testOpenport(port, use_alternative=False)["result"] is True:
return True # Port already opened
if config.tor == "always": # Port opening won't work in Tor mode
return False
self.log.info("Trying to open port using UpnpPunch...")
try:
upnp_punch = UpnpPunch.open_port(self.port, 'Phantom')
upnp_punch = True
except Exception, err:
upnp_punch = False
开发者ID:shiftcurrency,项目名称:phantom,代码行数:19,代码来源:FileServer.py
示例9: getMyIps
def getMyIps(self):
return UpnpPunch._get_local_ips()
开发者ID:oleduc,项目名称:ZeroNet,代码行数:2,代码来源:BroadcastServer.py
示例10: test_create_close_message_parsable
def test_create_close_message_parsable(self):
from xml.parsers.expat import ExpatError
msg, _ = upnp._create_close_message('127.0.0.1', 8888)
try:
upnp.parseString(msg)
except ExpatError as e:
pytest.fail('Incorrect XML message: {}'.format(e))
开发者ID:0-vortex,项目名称:ZeroNet,代码行数:7,代码来源:TestUpnpPunch.py
示例11: test_communicate_with_igd_succeed_despite_single_failure
def test_communicate_with_igd_succeed_despite_single_failure(
self, mock_orchestrate, mock_get_local_ips):
mock_get_local_ips.return_value = ['192.168.0.12']
mock_orchestrate.side_effect = [upnp.UpnpError, None]
upnp._communicate_with_igd(retries=2)
assert mock_get_local_ips.called
assert mock_orchestrate.called
开发者ID:0-vortex,项目名称:ZeroNet,代码行数:7,代码来源:TestUpnpPunch.py
示例12: test_ask_to_open_port_failure
def test_ask_to_open_port_failure(self, mock_send_requests,
mock_collect_idg, mock_local_ips):
mock_local_ips.return_value = ['192.168.0.12']
mock_collect_idg.return_value = {'upnp_schema': 'schema-yo'}
mock_send_requests.side_effect = upnp.UpnpError()
with pytest.raises(upnp.UpnpError):
upnp.ask_to_open_port()
开发者ID:0-vortex,项目名称:ZeroNet,代码行数:8,代码来源:TestUpnpPunch.py
示例13: test_communicate_with_igd_total_failure
def test_communicate_with_igd_total_failure(self, mock_orchestrate,
mock_get_local_ips):
mock_get_local_ips.return_value = ['192.168.0.12']
mock_orchestrate.side_effect = [upnp.UpnpError, upnp.IGDError]
with pytest.raises(upnp.UpnpError):
upnp._communicate_with_igd(retries=2)
assert mock_get_local_ips.called
assert mock_orchestrate.called
开发者ID:0-vortex,项目名称:ZeroNet,代码行数:8,代码来源:TestUpnpPunch.py
示例14: test_parse_for_errors_error
def test_parse_for_errors_error(self, httplib_response):
soap_error = ('<document>'
'<errorCode>500</errorCode>'
'<errorDescription>Bad request</errorDescription>'
'</document>')
rsp = httplib_response(status=500, body=soap_error)
with pytest.raises(upnp.IGDError) as exc:
upnp._parse_for_errors(rsp)
assert 'SOAP request error' in exc.value.message
开发者ID:0-vortex,项目名称:ZeroNet,代码行数:9,代码来源:TestUpnpPunch.py
示例15: test_orchestrate_soap_request_without_desc
def test_orchestrate_soap_request_without_desc(self, mock_send_requests,
mock_collect_idg):
soap_mock = mock.MagicMock()
args = ['127.0.0.1', 31337, soap_mock, {'upnp_schema': 'schema-yo'}]
mock_collect_idg.return_value = args[-1]
upnp._orchestrate_soap_request(*args[:-1])
assert mock_collect_idg.called
soap_mock.assert_called_with(*args[:2] + [None, 'UDP', 'schema-yo'])
assert mock_send_requests.called
开发者ID:0-vortex,项目名称:ZeroNet,代码行数:11,代码来源:TestUpnpPunch.py
示例16: test_retrieve_location_from_ssdp
def test_retrieve_location_from_ssdp(self, url_obj):
ctrl_location = url_obj.geturl()
parsed_location = urlparse(ctrl_location)
rsp = ('auth: gibberish\r\nlocation: {0}\r\n'
'Content-Type: text/html\r\n\r\n').format(ctrl_location)
result = upnp._retrieve_location_from_ssdp(rsp)
assert result == parsed_location
开发者ID:0-vortex,项目名称:ZeroNet,代码行数:7,代码来源:TestUpnpPunch.py
示例17: test_create_close_message_contains_right_stuff
def test_create_close_message_contains_right_stuff(self):
settings = {'protocol': 'test proto',
'upnp_schema': 'test schema'}
msg, fn_name = upnp._create_close_message('127.0.0.1', 8888, **
settings)
assert fn_name == 'DeletePortMapping'
assert '8888' in msg
assert settings['protocol'] in msg
assert settings['upnp_schema'] in msg
开发者ID:0-vortex,项目名称:ZeroNet,代码行数:9,代码来源:TestUpnpPunch.py
示例18: test_create_open_message_contains_right_stuff
def test_create_open_message_contains_right_stuff(self):
settings = {'description': 'test desc',
'protocol': 'test proto',
'upnp_schema': 'test schema'}
msg, fn_name = upnp._create_open_message('127.0.0.1', 8888, **settings)
assert fn_name == 'AddPortMapping'
assert '127.0.0.1' in msg
assert '8888' in msg
assert settings['description'] in msg
assert settings['protocol'] in msg
assert settings['upnp_schema'] in msg
开发者ID:0-vortex,项目名称:ZeroNet,代码行数:11,代码来源:TestUpnpPunch.py
示例19: test_ask_to_open_port_success
def test_ask_to_open_port_success(self, mock_send_requests,
mock_collect_idg, mock_local_ips):
mock_collect_idg.return_value = {'upnp_schema': 'schema-yo'}
mock_local_ips.return_value = ['192.168.0.12']
result = upnp.ask_to_open_port(retries=5)
soap_msg = mock_send_requests.call_args[0][0][0][0]
assert result is None
assert mock_collect_idg.called
assert '192.168.0.12' in soap_msg
assert '15441' in soap_msg
assert 'schema-yo' in soap_msg
开发者ID:0-vortex,项目名称:ZeroNet,代码行数:15,代码来源:TestUpnpPunch.py
示例20: test_parse_for_errors_bad_rsp
def test_parse_for_errors_bad_rsp(self, httplib_response):
rsp = httplib_response(status=500)
with pytest.raises(upnp.IGDError) as exc:
upnp._parse_for_errors(rsp)
assert 'Unable to parse' in exc.value.message
开发者ID:0-vortex,项目名称:ZeroNet,代码行数:5,代码来源:TestUpnpPunch.py
注:本文中的util.UpnpPunch类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论