def do_command(self):
    """Validate the configured audit log command and run it.

    The command name and its optional value are read from self.options
    (keys "command" and "value").  The COPY command is served locally by
    self._copy_log() and never opens a connection.  Every other command
    connects to the server described by the "server_vals" option, prints a
    banner, runs the command and always disconnects afterwards.

    Returns True once the command has been carried out.

    Raises UtilError if the command is not in VALID_COMMANDS, or if the
    command requires a value and the given value fails validation.
    """
    opts = self.options

    # Reject unknown commands up front.
    cmd = opts.get("command", None)
    if cmd not in VALID_COMMANDS:
        raise UtilError("Invalid command.")

    # Only commands that take a value get their value validated.
    cmd_value = opts.get("value", None)
    if command_requires_value(cmd):
        if not check_command_value(cmd, cmd_value):
            raise UtilError("Please provide the correct value for the %s "
                            "command." % cmd)

    # COPY works on files only, so no server connection is needed.
    if cmd == "COPY":
        self._copy_log()
        return True

    # Every remaining command needs a live connection.
    server = Server({'conn_info': opts.get("server_vals", None)})
    server.connect()

    # Parenthesised single argument: same output as the Python 2 statement.
    print("#\n# Executing %s command.\n#\n" % cmd)
    try:
        if cmd == "ROTATE":
            self._rotate_log(server)
        else:
            if cmd == "POLICY":
                statement = "SET @@GLOBAL.audit_log_policy = %s"
            else:  # "ROTATE_ON_SIZE"
                statement = "SET @@GLOBAL.audit_log_rotate_on_size = %s"
            server.exec_query(statement % cmd_value)
    finally:
        # Release the connection whether or not the command succeeded.
        server.disconnect()

    return True
def purge(self):
    """Purge binary logs on a standalone server.

    Connects to the server given by self.server_cnx_val (the connection is
    kept in self.server), checks the privileges needed for the purge
    operation, reads the current binary log information and refuses to
    continue when the server looks like a master: either a "Binlog Dump"
    entry shows up in SHOW PROCESSLIST or SHOW SLAVE HOSTS returns rows.
    Otherwise the target binlog index is resolved and the purge is delegated
    to self._purge().

    Raises UtilError if the server appears to be acting as a master.
    """
    # Open the connection used by every following step.
    self.server = Server({'conn_info': self.server_cnx_val})
    self.server.connect()

    # Make sure the connected user may purge binary logs.
    check_privileges(self.server, BINLOG_OP_PURGE,
                     ["SUPER", "REPLICATION SLAVE"],
                     BINLOG_OP_PURGE_DESC, self.verbosity, self._report)

    # Fetch the binlog base name, the active file and the last index in use.
    binlog_info = get_binlog_info(self.server, reporter=self._report,
                                  server_name="server",
                                  verbosity=self.verbosity)
    binlog_file_name, active_binlog_file, index_last_in_use = binlog_info

    # A standalone server must not be feeding any slave.
    process_rows = self.server.exec_query("SHOW PROCESSLIST")
    has_binlog_dump = any(row[4] == "Binlog Dump" for row in process_rows)
    slave_hosts = self.server.exec_query("SHOW SLAVE HOSTS")

    if has_binlog_dump or slave_hosts:
        # Point at the verbose option only when there is detail being hidden.
        hint = ""
        if slave_hosts and not self.verbosity:
            hint = " For more info use verbose option."

        if self.verbosity >= 1:
            for slave in slave_hosts:
                self._report("# WARNING: Slave with id:{0} at {1}:{2} "
                             "is connected to this server."
                             "".format(slave[0], slave[1], slave[2]))

        raise UtilError("The given server is acting as a master and has "
                        "slaves connected to it. To proceed please use the"
                        " --master option.{0}".format(hint))

    target_index = self.get_target_binlog_index(binlog_file_name)
    self._purge(index_last_in_use, active_binlog_file, binlog_file_name,
                target_index)
