本文整理汇总了Python中vtdb.vtgate_client.connect函数的典型用法代码示例。如果您正苦于以下问题：Python connect函数的具体用法？Python connect怎么用？Python connect使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了connect函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _read_srv_keyspace
def _read_srv_keyspace(self, keyspace_name):
    """Fetch the serving-keyspace record for `keyspace_name` through vtgate.

    A short-lived vtgate connection is opened for the lookup and is always
    closed again, even when the lookup itself raises.

    Args:
      keyspace_name: name of the keyspace to look up.

    Returns:
      Whatever `conn.get_srv_keyspace(keyspace_name)` returns.
    """
    addr = utils.vtgate.rpc_endpoint()
    protocol = protocols_flavor().vtgate_python_protocol()
    conn = vtgate_client.connect(protocol, addr, 30.0)
    try:
        return conn.get_srv_keyspace(keyspace_name)
    finally:
        # Release the connection even if the lookup fails.
        conn.close()
开发者ID:hediehy,项目名称:vitess,代码行数:7,代码来源:keyspace_test.py
示例2: get_connection
def get_connection(timeout=10.0):
    """Open a vtgate connection, logging any failure before re-raising it.

    Args:
      timeout: value passed as the timeout argument of
        `vtgate_client.connect` (defaults to 10.0).

    Returns:
      The connection object returned by `vtgate_client.connect`.
    """
    protocol, endpoint = utils.vtgate.rpc_endpoint(python=True)
    try:
        connection = vtgate_client.connect(protocol, endpoint, timeout)
    except Exception:
        # Record the traceback, then let the caller see the original error.
        logging.exception('Connection to vtgate (timeout=%s) failed.', timeout)
        raise
    return connection
开发者ID:izogain,项目名称:vitess,代码行数:7,代码来源:vtgatev3_test.py
示例3: main
def main(argv=None):
    """Parse the command line, connect to vtgate, and run the SQL command loop.

    Args:
      argv: optional list of command-line arguments (without the program
        name). When None, argparse falls back to reading sys.argv[1:], which
        matches the previous zero-argument behavior.

    Returns:
      1 when no --cell was supplied (nothing is connected in that case),
      0 once the command loop has exited.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-v", "--verbose", help="increase output verbosity",
                        action="store_true")
    parser.add_argument("--version", help="show the program version",
                        action="store_true")
    parser.add_argument("-g", "--vtgate-host", help="vtgate host",
                        dest="vtgate_host", default="localhost")
    parser.add_argument("-p", "--vtgate-port", help="vtgate port",
                        dest="vtgate_port", default="15991")
    parser.add_argument("-C", "--vtctld-host", help="vtctld host",
                        dest="vtctld_host", default="localhost")
    parser.add_argument("-P", "--vtctld-port", help="vtctld port",
                        dest="vtctld_port", default="15999")
    parser.add_argument("-c", "--cell", help="cell", dest="cell")
    args = parser.parse_args(argv)
    if not args.cell:
        return 1
    # Host and port are both strings here, so plain concatenation is safe.
    vtgate_server = args.vtgate_host + ":" + args.vtgate_port
    vtgate_conn = vtgate_client.connect('grpc', vtgate_server, 3600)
    try:
        SqlForwarder(vtgate_conn).cmdloop()
    finally:
        # Do not leave the connection open when the shell exits or crashes.
        vtgate_conn.close()
    return 0
开发者ID:mdhheydari,项目名称:vitess-cli,代码行数:26,代码来源:vitess-cli.py
示例4: setUp
def setUp(self):
    """Open the vtgate connection used by the test and log the test start."""
    endpoint = 'localhost:%d' % vtgateclienttest_port
    protocol = protocols_flavor().vtgate_python_protocol()
    self.conn = vtgate_client.connect(protocol, endpoint, 30.0)
    # Only the last two dotted components of the test id are logged.
    id_parts = self.id().split('.')
    short_name = '.'.join(id_parts[-2:])
    logging.info('Start: %s, protocol %s.', short_name, protocol)
开发者ID:xgwubin,项目名称:vitess,代码行数:7,代码来源:python_client_test.py
示例5: get_connection
def get_connection(timeout=10.0):
    """Open a vtgate connection, logging any failure before re-raising it.

    Args:
      timeout: value passed as the timeout argument of
        `vtgate_client.connect` (defaults to 10.0).

    Returns:
      The connection object returned by `vtgate_client.connect`.
    """
    protocol = protocols_flavor().vtgate_python_protocol()
    try:
        connection = vtgate_client.connect(
            protocol, utils.vtgate.addr(), timeout)
    except Exception:
        # Record the traceback, then let the caller see the original error.
        logging.exception('Connection to vtgate (timeout=%s) failed.', timeout)
        raise
    return connection
开发者ID:hediehy,项目名称:vitess,代码行数:7,代码来源:vtgatev3_test.py
示例6: _exec_vt_txn
def _exec_vt_txn(self, query_list):
    """Run every query in `query_list` inside a single vtgate transaction.

    A fresh connection is opened for the transaction and is always closed
    afterwards, whether or not the queries succeed.

    Args:
      query_list: iterable of SQL strings; each is executed with an empty
        bind-variable dict.
    """
    protocol, addr = utils.vtgate.rpc_endpoint(python=True)
    vtgate_conn = vtgate_client.connect(protocol, addr, 30.0)
    try:
        cursor = vtgate_conn.cursor(
            tablet_type="master", keyspace="test_keyspace", shards=["0"],
            writable=True)
        cursor.begin()
        for query in query_list:
            cursor.execute(query, {})
        cursor.commit()
    finally:
        # The connection is local to this call, so nobody else can close it.
        vtgate_conn.close()
开发者ID:erzel,项目名称:vitess,代码行数:9,代码来源:update_stream.py
示例7: __init__
def __init__(self, cache):
    """Initialise the thread, connect to vtgate, and start running at once.

    Args:
      cache: object stored on the instance as `self.cache`.
    """
    threading.Thread.__init__(self)
    self.cache, self.done = cache, False
    vtgate_protocol, vtgate_addr = utils.vtgate.rpc_endpoint(python=True)
    self.conn = vtgate_client.connect(vtgate_protocol, vtgate_addr, 30.0)
    # Whole-second timestamp taken at construction time.
    now = time.time()
    self.timestamp = long(now)
    # The thread launches itself as the final construction step.
    self.start()
开发者ID:gitql,项目名称:vitess,代码行数:10,代码来源:cache_invalidation.py
示例8: setUp
def setUp(self):
    """Connect to the vtgate client test server on the protocol's port."""
    super(TestPythonClientBase, self).setUp()
    protocol = protocols_flavor().vtgate_python_protocol()
    # gRPC is served on its own port; every other protocol shares the default.
    port = (vtgateclienttest_grpc_port if protocol == 'grpc'
            else vtgateclienttest_port)
    addr = 'localhost:%d' % port
    self.conn = vtgate_client.connect(protocol, addr, 30.0)
    short_name = '.'.join(self.id().split('.')[-2:])
    logging.info('Start: %s, protocol %s.', short_name, protocol)
开发者ID:alainjobart,项目名称:vitess,代码行数:11,代码来源:python_client_test.py
示例9: create_connection
def create_connection(self):
    """Connects to vtgate and allows to create a cursor to execute queries.

    This method is preferred over the two other methods ("vtclient", "execute")
    to execute a query in tests.

    The connection is closed when the generator is resumed or unwound, even if
    the code using the yielded connection raised an exception.

    Yields:
      A vtgate connection object.

    Example:
      with self.vtgate.create_connection() as conn:
        c = conn.cursor(keyspace=KEYSPACE, shards=[SHARD], tablet_type='master',
                        writable=self.writable)
        c.execute('SELECT * FROM buffer WHERE id = :id', {'id': 1})
    """
    protocol, endpoint = self.rpc_endpoint(python=True)
    # Use a very long timeout to account for slow tests.
    conn = vtgate_client.connect(protocol, endpoint, 600.0)
    try:
        yield conn
    finally:
        # Without the finally, an exception raised by the consumer would be
        # thrown into the generator at the yield and skip the close.
        conn.close()
开发者ID:gitql,项目名称:vitess,代码行数:20,代码来源:utils.py
示例10: setUp
def setUp(self):
    """Open the vtgate connection used by the test as `self.conn`."""
    endpoint = 'localhost:%d' % vtgateclienttest_port
    flavor = protocols_flavor()
    protocol = flavor.vtgate_python_protocol()
    self.conn = vtgate_client.connect(protocol, endpoint, 30.0)
开发者ID:ruiaylin,项目名称:vitess,代码行数:4,代码来源:python_client_test.py
示例11: _vtdb_conn
def _vtdb_conn(self):
    """Return a new vtgate connection opened with a timeout argument of 30.0."""
    endpoint = utils.vtgate.rpc_endpoint(python=True)
    protocol, addr = endpoint
    connection = vtgate_client.connect(protocol, addr, 30.0)
    return connection
开发者ID:benyu,项目名称:vitess,代码行数:3,代码来源:vertical_split.py
示例12: _read_srv_keyspace
def _read_srv_keyspace(self, keyspace_name):
    """Fetch the serving-keyspace record for `keyspace_name` through vtgate.

    A short-lived vtgate connection is opened for the lookup and is always
    closed again, even when the lookup itself raises.

    Args:
      keyspace_name: name of the keyspace to look up.

    Returns:
      Whatever `conn.get_srv_keyspace(keyspace_name)` returns.
    """
    protocol, addr = utils.vtgate.rpc_endpoint(python=True)
    conn = vtgate_client.connect(protocol, addr, 30.0)
    try:
        return conn.get_srv_keyspace(keyspace_name)
    finally:
        # Release the connection even if the lookup fails.
        conn.close()
开发者ID:Rastusik,项目名称:vitess,代码行数:6,代码来源:keyspace_test.py
示例13: test_standalone
def test_standalone(self):
"""Sample test for run_local_database.py as a standalone process."""
topology = vttest_pb2.VTTestTopology()
keyspace = topology.keyspaces.add(name='test_keyspace')
keyspace.shards.add(name='-80')
keyspace.shards.add(name='80-')
topology.keyspaces.add(name='redirect', served_from='test_keyspace')
# launch a backend database based on the provided topology and schema
port = environment.reserve_ports(1)
args = [environment.run_local_database,
'--port', str(port),
'--proto_topo', text_format.MessageToString(topology,
as_one_line=True),
'--schema_dir', os.path.join(environment.vttop, 'test',
'vttest_schema'),
'--web_dir', environment.vttop + '/web/vtctld',
]
sp = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
config = json.loads(sp.stdout.readline())
# gather the vars for the vtgate process
url = 'http://localhost:%d/debug/vars' % config['port']
f = urllib.urlopen(url)
data = f.read()
f.close()
json_vars = json.loads(data)
self.assertIn('vtcombo', json_vars['cmdline'][0])
# build the vtcombo address and protocol
protocol = protocols_flavor().vttest_protocol()
if protocol == 'grpc':
vtgate_addr = 'localhost:%d' % config['grpc_port']
else:
vtgate_addr = 'localhost:%d' % config['port']
conn_timeout = 30.0
utils.pause('Paused test after vtcombo was started.\n'
'For manual testing, connect to vtgate at: %s '
'using protocol: %s.\n'
'Press enter to continue.' % (vtgate_addr, protocol))
# Connect to vtgate.
conn = vtgate_client.connect(protocol, vtgate_addr, conn_timeout)
# Insert a row.
row_id = 123
keyspace_id = get_keyspace_id(row_id)
cursor = conn.cursor(
tablet_type='master', keyspace='test_keyspace',
keyspace_ids=[pack_kid(keyspace_id)],
writable=True)
cursor.begin()
insert = ('insert into test_table (id, msg, keyspace_id) values (:id, '
':msg, :keyspace_id)')
bind_variables = {
'id': row_id,
'msg': 'test %s' % row_id,
'keyspace_id': keyspace_id,
}
cursor.execute(insert, bind_variables)
cursor.commit()
# Read the row back.
cursor.execute(
'select * from test_table where id=:id', {'id': row_id})
result = cursor.fetchall()
self.assertEqual(result[0][1], 'test 123')
# try to insert again, see if we get the right integrity error exception
# (this is meant to test vtcombo properly returns exceptions, and to a
# lesser extent that the python client converts it properly)
cursor.begin()
with self.assertRaises(dbexceptions.IntegrityError):
cursor.execute(insert, bind_variables)
cursor.rollback()
# Insert a bunch of rows with long msg values.
bind_variables['msg'] = 'x' * 64
id_start = 1000
rowcount = 500
cursor.begin()
for i in xrange(id_start, id_start+rowcount):
bind_variables['id'] = i
cursor.execute(insert, bind_variables)
cursor.commit()
cursor.close()
# Try to fetch a large number of rows, from a rdonly
# (more than one streaming result packet).
stream_cursor = conn.cursor(
tablet_type='rdonly', keyspace='test_keyspace',
keyspace_ids=[pack_kid(keyspace_id)],
cursorclass=vtgate_cursor.StreamVTGateCursor)
stream_cursor.execute('select * from test_table where id >= :id_start',
{'id_start': id_start})
self.assertEqual(rowcount, len(list(stream_cursor.fetchall())))
stream_cursor.close()
# try to read a row using the redirected keyspace, to a replica this time
#.........这里部分代码省略.........
开发者ID:CowLeo,项目名称:vitess,代码行数:101,代码来源:vttest_sample_test.py
示例14: env
return json.dumps(entries)
@app.route('/env')
def env():
    """Return the current process environment serialised as a JSON object."""
    environment = dict(os.environ)
    return json.dumps(environment)
if __name__ == '__main__':
    # Command-line options for the guestbook server.
    parser = argparse.ArgumentParser(description='Run guestbook app')
    parser.add_argument('--port', type=int, default=8080, help='Port')
    parser.add_argument('--cell', type=str, default='test', help='Cell')
    parser.add_argument(
        '--keyspace', type=str, default='test_keyspace', help='Keyspace')
    parser.add_argument(
        '--timeout', type=int, default=10, help='Connect timeout (s)')
    parser.add_argument(
        '--vtgate_port', type=int, default=15991, help='Vtgate Port')
    guestbook_args = parser.parse_args()

    # Get vtgate service address from Kubernetes DNS.
    addr = 'vtgate-{0}:{1:d}'.format(
        guestbook_args.cell, guestbook_args.vtgate_port)

    # Connect to vtgate.
    conn = vtgate_client.connect('grpc', addr, guestbook_args.timeout)

    keyspace = guestbook_args.keyspace

    app.run(debug=True, port=guestbook_args.port, host='0.0.0.0')
开发者ID:alainjobart,项目名称:vitess,代码行数:29,代码来源:main.py
示例15: main
def main():
    """CGI entry point: run the submitted query and dump table contents as JSON.

    Writes a JSON content-type header, executes the request's "query" field
    while capturing the queries logged during it, then reads every table of
    the "user" keyspace (both key ranges) and the "lookup" keyspace into the
    response. Any failure is reported as a JSON object with an "error" key.
    """
    # print(...) with a single argument produces the same output under the
    # Python 2 print statement and the Python 3 print function.
    print("Content-Type: application/json\n")
    try:
        conn = vtgate_client.connect("grpc", "localhost:12346", 10.0)

        args = cgi.FieldStorage()
        query = args.getvalue("query")
        response = {}

        queries = []
        # Start the capture before entering the try block so that `stats` is
        # always bound when the finally clause runs; otherwise a failure in
        # capture_log would be masked by an unbound-variable error.
        stats = capture_log(12345, queries)
        try:
            time.sleep(0.25)
            exec_query(conn, "result", query, response)
        finally:
            stats.terminate()
            time.sleep(0.25)
            response["queries"] = queries

        # Sharded tables: read each one from both key ranges, storing the
        # results under "<table>0" and "<table>1".
        sharded_tables = (
            "user",
            "user_extra",
            "music",
            "music_extra",
            "name_info",
            "music_user_idx",
        )
        for table in sharded_tables:
            for index, key_range in enumerate(("-80", "80-")):
                exec_query(
                    conn, "%s%d" % (table, index),
                    "select * from %s" % table, response,
                    keyspace="user", kr=key_range)

        # lookup tables
        for table in ("user_seq", "music_seq", "name_user_idx"):
            exec_query(
                conn, table, "select * from %s" % table, response,
                keyspace="lookup", kr="-")

        print(json.dumps(response))
    except Exception as e:  # pylint: disable=broad-except
        print(json.dumps({"error": str(e)}))
开发者ID:32kb,项目名称:vitess,代码行数:81,代码来源:data.py
示例16: messages
"INSERT INTO messages (page, time_created_ns, keyspace_id, message)"
" VALUES (:page, :time_created_ns, :keyspace_id, :message)",
{"page": page, "time_created_ns": int(time.time() * 1e9), "keyspace_id": keyspace_id_int, "message": value},
)
cursor.commit()
# Read the list back from master (critical read) because it's
# important that the user sees their own addition immediately.
cursor.execute("SELECT message FROM messages WHERE page=:page" " ORDER BY time_created_ns", {"page": page})
entries = [row[0] for row in cursor.fetchall()]
cursor.close()
return json.dumps(entries)
@app.route("/env")
def env():
    """Return the current process environment serialised as a JSON object."""
    environment = dict(os.environ)
    return json.dumps(environment)
if __name__ == "__main__":
    # Script entry point: connect to vtgate, then start serving the app.
    timeout = 10  # connect timeout in seconds
    # Get vtgate service address from Kubernetes DNS.
    addr = "vtgate-test:15991"
    # Connect to vtgate.
    # NOTE(review): `conn` is bound at module level, presumably so the request
    # handlers defined earlier in the file can use it -- confirm.
    conn = vtgate_client.connect("grpc", addr, timeout)
    # NOTE(review): host "0.0.0.0" together with debug=True looks intended for
    # demo use only -- confirm before deploying anywhere reachable.
    app.run(host="0.0.0.0", port=8080, debug=True)
开发者ID:aaijazi,项目名称:vitess,代码行数:30,代码来源:main.py
示例17: _vtdb_conn
def _vtdb_conn(self):
    """Return a new vtgate connection opened with a timeout argument of 30.0."""
    endpoint = utils.vtgate.rpc_endpoint()
    flavor = protocols_flavor()
    connection = vtgate_client.connect(
        flavor.vtgate_python_protocol(), endpoint, 30.0)
    return connection
开发者ID:payintel,项目名称:vitess,代码行数:4,代码来源:vertical_split.py
示例18: use_named
def use_named(self, instance_name):
    """Attach this environment to an existing, named Kubernetes cluster.

    Uses kubectl to find the vtctld and vtgate service addresses, queries
    vtctld for keyspaces, shards, cells and tablets, stores the results on the
    instance, and opens one vtgate connection per cell.

    Args:
      instance_name: passed to kubectl as the --namespace value and stored as
        `self.cluster_name`.

    Raises:
      base_environment.VitessEnvironmentError: if kubectl cannot be executed
        or vtctld reports no keyspaces.
    """
    # Check to make sure kubectl exists
    try:
        subprocess.check_output(['kubectl'])
    except OSError:
        raise base_environment.VitessEnvironmentError(
            'kubectl not found, please install by visiting kubernetes.io or '
            'running gcloud components update kubectl if using compute engine.')

    get_address_template = (
        '{{if ge (len .status.loadBalancer) 1}}'
        '{{index (index .status.loadBalancer.ingress 0) "ip"}}'
        '{{end}}')

    get_address_params = ['kubectl', 'get', '-o', 'template', '--template',
                          get_address_template, 'service', '--namespace',
                          instance_name]

    vtctld_addr = self._poll_service_address(get_address_params, 'vtctld')
    self.vtctl_addr = '%s:15999' % vtctld_addr

    self.vtctl_helper = vtctl_helper.VtctlHelper('grpc', self.vtctl_addr)
    self.cluster_name = instance_name

    keyspaces = self.vtctl_helper.execute_vtctl_command(['GetKeyspaces'])
    # Build a real list (not a lazy filter object) so the emptiness check
    # below and the later indexing work regardless of Python version.
    self.mobs = [name for name in keyspaces.split('\n') if name]
    self.keyspaces = self.mobs

    if not self.keyspaces:
        raise base_environment.VitessEnvironmentError(
            'Invalid environment, no keyspaces found')

    self.num_shards = []
    self.shards = []

    for keyspace in self.keyspaces:
        shards = json.loads(self.vtctl_helper.execute_vtctl_command(
            ['FindAllShardsInKeyspace', keyspace]))
        self.shards.append(shards)
        self.num_shards.append(len(shards))

    # This assumes that all keyspaces/shards use the same set of cells
    self.cells = json.loads(self.vtctl_helper.execute_vtctl_command(
        ['GetShard', '%s/%s' % (self.keyspaces[0], self.shards[0][0])]
        ))['cells']

    self.primary_cells = self.cells
    self.replica_instances = []
    self.rdonly_instances = []

    # This assumes that all cells are equivalent for k8s environments.
    all_tablets_in_a_cell = self.vtctl_helper.execute_vtctl_command(
        ['ListAllTablets', self.cells[0]])
    all_tablets_in_a_cell = [
        line.split(' ')
        for line in all_tablets_in_a_cell.split('\n') if line]

    for index, keyspace in enumerate(self.keyspaces):
        keyspace_tablets_in_cell = [
            tablet for tablet in all_tablets_in_a_cell if tablet[1] == keyspace]
        replica_tablets_in_cell = [
            tablet for tablet in keyspace_tablets_in_cell
            if tablet[3] == 'master' or tablet[3] == 'replica']
        replica_instances = len(replica_tablets_in_cell) / self.num_shards[index]
        self.replica_instances.append(replica_instances)
        self.rdonly_instances.append(
            (len(keyspace_tablets_in_cell) / self.num_shards[index]) -
            replica_instances)

    # Converts keyspace name and alias to number of instances
    self.keyspace_alias_to_num_instances_dict = {}
    for index, keyspace in enumerate(self.keyspaces):
        self.keyspace_alias_to_num_instances_dict[keyspace] = {
            'replica': int(self.replica_instances[index]),
            'rdonly': int(self.rdonly_instances[index])
        }

    self.vtgate_addrs = {}
    self.vtgate_conns = {}
    for cell in self.cells:
        # Poll into the same variable the loop condition tests, with a fresh
        # deadline for every cell. Previously the condition watched
        # self.vtgate_addr while the result went to a local, so the loop
        # always ran for the whole timeout and later cells were never polled.
        self.vtgate_addr = self._poll_service_address(
            get_address_params, 'vtgate-%s' % cell)
        self.vtgate_addrs[cell] = '%s:15001' % self.vtgate_addr
        self.vtgate_conns[cell] = vtgate_client.connect(
            protocols_flavor.protocols_flavor().vtgate_python_protocol(),
            self.vtgate_addrs[cell], 60)


def _poll_service_address(self, get_address_params, service, timeout=60):
    """Re-run kubectl until `service` reports an address or `timeout` elapses.

    Args:
      get_address_params: kubectl argument list to which the service name is
        appended.
      service: name of the Kubernetes service to look up.
      timeout: maximum number of seconds to keep polling.

    Returns:
      The first non-empty kubectl output, or '' if none arrived in time.
    """
    deadline = time.time() + timeout
    address = ''
    while time.time() < deadline and not address:
        address = subprocess.check_output(
            get_address_params + [service], stderr=subprocess.STDOUT)
    return address
