本文整理汇总了Python中pyamf.remoting.encode函数的典型用法代码示例。如果您正苦于以下问题:Python encode函数的具体用法?Python encode怎么用?Python encode使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了encode函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_basic
def test_basic(self):
    """
    Check the bytes produced for envelopes that were given no headers
    and no bodies.

    The AMF0/Flash6 envelope must encode to six NUL bytes; the
    AMF3/FlashCom envelope must encode to the bytes 0x03 0x01 followed
    by four NUL bytes.
    """
    cases = (
        (pyamf.AMF0, pyamf.ClientTypes.Flash6, "\x00" * 6),
        (pyamf.AMF3, pyamf.ClientTypes.FlashCom, "\x03\x01" + "\x00" * 4),
    )
    for amf_version, client_type, expected in cases:
        envelope = remoting.Envelope(amf_version, client_type)
        encoded = remoting.encode(envelope).getvalue()
        self.assertEquals(encoded, expected)
开发者ID:kruser,项目名称:zualoo,代码行数:8,代码来源:test_remoting.py
示例2: test_basic
def test_basic(self):
    """
    Check the bytes produced for envelopes that were given no headers
    and no bodies, once per AMF version.
    """
    empty_amf0 = remoting.Envelope(pyamf.AMF0)
    amf0_bytes = remoting.encode(empty_amf0).getvalue()
    self.assertEquals(amf0_bytes, '\x00' * 6)

    empty_amf3 = remoting.Envelope(pyamf.AMF3)
    amf3_bytes = remoting.encode(empty_amf3).getvalue()
    # Only the second byte differs from the AMF0 case.
    self.assertEquals(amf3_bytes, '\x00\x03' + '\x00' * 4)
开发者ID:cardmagic,项目名称:PyAMF,代码行数:9,代码来源:test_remoting.py
示例3: test_amf_parse
def test_amf_parse(self):
    """
    Two AMF3 envelopes whose single request differs only in its body
    must not yield the same ``amf_parse`` result.
    """
    canonicalizer = MethodQueryCanonicalizer(
        'POST', 'application/x-amf', 0, BytesIO())

    def envelope_with_body(body):
        # One request stored under '/0', targeting 't'.
        envelope = Envelope(AMF3)
        envelope['/0'] = Request(target='t', body=body)
        return envelope

    empty_env = envelope_with_body("")
    alt_env = envelope_with_body("alt_content")

    parsed_empty = canonicalizer.amf_parse(encode(empty_env).getvalue(), None)
    parsed_alt = canonicalizer.amf_parse(encode(alt_env).getvalue(), None)
    assert parsed_empty != parsed_alt
开发者ID:ikreymer,项目名称:pywb,代码行数:13,代码来源:test_inputreq.py
示例4: write_envelope
def write_envelope(version, filename):
    """
    Encode ``records`` as an AMF response envelope and write it to a file.

    ``records`` is not a parameter; it is read from the enclosing
    (presumably module-level) scope.

    :param version: AMF version passed to ``Envelope(amfVersion=...)``.
    :param filename: Path of the file to create or truncate.
    """
    envelope = Envelope(amfVersion=version)
    envelope['message'] = Response(records)
    stream = remoting.encode(envelope)
    # AMF is a binary format: open in binary mode so no newline
    # translation is applied, and use a context manager so the handle
    # is closed (and flushed) even if the write fails.
    with open(filename, 'wb') as out:
        out.write(stream.getvalue())
