本文整理汇总了Python中urllib.parse.urlencode函数的典型用法代码示例。如果您正苦于以下问题:Python urlencode函数的具体用法?Python urlencode怎么用?Python urlencode使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了urlencode函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: MsgAddCard
def MsgAddCard(self, chat_id):
    """Prompt a chat for a card number and store the reply if it is valid.

    Sends the prompt through ``self.sendMessage``, then polls
    ``self.getUpdates`` until the first returned update has an ``update_id``
    greater than ``self.update_id``.  The text of that update is checked with
    ``self.CheckCardNum``; on success it is appended to
    ``self.cards[chat_id]`` and a confirmation is sent, otherwise a rejection
    message is sent.

    :param chat_id: key used in ``self.cards`` and, converted to ``str``,
        the ``chat_id`` field of every outgoing message.
    """
    def send(text):
        # All outgoing messages reuse the shared parameter dict.
        self.p_sendMessage['text'] = text
        self.p_sendMessage['chat_id'] = str(chat_id)
        payload = urlencode(self.p_sendMessage).encode()
        # Close the response right away; only the side effect matters.
        urlopen(Request(self.sendMessage, payload)).close()

    send("Enter card number [xxxxxxxxxxx]:")

    while True:
        payload = urlencode(self.p_getUpdates).encode()
        with urlopen(Request(self.getUpdates, payload)) as resp:
            updates = json.loads(resp.read().decode()).get('result')
        # An empty result list just means nothing has arrived yet; indexing
        # it would raise IndexError, so poll again instead.
        if isinstance(updates, list) and not updates:
            continue
        data = updates[0]
        if data.get('update_id') > self.update_id:
            self.update_id = data.get('update_id')
            break

    card = data.get('message').get('text')
    if self.CheckCardNum(card):
        self.cards.setdefault(chat_id, []).append(card)
        send("Card " + card + " successfully added!")
    else:
        send("This doesn't look like card number!")
开发者ID:trashgremlin,项目名称:StrelkaCardBot,代码行数:32,代码来源:StrelkaCardBot.py
示例2: __build_params
def __build_params(self):
    """Assemble the POST parameters for one batch request.

    Returns a dict whose ``'batch'`` entry is the JSON-encoded list of
    per-call payloads and whose ``'fileN'`` entries are the file objects
    found in the POST calls' params, numbered in order of appearance.
    """
    # See https://developers.facebook.com/docs/reference/api/batch/
    # for documentation on how the batch api is supposed to work.
    uploads = []
    entries = []
    for call in self.__api_calls:
        entry = {'method': call.method}
        if not call.ignore_result:
            entry['omit_response_on_success'] = False
        if call.name:
            entry['name'] = call.name
        if call.method in ['GET', 'DELETE']:
            entry['relative_url'] = call.path+'?'+urlencode(call.params)
        elif call.method == 'POST':
            entry['relative_url'] = call.path
            attached = []
            form_fields = {}
            for key, value in call.params.iteritems():
                if not isinstance(value, FileType):
                    form_fields[key] = value
                    continue
                # Files travel outside the JSON payload; the payload only
                # references them by their global 'fileN' name.
                uploads.append(value)
                attached.append('file%s' % (len(uploads) - 1))
            entry['body'] = urlencode(form_fields)
            entry['attached_files'] = ','.join(attached)
        entries.append(entry)
    result = {'batch':json.dumps(entries)}
    for index, handle in enumerate(uploads):
        result['file%s' % index] = handle
    return result
