本文整理汇总了Python中pyhdb.compat.iter_range函数的典型用法代码示例。如果您正苦于以下问题:Python iter_range函数的具体用法?Python iter_range怎么用?Python iter_range使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了iter_range函数的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_multiple_insert_four_large_lobs_via_writelob_requests
def test_multiple_insert_four_large_lobs_via_writelob_requests(connection, test_table):
    """Round-trip two rows that each carry one oversized BLOB and one oversized CLOB.

    Every LOB is at least twice ``constants.MAX_SEGMENT_SIZE`` long. Values of
    that size are uploaded as the normal INSERT statement plus one or more
    WRITE_LOB requests. The test passes when all four LOBs read back from the
    table equal the data that was inserted.

    ``test_table`` is requested as a fixture only; its value is not used here.
    """
    lob_size = 2 * constants.MAX_SEGMENT_SIZE

    def random_letters():
        # Text payload made of ``lob_size`` random ASCII letters.
        return "".join(random.choice(string.ascii_letters) for _ in iter_range(lob_size))

    bigblob1 = os.urandom(lob_size + 1000)
    bigblob2 = os.urandom(lob_size + 1000)
    bigclob1 = random_letters()
    bigclob2 = random_letters()
    inserted = [(bigblob1, "blob1", bigclob1), (bigblob2, "blob2", bigclob2)]

    writer = connection.cursor()
    writer.executemany(
        "insert into %s (fblob, name, fclob) values (:1, :2, :3)" % TABLE,
        inserted,
    )
    connection.commit()

    reader = connection.cursor()
    fetched = reader.execute("select fblob, fclob from %s order by name" % TABLE).fetchall()

    # Rows come back ordered by name, i.e. in the same order they were inserted.
    for position, (expected_blob, _, expected_clob) in enumerate(inserted):
        stored_blob, stored_clob = fetched[position]
        assert stored_blob.read() == expected_blob
        assert stored_clob.read() == expected_clob
开发者ID:martin-zheng,项目名称:PyHDB,代码行数:26,代码来源:test_lob.py
示例2: _xor
def _xor(a, b):
a = bytearray(a)
b = bytearray(b)
result = bytearray(len(a))
for i in iter_range(len(a)):
result[i] += a[i] ^ b[i]
return bytes(result)
开发者ID:hpi-epic,项目名称:PyHDB,代码行数:7,代码来源:auth.py
示例3: unpack_data
def unpack_data(cls, argument_count, payload):
    """Read ``argument_count`` error records from ``payload``.

    Each record is a fixed-size header decoded with ``cls.part_struct``,
    followed by ``textlength`` bytes of UTF-8 encoded message text.

    :param argument_count: number of error records to read
    :param payload: readable binary stream positioned at the first record
    :returns: a one-element tuple holding a tuple of ``DatabaseError`` objects
    """
    record = cls.part_struct
    collected = []
    for _ in iter_range(argument_count):
        raw_header = payload.read(record.size)
        # Only the error code and the text length are needed below.
        code, _position, textlength, _level, _sqlstate = record.unpack(raw_header)
        message = payload.read(textlength).decode('utf-8')
        collected.append(DatabaseError(message, code))
    return (tuple(collected),)
