本文整理汇总了Python中urllib3.util.parse_url函数的典型用法代码示例。如果您正苦于以下问题：Python parse_url函数的具体用法是什么？Python parse_url怎么用？有哪些Python parse_url的使用例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了parse_url函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_explicit
def test_explicit(self):
    """Check that the eager URL returned by ``uploader.explicit`` has the
    same path as the URL built locally with ``utils.cloudinary_url``."""
    response = uploader.explicit(
        "cloudinary",
        type="twitter_name",
        eager=[TEST_TRANS_SCALE2_PNG],
        tags=[UNIQUE_TAG],
    )
    # Re-use the eager transformation as URL options, pinned to the
    # version reported by the upload response.
    url_options = dict(
        TEST_TRANS_SCALE2_PNG,
        type="twitter_name",
        version=response["version"],
    )
    built = utils.cloudinary_url("cloudinary", **url_options)
    expected_url = built[0]
    eager_url = response["eager"][0]["url"]
    actual_path = parse_url(eager_url).path
    expected_path = parse_url(expected_url).path
    self.assertEqual(actual_path, expected_path)
开发者ID:cloudinary,项目名称:pycloudinary,代码行数:7,代码来源:test_uploader.py
示例2: test_explicit
def test_explicit(self):
    """Check that the eager URL returned by ``uploader.explicit`` has the
    same path as the URL built locally with ``utils.cloudinary_url``."""
    eager_transformation = dict(crop="scale", width="2.0")
    response = uploader.explicit(
        "cloudinary",
        type="twitter_name",
        eager=[eager_transformation],
        tags=[UNIQUE_TAG],
    )
    built = utils.cloudinary_url(
        "cloudinary",
        type="twitter_name",
        crop="scale",
        width="2.0",
        format="png",
        version=response["version"],
    )
    expected_url = built[0]
    eager_url = response["eager"][0]["url"]
    actual_path = parse_url(eager_url).path
    expected_path = parse_url(expected_url).path
    self.assertEqual(actual_path, expected_path)
开发者ID:UKTV,项目名称:pycloudinary,代码行数:8,代码来源:uploader_test.py
示例3: urlparsing
def urlparsing(value):
    """Split *value* into a ``(secure, host, port)`` triple.

    :param value: the location string handed to ``parse_url``.
    :return: ``(is_secure(scheme), host, port)`` for a parseable value, or
        ``(None, None, None)`` when ``parse_url`` raises
        ``LocationParseError``.
    """
    try:
        loc = parse_url(value)
    except LocationParseError:
        # Unparseable locations are reported as "nothing known", not raised.
        return None, None, None
    return is_secure(loc.scheme), loc.host, loc.port
开发者ID:Grindizer,项目名称:mist,代码行数:7,代码来源:utils.py
示例4: get_connection
def get_connection(self, url, proxies=None):
    """Return the urllib3 connection to use for ``url``.

    Not meant to be called from user code; it is exposed for subclasses of
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

    :param url: The URL to connect to.
    :param proxies: (optional) A Requests-style dictionary of proxies used on this request.
    :rtype: urllib3.ConnectionPool
    """
    proxy = select_proxy(url, proxies)

    if not proxy:
        # Direct connection. Only scheme should be lower case, so the URL is
        # round-tripped through urlparse before being handed to the pool.
        normalized_url = urlparse(url).geturl()
        return self.poolmanager.connection_from_url(normalized_url)

    proxy = prepend_scheme_if_needed(proxy, "http")
    if not parse_url(proxy).host:
        raise InvalidProxyURL(
            "Please check proxy URL. It is malformed"
            " and could be missing the host."
        )
    return self.proxy_manager_for(proxy).connection_from_url(url)
