本文整理汇总了Python中multidict.CIMultiDict类的典型用法代码示例。如果您正苦于以下问题:Python CIMultiDict类的具体用法?Python CIMultiDict怎么用?Python CIMultiDict使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了CIMultiDict类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
class Protocol:
    """Accumulate the pieces of an HTTP message reported through ``on_*`` hooks.

    A parser is built by calling ``Parser(self, **kwargs)``; its ``feed_data``
    method is re-exported on this object. The ``on_*`` methods are presumably
    invoked by that parser while it consumes data.
    """

    def __init__(self, Parser, **kwargs):
        """Reset the collected state and create the parser bound to ``self``."""
        self.url = None
        # Case-insensitive multi-value mapping: repeated headers are kept.
        self.headers = CIMultiDict()
        self.body = b''
        self.headers_complete = False
        self.message_complete = False
        parser = Parser(self, **kwargs)
        self.parser = parser
        self.feed_data = parser.feed_data

    def on_url(self, url):
        """Remember the URL exactly as reported."""
        self.url = url

    def on_header(self, name, value):
        """Decode one header name/value pair and append it to the headers."""
        key = name.decode()
        text = value.decode()
        self.headers.add(key, text)

    def on_headers_complete(self):
        """Flag that the header section has ended."""
        self.headers_complete = True

    def on_body(self, body):
        """Append a body chunk to the bytes collected so far."""
        self.body = self.body + body

    def on_message_complete(self):
        """Flag that the whole message has been received."""
        self.message_complete = True
开发者ID:quantmind,项目名称:pulsar,代码行数:25,代码来源:test_parser.py
示例2: HttpTunnel
class HttpTunnel(RequestBase):
    """Request object that serialises an HTTP ``CONNECT`` message.

    Headers start from ``client.DEFAULT_TUNNEL_HEADERS`` and are stored in a
    case-insensitive multidict.
    """
    first_line = None
    data = None
    decompress = False
    method = 'CONNECT'

    def __init__(self, client, req):
        """Keep the client and the original request (as ``key``)."""
        self.client = client
        self.key = req
        self.headers = CIMultiDict(client.DEFAULT_TUNNEL_HEADERS)

    def __repr__(self):
        return 'Tunnel %s' % self.url

    __str__ = __repr__

    def encode(self):
        """Return the request line and headers as bytes.

        The ``host`` header is overwritten with ``self.key.netloc`` and
        ``first_line`` is refreshed from ``self.key.address`` on every call.
        """
        self.headers['host'] = self.key.netloc
        self.first_line = 'CONNECT http://%s:%s HTTP/1.1' % self.key.address
        chunks = [self.first_line.encode('ascii'), b'\r\n']
        for name, value in self.headers.items():
            header_line = '%s: %s\r\n' % (name, value)
            chunks.append(header_line.encode(CHARSET))
        # An empty line terminates the header section.
        chunks.append(b'\r\n')
        return b''.join(chunks)

    def has_header(self, header_name):
        """Tell whether ``header_name`` is present (case-insensitive)."""
        return header_name in self.headers

    def get_header(self, header_name, default=None):
        """Return the value stored for ``header_name`` or ``default``."""
        return self.headers.get(header_name, default)

    def remove_header(self, header_name):
        """Drop ``header_name``; a missing header is silently ignored."""
        self.headers.pop(header_name, None)