开发者ID:euekim,项目名称:PyHDB,代码行数:7,代码来源:parts.py
示例4: unpack_data
def unpack_data(cls, argument_count, payload):
    """Decode ``argument_count`` key/value options from ``payload``.

    Every option starts with two signed bytes: the option key and a type code
    that selects how the value following them is encoded. Keys found in
    ``cls.option_identifier`` are replaced by the entry stored there; any
    other key is reported as ``'Unknown_<key>'``.

    :param argument_count: number of options to read
    :param payload: readable binary stream positioned at the first option
    :returns: a one-element tuple containing the dict of decoded options
    :raises Exception: if an option carries an unsupported type code
    """
    # type code -> (struct format, number of bytes to read) for single-field values
    fixed_width = {
        1: ('B', 1),
        2: ('h', 2),
        3: ('i', 4),
        4: ('q', 8),
        28: ('?', 1),
    }
    options = {}
    for _ in iter_range(argument_count):
        raw_key, typ = struct.unpack('bb', payload.read(2))
        if raw_key in cls.option_identifier:
            key = cls.option_identifier[raw_key]
        else:
            key = 'Unknown_%d' % raw_key

        if typ in fixed_width:
            fmt, width = fixed_width[typ]
            value = struct.unpack(fmt, payload.read(width))[0]
        elif typ in (29, 30):
            # Length-prefixed text: a signed short length, then that many UTF-8 bytes.
            length = struct.unpack('h', payload.read(2))[0]
            value = payload.read(length).decode('utf-8')
        elif typ == 24:
            # TODO: Handle type 24
            # NOTE(review): nothing is read from the payload for this type, so the
            # option is dropped and its value bytes (if any) stay unconsumed.
            continue
        else:
            raise Exception("Unknown option type %s" % typ)
        options[key] = value
    return (options,)
开发者ID:weese,项目名称:PyHDB,代码行数:32,代码来源:parts.py
示例5: unpack_data
def unpack_data(cls, argument_count, payload):
    """Decode ``argument_count`` parameter metadata records from ``payload``.

    Each record is 16 bytes long (struct format ``"bbbbIhhI"``). The fifth
    field is the offset of the parameter name; ``0xffffffff`` means that no
    name was supplied, in which case the zero-based parameter position is
    used as the id instead.

    Reading a name requires seeking away from the record that is being
    decoded. The stream position is restored afterwards so that the next
    iteration continues with the following record rather than with whatever
    comes after the name.

    :param argument_count: number of metadata records to read
    :param payload: seekable binary stream positioned at the first record
    :returns: a one-element tuple holding a tuple of ``ParameterMetadataTuple``
        namedtuples with fields ``options datatype mode id length fraction``
    """
    ParamMetadata = namedtuple('ParameterMetadataTuple', 'options datatype mode id length fraction')
    values = []
    for i in iter_range(argument_count):
        param = struct.unpack("bbbbIhhI", payload.read(16))
        name_offset = param[4]
        if name_offset == 0xffffffff:
            # no parameter name given -> identify the parameter by position
            param_id = i
        else:
            # Remember where the next record starts before jumping to the name.
            resume_at = payload.tell()
            # NOTE(review): the offset is applied from the start of the stream,
            # as before; confirm the offset base against the protocol reference.
            payload.seek(name_offset, 0)
            name_length, = struct.unpack('B', payload.read(1))
            param_id = payload.read(name_length).decode('utf-8')
            payload.seek(resume_at, 0)
        # Fields 3 and 7 are unused; field 4 (name offset) is replaced by the id.
        options, datatype, mode, _unused, _offset, length, fraction, _reserved = param
        values.append(ParamMetadata(options, datatype, mode, param_id, length, fraction))
    return (tuple(values),)
开发者ID:dpdornseifer,项目名称:PyHDB,代码行数:27,代码来源:parts.py
示例6: unpack_from
def unpack_from(cls, payload, expected_segments):
    """Parse ``expected_segments`` segments from ``payload`` and yield the replies.

    For each segment the header is decoded with ``cls.header_struct``, the
    segment payload is read and its parts are unpacked via ``Part.unpack_from``.
    Segments of kind ``REPLY`` are yielded. For segments of kind ``ERROR`` the
    first part decides what is raised; any other kind is rejected.

    :param payload: readable binary stream positioned at the first segment header
    :param expected_segments: number of segments to read
    :returns: a generator of segment objects built with ``cls(...)``
    :raises Exception: on an undecodable header, an unknown segment kind, or an
        error segment whose first part is of kind ``ROWSAFFECTED``
    """
    single_segment = expected_segments == 1
    for index in iter_range(expected_segments):
        ordinal = index + 1
        try:
            segment_header = ReplySegmentHeader(*cls.header_struct.unpack(payload.read(cls.header_size)))
        except struct.error:
            raise Exception("No valid segment header")
        debug('%s (%d/%d): %s', cls.__name__, ordinal, expected_segments, str(segment_header))

        # With exactly one expected segment the whole remaining payload is taken
        # (read(-1)); the original code describes this as a workaround for an
        # unspecified internal bug. Otherwise the size comes from the header.
        if single_segment:
            body_size = -1
        else:
            body_size = segment_header.segment_length - cls.header_size

        body = payload.read(body_size)
        segment_payload = BytesIO(body)
        debug('Read %d bytes payload segment %d', len(body), ordinal)

        parts = tuple(Part.unpack_from(segment_payload, expected_parts=segment_header.num_parts))
        segment = cls(segment_header.function_code, parts, header=segment_header)

        if segment_header.segment_kind == segment_kinds.REPLY:
            yield segment
            continue
        if segment_header.segment_kind != segment_kinds.ERROR:
            raise Exception("Invalid reply segment")

        # Error segment: the first part decides which exception is raised.
        # If it is neither ROWSAFFECTED nor ERROR, nothing is raised and the
        # loop moves on to the next segment.
        if segment.parts[0].kind == part_kinds.ROWSAFFECTED:
            raise Exception("Rows affected %s" % (segment.parts[0].values,))
        if segment.parts[0].kind == part_kinds.ERROR:
            raise segment.parts[0].errors[0]
开发者ID:Parnswir,项目名称:PyHDB,代码行数:34,代码来源:segments.py
示例7: unpack_from
def unpack_from(cls, payload, expected_parts):
    """Parse ``expected_parts`` parts from ``payload`` and yield them one by one.

    For every part the header is decoded with ``cls.header_struct``, the part
    payload (rounded up to the next multiple of 8 bytes) is read, and the
    class registered in ``PART_MAPPING`` for the part kind is instantiated
    from the values returned by its ``unpack_data``.

    :param payload: readable binary stream positioned at the first part header
    :param expected_parts: number of parts to read
    :returns: a generator of part objects
    :raises InterfaceError: if a header cannot be decoded or the part kind is unknown
    """
    for index in iter_range(expected_parts):
        ordinal = index + 1
        raw_header = payload.read(cls.header_size)
        try:
            part_header = PartHeader(*cls.header_struct.unpack(raw_header))
        except struct.error:
            raise InterfaceError("No valid part header")

        # Pad the declared size up to a multiple of 8; adds 0 when already aligned.
        padded_size = part_header.payload_size + (-part_header.payload_size % 8)
        raw_payload = payload.read(padded_size)
        part_payload = io.BytesIO(raw_payload)

        try:
            part_class = PART_MAPPING[part_header.part_kind]
        except KeyError:
            raise InterfaceError("Unknown part kind %s" % part_header.part_kind)

        debug('%s (%d/%d): %s', part_class.__name__, ordinal, expected_parts, str(part_header))
        debug('Read %d bytes payload for part %d', padded_size, ordinal)

        init_arguments = part_class.unpack_data(part_header.argument_count, part_payload)
        debug('Part data: %s', init_arguments)

        part = part_class(*init_arguments)
        part.header = part_header
        part.attribute = part_header.part_attributes
        part.source = 'server'
        if pyhdb.tracing:
            # Keep hex dumps of the raw bytes on the part when tracing is enabled.
            part.trace_header = humanhexlify(raw_header[:part_header.payload_size])
            part.trace_payload = humanhexlify(raw_payload, 30)
        yield part
开发者ID:weese,项目名称:PyHDB,代码行数:34,代码来源:parts.py
示例8: unpack_rows
def unpack_rows(self, column_types, connection):
    """Lazily decode ``self.num_rows`` rows from ``self.payload``.

    Each row is built by calling ``from_resultset(self.payload, connection)``
    on every entry of ``column_types`` in order.

    :param column_types: a tuple of column descriptors
       e.g. (<class 'pyhdb.protocol.types.String'>, <class 'pyhdb.protocol.types.ClobType'>)
    :param connection: a db connection object
    :returns: a generator object yielding one tuple per row
    """
    for _ in iter_range(self.num_rows):
        row = [column.from_resultset(self.payload, connection) for column in column_types]
        yield tuple(row)
开发者ID:weese,项目名称:PyHDB,代码行数:9,代码来源:parts.py
示例9: test_insert_string
def test_insert_string(connection, test_table):
    """Store a 5000-character random string and check it reads back unchanged.

    ``test_table`` is requested as a fixture only; its value is not used here.
    """
    writer = connection.cursor()
    letters = [random.choice(string.ascii_letters) for _ in iter_range(5000)]
    large_string = ''.join(letters)
    insert_sql = "insert into %s (name) values (:1)" % TABLE
    writer.execute(insert_sql, [large_string])
    connection.commit()

    reader = connection.cursor()
    result = reader.execute('select name from %s' % TABLE)
    row = result.fetchone()
    assert row[0] == large_string
开发者ID:Parnswir,项目名称:PyHDB,代码行数:9,代码来源:test_string.py
示例10: unpack_data
def unpack_data(cls, argument_count, payload):
    """Read ``argument_count`` error records from ``payload``.

    Each record is a fixed-size header decoded with ``cls.part_struct``,
    followed by ``textlength`` bytes of UTF-8 encoded message text. Error
    code 301 (unique constraint violated) becomes an ``IntegrityError``;
    every other code becomes a ``DatabaseError``.

    :param argument_count: number of error records to read
    :param payload: readable binary stream positioned at the first record
    :returns: a one-element tuple holding a tuple of exception objects
    """
    record = cls.part_struct
    collected = []
    for _ in iter_range(argument_count):
        raw_header = payload.read(record.size)
        code, _position, textlength, _level, _sqlstate = record.unpack(raw_header)
        message = payload.read(textlength).decode("utf-8")
        # 301: unique constraint violated
        error_class = IntegrityError if code == 301 else DatabaseError
        collected.append(error_class(message, code))
    return (tuple(collected),)
开发者ID:martin-zheng,项目名称:PyHDB,代码行数:11,代码来源:parts.py
示例11: test_insert_two_large_lobs_via_writelob_requests
def test_insert_two_large_lobs_via_writelob_requests(connection, test_table):
    """Round-trip one row carrying an oversized BLOB and an oversized CLOB.

    Both LOBs are at least twice ``constants.MAX_SEGMENT_SIZE`` long. Values of
    that size are uploaded as the normal INSERT statement plus one or more
    WRITE_LOB requests. The test passes when both LOBs read back from the
    table equal the data that was inserted.

    ``test_table`` is requested as a fixture only; its value is not used here.
    """
    lob_size = 2 * constants.MAX_SEGMENT_SIZE
    bigblob = os.urandom(lob_size + 1000)
    letters = [random.choice(string.ascii_letters) for _ in iter_range(lob_size)]
    bigclob = "".join(letters)

    writer = connection.cursor()
    insert_sql = "insert into %s (fblob, name, fclob) values (:1, :2, :3)" % TABLE
    writer.execute(insert_sql, [bigblob, "blob1", bigclob])
    connection.commit()

    reader = connection.cursor()
    select_sql = "select fblob, fclob from %s where name=:1" % TABLE
    result = reader.execute(select_sql, ["blob1"])
    row = result.fetchone()
    assert row[0].read() == bigblob
    assert row[1].read() == bigclob