开发者ID:olivar16,项目名称:WinEng_DB,代码行数:7,代码来源:amf-grid.py
示例5: get_kanal5
def get_kanal5(video_player):
    """
    Ask the Brightcove AMF gateway for a media item and list its renditions.

    Sends a ``findMediaById`` request to ``c.brightcove.com`` with
    hard-coded player id, publisher id and constant.

    :param video_player: Value sent as the third element of the request body.
    :return: One ``"<frameWidth>x<frameHeight>:<defaultURL>";`` entry per
        rendition, concatenated in response order; an empty string when
        the response lists no renditions.
    """
    player_id = 811317479001
    publisher_id = 22710239001
    const = '9f79dd85c3703b8674de883265d8c9e606360c2e'

    env = remoting.Envelope(amfVersion=3)
    env.bodies.append(
        (
            "/1",
            remoting.Request(
                target="com.brightcove.player.runtime.PlayerMediaFacade.findMediaById",
                body=[const, player_id, video_player, publisher_id],
                envelope=env
            )
        )
    )
    # Keep the encoded bytes under their own name instead of rebinding `env`.
    payload = str(remoting.encode(env).read())

    conn = httplib.HTTPConnection("c.brightcove.com")
    try:
        conn.request("POST", "/services/messagebroker/amf?playerKey=AQ~~,AAAABUmivxk~,SnCsFJuhbr0vfwrPJJSL03znlhz-e9bk", payload, {'content-type': 'application/x-amf'})
        response = conn.getresponse().read()
    finally:
        # Release the socket whether or not the request succeeded.
        conn.close()

    renditions = remoting.decode(response).bodies[0][1].body['renditions']
    return ''.join(
        '"%sx%s:%s";' % (rendition['frameWidth'], rendition['frameHeight'], rendition['defaultURL'])
        for rendition in renditions
    )
开发者ID:cederlys,项目名称:pirateplay,代码行数:25,代码来源:kanal5.py
示例6: test_response
def test_response(self):
    """
    Test encoding of a response body.
    """
    msg = remoting.Envelope(pyamf.AMF0)
    msg['/1'] = remoting.Response(body=[1, 2, 3])
    self.assertEqual(len(msg), 1)

    x = msg['/1']
    self.assertTrue(isinstance(x, remoting.Response))
    self.assertEqual(x.envelope, msg)
    self.assertEqual(x.body, [1, 2, 3])
    self.assertEqual(x.status, 0)
    self.assertEqual(x.headers, msg.headers)

    # The three list items are AMF0 numbers: a 0x00 type marker followed
    # by an 8-byte big-endian IEEE-754 double.  2.0 starts with 0x40 ('@')
    # and 3.0 with 0x40 0x08 ('@\x08'); those '@' bytes had been replaced
    # by e-mail-obfuscation text and are restored here.
    self.assertEqual(
        remoting.encode(msg).getvalue(),
        '\x00\x00\x00\x00'
        '\x00\x01\x00\x0b/1/onResult\x00\x04null\x00\x00\x00\x00\n\x00\x00'
        '\x00\x03\x00?\xf0\x00\x00\x00\x00\x00\x00\x00@\x00\x00\x00\x00'
        '\x00\x00\x00\x00@\x08\x00\x00\x00\x00\x00\x00'
    )
开发者ID:nervatura,项目名称:nerva2py,代码行数:25,代码来源:test_remoting.py
示例7: __GetBrightCoveData
def __GetBrightCoveData(self):
    """ Retrieves the media data of a brightcove stream

    Takes no arguments; the request is built from these instance attributes:

    playerKey    : Key identifying the current request
    contentId    : ID of the content to retrieve
    url          : Url of the page that calls the video SWF
    experienceId : <unknown parameter>
    seed         : Constant which depends on the website
    contentRefId : passed through to the request builder

    When self.proxy is set, the connection is opened to
    self.proxy.Proxy / self.proxy.Port instead of c.brightcove.com.

    The decoded response body is stored in self.full_response.

    Returns response['programmedContent']['videoPlayer']['mediaDTO'].
    """
    # Seed = 61773bc7479ab4e69a5214f17fd4afd21fe1987a
    envelope = self.__BuildBrightCoveAmfRequest(self.playerKey, self.contentId, self.url, self.experienceId, self.seed, self.contentRefId)

    if self.proxy:
        connection = httplib.HTTPConnection(self.proxy.Proxy, self.proxy.Port)
    else:
        connection = httplib.HTTPConnection("c.brightcove.com")

    # Single-argument print(...) gives the same output under Python 2 and 3.
    print(envelope)

    # Encode once and reuse the bytes (previously encoded twice, with one
    # result discarded).
    payload = str(remoting.encode(envelope).read())
    try:
        connection.request("POST", "http://c.brightcove.com/services/messagebroker/amf?playerKey=" + str(self.playerKey), payload, {'content-type': 'application/x-amf'})
        response = connection.getresponse().read()
    finally:
        # Release the socket whether or not the request succeeded.
        connection.close()

    response = remoting.decode(response).bodies[0][1].body
    print(response)

    if self.logger:
        self.logger.Trace(response)

    self.full_response = response
    return response['programmedContent']['videoPlayer']['mediaDTO']