开发者ID:Analyticalloopholes,项目名称:vitess,代码行数:92,代码来源:k8s_environment.py
示例19: test_secure
def test_secure(self):
    """Check that table ACLs are enforced per client over secured transport.

    Writes a table ACL config naming 'vtgate client 1' as reader, writer and
    admin of vt_insert_test, starts the tablets and vtgate with their
    server/client extra args, then verifies that client 1 can select from the
    table while client 2 is rejected with a table ACL error.
    """
    with open(table_acl_config, 'w') as fd:
        fd.write("""{
"table_groups": [
{
"table_names_or_prefixes": ["vt_insert_test"],
"readers": ["vtgate client 1"],
"writers": ["vtgate client 1"],
"admins": ["vtgate client 1"]
}
]
}
""")

    # start the tablets
    shard_0_master.start_vttablet(
        wait_for_state='NOT_SERVING',
        table_acl_config=table_acl_config,
        extra_args=server_extra_args('vttablet-server-instance',
                                     'vttablet-client'))
    shard_0_slave.start_vttablet(
        wait_for_state='NOT_SERVING',
        table_acl_config=table_acl_config,
        extra_args=server_extra_args('vttablet-server-instance',
                                     'vttablet-client'))

    # setup replication
    utils.run_vtctl(tmclient_extra_args('vttablet-client-1') + [
        'InitShardMaster', '-force', 'test_keyspace/0',
        shard_0_master.tablet_alias], auto_log=True)
    utils.run_vtctl(tmclient_extra_args('vttablet-client-1') + [
        'ApplySchema', '-sql', create_vt_insert_test,
        'test_keyspace'])
    for t in [shard_0_master, shard_0_slave]:
        utils.run_vtctl(tmclient_extra_args('vttablet-client-1') + [
            'RunHealthCheck', t.tablet_alias])

    # start vtgate
    utils.VtGate().start(extra_args=tabletconn_extra_args('vttablet-client-1')+
                         server_extra_args('vtgate-server-instance',
                                           'vtgate-client'))

    # 'vtgate client 1' is authorized to access vt_insert_test
    protocol, addr = utils.vtgate.rpc_endpoint(python=True)
    conn = vtgate_client.connect(protocol, addr, 30.0,
                                 **python_client_kwargs('vtgate-client-1',
                                                        'vtgate-server'))
    try:
        cursor = conn.cursor(tablet_type='master', keyspace='test_keyspace',
                             shards=['0'])
        cursor.execute('select * from vt_insert_test', {})
    finally:
        # Close even when the authorized query unexpectedly fails.
        conn.close()

    # 'vtgate client 2' is not authorized to access vt_insert_test
    conn = vtgate_client.connect(protocol, addr, 30.0,
                                 **python_client_kwargs('vtgate-client-2',
                                                        'vtgate-server'))
    try:
        cursor = conn.cursor(tablet_type='master', keyspace='test_keyspace',
                             shards=['0'])
        cursor.execute('select * from vt_insert_test', {})
        self.fail('Execute went through')
    except dbexceptions.DatabaseError as e:
        s = str(e)
        self.assertIn('table acl error', s)
        self.assertIn('cannot run PASS_SELECT on table', s)
    finally:
        # The second connection was previously never closed.
        conn.close()
开发者ID:dumbunny,项目名称:vitess,代码行数:65,代码来源:encrypted_transport.py
示例20: test_standalone
def test_standalone(self):
    """Sample test for run_local_database.py as a standalone process.

    Launches run_local_database as a subprocess, checks that it is running
    vtcombo, then exercises inserts, a duplicate-insert integrity error, and a
    streaming read through a vtgate connection. The connection is closed and
    the subprocess is told to exit even when an assertion fails part-way.
    """
    # launch a backend database based on the provided topology and schema
    port = environment.reserve_ports(1)
    args = [environment.run_local_database,
            '--port', str(port),
            '--topology',
            'test_keyspace/-80:test_keyspace_0,'
            'test_keyspace/80-:test_keyspace_1',
            '--schema_dir', os.path.join(environment.vttop, 'test',
                                         'vttest_schema'),
            '--web_dir', environment.vttop + '/web/vtctld',
           ]
    sp = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    try:
        config = json.loads(sp.stdout.readline())

        # gather the vars for the vtgate process
        url = 'http://localhost:%d/debug/vars' % config['port']
        f = urllib.urlopen(url)
        data = f.read()
        f.close()
        json_vars = json.loads(data)
        self.assertIn('vtcombo', json_vars['cmdline'][0])

        # to test vtcombo:
        # ./vttest_sample_test.py -v -d
        # go install && vtcombo -port 15010 -grpc_port 15011 -service_map grpc-vtgateservice -topology test_keyspace/-80:test_keyspace_0,test_keyspace/80-:test_keyspace_1 -mycnf_server_id 1 -mycnf_socket_file $VTDATAROOT/vttest*/vt_0000000001/mysql.sock -db-config-dba-uname vt_dba -db-config-dba-charset utf8 -db-config-app-uname vt_app -db-config-app-charset utf8 -alsologtostderr
        # vtctl -vtgate_protocol grpc VtGateExecuteShards -server localhost:15011 -keyspace test_keyspace -shards -80 -tablet_type master "select 1 from dual"
        # vtctl -vtgate_protocol grpc VtGateExecuteKeyspaceIds -server localhost:15011 -keyspace test_keyspace -keyspace_ids 20 -tablet_type master "show tables"
        utils.pause('good time to test vtcombo with database running')

        protocol = protocols_flavor().vttest_protocol()
        if protocol == 'grpc':
            vtgate_addr = 'localhost:%d' % config['grpc_port']
        else:
            vtgate_addr = 'localhost:%d' % config['port']
        conn_timeout = 30.0

        # Connect to vtgate.
        conn = vtgate_client.connect(protocol, vtgate_addr, conn_timeout)
        try:
            # Insert a row.
            row_id = 123
            keyspace_id = get_keyspace_id(row_id)
            cursor = conn.cursor(
                'test_keyspace', 'master', keyspace_ids=[pack_kid(keyspace_id)],
                writable=True)
            cursor.begin()
            insert = ('insert into test_table (id, msg, keyspace_id) values (%(id)s, '
                      '%(msg)s, %(keyspace_id)s)')
            bind_variables = {
                'id': row_id,
                'msg': 'test %s' % row_id,
                'keyspace_id': keyspace_id,
            }
            cursor.execute(insert, bind_variables)
            cursor.commit()

            # Read the row back.
            cursor.execute(
                'select * from test_table where id=%(id)s', {'id': row_id})
            result = cursor.fetchall()
            self.assertEqual(result[0][1], 'test 123')

            # try to insert again, see if we get the right integrity error
            # exception (this is meant to test vtcombo properly returns
            # exceptions, and to a lesser extent that the python client
            # converts it properly)
            cursor.begin()
            with self.assertRaises(dbexceptions.IntegrityError):
                cursor.execute(insert, bind_variables)
            cursor.rollback()

            # Insert a bunch of rows with long msg values.
            bind_variables['msg'] = 'x' * 64
            id_start = 1000
            rowcount = 500
            cursor.begin()
            for i in xrange(id_start, id_start+rowcount):
                bind_variables['id'] = i
                cursor.execute(insert, bind_variables)
            cursor.commit()

            # Try to fetch a large number of rows
            # (more than one streaming result packet).
            stream_cursor = conn.cursor(
                'test_keyspace', 'master', keyspace_ids=[pack_kid(keyspace_id)],
                cursorclass=vtgate_cursor.StreamVTGateCursor)
            stream_cursor.execute('select * from test_table where id >= %(id_start)s',
                                  {'id_start': id_start})
            self.assertEqual(rowcount, len(list(stream_cursor.fetchall())))
            stream_cursor.close()

            # Clean up.
            cursor.close()
        finally:
            # Close the connection even if one of the checks above failed.
            conn.close()
    finally:
        # and we're done, clean-up process; this runs on failure too so the
        # child process is not left waiting for its newline on stdin.
        sp.stdin.write('\n')
        sp.wait()
开发者ID:MichaelLee90911,项目名称:vitess,代码行数:100,代码来源:vttest_sample_test.py
注：本文中的vtdb.vtgate_client.connect函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论