本文整理汇总了Python中tools.assertions.assert_invalid函数的典型用法代码示例。如果您正苦于以下问题:Python assert_invalid函数的具体用法?Python assert_invalid怎么用?Python assert_invalid使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了assert_invalid函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: check_permissions
def check_permissions(self, node, upgraded):
    """
    Check the permissions granted to ``michael`` as seen from one node.

    First connects as ``klaus`` to list every permission, then connects as
    ``michael`` to confirm which statements are accepted and which one is
    rejected as unauthorized.

    :param node: node to open the exclusive connections against
    :param upgraded: truthy when ``node`` has been upgraded; selects the
        expected shape of the LIST PERMISSIONS rows
    """
    # the output of LIST PERMISSIONS changes slightly with #7653 adding
    # a new role column to results, so we need to tailor our check
    # based on whether the node has been upgraded or not
    if upgraded:
        expected_permissions = [['michael', 'michael', '<table ks.cf1>', 'MODIFY'],
                                ['michael', 'michael', '<table ks.cf2>', 'SELECT']]
    else:
        expected_permissions = [['michael', '<table ks.cf1>', 'MODIFY'],
                                ['michael', '<table ks.cf2>', 'SELECT']]

    # use an exclusive connection to ensure we only talk to the specified node
    klaus = self.patient_exclusive_cql_connection(node, user='klaus', password='12345', timeout=20)
    try:
        # klaus is a superuser, so should be able to list all permissions
        assert_all(klaus,
                   'LIST ALL PERMISSIONS',
                   expected_permissions,
                   timeout=60)
    finally:
        # release the connection even when the assertion above fails
        klaus.cluster.shutdown()

    michael = self.patient_exclusive_cql_connection(node, user='michael', password='54321')
    try:
        michael.execute('INSERT INTO ks.cf1 (id, val) VALUES (0,0)')
        michael.execute('SELECT * FROM ks.cf2')
        assert_invalid(michael,
                       'SELECT * FROM ks.cf1',
                       'User michael has no SELECT permission on <table ks.cf1> or any of its parents',
                       Unauthorized)
    finally:
        # release the connection even when a statement or assertion fails
        michael.cluster.shutdown()
开发者ID:beobal,项目名称:cassandra-dtest,代码行数:30,代码来源:upgrade_internal_auth_test.py
示例2: udf_with_udt_keyspace_isolation_test
def udf_with_udt_keyspace_isolation_test(self):
    """
    Ensure functions don't allow a UDT from another keyspace
    @jira_ticket CASSANDRA-9409
    @since 2.2
    """
    cross_keyspace_error = "Statement on keyspace user_ks cannot refer to a user type in keyspace ks"
    # a udt from another keyspace used as function argument
    udt_as_argument = "CREATE FUNCTION overloaded(v ks.udt) called on null input RETURNS text LANGUAGE java AS 'return \"f1\";'"
    # a udt from another keyspace used as return value
    udt_as_return = "CREATE FUNCTION test(v text) called on null input RETURNS ks.udt LANGUAGE java AS 'return null;';"

    session = self.prepare()
    session.execute("create type udt (a text, b int);")
    create_ks(session, 'user_ks', 1)

    # both statements must be rejected with the same message
    for statement in (udt_as_argument, udt_as_return):
        assert_invalid(session, statement, cross_keyspace_error)
