• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python integration.get_server_versions函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tests.integration.get_server_versions函数的典型用法代码示例。如果您正苦于以下问题:Python get_server_versions函数的具体用法?Python get_server_versions怎么用?Python get_server_versions使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_server_versions函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_collection_indexes

    def test_collection_indexes(self):
        """Check that collection indexes appear in the exported table schema.

        The test is skipped on servers older than 2.1.0. It indexes the keys
        of a map column, then the map column itself, and inspects the export
        after each step. On 2.1.3+ the table is recreated with a frozen map
        and a ``full(b)`` index is verified as well.
        """
        if get_server_versions()[0] < (2, 1, 0):
            raise unittest.SkipTest("Secondary index on collections were introduced in Cassandra 2.1")

        run = self.session.execute
        table_ref = (self.ksname, self.cfname)

        # index on the map keys
        run("CREATE TABLE %s.%s (a int PRIMARY KEY, b map<text, text>)" % table_ref)
        run("CREATE INDEX index1 ON %s.%s (keys(b))" % table_ref)
        exported = self.get_table_metadata().export_as_string()
        self.assertIn('(keys(b))', exported)

        # replace it with an index on the column itself
        run("DROP INDEX %s.index1" % (self.ksname,))
        run("CREATE INDEX index2 ON %s.%s (b)" % table_ref)
        exported = self.get_table_metadata().export_as_string()
        self.assertIn(' (b)', exported)

        # test full indexes on frozen collections, if available
        if get_server_versions()[0] < (2, 1, 3):
            return

        run("DROP TABLE %s.%s" % table_ref)
        run("CREATE TABLE %s.%s (a int PRIMARY KEY, b frozen<map<text, text>>)" % table_ref)
        run("CREATE INDEX index3 ON %s.%s (full(b))" % table_ref)
        exported = self.get_table_metadata().export_as_string()
        self.assertIn('(full(b))', exported)
开发者ID:animeshinvinci,项目名称:python-driver,代码行数:29,代码来源:test_metadata.py


示例2: test_refresh_schema_type

    def test_refresh_schema_type(self):
        """Check that refreshing one user type replaces only that type's metadata.

        After ``refresh_user_type_metadata`` the keyspaces mapping and the
        keyspace metadata object must be the very same objects as before,
        while the user type metadata must be a new object that renders to the
        same CQL.

        Raises:
            unittest.SkipTest: on servers older than 2.1.0 or when the
                protocol version is below 3.
        """
        if get_server_versions()[0] < (2, 1, 0):
            raise unittest.SkipTest('UDTs were introduced in Cassandra 2.1')

        if PROTOCOL_VERSION < 3:
            # We may want to refresh types on keyspace change events in that case(?)
            raise unittest.SkipTest('UDTs are not specified in change events for protocol v2')

        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        # Registered before connecting so the cluster (and the sessions it
        # owns) is released even when connect() or an assertion below fails.
        self.addCleanup(cluster.shutdown)
        session = cluster.connect()

        keyspace_name = 'test1rf'
        type_name = self._testMethodName

        session.execute('CREATE TYPE IF NOT EXISTS %s.%s (one int, two text)' % (keyspace_name, type_name))
        original_meta = cluster.metadata.keyspaces
        original_test1rf_meta = original_meta[keyspace_name]
        original_type_meta = original_test1rf_meta.user_types[type_name]

        # only refresh one type
        cluster.refresh_user_type_metadata(keyspace_name, type_name)
        current_meta = cluster.metadata.keyspaces
        current_test1rf_meta = current_meta[keyspace_name]
        current_type_meta = current_test1rf_meta.user_types[type_name]
        self.assertIs(original_meta, current_meta)
        self.assertIs(original_test1rf_meta, current_test1rf_meta)
        self.assertIsNot(original_type_meta, current_type_meta)
        self.assertEqual(original_type_meta.as_cql_query(), current_type_meta.as_cql_query())
开发者ID:angelomendonca,项目名称:python-driver,代码行数:29,代码来源:test_cluster.py


