本文整理汇总了Python中pyamf.get_decoder函数的典型用法代码示例。如果您正苦于以下问题:Python get_decoder函数的具体用法?Python get_decoder怎么用?Python get_decoder使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_decoder函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_decode
def test_decode(self):
    """
    Decode a C{list-model} typed object from AMF0 and from AMF3 and check
    that its C{numbers} attribute comes back as C{[2, 4, 6, 8, 10]}.
    """
    pyamf.register_class(ListModel, "list-model")

    # AMF0: typed object "list-model" whose "numbers" key holds a strict
    # array of five numbers. Every number is the 0x00 type marker followed
    # by a big-endian double, e.g. 2.0 is "\x00" + "@" + seven zero bytes.
    decoder = pyamf.get_decoder(pyamf.AMF0)
    decoder.stream.write(
        "\x10\x00\nlist-model\x00\x07numbers\n\x00\x00"
        "\x00\x05\x00@\x00\x00\x00\x00\x00\x00\x00\x00@\x10\x00\x00\x00"
        "\x00\x00\x00\x00@\x18\x00\x00\x00\x00\x00\x00\x00@ \x00\x00"
        "\x00\x00\x00\x00\x00@$\x00\x00\x00\x00\x00\x00\x00\x00\t"
    )
    decoder.stream.seek(0)

    x = decoder.readElement()

    self.assertTrue(isinstance(x, ListModel))
    self.assertTrue(hasattr(x, "numbers"))
    self.assertEqual(x.numbers, [2, 4, 6, 8, 10])

    # Same object, AMF3 encoded.
    decoder = pyamf.get_decoder(pyamf.AMF3)
    decoder.stream.write(
        "\n\x0b\x15list-model\x0fnumbers\t\x0b\x01\x04\x02\x04"
        "\x04\x04\x06\x04\x08\x04\n\x01"
    )
    decoder.stream.seek(0)

    x = decoder.readElement()

    self.assertTrue(isinstance(x, ListModel))
    self.assertTrue(hasattr(x, "numbers"))
    self.assertEqual(x.numbers, [2, 4, 6, 8, 10])
开发者ID:jrolfs,项目名称:google-calendar-amf,代码行数:27,代码来源:test_google.py
示例2: test_get_decoder
def test_get_decoder(self):
    """
    An unknown encoding raises C{ValueError}; for AMF0 and AMF3 the returned
    decoder exposes the supplied stream data and the C{strict} flag.
    """
    self.assertRaises(ValueError, pyamf.get_decoder, 'spam')

    for encoding, payload in ((pyamf.AMF0, '123'), (pyamf.AMF3, '456')):
        decoder = pyamf.get_decoder(encoding, stream=payload, strict=True)

        self.assertEqual(decoder.stream.getvalue(), payload)
        self.assertTrue(decoder.strict)
开发者ID:0xmilk,项目名称:appscale,代码行数:10,代码来源:test_basic.py
示例3: __init__
def __init__(self):
    """
    Create the AMF encoder/decoder pair for C{self.encoding} and keep
    references to the stream each of them works on.
    """
    # Outgoing side: encoder and the stream it writes to.
    encoder = pyamf.get_encoder(self.encoding)
    self.encoder, self.ostream = encoder, encoder.stream

    # Incoming side: decoder and the stream it reads from.
    decoder = pyamf.get_decoder(self.encoding)
    self.decoder, self.istream = decoder, decoder.stream

    self.ipos = 0