开发者ID:d9pouces,项目名称:PolyArchiv,代码行数:28,代码来源:adapters.py
示例5: _parse_connection_properties
def _parse_connection_properties(self, host, port, username, password, use_ssl):
    """Turn one or more host specifications into a list of property dicts.

    ``host`` may be a comma-separated string or an iterable of strings.
    Each entry is parsed with ``parse_url``; values found in the entry
    (port, https scheme, auth) take precedence over the ``port`` and
    ``username``/``password`` arguments.

    :return: list of dicts with ``host`` and ``port`` keys, plus ``use_ssl``
        and ``http_auth`` when applicable.
    """
    # Force to a list, split on ',' if multiple
    entries = host.split(',') if isinstance(host, str) else host

    connections = []
    for entry in entries:
        parsed = parse_url(entry)
        properties = {
            'host': parsed.host,
            'port': port if parsed.port is None else parsed.port,
        }

        if use_ssl is True or parsed.scheme == 'https':
            properties['use_ssl'] = True

        if parsed.auth is not None:
            properties['http_auth'] = parsed.auth
        elif username is not None:
            # The prompted password is kept for the following hosts, so the
            # user is asked at most once.
            if password is None or password == 'PROMPT':
                password = getpass.getpass()
            properties['http_auth'] = (username, password)

        connections.append(properties)
    return connections
开发者ID:evanfreed,项目名称:elasticstat,代码行数:30,代码来源:elasticstat.py
示例6: uris
def uris(rabbit_config):
    """Return the configured AMQP URI plus three deliberately broken variants.

    The variants differ from ``rabbit_config["AMQP_URI"]`` in exactly one
    component each: the port (incremented by one), the credentials
    (``invalid:invalid``) or the path (``/unknown``).
    """
    good_uri = rabbit_config["AMQP_URI"]
    parsed = parse_url(good_uri)

    def rebuild(auth=parsed.auth, port=parsed.port, path=parsed.path):
        # Reassemble the URI, overriding only the component under test.
        return Url(parsed.scheme, auth, parsed.host, port, path).url

    return {
        "good": good_uri,
        "bad_port": rebuild(port=parsed.port + 1),
        "bad_user": rebuild(auth="invalid:invalid"),
        "bad_vhost": rebuild(path="/unknown"),
    }
开发者ID:koenvo,项目名称:nameko,代码行数:7,代码来源:test_amqp.py
示例7: read_articles
def read_articles():
    """Read all articles from the mongodb collection 'articles' as a dataframe.

    The body texts are fetched through ``MyMongo`` and cleaned with
    ``clean_articles``; title, host and date columns are then looked up per
    URL.

    - INPUT: None
    - OUTPUT: df. columns: url, body_text, title, uri, dt
    """
    my_mongo = MyMongo()
    try:
        t0 = time.time()
        cur_articles = my_mongo.get_article_body_text(testing=0)
        articles_cleaned = {}
        clean_articles(cur_articles, articles_cleaned)
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('%d unique articles with body_text' % len(articles_cleaned))
        t1 = time.time()  # time it
        print("finished in %4.4fmin for %s " % ((t1 - t0) / 60, 'read/clean articles'))

        df = pd.DataFrame([{'url': k, 'body_text': v[1]}
                           for k, v in articles_cleaned.items()])

        # Re-use the open connection instead of creating a second MyMongo()
        # that is never closed.
        # NOTE(review): assumes get_article_attri() is safe to call on the
        # same instance as get_article_body_text() - confirm.
        article_dict, article_dt = my_mongo.get_article_attri()
        df['title'] = df['url'].map(lambda x: article_dict.get(x, 'Unknown'))
        df['uri'] = df['url'].map(lambda x: parse_url(x).host)
        df['dt'] = df['url'].map(lambda x: article_dt.get(x, ''))
    finally:
        # Release the connection even when reading or cleaning fails.
        my_mongo.close()
    return df
