• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python assertions.assert_invalid函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tools.assertions.assert_invalid函数的典型用法代码示例。如果您正苦于以下问题:Python assert_invalid函数的具体用法?Python assert_invalid怎么用?Python assert_invalid使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了assert_invalid函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: check_permissions

    def check_permissions(self, node, upgraded):
        # use an exclusive connection to ensure we only talk to the specified node
        klaus = self.patient_exclusive_cql_connection(node, user='klaus', password='12345', timeout=20)
        # klaus is a superuser, so should be able to list all permissions
        # the output of LIST PERMISSIONS changes slightly with #7653 adding
        # a new role column to results, so we need to tailor our check
        # based on whether the node has been upgraded or not
        if not upgraded:
            assert_all(klaus,
                       'LIST ALL PERMISSIONS',
                       [['michael', '<table ks.cf1>', 'MODIFY'],
                        ['michael', '<table ks.cf2>', 'SELECT']],
                       timeout=60)
        else:
            assert_all(klaus,
                       'LIST ALL PERMISSIONS',
                       [['michael', 'michael', '<table ks.cf1>', 'MODIFY'],
                        ['michael', 'michael', '<table ks.cf2>', 'SELECT']],
                       timeout=60)

        klaus.cluster.shutdown()

        michael = self.patient_exclusive_cql_connection(node, user='michael', password='54321')
        michael.execute('INSERT INTO ks.cf1 (id, val) VALUES (0,0)')
        michael.execute('SELECT * FROM ks.cf2')
        assert_invalid(michael,
                       'SELECT * FROM ks.cf1',
                       'User michael has no SELECT permission on <table ks.cf1> or any of its parents',
                       Unauthorized)
        michael.cluster.shutdown()
开发者ID:beobal,项目名称:cassandra-dtest,代码行数:30,代码来源:upgrade_internal_auth_test.py


示例2: udf_with_udt_keyspace_isolation_test

    def udf_with_udt_keyspace_isolation_test(self):
        """
        Ensure functions dont allow a UDT from another keyspace
        @jira_ticket CASSANDRA-9409
        @since 2.2
        """
        session = self.prepare()

        session.execute("create type udt (a text, b int);")
        create_ks(session, 'user_ks', 1)

        # ensure we cannot use a udt from another keyspace as function argument
        assert_invalid(
            session,
            "CREATE FUNCTION overloaded(v ks.udt) called on null input RETURNS text LANGUAGE java AS 'return \"f1\";'",
            "Statement on keyspace user_ks cannot refer to a user type in keyspace ks"
        )

        # ensure we cannot use a udt from another keyspace as return value
        assert_invalid(
            session,
            ("CREATE FUNCTION test(v text) called on null input RETURNS ks.udt "
             "LANGUAGE java AS 'return null;';"),
            "Statement on keyspace user_ks cannot refer to a user type in keyspace ks"
        )
开发者ID:iamaleksey,项目名称:cassandra-dtest,代码行数:25,代码来源:user_functions_test.py


示例3: udf_with_udt_test

    def udf_with_udt_test(self):
        """
        Test UDFs that operate on non-frozen UDTs.
        @jira_ticket CASSANDRA-7423
        @since 3.6
        """
        session = self.prepare()
        session.execute("create type test (a text, b int);")
        session.execute("create function funk(udt test) called on null input returns int language java as 'return Integer.valueOf(udt.getInt(\"b\"));';")

        if self.cluster.version() >= LooseVersion('3.6'):
            frozen_vals = (False, True)
        else:
            frozen_vals = (True,)

        for frozen in frozen_vals:
            debug("Using {} UDTs".format("frozen" if frozen else "non-frozen"))

            table_name = "tab_frozen" if frozen else "tab"
            column_type = "frozen<test>" if frozen else "test"
            session.execute("create table {} (key int primary key, udt {});".format(table_name, column_type))

            session.execute("insert into %s (key, udt) values (1, {a: 'un', b:1});" % (table_name,))
            session.execute("insert into %s (key, udt) values (2, {a: 'deux', b:2});" % (table_name,))
            session.execute("insert into %s (key, udt) values (3, {a: 'trois', b:3});" % (table_name,))

            assert_one(session, "select sum(funk(udt)) from {}".format(table_name), [6])

            assert_invalid(session, "drop type test;")