示例3: setUp

    def setUp(self):
        """Record the server versions and create the ``typetests`` keyspace.

        A cluster connection is opened only long enough to issue the
        ``CREATE KEYSPACE`` statement and is shut down again before returning.
        ``self.cluster`` and ``self.session`` stay assigned afterwards but
        refer to the already shut down cluster.
        """
        self._cass_version, self._cql_version = get_server_versions()

        self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        try:
            self.session = self.cluster.connect()
            self.session.execute("CREATE KEYSPACE typetests WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'}")
        finally:
            # unittest does not run tearDown when setUp raises, so the
            # cluster has to be released here on the failure path too.
            self.cluster.shutdown()
开发者ID:StuartAxelOwen,项目名称:python-driver,代码行数:7,代码来源:test_types.py


示例4: setup_class

    def setup_class(cls):
        """Prepare class-level fixtures for the type tests.

        Stores the server versions, builds the list of column types to test
        (adding ``date`` and ``time`` on 2.1.4+), then connects, creates the
        ``typetests`` keyspace and makes it the session's current keyspace.
        """
        cls._cass_version, cls._cql_version = get_server_versions()

        column_types = [
            'text', 'ascii', 'bigint', 'boolean', 'decimal', 'double',
            'float', 'inet', 'int', 'list<text>', 'set<int>', 'map<text,int>',
            'timestamp', 'uuid', 'timeuuid', 'varchar', 'varint',
        ]
        cls._col_types = column_types

        # same list object as cls._col_types, so the extension is visible there
        if cls._cass_version >= (2, 1, 4):
            column_types.extend(('date', 'time'))

        session = Cluster(protocol_version=PROTOCOL_VERSION).connect()
        cls._session = session
        session.execute("CREATE KEYSPACE typetests WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'}")
        session.set_keyspace("typetests")
开发者ID:anthony-cervantes,项目名称:python-driver,代码行数:27,代码来源:test_types.py


示例5: setup_class

 def setup_class(cls):
     """Connect to the cluster and create the ``test1rf.table_num_col`` table.

     The table uses compact storage and has a blob column whose quoted name
     consists only of digits. The server versions are stored on the class.
     """
     cls.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
     cls.session = cls.cluster.connect()

     server_versions = get_server_versions()
     cls._cass_version, cls._cql_version = server_versions

     cls.session.execute('''
         CREATE TABLE test1rf.table_num_col ( key blob PRIMARY KEY, "626972746864617465" blob )
         WITH COMPACT STORAGE''')
开发者ID:Adirio,项目名称:python-driver,代码行数:8,代码来源:test_row_factories.py


示例6: update_datatypes

def update_datatypes():
    """Extend the module-level datatype lists to match the server version.

    Adds ``tuple`` to ``COLLECTION_TYPES`` on Cassandra 2.1.0+ and ``date`` /
    ``time`` to ``PRIMITIVE_DATATYPES`` on 2.1.5+. The function is safe to
    call more than once: a type name that is already present is not appended
    again.
    """
    _cass_version, _cql_version = get_server_versions()

    if _cass_version >= (2, 1, 0) and 'tuple' not in COLLECTION_TYPES:
        COLLECTION_TYPES.append('tuple')

    if _cass_version >= (2, 1, 5):
        for type_name in ('date', 'time'):
            # guard against duplicates when several modules call this
            if type_name not in PRIMITIVE_DATATYPES:
                PRIMITIVE_DATATYPES.append(type_name)
开发者ID:animeshinvinci,项目名称:python-driver,代码行数:9,代码来源:datatype_utils.py


示例7: setUp

    def setUp(self):
        """Record the server versions and create the ``udttests`` keyspace.

        A cluster connection is opened only long enough to issue the
        ``CREATE KEYSPACE`` statement and is shut down again before returning.

        Raises:
            unittest.SkipTest: when the server is older than Cassandra 2.1.0.
        """
        self._cass_version, self._cql_version = get_server_versions()

        if self._cass_version < (2, 1, 0):
            raise unittest.SkipTest("User Defined Types were introduced in Cassandra 2.1")

        self.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        try:
            self.session = self.cluster.connect()
            self.session.execute("CREATE KEYSPACE udttests WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'}")
        finally:
            # unittest does not run tearDown when setUp raises, so the
            # cluster has to be released here on the failure path too.
            self.cluster.shutdown()