开发者ID:joyce-duan,项目名称:All-Things-Data-Science,代码行数:30,代码来源:topic_modeling.py
示例8: test_parse_url
def test_parse_url(self):
    """Check ``parse_url`` against a table of URLs and expected ``Url`` tuples."""
    url_host_map = {
        'http://google.com/mail': Url('http', host='google.com', path='/mail'),
        'http://google.com/mail/': Url('http', host='google.com', path='/mail/'),
        'google.com/mail': Url(host='google.com', path='/mail'),
        'http://google.com/': Url('http', host='google.com', path='/'),
        'http://google.com': Url('http', host='google.com'),
        'http://google.com?foo': Url('http', host='google.com', path='', query='foo'),

        # Path/query/fragment
        '': Url(),
        '/': Url(path='/'),
        '?': Url(path='', query=''),
        '#': Url(path='', fragment=''),
        '#?/!google.com/?foo#bar': Url(path='', fragment='?/!google.com/?foo#bar'),
        '/foo': Url(path='/foo'),
        '/foo?bar=baz': Url(path='/foo', query='bar=baz'),
        '/foo?bar=baz#banana?apple/orange': Url(path='/foo', query='bar=baz', fragment='banana?apple/orange'),

        # Port ('http://google.com/' is already covered above; repeating the
        # key in a dict literal would only overwrite the first entry)
        'http://google.com:80/': Url('http', host='google.com', port=80, path='/'),
        'http://google.com:/': Url('http', host='google.com', path='/'),
        'http://google.com:80': Url('http', host='google.com', port=80),
        'http://google.com:': Url('http', host='google.com'),

        # Auth (only the last '@' separates the credentials from the host)
        'http://foo:bar@localhost/': Url('http', auth='foo:bar', host='localhost', path='/'),
        'http://foo@localhost/': Url('http', auth='foo', host='localhost', path='/'),
        'http://foo:bar@baz@localhost/': Url('http', auth='foo:bar@baz', host='localhost', path='/'),
        'http://@': Url('http', host=None, auth=''),
    }
    for url, expected_url in url_host_map.items():
        returned_url = parse_url(url)
        self.assertEqual(returned_url, expected_url)
开发者ID:iyed,项目名称:urllib3,代码行数:35,代码来源:test_util.py
示例9: check_vul
def check_vul(url):
    """
    Probe a host for exposed JBoss consoles/servlets with HEAD requests.

    :param url: The URL to test
    :return: A dict with the exploit type as the keys, and the HTTP status code
        as the value; 505 is used as the marker for "request failed" or
        "not an HTTP server"
    """
    # Scan modes use shorter timeouts and a single retry.
    # NOTE: certificate verification is disabled (cert_reqs='CERT_NONE').
    if gl_args.mode == 'auto-scan' or gl_args.mode == 'file-scan':
        timeout = Timeout(connect=1.0, read=3.0)
        pool = PoolManager(timeout=timeout, retries=1, cert_reqs='CERT_NONE')
    else:
        timeout = Timeout(connect=3.0, read=6.0)
        pool = PoolManager(timeout=timeout, cert_reqs='CERT_NONE')

    # Any port containing "443" (443, 8443, ...) is treated as HTTPS.
    url_check = parse_url(url)
    if '443' in str(url_check.port) and url_check.scheme != 'https':
        url = "https://"+str(url_check.host)+":"+str(url_check.port)

    print(GREEN + "\n ** Checking Host: %s **\n" % url)

    headers = {"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
               "Connection": "keep-alive",
               "User-Agent": user_agents[randint(0, len(user_agents) - 1)]}

    # Maps exploit type -> path to probe; each value is replaced by the
    # HTTP status once the probe has run.
    paths = {"jmx-console": "/jmx-console/HtmlAdaptor?action=inspectMBean&name=jboss.system:type=ServerInfo",
             "web-console": "/web-console/ServerInfo.jsp",
             "JMXInvokerServlet": "/invoker/JMXInvokerServlet",
             "admin-console": "/admin-console/"}

    # Iterate over a snapshot of the keys: `paths` is updated (and may be
    # rebound) inside the loop.
    for i in list(paths):
        if gl_interrupted:
            break
        try:
            # NOTE(review): the trailing comma looks like a Python 2
            # "no newline" idiom; kept as in the original.
            print(GREEN + " * Checking %s: \t" % i + ENDC),
            r = pool.request('HEAD', url + str(paths[i]), redirect=False, headers=headers)

            paths[i] = r.status

            # check if it's false positive
            if len(r.getheaders()) == 0:
                print(RED + "[ ERROR ]\n * The server %s is not an HTTP server.\n" % url + ENDC)
                paths = {"jmx-console": 505,
                         "web-console": 505,
                         "JMXInvokerServlet": 505,
                         "admin-console": 505}
                break

            if paths[i] in (301, 302, 303, 307, 308):
                url_redirect = r.get_redirect_location()
                print(GREEN + "[ REDIRECT ]\n * The server sent a redirect to: %s\n" % url_redirect)
            elif paths[i] == 200 or paths[i] == 500:
                if i == "admin-console":
                    print(RED + "[ EXPOSED ]" + ENDC)
                else:
                    print(RED + "[ VULNERABLE ]" + ENDC)
            else:
                print(GREEN + "[ OK ]")
        except Exception:
            # Was a bare `except:`, which also swallowed KeyboardInterrupt
            # and SystemExit; ordinary errors are still reported as 505.
            print(RED + "\n * An error occurred while connecting to the host %s\n" % url + ENDC)
            paths[i] = 505

    return paths
开发者ID:lijiu1002,项目名称:jexboss,代码行数:60,代码来源:jexboss.py
示例10: test_parse_url
def test_parse_url(self):
    """Check ``parse_url`` against a table of URLs and expected ``Url`` tuples."""
    url_host_map = {
        "http://google.com/mail": Url("http", host="google.com", path="/mail"),
        "http://google.com/mail/": Url("http", host="google.com", path="/mail/"),
        "google.com/mail": Url(host="google.com", path="/mail"),
        "http://google.com/": Url("http", host="google.com", path="/"),
        "http://google.com": Url("http", host="google.com"),
        "http://google.com?foo": Url("http", host="google.com", path="", query="foo"),

        # Path/query/fragment
        "": Url(),
        "/": Url(path="/"),
        "?": Url(path="", query=""),
        "#": Url(path="", fragment=""),
        "#?/!google.com/?foo#bar": Url(path="", fragment="?/!google.com/?foo#bar"),
        "/foo": Url(path="/foo"),
        "/foo?bar=baz": Url(path="/foo", query="bar=baz"),
        "/foo?bar=baz#banana?apple/orange": Url(path="/foo", query="bar=baz", fragment="banana?apple/orange"),

        # Port ("http://google.com/" is already covered above; repeating the
        # key in a dict literal would only overwrite the first entry)
        "http://google.com:80/": Url("http", host="google.com", port=80, path="/"),
        "http://google.com:/": Url("http", host="google.com", path="/"),
        "http://google.com:80": Url("http", host="google.com", port=80),
        "http://google.com:": Url("http", host="google.com"),

        # Auth (only the last '@' separates the credentials from the host)
        "http://foo:bar@localhost/": Url("http", auth="foo:bar", host="localhost", path="/"),
        "http://foo@localhost/": Url("http", auth="foo", host="localhost", path="/"),
        "http://foo:bar@baz@localhost/": Url("http", auth="foo:bar@baz", host="localhost", path="/"),
    }
    for url, expected_url in url_host_map.items():
        returned_url = parse_url(url)
        # assertEquals is a deprecated alias that was removed in Python 3.12.
        self.assertEqual(returned_url, expected_url)