开发者ID:quantmind,项目名称:pulsar,代码行数:32,代码来源:client.py
示例3: from_raw_headers
def from_raw_headers(cls, raw_headers):
    """Build a ``Response`` or ``Request`` from a raw header block.

    ``raw_headers`` is decoded and split on ``utils.EOL``; the first line is
    matched against the response pattern and then the request pattern.

    Raises:
        ValueError: when the first line matches neither pattern (message
            ``'Not a SIP message'``), or when a header line has no ``': '``
            separator (unpacking failure).
    """
    headers = CIMultiDict()
    lines = raw_headers.decode().split(utils.EOL)
    first_line = lines[0]

    for line in lines[1:]:
        name, value = line.split(': ', 1)
        if name not in headers:
            headers[name] = value
            continue
        # Repeated header: gather every value in one list stored under the name.
        existing = headers[name]
        values = existing if isinstance(existing, list) else [existing]
        values.append(value)
        headers[name] = values

    response_match = FIRST_LINE_PATTERN['response']['regex'].match(first_line)
    if response_match:
        fields = response_match.groupdict()
        return Response(status_code=int(fields['status_code']),
                        status_message=fields['status_message'],
                        headers=headers,
                        first_line=first_line)

    request_match = FIRST_LINE_PATTERN['request']['regex'].match(first_line)
    if not request_match:
        LOG.debug(lines)
        raise ValueError('Not a SIP message')

    fields = request_match.groupdict()
    # The CSeq value must split into exactly two parts; only the first is used.
    cseq, _ = headers['CSeq'].split()
    return Request(method=fields['method'],
                   headers=headers,
                   cseq=int(cseq),
                   first_line=first_line)
开发者ID:sangoma,项目名称:aiosip,代码行数:33,代码来源:message.py
示例4: __init__
def __init__(self, proxies=None, headers=None, verify=True,
             cookies=None, store_cookies=True, cert=None,
             max_redirects=10, decompress=True, version=None,
             websocket_handler=None, parser=None, trust_env=True,
             loop=None, client_version=None, timeout=None, stream=False,
             pool_size=10, frame_parser=None, logger=None,
             close_connections=False, keep_alive=None):
    """Configure the client: defaults, headers, proxies, parsers and hooks.

    ``headers`` overrides the class defaults; an entry whose value is
    ``None`` removes that header. When no ``proxies`` are given and
    ``trust_env`` is true, proxies come from ``get_environ_proxies()``.
    Falsy ``logger``, ``client_version``, ``version``, ``parser``,
    ``frame_parser`` and ``keep_alive`` fall back to their defaults.
    """
    super().__init__(
        partial(Connection, HttpResponse),
        loop=loop,
        keep_alive=keep_alive or cfg_value('http_keep_alive')
    )
    self.logger = logger or LOGGER
    self.client_version = client_version or self.client_version
    self.connection_pools = {}
    self.pool_size = pool_size
    self.trust_env = trust_env
    self.timeout = timeout
    self.store_cookies = store_cookies
    self.max_redirects = max_redirects
    self.cookies = cookiejar_from_dict(cookies)
    self.decompress = decompress
    self.version = version or self.version
    # Default for SSL verification.
    self.verify = verify
    # Default SSL client certificate: a string is the path to a .pem file,
    # a tuple is a ('cert', 'key') pair.
    self.cert = cert
    self.stream = stream
    self.close_connections = close_connections

    # Start from the class-level defaults, then apply caller overrides.
    default_headers = CIMultiDict(self.DEFAULT_HTTP_HEADERS)
    default_headers['user-agent'] = self.client_version
    if headers:
        for name, value in mapping_iterator(headers):
            if value is not None:
                default_headers[name] = value
            else:
                default_headers.pop(name, None)
    self.headers = default_headers

    self.proxies = dict(proxies or ())
    if not self.proxies and self.trust_env:
        self.proxies = get_environ_proxies()
        # NOTE(review): nesting of this check was reconstructed from a
        # flattened listing -- confirm against the upstream source.
        if 'no' not in self.proxies:
            self.proxies['no'] = ','.join(self.no_proxy)

    self.websocket_handler = websocket_handler
    self.http_parser = parser or http.HttpResponseParser
    self.frame_parser = frame_parser or websocket.frame_parser

    # Register the event hooks.
    self.event('on_headers').bind(handle_cookies)
    self.event('pre_request').bind(WebSocket())
    self.event('post_request').bind(Expect())
    self.event('post_request').bind(Redirect())
    self._decompressors = dict(
        gzip=GzipDecompress(),
        deflate=DeflateDecompress()
    )