开发者ID:StuartAxelOwen,项目名称:python-driver,代码行数:10,代码来源:test_udts.py


示例8: update_datatypes

def update_datatypes():
    """Sync the module-level datatype sets and sample data with the server.

    ``tuple`` is added to ``COLLECTION_TYPES`` on Cassandra 2.1.0+, and
    ``date``, ``time``, ``smallint`` and ``tinyint`` are added to
    ``PRIMITIVE_DATATYPES`` on 2.2.0+. ``SAMPLE_DATA`` is then rebuilt from
    ``get_sample_data()``.
    """
    global SAMPLE_DATA

    server_version, _unused_cql_version = get_server_versions()

    if server_version >= (2, 1, 0):
        COLLECTION_TYPES.add('tuple')

    if server_version >= (2, 2, 0):
        PRIMITIVE_DATATYPES.update(('date', 'time', 'smallint', 'tinyint'))

    # regenerate after the type sets have been adjusted
    SAMPLE_DATA = get_sample_data()
开发者ID:dkua,项目名称:python-driver,代码行数:11,代码来源:datatype_utils.py


示例9: test_collection_indexes

    def test_collection_indexes(self):
        """Check that collection indexes appear in the exported table schema.

        Indexes the keys of a map column, then the map column itself, and
        inspects the export after each step. On 2.1.3+ the table is recreated
        with a frozen map and a ``full(b)`` index is verified as well.
        """
        execute = self.session.execute
        names = (self.ksname, self.cfname)

        def exported_schema():
            # re-read the metadata each time so the latest index is included
            return self.get_table_metadata().export_as_string()

        execute("CREATE TABLE %s.%s (a int PRIMARY KEY, b map<text, text>)" % names)
        execute("CREATE INDEX index1 ON %s.%s (keys(b))" % names)
        self.assertIn("(keys(b))", exported_schema())

        execute("DROP INDEX %s.index1" % (self.ksname,))
        execute("CREATE INDEX index2 ON %s.%s (b)" % names)
        self.assertIn(" (b)", exported_schema())

        # test full indexes on frozen collections, if available
        if get_server_versions()[0] < (2, 1, 3):
            return

        execute("DROP TABLE %s.%s" % names)
        execute("CREATE TABLE %s.%s (a int PRIMARY KEY, b frozen<map<text, text>>)" % names)
        execute("CREATE INDEX index3 ON %s.%s (full(b))" % names)
        self.assertIn("(full(b))", exported_schema())
开发者ID:sakura-sky,项目名称:python-driver,代码行数:23,代码来源:test_metadata.py


示例10: setUpClass

 def setUpClass(cls):
     """Store the result of ``get_server_versions()`` on the class.

     NOTE(review): the whole return value is stored under ``cass_version``,
     not just its first element -- confirm that callers expect that.
     """
     server_versions = get_server_versions()
     cls.cass_version = server_versions
开发者ID:stonefly,项目名称:python-driver,代码行数:2,代码来源:test_prepared_statements.py


示例11: setup_module

def setup_module():
    """Select the single-DC cluster and cache the Cassandra server version.

    The version is stored in the module-level ``CASS_SERVER_VERSION``.
    """
    global CASS_SERVER_VERSION

    use_singledc()
    server_versions = get_server_versions()
    CASS_SERVER_VERSION = server_versions[0]
开发者ID:Adirio,项目名称:python-driver,代码行数:4,代码来源:test_query.py