开发者ID:ravello,项目名称:urllib3,代码行数:31,代码来源:test_util.py
示例11: prepare_url
def prepare_url(self, url, params):
    """Normalise ``url``, merge ``params`` into its query and store it on ``self.url``.

    Raises ``MissingSchema`` when the URL has no scheme and ``InvalidURL``
    when it has no host or the host cannot be IDNA-encoded.
    """
    #: Accept objects that have string representations.
    try:
        url = unicode(url)
    except NameError:
        # We're on Python 3.
        url = str(url)
    except UnicodeDecodeError:
        pass

    # Support for unicode domain names and paths.
    scheme, auth, host, port, path, query, fragment = parse_url(url)

    if not scheme:
        raise MissingSchema("Invalid URL %r: No schema supplied" % url)
    if not host:
        raise InvalidURL("Invalid URL %r: No host supplied" % url)

    # Only want to apply IDNA to the hostname
    try:
        host = host.encode('idna').decode('utf-8')
    except UnicodeError:
        raise InvalidURL('URL has an invalid label.')

    # Carefully reconstruct the network location
    netloc = (auth + '@' if auth else '') + host
    if port:
        netloc = netloc + ':' + str(port)

    # Bare domains aren't valid URLs.
    path = path or '/'

    if is_py2:
        # Encode every component that is an instance of ``str`` as UTF-8.
        scheme, netloc, path, query, fragment = [
            part.encode('utf-8') if isinstance(part, str) else part
            for part in (scheme, netloc, path, query, fragment)
        ]

    enc_params = self._encode_params(params)
    if enc_params:
        query = '%s&%s' % (query, enc_params) if query else enc_params

    self.url = requote_uri(urlunparse([scheme, netloc, path, None, query, fragment]))
开发者ID:palladius,项目名称:facebook-sdk,代码行数:59,代码来源:models.py
示例12: getPPDURL
def getPPDURL(self, source_url):
    """
    Cache the PPD behind ``source_url`` below ``self.ppd_dir`` and return
    the URL under ``self.base_url`` that points at the cached copy.

    When the source cannot be resolved or downloaded, the error is logged
    and ``source_url`` is returned unchanged.

    :param source_url: remote PPD URL
    :return: local URL to the cached PPD
    """
    source = parse_url(source_url)
    host = source.host

    if host is None or host == "localhost":
        # no host: we assume that the PPD can be found on the current active master backend
        with make_session() as session:
            # get any other registered backend
            backend_query = session.query(RegisteredBackend).filter(
                RegisteredBackend.uuid != self.env.core_uuid,
                RegisteredBackend.type == BackendTypes.active_master,
            )
            master_backend = backend_query.first()
            if master_backend is None:
                self.log.error(C.make_error("NO_MASTER_BACKEND_FOUND"))
                return source_url

            # The cache directory is named after the master backend's host.
            host = parse_url(master_backend.url).host

    # Drop a single leading slash so the path can be joined below ppd_dir.
    if source.path.startswith("/"):
        rel_path = source.path[1:]
    else:
        rel_path = source.path
    local_path = path.join(self.ppd_dir, host, rel_path)

    if not path.exists(local_path):
        # Not cached yet: download and store it locally.
        try:
            r = requests.get(source_url)
            if not r.ok:
                self.log.error("requesting PPD from %s failed with status code: %s" % (source_url, r.status_code))
                return source_url

            local_dir = path.dirname(local_path)
            if not path.exists(local_dir):
                makedirs(local_dir)
            with open(local_path, "w") as f:
                f.write(r.text)
        except requests.exceptions.ConnectionError as e:
            self.log.error("requesting PPD from %s failed with error: %s" % (source_url, str(e)))
            return source_url

    return "%s%s/%s" % (self.base_url, host, rel_path)
开发者ID:gonicus,项目名称:gosa,代码行数:45,代码来源:ppd_proxy.py
示例13: test_image_field
def test_image_field(self):
    """Check the image of the "with image" poll: public id, URL path and truthiness."""
    poll = Poll.objects.get(question="with image")
    self.assertIsNotNone(poll)

    image = poll.image
    self.assertEqual(image.public_id, API_TEST_ID)

    actual_path = parse_url(poll.image.url).path
    expected_path = "/{cloud}/image/upload/v1234/{name}.jpg".format(
        cloud=cloudinary.config().cloud_name,
        name=API_TEST_ID,
    )
    self.assertEqual(actual_path, expected_path)

    self.assertTrue(poll.image)