开发者ID:boggob,项目名称:bogs-xbmc-addons,代码行数:35,代码来源:brightcovehelper.py
示例8: test_process_request
def test_process_request(self):
    """
    POST an AMF3 ``echo`` request to the local gateway and check the
    decoded reply.
    """
    def echo(data):
        return data

    self.gw.addService(echo)

    envelope = remoting.Envelope(pyamf.AMF3)
    envelope['/1'] = remoting.Request('echo', body=['hello'])

    url = "http://127.0.0.1:%d/" % (self.port,)
    payload = remoting.encode(envelope).getvalue()
    deferred = client.getPage(url, method="POST", postdata=payload)

    def check_reply(raw):
        reply = remoting.decode(raw)
        self.assertEquals(reply.amfVersion, pyamf.AMF3)
        self.assertTrue('/1' in reply)

        message = reply['/1']
        self.assertEquals(message.status, remoting.STATUS_OK)
        self.assertEquals(message.body, 'hello')

    return deferred.addCallback(check_reply)
开发者ID:cardmagic,项目名称:PyAMF,代码行数:25,代码来源:test_twisted.py
示例9: get_clip_info
def get_clip_info(const, playerID, videoPlayer, publisherID):
    """
    POST an AMF request to the Brightcove message broker and return the
    body of the first message in the decoded response.

    The four parameters are forwarded unchanged to ``build_amf_request``.

    NOTE(review): ``playerKey`` is not a parameter; it is presumably a
    module-level name defined elsewhere in the file -- confirm.
    """
    conn = httplib.HTTPConnection("c.brightcove.com")
    try:
        envelope = build_amf_request(const, playerID, videoPlayer, publisherID)
        conn.request("POST", "/services/messagebroker/amf?playerKey=" + playerKey, str(remoting.encode(envelope).read()), {'content-type': 'application/x-amf'})
        raw = conn.getresponse().read()
    finally:
        # Release the socket whether or not the request succeeded.
        conn.close()
    return remoting.decode(raw).bodies[0][1].body
开发者ID:mikpin,项目名称:plugin.video.dmax-ita,代码行数:7,代码来源:default.py
示例10: test_deferred_service
def test_deferred_service(self):
    """
    A service that returns a Deferred (fired via ``reactor.callLater``)
    must still produce a normal ``echo`` reply.
    """
    def echo(data):
        pending = defer.Deferred()
        reactor.callLater(0, pending.callback, data)
        return pending

    self.gw.addService(echo)

    envelope = remoting.Envelope(pyamf.AMF0, pyamf.ClientTypes.Flash9)
    envelope['/1'] = remoting.Request('echo', body=['hello'])

    url = "http://127.0.0.1:%d/" % (self.port,)
    payload = remoting.encode(envelope).getvalue()
    deferred = client.getPage(url, method="POST", postdata=payload)

    def check_reply(raw):
        reply = remoting.decode(raw)
        self.assertEquals(reply.amfVersion, pyamf.AMF0)
        self.assertEquals(reply.clientType, pyamf.ClientTypes.Flash9)
        self.assertTrue('/1' in reply)

        message = reply['/1']
        self.assertEquals(message.status, remoting.STATUS_OK)
        self.assertEquals(message.body, 'hello')

    return deferred.addCallback(check_reply)
开发者ID:weimingtom,项目名称:o-healer-projects,代码行数:29,代码来源:test_twisted.py
示例11: test_timezone
def test_timezone(self):
    """
    With ``timezone_offset`` set to -18000 the service must see the sent
    datetime shifted by -5 hours, while the decoded reply must carry the
    datetime that was originally sent.
    """
    import datetime

    self.executed = False
    shift = datetime.timedelta(hours=-5)
    sent = datetime.datetime.utcnow()

    def echo(d):
        self.assertEquals(d, sent + shift)
        self.executed = True
        return d

    self.gw.addService(echo)
    self.gw.timezone_offset = -18000

    request_env = remoting.Envelope(amfVersion=pyamf.AMF0)
    request_env['/1'] = remoting.Request(target='echo', body=[sent])

    # The encoded stream itself is handed to the gateway as its input.
    self.environ['wsgi.input'] = remoting.encode(request_env)
    self.gw.post()

    reply_env = remoting.decode(self.response.out.getvalue())
    self.assertEquals(reply_env['/1'].body, sent)
    self.assertTrue(self.executed)
开发者ID:cardmagic,项目名称:PyAMF,代码行数:29,代码来源:test_google.py
示例12: get_clip_info
def get_clip_info(clip_name):
    """
    POST an AMF request to the tele5.de gateway and return the body of
    the first message in the decoded response.

    :param clip_name: Forwarded unchanged to ``build_amf_request``.
    """
    conn = httplib.HTTPConnection("www.tele5.de")
    try:
        envelope = build_amf_request(clip_name)
        conn.request("POST", "/gateway/gateway.php", str(remoting.encode(envelope).read()), {'content-type': 'application/x-amf'})
        raw = conn.getresponse().read()
    finally:
        # Release the socket whether or not the request succeeded.
        conn.close()
    return remoting.decode(raw).bodies[0][1].body
开发者ID:mmllnr,项目名称:plugin.video.tele5,代码行数:7,代码来源:default.py
示例13: test_process_request
def test_process_request(self):
    """
    POST an AMF0/Flash9 ``echo`` request to the local gateway and check
    the decoded reply, including the client type.
    """
    def echo(data):
        return data

    self.gw.addService(echo)

    envelope = remoting.Envelope(pyamf.AMF0, pyamf.ClientTypes.Flash9)
    envelope["/1"] = remoting.Request("echo", body=["hello"])

    url = "http://127.0.0.1:%d/" % (self.port,)
    payload = remoting.encode(envelope).getvalue()
    deferred = client.getPage(url, method="POST", postdata=payload)

    def check_reply(raw):
        reply = remoting.decode(raw)
        self.assertEquals(reply.amfVersion, pyamf.AMF0)
        self.assertEquals(reply.clientType, pyamf.ClientTypes.Flash9)
        self.assertTrue("/1" in reply)

        message = reply["/1"]
        self.assertEquals(message.status, remoting.STATUS_OK)
        self.assertEquals(message.body, "hello")

    return deferred.addCallback(check_reply)
开发者ID:wogooo,项目名称:TowerSaint,代码行数:27,代码来源:test_twisted.py
示例14: test_timezone
def test_timezone(self):
    """
    With ``timezone_offset`` set to -18000 the service must see the sent
    datetime shifted by -5 hours, while the decoded HTTP reply must
    carry the datetime that was originally sent.
    """
    import datetime

    self.executed = False
    shift = datetime.timedelta(hours=-5)
    sent = datetime.datetime.utcnow()

    def echo(d):
        self.assertEquals(d, sent + shift)
        self.executed = True
        return d

    self.gw.addService(echo)
    self.gw.timezone_offset = -18000

    request_env = remoting.Envelope(amfVersion=pyamf.AMF0, clientType=0)
    request_env["/1"] = remoting.Request(target="echo", body=[sent])
    encoded = remoting.encode(request_env)

    url = "http://127.0.0.1:%d/" % (self.port,)
    deferred = client.getPage(url, method="POST", postdata=encoded.getvalue())

    def check_reply(response):
        reply_env = remoting.decode("".join(response))
        reply = reply_env["/1"]
        self.assertEquals(reply.status, remoting.STATUS_OK)
        self.assertEquals(reply.body, sent)

    return deferred.addCallback(check_reply)
开发者ID:wogooo,项目名称:TowerSaint,代码行数:32,代码来源:test_twisted.py
示例15: test_expose_request
def test_expose_request(self):
    """
    With ``expose_request`` enabled the service must be called with the
    HTTP request as its first argument, and that request must carry the
    AMF request as its ``amf_request`` attribute.
    """
    self.gw.expose_request = True
    self.executed = False

    env = remoting.Envelope(pyamf.AMF0, pyamf.ClientTypes.Flash9)
    request = remoting.Request("echo", body=["hello"])
    env["/1"] = request

    def echo(http_request, data):
        self.assertTrue(isinstance(http_request, http.Request))
        self.assertTrue(hasattr(http_request, "amf_request"))

        amf_request = http_request.amf_request
        # Check the request the gateway attached.  The previous version
        # asserted on the enclosing `request` built above, which passes
        # regardless of what the gateway exposes.
        self.assertEquals(amf_request.target, "echo")
        self.assertEquals(amf_request.body, ["hello"])
        self.executed = True
        return data

    self.gw.addService(echo)

    d = client.getPage(
        "http://127.0.0.1:%d/" % (self.port,), method="POST", postdata=remoting.encode(env).getvalue()
    )
    return d.addCallback(lambda x: self.assertTrue(self.executed))
开发者ID:wogooo,项目名称:TowerSaint,代码行数:27,代码来源:test_twisted.py
示例16: execute
def execute(self):
    """
    Send every queued request to ``self.url`` as one AMF POST.

    The queue is emptied before the request is built.  The factory's
    deferred is chained through the HTTP handlers, ``remoting.decode``
    and then the AMF handlers; the TCP connection is opened last.
    """
    if self.logger:
        debug = self.logger.debug
        debug('Sending POST request to %s', self.url.geturl())
        debug('User-Agent: %s', self.user_agent)
        for key, value in self.http_headers.iteritems():
            debug('%s: %s', key, value)

    # Make sure these requests won't get added to another batch
    batch, self.requests = self.requests, []

    # Falls back to port 80 when the URL carries no explicit port.
    port = self.url.port or 80

    amf_request = self._createAMFRequest(batch)
    body = remoting.encode(amf_request, strict=self.strict).getvalue()

    factory = HTTPClientFactory(
        self.url.geturl(), 'POST', body, self.http_headers, self.user_agent)

    deferred = factory.deferred
    # HTTP layer: both handlers receive the factory as an extra argument.
    deferred.addCallbacks(
        self._handleHTTPResponse, self._handleHTTPError,
        [factory], errbackArgs=[factory])
    deferred.addCallback(remoting.decode, strict=self.strict)
    # AMF layer: both handlers receive the batch that was sent.
    deferred.addCallbacks(
        self._handleAMFResponse, self._handleAMFError,
        [batch], errbackArgs=[batch])

    reactor.connectTCP(self.url.hostname, port, factory)
开发者ID:hydralabs,项目名称:plasma,代码行数:27,代码来源:client.py
示例17: __GetBrightCoveData
def __GetBrightCoveData(self):
    """ Retrieves the media data of a brightcove stream

    Takes no arguments; the request is built from these instance attributes:

    playerKey    : string - Key identifying the current request
    contentId    : ID of the content to retrieve
    url          : Url of the page that calls the video SWF
    experienceId : <unknown parameter>
    seed         : Constant which depends on the website

    Returns response["programmedContent"]["videoPlayer"]["mediaDTO"].
    """
    # Seed = 61773bc7479ab4e69a5214f17fd4afd21fe1987a
    envelope = self.__BuildBrightCoveAmfRequest(
        self.playerKey, self.contentId, self.url, self.experienceId, self.seed
    )
    connection = httplib.HTTPConnection("c.brightcove.com")
    try:
        connection.request(
            "POST",
            "/services/messagebroker/amf?playerKey=" + self.playerKey,
            str(remoting.encode(envelope).read()),
            {"content-type": "application/x-amf"},
        )
        response = connection.getresponse().read()
    finally:
        # Release the socket whether or not the request succeeded.
        connection.close()
    response = remoting.decode(response).bodies[0][1].body
    return response["programmedContent"]["videoPlayer"]["mediaDTO"]
开发者ID:bethanie,项目名称:bartsidee-boxee,代码行数:33,代码来源:veronica.py
示例18: get_episode_info
def get_episode_info(video_player_key, video_content_id, video_url, video_player_id):
    """
    Send an AMF request to the Brightcove message broker through
    ``_connection.getAMF`` and return the body of the first message in
    the decoded response.

    All four parameters are forwarded to ``build_amf_request``;
    ``video_player_key`` is also appended to the endpoint URL.
    """
    amf_request = build_amf_request(
        video_player_key, video_content_id, video_url, video_player_id)
    endpoint = "http://c.brightcove.com/services/messagebroker/amf?playerKey=" + video_player_key
    payload = bytes(remoting.encode(amf_request).read())
    raw_reply = _connection.getAMF(
        endpoint, payload, {'Content-Type' : 'application/x-amf'})
    return remoting.decode(raw_reply).bodies[0][1].body
开发者ID:slices81,项目名称:xbmcplus-xbmc-plugins-beta,代码行数:7,代码来源:amc.py
示例19: test_encoding_error
def test_encoding_error(self):
    """
    Replace ``_twisted.remoting.encode`` with a function that raises
    ``pyamf.EncodeError`` and expect the HTTP request to fail with a
    500 Internal Server Error.  The original ``encode`` is put back
    whether the checks pass or fail.
    """
    original_encode = _twisted.remoting.encode

    def force_error(amf_request, context=None):
        raise pyamf.EncodeError

    def echo(request, data):
        return data

    self.gw.addService(echo)

    envelope = remoting.Envelope(pyamf.AMF0, pyamf.ClientTypes.Flash9)
    envelope['/1'] = remoting.Request('echo', body=['hello'])

    # The outgoing payload is encoded here, before the replacement below
    # is installed -- keep this order.
    url = "http://127.0.0.1:%d/" % (self.port,)
    d = client.getPage(
        url, method="POST", postdata=remoting.encode(envelope).getvalue())

    _twisted.remoting.encode = force_error

    def restore(_):
        _twisted.remoting.encode = original_encode

    def check(exc):
        self.assertEquals(int(exc.args[0]), http.INTERNAL_SERVER_ERROR)
        self.assertTrue(exc.args[1].startswith('500 Internal Server Error'))

    failing = self.assertFailure(d, error.Error)
    failing.addCallback(check)
    return failing.addBoth(restore)
开发者ID:weimingtom,项目名称:o-healer-projects,代码行数:31,代码来源:test_twisted.py
示例20: test_expose_request
def test_expose_request(self):
    """
    With ``expose_request`` enabled the service must be called with the
    WSGI environ as its first argument, and that environ must hold the
    AMF request under ``'pyamf.request'``.
    """
    self.gw.expose_request = True
    self.executed = False

    envelope = remoting.Envelope(pyamf.AMF3)
    envelope['/1'] = remoting.Request('echo', body=['hello'])
    encoded = remoting.encode(envelope)

    environ = {
        'REQUEST_METHOD': 'POST',
        'CONTENT_LENGTH': str(len(encoded)),
        'wsgi.input': encoded,
    }

    def echo(http_request, data):
        self.assertTrue('pyamf.request' in http_request)

        amf_request = http_request['pyamf.request']
        self.assertTrue(isinstance(amf_request, remoting.Request))
        self.assertEquals(amf_request.target, 'echo')
        self.assertEquals(amf_request.body, ['hello'])

        self.executed = True
        return data

    self.gw.addService(echo)

    # start_response is a no-op; only the call into the service matters.
    self.gw(environ, lambda *args: None)
    self.assertTrue(self.executed)
开发者ID:cardmagic,项目名称:PyAMF,代码行数:33,代码来源:test_wsgi.py
注:本文中的pyamf.remoting.encode函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论