开发者ID:martin-zheng,项目名称:PyHDB,代码行数:15,代码来源:test_lob.py
示例12: from_resultset
def from_resultset(cls, payload, connection=None):
    """Decode one 16-byte decimal value read from ``payload``.

    The 16 bytes are reversed so that index 0 is the most significant byte.
    From there the layout is: 1 sign bit, 14 exponent bits (biased by 6176)
    and a 113-bit integer mantissa. A most significant byte of ``0x70``
    denotes NULL.

    The result is built directly from sign, digits and exponent. Multiplying
    ``Decimal`` values instead would round the product to the precision of
    the active decimal context (28 digits by default) and silently drop
    digits of longer mantissas.

    :param payload: readable binary stream positioned at the value
    :param connection: unused
    :returns: ``decimal.Decimal`` holding the exact value, or ``None`` for NULL
    """
    raw = bytearray(payload.read(16))
    raw.reverse()
    if raw[0] == 0x70:
        return None

    sign = raw[0] >> 7
    biased_exponent = ((raw[0] & 0x7F) << 7) | ((raw[1] & 0xFE) >> 1)
    exponent = biased_exponent - 6176

    # Top mantissa bit lives in the lowest bit of byte 1; the remaining
    # 112 bits follow in bytes 2..15, most significant byte first.
    mantissa = raw[1] & 0x01
    for index in range(2, 16):
        mantissa = (mantissa << 8) | raw[index]

    digits = tuple(int(char) for char in str(mantissa))
    return decimal.Decimal((sign, digits, exponent))
开发者ID:hpi-epic,项目名称:PyHDB,代码行数:19,代码来源:types.py
示例13: prepare
def prepare(cls, value):
    """Encode ``value`` as a type-code byte followed by a 16-byte decimal.

    ``None`` is encoded as a single zero byte. Any value that is not already
    a ``decimal.Decimal`` (float, int, numeric string, ...) is converted with
    ``decimal.Decimal(value)`` first. Coefficients longer than 34 digits are
    truncated to their first 34 digits and the exponent is adjusted to match.

    The 16 value bytes are emitted least significant byte first: 14 bytes
    with the low 112 mantissa bits, then a byte combining the low 7 exponent
    bits with the top mantissa bit, then a byte combining the sign bit with
    the high exponent bits. The exponent is stored with a bias of 6176.

    :param value: ``None`` or a value accepted by ``decimal.Decimal``
    :returns: the packed representation
    """
    if value is None:
        return struct.pack('b', 0)
    if not isinstance(value, decimal.Decimal):
        value = decimal.Decimal(value)

    sign, digits, exponent = value.as_tuple()
    if len(digits) > 34:
        # Dropping trailing digits scales the coefficient down; compensate.
        exponent += len(digits) - 34
    mantissa = int(''.join(map(str, digits[:34])))
    exponent += 6176

    packed = bytearray()
    remaining = mantissa
    while len(packed) < 14:
        packed.append(remaining & 0xFF)
        remaining >>= 8
    # ``remaining`` now holds the mantissa bits above bit 111.
    packed.append(((exponent & 0x7F) << 1) | remaining)
    packed.append((sign << 7) | (exponent >> 7))
    return struct.pack('b', cls.type_code) + packed
开发者ID:hpi-epic,项目名称:PyHDB,代码行数:24,代码来源:types.py
示例14: _unpack_rows
def _unpack_rows(self, payload, rows):
    """Lazily decode ``rows`` rows from ``payload``.

    Each row is built by calling ``from_resultset(payload, self.connection)``
    on every entry of ``self._column_types`` in order.

    :param payload: binary stream holding the row data
    :param rows: number of rows to decode
    :returns: a generator yielding one tuple per row
    """
    for _ in iter_range(rows):
        row = [column.from_resultset(payload, self.connection) for column in self._column_types]
        yield tuple(row)
开发者ID:dpdornseifer,项目名称:PyHDB,代码行数:3,代码来源:cursor.py
注:本文中的pyhdb.compat.iter_range函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论