开发者ID:cloudinary,项目名称:pycloudinary,代码行数:9,代码来源:test_cloudinaryField.py
示例14: test_bad_vhost
def test_bad_vhost(rabbit_config):
    """``verify_amqp_uri`` raises IOError mentioning the vhost for an unknown path."""
    parsed = parse_url(rabbit_config['AMQP_URI'])
    # Same broker, but with the path swapped for '/unknown'.
    bad_uri = Url(parsed.scheme, parsed.auth, parsed.host, parsed.port, '/unknown').url

    with pytest.raises(IOError) as exc_info:
        verify_amqp_uri(bad_uri)

    message = str(exc_info.value)
    for fragment in ('Error connecting to broker', 'invalid or unauthorized vhost'):
        assert fragment in message
开发者ID:davidszotten,项目名称:nameko,代码行数:9,代码来源:test_utils.py
示例15: test_bad_user
def test_bad_user(rabbit_config):
    """``verify_amqp_uri`` raises IOError mentioning credentials for a bad user."""
    parsed = parse_url(rabbit_config['AMQP_URI'])
    # Same broker and path, but with the credentials swapped.
    bad_uri = Url(parsed.scheme, 'invalid:invalid', parsed.host, parsed.port, parsed.path).url

    with pytest.raises(IOError) as exc_info:
        verify_amqp_uri(bad_uri)

    message = str(exc_info.value)
    for fragment in ('Error connecting to broker', 'invalid credentials'):
        assert fragment in message
开发者ID:davidszotten,项目名称:nameko,代码行数:9,代码来源:test_utils.py
示例16: test_netloc
def test_netloc(self):
    """Check ``parse_url(url).netloc`` for URLs with and without scheme and port."""
    cases = (
        ('http://google.com/mail', 'google.com'),
        ('http://google.com:80/mail', 'google.com:80'),
        ('google.com/foobar', 'google.com'),
        ('google.com:12345', 'google.com:12345'),
    )
    for url, expected_netloc in cases:
        actual_netloc = parse_url(url).netloc
        self.assertEqual(actual_netloc, expected_netloc)
开发者ID:iyed,项目名称:urllib3,代码行数:10,代码来源:test_util.py
示例17: test_netloc
def test_netloc(self):
    """Check ``parse_url(url).netloc`` for URLs with and without scheme and port."""
    url_netloc_map = {
        "http://google.com/mail": "google.com",
        "http://google.com:80/mail": "google.com:80",
        "google.com/foobar": "google.com",
        "google.com:12345": "google.com:12345",
    }
    for url, expected_netloc in url_netloc_map.items():
        # assertEquals is a deprecated alias that was removed in Python 3.12.
        self.assertEqual(parse_url(url).netloc, expected_netloc)
开发者ID:ravello,项目名称:urllib3,代码行数:10,代码来源:test_util.py
示例18: pixel
def pixel(path_domain=""):
    """Update the ``site`` cookie and answer with an info page or the pixel image.

    The cookie holds the space-separated domains seen so far; the Referer's
    host is added when it differs from the host of the request itself. Clients
    whose Accept header contains ``text/html`` but not ``image`` get the
    'info' template, everyone else gets the PNG payload ``buf``.

    :param path_domain: domain compared with the Referer host when deciding
        whether to set an ``Expires`` header on the image response.
    """
    seen = {}
    sites = request.cookies.site
    sites = sites.replace('"', '')
    for c in sites.split(' '):
        if '.' in c:
            seen[c] = True

    # Headers may be absent (get_header then returns None); fall back to ''
    # so parsing and the substring checks below cannot fail on None.
    ref_domain = parse_url(request.get_header('Referer') or '').host
    req_domain = parse_url(request.url).host
    if ref_domain and ref_domain != req_domain:
        seen[ref_domain] = True

    # Never record our own domain.
    seen.pop('ad.aloodo.com', None)

    cdata = ' '.join(seen.keys())
    if cdata:
        response.set_header('Set-Cookie',
                            'site="%s"; Max-Age=31536000; Path=/' % cdata)
    response.status = 200
    response.set_header('Tk', 'D')

    accept = request.get_header('Accept') or ''
    if "image" not in accept and "text/html" in accept:
        response.set_header('Content-Type', 'text/html')
        return template('info',
                        req_headers=format_headers(request.headers),
                        res_headers=format_headers(response.headers),
                        req_url=request.url)

    response.set_header('Content-Type', 'image/png')
    if len(seen) >= 3 or path_domain == ref_domain:
        # Set an Expires header seven days from now.
        expdt = datetime.now() + timedelta(days=7)
        exp = mktime(expdt.timetuple())
        response.set_header('Expires', formatdate(
            timeval=exp, localtime=False, usegmt=True))
    return buf