开发者ID:doublecluepon,项目名称:SwfConduit,代码行数:7,代码来源:protocol.py
示例4: test_amf0
def test_amf0(self):
    """
    Decode an AMF0 encoded C{Pet} typed object and compare the result with
    C{self.jessica}.
    """
    d = pyamf.get_decoder(pyamf.AMF0)
    b = d.stream

    # The datastore key is spliced in as a big-endian unsigned short length
    # followed by the key itself. weight_in_pounds is an AMF0 number: the
    # 0x00 type marker followed by the big-endian double 5.0 ("@\x14" plus
    # six zero bytes).
    b.write(
        "\x10\x00\x03Pet\x00\x04_key\x02%s%s\x00\x04type\x02\x00\x03"
        "cat\x00\x10weight_in_pounds\x00@\x14\x00\x00\x00\x00\x00\x00\x00"
        "\x04name\x02\x00\x07Jessica\x00\tbirthdate\x0bB^\xc4\xae\xaa\x00"
        "\x00\x00\x00\x00\x00\x12spayed_or_neutered\x01\x00\x00\x00\t"
        % (struct.pack(">H", len(self.key)), self.key)
    )
    b.seek(0)

    x = d.readElement()

    self.assertTrue(isinstance(x, PetExpando))
    self.assertEqual(x.__class__, PetExpando)

    self.assertEqual(x.type, self.jessica.type)
    self.assertEqual(x.weight_in_pounds, self.jessica.weight_in_pounds)
    self.assertEqual(x.birthdate, datetime.date(1986, 10, 2))
    self.assertEqual(x.spayed_or_neutered, self.jessica.spayed_or_neutered)

    # now check db.Expando internals
    self.assertEqual(x.key(), self.jessica.key())
    self.assertEqual(x.kind(), self.jessica.kind())
    self.assertEqual(x.parent(), self.jessica.parent())
    self.assertEqual(x.parent_key(), self.jessica.parent_key())
    self.assertTrue(x.is_saved())
开发者ID:jrolfs,项目名称:google-calendar-amf,代码行数:29,代码来源:test_google.py
示例5: test_amf3
def test_amf3(self):
    """
    Decode an AMF3 encoded C{Pet} object and compare the result with
    C{self.jessica}.
    """
    decoder = pyamf.get_decoder(pyamf.AMF3)
    stream = decoder.stream

    # The key is spliced in as its length header (length shifted left by one
    # with the reference bit set) followed by the key itself.
    key_header = amf3._encode_int(len(self.key) << 1 | amf3.REFERENCE_BIT)
    payload = (
        "\n\x0b\x07Pet\tname\x06\x0fJessica\t_key\x06%s%s\x13birthdate"
        "\x08\x01B^\xc4\xae\xaa\x00\x00\x00!weight_in_pounds\x04\x05\x07"
        "foo\x06\x07bar\ttype\x06\x07cat%%spayed_or_neutered\x02\x01"
    ) % (key_header, self.key)

    stream.write(payload)
    stream.seek(0)

    pet = decoder.readElement()
    expected = self.jessica

    self.assertTrue(isinstance(pet, PetExpando))
    self.assertEquals(pet.__class__, PetExpando)

    self.assertEquals(pet.type, expected.type)
    self.assertEquals(pet.weight_in_pounds, expected.weight_in_pounds)
    self.assertEquals(pet.birthdate, datetime.date(1986, 10, 2))
    self.assertEquals(pet.spayed_or_neutered, expected.spayed_or_neutered)

    # now check db.Expando internals
    for accessor in ("key", "kind", "parent", "parent_key"):
        self.assertEquals(
            getattr(pet, accessor)(), getattr(self.jessica, accessor)())

    self.assertTrue(pet.is_saved())
开发者ID:jrolfs,项目名称:google-calendar-amf,代码行数:28,代码来源:test_google.py
示例6: send
def send(self, message):
    """
    Encode C{message} as AMF and send it over C{self.sock}.

    The message is appended to C{self.sends}, encoded with the module level
    C{object_encoding}, decoded again so the round trip can be logged, and
    then written to the socket with a single C{send} call.

    @param message: the object to encode and send.
    @raise RuntimeError: if the socket reports that zero bytes were sent.
    @raise Exception: if the socket raises C{socket.error}.
    """
    self.sends.append(message)
    # tell server we started listening
    logging.debug('send: request: %r', message)

    encoder = pyamf.get_encoder(object_encoding)
    encoder.writeElement(message)
    payload = encoder.stream.getvalue()
    encoder.stream.truncate()
    logging.debug('send: encoded request: %r', payload)
    logging.debug('send; %r%r', encoder.stream.tell(), encoder.stream)

    # Decode our own payload again purely for the debug log.
    decoder = pyamf.get_decoder(object_encoding, payload)
    logging.debug('send: %r%r', decoder.stream.tell(), decoder.stream)
    data = decoder.readElement()
    logging.debug('send: decoded %r', data)

    try:
        # NOTE(review): a single send() may transmit only part of the
        # payload; the return value is only checked for zero.
        sent = self.sock.send(payload)
        if sent == 0:
            raise RuntimeError("socket connection broken")
    except socket.error as e:
        # args is (errno, message) for OS level errors; fall back to the
        # exception itself when no separate message is attached instead of
        # failing with an IndexError.
        reason = e.args[1] if len(e.args) > 1 else e
        raise Exception("Can't connect: %s" % reason)
