本文整理汇总了Python中tests.integration.get_server_versions函数的典型用法代码示例。如果您正苦于以下问题:Python get_server_versions函数的具体用法?Python get_server_versions怎么用?Python get_server_versions使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_server_versions函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_collection_indexes
def test_collection_indexes(self):
    """Verify that collection indexes appear in the exported table schema.

    The test is skipped when the server reports a version below 2.1.0.
    An index on the map keys and an index on the map column itself are
    always exercised; a ``full()`` index on a frozen map is exercised only
    when the server reports 2.1.3 or newer.
    """
    if get_server_versions()[0] < (2, 1, 0):
        raise unittest.SkipTest("Secondary index on collections were introduced in Cassandra 2.1")

    def exported_schema():
        # Fetch the table metadata afresh for every assertion.
        return self.get_table_metadata().export_as_string()

    run_cql = self.session.execute
    qualified_table = (self.ksname, self.cfname)

    run_cql("CREATE TABLE %s.%s (a int PRIMARY KEY, b map<text, text>)" % qualified_table)

    # Index on the keys of the map.
    run_cql("CREATE INDEX index1 ON %s.%s (keys(b))" % qualified_table)
    self.assertIn('(keys(b))', exported_schema())

    # Replace it with an index on the map column itself.
    run_cql("DROP INDEX %s.index1" % (self.ksname,))
    run_cql("CREATE INDEX index2 ON %s.%s (b)" % qualified_table)
    self.assertIn(' (b)', exported_schema())

    # test full indexes on frozen collections, if available
    if not get_server_versions()[0] >= (2, 1, 3):
        return

    run_cql("DROP TABLE %s.%s" % qualified_table)
    run_cql("CREATE TABLE %s.%s (a int PRIMARY KEY, b frozen<map<text, text>>)" % qualified_table)
    run_cql("CREATE INDEX index3 ON %s.%s (full(b))" % qualified_table)
    self.assertIn('(full(b))', exported_schema())
开发者ID:animeshinvinci,项目名称:python-driver,代码行数:29,代码来源:test_metadata.py
示例2: test_refresh_schema_type
def test_refresh_schema_type(self):
    """Check that refreshing one user type replaces only that type's metadata.

    Skipped when the server reports a version below 2.1.0 or when
    ``PROTOCOL_VERSION`` is below 3.

    After ``refresh_user_type_metadata`` the keyspaces mapping and the
    keyspace metadata must be the very same objects as before, while the
    user type metadata must be a new object that renders the same CQL.
    """
    if get_server_versions()[0] < (2, 1, 0):
        raise unittest.SkipTest('UDTs were introduced in Cassandra 2.1')
    if PROTOCOL_VERSION < 3:
        # We may want to refresh types on keyspace change events in that case(?)
        raise unittest.SkipTest('UDTs are not specified in change events for protocol v2')

    cluster = Cluster(protocol_version=PROTOCOL_VERSION)
    try:
        session = cluster.connect()

        keyspace_name = 'test1rf'
        type_name = self._testMethodName

        session.execute('CREATE TYPE IF NOT EXISTS %s.%s (one int, two text)' % (keyspace_name, type_name))
        original_meta = cluster.metadata.keyspaces
        original_test1rf_meta = original_meta[keyspace_name]
        original_type_meta = original_test1rf_meta.user_types[type_name]

        # only refresh one type
        cluster.refresh_user_type_metadata(keyspace_name, type_name)

        current_meta = cluster.metadata.keyspaces
        current_test1rf_meta = current_meta[keyspace_name]
        current_type_meta = current_test1rf_meta.user_types[type_name]

        self.assertIs(original_meta, current_meta)
        self.assertIs(original_test1rf_meta, current_test1rf_meta)
        self.assertIsNot(original_type_meta, current_type_meta)
        self.assertEqual(original_type_meta.as_cql_query(), current_type_meta.as_cql_query())
    finally:
        # Shut down the cluster (which also closes its sessions) rather than
        # only the session, and do it even when an assertion above fails, so
        # the cluster's connections are not leaked into later tests.
        cluster.shutdown()