开发者ID:Aloodo,项目名称:ad.aloodo.com,代码行数:42,代码来源:pixel.py
示例19: get_url_data
def get_url_data(serviceurl, params=None):
    """
    Fetch JSON data from ``serviceurl``, retrying failed requests.

    Credentials embedded in the URL (``user:password@host``) are stripped from
    the requested URL and passed to ``requests`` as basic-auth instead.

    :param serviceurl: url to retrieve data
    :param params: http://docs.python-requests.org/en/master/user/quickstart/#passing-parameters-in-urls
    :return: json url_data
    :raises ValueError: when the response carries no data
    :raises requests.exceptions.RequestException: when the request still fails
        after ``max_retry`` attempts
    """
    max_retry = 3

    purl = parse_url(serviceurl)
    if purl.auth:
        # partition() keeps any ':' inside the password and tolerates auth
        # without a password (the old split(':')[1] raised IndexError).
        username, _, password = purl.auth.partition(':')
    else:
        username = None
        password = None

    # Add url like http://host
    burl = '{}://{}'.format(purl.scheme, purl.host)
    if purl.port:
        # Add port like: http://host:8080
        burl += ':{}'.format(purl.port)
    if purl.request_uri:
        # Add path and query like: http://host:8080/path/uri?query
        burl += '{}'.format(purl.request_uri)

    cnt = 0
    while cnt < max_retry:
        try:
            # NOTE: verify=False disables TLS certificate verification
            # (https is supported without checking the certificate).
            req = requests.get(burl, verify=False, params=params, timeout=timeout, auth=(username, password))
            data = req.json()
            if data:
                return data

            if req.from_cache:
                # Clear cache to retry again
                requests_cache.clear()
                req = requests.get(burl, verify=False, params=params, timeout=timeout, auth=(username, password))
                data = req.json()
                if data:
                    return data

            # Still nothing: fail instead of looping without ever advancing
            # the retry counter.
            raise ValueError('No data from response')
        except requests.exceptions.RequestException:
            cnt += 1
            if cnt >= max_retry:
                # Bare raise keeps the original traceback.
                raise
            # Exponential backoff (1s, 2s) before the next attempt.
            time.sleep(2 ** (cnt - 1))
开发者ID:pablodav,项目名称:burp_server_reports,代码行数:54,代码来源:urlget.py
示例20: test_request_uri
def test_request_uri(self):
    """Check ``parse_url(url).request_uri`` against a table of expected values."""
    url_host_map = {
        "http://google.com/mail": "/mail",
        "http://google.com/mail/": "/mail/",
        "http://google.com/": "/",
        "http://google.com": "/",
        "": "/",
        "/": "/",
        "?": "/?",
        "#": "/",
        "/foo?bar=baz": "/foo?bar=baz",
    }
    for url, expected_request_uri in url_host_map.items():
        returned_url = parse_url(url)
        # assertEquals is a deprecated alias that was removed in Python 3.12.
        self.assertEqual(returned_url.request_uri, expected_request_uri)
开发者ID:ravello,项目名称:urllib3,代码行数:15,代码来源:test_util.py
注：本文中的urllib3.util.parse_url函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论