def _bulk_insert(self, rows, new_db, destination=None):
    """Write rows to the destination server with grouped INSERT statements.

    The statements are produced by self.make_bulk_insert() and executed
    over a dedicated connection to the destination server, so the method
    can run in its own thread for parallel inserts.  Nothing is printed to
    stdout.

    rows[in]           a list of rows to process
    new_db[in]         new database name
    destination[in]    the destination server; only used to resolve
                       self.dest_vals when it has not been set yet

    Raises UtilError if one of the INSERT statements fails.
    """
    # Resolve the destination connection values once and cache them.
    if self.dest_vals is None:
        self.dest_vals = self.get_dest_values(destination)

    # Each caller (thread) gets a private connection.
    dest = Server({
        'conn_info': self.dest_vals,
        'role': "thread",
    })
    dest.connect()

    # Take a WRITE lock on the target table.
    # NOTE(review): nothing in the visible code releases this lock, restores
    # the foreign key checks or closes the connection - confirm whether the
    # function continues beyond what is shown here.
    tables_to_lock = [("%s.%s" % (new_db, self.q_tbl_name), 'WRITE')]
    my_lock = Lock(dest, tables_to_lock, {'locking': 'lock-all', })

    # Foreign key checks are switched off before the data is loaded.
    dest.disable_foreign_key_checks(True)

    # Column metadata is fetched lazily, on first use.
    if self.column_format is None:
        self.get_column_metadata()

    # make_bulk_insert() returns the INSERT statements first, blob data second.
    data_lists = self.make_bulk_insert(rows, new_db)
    insert_statements, blob_data = data_lists[0], data_lists[1]

    # Run the INSERT statements, reporting failures with a clearer message.
    for statement in insert_statements:
        try:
            dest.exec_query(statement, self.query_options)
        except UtilError as err:
            raise UtilError("Problem inserting data. "
                            "Error = %s" % err.errmsg)
def check_audit_log(self):
    """Verify that the audit log plug-in is available on the server.

    Connects to the server described by the "server_vals" option, asks it
    whether the "audit" plug-in is supported and disconnects again.  The
    connection is released even if the plug-in check raises, so a failing
    check does not leak the connection.

    Returns None when the plug-in is available, otherwise the error message
    to show to the user.
    """
    server = Server({'conn_info': self.options.get("server_vals", None)})
    server.connect()
    try:
        # Check to see if the plug-in is installed
        if not server.supports_plugin("audit"):
            return ("The audit log plug-in is not installed on this "
                    "server or is not enabled.")
        return None
    finally:
        # Always give the connection back, mirroring do_command().
        server.disconnect()
def __init__(self, server_cnx_val, options):
    """Connect to the server and cache the options controlling behavior.

    server_cnx_val[in]  Dictionary with the connection values for the
                        server.
    options[in]         Dictionary of options; the keys read here are:
        verbosity   stored as self.verbosity (default 0)
        quiet       stored as self.quiet (default False)
        logging     stored as self.logging (default False)
        dry_run     stored as self.dry_run (default 0)
        min_size    stored as self.binlog_min_size (default False)
    """
    # Open the connection first; it is kept in self.server.
    connection_settings = {'conn_info': server_cnx_val}
    self.server = Server(connection_settings)
    self.server.connect()

    # Keep the raw options and expose the commonly used ones as attributes.
    self.options = options
    read_option = self.options.get
    self.verbosity = read_option("verbosity", 0)
    self.quiet = read_option("quiet", False)
    self.logging = read_option("logging", False)
    self.dry_run = read_option("dry_run", 0)
    self.binlog_min_size = read_option("min_size", False)