开发者ID:iamaleksey,项目名称:cassandra-dtest,代码行数:25,代码来源:user_functions_test.py
示例3: udf_with_udt_test
def udf_with_udt_test(self):
"""
Test UDFs that take a UDT argument, using frozen and (from 3.6) non-frozen UDT columns.
@jira_ticket CASSANDRA-7423
@since 3.6
"""
session = self.prepare()
session.execute("create type test (a text, b int);")
# Java UDF that reads the int field "b" out of its UDT argument
session.execute("create function funk(udt test) called on null input returns int language java as 'return Integer.valueOf(udt.getInt(\"b\"));';")
# non-frozen UDT columns are only exercised on 3.6 and later; frozen ones always are
if self.cluster.version() >= LooseVersion('3.6'):
frozen_vals = (False, True)
else:
frozen_vals = (True,)
for frozen in frozen_vals:
debug("Using {} UDTs".format("frozen" if frozen else "non-frozen"))
# each variant gets its own table so both can coexist in the same keyspace
table_name = "tab_frozen" if frozen else "tab"
column_type = "frozen<test>" if frozen else "test"
session.execute("create table {} (key int primary key, udt {});".format(table_name, column_type))
session.execute("insert into %s (key, udt) values (1, {a: 'un', b:1});" % (table_name,))
session.execute("insert into %s (key, udt) values (2, {a: 'deux', b:2});" % (table_name,))
session.execute("insert into %s (key, udt) values (3, {a: 'trois', b:3});" % (table_name,))
# the three rows carry b = 1, 2 and 3, so summing funk(udt) is expected to give 6
assert_one(session, "select sum(funk(udt)) from {}".format(table_name), [6])
# dropping the type is expected to be rejected
# NOTE(review): indentation is missing here, so it is unclear whether this check
# runs on every loop iteration or once after the loop -- confirm against upstream
assert_invalid(session, "drop type test;")
开发者ID:iamaleksey,项目名称:cassandra-dtest,代码行数:29,代码来源:user_functions_test.py
示例4: drop_column_compact_test
def drop_column_compact_test(self):
    """Check that dropping a column from a COMPACT STORAGE table is rejected."""
    session = self.prepare()
    setup_statements = (
        "USE ks",
        "CREATE TABLE cf (key int PRIMARY KEY, c1 int, c2 int) WITH COMPACT STORAGE",
    )
    for statement in setup_statements:
        session.execute(statement)
    assert_invalid(session, "ALTER TABLE cf DROP c1", "Cannot drop columns from a")
开发者ID:iamaleksey,项目名称:cassandra-dtest,代码行数:7,代码来源:schema_test.py
示例5: only_one_timestamp_is_valid_test
def only_one_timestamp_is_valid_test(self):
    """ Test that TIMESTAMP must not be used in the statements within the batch. """
    session = self.prepare()
    expected_error = "Timestamp must be set either on BATCH or individual statements"
    # the batch sets a timestamp and so does its first statement
    batch = """
BEGIN BATCH USING TIMESTAMP 1111111111111111
INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow') USING TIMESTAMP 2
INSERT INTO users (id, firstname, lastname) VALUES (1, 'Will', 'Turner')
APPLY BATCH
"""
    assert_invalid(session, batch, matching=expected_error)
开发者ID:krummas,项目名称:cassandra-dtest,代码行数:9,代码来源:batch_test.py
示例6: test_reloadlocalschema
def test_reloadlocalschema(self):
    """
    @jira_ticket CASSANDRA-13954
    Test that `nodetool reloadlocalschema` works as intended
    """
    cluster = self.cluster
    cluster.populate(1)
    node = cluster.nodelist()[0]
    remove_perf_disable_shared_mem(node)  # for jmx
    cluster.start()
    session = self.patient_cql_connection(node)

    create_keyspace = "CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 2};"
    session.execute(create_keyspace)
    create_table = 'CREATE TABLE test.test (pk int, ck int, PRIMARY KEY (pk, ck));'
    session.execute(create_table)

    storage_service = make_mbean('db', type='StorageService')
    initial_version = ''
    # remember the schema version before touching the schema tables
    with JolokiaAgent(node) as jmx:
        initial_version = jmx.read_attribute(storage_service, 'SchemaVersion')

    # write a regular column 'val' for test.test straight into the schema table
    add_column = """
INSERT INTO system_schema.columns
(keyspace_name, table_name, column_name, clustering_order,
column_name_bytes, kind, position, type)
VALUES
('test', 'test', 'val', 'none',
0x76616c, 'regular', -1, 'int');"""
    session.execute(add_column)

    # the schema version must not have moved on its own
    with JolokiaAgent(node) as jmx:
        self.assertEqual(initial_version, jmx.read_attribute(storage_service, 'SchemaVersion'))

    # the same insert is first expected to be rejected, then to succeed after the reload
    insert_with_val = 'INSERT INTO test.test (pk, ck, val) VALUES (0, 1, 2);'
    assert_invalid(session, insert_with_val)

    # force the node to reload schema from disk
    node.nodetool('reloadlocalschema')

    # now the schema version must differ from the initial one
    with JolokiaAgent(node) as jmx:
        self.assertNotEqual(initial_version, jmx.read_attribute(storage_service, 'SchemaVersion'))

    session.execute(insert_with_val)
    assert_all(session, 'SELECT pk, ck, val FROM test.test;', [[0, 1, 2]])
