本文整理汇总了Python中urllib3.disable_warnings函数的典型用法代码示例。如果您正苦于以下问题:Python disable_warnings函数的具体用法?Python disable_warnings怎么用?Python disable_warnings使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了disable_warnings函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: publish_plot
def publish_plot(instrument, run_number, files, config=None):
# read the configuration if one isn't provided
if config is None:
config = read_configuration()
# verify that it has an attribute that matters
try:
config.publish_url
except AttributeError: # assume that it is a filename
config = read_configuration(config)
run_number = str(run_number)
url = _getURL(config.publish_url, instrument, run_number)
print('posting to \'%s\'' % url)
# these next 2 lines are explicity bad - and doesn't seem
# to do ANYTHING
# https://urllib3.readthedocs.org/en/latest/security.html
import urllib3
urllib3.disable_warnings()
import requests
request = requests.post(url, data={'username': config.publisher_username,
'password': config.publisher_password},
files=files, verify=False)
return request
开发者ID:peterfpeterson,项目名称:finddata,代码行数:25,代码来源:publish_plot.py
示例2: load_drivers
def load_drivers(self):
subprocess.check_call("sudo mount -o remount,rw /", shell=True)
subprocess.check_call("sudo mount -o remount,rw /root_bypass_ramdisks", shell=True)
mac = subprocess.check_output("/sbin/ifconfig eth0 |grep -Eo ..\(\:..\){5}", shell=True).decode('utf-8').split('\n')[0]
#response = requests.get(url, auth=(username, db_uuid.split('\n')[0]), stream=True)
server = get_odoo_server_url()
if server:
urllib3.disable_warnings()
pm = urllib3.PoolManager(cert_reqs='CERT_NONE')
resp = False
server = server + '/iot/get_drivers'
try:
resp = pm.request('POST',
server,
fields={'mac': mac})
except Exception as e:
_logger.error('Could not reach configured server')
_logger.error('A error encountered : %s ' % e)
if resp and resp.data:
zip_file = zipfile.ZipFile(io.BytesIO(resp.data))
zip_file.extractall(get_resource_path('hw_drivers', 'drivers'))
subprocess.check_call("sudo service odoo restart", shell=True)
subprocess.check_call("sudo mount -o remount,ro /", shell=True)
subprocess.check_call("sudo mount -o remount,ro /root_bypass_ramdisks", shell=True)
return "<meta http-equiv='refresh' content='20; url=http://" + get_ip() + ":8069/list_drivers'>"
开发者ID:Vauxoo,项目名称:odoo,代码行数:28,代码来源:main.py
示例3: _CheckPythonVersionAndDisableWarnings
def _CheckPythonVersionAndDisableWarnings(self):
"""Checks python version, and disables SSL warnings.
urllib3 will warn on each HTTPS request made by older versions of Python.
Rather than spamming the user, we print one warning message, then disable
warnings in urllib3.
"""
if self._checked_for_old_python_version:
return
if sys.version_info[0:3] < (2, 7, 9):
logging.warn(
u'You are running a version of Python prior to 2.7.9. Your version '
u'of Python has multiple weaknesses in its SSL implementation that '
u'can allow an attacker to read or modify SSL encrypted data. '
u'Please update. Further SSL warnings will be suppressed. See '
u'https://www.python.org/dev/peps/pep-0466/ for more information.')
# Some distributions de-vendor urllib3 from requests, so we have to
# check if this has occurred and disable warnings in the correct
# package.
if (hasattr(requests, u'packages') and
hasattr(requests.packages, u'urllib3') and
hasattr(requests.packages.urllib3, u'disable_warnings')):
requests.packages.urllib3.disable_warnings()
else:
if urllib3 and hasattr(urllib3, u'disable_warnings'):
urllib3.disable_warnings()
self._checked_for_old_python_version = True
开发者ID:joachimmetz,项目名称:plaso,代码行数:27,代码来源:interface.py
示例4: __init__
def __init__(self, connect_str=None, config=None):
connect_args = {}
if(config.protocol):
connect_args['protocol'] = config.protocol
# if(config.requests_kwargs):
# connect_args["requests_kwargs"] = config.requests_kwargs
if(config.auth_filename):
urllib3.disable_warnings(urllib3.exceptions.SecurityWarning)
f = open(config.auth_filename)
creds = f.read().splitlines()
if(connect_args.get("requests_kwargs")==None):
connect_args["requests_kwargs"] = {}
connect_args["requests_kwargs"]["auth"] = requests.auth.HTTPBasicAuth(creds[0], creds[1])
f.close()
if(config.verify_filename):
urllib3.disable_warnings(urllib3.exceptions.SecurityWarning)
if(connect_args.get("requests_kwargs")==None):
connect_args["requests_kwargs"] = {}
connect_args["requests_kwargs"]["verify"] = config.verify_filename
# 'protocol': 'https',
# 'requests_kwargs' : requests_kwargs
# }
try:
self.engine = sqlalchemy.create_engine(connect_str, connect_args = connect_args)
except: # TODO: bare except; but what's an ArgumentError?
print(self.tell_format())
raise
self.dialect = self.engine.url.get_dialect()
self.metadata = sqlalchemy.MetaData(bind=self.engine)
self.name = self.assign_name(self.engine)
self.session = self.engine.connect()
self.connections[self.name] = self
self.connections[str(self.metadata.bind.url)] = self
Connection.current = self
开发者ID:gett-labs,项目名称:ipython-sql,代码行数:34,代码来源:connection.py
示例5: download_if_needed
def download_if_needed(url, filename):
"""
Download from URL to filename unless filename already exists
"""
if os.path.exists(filename):
filenameExists=(filename, 'already exists')
return filenameExists
elif filename:
import requests
urllib3.disable_warnings()
try:
connection_pool = urllib3.PoolManager(retries=urllib3.Retry(5))
resp = connection_pool.request('GET',url,timeout=urllib3.Timeout(total=5.0))
f = open(filename, 'wb')
f.write(resp.data)
f.close()
resp.release_conn()
downloadSuccessful=('Downloading', filename)
return(downloadSuccessful)
except MaxRetryError as e:
exception='Could not connect to server. Please check to make sure the URL is valid and try again.'
return exception
except LocationValueError as e:
exception=str(e)
return exception
else:
warningMessage='Please input a correct filename.'
return warningMessage
开发者ID:UWSEDS,项目名称:stellar-homework,代码行数:28,代码来源:pronto_utils.py
示例6: __init__
def __init__(self, _url=None, token=None, _cert=None, _verify=True,
_timeout=30, _proxies=None, _allow_redirects=True,
_session=None):
self.version = None
self.vault_addr = os.environ.get('VAULT_ADDR')
if not self.vault_addr:
raise aomi.exceptions.AomiError('VAULT_ADDR is undefined or empty')
if not self.vault_addr.startswith("http"):
raise aomi.exceptions.AomiError('VAULT_ADDR must be a URL')
ssl_verify = True
if 'VAULT_SKIP_VERIFY' in os.environ:
if os.environ['VAULT_SKIP_VERIFY'] == '1':
import urllib3
urllib3.disable_warnings()
ssl_verify = False
self.initial_token = None
self.operational_token = None
session = requests.Session()
retries = Retry(total=5,
backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retries)
session.mount('https://', adapter)
session.mount('http://', adapter)
super(Client, self).__init__(url=self.vault_addr,
verify=ssl_verify,
session=session)
开发者ID:Autodesk,项目名称:aomi,代码行数:29,代码来源:vault.py
示例7: download_zip
def download_zip(url: str) -> BytesIO:
"""Download data from url."""
logger.warning('start chromium download.\n'
'Download may take a few minutes.')
# disable warnings so that we don't need a cert.
# see https://urllib3.readthedocs.io/en/latest/advanced-usage.html for more
urllib3.disable_warnings()
with urllib3.PoolManager() as http:
# Get data from url.
# set preload_content=False means using stream later.
data = http.request('GET', url, preload_content=False)
try:
total_length = int(data.headers['content-length'])
except (KeyError, ValueError, AttributeError):
total_length = 0
process_bar = tqdm(total=total_length)
# 10 * 1024
_data = BytesIO()
for chunk in data.stream(10240):
_data.write(chunk)
process_bar.update(len(chunk))
process_bar.close()
logger.warning('\nchromium download done.')
return _data
开发者ID:fvinas,项目名称:pyppeteer,代码行数:30,代码来源:chromium_downloader.py
示例8: configure
def configure():
if "cygwin" in system().lower():
raise exception.CygwinEnvDetected()
# https://urllib3.readthedocs.org
# /en/latest/security.html#insecureplatformwarning
try:
import urllib3
urllib3.disable_warnings()
except (AttributeError, ImportError):
pass
# handle PLATFORMIO_FORCE_COLOR
if str(os.getenv("PLATFORMIO_FORCE_COLOR", "")).lower() == "true":
try:
# pylint: disable=protected-access
click._compat.isatty = lambda stream: True
except: # pylint: disable=bare-except
pass
# Handle IOError issue with VSCode's Terminal (Windows)
click_echo_origin = [click.echo, click.secho]
def _safe_echo(origin, *args, **kwargs):
try:
click_echo_origin[origin](*args, **kwargs)
except IOError:
(sys.stderr.write if kwargs.get("err") else
sys.stdout.write)("%s\n" % (args[0] if args else ""))
click.echo = lambda *args, **kwargs: _safe_echo(0, *args, **kwargs)
click.secho = lambda *args, **kwargs: _safe_echo(1, *args, **kwargs)
开发者ID:efreeway,项目名称:platformio-core,代码行数:32,代码来源:__main__.py
示例9: _setup_session
def _setup_session(self, profile):
"""
Set up a python-requests session for our usage.
"""
session = FuturesSession(max_workers=self.MAX_WORKERS)
session.proxies = self.PROXIES
session.verify = not self.logger.settings['ssl_insecure'] and self.VERIFY
if not session.verify:
try:
urllib3.disable_warnings()
except AttributeError:
self.logger.warning('Urllib3 is too old, warnings won\'t be disable')
# defines a max_retries. It's mandatory in case a server is not
# handling keep alive correctly, like the proxy burp
a = requests.adapters.HTTPAdapter(max_retries=self.MAX_RETRIES)
session.mount('http://', a)
session.mount('https://', a)
if self.TIMEOUT:
session.timeout = self.TIMEOUT
## weboob only can provide proxy and HTTP auth options
session.trust_env = False
profile.setup_session(session)
if self.logger.settings['save_responses']:
session.hooks['response'].append(self._save)
self.session = session
session.cookies = WeboobCookieJar()
开发者ID:wazari972,项目名称:weboob,代码行数:34,代码来源:browsers.py
示例10: http_session
def http_session(self, retries=None):
"""Returns a :class:`requests.Session` object. A new session is
created if it doesn't already exist."""
http_session = getattr(self, '_http_session', None)
# try to get the source definition if available
source_definition = getattr(self, 'source_definition', {})
if not retries:
retries = source_definition.get('http_retries', 0)
if not http_session:
urllib3.disable_warnings()
session = requests.Session()
session.headers['User-Agent'] = USER_AGENT
http_retry = CustomRetry(total=retries, status_forcelist=[500, 503],
backoff_factor=.4)
http_adapter = HTTPAdapter(max_retries=http_retry)
session.mount('http://', http_adapter)
http_retry = CustomRetry(total=retries, status_forcelist=[500, 503],
backoff_factor=.4)
http_adapter = HTTPAdapter(max_retries=http_retry)
session.mount('https://', http_adapter)
self._http_session = session
return self._http_session
开发者ID:openstate,项目名称:open-raadsinformatie,代码行数:28,代码来源:http.py
示例11: _build_cluster_configs
def _build_cluster_configs(cluster_list):
cluster_configs = []
for cluster in cluster_list:
username, password, verify_ssl = _get_cluster_auth_data(cluster)
if cluster in g_cluster_configs:
cluster_name, isi_sdk, api_client, version = \
g_cluster_configs[cluster]
else:
if verify_ssl is False:
urllib3.disable_warnings()
try:
isi_sdk, api_client, version = \
isi_sdk_utils.configure(
cluster, username, password, verify_ssl)
except RuntimeError as exc:
print >> sys.stderr, "Failed to configure SDK for " \
"cluster %s. Exception raised: %s" \
% (cluster, str(exc))
sys.exit(1)
print "Configured %s as version %d cluster, using SDK %s." \
% (cluster, int(version), isi_sdk.__name__)
cluster_name = \
_query_cluster_name(cluster, isi_sdk, api_client)
g_cluster_configs[cluster] = \
cluster_name, isi_sdk, api_client, version
cluster_config = \
ClusterConfig(
cluster, cluster_name, version, isi_sdk, api_client)
cluster_configs.append(cluster_config)
return cluster_configs
开发者ID:Isilon,项目名称:isilon_data_insights_connector,代码行数:33,代码来源:isi_data_insights_config.py
示例12: request
def request(self, method, url, headers, post_data=None, **kwargs):
import urllib3
from urllib3.util.retry import Retry
urllib3.disable_warnings()
retries = Retry(total=False, connect=None, read=None, redirect=None)
pool_kwargs = {}
if self._verify_ssl_certs:
pool_kwargs['cert_reqs'] = 'CERT_REQUIRED'
pool_kwargs['ca_certs'] = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'data/ca-certificates.crt')
if self._allow_redirect:
kwargs['redirect'] = True
else:
kwargs['redirect'] = False
# 如果是长连接模式
if self._conn_pool:
global conn_pool
if not conn_pool:
conn_pool = urllib3.PoolManager(num_pools=max(100, self._conn_pool), maxsize=max(100, self._conn_pool), **pool_kwargs)
conn = conn_pool
else:
conn = urllib3.PoolManager(**pool_kwargs)
result = conn.request(method=method, url=url, body=post_data, headers=headers, timeout=self._timeout, retries=retries, **kwargs)
self.content, self.code, self.headers = result.data, result.status, result.headers
return self.content, self.code, self.headers
开发者ID:zhaoweikid,项目名称:zbase,代码行数:32,代码来源:http_client.py
示例13: request
def request(self, method, url, verify=False, random_ua=False, allow_post_redirects=False, *args, **kwargs):
self.headers.update({'Accept-Encoding': 'gzip, deflate',
'User-Agent': (sickrage.app.user_agent, UserAgent().random)[random_ua]})
if not verify:
disable_warnings()
if allow_post_redirects and method == 'POST':
sickrage.app.log.debug('Retrieving redirect URL for {url}'.format(**{'url': url}))
response = super(WebSession, self).request(method, url, allow_redirects=False)
url = self.get_redirect_target(response) or url
response = super(WebSession, self).request(method, url, verify=self._get_ssl_cert(verify), *args, **kwargs)
if self.cloudflare:
response = WebHelpers.cloudflare(self, response, **kwargs)
try:
# check web response for errors
response.raise_for_status()
except requests.exceptions.SSLError as e:
if ssl.OPENSSL_VERSION_INFO < (1, 0, 1, 5):
sickrage.app.log.info(
"SSL Error requesting url: '{}' You have {}, try upgrading OpenSSL to 1.0.1e+".format(
e.request.url, ssl.OPENSSL_VERSION))
if sickrage.app.config.ssl_verify:
sickrage.app.log.info(
"SSL Error requesting url: '{}', try disabling cert verification in advanced settings".format(
e.request.url))
except Exception:
pass
return response
开发者ID:SiCKRAGETV,项目名称:SiCKRAGE,代码行数:33,代码来源:__init__.py
示例14: profile_device
def profile_device(dhcp_fingerprint, macaddr, vendor_class_id):
data = {}
data['dhcp_fingerprint'] = ','.join(map(str, dhcp_fingerprint))
data['debug'] = 'on'
data['mac'] = macaddr
data['vendor_class_id'] = vendor_class_id
print(f"Will attempt to profile using {dhcp_fingerprint}, {macaddr}, and {vendor_class_id}")
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
try:
response = requests.post(fingerbank_url,
headers=headers,
params=params,
data=json.dumps(data))
except requests.exceptions.HTTPError as e:
log_fingerbank_error(e, response)
return -1
log_fingerbank_response(response.json())
# If score is less than 30, there is very little confidence on the returned profile. Ignore it.
if (response.json()['score'] < 30):
return -1
return response.json()['device']['name']
开发者ID:drath,项目名称:drath.github.io,代码行数:26,代码来源:discovery.py
示例15: cli
def cli(debug, config_dir, output, query):
"""CloudMunch CLI.\n
This is a CLI tool for executing commands against CloudMunch API.
"""
log = logging.getLogger('cloudmunch')
stdoutHanlder = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
stdoutHanlder.setFormatter(formatter)
log.addHandler(stdoutHanlder)
if debug:
log.setLevel(logging.DEBUG)
import httplib
httplib.HTTPConnection.debuglevel = 1
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
if sys.version_info >= (2,7):
logging.captureWarnings(True)
else:
log.setLevel(logging.NOTSET)
urllib3.disable_warnings()
if sys.version_info >= (2,7):
logging.captureWarnings(False)
log.info('Setting config dir to %s' % config_dir)
log.info('Python version is %s - %s' % (sys.version, sys.hexversion))
cloudmunch.config.config_dir = config_dir
cloudmunch.config.credentials = cloudmunch.config.Credentials(cloudmunch.config.config_dir)
cloudmunch.config.config = cloudmunch.config.Config(cloudmunch.config.config_dir)
开发者ID:cloudmunch,项目名称:cloudmunch-cli,代码行数:33,代码来源:cli.py
示例16: reader
def reader(username, password, url, cmd, timeout = 10):
""" Create a connection and return response if valid """
headers = {'content-type': 'application/json-rpc'}
payload = [{"jsonrpc": "2.0",
"method": "cli",
"params": {"cmd": cmd,
"version": 1},
"id": 1}
]
try:
urllib3.disable_warnings()
requests.packages.urllib3.disable_warnings()
response = requests.post(url, data=json.dumps(payload), headers=headers, auth=(username, password), verify=False, timeout=timeout).json()
if _isjson(response):
if _hasValidParams(response):
return response
else:
nagios_msg(3, "cisco command: " + response['error']['message'] )
else:
nagios_msg(3, "invalid or empty JSON object")
except requests.exceptions.RequestException as e:
nagios_msg(3, "Connection error: " + str(e))
开发者ID:jpajicek,项目名称:nagios,代码行数:26,代码来源:utils.py
示例17: __init__
def __init__(
self, client_service, keyfile, keytab, server, realm,
ldap_uri=None, auth_type=None):
self.client_service = client_service
self.keytab = keytab
# Init creds immediately to make sure they are valid. Creds
# can also be re-inited by _auth_header to avoid expiry.
#
self.creds = self.init_creds()
self.service_name = gssapi.Name('[email protected]%s' % (server,),
gssapi.NameType.hostbased_service)
self.server = server
self.ikk = IPAKEMKeys({'server_keys': keyfile, 'ldap_uri': ldap_uri})
self.kemcli = KEMClient(self._server_keys(server, realm),
self._client_keys())
self.keystore = self._keystore(realm, ldap_uri, auth_type)
# FIXME: Remove warnings about missing subjAltName for the
# requests module
urllib3.disable_warnings()
开发者ID:encukou,项目名称:freeipa,代码行数:25,代码来源:client.py
示例18: cached_download
def cached_download(latitude, longitude, radius):
cache_file_name = 'raw_data/mcdonalds_%s_%s_%d.json' % (latitude, longitude, radius)
print('cache_file_name = ' + cache_file_name)
url = "https://www.mcdonalds.com/googleapps/GoogleRestaurantLocAction.do?method=searchLocation&latitude=%s&longitude=%s&radius=%d&maxResults=100&country=ca&language=en-ca"% (latitude, longitude, radius)
print('downloading from ' + url)
if os.path.isfile(cache_file_name):
with open(cache_file_name, 'rb') as cache_file:
print('retrieved from cache')
return json.load(cache_file)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'accept-Encoding': 'gzip, deflate, br',
'accept-Language': 'en-US,en;q=0.5',
'connection': 'keep-alive',
'DNT': '1',
'Host': 'www.mcdonalds.com',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:61.0) Gecko/20100101 Firefox/61.0',
}
response = requests.get(url, headers=headers, verify=False)
response = response.json()
with open(cache_file_name, 'w') as cache_file:
json.dump(response, cache_file)
return response
开发者ID:hhaccessibility,项目名称:hhaccessibility.github.io,代码行数:27,代码来源:downloader.py
示例19: get_external_ip_address
def get_external_ip_address():
"""
Get the IP address of this machine as seen from the outside world. THis
function is primarily used during various internal testing of the Yombo
Gateway. This information is reported to the Yombo Service, however, the
Yombo Service already knows you're IP address during the process of
downloading configuration files.
Yombo servers will only use this information if server "getpeer.ip()" function
results in a private IP address. See: http://en.wikipedia.org/wiki/Private_network
This assists in Yombo performing various tests internally, but still providing
an ability to do further tests.
Gives Yombo servers a hint of your external ip address from your view. This
should be the same as what Yombo servers see when you connect.
This is called only once during the startup phase. Calling this function too
often can result in the gateway being blocked by whatismyip.org
.. warning::
This is 100% blocking code. A replacement will be coming soon.
:return: An ip address
:rtype: string
"""
import urllib3
urllib3.disable_warnings() # just getting client IP address from outside view. Not a big issue here.
http = urllib3.PoolManager()
r = http.request("GET", "https://yombo.net/tools/clientip.php")
return r.data.strip()
开发者ID:yombo,项目名称:yombo-gateway,代码行数:31,代码来源:__init__.py
示例20: _doPushToHCP
def _doPushToHCP(self,hcpDeviceId,messageTypeIdToDevice,payload,dummyMode=False):
try:
urllib3.disable_warnings()
except:
print("urllib3.disable_warnings() failed - get a recent enough urllib3 version to avoid potential InsecureRequestWarning warnings! Will continue though.")
# use with or without proxy
if (config.proxy_url == ''):
http = urllib3.PoolManager()
else:
http = urllib3.proxy_from_url(config.proxy_url)
push_url='https://iotmms' + config.hcp_account_id + config.hcp_landscape_host + '/com.sap.iotservices.mms/v1/api/http/push/' + str(hcpDeviceId)
self.info('push url:\n ' + push_url)
# use with authentication
headers = urllib3.util.make_headers(user_agent=None)
headers['Authorization'] = config.hcp_authorization_header
headers['Content-Type'] = 'application/json;charset=utf-8'
bodyObj = {
'method' : self.method,
'sender' : self.name,
'messageType' : messageTypeIdToDevice,
'messages' : [payload]
}
body = json.dumps(bodyObj)
self.info('message body:\n ' + body)
if dummyMode:
#self.info("Dummy mode is active, not pushing anything to HCP.")
return
try:
r = http.urlopen('POST', push_url, body=body, headers=headers)
self.info('push_to_hcp(): ' + str(r.status) + ' ' + r.data)
except Exception as e:
self.info(str(e))
开发者ID:asumansuenbuel,项目名称:hcp_iot_simulator,代码行数:35,代码来源:push_service.py
注:本文中的urllib3.disable_warnings函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论