def load_test_data(server, db_num=1):
    """Load/insert random data into the test databases.

    A considerable amount of data should be considered in order to take some
    time to load, allowing mysqlrplsync to be executed at the same time the
    data is still being inserted.

    server[in]     Connection information of the target server; it is
                   passed as 'conn_info' to a new Server instance.
    db_num[in]     Number of databases to load the data (by default: 1).
                   It is assumed that a matching number of test databases
                   have been previously created.

    Note: method prepared to be invoked by a different thread.
    """
    # Create a new server instance with a new connection (for multithreading).
    srv = Server({'conn_info': server})
    srv.connect()
    for db_index in xrange(db_num):
        # With a single database the name carries no numeric suffix.
        db_name = '`test_rplsync_db{0}`'.format(
            '' if db_num == 1 else db_index
        )
        # Insert random data on all tables.
        random_values = string.letters + string.digits
        for _ in xrange(TEST_DB_NUM_ROWS):
            # Columns/values accumulate across the tables of one row pass.
            columns = []
            values = []
            for table_index in xrange(TEST_DB_NUM_TABLES):
                columns.append('rnd_txt{0}'.format(table_index))
                # 20 random characters taken from letters and digits.
                rnd_text = "".join(
                    [random.choice(random_values) for _ in xrange(20)]
                )
                values.append("'{0}'".format(rnd_text))
                # NOTE(review): the indentation of this file was lost; the
                # INSERT and commit are assumed to run once per table because
                # the statement is formatted with table_index - confirm
                # against the upstream source.
                insert = ("INSERT INTO {0}.`t{1}` ({2}) VALUES ({3})"
                          "").format(db_name, table_index, ', '.join(columns),
                                     ', '.join(values))
                srv.exec_query(insert)
                srv.commit()
def _spawn_server(options):
    """Spawn a server to use for reading .frm files

    This method spawns a new server instance on the port specified by the
    user in the options dictionary (3310 when no port is given) and then
    connects to it as root on 127.0.0.1, using that same port.

    Note: this is Python 2 code.  The print calls are written so that they
    behave exactly like the print statement under Python 2.

    options[in]        Options from user; the keys read here are
                       verbosity, quiet, port, user, start_timeout and
                       basedir.

    Returns tuple - (Server instance, new datadir) or raises exception on
    error

    Raises UtilError if the server cannot be cloned or if the connection to
    the spawned server fails.
    """
    verbosity = int(options.get("verbosity", 0))
    quiet = options.get("quiet", False)
    new_port = options.get("port", 3310)
    user = options.get("user", None)
    start_timeout = int(options.get("start_timeout", 10))

    # 1) create a directory to use for new datadir

    # If the user is not the same as the user running the script...
    if user_change_as_root(options):
        # Since Python libraries correctly restrict temporary folders to
        # the user who runs the script and /tmp is protected on some
        # platforms, we must create the folder in the current folder
        temp_datadir = os.path.join(os.getcwd(), str(uuid.uuid4()))
        os.mkdir(temp_datadir)
    else:
        temp_datadir = tempfile.mkdtemp()

    if verbosity > 1 and not quiet:
        # Same output as the space separated two-item print statement.
        print(" ".join(("# Creating a temporary datadir =", temp_datadir)))

    # 2) spawn a server pointed to temp
    if not quiet:
        if user:
            print("# Spawning server with --user={0}.".format(user))
        # Python 2 print statement: the trailing comma suppresses the
        # newline so that "done." can finish this line later on.
        print("# Starting the spawned server on port %s ..." % new_port),
        sys.stdout.flush()

    bootstrap_options = {
        'new_data': temp_datadir,
        'new_port': new_port,
        'new_id': 101,
        'root_pass': "root",
        'mysqld_options': None,
        'verbosity': verbosity if verbosity > 1 else 0,
        'basedir': options.get("basedir"),
        'delete': True,
        'quiet': verbosity <= 1,
        'user': user,
        'start_timeout': start_timeout,
    }
    if verbosity > 1 and not quiet:
        # Bare Python 2 print statement: only ends the pending line.
        print

    try:
        serverclone.clone_server(None, bootstrap_options)
    except UtilError as error:
        # Only the "server started but unreachable" failure is handled here.
        if not error.errmsg.startswith("Unable to communicate"):
            raise
        err = ". Clone server error: {0}".format(error.errmsg)
        # The message is expected to end with "... = <pid>."
        proc_id = int(error.errmsg.split("=")[1].strip('.'))
        print("ERROR Attempting to stop failed spawned server. "
              " Process id = {0}.".format(proc_id))
        # Best effort: a failure to kill the process must not hide the
        # spawn error reported below.
        if os.name == "posix":
            try:
                os.kill(proc_id, subprocess.signal.SIGTERM)
            except OSError:
                pass
        else:
            try:
                subprocess.Popen("taskkill /F /T /PID %i" %
                                 proc_id, shell=True)
            except OSError:
                pass
        raise UtilError(_SPAWN_SERVER_ERROR.format(err))

    if verbosity > 1 and not quiet:
        print("# Connecting to spawned server")

    # Connect on the port the server was actually started on; reading
    # options.get("port") again would yield None when the default was used.
    conn = {
        "user": "root",
        "passwd": "root",
        "host": "127.0.0.1",
        "port": new_port,
    }
    server_options = {
        'conn_info': conn,
        'role': "frm_reader_bootstrap",
    }
    server = Server(server_options)
    try:
        server.connect()
    except UtilError:
        raise UtilError(_SPAWN_SERVER_ERROR.format(""))

    if not quiet:
        print("done.")

    return (server, temp_datadir)
def _start_server(server_val, basedir, datadir, options=None):
"""Start an instance of a server in read only mode
This method is used to start the server in read only mode. It will launch
the server with --skip-grant-tables and --read_only options set.
Caller must stop the server with _stop_server().
server_val[in] dictionary of server connection values
basedir[in] the base directory for the server
datadir[in] the data directory for the server
options[in] dictionary of options (verbosity)
"""
if options is None:
options = {}
verbosity = options.get("verbosity", 0)
start_timeout = options.get("start_timeout", 10)
mysqld_path = get_tool_path(basedir, "mysqld", quote=True)
print "# Server is offline."
# Check server version
print "# Checking server version ...",
version = get_mysqld_version(mysqld_path)
print "done."
if version is not None and int(version[0]) >= 5:
post_5_5 = int(version[1]) >= 5
post_5_6 = int(version[1]) >= 6
post_5_7_4 = int(version[1]) >= 7 and int(version[2]) > 4
else:
print("# Warning: cannot get server version.")
post_5_5 = False
post_5_6 = False
post_5_7_4 = False
# Get the user executing the utility to use in the mysqld options.
# Note: the option --user=user_name is mandatory to start mysqld as root.
user_name = getpass.getuser()
# Start the instance
if verbosity > 0:
print "# Starting read-only instance of the server ..."
print "# --- BEGIN (server output) ---"
else:
print "# Starting read-only instance of the server ...",
args = shlex.split(mysqld_path)
args.extend([
"--no-defaults",
"--skip-grant-tables",
"--read_only",
"--port=%(port)s" % server_val,
"--basedir=" + basedir,
"--datadir=" + datadir,
"--user={0}".format(user_name),
])
# It the server is 5.6 or later, we must use additional parameters
if post_5_5:
server_args = [
"--skip-slave-start",
"--default-storage-engine=MYISAM",
"--server-id=0",
]
if post_5_6:
server_args.append("--default-tmp-storage-engine=MYISAM")
if not post_5_7_4:
server_args.append("--skip-innodb")
args.extend(server_args)
socket = server_val.get('unix_socket', None)
if socket is not None:
args.append("--socket=%(unix_socket)s" % server_val)
if verbosity > 0:
subprocess.Popen(args, shell=False)
else:
out = open(os.devnull, 'w')
subprocess.Popen(args, shell=False, stdout=out, stderr=out)
server_options = {
'conn_info': server_val,
'role': "read_only",
}
server = Server(server_options)
# Try to connect to the server, waiting for the server to become ready
# (retry start_timeout times and wait 1 sec between each attempt).
# Note: It can take up to 10 seconds for Windows machines.
i = 0
while i < start_timeout:
# Reset error and wait 1 second.
error = None
time.sleep(1)
try:
server.connect()
break # Server ready (connect succeed)! Exit the for loop.
except UtilError as err:
# Store exception to raise later (if needed).
error = err
i += 1
# ......... part of the code omitted here .........
sys.stdout.write(" Connecting to %s as user %s on port %s: " %
(conn_val["host"], conn_val["user"],
conn_val["port"]))
sys.stdout.flush()
if conn_val["port"] is not None:
conn_val["port"] = int(conn_val["port"])
else:
conn_val["port"] = 0
server_options = {
'conn_info' : conn_val,
'role' : "server%d" % i,
}
conn = Server(server_options)
try:
conn.connect()
server_list.add_new_server(conn)
print("CONNECTED")
res = conn.show_server_variable("basedir")
#print res
basedir = res[0][1]
# Here we capture any exception and print the error message.
# Since all util errors (exceptions) derive from Exception, this is
# safe.
except Exception:
_, err, _ = sys.exc_info()
print("%sFAILED%s" % (BOLD_ON, BOLD_OFF))
if conn.connect_error is not None:
print(conn.connect_error)
def move_binlogs_from_server(server_cnx_val, destination, options,
bin_basename=None, bin_index=None,
relay_basename=None):
"""Relocate binary logs from the given server to a new location.
This function relocate the binary logs from a MySQL server to the specified
destination directory, attending to the specified options.
server_cnx_val[in] Dictionary with the connection values for the server.
destination[in] Path of the destination directory for the binary log
files.
options[in] Dictionary of options (log_type, skip_flush_binlogs,
modified_before, sequence, verbosity).
bin_basename[in] Base name for the binlog files, i.e., same as the
value for the server option --log-bin. It replaces
the server variable 'log_bin_basename' for versions
< 5.6.2, otherwise it is ignored.
bin_index[in] Path of the binlog index file. It replaces the server
variable 'log_bin_index' for versions < 5.6.4,
otherwise it is ignored.
relay_basename[in] Base name for the relay log files, i.e., filename
without the extension (sequence number). Same as the
value for the server option --relay-log. It replaces
the server variable 'relay_log_basename' for versions
< 5.6.2, otherwise it is ignored.
"""
log_type = options.get('log_type', LOG_TYPE_BIN)
skip_flush = options['skip_flush_binlogs']
verbosity = options['verbosity']
# Connect to server
server_options = {
'conn_info': server_cnx_val,
}
srv = Server(server_options)
srv.connect()
# Check if the server is running locally (not remote server).
if not srv.is_alias('localhost'):
raise UtilError("You are using a remote server. This utility must be "
"run on the local server. It does not support remote "
"access to the binary log files.")
# Check privileges.
_check_privileges_to_move_binlogs(srv, options)
# Process binlog files.
if log_type in (LOG_TYPE_BIN, LOG_TYPE_ALL):
# Get log_bin_basename (available since MySQL 5.6.2).
if srv.check_version_compat(5, 6, 2):
if bin_basename:
print(_WARN_MSG_VAL_NOT_REQ_FOR_SERVER.format(
value='bin basename', min_version='5.6.2',
var_name='log_bin_basename'))
binlog_basename = srv.select_variable('log_bin_basename')
if verbosity > 0:
print("#")
print("# log_bin_basename: {0}".format(binlog_basename))
binlog_source, binlog_file = os.path.split(binlog_basename)
# Get log_bin_index (available since MySQL 5.6.4).
if srv.check_version_compat(5, 6, 4):
if bin_index:
print(_WARN_MSG_VAL_NOT_REQ_FOR_SERVER.format(
value='bin index', min_version='5.6.4',
var_name='log_bin_index'))
binlog_index = srv.select_variable('log_bin_index')
else:
binlog_index = None
action = _ACTION_SEARCH_INDEX.format(file_type='bin-log')
print(_WARN_MSG_VAR_NOT_AVAILABLE.format(
var_name='log_bin_basename', host=srv.host, port=srv.port,
min_version='5.6.4', action=action))
if verbosity > 0:
print("# log_bin_index: {0}".format(binlog_index))
else:
if bin_basename:
binlog_source, binlog_file = os.path.split(bin_basename)
else:
action = _ACTION_DATADIR_USED.format(file_type='bin-log')
print(_WARN_MSG_VAR_NOT_AVAILABLE.format(
var_name='log_bin_basename', host=srv.host, port=srv.port,
min_version='5.6.2', action=action))
# Get datadir value.
binlog_source = srv.select_variable('datadir')
binlog_file = None
if verbosity > 0:
print("#")
print("# datadir: {0}".format(binlog_source))
binlog_index = bin_index
# Move binlog files.
num_files = _move_binlogs(
binlog_source, destination, LOG_TYPE_BIN, options,
basename=binlog_file, index_file=binlog_index, skip_latest=True)
print("#")
# Flush binary logs to reload server's cache after move.
if not skip_flush and num_files > 0:
# Note: log_type for FLUSH available since MySQL 5.5.3.
if srv.check_version_compat(5, 5, 3):
# ......... part of the code omitted here .........
# (web page residue: "Please post a comment")