开发者ID:angelomendonca,项目名称:python-driver,代码行数:29,代码来源:test_cluster.py
示例3: setUp
def setUp(self):
    """Record the server versions and create the ``typetests`` keyspace.

    A cluster and a session are opened and stored on the instance, the
    keyspace is created through that session, and the cluster is then
    shut down again.
    """
    server_versions = get_server_versions()
    self._cass_version, self._cql_version = server_versions

    self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
    self.session = self.cluster.connect()

    create_keyspace = "CREATE KEYSPACE typetests WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'}"
    self.session.execute(create_keyspace)

    self.cluster.shutdown()
开发者ID:StuartAxelOwen,项目名称:python-driver,代码行数:7,代码来源:test_types.py
示例4: setup_class
def setup_class(cls):
    """Prepare class-level fixtures for the type tests.

    Stores the server versions and the list of column types to exercise
    (``date`` and ``time`` are added when the server reports 2.1.4 or
    newer), then connects, creates the ``typetests`` keyspace and makes it
    the session's current keyspace.
    """
    cass_version, cql_version = get_server_versions()
    cls._cass_version = cass_version
    cls._cql_version = cql_version

    column_types = [
        'text', 'ascii', 'bigint', 'boolean', 'decimal', 'double', 'float',
        'inet', 'int', 'list<text>', 'set<int>', 'map<text,int>',
        'timestamp', 'uuid', 'timeuuid', 'varchar', 'varint',
    ]
    if cass_version >= (2, 1, 4):
        column_types += ['date', 'time']
    cls._col_types = column_types

    session = Cluster(protocol_version=PROTOCOL_VERSION).connect()
    cls._session = session
    session.execute("CREATE KEYSPACE typetests WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'}")
    session.set_keyspace("typetests")
开发者ID:anthony-cervantes,项目名称:python-driver,代码行数:27,代码来源:test_types.py
示例5: setup_class
def setup_class(cls):
    """Connect to the cluster and create the compact-storage test table.

    Stores the cluster, the session and the server versions on the class,
    then creates ``test1rf.table_num_col``.
    """
    cluster = Cluster(protocol_version=PROTOCOL_VERSION)
    cls.cluster = cluster
    cls.session = cluster.connect()
    cls._cass_version, cls._cql_version = get_server_versions()

    # The statement text is kept flush-left so the literal is unchanged.
    cls.session.execute('''
CREATE TABLE test1rf.table_num_col ( key blob PRIMARY KEY, "626972746864617465" blob )
WITH COMPACT STORAGE''')
开发者ID:Adirio,项目名称:python-driver,代码行数:8,代码来源:test_row_factories.py
示例6: update_datatypes
def update_datatypes():
    """Extend the module-level datatype lists according to the server version.

    ``tuple`` is appended to ``COLLECTION_TYPES`` for servers at 2.1.0 or
    newer; ``date`` and ``time`` are appended to ``PRIMITIVE_DATATYPES`` for
    servers at 2.1.5 or newer.
    """
    server_version, _unused_cql_version = get_server_versions()

    if server_version >= (2, 1, 0):
        COLLECTION_TYPES.append('tuple')

    if server_version >= (2, 1, 5):
        for new_type in ('date', 'time'):
            PRIMITIVE_DATATYPES.append(new_type)
开发者ID:animeshinvinci,项目名称:python-driver,代码行数:9,代码来源:datatype_utils.py
示例7: setUp
def setUp(self):
    """Record the server versions and create the ``udttests`` keyspace.

    The test is skipped when the server reports a version below 2.1.0.
    Otherwise a cluster and a session are opened and stored on the
    instance, the keyspace is created, and the cluster is shut down again.
    """
    server_versions = get_server_versions()
    self._cass_version, self._cql_version = server_versions

    if self._cass_version < (2, 1, 0):
        raise unittest.SkipTest("User Defined Types were introduced in Cassandra 2.1")

    self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
    self.session = self.cluster.connect()

    create_keyspace = "CREATE KEYSPACE udttests WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'}"
    self.session.execute(create_keyspace)

    self.cluster.shutdown()
