本文整理汇总了Python中urllib.parse.parse_qs函数的典型用法代码示例。如果您正苦于以下问题：Python parse_qs函数的具体用法？Python parse_qs怎么用？Python parse_qs使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了parse_qs函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: extract_youtube_id
def extract_youtube_id(url):
    """Return the YouTube video id contained in *url*.

    Recognised address shapes (``http://`` is assumed when no scheme is given):

    * ``youtu.be/<id>``
    * ``m.youtube.com/watch?v=<id>`` and ``m.youtube.com/#/watch?v=<id>``
    * ``[www.]youtube.com/watch?v=<id>``
    * ``[www.]youtube.com/embed/<id>`` and ``[www.]youtube.com/v/<id>``

    :param url: address of a YouTube video, with or without scheme.
    :return: the video id, or ``CommentFetcher.INVALID_ID`` when the host or
        path is not recognised or the expected ``v`` parameter is absent.
    """
    if not url.startswith(("http://", "https://")):
        url = "http://" + url
    # give credit where credit is due: http://stackoverflow.com/a/7936523
    parsed = up.urlparse(url)
    host = parsed.hostname

    if host == 'youtu.be':
        # http://youtu.be/YOUTUBE-ID&key_n=val_n
        return parsed.path[1:]

    if host == 'm.youtube.com':
        if not parsed.query:
            # http://m.youtube.com/#/watch?v=YOUTUBE-ID&key_n=val_n
            # Everything after '#' is the fragment, so the key that parse_qs
            # produces is literally "/watch?v".
            values = up.parse_qs(parsed.fragment).get('/watch?v')
        else:
            # http://m.youtube.com/watch?v=YOUTUBE-ID&key_n=val_n
            values = up.parse_qs(parsed.query).get('v')
        # A missing parameter is reported as an invalid id instead of
        # escaping as KeyError.
        return values[0] if values else CommentFetcher.INVALID_ID

    if host in ('www.youtube.com', 'youtube.com'):
        if parsed.path == '/watch':
            # http://www.youtube.com/watch?v=YOUTUBE-ID&key_n=val_n
            values = up.parse_qs(parsed.query).get('v')
            return values[0] if values else CommentFetcher.INVALID_ID
        if parsed.path.startswith(('/embed/', '/v/')):
            # http://www.youtube.com/embed/YOUTUBE-ID&key_n=val_n
            # http://www.youtube.com/v/YOUTUBE-ID?key_n=val_n
            return parsed.path.split('/')[2]

    return CommentFetcher.INVALID_ID
开发者ID:ktht,项目名称:Pyt-comments,代码行数:31,代码来源:CommentFetcher.py
示例2: parse_request
def parse_request(self, req):
    """
    Method to parse request and set to class request attributes
    - self.path - url path "path/to/something" (query string removed)
    - self.method - http method type
    - self.headers - http headers
    - self.body - request body data, parsed with ``parse_qs``
    - self.query_data - request query data, parsed with ``parse_qs``;
      always assigned (an empty dict when the URL has no query string)
    :param req: request string
    :raises IndexError: if ``req`` contains no lines
    :raises ValueError: if the first line is not three whitespace-separated
        fields
    """
    lines = req.splitlines()

    headers = {}
    body_lines = []
    in_body = False
    for line in lines[1:]:
        # The first blank line ends the header section; it and every line
        # after it are treated as body.
        if line.strip() == "":
            in_body = True
        if in_body:
            body_lines.append(line)
            continue
        name, colon, value = line.partition(":")
        if not colon:
            # Not a "Name: value" line - skip it instead of failing the
            # whole request.
            continue
        headers[name.strip()] = value.strip()

    method, path, _version = lines[0].split()
    self.method = method
    self.headers = headers
    # Body lines are concatenated without separators.
    self.body = parse_qs("".join(body_lines))
    # Split on the first '?' only, so a '?' inside the query string is kept.
    self.path, _sep, query_string = path.lstrip("/").partition("?")
    self.query_data = parse_qs(query_string)
开发者ID:StorjOld,项目名称:metatool,代码行数:31,代码来源:testing_server.py
示例3: test_flow
def test_flow(self):
    """Drive one authentication round trip and check the released identity.

    Steps, as performed below: fetch the SP's auth request URL (expects a
    303), hand the redirected SAMLRequest/RelayState to the IdP, POST the
    IdP's answer (expects a 302), then parse the redirected SAMLResponse
    with the SP and compare three attributes of ``resp.ava``.
    """
    start_url = self.sp.make_auth_req()
    status, headers, _ = self.getPage(start_url)
    assert status == '303 See Other'

    to_idp = parse_qs(urlsplit(self.get_redirect_location(headers)).query)
    for key in ('SAMLRequest', 'RelayState'):
        assert key in to_idp

    action, form = self.idp.handle_auth_req(
        to_idp['SAMLRequest'][0],
        to_idp['RelayState'][0],
        BINDING_HTTP_REDIRECT,
        'test1')
    status, headers, _page = self.getPage(action, method='POST',
                                          body=urlencode(form))
    assert status == '302 Found'

    to_sp = parse_qs(urlsplit(self.get_redirect_location(headers)).query)
    for key in ('SAMLResponse', 'RelayState'):
        assert key in to_sp

    resp = self.sp.parse_authn_request_response(to_sp['SAMLResponse'][0],
                                                BINDING_HTTP_REDIRECT)
    ava = resp.ava
    assert ava["displayName"][0] == "Test1"
    # NOTE(review): this literal looks like an address mangled by a web
    # scraper's e-mail protection - confirm the real value upstream.
    assert ava["sn"][0] == "[email protected]"
    assert ava['o'][0] == "Small university"
开发者ID:ibrsp,项目名称:s2sproxy,代码行数:28,代码来源:test_proxy_server.py
示例4: response_content
def response_content(self, url, request):
    ''' Fake HTTP responses for use with HTTMock in tests.

    Maps a few known (host, path, parameters) combinations to JSON
    fixture files in the ``data`` directory next to this module and
    returns the file content as a 200 response. Any other request
    raises NotImplementedError carrying the requested URL.
    '''
    parts = urlparse(url.geturl())
    host, path, query = parts[1], parts[2], parts[4]
    fixtures = join(dirname(__file__), 'data')
    endpoint = (host, path)
    chosen = None

    if endpoint == ('www.carsonproperty.info', '/ArcGIS/rest/services/basemap/MapServer/1/query'):
        params = parse_qs(query)
        form = parse_qs(request.body) if request.body else {}
        if params.get('returnIdsOnly') == ['true']:
            chosen = join(fixtures, 'us-ca-carson-ids-only.json')
        elif form.get('outSR') == ['4326']:
            chosen = join(fixtures, 'us-ca-carson-0.json')
    elif endpoint == ('www.carsonproperty.info', '/ArcGIS/rest/services/basemap/MapServer/1'):
        if parse_qs(query).get('f') == ['json']:
            chosen = join(fixtures, 'us-ca-carson-metadata.json')

    if not chosen:
        raise NotImplementedError(url.geturl())

    content_type, _ = guess_type(chosen)
    with open(chosen, 'rb') as handle:
        return response(200, handle.read(), headers={'Content-Type': content_type})
开发者ID:kreed,项目名称:machine,代码行数:28,代码来源:util.py
示例5: setup_oauth
def setup_oauth():
    """Authorize your app via identifier.

    Interactive OAuth 1 flow: POST to ``REQUEST_TOKEN_URL`` for a temporary
    token, print the authorization URL, read the verifier from stdin, then
    POST to ``ACCESS_TOKEN_URL`` for the final credentials.

    :return: ``(token, secret)`` - the ``oauth_token`` and
        ``oauth_token_secret`` fields of the access-token response, as text.
    :raises KeyError: if a response lacks one of those two fields.
    """
    def first_text(fields, name):
        # parse_qs on a bytes body yields {bytes: [bytes, ...]}; decode the
        # first value rather than slicing the repr of the whole list.
        return fields[name][0].decode('utf-8')

    # Request token
    oauth = OAuth1(CONSUMER_KEY, client_secret=CONSUMER_SECRET)
    r = requests.post(url=REQUEST_TOKEN_URL, auth=oauth)
    credentials = parse_qs(r.content)
    # NOTE(review): the credentials (including secrets) are echoed to
    # stdout, as in the original - confirm this is wanted.
    print(credentials)
    print()
    resource_owner_key = first_text(credentials, b'oauth_token')
    print(resource_owner_key)
    resource_owner_secret = first_text(credentials, b'oauth_token_secret')
    print(resource_owner_secret)

    # Authorize
    authorize_url = AUTHORIZE_URL + str(resource_owner_key)
    print('Please go here and authorize: ' + authorize_url)
    verifier = input('Please input the verifier: ')
    oauth = OAuth1(CONSUMER_KEY,
                   client_secret=CONSUMER_SECRET,
                   resource_owner_key=resource_owner_key,
                   resource_owner_secret=resource_owner_secret,
                   verifier=verifier)
    print("")
    print("")
    print()

    # Finally, Obtain the Access Token
    r = requests.post(url=ACCESS_TOKEN_URL, auth=oauth)
    credentials = parse_qs(r.content)
    print(credentials)
    token = first_text(credentials, b'oauth_token')
    print(token)
    secret = first_text(credentials, b'oauth_token_secret')
    return token, secret
开发者ID:jmdranola,项目名称:twitter-python,代码行数:60,代码来源:slackers_twitterbot.py
示例6: decrypt_request
def decrypt_request(self, request, action="GET"):
    """
    Splits the request into its path, path components and input data:
    :param request: object providing ``get_path()`` and ``read_data()``
    :param action: HTTP verb; "GET" reads the input from the query string,
        anything else reads a JSON body and falls back to the query string
        when ``read_data()`` returns None
    :return path: full path without host:port (first and last / are removed)
    :return parts: list of query parts
    :return in_data: json object of the associated request data
    """
    o = urlparse.urlparse(request.get_path())
    path = o.path
    query = o.query
    # prepare query path: remove first and last '/' if exists.
    # startswith/endswith are safe on an empty path, where indexing with
    # path[0] / path[-1] would raise IndexError (e.g. for "/").
    if path.startswith('/'):
        path = path[1:]
    if path.endswith('/'):
        path = path[:-1]
    parts = str(path).split('/')

    if action == "GET":
        in_data = urlparse.parse_qs(query, keep_blank_values=True)
    else:
        data = request.read_data()
        if data is not None:
            # str() of a bytes object yields its repr on Python 3, which is
            # not valid JSON; decode bytes explicitly instead.
            if isinstance(data, bytes):
                data = data.decode('utf-8')
            in_data = json.loads(str(data))
        else:
            in_data = urlparse.parse_qs(query, keep_blank_values=True)
    return (path, parts, in_data)
开发者ID:drem-darios,项目名称:cohorte-platforms,代码行数:30,代码来源:admin.py
示例7: testLoginIsPassive
def testLoginIsPassive(self):
    """
    Tests the login method of the OneLogin_Saml2_Auth class
    Case Login with IsPassive. An AuthN Request is built for three
    argument combinations and only the last one must carry
    IsPassive="true"
    """
    settings_info = self.loadSettingsJSON()
    return_to = u'http://example.com/returnto'
    sso_url = settings_info['idp']['singleSignOnService']['url']

    def authn_request_xml(*login_args):
        # Build a fresh auth object, start a login and return the decoded
        # SAMLRequest carried by the redirect URL.
        auth = OneLogin_Saml2_Auth(self.get_request(), old_settings=settings_info)
        target_url = auth.login(*login_args)
        parsed_query = parse_qs(urlparse(target_url)[4])
        self.assertIn(sso_url, target_url)
        self.assertIn('SAMLRequest', parsed_query)
        return compat.to_string(
            OneLogin_Saml2_Utils.decode_base64_and_inflate(parsed_query['SAMLRequest'][0]))

    # NOTE(review): the 2nd/3rd positional arguments of login() are
    # presumably force_authn / is_passive - confirm against the library.
    self.assertNotIn('IsPassive="true"', authn_request_xml(return_to))
    self.assertNotIn('IsPassive="true"', authn_request_xml(return_to, False, False))
    self.assertIn('IsPassive="true"', authn_request_xml(return_to, False, True))
开发者ID:FancyFane,项目名称:python3-saml,代码行数:33,代码来源:auth_test.py
示例8: serialize_raw_response
def serialize_raw_response(self):
    """
    Convert the raw HTTP response into a plain value on ``self._response``.

    - content type containing "json": the decoded JSON body
    - content type containing "image/": dict with "data", "mime-type", "url"
    - otherwise the body is parsed as a query string and must contain
      "access_token"; the result holds "access_token" and, when present,
      "expires"

    :raises GraphAPIError: when none of the three cases applies
    """
    # NOTE(review): the attribute really is spelled "raw_reponse" here;
    # presumably it matches the place where it is assigned - confirm.
    response = self.raw_reponse
    # A response without a content-type header falls through to the
    # query-string check instead of raising KeyError.
    content_type = response.headers.get('content-type', '')

    if 'json' in content_type:
        result = response.json()
    elif 'image/' in content_type:
        result = {"data": getattr(response, 'content', {}),
                  "mime-type": content_type,
                  "url": response.url}
    else:
        # Parse the body once and reuse the result.
        query_str = parse_qs(response.text)
        if "access_token" not in query_str:
            raise GraphAPIError('Maintype was not text, image, or querystring')
        result = {"access_token": query_str["access_token"][0]}
        if "expires" in query_str:
            result["expires"] = query_str["expires"][0]
    self._response = result
开发者ID:yuri1992,项目名称:LikeStatsDjango,代码行数:26,代码来源:facebook_request.py
示例9: __parse_phone
def __parse_phone(self, path):
""" Parse /phone requests. """
if self.path == "/phone/reset":
self.server.window.reset_phone()
return self.REPLY_OK
if self.path.startswith("/phone/add"):
url = urlparse.urlparse(self.path)
params = urlparse.parse_qs(url.query)
if "number" not in params:
return "need-number"
if "name" not in params:
return "need-name"
self.server.db.addPhoneNumber(params["number"][0], params["name"][0])
self.server.conn.phone.reload_users()
return self.REPLY_OK
if self.path.startswith("/phone/del"):
url = urlparse.urlparse(self.path)
params = urlparse.parse_qs(url.query)
if "number" not in params:
return "need-number"
self.server.db.removePhoneNumber(params["number"][0])
self.server.conn.phone.reload_users()
return self.REPLY_OK
if self.path == "/phone/book":
return json.dumps(self.server.db.getPhoneBook(), indent=4)
return self.REPLY_ERR
开发者ID:petemadsen,项目名称:awShutters,代码行数:32,代码来源:my_http.py
示例10: test_correct_url_returned
def test_correct_url_returned():
    """url_utm_ga must produce the same query parameters as the literal URL."""
    expected_url = 'www.mysite.com?utm_source=source&utm_medium=medium&utm_campaign=campaign&utm_content=content&utm_term=term'
    built_url = url_utm_ga('www.mysite.com', 'source', 'medium', 'campaign',
                           'content', 'term')
    # Compare parsed parameters so the order of the pairs does not matter.
    expected_params = parse_qs(urlparse(expected_url).query)
    built_params = parse_qs(urlparse(built_url).query)
    assert expected_params == built_params
开发者ID:eliasdabbas,项目名称:advertools,代码行数:7,代码来源:test_url_builders.py
示例11: process_command
def process_command(self):
    """Parse the request URL and offer the request to each handler in turn.

    Sets ``self.url``, ``self.reqpath`` and ``self.query`` (names and
    values decoded as UTF-8), opens ``self.sql_conn`` and then walks the
    handler names in ``self.server.config['httpd']['handlers']``. The first
    handler whose ``process_request`` returns a true value ends processing.
    A handler exception is logged and answered with INTERNAL_SERVER_ERROR;
    if no handler claims the request, BAD_REQUEST is sent. The SQL
    connection is closed on every exit path.
    """
    self.url = urlparse.urlparse(self.path)
    self.reqpath = urlparse.unquote(self.url.path).decode('utf-8')
    raw_query = urlparse.parse_qs(self.url.query)
    self.query = {}
    for n in raw_query:
        self.query[n.decode('utf-8')] = [y.decode('utf-8') for y in raw_query[n]]
    self.process_ipv6_normalize()
    self.process_xff()
    self.dump_req()
    self.sql_conn = sql.UnladenSqlConn(self.server.sql_engine)
    try:
        for handler_name in self.server.config['httpd']['handlers']:
            # Cache modules under the handler's own name; the literal key
            # 'handler_name' meant the cache was never actually hit.
            if handler_name in self.handler_modules:
                handler_module = self.handler_modules[handler_name]
            else:
                handler_module = __import__('unladen.httpd.handlers.%s' % handler_name, fromlist=['unladen.httpd.handlers'])
                self.handler_modules[handler_name] = handler_module
            try:
                handler_instance = handler_module.UnladenRequestHandler(self)
                handler_claimed = handler_instance.process_request(self.reqpath)
            except Exception as e:
                self.logger.exception(e)
                self.send_error(httplib.INTERNAL_SERVER_ERROR, str(e))
                return
            if handler_claimed:
                return
    finally:
        # One close for every path, including a failing __import__.
        self.sql_conn.close()
    self.send_error(httplib.BAD_REQUEST)
开发者ID:rfinnie,项目名称:unladen,代码行数:35,代码来源:__init__.py
示例12: setup_oauth
def setup_oauth():
    """Authorize your app via identifier.

    Interactive OAuth 1 flow: POST to ``REQUEST_TOKEN_URL`` for a temporary
    token, print the authorization URL, read the verifier from stdin, then
    POST to ``ACCESS_TOKEN_URL`` for the final credentials.

    :return: ``(token, secret)`` as text; a field missing from a response
        yields an empty string, as before.
    """
    def first_text(fields, name):
        # parse_qs on a bytes body yields {bytes: [bytes, ...]}; decode the
        # first value rather than slicing the repr of the whole list.
        return fields.get(name, [b''])[0].decode('utf-8')

    # Request token
    oauth = OAuth1(CONSUMER_KEY, client_secret=CONSUMER_SECRET)
    r = requests.post(url=REQUEST_TOKEN_URL, auth=oauth)
    credentials = parse_qs(r.content)
    resource_owner_key = first_text(credentials, b'oauth_token')
    resource_owner_secret = first_text(credentials, b'oauth_token_secret')

    # Authorize
    authorize_url = AUTHORIZE_URL + resource_owner_key
    print('Please go here and authorize: ' + authorize_url)
    verifier = input('Please input the verifier: ')
    oauth = OAuth1(CONSUMER_KEY,
                   client_secret=CONSUMER_SECRET,
                   resource_owner_key=resource_owner_key,
                   resource_owner_secret=resource_owner_secret,
                   verifier=verifier)

    # Finally, Obtain the Access Token
    r = requests.post(url=ACCESS_TOKEN_URL, auth=oauth)
    credentials = parse_qs(r.content)
    token = first_text(credentials, b'oauth_token')
    secret = first_text(credentials, b'oauth_token_secret')
    return token, secret
开发者ID:gangso18,项目名称:pythonBOT,代码行数:34,代码来源:python+bot.py
示例13: do_POST
def do_POST(self):
    """Serve a template file for a POST request.

    The request path (``/`` maps to ``/index.pyhp``) is resolved below the
    module-level ``PATH`` directory. Arguments come from the form body when
    a Content-Length header is present, otherwise from the query string.
    The file is rendered with ``parse_file`` and written with status 200;
    an unreadable file or a path outside ``PATH`` is answered through
    ``handle_error`` with 404.
    """
    import os  # local import: only needed for the containment check below

    context = {"Page": Page_class()}
    mypath = self.path.split('?', 1)
    if mypath[0] == "/":
        mypath[0] = "/index.pyhp"
    filename = PATH + mypath[0]
    print(filename)

    # The path comes straight from the client: refuse anything that
    # resolves outside the served directory (e.g. "/../../etc/passwd").
    root = os.path.abspath(PATH)
    try:
        inside_root = os.path.commonpath([root, os.path.abspath(filename)]) == root
    except ValueError:
        # commonpath cannot compare the two paths (e.g. different drives)
        inside_root = False
    if not inside_root:
        return self.handle_error(404, "file %s not found" % filename)

    args = {}
    if 'Content-Length' in self.headers:
        length = int(self.headers['Content-Length'])
        args = urlparse.parse_qs(self.rfile.read(length).decode('utf-8'))
    elif len(mypath) > 1:
        args = urlparse.parse_qs(mypath[1])

    try:
        with open(filename, "r") as fp:
            data = fp.read()
    except (OSError, UnicodeDecodeError):
        return self.handle_error(404, "file %s not found" % filename)

    self.send_response(200)
    #self.send_header("Content-type", "text/html")
    self.end_headers()
    context['Page'].args = args
    context['Page'].client = self.client_address
    self.wfile.write(bytes(parse_file(data, context), "UTF-8"))
开发者ID:BrendanBenshoof,项目名称:pyHP,代码行数:26,代码来源:pyhp_server.py
示例14: authenticate
def authenticate(self, code=None, redirect_uri=None):
    """Exchange an authorization code for a user.

    Requests ``/oauth/access_token`` with the app credentials, reads
    ``access_token`` and ``expires`` from the query-string style answer and
    returns ``USER_FACTORY.get_user(access_token, expires_at)``. Returns
    None when the Graph call fails or the answer lacks either field.
    """
    graph = ObservableGraphAPI()
    args = {
        'client_id': settings.FACEBOOK_APP_ID,
        'client_secret': settings.FACEBOOK_APP_SECRET,
        'redirect_uri': redirect_uri,
        'code': code
    }

    try:
        data = graph.get('/oauth/access_token', **args)
    except exceptions.FacebookError as e:
        message = "Facebook login failed %s" % e.message
        code_used_message = 'This authorization code has been used.'
        # A re-used code is only logged at info level; everything else is
        # a warning.
        already_used = e.code == 100 and e.message == code_used_message
        log = logger.info if already_used else logger.warning
        log(message)
        return None
    except exceptions.FacepyError as e:
        logger.warning("Facebook login connection error")
        return None

    try:
        fields = urlparse.parse_qs(data)
        access_token = fields['access_token'][-1]
        expires = fields['expires'][-1]
    except KeyError as e:
        # Keep only the last four characters of the secret in the log.
        args['client_secret'] = '*******%s' % args['client_secret'][-4:]
        logger.error(e, extra={'facebook_response': data,
                               'sent_args': args})
        return None

    expires_at = self._timestamp_to_datetime(expires)
    return USER_FACTORY.get_user(access_token, expires_at)
开发者ID:tomwys,项目名称:django-facebook-auth,代码行数:32,代码来源:backends.py
示例15: urlSimilarity
def urlSimilarity(url1, url2):
    """Score how alike two URLs are.

    +5 for an equal scheme, +10 for an equal network location, +1 for each
    leading path component that matches (counting stops at the first
    mismatch), +1 for each query parameter name present in both URLs and
    +1 more when that parameter's value list is equal as well.
    """
    first = parse.urlparse(url1)
    second = parse.urlparse(url2)

    score = 5 if first.scheme == second.scheme else 0
    if first.netloc == second.netloc:
        score += 10

    segments_a = first.path.split('/')[1:]
    segments_b = second.path.split('/')[1:]
    matched = 0
    shortest = min(len(segments_a), len(segments_b))
    while matched < shortest and segments_a[matched] == segments_b[matched]:
        matched += 1
    score += matched

    params_a = parse.parse_qs(first.query)
    params_b = parse.parse_qs(second.query)
    shared = params_a.keys() & params_b.keys()
    score += len(shared)
    score += sum(1 for name in shared if params_a[name] == params_b[name])
    return score
开发者ID:daudinhhoang,项目名称:codefights,代码行数:28,代码来源:urlSimilarity.py
示例16: do_POST
def do_POST(self):
    """
    receives post, handles it

    Reads Content-Length bytes of body, answers 200 "OK" right away and
    only then parses the path, query string and form payload, which are
    passed to ``self._handle_incoming``.
    """
    print(_('receiving POST...'))
    body_size = int(self.headers['Content-Length'])
    raw_body = self.rfile.read(body_size).decode('UTF-8')

    # acknowledge before doing any processing
    reply = bytes('OK', 'UTF-8')
    self.send_response(200)
    self.send_header("Content-type", "text")
    self.send_header("Content-length", str(len(reply)))
    self.end_headers()
    self.wfile.write(reply)
    print(_('connection closed'))

    # parse requested path + query string
    target = urlparse(self.path)
    print(_("incoming path: {}").format(target.path))

    # parse incoming data
    payload = parse_qs(raw_body)
    print(_("payload {}").format(payload))

    self._handle_incoming(target.path, parse_qs(target.query), payload)
开发者ID:PaulTimmins,项目名称:hangupsbot,代码行数:27,代码来源:slack.py
示例17: __init__
def __init__(self, video_url):
    """Fetch the metadata and stream list for a YouTube watch URL.

    Extracts the ``v=`` id from ``video_url``, downloads the video info,
    passes it to ``self._setmetadata`` and builds ``self.streams``. When
    the first stream carries no "sig" value, the watch page and its player
    script are downloaded as well and the stream map is taken from the
    embedded player config.

    :param video_url: URL containing a ``v=<id>`` parameter
    :raises RuntimeError: "bad video url" if no id can be extracted;
        'Problem handling this video' if the player config cannot be
        found or decoded
    """
    infoUrl = 'https://www.youtube.com/get_video_info?video_id='
    try:
        vidid = re.search(r'v=([a-zA-Z0-9-_]*)', video_url).group(1)
    except (AttributeError, TypeError):
        # AttributeError: no match (search returned None);
        # TypeError: video_url is not a string.
        raise RuntimeError("bad video url")
    infoUrl += vidid + "&asv=3&el=detailpage&hl=en_US"
    opener = build_opener()
    ua = ("Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64;"
          "Trident/5.0)")
    opener.addheaders = [('User-Agent', ua)]
    self.keywords = ""
    allinfo = parse_qs(decode_if_py3(opener.open(infoUrl).read()))
    self._setmetadata(allinfo)
    streamMap = allinfo['url_encoded_fmt_stream_map'][0].split(',')
    smap = [parse_qs(sm) for sm in streamMap]
    js = None
    if not smap[0].get("sig", ""):  # vevo!
        watchurl = "https://www.youtube.com/watch?v=" + vidid
        watchinfo = opener.open(watchurl).read().decode("UTF-8")
        match = re.search(r';ytplayer.config = ({.*?});', watchinfo)
        try:
            myjson = json.loads(match.group(1))
        except (AttributeError, ValueError):
            # AttributeError: config not found in the page;
            # ValueError: the config is not valid JSON.
            raise RuntimeError('Problem handling this video')
        args = myjson['args']
        streamMap = args['url_encoded_fmt_stream_map'].split(",")
        html5player = myjson['assets']['js']
        js = opener.open(html5player).read().decode("UTF-8")
        smap = [parse_qs(sm) for sm in streamMap]
    self.streams = [Stream(sm, opener, self.title, js) for sm in smap]
开发者ID:ElegantMonkey,项目名称:pafy,代码行数:31,代码来源:pafy.py
示例18: testProcessSLORequestInvalidValid
def testProcessSLORequestInvalidValid(self):
    """
    Tests the process_slo method of the OneLogin_Saml2_Auth class
    Case Invalid Logout Request
    """
    settings_info = self.loadSettingsJSON()
    request_data = self.get_request()
    fixture = join(self.data_path, 'logout_requests', 'logout_request_deflated.xml.base64')
    request_data['get_data']['SAMLRequest'] = self.file_contents(fixture)
    auth = OneLogin_Saml2_Auth(request_data, old_settings=settings_info)

    def expect_clean_redirect():
        # process_slo must report no errors and redirect to the IdP's SLO
        # endpoint with a SAMLResponse parameter.
        redirect = auth.process_slo(True)
        params = parse_qs(urlparse(redirect)[4])
        self.assertEqual(len(auth.get_errors()), 0)
        slo_url = settings_info['idp']['singleLogoutService']['url']
        self.assertIn(slo_url, redirect)
        self.assertIn('SAMLResponse', params)

    expect_clean_redirect()
    # (the original leaves the check for an absent 'RelayState' disabled)

    auth.set_strict(True)
    auth.process_slo(True)
    # Fail due destination missmatch
    self.assertEqual(auth.get_errors(), ['invalid_logout_request'])

    auth.set_strict(False)
    expect_clean_redirect()
开发者ID:FancyFane,项目名称:python3-saml,代码行数:30,代码来源:auth_test.py
示例19: match
def match(expected_str, actual_str):
    """Assert that two URLs are equal, ignoring query parameter order."""
    # NOTE(review): ``self`` is not a parameter; presumably this helper is
    # nested inside a test method and closes over it - confirm.
    want = urlparse(expected_str)
    got = urlparse(actual_str)
    for component in ('scheme', 'netloc', 'path'):
        assert getattr(want, component) == getattr(got, component)
    self.assertEqual(parse_qs(want.query), parse_qs(got.query))
开发者ID:pubnub,项目名称:python,代码行数:7,代码来源:test_utils.py
示例20: return_informations
def return_informations(self, new_url):
    """
    Download ``new_url`` and extract the video description from its
    query-string encoded body.

    Return a tuple of:
    - Video title
    - Video qualities (list, one entry per stream)
    - Video types (list, one entry per stream)
    - Direct URLs (list, one entry per stream)

    If the download or the top-level parsing fails, ``self.exit_app()`` is
    called and the error is re-raised in case that call returns.
    """
    try:
        # Close the connection as soon as the body has been read.
        with urlopen(new_url) as response:
            data = response.read()
        # The body is bytes: decode it. str(data) would yield the repr
        # ("b'...'") and corrupt the first key and any escaped character.
        info = parse_qs(data.decode('utf-8'))
        title = info['title'][0]
        stream_map = info['url_encoded_fmt_stream_map'][0]
    except Exception:
        self.exit_app()
        # Do not continue with unbound names if exit_app() returned.
        raise

    quality, v_type, direct_url = [], [], []
    for video in stream_map.split(","):
        item = parse_qs(video)
        quality.append(item['quality'][0])
        v_type.append(item['type'][0])
        direct_url.append(item['url'][0])
    return title, quality, v_type, direct_url
开发者ID:Chiheb-Nexus,项目名称:PyVideo-dl,代码行数:27,代码来源:PyVideoDl.py
注：本文中的urllib.parse.parse_qs函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论