开发者ID:ethankennerly,项目名称:hotel-vs-gozilla,代码行数:30,代码来源:go_board_client.py
示例7: listen
def listen(envoy):
    """
    Receive up to 1024 bytes from C{envoy} and decode one AMF element.

    @param envoy: socket-like object providing C{recv}.
    @return: the decoded element, the string C{'timeout'} if the socket
        timed out, or a C{'socket error ...'} message string if the socket
        raised C{socket.error}.
    """
    response = ''
    # http://bytes.com/topic/python/answers/22953-how-catch-socket-timeout
    try:
        chunk = envoy.recv(1024)
        if '' == chunk:
            # NOTE(review): an empty read is only logged; decoding of the
            # empty response is still attempted below.
            error_message = 'RuntimeError socket connection broken'
            logging.error('listen %s', error_message)
        response += chunk
    except socket.timeout:
        logging.error('listen timeout')
        logging.error('listen %s' % response.__repr__())
        return 'timeout'
    except socket.error as error:
        # errno/strerror are None when the error carries no OS error code,
        # hence %s rather than %i and the fallback to the exception itself.
        error_message = 'socket error %s: "%s"' \
            % (error.errno, error.strerror or error)
        logging.error('listen %s', error_message)
        return error_message

    decoder = pyamf.get_decoder(object_encoding, response)
    logging.debug('listen: response: ' + response.__repr__())
    logging.debug('listen: stream: ' + decoder.stream.__repr__())
    data = decoder.readElement()
    logging.debug('listen decoded %s' % data.__repr__())
    return data
开发者ID:ethankennerly,项目名称:hotel-vs-gozilla,代码行数:28,代码来源:go_board_client.py
示例8: decode
def decode(self, buf):
    """
    Populate this notification message from an AMF0 encoded buffer.

    The first element read becomes C{self.name}; every element left in the
    buffer is collected, in order, into C{self.argv}.
    """
    elements = pyamf.get_decoder(pyamf.AMF0, stream=buf)

    self.name = elements.next()
    self.argv = list(elements)
开发者ID:Arlex,项目名称:rtmpy,代码行数:8,代码来源:message.py
示例9: _amf3_decoder
def _amf3_decoder(self):
decoder = getattr(self, "__amf3_decoder", None)
if not decoder:
decoder = pyamf.get_decoder(pyamf.AMF3, stream=self.stream, timezone_offset=self.timezone_offset)
self.__amf3_decoder = decoder
return decoder
开发者ID:Armedite,项目名称:xbmc-catchuptv-au,代码行数:8,代码来源:amf0.py
示例10: test_pure_decoder
def test_pure_decoder(self):
    """
    Asking for `use_ext=False` must yield a `pyamf.amf3.Decoder`, i.e. the
    extension must NOT be returned.
    """
    from pyamf import amf3 as pure_amf3

    self.assertIsInstance(
        pyamf.get_decoder(pyamf.AMF3, use_ext=False), pure_amf3.Decoder)
开发者ID:nervatura,项目名称:nerva2py,代码行数:9,代码来源:test_basic.py
示例11: test_encode
def test_encode(self):
    """
    Encode a fault built from a caught C{TypeError} with the AMF0 encoder.
    """
    encoder = pyamf.get_encoder(pyamf.AMF0)
    decoder = pyamf.get_decoder(pyamf.AMF0)
    # the decoder is pointed at the very stream the encoder writes to
    decoder.stream = encoder.stream

    try:
        raise TypeError("unknown type")
    except TypeError:
        # sys.exc_info() only describes the error while it is being handled
        encoder.writeElement(amf0.build_fault(*sys.exc_info()))