开发者ID:StuartAxelOwen,项目名称:python-driver,代码行数:10,代码来源:test_udts.py
示例8: update_datatypes
def update_datatypes():
    """Extend the module-level datatype sets and rebuild ``SAMPLE_DATA``.

    ``tuple`` is added to ``COLLECTION_TYPES`` for servers at 2.1.0 or
    newer; ``date``, ``time``, ``smallint`` and ``tinyint`` are added to
    ``PRIMITIVE_DATATYPES`` for servers at 2.2.0 or newer. ``SAMPLE_DATA``
    is then reassigned from ``get_sample_data()``.
    """
    global SAMPLE_DATA

    server_version, _unused_cql_version = get_server_versions()

    if server_version >= (2, 1, 0):
        COLLECTION_TYPES.add('tuple')

    if server_version >= (2, 2, 0):
        newer_primitives = ['date', 'time', 'smallint', 'tinyint']
        PRIMITIVE_DATATYPES.update(newer_primitives)

    SAMPLE_DATA = get_sample_data()
开发者ID:dkua,项目名称:python-driver,代码行数:11,代码来源:datatype_utils.py
示例9: test_collection_indexes
def test_collection_indexes(self):
    """Verify that collection indexes appear in the exported table schema.

    An index on the map keys and an index on the map column itself are
    always exercised; a ``full()`` index on a frozen map is exercised only
    when the server reports 2.1.3 or newer.
    """
    session = self.session
    qualified_table = (self.ksname, self.cfname)

    session.execute("CREATE TABLE %s.%s (a int PRIMARY KEY, b map<text, text>)" % qualified_table)

    # Index on the keys of the map.
    session.execute("CREATE INDEX index1 ON %s.%s (keys(b))" % qualified_table)
    self.assertIn("(keys(b))", self.get_table_metadata().export_as_string())

    # Replace it with an index on the map column itself.
    session.execute("DROP INDEX %s.index1" % (self.ksname,))
    session.execute("CREATE INDEX index2 ON %s.%s (b)" % qualified_table)
    self.assertIn(" (b)", self.get_table_metadata().export_as_string())

    # test full indexes on frozen collections, if available
    if not get_server_versions()[0] >= (2, 1, 3):
        return

    session.execute("DROP TABLE %s.%s" % qualified_table)
    session.execute("CREATE TABLE %s.%s (a int PRIMARY KEY, b frozen<map<text, text>>)" % qualified_table)
    session.execute("CREATE INDEX index3 ON %s.%s (full(b))" % qualified_table)
    self.assertIn("(full(b))", self.get_table_metadata().export_as_string())
开发者ID:sakura-sky,项目名称:python-driver,代码行数:23,代码来源:test_metadata.py
示例10: setUpClass
def setUpClass(cls):
    """Store the result of ``get_server_versions()`` on the class.

    The complete return value is stored as ``cass_version``; it is not
    unpacked or indexed here.
    """
    server_versions = get_server_versions()
    cls.cass_version = server_versions
开发者ID:stonefly,项目名称:python-driver,代码行数:2,代码来源:test_prepared_statements.py
示例11: setup_module
def setup_module():
    """Prepare the module: call ``use_singledc()`` and record the server version.

    The first element returned by ``get_server_versions()`` is stored in the
    module-level ``CASS_SERVER_VERSION``.
    """
    global CASS_SERVER_VERSION

    use_singledc()

    server_versions = get_server_versions()
    CASS_SERVER_VERSION = server_versions[0]