开发者ID:iamaleksey,项目名称:cassandra-dtest,代码行数:29,代码来源:user_functions_test.py


示例4: drop_column_compact_test

    def drop_column_compact_test(self):
        session = self.prepare()

        session.execute("USE ks")
        session.execute("CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int) WITH COMPACT STORAGE")

        assert_invalid(session, "ALTER TABLE cf DROP c1", "Cannot drop columns from a")
开发者ID:iamaleksey,项目名称:cassandra-dtest,代码行数:7,代码来源:schema_test.py


示例5: only_one_timestamp_is_valid_test

 def only_one_timestamp_is_valid_test(self):
     """ Test that TIMESTAMP must not be used in the statements within the batch. """
     session = self.prepare()
     assert_invalid(session, """
         BEGIN BATCH USING TIMESTAMP 1111111111111111
         INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow') USING TIMESTAMP 2
         INSERT INTO users (id, firstname, lastname) VALUES (1, 'Will', 'Turner')
         APPLY BATCH
     """, matching="Timestamp must be set either on BATCH or individual statements")
开发者ID:krummas,项目名称:cassandra-dtest,代码行数:9,代码来源:batch_test.py


示例6: test_reloadlocalschema

    def test_reloadlocalschema(self):
        """
        @jira_ticket CASSANDRA-13954

        Test that `nodetool reloadlocalschema` works as intended
        """
        cluster = self.cluster
        cluster.populate(1)
        node = cluster.nodelist()[0]
        remove_perf_disable_shared_mem(node)  # for jmx
        cluster.start()

        session = self.patient_cql_connection(node)

        query = "CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 2};"
        session.execute(query)

        query = 'CREATE TABLE test.test (pk int, ck int, PRIMARY KEY (pk, ck));'
        session.execute(query)

        ss = make_mbean('db', type='StorageService')

        schema_version = ''

        # get initial schema version
        with JolokiaAgent(node) as jmx:
            schema_version = jmx.read_attribute(ss, 'SchemaVersion')

        # manually add a regular column 'val' to test.test
        query = """
            INSERT INTO system_schema.columns
                (keyspace_name, table_name, column_name, clustering_order,
                 column_name_bytes, kind, position, type)
            VALUES
                ('test', 'test', 'val', 'none',
                 0x76616c, 'regular', -1, 'int');"""
        session.execute(query)

        # validate that schema version wasn't automatically updated
        with JolokiaAgent(node) as jmx:
            self.assertEqual(schema_version, jmx.read_attribute(ss, 'SchemaVersion'))

        # make sure the new column wasn't automagically picked up
        assert_invalid(session, 'INSERT INTO test.test (pk, ck, val) VALUES (0, 1, 2);')

        # force the node to reload schema from disk
        node.nodetool('reloadlocalschema')

        # validate that schema version changed
        with JolokiaAgent(node) as jmx:
            self.assertNotEqual(schema_version, jmx.read_attribute(ss, 'SchemaVersion'))

        # try an insert with the new column again and validate it succeeds this time
        session.execute('INSERT INTO test.test (pk, ck, val) VALUES (0, 1, 2);')
        assert_all(session, 'SELECT pk, ck, val FROM test.test;', [[0, 1, 2]])
开发者ID:snazy,项目名称:cassandra-dtest,代码行数:55,代码来源:nodetool_test.py


示例7: logged_batch_rejects_counter_mutations_test

    def logged_batch_rejects_counter_mutations_test(self):
        """ Test that logged batch rejects counter mutations """
        session = self.prepare()
        err = "Cannot include a counter statement in a logged batch"

        assert_invalid(session, """
            BEGIN BATCH
            INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow')
            INSERT INTO users (id, firstname, lastname) VALUES (1, 'Will', 'Turner')
            UPDATE clicks SET total = total + 1 WHERE userid = 1 and url = 'http://foo.com'
            APPLY BATCH
            """, matching=err)
开发者ID:krummas,项目名称:cassandra-dtest,代码行数:12,代码来源:batch_test.py