开发者ID:thusfresh,项目名称:fbconsole,代码行数:33,代码来源:fbconsole.py
示例3: request
def request(self, uri, *,
            method='GET',
            query=None,
            headers=None,
            body=None):
    """Serialize an HTTP/1.1 request, send it and wrap the reply.

    :param uri: request target; must start with ``/``.
    :param method: HTTP method name, upper-cased before sending.
    :param query: optional mapping/sequence urlencoded and appended to
        ``uri`` (with ``&`` if ``uri`` already has a query, else ``?``).
    :param headers: optional mapping of extra headers; it is copied, so the
        caller's object is never modified.  ``Content-Length`` is always set.
    :param body: ``None``, ``bytes``, ``str`` (encoded as UTF-8) or ``dict``
        (urlencoded, then encoded as UTF-8).
    :returns: ``self.response_class(*reply)`` where ``reply`` is what
        ``conn.request(...).get()`` yields.
    """
    conn = self.connection()
    assert method.isidentifier(), method
    assert uri.startswith('/'), uri
    if query:
        separator = '&' if '?' in uri else '?'
        uri += separator + urlencode(query)
    # ``None`` default instead of a shared ``{}``; copy whatever was passed.
    headers = headers.copy() if headers is not None else {}
    lines = ['{} {} HTTP/1.1'.format(method.upper(), uri)]
    if isinstance(body, dict):
        body = urlencode(body)
    if isinstance(body, str):
        body = body.encode('utf-8')  # there are no other encodings, right?
    if body is None:
        body = b''
    headers['Content-Length'] = len(body)
    for k, v in headers.items():
        lines.append('{}: {}'.format(k, str(v)))
    # Two empty items produce the blank line that terminates the header block.
    lines.append('')
    lines.append('')
    buf = '\r\n'.join(lines).encode('ascii')
    return self.response_class(*conn.request(buf + body).get())
开发者ID:dragon788,项目名称:zorro,代码行数:32,代码来源:http.py
示例4: twitter_request
def twitter_request(self, path, callback=None, access_token=None,
                    post_args=None, **args):
    """Fetch a Twitter API path such as ``statuses/user_timeline/btaylor``.

    ``path`` is given without format or API version; ``.json`` and the base
    URL are added here.  A ``path`` starting with ``http:`` or ``https:`` is
    used verbatim.

    Passing ``post_args`` (even an empty dict) makes the request a POST with
    those arguments urlencoded as the body.  Any other keyword arguments
    become query-string arguments.

    When ``access_token`` is given, OAuth parameters computed over the query
    and POST arguments are added to the query string.  The token is the
    ``access_token`` attribute of the user obtained through
    `~OAuthMixin.authorize_redirect` and
    `~OAuthMixin.get_authenticated_user`.

    Example usage::

        class MainHandler(tornado.web.RequestHandler,
                          tornado.auth.TwitterMixin):
            @tornado.web.authenticated
            @tornado.web.asynchronous
            @tornado.gen.coroutine
            def get(self):
                new_entry = yield self.twitter_request(
                    "/statuses/update",
                    post_args={"status": "Testing Tornado Web Server"},
                    access_token=self.current_user["access_token"])
                if not new_entry:
                    # Call failed; perhaps missing permission?
                    yield self.authorize_redirect()
                    return
                self.finish("Posted a message!")
    """
    # Raw urls are useful for e.g. search which doesn't follow the
    # usual pattern: http://search.twitter.com/search.json
    is_raw_url = path.startswith('http:') or path.startswith('https:')
    url = path if is_raw_url else self._TWITTER_BASE_URL + path + ".json"

    # Add the OAuth resource request signature if we have credentials
    if access_token:
        signed_args = dict(args)
        signed_args.update(post_args or {})
        http_method = "GET" if post_args is None else "POST"
        args.update(self._oauth_request_parameters(
            url, access_token, signed_args, method=http_method))
    if args:
        url = url + "?" + urllib_parse.urlencode(args)

    client = self.get_auth_http_client()
    on_response = self.async_callback(self._on_twitter_request, callback)
    if post_args is None:
        client.fetch(url, callback=on_response)
        return
    client.fetch(url, method="POST", body=urllib_parse.urlencode(post_args),
                 callback=on_response)
示例5: __getChannelId
def __getChannelId(self):
    """
    Replace the channel name in ``self.search_params["channelId"]`` with a
    channel id.

    Does nothing when no ``channelId`` is set.  The name is first looked up
    as a username; if that yields no items, a channel search is tried.  If
    the search yields no items either, ``channelId`` is removed.
    """
    name = self.search_params.get("channelId")
    if not name:
        return

    lookup_url = "https://www.googleapis.com/youtube/v3/channels?part=id&maxResults=1&fields=items%2Fid&"
    lookup = requests.get(
        lookup_url + urlencode({"key": self.api_key, "forUsername": name})).json()
    try:
        channel_id = lookup['items'][0]['id']
    except IndexError:
        pass  # try searching now...
    else:
        self.search_params["channelId"] = channel_id
        return  # got it

    search_url = "https://www.googleapis.com/youtube/v3/search?part=snippet&type=channel&fields=items%2Fid&"
    found = requests.get(
        search_url + urlencode({"key": self.api_key, "q": name})).json()
    try:
        channel_id = found['items'][0]['id']['channelId']
    except IndexError:
        del self.search_params["channelId"]  # channel not found
    else:
        self.search_params["channelId"] = channel_id
开发者ID:Acidburn0zzz,项目名称:ytfs,代码行数:29,代码来源:actions.py
示例6: test_query_dont_unqoute_twice
def test_query_dont_unqoute_twice():
    """A URL nested as a query value must come back exactly as it went in."""
    inner = "http://base.place?" + urlencode({"a": "/////"})
    outer = URL("http://test_url.aha?" + urlencode({"url": inner}))
    assert outer.query["url"] == inner
开发者ID:asvetlov,项目名称:yarl,代码行数:7,代码来源:test_url_query.py
示例7: test_call_app
def test_call_app(self):
    """Check the XML served by the call app for GET, POST and locales."""
    prompt_xml = (
        b'<?xml version="1.0" encoding="UTF-8" ?>'
        b'<Response>'
        b' <Gather timeout="15" numDigits="1" finishOnKey="">'
        b' <Say language="en">Hi, this is testserver calling. '
        b'Press any key to continue.</Say>'
        b' </Gather>'
        b' <Say language="en">You didn\'t press any keys. Good bye.</Say>'
        b'</Response>'
    )
    token_xml = (
        b'<?xml version="1.0" encoding="UTF-8" ?>'
        b'<Response>'
        b' <Say language="en">Your token is 1. 2. 3. 4. 5. 6. '
        b'Repeat: 1. 2. 3. 4. 5. 6. Good bye.</Say>'
        b'</Response>'
    )

    url = reverse('two_factor:twilio_call_app', args=['123456'])
    self.assertEqual(self.client.get(url).content, prompt_xml)

    url = reverse('two_factor:twilio_call_app', args=['123456'])
    self.assertEqual(self.client.post(url).content, token_xml)

    locale_cases = (
        # there is a en-gb voice
        ('en-gb', '<Say language="en-gb">'),
        # there is no nl voice
        ('nl-nl', '<Say language="en">'),
    )
    for locale, expected_tag in locale_cases:
        response = self.client.get('%s?%s' % (url, urlencode({'locale': locale})))
        self.assertContains(response, expected_tag)
开发者ID:myfreecomm,项目名称:django-two-factor-auth,代码行数:29,代码来源:tests.py
示例8: get_user_work_page
def get_user_work_page(self):
    """Yield the listing pages of a user, one page at a time.

    Reads the user id from standard input, exits with status 1 if it is not
    all digits, and creates the user's directory when it is missing.  The
    page count is derived from the ``count-badge`` value on page 1, at 20
    entries per page.
    """
    self.userId = input('Enter User id: ')
    self.userDirName = join(self.imgStoreDirName, self.pixiv, self.userId)
    if not self.userId.isdigit():
        print('Wrong id.')
        raise SystemExit(1)
    if not exists(self.userDirName):
        makedirs(self.userDirName)

    query = {'id': self.userId, 'type': 'all', 'p': 1}
    first_url = self.memIllUrl + urlencode(query)
    print("Load the page 1...")
    first_page = self.url_open(first_url)

    badge = compile('class="count-badge">(.*?)</span>').search(first_page)
    # The last character of the badge text is dropped before conversion.
    work_count = int(badge.group(1)[:-1])
    last_page = ceil(work_count / 20)
    yield first_page

    page_no = 2
    while page_no <= last_page:
        query['p'] = page_no
        page_url = self.memIllUrl + urlencode(query)
        print("Load the page %d..." % page_no)
        yield self.url_open(page_url)
        page_no += 1
开发者ID:akrisrn,项目名称:pixdog,代码行数:29,代码来源:pixiv_module.py
示例9: request_page
def request_page(self, p, action="GET", **kvargs):
    """Call the ``/api/`` endpoint and return the decoded JSON reply.

    :param p: value sent as the ``p`` parameter.
    :param action: ``"GET"`` (parameters in the query string) or ``"POST"``
        (parameters form-encoded in the body).
    :param kvargs: extra parameters appended after ``p`` and ``token``.
    :returns: the parsed JSON document.
    :raises ValueError: if ``action`` is neither ``"GET"`` nor ``"POST"``.
    :raises RuntimeError: if the reply contains an ``error`` entry; the
        arguments are its ``code``, ``mess`` and ``details``.
    """
    # Reject unknown actions up front; previously they fell through both
    # branches and failed later on an unbound ``data``.
    if action not in ("GET", "POST"):
        raise ValueError("Unsupported action: {!r}".format(action))

    headers = {"Cookie": "PMDSUSSID={}; locale=en".format(self.PMDSUSSID)}
    get = [("p", p)]
    if self.token is not None:
        get.append(("token", self.token))
    get += kvargs.items()
    if debug: print(get)

    url = "/api/?"
    if action == "GET":
        url += urlencode(get)
        data = None
    else:
        headers["Content-type"]='application/x-www-form-urlencoded'
        data = urlencode(get)

    conn = http.client.HTTPConnection(self.serv+".pokemon-gl.com")
    try:
        conn.request(action, url, data, headers)
        raw = conn.getresponse().read().decode("utf-8")
    finally:
        # Release the socket even when the request or read fails.
        conn.close()

    tree = json.loads(raw)
    if 'error' in tree:
        error = tree['error']
        raise RuntimeError(error['code'], error['mess'], error['details'])
    if debug: print(tree)
    return tree
开发者ID:Sanqui,项目名称:dwbot,代码行数:26,代码来源:dwlib.py
示例10: _photo_item
def _photo_item(users, user, api_item):
    """Builds a feed item from a ``photo`` or ``photo_tag`` API item."""

    kind = api_item["type"]
    if kind == "photo":
        title = "новые фотографии"
        photos = api_item["photos"]
        base_query = {"section": "photos"}
        z_template = "photo{owner_id}_{photo_id}/feed1_{source_id}_{timestamp}"
    elif kind == "photo_tag":
        title = "новые отметки на фотографиях"
        photos = api_item["photo_tags"]
        base_query = {}
        z_template = "photo{owner_id}_{photo_id}/feed3_{source_id}_{timestamp}"
    else:
        raise Error("Logical error.")

    def get_photo_url(photo):
        query = dict(base_query)
        query["z"] = z_template.format(
            owner_id=photo["owner_id"], photo_id=photo["pid"],
            source_id=api_item["source_id"], timestamp=api_item["date"])
        return _vk_url("feed?" + urlencode(query))

    item = {"title": user["name"] + ": " + title, "text": ""}

    # photos[0] is compared against the number of remaining entries below,
    # so it is treated as a count rather than a photo.
    for photo in photos[1:]:
        url = get_photo_url(photo)
        # The item links to the first photo only.
        if "url" not in item:
            item["url"] = url
        item["text"] = item["text"] + _block(_link(url, _image(photo["src_big"])))

    shown = len(photos) - 1
    if photos[0] > shown:
        item["text"] = item["text"] + _block("[показаны не все фотографии]")

    return item
开发者ID:Fizorg,项目名称:social-rss,代码行数:35,代码来源:vk.py
示例11: method
def method(self, url, method="GET", parameters=None, timeout=None):
    """Call an API method and return the decoded JSON reply.

    :param url: method path, joined onto ``self.url``.
    :param method: HTTP method.  For ``"GET"`` the parameters, ``format``
        and ``auth_token`` go in the query string; for anything else only
        ``auth_token`` goes in the query string and the parameters are
        form-encoded into the request body.
    :param parameters: optional dict of method parameters; never modified.
    :param timeout: passed through to ``self.opener.open``.
    :returns: the parsed JSON document.
    """
    log.info('Making {0} request to {1} with parameters {2}'.format(method, url, parameters))
    method_url = urljoin(self.url, url)
    if method == "GET":
        # Copy first: adding format/auth_token must not leak into the
        # caller's dict.
        query_parameters = dict(parameters) if parameters else dict()
        query_parameters['format'] = self.format
        query_parameters['auth_token'] = self.token
        request_data = None
    else:
        query_parameters = {'auth_token': self.token}
        if parameters:
            request_data = urlencode(parameters).encode('utf-8')
        else:
            request_data = None
    method_url = method_url + '?' + urlencode(query_parameters)
    log.debug('Method URL: {0}'.format(method_url))
    req = self.RequestWithMethod(method_url, http_method=method, data=request_data)
    response = self.opener.open(req, None, timeout)
    # TODO Check response status (failures, throttling)
    try:
        body = response.read()
    finally:
        response.close()
    return json.loads(body.decode('utf-8'))