示例12: test_export_keyspace_schema_udts

    def test_export_keyspace_schema_udts(self):
        """
        Test udt exports
        """

        if get_server_versions()[0] < (2, 1, 0):
            raise unittest.SkipTest('UDTs were introduced in Cassandra 2.1')

        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        session = cluster.connect()

        session.execute("""
            CREATE KEYSPACE export_udts
            WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}
            AND durable_writes = true;
        """)
        session.execute("""
            CREATE TYPE export_udts.street (
                street_number int,
                street_name text)
        """)
        session.execute("""
            CREATE TYPE export_udts.zip (
                zipcode int,
                zip_plus_4 int)
        """)
        session.execute("""
            CREATE TYPE export_udts.address (
                street_address street,
                zip_code zip)
        """)
        session.execute("""
            CREATE TABLE export_udts.users (
            user text PRIMARY KEY,
            addresses map<text, address>)
        """)

        expected_string = """CREATE KEYSPACE export_udts WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

CREATE TYPE export_udts.street (
    street_number int,
    street_name text
);

CREATE TYPE export_udts.zip (
    zipcode int,
    zip_plus_4 int
);

CREATE TYPE export_udts.address (
    street_address street,
    zip_code zip
);

CREATE TABLE export_udts.users (
    user text PRIMARY KEY,
    addresses map<text, address>
) WITH bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
    AND comment = ''
    AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';"""

        self.assertEqual(cluster.metadata.keyspaces['export_udts'].export_as_string(), expected_string)

        table_meta = cluster.metadata.keyspaces['export_udts'].tables['users']

        expected_string = """CREATE TABLE export_udts.users (
    user text PRIMARY KEY,
    addresses map<text, address>
) WITH bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
    AND comment = ''
    AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';"""

        self.assertEqual(table_meta.export_as_string(), expected_string)
开发者ID:EnigmaCurry,项目名称:python-driver,代码行数:93,代码来源:test_metadata.py


示例13: setUp

 def setUp(self):
     """Store the Cassandra and CQL versions reported by ``get_server_versions()``."""
     cass_version, cql_version = get_server_versions()
     self._cass_version = cass_version
     self._cql_version = cql_version
开发者ID:EnigmaCurry,项目名称:python-driver,代码行数:2,代码来源:test_types.py


示例14: setUpClass

 def setUpClass(cls):
     """Prepare class-level fixtures for the type tests.

     Stores the server versions, connects to the cluster, creates the
     ``typetests`` keyspace and makes it the session's current keyspace.
     """
     server_versions = get_server_versions()
     cls._cass_version, cls._cql_version = server_versions

     cls.cluster = Cluster(protocol_version=PROTOCOL_VERSION)
     cls.session = cls.cluster.connect()

     create_keyspace = "CREATE KEYSPACE typetests WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor': '1'}"
     cls.session.execute(create_keyspace)
     cls.session.set_keyspace("typetests")
开发者ID:alfasin,项目名称:python-driver,代码行数:6,代码来源:test_types.py


示例15: setUp

 def setUp(self):
     """Remember the default keyspace and skip on servers older than 3.0.

     Raises:
         unittest.SkipTest: when the Cassandra version is below 3.0.
     """
     self.default_keyspace = models.DEFAULT_KEYSPACE

     # the parent setUp only runs when the server is new enough
     if get_server_versions()[0] < (3, 0):
         raise unittest.SkipTest("Materialized views require Cassandra 3.0+")

     super(TestNamedWithMV, self).setUp()
开发者ID:jfelectron,项目名称:python-driver,代码行数:6,代码来源:test_named.py


示例16: test_legacy_tables

    def test_legacy_tables(self):

        if get_server_versions()[0] < (2, 1, 0):
            raise unittest.SkipTest('Test schema output assumes 2.1.0+ options')

        if sys.version_info[0:2] != (2, 7):
            raise unittest.SkipTest('This test compares static strings generated from dict items, which may change orders. Test with 2.7.')

        cli_script = """CREATE KEYSPACE legacy
WITH placement_strategy = 'SimpleStrategy'
AND strategy_options = {replication_factor:1};

USE legacy;

CREATE COLUMN FAMILY simple_no_col
 WITH comparator = UTF8Type
 AND key_validation_class = UUIDType
 AND default_validation_class = UTF8Type;

CREATE COLUMN FAMILY simple_with_col
 WITH comparator = UTF8Type
 and key_validation_class = UUIDType
 and default_validation_class = UTF8Type
 AND column_metadata = [
 {column_name: col_with_meta, validation_class: UTF8Type}
 ];

CREATE COLUMN FAMILY composite_partition_no_col
 WITH comparator = UTF8Type
 AND key_validation_class = 'CompositeType(UUIDType,UTF8Type)'
 AND default_validation_class = UTF8Type;

CREATE COLUMN FAMILY composite_partition_with_col
 WITH comparator = UTF8Type
 AND key_validation_class = 'CompositeType(UUIDType,UTF8Type)'
 AND default_validation_class = UTF8Type
 AND column_metadata = [
 {column_name: col_with_meta, validation_class: UTF8Type}
 ];

CREATE COLUMN FAMILY nested_composite_key
 WITH comparator = UTF8Type
 and key_validation_class = 'CompositeType(CompositeType(UUIDType,UTF8Type), LongType)'
 and default_validation_class = UTF8Type
 AND column_metadata = [
 {column_name: full_name, validation_class: UTF8Type}
 ];

create column family composite_comp_no_col
  with column_type = 'Standard'
  and comparator = 'DynamicCompositeType(t=>org.apache.cassandra.db.marshal.TimeUUIDType,s=>org.apache.cassandra.db.marshal.UTF8Type,b=>org.apache.cassandra.db.marshal.BytesType)'
  and default_validation_class = 'BytesType'
  and key_validation_class = 'BytesType'
  and read_repair_chance = 0.0
  and dclocal_read_repair_chance = 0.1
  and gc_grace = 864000
  and min_compaction_threshold = 4
  and max_compaction_threshold = 32
  and compaction_strategy = 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
  and caching = 'KEYS_ONLY'
  and cells_per_row_to_cache = '0'
  and default_time_to_live = 0
  and speculative_retry = 'NONE'
  and comment = 'Stores file meta data';

create column family composite_comp_with_col
  with column_type = 'Standard'
  and comparator = 'DynamicCompositeType(t=>org.apache.cassandra.db.marshal.TimeUUIDType,s=>org.apache.cassandra.db.marshal.UTF8Type,b=>org.apache.cassandra.db.marshal.BytesType)'
  and default_validation_class = 'BytesType'
  and key_validation_class = 'BytesType'
  and read_repair_chance = 0.0
  and dclocal_read_repair_chance = 0.1
  and gc_grace = 864000
  and min_compaction_threshold = 4
  and max_compaction_threshold = 32
  and compaction_strategy = 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
  and caching = 'KEYS_ONLY'
  and cells_per_row_to_cache = '0'
  and default_time_to_live = 0
  and speculative_retry = 'NONE'
  and comment = 'Stores file meta data'
  and column_metadata = [
    {column_name : '[email protected]',
    validation_class : BytesType,
    index_name : 'idx_one',
    index_type : 0},
    {column_name : '[email protected]',
    validation_class : BytesType,
    index_name : 'idx_two',
    index_type : 0}]
  and compression_options = {'sstable_compression' : 'org.apache.cassandra.io.compress.LZ4Compressor'};"""

        # note: the inner key type for legacy.nested_composite_key
        # (org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.UUIDType, org.apache.cassandra.db.marshal.UTF8Type))
        # is a bit strange, but it replays in CQL with desired results
        expected_string = """CREATE KEYSPACE legacy WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

/*
Warning: Table legacy.composite_comp_with_col omitted because it has constructs not compatible with CQL (was created via legacy API).

#.........这里部分代码省略.........
开发者ID:animeshinvinci,项目名称:python-driver,代码行数:101,代码来源:test_metadata.py


示例17: test_export_keyspace_schema_udts

    def test_export_keyspace_schema_udts(self):
        """
        Test udt exports
        """

        if get_server_versions()[0] < (2, 1, 0):
            raise unittest.SkipTest('UDTs were introduced in Cassandra 2.1')

        if PROTOCOL_VERSION < 3:
            raise unittest.SkipTest(
                "Protocol 3.0+ is required for UDT change events, currently testing against %r"
                % (PROTOCOL_VERSION,))

        if sys.version_info[0:2] != (2, 7):
            raise unittest.SkipTest('This test compares static strings generated from dict items, which may change orders. Test with 2.7.')

        cluster = Cluster(protocol_version=PROTOCOL_VERSION)
        session = cluster.connect()

        session.execute("""
            CREATE KEYSPACE export_udts
            WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}
            AND durable_writes = true;
        """)
        session.execute("""
            CREATE TYPE export_udts.street (
                street_number int,
                street_name text)
        """)
        session.execute("""
            CREATE TYPE export_udts.zip (
                zipcode int,
                zip_plus_4 int)
        """)
        session.execute("""
            CREATE TYPE export_udts.address (
                street_address frozen<street>,
                zip_code frozen<zip>)
        """)
        session.execute("""
            CREATE TABLE export_udts.users (
            user text PRIMARY KEY,
            addresses map<text, frozen<address>>)
        """)

        expected_string = """CREATE KEYSPACE export_udts WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