开发者ID:wayne-abarquez,项目名称:vizzuality,代码行数:9,代码来源:test_gateway.py
示例12: test_encode_decode_transient
def test_encode_decode_transient(self):
    """
    Round-trip a freshly built object through AMF3 and verify the decoded
    copy with C{_test_obj}.
    """
    user = self._build_obj()

    writer = pyamf.get_encoder(pyamf.AMF3)
    writer.writeElement(user)

    reader = pyamf.get_decoder(pyamf.AMF3, writer.stream.getvalue())
    self._test_obj(user, reader.readElement())
开发者ID:kruser,项目名称:zualoo,代码行数:9,代码来源:test_sqlalchemy.py
示例13: decode
def decode(stream, strict=True):
    """
    Decodes a SOL stream. L{strict} mode ensures that the sol stream is as spec
    compatible as possible.

    @param stream: the SOL data; wrapped in a C{util.BufferedByteStream}
        when it is not one already.
    @param strict: when true, the length stored in the header must match
        the number of bytes remaining in the stream.
    @raise pyamf.DecodeError: on an unknown version, inconsistent header
        length (strict mode only), invalid signature or invalid padding.
    @return: A C{tuple} containing the C{root_name} and a C{dict} of name,
        value pairs.
    """
    if not isinstance(stream, util.BufferedByteStream):
        stream = util.BufferedByteStream(stream)

    # read the version
    version = stream.read(2)

    if version != HEADER_VERSION:
        raise pyamf.DecodeError('Unknown SOL version in header')

    # read the length
    length = stream.read_ulong()

    if strict and stream.remaining() != length:
        raise pyamf.DecodeError('Inconsistent stream header length')

    # read the signature
    signature = stream.read(10)

    if signature != HEADER_SIGNATURE:
        raise pyamf.DecodeError('Invalid signature')

    name_length = stream.read_ushort()
    root_name = stream.read_utf8_string(name_length)

    # read padding
    if stream.read(3) != PADDING_BYTE * 3:
        raise pyamf.DecodeError('Invalid padding read')

    # the next byte selects the AMF encoding used for the values
    decoder = pyamf.get_decoder(stream.read_uchar())
    decoder.stream = stream

    values = {}

    while not stream.at_eof():
        name = decoder.readString()
        value = decoder.readElement()

        # read the padding; unexpected bytes are reported on stderr and
        # skipped until a padding byte turns up, rather than raising
        t = stream.read(1)

        while t != PADDING_BYTE:
            sys.stderr.write('Bad padding byte: 0x%02x\n' % ord(t))
            t = stream.read(1)

        values[name] = value

    return (root_name, values)
开发者ID:Averroes,项目名称:raft,代码行数:57,代码来源:sol.py
示例14: setUp
def setUp(self):
    """
    Build the class alias, the two "Jessica" fixtures and an AMF3 decoder
    used by the tests.
    """
    BaseTestCase.setUp(self)

    self.alias = adapter.DataStoreClassAlias(models.PetModel, "foo.bar")

    self.jessica = models.PetModel(name="Jessica", type="cat")

    # the expando fixture additionally carries a dynamic "foo" attribute
    expando = models.PetExpando(name="Jessica", type="cat")
    expando.foo = "bar"
    self.jessica_expando = expando

    self.decoder = pyamf.get_decoder(pyamf.AMF3)
开发者ID:Poorvak,项目名称:twitter_clone,代码行数:10,代码来源:test_xdb.py
示例15: test_none
def test_none(self):
    """
    A C{list-model} object whose C{numbers} value is not a list in the AMF0
    payload must still decode with C{numbers} equal to an empty list.
    """
    pyamf.register_class(ListModel, "list-model")

    decoder = pyamf.get_decoder(pyamf.AMF0)
    stream = decoder.stream
    stream.write("\x10\x00\nlist-model\x00\x07numbers\x05\x00\x00\t")
    stream.seek(0)

    self.assertEquals(decoder.readElement().numbers, [])