开发者ID:Adirio,项目名称:python-driver,代码行数:4,代码来源:test_query.py
示例12: test_export_keyspace_schema_udts
def test_export_keyspace_schema_udts(self):
    """
    Test udt exports

    Creates the ``export_udts`` keyspace with three user types (``street``,
    ``zip`` and ``address``, the last built from the first two) and a
    ``users`` table, then compares ``export_as_string()`` of the keyspace
    metadata and of the table metadata with fixed expected strings.

    Skipped when the server reports a version below 2.1.0.
    """

    if get_server_versions()[0] < (2, 1, 0):
        raise unittest.SkipTest('UDTs were introduced in Cassandra 2.1')

    # NOTE(review): no shutdown call for this cluster or session is visible
    # in this method -- confirm whether cleanup happens elsewhere.
    cluster = Cluster(protocol_version=PROTOCOL_VERSION)
    session = cluster.connect()

    # Schema setup: keyspace first, then the types in dependency order
    # (``address`` refers to ``street`` and ``zip``), then the table.
    session.execute("""
CREATE KEYSPACE export_udts
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}
AND durable_writes = true;
""")
    session.execute("""
CREATE TYPE export_udts.street (
street_number int,
street_name text)
""")
    session.execute("""
CREATE TYPE export_udts.zip (
zipcode int,
zip_plus_4 int)
""")
    session.execute("""
CREATE TYPE export_udts.address (
street_address street,
zip_code zip)
""")
    session.execute("""
CREATE TABLE export_udts.users (
user text PRIMARY KEY,
addresses map<text, address>)
""")

    # Expected export of the whole keyspace. It is compared with
    # assertEqual, so every character (including whitespace and the order
    # of the option maps) has to match the driver's output.
    # NOTE(review): the literal has no leading indentation on its inner
    # lines -- verify it against the driver's actual export format.
    expected_string = """CREATE KEYSPACE export_udts WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;
CREATE TYPE export_udts.street (
street_number int,
street_name text
);
CREATE TYPE export_udts.zip (
zipcode int,
zip_plus_4 int
);
CREATE TYPE export_udts.address (
street_address street,
zip_code zip
);
CREATE TABLE export_udts.users (
user text PRIMARY KEY,
addresses map<text, address>
) WITH bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
AND comment = ''
AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';"""
    self.assertEqual(cluster.metadata.keyspaces['export_udts'].export_as_string(), expected_string)

    # Expected export of the ``users`` table on its own.
    table_meta = cluster.metadata.keyspaces['export_udts'].tables['users']
    expected_string = """CREATE TABLE export_udts.users (
user text PRIMARY KEY,
addresses map<text, address>
) WITH bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
AND comment = ''
AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';"""
    self.assertEqual(table_meta.export_as_string(), expected_string)
开发者ID:EnigmaCurry,项目名称:python-driver,代码行数:93,代码来源:test_metadata.py
示例13: setUp
def setUp(self):
    """Store the two values returned by ``get_server_versions()`` on the instance."""
    server_versions = get_server_versions()
    self._cass_version, self._cql_version = server_versions
开发者ID:EnigmaCurry,项目名称:python-driver,代码行数:2,代码来源:test_types.py
示例14: setUpClass
def setUpClass(cls):
    """Prepare class-level fixtures for the type tests.

    Stores the server versions, the cluster and the session on the class,
    creates the ``typetests`` keyspace and makes it the session's current
    keyspace.
    """
    server_versions = get_server_versions()
    cls._cass_version, cls._cql_version = server_versions

    cluster = Cluster(protocol_version=PROTOCOL_VERSION)
    cls.cluster = cluster
    cls.session = cluster.connect()

    create_keyspace = "CREATE KEYSPACE typetests WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'}"
    cls.session.execute(create_keyspace)
    cls.session.set_keyspace("typetests")
开发者ID:alfasin,项目名称:python-driver,代码行数:6,代码来源:test_types.py
示例15: setUp
def setUp(self):
    """Remember the default keyspace, then skip unless the server is 3.0+.

    ``models.DEFAULT_KEYSPACE`` is stored on the instance before the
    version check; the parent ``setUp`` runs only when the test is not
    skipped.
    """
    self.default_keyspace = models.DEFAULT_KEYSPACE

    server_version = get_server_versions()[0]
    too_old = server_version < (3, 0)
    if too_old:
        raise unittest.SkipTest("Materialized views require Cassandra 3.0+")

    super(TestNamedWithMV, self).setUp()
开发者ID:jfelectron,项目名称:python-driver,代码行数:6,代码来源:test_named.py
示例16: test_legacy_tables
def test_legacy_tables(self):
if get_server_versions()[0] < (2, 1, 0):
raise unittest.SkipTest('Test schema output assumes 2.1.0+ options')
if sys.version_info[0:2] != (2, 7):
raise unittest.SkipTest('This test compares static strings generated from dict items, which may change orders. Test with 2.7.')
cli_script = """CREATE KEYSPACE legacy
WITH placement_strategy = 'SimpleStrategy'
AND strategy_options = {replication_factor:1};
USE legacy;
CREATE COLUMN FAMILY simple_no_col
WITH comparator = UTF8Type
AND key_validation_class = UUIDType
AND default_validation_class = UTF8Type;
CREATE COLUMN FAMILY simple_with_col
WITH comparator = UTF8Type
and key_validation_class = UUIDType
and default_validation_class = UTF8Type
AND column_metadata = [
{column_name: col_with_meta, validation_class: UTF8Type}
];
CREATE COLUMN FAMILY composite_partition_no_col
WITH comparator = UTF8Type
AND key_validation_class = 'CompositeType(UUIDType,UTF8Type)'
AND default_validation_class = UTF8Type;
CREATE COLUMN FAMILY composite_partition_with_col
WITH comparator = UTF8Type
AND key_validation_class = 'CompositeType(UUIDType,UTF8Type)'
AND default_validation_class = UTF8Type
AND column_metadata = [
{column_name: col_with_meta, validation_class: UTF8Type}
];
CREATE COLUMN FAMILY nested_composite_key
WITH comparator = UTF8Type
and key_validation_class = 'CompositeType(CompositeType(UUIDType,UTF8Type), LongType)'
and default_validation_class = UTF8Type
AND column_metadata = [
{column_name: full_name, validation_class: UTF8Type}
];
create column family composite_comp_no_col
with column_type = 'Standard'
and comparator = 'DynamicCompositeType(t=>org.apache.cassandra.db.marshal.TimeUUIDType,s=>org.apache.cassandra.db.marshal.UTF8Type,b=>org.apache.cassandra.db.marshal.BytesType)'
and default_validation_class = 'BytesType'
and key_validation_class = 'BytesType'
and read_repair_chance = 0.0
and dclocal_read_repair_chance = 0.1
and gc_grace = 864000
and min_compaction_threshold = 4
and max_compaction_threshold = 32
and compaction_strategy = 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
and caching = 'KEYS_ONLY'
and cells_per_row_to_cache = '0'
and default_time_to_live = 0
and speculative_retry = 'NONE'
and comment = 'Stores file meta data';
create column family composite_comp_with_col
with column_type = 'Standard'
and comparator = 'DynamicCompositeType(t=>org.apache.cassandra.db.marshal.TimeUUIDType,s=>org.apache.cassandra.db.marshal.UTF8Type,b=>org.apache.cassandra.db.marshal.BytesType)'
and default_validation_class = 'BytesType'
and key_validation_class = 'BytesType'
and read_repair_chance = 0.0
and dclocal_read_repair_chance = 0.1
and gc_grace = 864000
and min_compaction_threshold = 4
and max_compaction_threshold = 32
and compaction_strategy = 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
and caching = 'KEYS_ONLY'
and cells_per_row_to_cache = '0'
and default_time_to_live = 0
and speculative_retry = 'NONE'
and comment = 'Stores file meta data'
and column_metadata = [
{column_name : '[email protected]',
validation_class : BytesType,
index_name : 'idx_one',
index_type : 0},
{column_name : '[email protected]',
validation_class : BytesType,
index_name : 'idx_two',
index_type : 0}]
and compression_options = {'sstable_compression' : 'org.apache.cassandra.io.compress.LZ4Compressor'};"""
# note: the inner key type for legacy.nested_composite_key
# (org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.UUIDType, org.apache.cassandra.db.marshal.UTF8Type))
# is a bit strange, but it replays in CQL with desired results
expected_string = """CREATE KEYSPACE legacy WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;
/*
Warning: Table legacy.composite_comp_with_col omitted because it has constructs not compatible with CQL (was created via legacy API).
#.........这里部分代码省略.........
开发者ID:animeshinvinci,项目名称:python-driver,代码行数:101,代码来源:test_metadata.py
示例17: test_export_keyspace_schema_udts
def test_export_keyspace_schema_udts(self):
"""
Test udt exports
"""
if get_server_versions()[0] < (2, 1, 0):
raise unittest.SkipTest('UDTs were introduced in Cassandra 2.1')
if PROTOCOL_VERSION < 3:
raise unittest.SkipTest(
"Protocol 3.0+ is required for UDT change events, currently testing against %r"
% (PROTOCOL_VERSION,))
if sys.version_info[0:2] != (2, 7):
raise unittest.SkipTest('This test compares static strings generated from dict items, which may change orders. Test with 2.7.')
cluster = Cluster(protocol_version=PROTOCOL_VERSION)
session = cluster.connect()
session.execute("""
CREATE KEYSPACE export_udts
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}
AND durable_writes = true;
""")
session.execute("""
CREATE TYPE export_udts.street (
street_number int,
street_name text)
""")
session.execute("""
CREATE TYPE export_udts.zip (
zipcode int,
zip_plus_4 int)
""")
session.execute("""
CREATE TYPE export_udts.address (
street_address frozen<street>,
zip_code frozen<zip>)
""")
session.execute("""
CREATE TABLE export_udts.users (
user text PRIMARY KEY,
addresses map<text, frozen<address>>)
""")
expected_string = """CREATE KEYSPACE export_udts WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;
CREATE TYPE export_udts.street (
street_number int,
street_name text
);
CREATE TYPE export_udts.zip (
zipcode int,
zip_plus_4 int
);
CREATE TYPE export_udts.address (
street_address frozen<street>,
zip_code frozen<zip>
);
CREATE TABLE export_udts.users (
user text PRIMARY KEY,
addresses map<text, frozen<address>>
) WITH bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
AND comment = ''
AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';"""
self.assert_equal_diff(cluster.metadata.keyspaces['export_udts'].export_as_string(), expected_string)
table_meta = cluster.metadata.keyspaces['export_udts'].tables['users']
expected_string = """CREATE TABLE export_udts.users (
user text PRIMARY KEY,
addresses map<text, frozen<address>>
) WITH bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
AND comment = ''
AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';"""
#.........这里部分代码省略.........
开发者ID:animeshinvinci,项目名称:python-driver,代码行数:101,代码来源:test_metadata.py
示例18: setUp
def setUp(self):
    """Skip the test unless the server is 3.0+, then run the parent ``setUp``."""
    server_version = get_server_versions()[0]
    too_old = server_version < (3, 0)
    if too_old:
        raise unittest.SkipTest("Materialized views require Cassandra 3.0+")

    super(TestNamedWithMV, self).setUp()