开发者ID:snazy,项目名称:cassandra-dtest,代码行数:55,代码来源:nodetool_test.py
示例7: logged_batch_rejects_counter_mutations_test
def logged_batch_rejects_counter_mutations_test(self):
    """ Test that logged batch rejects counter mutations """
    session = self.prepare()
    # two regular inserts followed by a counter update in one logged batch
    mixed_batch = """
BEGIN BATCH
INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow')
INSERT INTO users (id, firstname, lastname) VALUES (1, 'Will', 'Turner')
UPDATE clicks SET total = total + 1 WHERE userid = 1 and url = 'http://foo.com'
APPLY BATCH
"""
    assert_invalid(
        session,
        mixed_batch,
        matching="Cannot include a counter statement in a logged batch",
    )
开发者ID:krummas,项目名称:cassandra-dtest,代码行数:12,代码来源:batch_test.py
示例8: test_nested_type_dropping
def test_nested_type_dropping(self):
    """
    Confirm a user type can't be dropped when being used by another user type.
    """
    cluster = self.cluster
    cluster.populate(3).start()
    node1, _node2, _node3 = cluster.nodelist()
    session = self.patient_cql_connection(node1)
    create_ks(session, "nested_user_type_dropping", 2)
    session.default_consistency_level = ConsistencyLevel.LOCAL_QUORUM

    use_keyspace = """
USE nested_user_type_dropping
"""
    create_inner_type = """
CREATE TYPE simple_type (
user_number int,
user_text text
)
"""
    create_outer_type = """
CREATE TYPE another_type (
somefield frozen<simple_type>
)
"""
    drop_inner_type = """
DROP TYPE simple_type;
"""
    drop_outer_type = """
DROP TYPE another_type;
"""

    for setup_statement in (use_keyspace, create_inner_type, create_outer_type):
        session.execute(setup_statement)

    # the inner type is referenced by another_type, so dropping it must fail
    assert_invalid(
        session,
        drop_inner_type,
        "Cannot drop user type nested_user_type_dropping.simple_type as it is still used by user type another_type",
    )

    # drop the type that's impeding the drop, and then try again
    session.execute(drop_outer_type)
    session.execute(drop_inner_type)

    # now let's have a look at the system schema and make sure no user types are defined
    self.assertNoTypes(session)
开发者ID:jeffjirsa,项目名称:cassandra-dtest,代码行数:53,代码来源:user_types_test.py
示例9: unlogged_batch_rejects_counter_mutations_test
def unlogged_batch_rejects_counter_mutations_test(self):
    """ Test that unlogged batch rejects counter mutations """
    session = self.prepare()
    # two regular inserts followed by a counter update in one unlogged batch
    mixed_batch = """
BEGIN UNLOGGED BATCH
INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow')
INSERT INTO users (id, firstname, lastname) VALUES (2, 'Elizabeth', 'Swann')
UPDATE clicks SET total = total + 1 WHERE userid = 1 AND url = 'http://foo.com'
APPLY BATCH
"""
    assert_invalid(
        session,
        mixed_batch,
        matching="Counter and non-counter mutations cannot exist in the same batch",
    )
开发者ID:krummas,项目名称:cassandra-dtest,代码行数:12,代码来源:batch_test.py
示例10: test_type_enforcement
def test_type_enforcement(self):
    """
    Confirm error when incorrect data type used for user type
    """
    cluster = self.cluster
    cluster.populate(3).start()
    node1, _node2, _node3 = cluster.nodelist()
    session = self.cql_connection(node1)
    create_ks(session, "user_type_enforcement", 2)
    session.default_consistency_level = ConsistencyLevel.LOCAL_QUORUM

    use_keyspace = """
USE user_type_enforcement
"""
    create_type = """
CREATE TYPE simple_type (
user_number int
)
"""
    create_table = """
CREATE TABLE simple_table (
id uuid PRIMARY KEY,
number frozen<simple_type>
)
"""
    for setup_statement in (use_keyspace, create_type, create_table):
        session.execute(setup_statement)

    # Make sure the schema propagates
    time.sleep(2)

    # user_number is declared as int, yet the insert below supplies text,
    # so the statement is expected to be rejected
    row_id = uuid.uuid4()
    bad_insert = """
INSERT INTO simple_table (id, number)
VALUES ({id}, {{user_number: 'uh oh....this is not a number'}});
""".format(id=row_id)
    assert_invalid(session, bad_insert, "field user_number is not of type int")

    # the rejected insert must not have left a row behind
    select_all = """
SELECT * FROM simple_table;
"""
    self.assertEqual(0, len(list(session.execute(select_all))))
开发者ID:jeffjirsa,项目名称:cassandra-dtest,代码行数:52,代码来源:user_types_test.py
示例11: counter_batch_rejects_regular_mutations_test
def counter_batch_rejects_regular_mutations_test(self):
    """ Test that counter batch rejects non-counter mutations """
    session = self.prepare()
    # three counter updates followed by a regular insert in one counter batch
    mixed_batch = """
BEGIN COUNTER BATCH
UPDATE clicks SET total = total + 1 WHERE userid = 1 and url = 'http://foo.com'
UPDATE clicks SET total = total + 1 WHERE userid = 1 and url = 'http://bar.com'
UPDATE clicks SET total = total + 1 WHERE userid = 2 and url = 'http://baz.com'
INSERT INTO users (id, firstname, lastname) VALUES (0, 'Jack', 'Sparrow')
APPLY BATCH
"""
    assert_invalid(
        session,
        mixed_batch,
        matching="Cannot include non-counter statement in a counter batch",
    )
开发者ID:krummas,项目名称:cassandra-dtest,代码行数:13,代码来源:batch_test.py
示例12: aggregate_with_udt_keyspace_isolation_test
def aggregate_with_udt_keyspace_isolation_test(self):
    """
    Ensure aggregates don't allow a UDT from another keyspace
    @jira_ticket CASSANDRA-9409
    """
    session = self.prepare()
    session.execute("create type udt (a int);")
    create_ks(session, 'user_ks', 1)

    # the aggregate's argument type names a udt qualified with another keyspace
    create_aggregate = "create aggregate suma (ks.udt) sfunc plus stype int finalfunc stri initcond 10"
    expected_error = "Statement on keyspace user_ks cannot refer to a user type in keyspace ks"
    assert_invalid(session, create_aggregate, expected_error)
开发者ID:iamaleksey,项目名称:cassandra-dtest,代码行数:14,代码来源:user_functions_test.py
示例13: aggregate_udf_test
def aggregate_udf_test(self):
    """Exercise user-defined aggregates built on UDFs, including invalid definitions."""
    session = self.prepare()
    session.execute("create table nums (key int primary key, val int);")
    for n in (1, 2, 3):
        session.execute("INSERT INTO nums (key, val) VALUES (%d, %d)" % (n, n))

    # state function, final function and the aggregate that combines them
    suma_definitions = (
        "create function plus(key int, val int) called on null input returns int language java as 'return Integer.valueOf(key.intValue() + val.intValue());'",
        "create function stri(key int) called on null input returns text language java as 'return key.toString();'",
        "create aggregate suma (int) sfunc plus stype int finalfunc stri initcond 10",
    )
    for definition in suma_definitions:
        session.execute(definition)
    # initcond 10 plus the values 1, 2 and 3, rendered as text by stri
    assert_one(session, "select suma(val) from nums", ["16"])

    session.execute("create function test(a int, b double) called on null input returns int language javascript as 'a + b;'")
    session.execute("create aggregate aggy(double) sfunc test stype int")

    # aggregates that name another aggregate as sfunc or finalfunc must be rejected
    invalid_aggregates = (
        "create aggregate aggtwo(int) sfunc aggy stype int",
        "create aggregate aggthree(int) sfunc test stype int finalfunc aggtwo",
    )
    for definition in invalid_aggregates:
        assert_invalid(session, definition)
开发者ID:iamaleksey,项目名称:cassandra-dtest,代码行数:18,代码来源:user_functions_test.py
示例14: assertions_test
def assertions_test(self):
    """Run each assertion helper once against a mocked session."""

    def failing_session(error):
        # session whose execute() raises the given exception
        return Mock(**{'execute.side_effect': error})

    def returning_session(rows):
        # session whose execute() returns the given rows
        stub = Mock()
        stub.execute = Mock(return_value=rows)
        return stub

    # assert_exception_test
    assert_exception(failing_session(AlreadyExists("Dummy exception message.")),
                     "DUMMY QUERY", expected=AlreadyExists)

    # assert_unavailable_test
    assert_unavailable(failing_session(Unavailable("Dummy Unavailabile message.")).execute)

    # assert_invalid_test
    assert_invalid(failing_session(InvalidRequest("Dummy InvalidRequest message.")), "DUMMY QUERY")

    # assert_unauthorized_test
    assert_unauthorized(failing_session(Unauthorized("Dummy Unauthorized message.")), "DUMMY QUERY", None)

    # assert_one_test
    assert_one(returning_session([[1, 1]]), "SELECT * FROM test", [1, 1])

    # assert_none_test
    assert_none(returning_session([]), "SELECT * FROM test")

    # assert_all_test
    returned_rows = [[i, i] for i in range(10)]
    expected_rows = [[i, i] for i in range(10)]
    assert_all(returning_session(returned_rows), "SELECT k, v FROM test", expected_rows, ignore_order=True)

    # assert_almost_equal_test
    assert_almost_equal(1, 1.1, 1.2, 1.9, error=1.0)

    # assert_row_count_test
    assert_row_count(returning_session([[1]]), 'test', 1)

    # assert_length_equal_test
    assert_length_equal([1, 2, 3, 4], 4)
开发者ID:iamaleksey,项目名称:cassandra-dtest,代码行数:43,代码来源:assertion_test.py
示例15: drop_counter_column_test
def drop_counter_column_test(self):
    """
    Test for CASSANDRA-7831

    Increments a counter column, drops it, and checks that re-adding a
    counter column with the same name is rejected.
    """
    cluster = self.cluster
    cluster.populate(1).start()
    node1, = cluster.nodelist()
    session = self.patient_cql_connection(node1)
    create_ks(session, 'counter_tests', 1)

    session.execute("CREATE TABLE counter_bug (t int, c counter, primary key(t))")
    session.execute("UPDATE counter_bug SET c = c + 1 where t = 1")

    rows = list(session.execute("SELECT * from counter_bug"))
    # check the row count before indexing so that an empty result is reported
    # as an assertion failure instead of an IndexError
    self.assertEqual(len(rows), 1)
    self.assertEqual(rows_to_list(rows)[0], [1, 1])

    session.execute("ALTER TABLE counter_bug drop c")
    assert_invalid(session, "ALTER TABLE counter_bug add c counter", "Cannot re-add previously dropped counter column c")
开发者ID:jeffjirsa,项目名称:cassandra-dtest,代码行数:19,代码来源:counter_tests.py
示例16: test_no_counters_in_user_types
def test_no_counters_in_user_types(self):
    """Check that creating a user type with a COUNTER field is rejected (CASSANDRA-7672)."""
    cluster = self.cluster
    cluster.populate(1).start()
    node1, = cluster.nodelist()
    session = self.patient_cql_connection(node1)
    create_ks(session, 'user_types', 1)

    session.execute("""
USE user_types
""")

    create_counter_type = """
CREATE TYPE t_item (
sub_one COUNTER )
"""
    assert_invalid(session, create_counter_type, 'A user type cannot contain counters')
开发者ID:beobal,项目名称:cassandra-dtest,代码行数:19,代码来源:user_types_test.py
示例17: udt_test
def udt_test(self):
    """ Test (somewhat indirectly) that user queries involving UDT's are properly encoded (due to driver not recognizing UDT syntax) """
    cluster = self.cluster
    cluster.populate(3).start()
    node1, _node2, _node3 = cluster.nodelist()
    time.sleep(.5)
    session = self.patient_cql_connection(node1)
    create_ks(session, 'ks', 3)

    # create udt and insert correctly (should be successful)
    valid_statements = (
        'CREATE TYPE address (city text,zip int);',
        'CREATE TABLE user_profiles (login text PRIMARY KEY, addresses map<text, frozen<address>>);',
        "INSERT INTO user_profiles(login, addresses) VALUES ('tsmith', { 'home': {city: 'San Fransisco',zip: 94110 }});",
    )
    for statement in valid_statements:
        session.execute(statement)

    # note here address looks like a map -> which is what the driver thinks it is.
    # udt is encoded server side, so each slightly altered insert below checks
    # whether the encoder recognizes the error
    rejected_inserts = (
        # an extra field that the type does not declare
        ("INSERT INTO user_profiles(login, addresses) VALUES ('jsmith', { 'home': {street: 'El Camino Real', city: 'San Fransisco', zip: 94110 }});",
         "Unknown field 'street' in value of user defined type address"),
        # a renamed field
        ("INSERT INTO user_profiles(login, addresses) VALUES ('fsmith', { 'home': {cityname: 'San Fransisco', zip: 94110 }});",
         "Unknown field 'cityname' in value of user defined type address"),
        # a text value for the int field
        ("INSERT INTO user_profiles(login, addresses) VALUES ('fsmith', { 'home': {city: 'San Fransisco', zip: '94110' }});",
         "Invalid map literal for addresses"),
    )
    for statement, expected_error in rejected_inserts:
        assert_invalid(session, statement, expected_error)
开发者ID:iamaleksey,项目名称:cassandra-dtest,代码行数:31,代码来源:udtencoding_test.py
示例18: test_user_type_isolation
def test_user_type_isolation(self):
    """
    Ensure UDT cannot be used from another keyspace
    @jira_ticket CASSANDRA-9409
    @since 2.2
    """
    cluster = self.cluster
    cluster.populate(1).start()
    first_node = cluster.nodelist()[0]
    session = self.patient_cql_connection(first_node)

    # define the type after creating the 'user_types' keyspace
    create_ks(session, 'user_types', 1)
    session.execute("CREATE TYPE udt (first text, second int, third int)")

    # after creating 'user_ks', a table referencing user_types.udt must be rejected
    create_ks(session, 'user_ks', 1)
    create_table = "CREATE TABLE t (id int PRIMARY KEY, v frozen<user_types.udt>)"
    expected_error = "Statement on keyspace user_ks cannot refer to a user type in keyspace user_types"
    assert_invalid(session, create_table, expected_error)
开发者ID:beobal,项目名称:cassandra-dtest,代码行数:22,代码来源:user_types_test.py
示例19: test_type_as_part_of_pkey
def test_type_as_part_of_pkey(self):
    """Tests user types as part of a composite pkey"""
    # define a table whose partition key includes a user type, then do a
    # basic insert/query of data in that table.
    cluster = self.cluster
    cluster.populate(3).start()
    node1, _node2, _node3 = cluster.nodelist()
    session = self.patient_cql_connection(node1, consistency_level=ConsistencyLevel.LOCAL_QUORUM)
    create_ks(session, 'user_type_pkeys', 2)

    create_type = """
CREATE TYPE t_person_name (
first text,
middle text,
last text
)
"""
    session.execute(create_type)

    create_table = """
CREATE TABLE person_likes (
id uuid,
name frozen<t_person_name>,
like text,
PRIMARY KEY ((id, name))
)
"""
    session.execute(create_table)

    # Make sure the schema propagates
    time.sleep(2)

    person_id = uuid.uuid4()
    insert_row = """
INSERT INTO person_likes (id, name, like)
VALUES ({id}, {{first:'Nero', middle:'Claudius Caesar Augustus', last:'Germanicus'}}, 'arson');
""".format(id=person_id)
    session.execute(insert_row)

    # query without the user type portion of the pkey; the expected error
    # message depends on the cluster version
    partial_key_select = """
SELECT id, name.first from person_likes where id={id};
""".format(id=person_id)
    # NOTE(review): the version is compared against plain strings; confirm that
    # cluster.version() returns a version object so '3.10' orders after '3.9'
    if self.cluster.version() >= '3.10':
        expected_error = 'Cannot execute this query as it might involve data filtering'
    elif self.cluster.version() >= '2.2':
        expected_error = 'Partition key parts: name must be restricted as other parts are'
    else:
        expected_error = 'Partition key part name must be restricted since preceding part is'
    assert_invalid(session, partial_key_select, expected_error)

    # with the full partition key the query must succeed
    full_key_select = """
SELECT id, name.first, like from person_likes where id={id} and name = {{first:'Nero', middle: 'Claudius Caesar Augustus', last: 'Germanicus'}};
""".format(id=person_id)
    result = session.execute(full_key_select)
    _row_id, first_name, liked = result[0]
    assert first_name == 'Nero'
    assert liked == 'arson'
开发者ID:beobal,项目名称:cassandra-dtest,代码行数:59,代码来源:user_types_test.py
示例20: udf_scripting_test
def udf_scripting_test(self):
    """Exercise javascript UDFs: a working one, one that fails when called, and one that fails to compile."""
    session = self.prepare()
    session.execute("create table nums (key int primary key, val double);")
    keys = (1, 2, 3)
    for key in keys:
        session.execute("INSERT INTO nums (key, val) VALUES (%d, %d)" % (key, float(key)))

    session.execute("CREATE FUNCTION x_sin(val double) called on null input returns double language javascript as 'Math.sin(val)'")
    # the UDF result is compared against Python's math.sin for every row
    for key in keys:
        val = float(key)
        assert_one(session, "SELECT key, val, x_sin(val) FROM nums where key = %d" % key, [key, val, math.sin(val)])

    # y_sin is declared to return double but its body calls toString()
    session.execute("create function y_sin(val double) called on null input returns double language javascript as 'Math.sin(val).toString()'")
    assert_invalid(session, "select y_sin(val) from nums where key = 1", expected=FunctionFailure)

    # creating a function from the body 'foo bar' must be rejected
    assert_invalid(session, "create function compilefail(key int) called on null input returns double language javascript as 'foo bar';")

    session.execute("create function plustwo(key int) called on null input returns double language javascript as 'key+2'")
    assert_one(session, "select plustwo(key) from nums where key = 3", [5])
开发者ID:iamaleksey,项目名称:cassandra-dtest,代码行数:22,代码来源:user_functions_test.py
注:本文中的tools.assertions.assert_invalid函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论