开发者ID:jrolfs,项目名称:google-calendar-amf,代码行数:10,代码来源:test_google.py
示例16: setUp
def setUp(self):
    """
    Define a model with a single float property and prepare an instance,
    its class alias and an AMF3 decoder for the tests.
    """
    BaseTestCase.setUp(self)

    class FloatModel(db.Model):
        f = db.FloatProperty()

    self.klass = FloatModel
    self.f = FloatModel()

    # no explicit alias name is given for the model
    self.alias = adapter.DataStoreClassAlias(self.klass, None)

    self.decoder = pyamf.get_decoder(pyamf.AMF3)
开发者ID:nervatura,项目名称:nerva2py,代码行数:10,代码来源:test_xdb.py
示例17: test_get_decoder
def test_get_decoder(self):
    """
    Check decoder class lookup, decoder construction and the passing of
    C{data}/C{context} for both AMF0 and AMF3, plus rejection of an unknown
    encoding.
    """
    from pyamf import amf0, amf3

    # (encoding constant, implementing module, sample stream data)
    codecs = ((pyamf.AMF0, amf0, '123'), (pyamf.AMF3, amf3, '456'))

    for encoding, module, _ in codecs:
        self.assertEquals(
            pyamf._get_decoder_class(encoding), module.Decoder)
    self.assertRaises(ValueError, pyamf._get_decoder_class, 'spam')

    for encoding, module, _ in codecs:
        self.assertTrue(
            isinstance(pyamf.get_decoder(encoding), module.Decoder))
    self.assertRaises(ValueError, pyamf.get_decoder, 'spam')

    for encoding, module, payload in codecs:
        context = module.Context()
        decoder = pyamf.get_decoder(encoding, data=payload, context=context)

        self.assertEquals(decoder.stream.getvalue(), payload)
        self.assertEquals(decoder.context, context)
开发者ID:fernandoacorreia,项目名称:flex-and-python-test,代码行数:20,代码来源:test_basic.py
示例18: getAMF3Decoder
def getAMF3Decoder(self, amf0_decoder):
    """
    Return the AMF3 decoder stored in C{self.extra}, first creating and
    storing one that uses C{amf0_decoder}'s stream and timezone offset when
    none is present.
    """
    cached = self.extra.get('amf3_decoder', None)

    if not cached:
        cached = pyamf.get_decoder(
            pyamf.AMF3,
            stream=amf0_decoder.stream,
            timezone_offset=amf0_decoder.timezone_offset
        )
        self.extra['amf3_decoder'] = cached

    return cached
开发者ID:Medisan,项目名称:TVWeb,代码行数:11,代码来源:amf0.py
示例19: test_encode_decode_persistent
def test_encode_decode_persistent(self):
    """
    Save, commit and refresh an object through the session, then round-trip
    it through AMF3 and verify the decoded copy with C{_test_obj}.
    """
    user = self._build_obj()

    session = self.session
    session.save(user)
    session.commit()
    session.refresh(user)

    writer = pyamf.get_encoder(pyamf.AMF3)
    writer.writeElement(user)

    reader = pyamf.get_decoder(pyamf.AMF3, writer.stream.getvalue())
    self._test_obj(user, reader.readElement())
开发者ID:kruser,项目名称:zualoo,代码行数:12,代码来源:test_sqlalchemy.py
示例20: test_ext_decoder
def test_ext_decoder(self):
"""
With `use_ext=True` specified, the extension must be returned.
"""
try:
from cpyamf import amf3
except ImportError:
self.skipTest('amf3 extension not available')
decoder = pyamf.get_decoder(pyamf.AMF3, use_ext=True)
self.assertIsInstance(decoder, amf3.Decoder)
开发者ID:nervatura,项目名称:nerva2py,代码行数:12,代码来源:test_basic.py
注:本文中的pyamf.get_decoder函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论