开发者ID:quantmind,项目名称:pulsar,代码行数:57,代码来源:client.py
示例5: get_headers
def get_headers(self, request, headers):
    """Return a new ``CIMultiDict`` of ``self.headers`` updated with *headers*.

    ``request`` is accepted but not used here. ``self.headers`` itself is
    left untouched.
    """
    # TODO: the original author reports a bug in CIMultiDict.copy(), so the
    # copy is built from items() instead.
    merged = CIMultiDict(self.headers.items())
    if headers:
        merged.update(headers)
    return merged
开发者ID:quantmind,项目名称:pulsar,代码行数:9,代码来源:client.py
示例6: parse_headers
def parse_headers(header_data: bytes, value_encoding: str = 'ascii') -> CIMultiDict:
    """Parse CRLF-separated ``name: value`` header lines into a multidict.

    Names are always decoded as ASCII; values use ``value_encoding``. Both
    are stripped of surrounding whitespace. Repeated names are all kept.

    Raises:
        ValueError: if a line contains no ``:`` (this includes empty input).
    """
    assert check_argument_types()
    parsed = CIMultiDict()
    for raw_line in header_data.rstrip().split(b'\r\n'):
        raw_key, raw_value = raw_line.split(b':', 1)
        name = raw_key.strip().decode('ascii')
        text = raw_value.strip().decode(value_encoding)
        parsed.add(name, text)
    return parsed
开发者ID:asphalt-framework,项目名称:asphalt-web,代码行数:10,代码来源:utils.py
示例7: start
async def start(self, connection, read_until_eof=False):
    """Run the parent ``start`` and rewrite ``#`` to ``?`` in ``Location``.

    vk.com returns redirect URLs such as ``http://REDIRECT_URI#access_token=...``
    while aiohttp by default drops everything after ``#``; replacing the
    character keeps those parameters. ``self.headers`` and
    ``self.raw_headers`` are rebuilt from the edited copy. Returns ``self``.
    """
    await super().start(connection, read_until_eof)

    # Work on a mutable copy, then publish it through a proxy.
    editable = CIMultiDict(self.headers)
    redirect_target = editable.get(hdrs.LOCATION)
    if redirect_target:
        editable[hdrs.LOCATION] = redirect_target.replace('#', '?')

    self.headers = CIMultiDictProxy(editable)
    self.raw_headers = tuple(editable.items())
    return self
开发者ID:Fahreeve,项目名称:aiovk,代码行数:11,代码来源:drivers.py
示例8: test_multiple_forwarded_headers
def test_multiple_forwarded_headers():
    """Two ``Forwarded`` headers carrying three elements give three entries."""
    headers = CIMultiDict()
    for raw_value in ('By=identifier1;for=identifier2, BY=identifier3',
                      'By=identifier4;fOr=identifier5'):
        headers.add('Forwarded', raw_value)

    req = make_mocked_request('GET', '/', headers=headers)
    forwarded = req.forwarded

    assert len(forwarded) == 3
    first, second, third = forwarded[0], forwarded[1], forwarded[2]
    # Parameter names are expected in lower case regardless of input case.
    assert first['by'] == 'identifier1'
    assert first['for'] == 'identifier2'
    assert second['by'] == 'identifier3'
    assert third['by'] == 'identifier4'
    assert third['for'] == 'identifier5'
开发者ID:youpengly,项目名称:aiohttp,代码行数:11,代码来源:test_web_request.py
示例9: test_multiple_forwarded_headers_injection
def test_multiple_forwarded_headers_injection():
    """An unterminated quote in one header must not swallow the next one."""
    # First value: what an attacker could send, hoping to "shadow" the
    # following header. Second value: added by our trusted reverse proxy.
    attacker_value = 'for=_injected;by="'
    proxy_value = 'for=_real;by=_actual_proxy'

    headers = CIMultiDict()
    headers.add('Forwarded', attacker_value)
    headers.add('Forwarded', proxy_value)

    req = make_mocked_request('GET', '/', headers=headers)
    forwarded = req.forwarded

    assert len(forwarded) == 2
    assert 'by' not in forwarded[0]
    trusted = forwarded[1]
    assert trusted['for'] == '_real'
    assert trusted['by'] == '_actual_proxy'
开发者ID:youpengly,项目名称:aiohttp,代码行数:11,代码来源:test_web_request.py
示例10: test_multiple_forwarded_headers_bad_syntax
def test_multiple_forwarded_headers_bad_syntax():
    """Malformed or empty ``Forwarded`` values still occupy an entry."""
    raw_values = (
        'for=_1;by=_2',
        'invalid value',
        '',
        'for=_3;by=_4',
    )
    headers = CIMultiDict()
    for raw_value in raw_values:
        headers.add('Forwarded', raw_value)

    req = make_mocked_request('GET', '/', headers=headers)
    forwarded = req.forwarded

    assert len(forwarded) == 4
    assert forwarded[0]['for'] == '_1'
    # The two unparsable values yield entries without a 'for' key.
    assert 'for' not in forwarded[1]
    assert 'for' not in forwarded[2]
    assert forwarded[3]['by'] == '_4'
开发者ID:youpengly,项目名称:aiohttp,代码行数:12,代码来源:test_web_request.py
示例11: __init__
def __init__(self,
             value: Any,
             headers: Optional[
                 Union[
                     _CIMultiDict,
                     Dict[str, str],
                     Iterable[Tuple[str, str]]
                 ]
             ] = None,
             content_type: Optional[str]=sentinel,
             filename: Optional[str]=None,
             encoding: Optional[str]=None,
             **kwargs: Any) -> None:
    """Store the payload value and compute its initial headers.

    The Content-Type header is taken from ``content_type`` when it is given
    and not ``None``; otherwise it is guessed from ``filename``; otherwise,
    or when guessing fails, ``self._default_content_type`` is used.
    Caller-supplied ``headers`` are applied last, so they win over all of
    the above. Extra ``kwargs`` are accepted and ignored here.
    """
    self._encoding = encoding
    self._filename = filename
    self._headers = CIMultiDict()  # type: _CIMultiDict
    self._value = value

    if content_type is sentinel or content_type is None:
        # No explicit type: try the filename, then fall back to the default.
        guessed = None
        if self._filename is not None:
            guessed = mimetypes.guess_type(self._filename)[0]
        if guessed is None:
            guessed = self._default_content_type
        content_type = guessed
    self._headers[hdrs.CONTENT_TYPE] = content_type

    self._headers.update(headers or {})
开发者ID:KeepSafe,项目名称:aiohttp,代码行数:27,代码来源:payload.py
示例12: __init__
def __init__(self,
             headers=None,
             payload=None,
             from_details=None,
             to_details=None,
             contact_details=None,
             ):
    """Set up message headers, party details and payload.

    A falsy ``headers`` argument is replaced by a fresh ``CIMultiDict``.
    A ``Via`` header is generated when none is present; it reads
    ``self.contact_details``, which is not defined in this method.

    Raises:
        ValueError: when neither ``from_details`` nor a ``From`` header is
            available, and likewise for ``to_details`` / ``To``.
    """
    self.headers = headers if headers else CIMultiDict()

    if from_details:
        self._from_details = from_details
    elif 'From' not in self.headers:
        raise ValueError('From header or from_details is required')

    if to_details:
        self._to_details = to_details
    elif 'To' not in self.headers:
        raise ValueError('To header or to_details is required')

    if contact_details:
        self._contact_details = contact_details

    self._payload = payload
    self._raw_payload = None

    if 'Via' not in self.headers:
        # '%(protocol)s' is left as a literal placeholder in the stored value.
        host_and_port = utils.format_host_and_port(
            self.contact_details['uri']['host'],
            self.contact_details['uri']['port'])
        branch = ';branch=%s' % utils.gen_branch(10)
        self.headers['Via'] = 'SIP/2.0/%(protocol)s ' + host_and_port + branch
开发者ID:sangoma,项目名称:aiosip,代码行数:34,代码来源:message.py
示例13: _prepare_headers
def _prepare_headers(self, headers):
    """Return the default headers overlaid with *headers* as a CIMultiDict.

    The first occurrence of a key in *headers* replaces any default with
    that name; later occurrences of the same key are appended, so repeated
    caller headers are preserved.
    """
    result = CIMultiDict(self._default_headers)
    if not headers:
        return result

    # Normalise plain mappings / iterables into a multidict first.
    if not isinstance(headers, (MultiDictProxy, MultiDict)):
        headers = CIMultiDict(headers)

    seen = set()
    for key, value in headers.items():
        if key not in seen:
            result[key] = value
            seen.add(key)
        else:
            result.add(key, value)
    return result
开发者ID:cynecx,项目名称:aiohttp,代码行数:16,代码来源:client.py
示例14: test_CacheControl
def test_CacheControl(self):
    """Check the ``cache-control`` value produced for several option sets.

    The same ``headers`` mapping is passed to every ``CacheControl``
    instance in turn.
    """
    headers = CIMultiDict()

    def rendered():
        # All cache-control values currently stored, joined as one string.
        return ', '.join(headers.getall('cache-control'))

    default = CacheControl()
    self.assertFalse(default.private)
    self.assertFalse(default.maxage)
    default(headers)
    self.assertEqual(rendered(), 'no-cache')

    cases = (
        (dict(maxage=3600),
         'max-age=3600, public'),
        (dict(maxage=3600, private=True),
         'max-age=3600, private'),
        (dict(maxage=3600, must_revalidate=True),
         'max-age=3600, public, must-revalidate'),
        (dict(maxage=3600, proxy_revalidate=True),
         'max-age=3600, public, proxy-revalidate'),
        (dict(maxage=3600, proxy_revalidate=True, nostore=True),
         'no-store, no-cache, must-revalidate, max-age=0'),
    )
    for options, expected in cases:
        control = CacheControl(**options)
        control(headers)
        self.assertEqual(rendered(), expected)
开发者ID:quantmind,项目名称:pulsar,代码行数:29,代码来源:test_tools.py
示例15: __init__
def __init__(self, transport, version, close, loop=None):
    """Initialise the parent with ``transport``/``loop`` and reset state.

    ``close`` is stored as ``closing``; ``keepalive`` and ``length`` start
    as ``None``, headers start empty and unsent.
    """
    super().__init__(transport, loop)

    self.version = version
    self.closing = close
    self.keepalive = None
    self.length = None

    # Case-insensitive header storage.
    self.headers = CIMultiDict()
    self.headers_sent = False
开发者ID:singulared,项目名称:aiohttp,代码行数:9,代码来源:http_message.py
示例16: __init__
def __init__(self, *, status=200, reason=None, headers=None):
    """Reset response state, copy *headers* and apply the status.

    All arguments are keyword-only. ``headers`` is copied into a new
    ``CIMultiDict``; ``None`` gives an empty one. ``set_status`` is called
    last with ``status`` and ``reason``.
    """
    self._body = None
    self._keep_alive = None
    self._chunked = False
    self._compression = False
    self._compression_force = False
    self._cookies = SimpleCookie()

    self._req = None
    self._payload_writer = None
    self._eof_sent = False

    self._headers = CIMultiDict() if headers is None else CIMultiDict(headers)

    self.set_status(status, reason)
开发者ID:singulared,项目名称:aiohttp,代码行数:18,代码来源:web_response.py
示例17: update_headers
def update_headers(self, headers):
    """Replace ``self.headers`` with a new CIMultiDict built from *headers*.

    *headers* may be a dict, a ``MultiDict``/``MultiDictProxy`` or any
    iterable of ``(key, value)`` pairs; a falsy value leaves the new
    mapping empty. Repeated keys are all kept.
    """
    self.headers = CIMultiDict()
    if not headers:
        return

    if isinstance(headers, (dict, MultiDictProxy, MultiDict)):
        pairs = headers.items()
    else:
        pairs = headers
    for key, value in pairs:
        self.headers.add(key, value)
开发者ID:skytian,项目名称:apue_pro,代码行数:9,代码来源:client_reqrep.py
示例18: __init__
def __init__(self, status_code=200, content=None, response_headers=None,
             content_type=None, encoding=None, can_store_cookies=True):
    """Store status, encoding, headers and content for the response.

    ``response_headers`` is copied into a ``CIMultiDict`` (a falsy value
    gives an empty one). ``content_type`` is assigned only when it is not
    ``None``, and only after the headers exist.
    """
    self.status_code = status_code
    self.encoding = encoding
    initial_headers = response_headers or ()
    self.headers = CIMultiDict(initial_headers)
    self.content = content
    self._cookies = None
    self._can_store_cookies = can_store_cookies
    if content_type is None:
        return
    self.content_type = content_type
开发者ID:quantmind,项目名称:pulsar,代码行数:10,代码来源:wsgiresponse.py
示例19: __init__
def __init__(self, subtype='mixed', boundary=None):
    """Create an empty multipart container with its Content-Type header.

    When ``boundary`` is ``None`` a random hex string from ``uuid.uuid4()``
    is used.

    Raises:
        ValueError: if ``boundary`` cannot be encoded as US-ASCII.
    """
    if boundary is None:
        boundary = uuid.uuid4().hex

    # Encoding is attempted only to validate; the result is discarded.
    try:
        boundary.encode('us-ascii')
    except UnicodeEncodeError:
        raise ValueError('boundary should contains ASCII only chars')

    self.headers = CIMultiDict()
    content_type = 'multipart/{}; boundary="{}"'.format(subtype, boundary)
    self.headers[CONTENT_TYPE] = content_type
    self.parts = []
开发者ID:mind1master,项目名称:aiohttp,代码行数:11,代码来源:multipart.py
示例20: from_raw_message
def from_raw_message(cls, raw_message):
    """Parse a raw message into a ``Response`` or a ``Request``.

    The text is split on ``utils.EOL``. Lines after the first are read as
    ``name: value`` headers until a line without ``': '`` is met; that line
    is dropped and every remaining line is treated as payload. An empty
    payload becomes ``None``.

    Raises:
        ValueError: when the first line matches neither the response nor
            the request pattern (``'Not a SIP message'``).
    """
    first_line, *remaining = raw_message.split(utils.EOL)

    headers = CIMultiDict()
    payload_chunks = []
    in_headers = True
    for line in remaining:
        if not in_headers:
            # @todo: use content length to read payload
            # NOTE(review): payload lines are joined without the EOL that
            # separated them -- confirm this is intended.
            payload_chunks.append(line)
            continue
        if ': ' not in line:
            # First non-header line ends the header section.
            in_headers = False
            continue
        name, value = line.split(': ', 1)
        if name not in headers:
            headers[name] = value
            continue
        # Repeated header: collect all values in a single list.
        existing = headers[name]
        values = existing if isinstance(existing, list) else [existing]
        values.append(value)
        headers[name] = values

    payload = ''.join(payload_chunks) or None

    response_match = FIRST_LINE_PATTERN['response']['regex'].match(first_line)
    if response_match:
        fields = response_match.groupdict()
        return Response(status_code=int(fields['status_code']),
                        status_message=fields['status_message'],
                        headers=headers,
                        payload=payload)

    request_match = FIRST_LINE_PATTERN['request']['regex'].match(first_line)
    if not request_match:
        raise ValueError('Not a SIP message')
    fields = request_match.groupdict()
    return Request(method=fields['method'],
                   headers=headers,
                   payload=payload)
开发者ID:Eyepea,项目名称:aiosip,代码行数:41,代码来源:message.py
注:本文中的multidict.CIMultiDict类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论