开发者ID:robjohncox,项目名称:python-simple-hipchat,代码行数:33,代码来源:__init__.py
示例12: auth_user
def auth_user(email, password, client_id, scope, opener):
    """Submit credentials through the OAuth authorize form.

    Posts the authorization parameters to the authorize URL, parses the
    returned page with ``FormParser``, fills in ``email`` and ``pass`` and
    submits the form.

    Returns a ``(document, final_url)`` tuple for the page that follows the
    submission.  Raises ``RuntimeError`` when the form is missing or lacks
    the expected fields, and ``NotImplementedError`` when the form method is
    not ``POST``.
    """
    authorize_fields = {
        'redirect_uri': 'https://oauth.vk.com/blank.html',
        'response_type': 'token',
        'client_id': client_id,
        'display': 'mobile',
        'scope': ','.join(scope),
        'v': API_VERSION
    }
    payload = urlencode(list(authorize_fields.items())).encode('utf-8')
    response = opener.open('https://oauth.vk.com/authorize', payload)
    doc = response.read().decode(encoding='utf-8', errors='replace')

    parser = FormParser()
    parser.feed(doc)
    parser.close()

    form_usable = (parser.form_parsed and
                   parser.url is not None and
                   "pass" in parser.params and
                   "email" in parser.params)
    if not form_usable:
        raise RuntimeError("Something wrong")

    parser.params["email"] = email
    parser.params["pass"] = password
    if parser.method != "POST":
        raise NotImplementedError("Method '%s'" % parser.method)

    payload = urlencode(parser.params).encode('utf-8')
    response = opener.open(parser.url, payload)
    doc = response.read().decode(encoding='utf-8', errors='replace')
    return doc, response.geturl()
开发者ID:cl0ne,项目名称:vk_api_auth,代码行数:31,代码来源:vk_auth.py
示例13: total_file_size
def total_file_size(slack_token, verbose=False):
    """
    Finds the total size of all files on the slack server

    Lists up to 500 files with ``files.list`` and sums the ``size`` that
    ``files.info`` reports for each of them.

    :param slack_token: API token sent with every request
    :param verbose: when true, print the running total in MB
    :return: the total formatted as ``'<x.xx> MB'``
    """
    listing_query = urlencode({'token': slack_token, 'count': 500})
    response = reader(urlopen('https://slack.com/api/files.list?' + listing_query))
    file_ids = [f['id'] for f in load(response)['files']]

    size = 0
    for file_id in file_ids:
        params = {
            # Use the function's own argument; the name ``token`` is not
            # defined in this function.
            'token': slack_token,
            'file': file_id,
        }
        response = reader(urlopen('https://slack.com/api/files.info?' + urlencode(params)))
        size += load(response)['file']['size']
        # NOTE(review): the source's indentation was lost; the total is
        # assumed to be printed per file (as delete_files does) - confirm.
        if verbose:
            print('{0:.2f} MB total'.format(size / 1048576))

    return '{0:.2f} MB'.format(size / 1048576)
开发者ID:darkrilin,项目名称:slackbot-gamemakerdevs,代码行数:29,代码来源:purge_files.py
示例14: delete_files
def delete_files(file_ids, slack_token, verbose=False):
    """
    Deletes every file whose ID is in the given list

    For each ID the file's size is fetched with ``files.info`` before
    ``files.delete`` is called, so the space freed can be reported.

    :param file_ids: IDs of the files to delete
    :param slack_token: API token sent with every request
    :param verbose: when true, print progress and the running total in MB
    """
    freed_bytes = 0
    num_files = len(file_ids)
    for count, file_id in enumerate(file_ids, start=1):
        query = urlencode({'token': slack_token, 'file': file_id})

        info = load(reader(urlopen('https://slack.com/api/files.info?' + query)))
        freed_bytes += info['file']['size']

        outcome = load(reader(urlopen('https://slack.com/api/files.delete?' + query)))
        ok = outcome['ok']

        mb = freed_bytes / 1048576
        if verbose:
            print("{0} of {1} - {2} {3} ... {4:.2f} MB saved".format(count, num_files, file_id, ok, mb))
开发者ID:darkrilin,项目名称:slackbot-gamemakerdevs,代码行数:27,代码来源:purge_files.py
示例15: request
def request(
        cls,
        uri,
        params={},
        client=None,
        wrapper=FreesoundObject,
        method='GET',
        data=False
):
    """Perform an authorized API request and return the decoded reply.

    :param uri: endpoint URL.
    :param params: optional mapping urlencoded into the query string
        (only read, never modified).
    :param client: object whose ``header`` attribute is sent as the
        ``Authorization`` header.
    :param wrapper: callable applied as ``wrapper(result, client)`` to the
        parsed JSON; pass a falsy value to get the raw parsed JSON.
    :param method: accepted for interface compatibility; not used here.
    :param data: optional mapping sent form-encoded as the request body.
    :raises FreesoundException: on an HTTP error outside 2xx, or when the
        reply cannot be parsed as JSON.
    """
    url = '%s?%s' % (uri, urlencode(params)) if params else uri
    # urllib requires the body as bytes on Python 3; urlencode yields ASCII.
    body = urlencode(data).encode('ascii') if data else None
    headers = {'Authorization': client.header}
    req = Request(url, body, headers)
    try:
        f = urlopen(req)
    except HTTPError as e:
        resp = e.read()
        if e.code >= 200 and e.code < 300:
            return resp
        else:
            raise FreesoundException(e.code, json.loads(resp))
    try:
        resp = f.read()
    finally:
        f.close()
    try:
        result = json.loads(resp)
    except (ValueError, TypeError):
        # Only parse failures are translated; anything else propagates.
        raise FreesoundException(0, "Couldn't parse response")
    if wrapper:
        return wrapper(result, client)
    return result
开发者ID:bdejong,项目名称:freesound-python,代码行数:32,代码来源:freesound.py
示例16: test_login_failed_empty
def test_login_failed_empty(self):
    """Login must fail when either the email or the password is empty."""
    for email, password in (('', 'bar'), ('foo', '')):
        form = urlencode({'action': 'Login',
                          'email': email,
                          'password': password})
        self.http_client.fetch(self.get_url('/login/email'),
                               self.stop,
                               method='POST',
                               body=form,
                               follow_redirects=False)
        response = self.wait()
        self.assertEqual(json_decode(response.body)['status'], 'failed')
        self.assertEqual(json_decode(response.body)['error'], 'Email and password are mandatory')
开发者ID:fgaudin,项目名称:libre,代码行数:26,代码来源:auth.py
示例17: api
def api(self, command, args=None):
    """
    Main Api Function
    - encodes and sends <command> with optional [args] to Poloniex api
    - raises 'ValueError' if an api key or secret is missing (and the command is 'private'), or if the <command> is not valid
    - returns decoded json api message

    [args] is copied before 'command' (and 'nonce' for private commands)
    is added, so neither the caller's dict nor a shared default carries
    values over from one call to the next.
    """
    if self._coaching:
        self.apiCoach.wait()  # check in with the coach

    args = dict(args) if args else {}
    args['command'] = command  # pass the command

    if command in PRIVATE_COMMANDS:  # private?
        try:
            if len(self.APIKey) < 2 or len(self.Secret) < 2:
                raise ValueError("An APIKey and Secret is needed for private api commands!")
            args['nonce'] = self.nonce
            post_data = urlencode(args)
            sign = hmac.new(self.Secret, post_data.encode('utf-8'), hashlib.sha512).hexdigest()
            headers = {'Sign': sign, 'Key': self.APIKey}
            ret = requests.post('https://poloniex.com/tradingApi', data=args, headers=headers, timeout=self.timeout)
            return json.loads(ret.text)
        finally:
            # The nonce advances on every private attempt, failed or not.
            self.nonce += 1  # increment nonce

    if command in PUBLIC_COMMANDS:  # public?
        ret = requests.post('https://poloniex.com/public?' + urlencode(args), timeout=self.timeout)
        return json.loads(ret.text)

    raise ValueError("Invalid Command!")
开发者ID:superposition,项目名称:python-poloniex,代码行数:31,代码来源:__init__.py
示例18: WriteData
def WriteData(self, data):
    """ Write observation data to server

        Values are converted with ``self.StrVal``; when ``self.filt`` is set
        only the keys it contains are sent.  In ``'GET'`` mode the
        parameters go in the query string, otherwise they are form-encoded
        into the request body.

        :param data: dictionary with observation data
        :returns: server answer or None (if error)
    """
    par = {}
    if data is None or self.DropData(data):
        logging.warning(" empty or inappropiate data not written")
        return
    # add datetime and/or id
    data = self.ExtendData(data)
    for key, val in data.items():
        if self.filt is None or key in self.filt:
            par[key] = self.StrVal(val)
    if self.mode == 'GET':
        print(self.url + '?' + urlencode(par))
        try:
            res = urlopen(self.url + '?' + urlencode(par))
            print(res)
            res = res.read()
            print(res)
        except Exception as err:
            # Best effort: report the failure and signal it with None.
            logging.warning(" data not written: %s", err)
            res = None
    else:
        try:
            # The request body must be bytes; a str body makes urlopen
            # raise TypeError, which used to be swallowed silently.
            d = urlencode(par).encode('ascii')
            req = Request(self.url, d)
            print(req)
            res = urlopen(req).read()
            print(res)
        except Exception as err:
            logging.warning(" data not written: %s", err)
            res = None
    return res
开发者ID:zsiki,项目名称:ulyxes,代码行数:35,代码来源:httpwriter.py
示例19: asyncServe
def asyncServe(privmsg, query, yearhint):
    """Search IMDb for ``query`` and reply with the top result.

    Results are sorted by vote count; a truthy ``yearhint`` restricts the
    release date to the window from June of the previous year to June of
    the next.  The reply holds title, year, director, rating and the title
    URL.  Nothing is sent when the results table is missing from the page.
    """
    search = {'title': query,
              'sort': 'num_votes,desc'}
    if yearhint:
        search['release_date'] = "%04i-06-01,%04i-06-01"%(yearhint-1, yearhint+1)

    url = "http://akas.imdb.com/search/title?" + urlencode(search)
    debug("int_imdb", "url", url)

    page = urllib.request.urlopen(url).read(50000).decode("iso-8859-1")
    html = page.partition('<table class="results">')[2]
    if not html:
        debug("int_imdb", "result-table not found.")
        return

    def first_group(pattern):
        # First capture group of the first match within the results table.
        return re.search(pattern, html).group(1)

    ttid = first_group(r'<a href="/title/(tt[^/]+)/">')
    title = htmlDecodeEntites(first_group(r'<a href="/title/tt[^/]+/">([^<]+)</a>'))
    director = htmlDecodeEntites(first_group(r'Dir: <a[^>]+>([^<]+)</a>'))
    year = first_group(r'<span class="year_type">\((\d{4})')
    rating = first_group(r'itemprop="ratingValue">([0-9.]+|-)</span>')

    privmsg.reply_imore("%s (%s) %s [%s] http://www.imdb.com/title/%s/"%(
        title, year, director, rating, ttid))
开发者ID:pyropeter,项目名称:PyroBot-1G,代码行数:33,代码来源:int_imdb.py
示例20: cleanup
def cleanup(url):
    """Follow ``url`` and strip tracking parameters from the result.

    Keys starting with any prefix in ``ANNOYING_PARAMS`` are removed from
    the query string and, when the fragment parses as key/value pairs, from
    the fragment too.  If cleaning fails the problem is logged and the
    followed URL is returned as it was.
    """
    def without_trackers(pairs):
        return [
            (key, value) for key, value in pairs
            if not any(key.startswith(prefix) for prefix in ANNOYING_PARAMS)
        ]

    url = _follow(url)
    # remove trackers params
    try:
        parts = urlparse(url)

        # cleanup query param: only if query is non empty and we manage to
        # parse it as key/value
        query_pairs = parse_qsl(parts.query)
        if parts.query and query_pairs:
            parts = parts._replace(query=urlencode(without_trackers(query_pairs)))

        # cleanup fragment param: only if fragment is non empty and we
        # manage to parse it as key/value
        fragment_pairs = parse_qsl(parts.fragment)
        if parts.fragment and fragment_pairs:
            parts = parts._replace(fragment=urlencode(without_trackers(fragment_pairs)))

        url = parts.geturl()
    except Exception:
        app.logger.exception("Problem cleaning url %s", url)

    app.logger.info("Final url %s", url)
    return url
开发者ID:jcsaaddupuy,项目名称:dereferer,代码行数:32,代码来源:app.py
注:本文中的urllib.parse.urlencode函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论