示例8: test_nested_type_dropping

    def test_nested_type_dropping(self):
        """
        Confirm a user type can't be dropped when being used by another user type.
        """
        cluster = self.cluster
        cluster.populate(3).start()
        node1, node2, node3 = cluster.nodelist()
        session = self.patient_cql_connection(node1)
        create_ks(session, "nested_user_type_dropping", 2)
        session.default_consistency_level = ConsistencyLevel.LOCAL_QUORUM

        stmt = """
              USE nested_user_type_dropping
           """
        session.execute(stmt)

        stmt = """
              CREATE TYPE simple_type (
              user_number int,
              user_text text
              )
           """
        session.execute(stmt)

        stmt = """
              CREATE TYPE another_type (
              somefield frozen<simple_type>
              )
           """
        session.execute(stmt)

        stmt = """
              DROP TYPE simple_type;
           """
        assert_invalid(
            session,
            stmt,
            "Cannot drop user type nested_user_type_dropping.simple_type as it is still used by user type another_type",
        )

        # drop the type that's impeding the drop, and then try again
        stmt = """
              DROP TYPE another_type;
           """
        session.execute(stmt)

        stmt = """
              DROP TYPE simple_type;
           """
        session.execute(stmt)

        # now let's have a look at the system schema and make sure no user types are defined
        self.assertNoTypes(session)
开发者ID:jeffjirsa,项目名称:cassandra-dtest,代码行数:53,代码来源:user_types_test.py


示例9: unlogged_batch_rejects_counter_mutations_test

    def unlogged_batch_rejects_counter_mutations_test(self):
        """ Test that unlogged batch rejects counter mutations """
        session = self.prepare()
        err = "Counter and non-counter mutations cannot exist in the same batch"

        assert_invalid(session, """
            BEGIN UNLOGGED BATCH
            INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow')
            INSERT INTO users (id, firstname, lastname) VALUES (2, 'Elizabeth', 'Swann')
            UPDATE clicks SET total = total + 1 WHERE userid = 1 AND url = 'http://foo.com'
            APPLY BATCH
            """, matching=err)
开发者ID:krummas,项目名称:cassandra-dtest,代码行数:12,代码来源:batch_test.py


示例10: test_type_enforcement

    def test_type_enforcement(self):
        """
        Confirm error when incorrect data type used for user type
        """
        cluster = self.cluster
        cluster.populate(3).start()
        node1, node2, node3 = cluster.nodelist()
        session = self.cql_connection(node1)
        create_ks(session, "user_type_enforcement", 2)
        session.default_consistency_level = ConsistencyLevel.LOCAL_QUORUM

        stmt = """
              USE user_type_enforcement
           """
        session.execute(stmt)

        stmt = """
              CREATE TYPE simple_type (
              user_number int
              )
           """
        session.execute(stmt)

        stmt = """
              CREATE TABLE simple_table (
              id uuid PRIMARY KEY,
              number frozen<simple_type>
              )
           """
        session.execute(stmt)
        # Make sure the schema propagate
        time.sleep(2)

        # here we will attempt an insert statement which should fail
        # because the user type is an int, but the insert statement is
        # providing text
        _id = uuid.uuid4()
        stmt = """
              INSERT INTO simple_table (id, number)
              VALUES ({id}, {{user_number: 'uh oh....this is not a number'}});
           """.format(
            id=_id
        )
        assert_invalid(session, stmt, "field user_number is not of type int")

        # let's check the rowcount and make sure the data
        # didn't get inserted when the exception asserted above was thrown
        stmt = """
              SELECT * FROM simple_table;
           """
        rows = list(session.execute(stmt))
        self.assertEqual(0, len(rows))
开发者ID:jeffjirsa,项目名称:cassandra-dtest,代码行数:52,代码来源:user_types_test.py


示例11: counter_batch_rejects_regular_mutations_test

    def counter_batch_rejects_regular_mutations_test(self):
        """ Test that counter batch rejects non-counter mutations """
        session = self.prepare()
        err = "Cannot include non-counter statement in a counter batch"

        assert_invalid(session, """
            BEGIN COUNTER BATCH
            UPDATE clicks SET total = total + 1 WHERE userid = 1 and url = 'http://foo.com'
            UPDATE clicks SET total = total + 1 WHERE userid = 1 and url = 'http://bar.com'
            UPDATE clicks SET total = total + 1 WHERE userid = 2 and url = 'http://baz.com'
            INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow')
            APPLY BATCH
            """, matching=err)
开发者ID:krummas,项目名称:cassandra-dtest,代码行数:13,代码来源:batch_test.py


示例12: aggregate_with_udt_keyspace_isolation_test

    def aggregate_with_udt_keyspace_isolation_test(self):
        """
        Ensure aggregates dont allow a UDT from another keyspace
        @jira_ticket CASSANDRA-9409
        """
        session = self.prepare()

        session.execute("create type udt (a int);")
        create_ks(session, 'user_ks', 1)
        assert_invalid(
            session,
            "create aggregate suma (ks.udt) sfunc plus stype int finalfunc stri initcond 10",
            "Statement on keyspace user_ks cannot refer to a user type in keyspace ks"
        )
开发者ID:iamaleksey,项目名称:cassandra-dtest,代码行数:14,代码来源:user_functions_test.py


示例13: aggregate_udf_test

    def aggregate_udf_test(self):
        session = self.prepare()
        session.execute("create table nums (key int primary key, val int);")

        for x in range(1, 4):
            session.execute("INSERT INTO nums (key, val) VALUES (%d, %d)" % (x, x))
        session.execute("create function plus(key int, val int) called on null input returns int language java as 'return Integer.valueOf(key.intValue() + val.intValue());'")
        session.execute("create function stri(key int) called on null input returns text language java as 'return key.toString();'")
        session.execute("create aggregate suma (int) sfunc plus stype int finalfunc stri initcond 10")

        assert_one(session, "select suma(val) from nums", ["16"])

        session.execute("create function test(a int, b double) called on null input returns int language javascript as 'a + b;'")
        session.execute("create aggregate aggy(double) sfunc test stype int")

        assert_invalid(session, "create aggregate aggtwo(int) sfunc aggy stype int")

        assert_invalid(session, "create aggregate aggthree(int) sfunc test stype int finalfunc aggtwo")
开发者ID:iamaleksey,项目名称:cassandra-dtest,代码行数:18,代码来源:user_functions_test.py


示例14: assertions_test

    def assertions_test(self):
        # assert_exception_test
        mock_session = Mock(**{'execute.side_effect': AlreadyExists("Dummy exception message.")})
        assert_exception(mock_session, "DUMMY QUERY", expected=AlreadyExists)

        # assert_unavailable_test
        mock_session = Mock(**{'execute.side_effect': Unavailable("Dummy Unavailabile message.")})
        assert_unavailable(mock_session.execute)

        # assert_invalid_test
        mock_session = Mock(**{'execute.side_effect': InvalidRequest("Dummy InvalidRequest message.")})
        assert_invalid(mock_session, "DUMMY QUERY")

        # assert_unauthorized_test
        mock_session = Mock(**{'execute.side_effect': Unauthorized("Dummy Unauthorized message.")})
        assert_unauthorized(mock_session, "DUMMY QUERY", None)

        # assert_one_test
        mock_session = Mock()
        mock_session.execute = Mock(return_value=[[1, 1]])
        assert_one(mock_session, "SELECT * FROM test", [1, 1])

        # assert_none_test
        mock_session = Mock()
        mock_session.execute = Mock(return_value=[])
        assert_none(mock_session, "SELECT * FROM test")

        # assert_all_test
        mock_session = Mock()
        mock_session.execute = Mock(return_value=[[i, i] for i in range(0, 10)])
        assert_all(mock_session, "SELECT k, v FROM test", [[i, i] for i in range(0, 10)], ignore_order=True)

        # assert_almost_equal_test
        assert_almost_equal(1, 1.1, 1.2, 1.9, error=1.0)

        # assert_row_count_test
        mock_session = Mock()
        mock_session.execute = Mock(return_value=[[1]])
        assert_row_count(mock_session, 'test', 1)

        # assert_length_equal_test
        check = [1, 2, 3, 4]
        assert_length_equal(check, 4)
开发者ID:iamaleksey,项目名称:cassandra-dtest,代码行数:43,代码来源:assertion_test.py


示例15: drop_counter_column_test

    def drop_counter_column_test(self):
        """Test for CASSANDRA-7831"""
        cluster = self.cluster
        cluster.populate(1).start()
        node1, = cluster.nodelist()
        session = self.patient_cql_connection(node1)
        create_ks(session, 'counter_tests', 1)

        session.execute("CREATE TABLE counter_bug (t int, c counter, primary key(t))")

        session.execute("UPDATE counter_bug SET c = c + 1 where t = 1")
        row = list(session.execute("SELECT * from counter_bug"))

        self.assertEqual(rows_to_list(row)[0], [1, 1])
        self.assertEqual(len(row), 1)

        session.execute("ALTER TABLE counter_bug drop c")

        assert_invalid(session, "ALTER TABLE counter_bug add c counter", "Cannot re-add previously dropped counter column c")
开发者ID:jeffjirsa,项目名称:cassandra-dtest,代码行数:19,代码来源:counter_tests.py


示例16: test_no_counters_in_user_types

    def test_no_counters_in_user_types(self):
        # CASSANDRA-7672
        cluster = self.cluster
        cluster.populate(1).start()
        [node1] = cluster.nodelist()
        session = self.patient_cql_connection(node1)
        create_ks(session, 'user_types', 1)

        stmt = """
            USE user_types
         """
        session.execute(stmt)

        stmt = """
            CREATE TYPE t_item (
            sub_one COUNTER )
         """

        assert_invalid(session, stmt, 'A user type cannot contain counters')
开发者ID:beobal,项目名称:cassandra-dtest,代码行数:19,代码来源:user_types_test.py


示例17: udt_test

    def udt_test(self):
        """ Test (somewhat indirectly) that user queries involving UDT's are properly encoded (due to driver not recognizing UDT syntax) """
        cluster = self.cluster

        cluster.populate(3).start()
        node1, node2, node3 = cluster.nodelist()

        time.sleep(.5)
        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 3)

        # create udt and insert correctly (should be successful)
        session.execute('CREATE TYPE address (city text,zip int);')
        session.execute('CREATE TABLE user_profiles (login text PRIMARY KEY, addresses map<text, frozen<address>>);')
        session.execute("INSERT INTO user_profiles(login, addresses) VALUES ('tsmith', { 'home': {city: 'San Fransisco',zip: 94110 }});")

        # note here address looks likes a map -> which is what the driver thinks it is. udt is encoded server side, we test that if addresses is changed slightly whether encoder recognizes the errors

        # try adding a field - see if will be encoded to a udt (should return error)
        assert_invalid(session,
                       "INSERT INTO user_profiles(login, addresses) VALUES ('jsmith', { 'home': {street: 'El Camino Real', city: 'San Fransisco', zip: 94110 }});",
                       "Unknown field 'street' in value of user defined type address")

        # try modifying a field name - see if will be encoded to a udt (should return error)
        assert_invalid(session,
                       "INSERT INTO user_profiles(login, addresses) VALUES ('fsmith', { 'home': {cityname: 'San Fransisco', zip: 94110 }});",
                       "Unknown field 'cityname' in value of user defined type address")

        # try modifying a type within the collection - see if will be encoded to a udt (should return error)
        assert_invalid(session, "INSERT INTO user_profiles(login, addresses) VALUES ('fsmith', { 'home': {city: 'San Fransisco', zip: '94110' }});",
                       "Invalid map literal for addresses")
开发者ID:iamaleksey,项目名称:cassandra-dtest,代码行数:31,代码来源:udtencoding_test.py


示例18: test_user_type_isolation

    def test_user_type_isolation(self):
        """
        Ensure UDT cannot be used from another keyspace
        @jira_ticket CASSANDRA-9409
        @since 2.2
        """
        cluster = self.cluster
        cluster.populate(1).start()
        node1 = cluster.nodelist()[0]
        session = self.patient_cql_connection(node1)
        create_ks(session, 'user_types', 1)

        # create a user defined type in a keyspace
        session.execute("CREATE TYPE udt (first text, second int, third int)")

        # ensure we cannot use a udt from another keyspace
        create_ks(session, 'user_ks', 1)
        assert_invalid(
            session,
            "CREATE TABLE t (id int PRIMARY KEY, v frozen<user_types.udt>)",
            "Statement on keyspace user_ks cannot refer to a user type in keyspace user_types"
        )
开发者ID:beobal,项目名称:cassandra-dtest,代码行数:22,代码来源:user_types_test.py


示例19: test_type_as_part_of_pkey

    def test_type_as_part_of_pkey(self):
        """Tests user types as part of a composite pkey"""
        # make sure we can define a table with a user type as part of the pkey
        # and do a basic insert/query of data in that table.
        cluster = self.cluster
        cluster.populate(3).start()
        node1, node2, node3 = cluster.nodelist()
        session = self.patient_cql_connection(node1, consistency_level=ConsistencyLevel.LOCAL_QUORUM)
        create_ks(session, 'user_type_pkeys', 2)

        stmt = """
              CREATE TYPE t_person_name (
              first text,
              middle text,
              last text
            )
           """
        session.execute(stmt)

        stmt = """
              CREATE TABLE person_likes (
              id uuid,
              name frozen<t_person_name>,
              like text,
              PRIMARY KEY ((id, name))
              )
           """
        session.execute(stmt)
        # Make sure the schema propagate
        time.sleep(2)

        _id = uuid.uuid4()

        stmt = """
              INSERT INTO person_likes (id, name, like)
              VALUES ({id}, {{first:'Nero', middle:'Claudius Caesar Augustus', last:'Germanicus'}}, 'arson');
           """.format(id=_id)
        session.execute(stmt)

        # attempt to query without the user type portion of the pkey and confirm there is an error
        stmt = """
              SELECT id, name.first from person_likes where id={id};
           """.format(id=_id)

        if self.cluster.version() >= '3.10':
            assert_invalid(session, stmt, 'Cannot execute this query as it might involve data filtering')
        elif self.cluster.version() >= '2.2':
            assert_invalid(session, stmt, 'Partition key parts: name must be restricted as other parts are')
        else:
            assert_invalid(session, stmt, 'Partition key part name must be restricted since preceding part is')

        stmt = """
              SELECT id, name.first, like from person_likes where id={id} and name = {{first:'Nero', middle: 'Claudius Caesar Augustus', last: 'Germanicus'}};
           """.format(id=_id)
        rows = session.execute(stmt)

        row_uuid, first_name, like = rows[0]
        assert first_name == 'Nero'
        assert like == 'arson'
开发者ID:beobal,项目名称:cassandra-dtest,代码行数:59,代码来源:user_types_test.py


示例20: udf_scripting_test

    def udf_scripting_test(self):
        session = self.prepare()
        session.execute("create table nums (key int primary key, val double);")

        for x in range(1, 4):
            session.execute("INSERT INTO nums (key, val) VALUES (%d, %d)" % (x, float(x)))

        session.execute("CREATE FUNCTION x_sin(val double) called on null input returns double language javascript as 'Math.sin(val)'")

        assert_one(session, "SELECT key, val, x_sin(val) FROM nums where key = %d" % 1, [1, 1.0, math.sin(1.0)])
        assert_one(session, "SELECT key, val, x_sin(val) FROM nums where key = %d" % 2, [2, 2.0, math.sin(2.0)])
        assert_one(session, "SELECT key, val, x_sin(val) FROM nums where key = %d" % 3, [3, 3.0, math.sin(3.0)])

        session.execute("create function y_sin(val double) called on null input returns double language javascript as 'Math.sin(val).toString()'")

        assert_invalid(session, "select y_sin(val) from nums where key = 1", expected=FunctionFailure)

        assert_invalid(session, "create function compilefail(key int) called on null input returns double language javascript as 'foo bar';")

        session.execute("create function plustwo(key int) called on null input returns double language javascript as 'key+2'")

        assert_one(session, "select plustwo(key) from nums where key = 3", [5])
开发者ID:iamaleksey,项目名称:cassandra-dtest,代码行数:22,代码来源:user_functions_test.py



注:本文中的tools.assertions.assert_invalid函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python assertions.assert_one函数代码示例发布时间:2022-05-27
下一篇:
Python assertions.assert_all函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap