本文整理汇总了Python中urllib.parse.parse_qsl函数的典型用法代码示例。如果您正苦于以下问题:Python parse_qsl函数的具体用法?Python parse_qsl怎么用?Python parse_qsl使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了parse_qsl函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: add_preserved_filters
def add_preserved_filters(context, url, popup=False):
    """Return ``url`` with the admin's preserved filters merged into its query.

    Parameters are layered in increasing priority: the preserved filters
    taken from ``context``, the popup marker (only when ``popup`` is true),
    and finally whatever query parameters ``url`` already carries.
    """
    model_opts = context.get('opts')
    filters = context.get('preserved_filters')

    url_parts = list(urlparse(url))
    own_params = dict(parse_qsl(url_parts[4]))
    combined = dict()

    if model_opts and filters:
        filters = dict(parse_qsl(filters))
        try:
            match = resolve(url)
        except Resolver404:
            # Unresolvable URL: keep the preserved filters as parsed.
            pass
        else:
            here = '%s:%s' % (match.namespace, match.url_name)
            changelist = 'admin:%s_%s_changelist' % (
                model_opts.app_label, model_opts.model_name)
            # When linking back to the changelist itself, unwrap the nested
            # filter string so the filters become real query parameters.
            if changelist == here and '_changelist_filters' in filters:
                filters = dict(parse_qsl(filters['_changelist_filters']))
        combined.update(filters)

    if popup:
        from django.contrib.admin.options import IS_POPUP_VAR
        combined[IS_POPUP_VAR] = 1

    # Parameters already on the URL win over everything else.
    combined.update(own_params)

    url_parts[4] = urlencode(combined)
    return urlunparse(url_parts)
开发者ID:1ngr1d,项目名称:django,代码行数:31,代码来源:admin_urls.py
示例2: twitter
def twitter():
    """Handle both legs of the Twitter OAuth1 sign-in flow.

    Without ``oauth_token``/``oauth_verifier`` in the request arguments a
    request token is fetched and the client is redirected to Twitter's
    authenticate page. With both present, the access token is fetched, the
    matching ``User`` is looked up (or created) and a JWT is returned as JSON.
    """
    request_token_url = 'https://api.twitter.com/oauth/request_token'
    access_token_url = 'https://api.twitter.com/oauth/access_token'
    authenticate_url = 'https://api.twitter.com/oauth/authenticate'

    callback_token = request.args.get('oauth_token')
    callback_verifier = request.args.get('oauth_verifier')

    if not (callback_token and callback_verifier):
        # First leg: obtain a request token and send the user to Twitter.
        signer = OAuth1(app.config['TWITTER_CONSUMER_KEY'],
                        client_secret=app.config['TWITTER_CONSUMER_SECRET'],
                        callback_uri=app.config['TWITTER_CALLBACK_URL'])
        response = requests.post(request_token_url, auth=signer)
        granted = dict(parse_qsl(response.text))
        qs = urlencode(dict(oauth_token=granted['oauth_token']))
        return redirect(authenticate_url + '?' + qs)

    # Second leg: exchange the verifier for an access token / profile.
    signer = OAuth1(app.config['TWITTER_CONSUMER_KEY'],
                    client_secret=app.config['TWITTER_CONSUMER_SECRET'],
                    resource_owner_key=request.args.get('oauth_token'),
                    verifier=request.args.get('oauth_verifier'))
    response = requests.post(access_token_url, auth=signer)
    profile = dict(parse_qsl(response.text))

    account = User.query.filter_by(twitter=profile['user_id']).first()
    if not account:
        account = User(twitter=profile['user_id'],
                       first_name=profile['screen_name'])
        db.session.add(account)
        db.session.commit()
    return jsonify(token=create_jwt_token(account))
开发者ID:Amit-Kolambikar,项目名称:satellizer,代码行数:31,代码来源:app.py
示例3: _build_url
def _build_url(self):
    """Build the resource url.

    Parses ``self._url``, adds ``self._params`` to the query string if
    needed, and stores the rebuilt url back on ``self._url``.

    :return self._url: resource url
    :raises ValueError: if the parsed url has no network location
    """
    scheme, netloc, path, params, query, fragment = urlparse(self._url)

    # IDN domains support
    netloc = to_unicode(netloc)
    # removed idna encode as it was causing python3 urlunparse to error
    # print(repr(netloc), repr(netloc.encode('idna')))
    # NOTE(review): despite the comment above, netloc is still idna-encoded
    # further down when url_unparse_list is assembled -- confirm intent.

    if not netloc:
        raise ValueError("Invalid url")
    elif not scheme:
        scheme = "http"

    # Flatten self._params into (name, value) pairs. Tuple values are
    # expanded into one pair per element; values that are neither tuple nor
    # str are skipped.
    tmp = []
    if self._params is not None:
        for param, value in self._params:
            if isinstance(value, tuple):
                for i in value:
                    tmp.append((param, i))
            elif isinstance(value, str):
                tmp.append((param, value))

    if tmp:
        # Extra params present: put the url's existing pairs in front of them.
        tmp = parse_qsl(query, keep_blank_values=True) + tmp
    else:
        try:
            tmp = parse_qsl(query, keep_blank_values=True, strict_parsing=True)
        except ValueError:
            # Query is not parseable as key=value pairs; keep the raw string.
            tmp = query

    # Choose encoders matching tmp's shape: raw string vs. list of pairs.
    if isinstance(tmp, str):
        encode = quote_plus
        noencode = lambda result: result
    else:
        encode = urlencode
        noencode = urlnoencode

    if self._encode_query:
        query = encode(tmp)
    else:
        query = noencode(tmp)

    del tmp
    # print(repr([scheme, netloc, path, query, fragment]))
    # Every component is converted to bytes before being reassembled.
    url_unparse_list = [
        scheme.encode('utf8'),
        netloc.encode('idna'),
        path.encode('utf8'),
        params.encode('utf8'),
        query.encode('utf8'),
        fragment.encode('utf8')]
    self._url = urlunparse(url_unparse_list)
    return self._url
开发者ID:budlight,项目名称:human_curl,代码行数:60,代码来源:core.py
示例4: append_qs
def append_qs(url, query_string):
    """Return ``url`` as a string with ``query_string`` values appended.

    ``query_string`` may be any of:
      * an encoded string: 'test3=val1&test3=val2'
      * a dict of strings: {'test3': 'val'}
      * a dict of lists of strings: {'test3': ['val1', 'val2']}
      * a list of tuples: [('test3', 'val1'), ('test3', 'val2')]

    Raises TypeError for any other type.
    """
    split_url = urlparse.urlsplit(url)
    pairs = urlparse.parse_qsl(split_url.query, True)

    if isstr(query_string):
        pairs.extend(urlparse.parse_qsl(query_string))
    elif isdict(query_string):
        for key, value in query_string.items():
            if islist(value):
                # One pair per element so repeated keys survive.
                pairs.extend((key, member) for member in value)
            else:
                pairs.append((key, value))
    elif islist(query_string):
        pairs.extend(query_string)
    else:
        raise TypeError('Unexpected query_string type')

    rebuilt = (
        split_url.scheme,
        split_url.netloc,
        split_url.path,
        urlencode_unicode(pairs),
        split_url.fragment,
    )
    return urlparse.urlunsplit(rebuilt)
开发者ID:JoProvost,项目名称:python-ubersmith,代码行数:34,代码来源:utils.py
示例5: build_url
def build_url(base, query_params=None, fragment=None):
    """Construct a URL based off of base containing all parameters in
    the query portion of base plus any additional parameters.

    Taken from https://github.com/NateFerrero/oauth2lib/blob/master/oauth2lib/utils.py
    and extended to allow parameters as fragment.

    Parameters already present in ``base`` take precedence over the ones
    passed in; entries whose value is ``None`` are dropped. The dictionaries
    passed by the caller are never modified.

    :param base: Base URL
    :type base: str
    :param query_params: Additional query parameters to include.
    :type query_params: dict
    :param fragment: Additional parameters to include in the fragment section of the url
    :type fragment: dict
    :rtype: str
    """
    url = urlparse.urlparse(base)

    # Work on copies: the previous ``{}`` defaults were shared between calls
    # and were mutated by ``update()``, leaking parameters from one call into
    # the next (and altering the caller's own dicts).
    merged_query = dict(query_params or {})
    merged_query.update(urlparse.parse_qsl(url.query, True))
    merged_query = {k: v for k, v in merged_query.items() if v is not None}

    merged_fragment = dict(fragment or {})
    merged_fragment.update(urlparse.parse_qsl(url.fragment, True))
    merged_fragment = {k: v for k, v in merged_fragment.items() if v is not None}

    return urlparse.urlunparse((url.scheme,
                                url.netloc,
                                url.path,
                                url.params,
                                urllib.urlencode(merged_query),
                                urllib.urlencode(merged_fragment)))
开发者ID:shell-labs,项目名称:flask-rest-boilerplate,代码行数:26,代码来源:util.py
示例6: add_preserved_filters
def add_preserved_filters(context, url, popup=False, to_field=None):
    """Return ``url`` with the admin's preserved filters merged into its query.

    Parameters are layered in increasing priority: preserved filters from
    ``context``, the popup marker (when ``popup`` is true), the to-field
    marker (when ``to_field`` is truthy), and finally the query parameters
    already present on ``url``.
    """
    model_opts = context.get('opts')
    filters = context.get('preserved_filters')

    url_parts = list(urlparse(url))
    own_params = dict(parse_qsl(url_parts[4]))
    combined = {}

    if model_opts and filters:
        filters = dict(parse_qsl(filters))

        # Resolve only the part of the URL that follows the script prefix.
        tail = url.partition(get_script_prefix())[2]
        match_url = '/%s' % tail
        try:
            match = resolve(match_url)
        except Resolver404:
            pass
        else:
            here = '%s:%s' % (match.app_name, match.url_name)
            changelist = 'admin:%s_%s_changelist' % (
                model_opts.app_label, model_opts.model_name)
            # Linking back to the changelist: unwrap the nested filter string.
            if changelist == here and '_changelist_filters' in filters:
                filters = dict(parse_qsl(filters['_changelist_filters']))

        combined.update(filters)

    if popup:
        from django.contrib.admin.options import IS_POPUP_VAR
        combined[IS_POPUP_VAR] = 1
    if to_field:
        from django.contrib.admin.options import TO_FIELD_VAR
        combined[TO_FIELD_VAR] = to_field

    # Parameters already on the URL override everything merged so far.
    combined.update(own_params)

    url_parts[4] = urlencode(combined)
    return urlunparse(url_parts)
开发者ID:100448facens,项目名称:django-example-jn,代码行数:35,代码来源:admin_urls.py
示例7: respond_all
def respond_all(environ, start_response):
    """WSGI application that echoes request details back as pretty JSON.

    The response contains the request method, the path, the content length
    (when a body is present) and every key/value pair parsed from the
    form-encoded body and from the query string. Query-string values
    override body values on key collisions.

    :param environ: WSGI environment dictionary
    :param start_response: WSGI ``start_response`` callable
    :return: single-element list holding the UTF-8 encoded JSON document
    """
    status = '200 OK'  # HTTP Status
    headers = [('Content-type', 'text/plain; charset=utf-8')]  # HTTP Headers
    start_response(status, headers)

    my_dict = {'method_name': environ['REQUEST_METHOD'], 'path_info': environ['PATH_INFO']}

    # CONTENT_LENGTH and QUERY_STRING may be empty *or absent* per the WSGI
    # spec, so look them up defensively instead of indexing environ.
    content_length = environ.get('CONTENT_LENGTH') or ''
    if content_length:
        my_dict['content_length'] = content_length
        request_body = environ['wsgi.input'].read(int(content_length)).decode('utf-8')
        request_body = request_body.strip('[] ')
        my_dict.update(parse.parse_qsl(request_body))

    query_string = environ.get('QUERY_STRING') or ''
    if query_string:
        my_dict.update(parse.parse_qsl(query_string))

    json_response = json.dumps(my_dict, sort_keys=True, indent=4, separators=(',', ': '))
    return [json_response.encode('utf-8')]
开发者ID:jay-batavia,项目名称:1.CP341-Web_Services,代码行数:26,代码来源:http_parser.py
示例8: cleanup
def cleanup(url):
    """Follow ``url`` and strip tracker parameters from its query and fragment.

    A component is rewritten only when it is non-empty and parses into
    key/value pairs. Any failure while cleaning is logged and the followed
    URL is returned unchanged.
    """
    def _without_trackers(pairs):
        # Drop every pair whose key starts with one of the annoying prefixes.
        for prefix in ANNOYING_PARAMS:
            pairs = [(key, value) for key, value in pairs if not key.startswith(prefix)]
        return pairs

    url = _follow(url)
    try:
        parts = urlparse(url)

        query_pairs = parse_qsl(parts.query)
        if parts.query and query_pairs:
            parts = parts._replace(query=urlencode(_without_trackers(query_pairs)))

        fragment_pairs = parse_qsl(parts.fragment)
        if parts.fragment and fragment_pairs:
            parts = parts._replace(fragment=urlencode(_without_trackers(fragment_pairs)))

        url = parts.geturl()
    except Exception:
        app.logger.exception("Problem cleaning url %s", url)
    app.logger.info("Final url %s", url)
    return url
开发者ID:jcsaaddupuy,项目名称:dereferer,代码行数:32,代码来源:app.py
示例9: twitter
def twitter():
    """Handle both legs of the Twitter OAuth1 login.

    Without ``oauth_token``/``oauth_verifier`` in the JSON payload a request
    token is fetched and returned as JSON. With both present the access token
    is fetched, the user is optionally checked against the authorised list,
    and an application token is returned (or a 403 error).
    """
    request_token_url = 'https://api.twitter.com/oauth/request_token'
    access_token_url = 'https://api.twitter.com/oauth/access_token'

    payload = request.json
    if not (payload.get('oauth_token') and payload.get('oauth_verifier')):
        # First leg: ask Twitter for a request token.
        callback = app.config.get('TWITTER_CALLBACK_URL', request.headers.get('Referer', ''))
        signer = OAuth1(app.config['OAUTH2_CLIENT_ID'],
                        client_secret=app.config['OAUTH2_CLIENT_SECRET'],
                        callback_uri=callback)
        response = requests.post(request_token_url, auth=signer)
        return jsonify(dict(parse_qsl(response.text)))

    # Second leg: trade the verifier for the user's profile.
    signer = OAuth1(app.config['OAUTH2_CLIENT_ID'],
                    client_secret=app.config['OAUTH2_CLIENT_SECRET'],
                    resource_owner_key=payload.get('oauth_token'),
                    verifier=payload.get('oauth_verifier'))
    response = requests.post(access_token_url, auth=signer)
    profile = dict(parse_qsl(response.text))

    login = profile['screen_name']
    if app.config['AUTH_REQUIRED'] and not db.is_user_valid(login=login):
        return jsonify(status="error", message="User %s is not authorized" % login), 403

    token = create_token(profile['user_id'], '@' + login, login, provider='twitter')
    return jsonify(token=token)
开发者ID:iselu,项目名称:alerta,代码行数:26,代码来源:auth.py
示例10: get_redirect_to
def get_redirect_to(self):
    """Return the redirect URI with this message's parameters attached.

    The parameters go into the fragment when the request's response mode is
    ``'fragment'`` or, if no response mode is given, when ``'token'`` is one
    of the requested response types. Otherwise they are appended to the
    redirect URI's existing query string.
    """
    assert self.is_redirect()

    request = self.request
    if hasattr(request, 'response_mode'):
        mode = request.response_mode
    else:
        mode = request.get('response_mode')

    if mode:
        use_fragment = mode == 'fragment'
    else:
        use_fragment = 'token' in set(request.response_type.split())

    target = urlparse(self.redirect_uri)
    if use_fragment:
        # Existing query is kept untouched; any existing fragment is replaced.
        query, fragment = target.query, self.to_query_string()
    else:
        pairs = parse_qsl(target.query)
        pairs.extend(parse_qsl(self.to_query_string()))
        query, fragment = urlencode(pairs), target.fragment

    return urlunparse(target[:4] + (query, fragment))
开发者ID:GehirnInc,项目名称:py3oauth2,代码行数:25,代码来源:message.py
示例11: url_concat
def url_concat(url, args):
    """Append ``args`` to ``url``'s query string, keeping existing parameters.

    ``args`` may be either a dictionary or a list/tuple of key-value pairs
    (the latter allows for multiple values with the same key). Any other
    type raises ``TypeError``.

    >>> url_concat("http://example.com/foo", dict(c="d"))
    'http://example.com/foo?c=d'
    >>> url_concat("http://example.com/foo?a=b", dict(c="d"))
    'http://example.com/foo?a=b&c=d'
    >>> url_concat("http://example.com/foo?a=b", [("c", "d"), ("c", "d2")])
    'http://example.com/foo?a=b&c=d&c=d2'
    """
    parts = urlparse(url)

    if isinstance(args, dict):
        extra = args.items()
    elif isinstance(args, (list, tuple)):
        extra = args
    else:
        raise TypeError(
            "'args' parameter should be dict, list or tuple. Not {0}".format(type(args)))

    pairs = parse_qsl(parts.query, keep_blank_values=True)
    pairs.extend(extra)

    # Only the query component changes; everything else is carried over.
    return urlunparse(parts[:4] + (urlencode(pairs), parts[5]))
开发者ID:mr-ping,项目名称:tornado,代码行数:34,代码来源:httputil.py
示例12: legacy_arguments
def legacy_arguments(self):
    """Collect request arguments into a ``LegacyMultiDict``.

    Query-string pairs are added when the object has a ``uri`` attribute;
    pairs from an ASCII form-encoded body are added when a non-empty body is
    present and the content type equals ``_FORM_CTYPE``.
    """
    collected = LegacyMultiDict()

    if hasattr(self, 'uri'):
        collected.update(parse_qsl(self.parsed_uri.query))

    if getattr(self, 'body', None):
        if getattr(self, 'content_type', None) == _FORM_CTYPE:
            collected.update(parse_qsl(self.body.decode('ascii')))

    return collected
开发者ID:dragon788,项目名称:zorro,代码行数:8,代码来源:web.py
示例13: form_arguments
def form_arguments(self):
    """Collect request arguments into a plain dict.

    Query-string pairs are added when the object has a ``uri`` attribute;
    pairs from an ASCII form-encoded body are added when a non-empty body is
    present and the content type equals ``FORM_CTYPE``. Later values replace
    earlier ones for the same key.
    """
    collected = {}

    if hasattr(self, 'uri'):
        collected.update(parse_qsl(self.parsed_uri.query))

    if getattr(self, 'body', None):
        if getattr(self, 'content_type', None) == FORM_CTYPE:
            collected.update(parse_qsl(self.body.decode('ascii')))

    return collected
开发者ID:dragon788,项目名称:zorro,代码行数:8,代码来源:zerogw.py
示例14: assertURLEqual
def assertURLEqual(self, first, second, msg=None):
    """Assert that ``first`` and ``second`` are equivalent URLs.

    Scheme, network location and path must match exactly; query arguments
    are compared regardless of their order.
    """
    lhs, rhs = urlparse(first), urlparse(second)
    self.assertEqual(lhs[:3], rhs[:3], msg)

    # Sorting the parsed pairs makes the comparison order-insensitive.
    lhs_pairs, rhs_pairs = (sorted(parse_qsl(parts.query)) for parts in (lhs, rhs))
    self.assertEqual(lhs_pairs, rhs_pairs, msg)
开发者ID:raphacosta27,项目名称:Insper,代码行数:11,代码来源:untitled5.py
示例15: _encode_url
def _encode_url(self, data):
query = self.query
if data:
data = native_str(data)
if isinstance(data, str):
data = parse_qsl(data)
else:
data = mapping_iterator(data)
query = parse_qsl(query)
query.extend(data)
query = urlencode(query)
self.query = query
开发者ID:successtest9,项目名称:pulsar,代码行数:12,代码来源:__init__.py
示例16: test_full_flow
def test_full_flow(self, satosa_config_dict, oidc_frontend_config, saml_backend_config, idp_conf):
    """Drive one authentication round trip through the proxy.

    An OpenID Connect ``id_token`` request is sent to the proxy, the request
    the proxy forwards is answered by a ``FakeIdP``, and the ``id_token``
    found in the fragment of the final redirect is verified against the
    frontend signing key and checked for the test user's attribute values.
    """
    user_id = "testuser1"

    # proxy config
    satosa_config_dict["FRONTEND_MODULES"] = [oidc_frontend_config]
    satosa_config_dict["BACKEND_MODULES"] = [saml_backend_config]
    # Map every attribute of the test user to itself for both protocols.
    satosa_config_dict["INTERNAL_ATTRIBUTES"]["attributes"] = {attr_name: {"openid": [attr_name],
                                                                           "saml": [attr_name]}
                                                               for attr_name in USERS[user_id]}
    _, backend_metadata = create_entity_descriptors(SATOSAConfig(satosa_config_dict))

    # application
    test_client = Client(make_app(SATOSAConfig(satosa_config_dict)), BaseResponse)

    # get frontend OP config info
    provider_config = json.loads(test_client.get("/.well-known/openid-configuration").data.decode("utf-8"))

    # create auth req
    # Each of the user's attributes is requested as an id_token claim.
    claims_request = ClaimsRequest(id_token=Claims(**{k: None for k in USERS[user_id]}))
    req_args = {"scope": "openid", "response_type": "id_token", "client_id": CLIENT_ID,
                "redirect_uri": REDIRECT_URI, "nonce": "nonce",
                "claims": claims_request.to_json()}
    auth_req = urlparse(provider_config["authorization_endpoint"]).path + "?" + urlencode(req_args)

    # make auth req to proxy
    proxied_auth_req = test_client.get(auth_req)
    assert proxied_auth_req.status == "303 See Other"

    # config test IdP
    backend_metadata_str = str(backend_metadata[saml_backend_config["name"]][0])
    idp_conf["metadata"]["inline"].append(backend_metadata_str)
    fakeidp = FakeIdP(USERS, config=IdPConfig().load(idp_conf, metadata_construction=False))

    # create auth resp
    # The response body is parsed as a URL whose query carries the SAML request.
    req_params = dict(parse_qsl(urlparse(proxied_auth_req.data.decode("utf-8")).query))
    url, authn_resp = fakeidp.handle_auth_req(
        req_params["SAMLRequest"],
        req_params["RelayState"],
        BINDING_HTTP_REDIRECT,
        user_id,
        response_binding=BINDING_HTTP_REDIRECT)

    # make auth resp to proxy
    authn_resp_req = urlparse(url).path + "?" + urlencode(authn_resp)
    authn_resp = test_client.get("/" + authn_resp_req)
    assert authn_resp.status == "303 See Other"

    # verify auth resp from proxy
    # The result is read from the URL fragment, not the query string.
    resp_dict = dict(parse_qsl(urlparse(authn_resp.data.decode("utf-8")).fragment))
    signing_key = RSAKey(key=rsa_load(oidc_frontend_config["config"]["signing_key_path"]),
                         use="sig", alg="RS256")
    id_token_claims = JWS().verify_compact(resp_dict["id_token"], keys=[signing_key])
    # Only the first value of each user attribute is expected as a claim.
    assert all((k, v[0]) in id_token_claims.items() for k, v in USERS[user_id].items())
开发者ID:its-dirg,项目名称:SATOSA,代码行数:53,代码来源:test_oidc-saml.py
示例17: on_get
def on_get(self, req, res):
    """Create Twitter JWT token.

    Without ``oauth_token``/``oauth_verifier`` request parameters, a request
    token is fetched and an HTML redirect page pointing at Twitter's
    authenticate URL is returned. With both present, the access token is
    fetched, the matching user is looked up by Twitter id or username
    (created when missing) and a JWT is returned in a JSON body.

    :param req: incoming request, read through ``get_param``
    :param res: response object; ``body``, ``status`` and (for the redirect)
        ``location`` are set on it
    """
    request_token_url = 'https://api.twitter.com/oauth/request_token'
    access_token_url = 'https://api.twitter.com/oauth/access_token'
    authenticate_url = 'https://api.twitter.com/oauth/authenticate'

    if req.get_param('oauth_token') and req.get_param('oauth_verifier'):
        auth = OAuth1(settings.TWITTER_KEY,
                      client_secret=settings.TWITTER_SECRET,
                      resource_owner_key=req.get_param('oauth_token'),
                      verifier=req.get_param('oauth_verifier'))
        logger.debug("Twitter OAuth: Got auth session.")
        r = requests.post(access_token_url, auth=auth)
        profile = dict(parse_qsl(r.text))
        logger.debug("Twitter OAuth: User profile retrieved")
        try:
            # Each comparison must be parenthesised: ``|`` binds tighter than
            # ``==``, so the unparenthesised form was parsed as a chained
            # comparison and did not express "twitter id OR username".
            user = User.select().where(
                (User.twitter == profile['user_id']) |
                (User.username == profile['screen_name'])
            ).get()
        except User.DoesNotExist:
            # Only "no such user" creates an account; other failures (for
            # example database errors) now propagate instead of being
            # swallowed by a bare ``except``.
            user = User.create(twitter=profile['user_id'],
                               username=profile['screen_name'])
            user.save()
        token = utils.create_jwt_token(user)
        res.body = json.dumps({"token": token})
        res.status = falcon.HTTP_200
    else:
        oauth = OAuth1(settings.TWITTER_KEY,
                       client_secret=settings.TWITTER_SECRET,
                       callback_uri=settings.TWITTER_CALLBACK_URI)
        logger.debug("Twitter OAuth: Got auth session.")
        r = requests.post(request_token_url, auth=oauth)
        oauth_token = dict(parse_qsl(r.text))
        logger.debug("Twitter OAuth: User profile retrieved")
        qs = urlencode(dict(oauth_token=oauth_token['oauth_token']))
        # Falcon doesn't support redirects, so we have to fake it
        # this implementation has been taken from werkzeug
        final_url = authenticate_url + '?' + qs
        res.body = (
            '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">\n'
            '<title>Redirecting...</title>\n'
            '<h1>Redirecting...</h1>\n'
            '<p>You should be redirected automatically to target URL: '
            '<a href="{0}">{0}</a>. If not click the link.'.format(final_url)
        )
        res.location = final_url
        res.status = falcon.HTTP_301
开发者ID:sikrvault,项目名称:sikr,代码行数:51,代码来源:twitter.py
示例18: on_user_web_access
def on_user_web_access(self, user_id, get_array, post_array):
    """Render the Twitter pairing page for ``user_id``.

    Three cases, selected by ``get_array``:
      * ``connect=twitter``: fetch a request token, store it and show the
        link to Twitter's authorisation page;
      * ``twitter=<TWITTER_KEY>``: exchange the stored request token plus the
        received verifier for an access token, store it and delete the
        request token;
      * anything else: show the "connect" link.

    :param user_id: identifier of the user being paired
    :param get_array: mapping of GET parameters
    :param post_array: mapping of POST parameters (not used here)
    :raises Exception: when Twitter answers with a non-200 status
    """
    TWITTER_KEY = "1"
    db = a.get_db()
    if 'connect' in get_array and get_array['connect'] == 'twitter':
        consumer = oauth.Consumer(consumer_key, consumer_secret)
        client = oauth.Client(consumer)
        resp, content = client.request(request_token_url, "GET")
        if resp['status'] != '200':
            raise Exception("Invalid response %s." % resp['status'])
        request_token = dict(parse.parse_qsl(content.decode()))
        # Token values come from a remote response: bind them as parameters
        # instead of concatenating them into the SQL text.
        db.sql(
            "INSERT INTO request_tokens(user_id, token, token_secret) VALUES(%s, %s, %s)",
            (str(user_id), request_token['oauth_token'], request_token['oauth_token_secret']),
        )
        db.commit()
        a.p('<p>Connexion a Twitter requise.</p>')
        a.p('<a href="'+authorize_url+'?oauth_token='+(request_token['oauth_token'])+'">Continuer sur Twitter</a>')
    elif 'twitter' in get_array and get_array["twitter"] == TWITTER_KEY:
        consumer = oauth.Consumer(consumer_key, consumer_secret)
        # Not used below; the lookup is kept so a missing parameter still
        # fails here, as it did before.
        oauth_token = get_array['oauth_token']
        oauth_verifier = get_array['oauth_verifier']
        request_token = db.sql('SELECT * FROM request_tokens WHERE user_id=%s', (str(user_id),))[0]
        token = oauth.Token(request_token[1],
                            request_token[2])
        token.set_verifier(oauth_verifier)
        client = oauth.Client(consumer, token)
        resp, content = client.request(access_token_url, "POST")
        if resp['status'] != '200':
            raise Exception("Invalid response %s." % resp['status'])
        access_token = dict(parse.parse_qsl(content.decode()))
        db.sql(
            "INSERT INTO usr(usr_id, usr_token, usr_token_secret) VALUES(%s, %s, %s)",
            (str(user_id), access_token['oauth_token'], access_token['oauth_token_secret']),
        )
        db.sql('DELETE FROM request_tokens WHERE user_id = %s', (str(user_id),))
        db.commit()
        a.p('<p>Twitter pairing successful</p>')
    else:
        a.p('<a href="'+a.get_url()+'&connect=twitter">Connect with Twitter</a>')
开发者ID:Axce,项目名称:bobcat,代码行数:50,代码来源:main.py
示例19: run_test
def run_test(self, satosa_config_dict, sp_conf, oidc_backend_config, frontend_config):
    """Drive one authentication round trip through the proxy.

    A ``FakeSP`` sends an authentication request to the proxy, a hand-built
    OpenID Connect ``id_token`` response is sent back to the backend's
    redirect URI, and the ``SAMLResponse`` in the final redirect is parsed by
    the fake SP and compared with the test user's attributes.
    """
    subject_id = "testuser1"

    # proxy config
    satosa_config_dict["FRONTEND_MODULES"] = [frontend_config]
    satosa_config_dict["BACKEND_MODULES"] = [oidc_backend_config]
    # Map every attribute of the test user to itself for both protocols.
    satosa_config_dict["INTERNAL_ATTRIBUTES"]["attributes"] = {attr_name: {"openid": [attr_name],
                                                                           "saml": [attr_name]}
                                                               for attr_name in USERS[subject_id]}
    frontend_metadata, backend_metadata = create_entity_descriptors(SATOSAConfig(satosa_config_dict))

    # application
    test_client = Client(make_app(SATOSAConfig(satosa_config_dict)), BaseResponse)

    # config test SP
    frontend_metadata_str = str(frontend_metadata[frontend_config["name"]][0])
    sp_conf["metadata"]["inline"].append(frontend_metadata_str)
    fakesp = FakeSP(SPConfig().load(sp_conf, metadata_construction=False))

    # create auth req
    destination, req_args = fakesp.make_auth_req(frontend_metadata[frontend_config["name"]][0].entity_id)
    auth_req = urlparse(destination).path + "?" + urlencode(req_args)

    # make auth req to proxy
    proxied_auth_req = test_client.get(auth_req)
    assert proxied_auth_req.status == "302 Found"
    # The response body is parsed as a URL; its query supplies the nonce and
    # state that are echoed back in the response built below.
    parsed_auth_req = dict(parse_qsl(urlparse(proxied_auth_req.data.decode("utf-8")).query))

    # create auth resp
    # Claims carry the first value of each user attribute plus the token fields.
    id_token_claims = {k: v[0] for k, v in USERS[subject_id].items()}
    id_token_claims["sub"] = subject_id
    id_token_claims["iat"] = time.time()
    id_token_claims["exp"] = time.time() + 3600
    id_token_claims["iss"] = "https://op.example.com"
    id_token_claims["aud"] = oidc_backend_config["config"]["client"]["client_metadata"]["client_id"]
    id_token_claims["nonce"] = parsed_auth_req["nonce"]
    id_token = IdToken(**id_token_claims).to_jwt()
    authn_resp = {"state": parsed_auth_req["state"], "id_token": id_token}

    # make auth resp to proxy
    redirect_uri_path = urlparse(
        oidc_backend_config["config"]["client"]["client_metadata"]["redirect_uris"][0]).path
    authn_resp_req = redirect_uri_path + "?" + urlencode(authn_resp)
    authn_resp = test_client.get(authn_resp_req)
    assert authn_resp.status == "303 See Other"

    # verify auth resp from proxy
    resp_dict = dict(parse_qsl(urlparse(authn_resp.data.decode("utf-8")).query))
    auth_resp = fakesp.parse_authn_request_response(resp_dict["SAMLResponse"], BINDING_HTTP_REDIRECT)
    assert auth_resp.ava == USERS[subject_id]
开发者ID:SUNET,项目名称:SATOSA,代码行数:49,代码来源:test_saml-oidc.py
示例20: run_test
def run_test(self, satosa_config_dict, sp_conf, idp_conf, saml_backend_config, frontend_config):
    """Drive one authentication round trip through the proxy.

    A ``FakeSP`` sends an authentication request to the proxy, the request
    the proxy forwards is answered by a ``FakeIdP``, and the ``SAMLResponse``
    in the final redirect is parsed by the fake SP and compared with the test
    user's attributes.
    """
    user_id = "testuser1"

    # proxy config
    satosa_config_dict["FRONTEND_MODULES"] = [frontend_config]
    satosa_config_dict["BACKEND_MODULES"] = [saml_backend_config]
    # Map every attribute of the test user to itself.
    satosa_config_dict["INTERNAL_ATTRIBUTES"]["attributes"] = {attr_name: {"saml": [attr_name]} for attr_name in
                                                               USERS[user_id]}
    frontend_metadata, backend_metadata = create_entity_descriptors(SATOSAConfig(satosa_config_dict))

    # application
    test_client = Client(make_app(SATOSAConfig(satosa_config_dict)), BaseResponse)

    # config test SP
    frontend_metadata_str = str(frontend_metadata[frontend_config["name"]][0])
    sp_conf["metadata"]["inline"].append(frontend_metadata_str)
    fakesp = FakeSP(SPConfig().load(sp_conf, metadata_construction=False))

    # create auth req
    destination, req_args = fakesp.make_auth_req(frontend_metadata[frontend_config["name"]][0].entity_id)
    auth_req = urlparse(destination).path + "?" + urlencode(req_args)

    # make auth req to proxy
    proxied_auth_req = test_client.get(auth_req)
    assert proxied_auth_req.status == "303 See Other"

    # config test IdP
    backend_metadata_str = str(backend_metadata[saml_backend_config["name"]][0])
    idp_conf["metadata"]["inline"].append(backend_metadata_str)
    fakeidp = FakeIdP(USERS, config=IdPConfig().load(idp_conf, metadata_construction=False))

    # create auth resp
    # The response body is parsed as a URL whose query carries the SAML request.
    req_params = dict(parse_qsl(urlparse(proxied_auth_req.data.decode("utf-8")).query))
    url, authn_resp = fakeidp.handle_auth_req(
        req_params["SAMLRequest"],
        req_params["RelayState"],
        BINDING_HTTP_REDIRECT,
        user_id,
        response_binding=BINDING_HTTP_REDIRECT)

    # make auth resp to proxy
    authn_resp_req = urlparse(url).path + "?" + urlencode(authn_resp)
    authn_resp = test_client.get("/" + authn_resp_req)
    assert authn_resp.status == "303 See Other"

    # verify auth resp from proxy
    resp_dict = dict(parse_qsl(urlparse(authn_resp.data.decode("utf-8")).query))
    auth_resp = fakesp.parse_authn_request_response(resp_dict["SAMLResponse"], BINDING_HTTP_REDIRECT)
    assert auth_resp.ava == USERS[user_id]
开发者ID:its-dirg,项目名称:SATOSA,代码行数:48,代码来源:test_saml-saml.py
注:本文中的urllib.parse.parse_qsl函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论