开发者ID:janin,项目名称:python-driver,代码行数:5,代码来源:test_named.py
示例19: setUp
def setUp(self):
    """Skip below protocol v3; otherwise store the server versions on the instance."""
    protocol_too_old = PROTOCOL_VERSION < 3
    if protocol_too_old:
        raise unittest.SkipTest("v3 protocol is required for UDT tests")

    server_versions = get_server_versions()
    self._cass_version, self._cql_version = server_versions
开发者ID:anthony-cervantes,项目名称:python-driver,代码行数:5,代码来源:test_udts.py
示例20: is_prepend_reversed
def is_prepend_reversed():
    """Return True when the server lacks the CASSANDRA-8733 fix.

    The two thresholds used here are 2.0.13 on the 2.0 line and 2.1.3 on
    the 2.1 line, so the fix is treated as present for versions in
    ``[2.0.13, 2.1.0)`` and for ``2.1.3`` or newer.

    The previous expression, ``ver >= (2, 0, 13) or ver >= (2, 1, 3)``,
    reduced to ``ver >= (2, 0, 13)`` and therefore wrongly treated
    2.1.0 - 2.1.2 as already fixed.

    Returns:
        bool: True if the server version does not include the fix.
    """
    # do we have https://issues.apache.org/jira/browse/CASSANDRA-8733 ?
    ver, _ = get_server_versions()
    fixed_on_2_0_line = (2, 0, 13) <= ver < (2, 1, 0)
    fixed_on_2_1_or_later = ver >= (2, 1, 3)
    return not (fixed_on_2_0_line or fixed_on_2_1_or_later)
开发者ID:heqing90,项目名称:python-driver,代码行数:4,代码来源:__init__.py
注:本文中的tests.integration.get_server_versions函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论