CREATE TYPE export_udts.street (
    street_number int,
    street_name text
);

CREATE TYPE export_udts.zip (
    zipcode int,
    zip_plus_4 int
);

CREATE TYPE export_udts.address (
    street_address frozen<street>,
    zip_code frozen<zip>
);

CREATE TABLE export_udts.users (
    user text PRIMARY KEY,
    addresses map<text, frozen<address>>
) WITH bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
    AND comment = ''
    AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';"""

        self.assert_equal_diff(cluster.metadata.keyspaces['export_udts'].export_as_string(), expected_string)

        table_meta = cluster.metadata.keyspaces['export_udts'].tables['users']

        expected_string = """CREATE TABLE export_udts.users (
    user text PRIMARY KEY,
    addresses map<text, frozen<address>>
) WITH bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
    AND comment = ''
    AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';"""

#.........这里部分代码省略.........
开发者ID:animeshinvinci,项目名称:python-driver,代码行数:101,代码来源:test_metadata.py


示例18: setUp

 def setUp(self):
     """Skip on servers older than 3.0, otherwise run the parent ``setUp``.

     Raises:
         unittest.SkipTest: when the Cassandra version is below 3.0.
     """
     server_version = get_server_versions()[0]
     too_old = server_version < (3, 0)
     if too_old:
         raise unittest.SkipTest("Materialized views require Cassandra 3.0+")

     super(TestNamedWithMV, self).setUp()
开发者ID:janin,项目名称:python-driver,代码行数:5,代码来源:test_named.py


示例19: setUp

    def setUp(self):
        """Skip on protocol versions below 3, otherwise store the server versions.

        Raises:
            unittest.SkipTest: when ``PROTOCOL_VERSION`` is lower than 3.
        """
        if PROTOCOL_VERSION < 3:
            raise unittest.SkipTest("v3 protocol is required for UDT tests")

        server_versions = get_server_versions()
        self._cass_version, self._cql_version = server_versions
开发者ID:anthony-cervantes,项目名称:python-driver,代码行数:5,代码来源:test_udts.py


示例20: is_prepend_reversed

def is_prepend_reversed():
    """Return True when the server does not contain the CASSANDRA-8733 fix.

    The version thresholds mark the fix as present from 2.0.13 on the 2.0
    line and from 2.1.3 on the 2.1 line and later. The two release lines are
    checked separately: a single ``ver >= (2, 0, 13)`` comparison would also
    match 2.1.0 - 2.1.2, which are below the 2.1.3 threshold.

    Returns:
        bool: ``True`` if the server predates the fix, ``False`` otherwise.
    """
    # do we have https://issues.apache.org/jira/browse/CASSANDRA-8733 ?
    ver, _ = get_server_versions()
    fixed_on_2_0_line = (2, 0, 13) <= ver < (2, 1, 0)
    fixed_on_2_1_or_later = ver >= (2, 1, 3)
    return not (fixed_on_2_0_line or fixed_on_2_1_or_later)
开发者ID:heqing90,项目名称:python-driver,代码行数:4,代码来源:__init__.py



注:本文中的tests.integration.get_server_versions函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python integration.use_multidc函数代码示例发布时间:2022-05-27
下一篇:
Python integration.get_node函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap