Merge lp:~akuzminsky/twindb-agent/restore into lp:twindb-agent

Proposed by Aleksandr Kuzminsky
Status: Merged
Merged at revision: 18
Proposed branch: lp:~akuzminsky/twindb-agent/restore
Merge into: lp:twindb-agent
Diff against target: 3480 lines (+1215/-1459)
1 file modified
twindb.py (+1215/-1459)
To merge this branch: bzr merge lp:~akuzminsky/twindb-agent/restore
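For reviewers who want to try the proposal locally, a typical Bazaar workflow might look like the following (a sketch only; it assumes a working bzr client with Launchpad access, and the commit message is illustrative):

    bzr branch lp:twindb-agent                       # get the target branch
    cd twindb-agent
    bzr merge lp:~akuzminsky/twindb-agent/restore    # merge the proposed branch
    bzr status                                       # review the pending merge
    bzr commit -m "Merge restore branch from akuzminsky"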
Reviewer: Ovais Tariq (status: Pending)
Review via email: mp+255386@code.launchpad.net

Preview Diff

=== modified file 'twindb.py'
--- twindb.py 2015-03-16 02:16:03 +0000
+++ twindb.py 2015-04-07 16:22:26 +0000
@@ -2,31 +2,34 @@
 from __future__ import print_function
 from __future__ import unicode_literals
 from __future__ import absolute_import
+import ConfigParser
 from base64 import b64encode, b64decode
 from datetime import datetime
-import sys
+import errno
-import time
+import exceptions
 import getopt
+import httplib
+import json
+import logging.handlers
+import os
+import pwd
 import signal
-import traceback
+import shutil
-import exceptions
-import errno
 import socket
-import logging.handlers
-import os.path
 import subprocess
-import httplib
+import sys
-import json
+import tempfile
+import time
+import traceback
 import urllib
-
+import uuid

 try:
     import mysql.connector
 except ImportError:
+    # On CentOS 5 mysql.connector is in python 2.6 directory
     sys.path.insert(0, '/usr/lib/python2.6/site-packages')
     import mysql.connector
-import tempfile
-import uuid

 # global variables
 host = "dispatcher.twindb.com"
@@ -36,8 +39,8 @@
 debug_http = True
 debug_gpg = False
 init_config = "/etc/twindb.cfg"
-ssh_private_key = "/root/.ssh/twindb.key"
+ssh_private_key_file = "/root/.ssh/twindb.key"
-ssh_public_key = "/root/.ssh/twindb.key.pub"
+ssh_public_key_file = "/root/.ssh/twindb.key.pub"
 ssh_port = 4194
 gpg_homedir = "/root/.gnupg/"
 pid_file = "/var/run/twindb.pid"
@@ -45,9 +48,10 @@
 time_zone = "UTC"
 api_email = "api@twindb.com"
 server_id = ""
-job_id = 0
+job_id = None
-mysql_user = "root"
+mysql_user = None
-mysql_password = ""
+mysql_password = None
+agent_version = "@@TWINDB_AGENT_VERSION@@"
 api_pub_key = """
 -----BEGIN PGP PUBLIC KEY BLOCK-----
 Version: GnuPG v1
@@ -102,24 +106,24 @@
 =62IM
 -----END PGP PUBLIC KEY BLOCK-----
 """
-agent_version = "@@TWINDB_AGENT_VERSION@@"
+

-
-# Logging handler that logs to remote TwiDB dispatcher
 class RlogHandler(logging.Handler):
+    """
+    Logging handler that logs to remote TwiDB dispatcher
+    """
+    log_flag = True
+
     def __init__(self):
         logging.Handler.__init__(self)
         self.log_flag = True

     def emit(self, record):
-        global server_id
+        request = {
-        global job_id
+            "type": "log",
-
+            "params": {}
-
+        }
-        request = {}
+        if job_id:
-        request["type"] = "log"
-        request["params"] = {}
-        if job_id != 0:
             request["params"]["job_id"] = job_id
         request["params"]["msg"] = record.getMessage()
         if self.log_flag and not debug:
@@ -128,83 +132,106 @@
             self.log_flag = True


+class JobError(Exception):
+    def __init__(self, value):
+        self.value = value
+
+    def __str__(self):
+        return self.value
+
+
+class JobTooSoonError(Exception):
+    def __init__(self, value):
+        self.value = value
+
+    def __str__(self):
+        return self.value
+
+
 def usage():
-    print("Usage:\n\
+    print("""Usage:
-twindb --start | --stop | --register <registration code> [-g] [-i interval]\n\
+twindb --start | --stop | --register <registration code> [-g] [-i interval]
- [-u user] [-p password]\n\
+ [-u user] [-p password]
-\n\
+
- --start Start TwinDB agent.\n\
+ --start Start TwinDB agent.
- --stop Stop TwinDB agent.\n\
+ --stop Stop TwinDB agent.
-\n\
+
- --register <code> Register this server in TwinDB.\n\
+ --register <code> Register this server in TwinDB.
- <code> is string like 7b90ae21ac642f2f8fc0a285c4789147\n\
+ <code> is string like 7b90ae21ac642f2f8fc0a285c4789147
- Get your code on https://console.twindb.com/?get_code\n\
+ Get your code on https://console.twindb.com/?get_code
-\n\
+
- --unregister [--delete-backups] Unregister this server from TwinDB.\n\
+ --unregister [--delete-backups] Unregister this server from TwinDB.
- --backup Start backup from this server immediately.\n\
+ --backup Start backup from this server immediately.
- --is-registered Checks whether the server is registered and prints YES or NO.\n\
+ --is-registered Checks whether the server is registered and prints YES or NO.
- -g Print debug message.\n\
+ -g Print debug message.
- -i interval Check for a job every interval seconds.\n\
+ -i interval Check for a job every interval seconds.
- Default 300.\n\
+ Default {check_period}.
- During registration TwinDB agent needs to get some information from MySQL,\n\
+ During registration TwinDB agent needs to get some information from MySQL,
- (master host etc)\n\
+ (master host etc)
- This is credentials to connect to local MySQL instance\n\
+ This is credentials to connect to local MySQL instance
- -u | --user username MySQL user name\n\
+ -u | --user username MySQL user name
- -p | --password password MySQL password")
+ -p | --password password MySQL password""".format(check_period=check_period))
     return


 def get_mysql_connection(user=None, passwd=None):
-    global mysql_user
+    """
-    global mysql_password
+    Returns connection to the local MySQL instance.
-
+    If user is passed as an argument it'll be used to connect,
+    otherwise the second choice will be to use mysql_user.
+    If neither user names are set the function will try to use either of MySQL option files
+    (/etc/my.cnf, /etc/mysql/my.cnf, or /root/.my.cnf). If the option files don't exist
+    it'll try to connect as root w/o password.
+    """
     try:
-        if user is None:
-            user = mysql_user
-        if passwd is None:
-            passwd = mysql_password
-        mysql_option_files = []
         unix_socket = get_unix_socket()
-        conn = mysql.connector.connect()
+        if not user:
-        if user is not None:
+            if mysql_user:
-            for f in ['/etc/mysql/my.cnf', '/etc/my.cnf', '/root/.my.cnf']:
+                logger.debug('Using MySQL user specified in the command line')
-                if os.path.exists(f):
+                user = mysql_user
-                    mysql_option_files.append(f)
+                passwd = mysql_password
-        if mysql_option_files:
+            else:
-            conn = mysql.connector.connect(option_files=mysql_option_files, unix_socket=unix_socket)
+                for options_file in ["/etc/my.cnf", "/etc/mysql/my.cnf", "/usr/etc/my.cnf",
-            logger.debug(
+                                     "/root/.my.cnf", "/root/.mylogin.cnf"]:
-                "Connected to MySQL as '%s'@'localhost' with options files %r" % (conn.user, mysql_option_files))
+                    if os.path.exists(options_file):
-        if not conn.is_connected():
+                        config = ConfigParser.ConfigParser()
-            conn = mysql.connector.connect(user=user, passwd=passwd, unix_socket=unix_socket)
+                        config.read(options_file)
-            logger.debug("Connected to MySQL as %s@localhost " % conn.user)
+                        for section in ["client", "twindb"]:
+                            if config.has_section(section):
+                                if config.has_option(section, "user"):
+                                    user = config.get(section, "user")
+                                if config.has_option(section, "password"):
+                                    passwd = config.get(section, "user")
+        # If user isn't set by the function argument, global mysql_user
+        # or MySQL options file connect as unix user w/ empty password
+        if not user:
+            user = pwd.getpwuid(os.getuid()).pw_name
+            passwd = ""
+            logger.debug("Connecting to MySQL as unix user %s" % user)
+        conn = mysql.connector.connect(user=user, passwd=passwd, unix_socket=unix_socket)
+        logger.debug("Connected to MySQL as %s@localhost " % conn.user)
     except mysql.connector.Error as err:
         logger.error("Can not connect to local MySQL server")
         logger.error("MySQL said: %s" % err.msg)
         return None
-    except:
-        logger.error("Can not connect to local MySQL server")
-        logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
-        return None
     return conn


-# Checks if TwinDB API public key is installed
-# Returns
-# True - if the key is installed
-# False - if not
 def is_gpg_key_installed(email, key_type="public"):
-    global logger
+    """
-    global gpg_homedir
+    Checks if TwinDB API public key is installed
-    global debug_gpg
+    Returns
-
+    True - if the key is installed
+    False - if not
+    """
     result = False
-    if len(email) == 0:
+    if not email:
         logger.error("Can not check public key of an empty email")
         return False
     if debug_gpg:
         logger.debug("Checking if public key of %s is installed" % email)
+    gpg_cmd = ["gpg", "--homedir", gpg_homedir]
     try:
-        gpg_cmd = ["gpg", "--homedir", gpg_homedir]
         if key_type == "public":
             gpg_cmd.append("-k")
         else:
@@ -212,63 +239,51 @@
         gpg_cmd.append(email)
         if debug_gpg:
             logger.debug(gpg_cmd)
-        devnull = open('/dev/null', 'w')
+        devnull = open("/dev/null", "w")
         ret = subprocess.call(gpg_cmd, stdout=devnull, stderr=devnull)
-        file.close(devnull)
+        devnull.close()
         if ret != 0:
             if debug_gpg:
-                logger.debug("GPG returned %d" % ret);
+                logger.debug("GPG returned %d" % ret)
             logger.debug(key_type + " key " + email + " is NOT installed")
             result = False
         else:
             if debug_gpg:
                 logger.debug(key_type + " key is already installed")
             result = True
-    except:
+    except OSError as err:
-        logger.error("Couldn't get " + key_type + " key of %s" % email)
+        logger.error("Couldn't run command %r. %s" % (gpg_cmd, err))
-        exit_on_error(traceback.format_exc())
+        exit_on_error("Couldn't get %s key of %s." % (key_type, email))
     return result


-# Installs TwinDB public key
-# Returns
-# True - if the key is successfully installed
-# Exits if the key wasn't installed
-
 def install_api_pub_key():
-    global logger
+    """
-    global api_pub_key
+    Installs TwinDB public key
-    global gpg_homedir
+    Returns
-    global debug_gpg
+    True - if the key is successfully installed
-
+    Exits if the key wasn't installed
+    """
     logger.info("Installing twindb public key")
+    gpg_cmd = ["gpg", "--homedir", gpg_homedir, "--import"]
     try:
-        p1 = subprocess.Popen(["echo", api_pub_key], stdout=subprocess.PIPE)
+        p = subprocess.Popen(gpg_cmd, stdin=subprocess.PIPE)
-        p2 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--import"],
+        p.communicate(api_pub_key)
-                              stdin=p1.stdout)
+    except OSError as err:
-        p1.stdout.close()
+        logger.error("Couldn't run command %r. %s" % (gpg_cmd, err))
-    except:
+        exit_on_error("Couldn't install TwinDB public key")
-        logger.error("Couldn't install TwinDB public key")
-        exit_on_error(traceback.format_exc())
     logger.info("Twindb public key successfully installed")
     return True


-# Checks if GPG environment is good to start TwinDB agent
-# Installs TwinDB public key if necessary
-# Returns
-# True - if the GPG environment good to proceed
-# Exits if GPG wasn't configured correctly
-
 def check_gpg():
-    global logger
+    """
-    global api_email
+    Checks if GPG environment is good to start TwinDB agent
-    global api_pub_key
+    Installs TwinDB public key if necessary
-    global gpg_homedir
+    Returns
-    global init_config
+    True - if the GPG environment good to proceed
-    global server_id
+    Exits if GPG wasn't configured correctly
-    global debug_gpg
+    """
-
     if debug_gpg:
         logger.debug("Checking if GPG config is initialized")
     if os.path.exists(gpg_homedir):
@@ -282,13 +297,13 @@
         try:
             os.mkdir(gpg_homedir, 0700)
             install_api_pub_key()
-        except:
+        except OSError as err:
-            logger.error("Couldn't create directory " + gpg_homedir)
+            logger.error("Failed to create directory %s. %s" % (gpg_homedir, err))
-            exit_on_error(traceback.format_exc())
+            exit_on_error("Couldn't create directory " + gpg_homedir)
-    if len(server_id) == 0:
+    if not server_id:
         exit_on_error("Server id is neither read from config nor generated")
     email = "%s@twindb.com" % server_id
-    if not ( is_gpg_key_installed(email) and is_gpg_key_installed(email, "private")):
+    if not (is_gpg_key_installed(email) and is_gpg_key_installed(email, "private")):
         gen_entropy()
         gen_gpg_keypair("%s@twindb.com" % server_id)
     if debug_gpg:
@@ -296,13 +311,13 @@
     return True


-# Checks the environment if it's OK to start TwinDB agent
-# Returns
-# True - if the environment is OK
-# Exits if the environment is not OK
-
 def check_env():
-    global logger
+    """
+    Checks the environment if it's OK to start TwinDB agent
+    Returns
+    True - if the environment is OK
+    Exits if the environment is not OK
+    """
     logger.debug("Checking environment")
     if os.getuid() != 0:
         exit_on_error("TwinDB agent must be run by root")
@@ -311,41 +326,24 @@
     return True


-# Checks how much entropy is available in the system
-# If not enough, does some disk activity to generate more
 def gen_entropy():
+    """
+    Checks how much entropy is available in the system
+    If not enough, does some disk activity to generate more
+    """
     # Do nothing until I find good way to generate entropy
     return
-    try:
+
-        f = open("/proc/sys/kernel/random/entropy_avail", "r")
-        entropy_avail = int(f.read())
-        f.close()
-        i = 0
-        while entropy_avail < 2048:
-            logger.info("Low entropy level %d. Will run 'find / -name file' to generate more")
-            cmd = ["find", "/", "-name", "file"]
-            p = subprocess.Popen(cmd)
-            p.wait()
-            f = open("/proc/sys/kernel/random/entropy_avail", "r")
-            entropy_avail = int(f.read())
-            f.close()
-            # Do not run find more than 10 times
-            if i > 10:
-                break
-            i = i + 1
-    except:
-        logger.error("Failed to generate entropy")
-        return
-
-
-# Formats bytes count to human readable form
-# Inputs:
-# num - bytes count
-# decimals - number of digits to save after point (Default 2)
-# Returns
-# human-readable string like "20.33 MB"

 def h_size(num, decimals=2):
+    """
+    Formats bytes count to human readable form
+    Inputs:
+    num - bytes count
+    decimals - number of digits to save after point (Default 2)
+    Returns
+    human-readable string like "20.33 MB"
+    """
     fmt = "%3." + str(decimals) + "f %s"
     for x in ['bytes', 'kB', 'MB', 'GB', 'TB', 'PB']:
         if num < 1024.0:
@@ -353,30 +351,19 @@
         num /= 1024.0


-# Generates random string of bytes good for crypthography
-# Inputs
-# n - Number of bytes
-# Returns
-# binary string of n bytes size
-
-def gen_key(n):
-    return os.urandom(n)
-
-
-# Generates GPG private and public keys for a given recipient
-# Inputs
-# email - recipient
-# Returns
-# True on success or exits
-
 def gen_gpg_keypair(email):
-    global server_id
+    """
-    global gpg_homedir
+    Generates GPG private and public keys for a given recipient
-    global debug_gpg
+    Inputs
-
+    email - recipient
-    if len(email) == 0:
+    Returns
+    True on success or exits
+    """
+    if not email:
         exit_on_error("Can not generate GPG keypair for an empty email")
+    gpg_cmd = ["gpg", "--homedir", gpg_homedir, "--batch", "--gen-key"]
     try:
+        logger.info("The agent needs to generate cryptographically strong keys.")
         logger.info("Generating GPG keys pair for %s" % email)
         logger.info("It may take really, really long time. Please be patient.")
         gen_entropy()
@@ -393,36 +380,25 @@
 %%commit
 %%echo done
 """ % (server_id, email)
-        p1 = subprocess.Popen(["echo", gpg_script], stdout=subprocess.PIPE)
+
-        p2 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--batch",
+        p = subprocess.Popen(gpg_cmd, stdin=subprocess.PIPE)
-                              "--gen-key"], stdin=p1.stdout)
+        p.communicate(gpg_script)
-        p1.stdout.close()
+    except OSError as err:
-        p2.wait()
+        logger.error("Failed to run command %r. %s" % (gpg_cmd, err))
-        del p1, p2
+        exit_on_error("Failed to generate GPG keys pair")
-    except:
-        logger.error("Failed to generate GPG keys pair")
-        exit_on_error(traceback.format_exc())
     return True


-# Encrypts message with TwinDB public key
-# If server_id is non-zero (which means the server is registered)
-# signs the message with the server's private key
-# Inputs
-# msg - string to encrypt
-# Returns
-# 64-base encoded and encrypted message. To read - decrypt and 64-base decode
-# EDIT: encrypted message
-# None - if error happens
-
 def encrypt(msg):
-    global logger
+    """
-    global api_email
+    Encrypts message with TwinDB public key
-    global gpg_homedir
+    If server_id is non-zero (which means the server is registered)
-    global server_id
+    signs the message with the server's private key
-    global debug_gpg
+    :param msg: string to encrypt
-
+    :return: 64-base encoded and encrypted message or None if error happens.
-    if len(server_id) == 0:
+             To read the encrypted message - decrypt and 64-base decode
+    """
+    if not server_id:
         exit_on_error("Trying to encrypt message while server_id is empty")
     server_email = "%s@twindb.com" % server_id
     if not is_gpg_key_installed(api_email):
@@ -433,12 +409,10 @@
         logger.debug("Public key of %s is not installed." % server_email)
         logger.debug("Will not encrypt message")
         return None
-    enc_cmd = ["gpg", "--homedir", gpg_homedir, "-r", api_email, "--batch",
+    enc_cmd = ["gpg", "--homedir", gpg_homedir, "-r", api_email, "--batch", "--trust-model", "always", "--armor",
-               "--trust-model", "always", "--armor"]
+               "--sign", "--local-user", server_email, "--encrypt"]
-    enc_cmd.append("--sign")
+    cout = "No output"
-    enc_cmd.append("--local-user")
+    cerr = "No output"
-    enc_cmd.append(server_email)
-    enc_cmd.append("--encrypt")
     try:
         if debug_gpg:
             logger.debug("Encrypting message:")
@@ -450,11 +424,14 @@
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
         cout, cerr = p.communicate(msg)
+        if p.returncode != 0:
+            raise OSError(p.returncode)
         ct = cout
         ct_64 = b64encode(ct)
         if debug_gpg:
             logger.debug("Encrypted message: " + ct_64)
-    except:
+    except OSError as err:
+        logger.error("Failed to run command %r. %s" % (enc_cmd, err))
         logger.error("Failed to encrypt message: " + msg)
         logger.error("STDOUT: " + cout)
         logger.error("STDERR: " + cerr)
@@ -462,52 +439,34 @@
     return ct_64


-# Decrypts message with local private key
-# Inputs
-# msg - 64-base encoded and encrypted message.
-# Before encryption the message was 64-base encoded
-# Returns
-# String - Plain text message
-# None - if error happens
-
 def decrypt(msg_64):
-    global logger
+    """
-    global api_email
+    Decrypts message with local private key
-    global gpg_homedir
+    :param msg_64: 64-base encoded and encrypted message. Before encryption the message was 64-base encoded
-    global server_id
+    :return: Plain text message or None if error happens
-    global debug_gpg
+    """
-
     if not msg_64:
         logger.error("Will not decrypt empty message")
         return None
-
+    cout = "No output"
-    cout = None
+    cerr = "No output"
-    cerr = None
+    gpg_cmd = ["gpg", "--homedir", gpg_homedir, "-d", "-q"]
-
     try:
-        dec_cmd = ["gpg", "--homedir", gpg_homedir, "-d", "-q"]
-        debug_gpg = True
         if debug_gpg:
             logger.debug("Decrypting message:")
             logger.debug(msg_64)
             logger.debug("Decryptor command:")
-            logger.debug(dec_cmd)
+            logger.debug(gpg_cmd)
-        p = subprocess.Popen(dec_cmd,
+        p = subprocess.Popen(gpg_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-                             stdin=subprocess.PIPE,
-                             stdout=subprocess.PIPE,
-                             stderr=subprocess.PIPE)
         msg = b64decode(msg_64)
         cout, cerr = p.communicate(msg)
-
         if p.returncode != 0:
             raise OSError(p.returncode)
-    except:
+    except OSError as err:
+        logger.error("Failed to run command %r. %s" % (gpg_cmd, err))
         logger.error("Failed to decrypt message: " + msg_64)
-        logger.error("Unexpected error: %s, %s" % sys.exc_info()[:2]);
+        logger.error("STDOUT: " + cout)
-        if cout:
+        logger.error("STDERR: " + cerr)
-            logger.error("STDOUT: " + cout)
-        if cerr:
-            logger.error("STDERR: " + cerr)
         return None
     if debug_gpg:
         logger.debug("Decrypted message:")
@@ -515,26 +474,20 @@
     return cout


-# Sends HTTP POST request to TwinDB dispatcher
-# It converts python data structure in "data" variable into JSON string,
-# then encrypts it and then sends as variable "data" in HTTP request
-# Inputs
-# uri - URI to send the request
-# data - Data structure with variables
-# Returns
-# String with body of HTTP response
-# None - if error happened or empty response
-
 def get_response(request):
-    global logger
+    """
-    global host
+    Sends HTTP POST request to TwinDB dispatcher
-    global proto
+    It converts python data structure in "data" variable into JSON string,
-    global api_dir
+    then encrypts it and then sends as variable "data" in HTTP request
-
+    Inputs
+    uri - URI to send the request
+    data - Data structure with variables
+    Returns
+    String with body of HTTP response
+    None - if error happened or empty response
+    """
     uri = "api.php"
     response_body = None
-    conn = None
-
     logger.debug("Enter get_response(uri=" + uri + ")")
     if proto == "http":
         conn = httplib.HTTPConnection(host)
@@ -542,8 +495,10 @@
         conn = httplib.HTTPSConnection(host)
     else:
         logger.error("Unsupported protocol " + proto)
+        return None
+    url = proto + "://" + host + "/" + api_dir + "/" + uri
+    http_response = "Empty response"
     try:
-        url = proto + "://" + host + "/" + api_dir + "/" + uri
         conn.connect()
         logger.debug("Sending to " + host + ": %s" % json.dumps(request, indent=4, sort_keys=True))
         data_json = json.dumps(request)
@@ -588,46 +543,38 @@
         logger.error("Please check that DNS server is reachable and works")
         return None
     except exceptions.KeyError as err:
-        logger.error("Failed to decode response from server %s" % http_response)
+        logger.error("Failed to decode response from server: %s" % http_response)
         logger.error("Could not find key %s" % err)
         return None
-    except:
-        logger.error("Couldn't make HTTP request " + url)
-        logger.error("Unexpected error: %s, %s" % sys.exc_info()[:2]);
-        logger.debug(traceback.format_exc())
-        return None
     finally:
         conn.close()
     return response_body


-# Replaces password from config with ********
+def sanitize_config(config):
-# Inputs
+    """
-# config - python dictionary with backup config
+    Replaces password from config with ********
-# Returns
+    :param config: python dictionary with backup config
-# Sanitized config
+    :return: Sanitized config
-
+    """
-def sanitize_config(c):
+    if not config:
-    if not c:
         return None
-    cc = dict(c)
+    sanitized_config = dict(config)
-    try:
+    if "mysql_password" in config:
-        cc["mysql_password"] = "********"
+        sanitized_config["mysql_password"] = "********"
-    except:
+    else:
-        logger.debug("Given config %r doesn't contain password" % c)
+        logger.debug("Given config %r doesn't contain password" % config)
-    return cc
+    return sanitized_config

-
-# Gets backup config from TwinDB dispatcher
-# Returns
-# Backup config
-# None - if error happened

 def get_config():
-    global logger
+    """
-    global server_id
+    Gets backup config from TwinDB dispatcher
-
+    :return: Backup config or None if error happened
+    """
     logger.debug("Getting config for server_id = %s" % server_id)
+    response_body = "Empty response"
+    config = None
     try:
         data = {
             "type": "get_config",
@@ -648,114 +595,87 @@
             if msg_pt["error"]:
                 logger.error(msg_pt["error"])
     except exceptions.KeyError as err:
-        logger.error("Failed to decode %s" % response_body);
+        logger.error("Failed to decode %s" % response_body)
-        logger.error(err);
+        logger.error(err)
-        return None
-    except:
-        logger.error("Couldn't get backup config")
-        logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
-        logger.debug(traceback.format_exc())
         return None
     return config


-# Reports slave status to TwinDB dispatcher
+def report_sss(config):
-def report_sss():
+    """
-    global logger
+    Reports slave status to TwinDB dispatcher
-    global server_id
+    :param config: server config received from the dispatcher
-
+    :return: nothing
+    """
     logger.debug("Reporting SHOW SLAVE STATUS for server_id = %s" % server_id)
-    try:
+    ss = get_slave_status(config["mysql_user"], config["mysql_password"])
-        ss = get_slave_status()
+    data = {
-        data = {
+        "type": "report_sss",
-            "type": "report_sss",
+        "params": {
-            "params": {
+            "server_id": server_id,
-                "server_id": server_id,
+            "mysql_server_id": ss["mysql_server_id"],
-                "mysql_server_id": ss["mysql_server_id"],
+            "mysql_master_server_id": ss["mysql_master_server_id"],
-                "mysql_master_server_id": ss["mysql_master_server_id"],
+            "mysql_master_host": ss["mysql_master_host"],
-                "mysql_master_host": ss["mysql_master_host"],
+            "mysql_seconds_behind_master": ss["mysql_seconds_behind_master"],
-                "mysql_seconds_behind_master": ss["mysql_seconds_behind_master"],
+            "mysql_slave_io_running": ss["mysql_slave_io_running"],
-                "mysql_slave_io_running": ss["mysql_slave_io_running"],
+            "mysql_slave_sql_running": ss["mysql_slave_sql_running"],
-                "mysql_slave_sql_running": ss["mysql_slave_sql_running"],
-            }
         }
-        response_body = get_response(data)
+    }
-    except:
+    get_response(data)
-        logger.error("Couldn't get report SHOW SLAVE STATUS")
-        logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
-        logger.debug(traceback.format_exc())
-        return None
     return


-# Reports what privileges are given to the agent
 def report_agent_privileges(config):
-    global logger
+    """
-    global server_id
+    Reports what privileges are given to the agent
-
+    :param config: server config received from the dispatcher
+    :return: nothing
+    """
     logger.debug("Reporting agent privileges for server_id = %s" % server_id)
-    try:
+    con = get_mysql_connection(user=config["mysql_user"], passwd=config["mysql_password"])
-        mysql_user = config["mysql_user"]
+    cursor = con.cursor()
-        mysql_password = config["mysql_password"]
+    query = "SELECT PRIVILEGE_TYPE FROM information_schema.USER_PRIVILEGES"
-    except:
+    logger.debug("Sending query : %s" % query)
-        logger.error("Failed to read config")
+    cursor.execute(query)
-        logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2])
+
-        return
+    privileges = {
-    try:
+        "Reload_priv": "N",
-        con = get_mysql_connection(user=mysql_user, passwd=mysql_password)
+        "Lock_tables_priv": "N",
-        cursor = con.cursor()
+        "Repl_client_priv": "N",
-        query = "SELECT PRIVILEGE_TYPE FROM information_schema.USER_PRIVILEGES"
+        "Super_priv": "N",
-        logger.debug("Sending query : %s" % query)
+        "Create_tablespace_priv": "N"
-        cursor.execute(query)
+    }
-
+    for (priv,) in cursor:
-        privileges = dict()
+        if priv == "RELOAD":
-        privileges["Reload_priv"] = "N"
+            privileges["Reload_priv"] = "Y"
-        privileges["Lock_tables_priv"] = "N"
+        elif priv == "LOCK TABLES":
-        privileges["Repl_client_priv"] = "N"
+            privileges["Lock_tables_priv"] = "Y"
-        privileges["Super_priv"] = "N"
+        elif priv == "REPLICATION CLIENT":
-        privileges["Create_tablespace_priv"] = "N"
+            privileges["Repl_client_priv"] = "Y"
-
+        elif priv == "SUPER":
-        for (priv,) in cursor:
+            privileges["Super_priv"] = "Y"
-            if priv == "RELOAD":
+        elif priv == "CREATE TABLESPACE":
-                privileges["Reload_priv"] = "Y"
+            privileges["Create_tablespace_priv"] = "Y"
-            elif priv == "LOCK TABLES":
+    data = {
-                privileges["Lock_tables_priv"] = "Y"
+        "type": "report_agent_privileges",
-            elif priv == "REPLICATION CLIENT":
+        "params": {
-                privileges["Repl_client_priv"] = "Y"
+            "Reload_priv": privileges["Reload_priv"],
-            elif priv == "SUPER":
+            "Lock_tables_priv": privileges["Lock_tables_priv"],
-                privileges["Super_priv"] = "Y"
+            "Repl_client_priv": privileges["Repl_client_priv"],
-            elif priv == "CREATE TABLESPACE":
+            "Super_priv": privileges["Super_priv"],
-                privileges["Create_tablespace_priv"] = "Y"
+            "Create_tablespace_priv": privileges["Create_tablespace_priv"]
-        data = {
-            "type": "report_agent_privileges",
-            "params": {
-                "Reload_priv": privileges["Reload_priv"],
-                "Lock_tables_priv": privileges["Lock_tables_priv"],
-                "Repl_client_priv": privileges["Repl_client_priv"],
-                "Super_priv": privileges["Super_priv"],
-                "Create_tablespace_priv": privileges["Create_tablespace_priv"]
-            }
         }
-        get_response(data)
+    }
-    except:
+    get_response(data)
-        logger.error("Couldn't get privileges granted to %s@localhost on server %s" % (mysql_user, server_id))
-        logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
-        logger.debug(traceback.format_exc())
-        return None
     return


-# Gets job order from TwinDB dispatcher
-# Returns
-# Job order in python dictionary
-# None - if error happened
-
 def get_job():
-    global logger
+    """
-    global server_id
+    Gets job order from TwinDB dispatcher
-    global job_id
+    :return: Job order in python dictionary or None if error happened
+    """
     job = None
-
     logger.debug("Getting job for server_id = %s" % server_id)
     try:
         d = json.JSONDecoder()
@@ -764,36 +684,41 @@
             "params": {}
         }
         response_body = get_response(data)
+        if not response_body:
+            raise JobError("Empty response from dispatcher")
         response_body_decoded = d.decode(response_body)
+        if "response" not in response_body_decoded:
+            raise JobError("There is no 'response' key in the response from dispatcher")
+        if "success" not in response_body_decoded:
+            raise JobError("There is no 'success' key in the response from dispatcher")
         msg_enc = response_body_decoded["response"]
-        if response_body_decoded["success"] == True:
+        if response_body_decoded["success"]:
             job_json = decrypt(msg_enc)
+            logger.debug("job_json = %s" % job_json)
+            if "data" not in job_json:
+                raise JobError("There is no 'data' in decrypted response %s" % job_json)
             job = d.decode(job_json)["data"]
-            job["params"] = d.decode(job["params"])
+            if job and "params" in job and job["params"]:
-            job_id = job["job_id"]
+                job["params"] = d.decode(job["params"])
             logger.info("Got job:\n%s" % json.dumps(job, indent=4, sort_keys=True))
         else:
             logger.error("Couldn't get job")
             job_json = decrypt(msg_enc)
             logger.error("Server said: %s" % d.decode(job_json)["error"])
-            logger.debug("Server said: %s" % d.decode(job_json)["debug"])
     except exceptions.TypeError as err:
+        logger.error("Failed to get a job: %s" % err)
         return None
-    except:
+    except JobError as err:
-        logger.error("Couldn't get job")
+        logger.error(err)
-        logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
         return None
     return job


-# Checks whether the server is registered or not
-# Returns
-# True - registered
-# False - not so much
 def is_registered():
-    global logger
+    """
-    global server_id
+    Checks whether the server is registered or not
-
+    :return: True if registered, False if not so much
+    """
     logger.debug("Getting registration status for server_id = %s" % server_id)

     twindb_email = "%s@twindb.com" % server_id
@@ -801,13 +726,13 @@

     enc_public_key = None
     # Reading the GPG key
+    gpg_cmd = ["gpg", "--homedir", gpg_homedir, "--armor", "--export", twindb_email]
     try:
-        p1 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--armor", "--export", twindb_email],
+        p = subprocess.Popen(gpg_cmd, stdout=subprocess.PIPE)
-                              stdout = subprocess.PIPE)
+        enc_public_key = p.communicate()[0]
-        enc_public_key = p1.stdout.read()
+    except OSError as err:
-    except:
+        logger.error("Failed to run command %r. %s" % (gpg_cmd, err))
-        logger.error("Failed to export GPG keys of %s from %s." % (twindb_email, gpg_homedir))
+        exit_on_error("Failed to export GPG keys of %s from %s." % (twindb_email, gpg_homedir))
-        exit_on_error(traceback.format_exc())

     # Call the TwinDB api to check for server registration
     response_body = None
@@ -819,21 +744,17 @@
                 "enc_public_key": enc_public_key
             }
         }
-
         response_body = get_response(data)
         if not response_body:
-            return None
+            return False
-
         json_decoder = json.JSONDecoder()
         response_body_decoded = json_decoder.decode(response_body)
-
         if response_body_decoded:
             if "response" in response_body_decoded:
                 msg_decrypted = decrypt(response_body_decoded["response"])
                 if msg_decrypted is None:
                     logger.debug("No valid response from dispatcher. Consider agent unregistered")
                     return False
-
                 msg_pt = json_decoder.decode(msg_decrypted)
                 registration_status = msg_pt["data"]
                 logger.debug("Got registration status:\n%s" % json.dumps(registration_status, indent=4, sort_keys=True))
@@ -844,216 +765,140 @@
         else:
             logger.debug("No valid response from dispatcher. Consider agent unregistered")
             return False
-    except exceptions.KeyError:
+    except exceptions.KeyError as err:
-        logger.error("Failed to decode %s" % response_body)
+        exit_on_error("Failed to decode response from dispatcher: %s. %s" % (response_body, err))
-        exit_on_error(traceback.format_exc())
-    except:
-        logger.error("Couldn't get backup config")
-        logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
-        exit_on_error(traceback.format_exc())
     return False


-# Gets name of initial full backup that corresponds to given incremental backup
+def log_job_notify(params):
-# Inputs
+    """
-# f - Valid name of incremental backup
+    Notifies a job event to TwinDB dispatcher
-# Returns
+    :param params: { event: "start_job", job_id: job_id } or
-# String with name of the initial full backup.
+                   { event: "stop_job", job_id: job_id, ret_code: ret }
-# If full backup was given it will return the same name
+    :return: True of False if error happened
-# None - if error happened
+    """
-
-def get_full_of(f):
-    global logger
-    global server_id
-
-    try:
-        logger.info("Getting full backup name of %s" % f)
-        d = json.JSONDecoder()
-        response = d.decode(get_response("get_full_of.php", {"backup_name": f}))
-        msg_enc = response["data"]
-        logger.debug("Reply from server %s" % msg_enc)
-        full = d.decode(decrypt(msg_enc))
-        logger.info("Got full backup name %s" % full["name"])
-    except:
-        logger.error("Couldn't get name of initial full backup of %s" % f)
-        logger.error(traceback.format_exc())
-        return None
-    return full["name"]
-
-
-# Gets name of child backup copy of the given backup name
-# Inputs
-# f - Valid name of incremental or full backup
-# Returns
-# String with name of child backup copy of the given backup copy
-# None - if error happened
-
-def get_child_of(f):
-    global logger
-    global server_id
-
-    try:
-        logger.info("Getting child backup name of %s" % f)
-        d = json.JSONDecoder()
-        response = d.decode(get_response("get_child_of.php",
-                                         {"backup_name": f}))
-        msg_enc = response["data"]
-        logger.debug("Reply from server %s" % msg_enc)
-        child = d.decode(decrypt(msg_enc))
-        logger.info("Got child backup name %s" % child["name"])
-    except:
-        logger.error("Couldn't get name of child backup of %s" % f)
-        logger.error(traceback.format_exc())
-        return None
-    return child["name"]
-
-
-# Notifies a job event to TwinDB dispatcher
-# Inputs
-# job_id - id of a job in jobs table
-# params - { event: "start_job", job_id: job_id } or
-# { event: "stop_job", job_id: job_id, ret_code: ret }
-# Returns
-# True
-# False - if error happened
-
-def log_job_notify(job_id, params):
-    global logging
-    result = False
-
     logger.info("Sending event notification %s" % params["event"])
-    try:
+    data = {
-        data = {
+        "type": "notify",
-            "type": "notify",
+        "params": params
-            "params": params
+    }
-        }
+    response_body = get_response(data)
-        response_body = get_response(data)
+    d = json.JSONDecoder()
-        d = json.JSONDecoder()
+    response_body_decoded = d.decode(response_body)
-        response_body_decoded = d.decode(response_body)
+    if response_body_decoded["success"]:
-        msg_enc = response_body_decoded["response"]
+        logger.debug("Dispatcher acknowledged job_id = %d notification" % job_id)
-        if response_body_decoded["success"] == True:
+        result = True
-            logger.debug("Dispatcher acknowledged job_id = %d start" % job_id)
+    else:
-            result = True
+        logger.error("Dispatcher didn't acknowledge job_id = %d notification" % job_id)
-        else:
+        result = False
-            logger.error("Dispatcher didn't acknowledge job_id = %d start" % job_id)
-            result = False
-    except:
-        logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
     return result


-# Saves details about backup copy in TwinDB dispatcher
+def record_backup(name, volume_id, size, lsn=None, ancestor=0):
-# Inputs
+    """
-# job_id - id of a job in jobs table
+    Saves details about backup copy in TwinDB dispatcher
-# name - name of backup
+    :param name: name of backup
-# size - size of the backup in bytes
+    :param volume_id: id of a volume where the backup copy is saved
-# lsn - last LSN if it was incremental backup
+    :param size: size of the backup in bytes
-# ancestor - Ansector of the backup (not used now)
+    :param lsn: last LSN if it was incremental backup
-# Returns
+    :param ancestor: Ansector of the backup (not used now)
-# JSON string with status of the request i.e. { "success": True }
+    :return: JSON string with status of the request i.e. { "success": True } or None if error happened
-# None - if error happened
+    """
-
+    logger.info("Saving information about backup:")
-def record_backup(job_id, name, volume_id, size, lsn=None, ancestor=0):
+    logger.info("File name : %s" % name)
-    global logger
+    logger.info("Volume id : %d" % int(volume_id))
-
+    logger.info("Size : %d (%s)" % (int(size), h_size(size)))
-    try:
+    logger.info("Ancestor : %d" % int(ancestor))
-        logger.info("Saving information about backup:")
+    data = {
-        logger.info("File name : %s" % name)
+        "type": "update_backup_data",
-        logger.info("Volume id : %d" % int(volume_id))
+        "params": {
-        logger.info("Size : %d (%s)" % (int(size), h_size(size) ))
+            "job_id": job_id,
-        logger.info("Ancestor : %d" % int(ancestor))
+            "name": name,
-        data = {
+            "volume_id": volume_id,
-            "type": "update_backup_data",
+            "size": size,
-            "params": {
+            "lsn": lsn,
-                "job_id": job_id,
+            "ancestor": ancestor
-                "name": name,
-                "volume_id": volume_id,
-                "size": size,
-                "lsn": lsn,
-                "ancestor": ancestor
-            }
         }
-        logger.debug("Saving a record %s" % data)
+    }
-        response = get_response(data)
+    logger.debug("Saving a record %s" % data)
-        if response <> None:
+    response = get_response(data)
-            jd = json.JSONDecoder()
+    if response:
-            r = jd.decode(response)
+        jd = json.JSONDecoder()
-            logger.debug(r)
+        r = jd.decode(response)
-            if r["success"]:
+        logger.debug(r)
-                logger.info("Saved backup copy details")
+        if r["success"]:
-                return True
+            logger.info("Saved backup copy details")
-            else:
+            return True
-                logger.error("Failed to save backup copy details: "
+        else:
-                             + jd.decode(decrypt(r["response"]))["error"])
+            logger.error("Failed to save backup copy details: "
-                return False
+                         + jd.decode(decrypt(r["response"]))["error"])
-            del jd
-        else:
+            return False
-            logger.error("Empty response from server")
+    else:
+        logger.error("Empty response from server")
-            return False
+        return False
-    except:
+
-        logger.error("Failed to save backup copy details")
-        logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
-        return False
-    return True
-
-
-# Reads config from /etc/twindb.cfg and sets server_id variable
-# Returns
-# True - if config was successfully read
-# Exits if error happened

 def read_config():
-    global init_config
+    """
+    Reads config from /etc/twindb.cfg and sets server_id variable
+    :return: True if config was successfully read
+             Exits if error happened
+    """
     global server_id
-
     try:
         if os.path.exists(init_config):
-            execfile(init_config)
+            ns = {}
+            execfile(init_config, ns)
+            if "server_id" in ns:
+                server_id = ns["server_id"]
         else:
-            exit_on_error("Config %s doesn't exist" % init_config)
+            print("Config %s doesn't exist" % init_config, file=sys.stderr)
-        if len(server_id) == 0:
+            return False
+        if not server_id:
             exit_on_error("Config %s doesn't set server_id" % init_config)
-    except:
+    except IOError as err:
-        logger.error(traceback.format_exc())
+        exit_on_error("Failed to read config file %s. %s" % (init_config, err))
-        exit_on_error("Failed to read from config in %s" % init_config)
     return True


-# Saves server_id variable in /etc/twindb.cfg
+def save_config():
-# Returns
+    """
-# True - if config was successfully saved
+    Saves server_id variable in file init_config (/etc/twindb.cfg)
-# Exits if error happened
+    :return: True - if config was successfully saved
-
+             Exits if error happened
-def save_config(server_id):
+    """
-    global init_config
+    if not server_id:
-
+        exit_on_error("Can not save agent config file because server_id is empty")
     try:
-        f = open(init_config, 'w')
+        f = open(init_config, "w")
         f.write("server_id='%s'\n" % server_id)
         f.close()
-    except:
+    except IOError as err:
-        print("Failed to save new config in %s" % init_config)
+        exit_on_error("Failed to save new config in %s. %s" % (init_config, err))
-        sys.exit(2)
+    return True
-    return
+

-
+def get_slave_status(user=None, passwd=None):
-def get_slave_status():
+    """
-    result = dict()
+    Reads SHOW SLAVE STATUS from the local server
-    # Read master host
+    :param user: user to connect to MySQL
+    :param passwd: password to connect to MySQL
+    :return: dictionary with SHOW SLAVE STATUS result
+    """
+    conn = get_mysql_connection(user, passwd)
+    if conn:
+        cursor = conn.cursor(dictionary=True)
+    else:
+        return None
+    result = {
+        "mysql_server_id": None,
+        "mysql_master_server_id": None,
+        "mysql_master_host": None,
+        "mysql_seconds_behind_master": None,
+        "mysql_slave_io_running": None,
+        "mysql_slave_sql_running": None
+    }
     try:
-        conn = get_mysql_connection()
-        if conn:
-            cursor = conn.cursor(dictionary=True)
-        else:
-            return None
-
-        result["mysql_server_id"] = None
-        result["mysql_master_server_id"] = None
-        result["mysql_master_host"] = None
-        result["mysql_seconds_behind_master"] = None
-        result["mysql_slave_io_running"] = None
-        result["mysql_slave_sql_running"] = None
-
         cursor.execute("SHOW SLAVE STATUS")
         for row in cursor:
             result["mysql_master_server_id"] = row["Master_Server_Id"]
@@ -1061,35 +906,44 @@
             result["mysql_seconds_behind_master"] = row["Seconds_Behind_Master"]
             result["mysql_slave_io_running"] = row["Slave_IO_Running"]
             result["mysql_slave_sql_running"] = row["Slave_SQL_Running"]
-
+    except mysql.connector.Error as err:
+        logger.error("Could get SHOW SLAVE STATUS")
+        logger.error("MySQL Error: " % err)
+        return None
+    try:
         cursor.execute("SELECT @@server_id AS server_id")
         for row in cursor:
             result["mysql_server_id"] = row["server_id"]
-
         cursor.close()
         conn.close()
-    except:
+    except mysql.connector.Error as err:
-        logger.error("Could not read Master_host from MySQL")
+        logger.error("Could not read server_id")
-        logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
+        logger.error("MySQL Error: " % err)
         return None
     return result


 def has_mysql_access(username=None, password=None, grant_capability=True):
+    """
+    Reports if a user has all required MySQL privileges
+    :param username: MySQL user
+    :param password: MySQL password
+    :param grant_capability: TODO to
+    :return: a pair of a boolean that tells whether MySQL user has all required privileges
+             and a list of missing privileges
+    """
     has_required_grants = False

     # list of missing privileges
     missing_privileges = []

     try:
-        if username is None or password is None:
+        conn = get_mysql_connection(username, password)
-            conn = get_mysql_connection()
-        else:
-            conn = get_mysql_connection(username, password)

         if isinstance(conn, mysql.connector.MySQLConnection):
             cursor = conn.cursor(dictionary=True)
         else:
+            missing_privileges = ['RELOAD', 'SUPER', 'LOCK TABLES', 'REPLICATION CLIENT', 'CREATE TABLESPACE']
             return has_required_grants, missing_privileges

         # Fetch the current user and matching host part as it could either be
@@ -1149,95 +1003,84 @@
11491003
1150 cursor.close()1004 cursor.close()
1151 conn.close()1005 conn.close()
1152 except:1006 except mysql.connector.Error as err:
1153 logger.error("Could not read the grants information from MySQL")1007 logger.error("Could not read the grants information from MySQL")
1154 logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2])1008 logger.error("MySQL Error: %s" % err)
1155
1156 return has_required_grants, missing_privileges1009 return has_required_grants, missing_privileges
11571010
11581011
1159# Registers this server in TwinDB dispatcher
1160# Inputs
1161# code - string with secret registration code
1162# Returns
1163# True - if server was successfully registered
1164# Exits if error happened
1165def action_handler_register(code):1012def action_handler_register(code):
1166 global logger1013 """
1167 global init_config1014 Registers this server in TwinDB dispatcher
1168 global ssh_private_key1015 Inputs
1169 global ssh_public_key1016 code - string with secret registration code
1170 global gpg_homedir1017 Returns
1171 global server_id1018 True - if server was successfully registered
1172 global mysql_user1019 Exits if error happened
1173 global mysql_password1020 """
1174
1175 # Check early to see that the MySQL user passed to the agent has enough1021 # Check early to see that the MySQL user passed to the agent has enough
1176 # privileges to create a separate MySQL user needed by TwinDB1022 # privileges to create a separate MySQL user needed by TwinDB
1177 mysql_access_available, missing_mysql_privileges = has_mysql_access()1023 mysql_access_available, missing_mysql_privileges = has_mysql_access()
1178 if not mysql_access_available:1024 if not mysql_access_available:
1179 logger.error("The MySQL user %s does not have enough privileges" % mysql_user)1025 logger.error("The MySQL user %s does not have enough privileges" % mysql_user)
1180 if len(missing_mysql_privileges) > 0:1026 if missing_mysql_privileges:
1181 logger.error("Following privileges are missing: %s" % ','.join(missing_mysql_privileges))1027 logger.error("Following privileges are missing: %s" % ','.join(missing_mysql_privileges))
1182
1183 return False1028 return False
11841029
1185 logger.info("Registering TwinDB agent with code %s" % code)1030 logger.info("Registering TwinDB agent with code %s" % code)
1186 logger.info("The agent needs to generate cryptographically strong keys.")
1187 logger.info("It may take really, really long time. Please be patient.")
1188 name = os.uname()[1].strip() # un[1] is a hostname1031 name = os.uname()[1].strip() # un[1] is a hostname
11891032
1190 twindb_email = "%s@twindb.com" % server_id1033 twindb_email = "%s@twindb.com" % server_id
11911034
1192 # Generate GPG key1035 # Read GPG public key
1193 enc_public_key = None1036 enc_public_key = None
1037 cmd = ["gpg", "--homedir", gpg_homedir, "--armor", "--export", twindb_email]
1194 try:1038 try:
1195 logger.debug("Reading GPG public key of %s." % twindb_email)1039 logger.debug("Reading GPG public key of %s." % twindb_email)
1196 p1 = subprocess.Popen(["gpg", "--homedir", gpg_homedir,1040 p1 = subprocess.Popen(cmd, stdout=subprocess.PIPE)
1197 "--armor", "--export", twindb_email],1041 enc_public_key = p1.communicate()[0]
1198 stdout=subprocess.PIPE)1042 except OSError as err:
1199 enc_public_key = p1.stdout.read()1043 logger.error("Failed to run command %r. %s" % (cmd, err))
1200 except:1044 exit_on_error("Failed to export GPG keys of %s from %s." % (twindb_email, gpg_homedir))
1201 logger.error("Failed to export GPG keys of %s from %s."
1202 % (twindb_email, gpg_homedir))
1203 exit_on_error(traceback.format_exc())
12041045
1205 # Generate SSH key1046 if not os.path.isfile(ssh_private_key_file):
1206 try:1047 try:
1207 if not os.path.isfile(ssh_private_key):
1208 logger.info("Generating SSH keys pair.")1048 logger.info("Generating SSH keys pair.")
1209 subprocess.call(["ssh-keygen", "-N", "", "-f", ssh_private_key])1049 subprocess.call(["ssh-keygen", "-N", "", "-f", ssh_private_key_file])
1210 except:1050 except OSError as err:
1211 logger.error("Failed to generate SSH keys.")1051 logger.error("Failed to run command %r. %s" % (cmd, err))
1212 exit_on_error(traceback.format_exc())1052 exit_on_error("Failed to generate SSH keys.")
12131053 ssh_public_key = None
1214 try:1054 try:
1215 logger.info("Reading SSH public key from %s." % ssh_public_key)1055 logger.info("Reading SSH public key from %s." % ssh_public_key_file)
1216 f = open(ssh_public_key, 'r')1056 f = open(ssh_public_key_file, 'r')
1217 ssh_public_key = f.read()1057 ssh_public_key = f.read()
1218 f.close()1058 f.close()
1219 except:1059 except IOError as err:
1220 logger.error("Failed to read SSH keys.")1060 logger.error("Failed to read from file %s. %s" % (ssh_public_key_file, err))
1221 exit_on_error(traceback.format_exc())1061 exit_on_error("Failed to read SSH keys.")
12221062
1223 # Read local ip addresses1063 # Read local ip addresses
1224 cmd = "ip addr"1064 cmd = "ip addr"
1225 cmd += "| grep -w inet"1065 cmd += "| grep -w inet"
1226 cmd += "| awk '{ print $2}'"1066 cmd += "| awk '{ print $2}'"
1227 cmd += "| awk -F/ '{ print $1}'"1067 cmd += "| awk -F/ '{ print $1}'"
1228 ss = get_slave_status()1068 cout = None
12291069 logger.debug("Getting list of local IP addresses")
1230 if ss is None:1070 try:
1231 logger.error("Could not get slave status on this server")1071 logger.debug("Running: %s" % cmd)
1232 exit_on_error()1072 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
1233 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)1073 cout, cerr = p.communicate()
12341074 logger.debug("STDOUT: %s" % cout)
1075 logger.debug("STDERR: %s" % cerr)
1076 except OSError as err:
1077 logger.error("Failed to run command %r. %s" % (cmd, err))
1235 local_ip = list()1078 local_ip = list()
1236 for row in p.stdout:1079 for row in (cout or "").split("\n"):
1237 row = row.rstrip('\n')1080 row = row.rstrip("\n")
1238 if row != "127.0.0.1":1081 if row and row != "127.0.0.1":
1239 local_ip.append(row)1082 local_ip.append(row)
12401083 ss = get_slave_status()
1241 data = {1084 data = {
1242 "type": "register",1085 "type": "register",
1243 "params": {1086 "params": {
@@ -1255,9 +1098,8 @@
1255 "local_ip": local_ip1098 "local_ip": local_ip
1256 }1099 }
1257 }1100 }
1258
1259 api_response = get_response(data)1101 api_response = get_response(data)
1260 if api_response is not None:1102 if api_response:
1261 json_decoder = json.JSONDecoder()1103 json_decoder = json.JSONDecoder()
1262 response_decoded = json_decoder.decode(api_response)1104 response_decoded = json_decoder.decode(api_response)
1263 logger.debug(response_decoded)1105 logger.debug(response_decoded)
@@ -1274,18 +1116,16 @@
1274 elif "errors" in response_decoded:1116 elif "errors" in response_decoded:
1275 error_msg = response_decoded["errors"]["msg"]1117 error_msg = response_decoded["errors"]["msg"]
12761118
1277 logger.error("Failed to register the agent: %s" % error_msg)1119 exit_on_error("Failed to register the agent: %s" % error_msg)
1278 sys.exit(2)
1279 del json_decoder
1280 else:1120 else:
1281 exit_on_error("Empty response from server")1121 exit_on_error("Empty response from server")
1282 return True1122 return True
12831123
12841124
1285def create_agent_user():1125def create_agent_user():
1286 global mysql_user1126 """
1287 global mysql_password1127 Creates local MySQL user for twindb agent
12881128 """
1289 config = get_config()1129 config = get_config()
1290 try:1130 try:
1291 conn = get_mysql_connection()1131 conn = get_mysql_connection()
@@ -1294,9 +1134,8 @@
1294 cursor = conn.cursor()1134 cursor = conn.cursor()
1295 cursor.execute(q, (config["mysql_user"], config["mysql_password"]))1135 cursor.execute(q, (config["mysql_user"], config["mysql_password"]))
1296 except mysql.connector.Error as err:1136 except mysql.connector.Error as err:
1297 logger.error("Failed to create MySQL user %s@localhost for TwinDB agent" % config["mysql_user"])1137 logger.error("MySQL replied: %s" % err)
1298 logger.error("MySQL replied: %s" % err.msg)1138 exit_on_error("Failed to create MySQL user %s@localhost for TwinDB agent" % config["mysql_user"])
1299 exit_on_error(traceback.format_exc())
1300 logger.info("Created MySQL user %s@localhost for TwinDB agent" % config["mysql_user"])1139 logger.info("Created MySQL user %s@localhost for TwinDB agent" % config["mysql_user"])
1301 logger.info("Congratulations! The server is successfully registered in TwinDB.")1140 logger.info("Congratulations! The server is successfully registered in TwinDB.")
1302 logger.info("TwinDB will backup the server accordingly to the default config.")1141 logger.info("TwinDB will backup the server accordingly to the default config.")
@@ -1304,15 +1143,13 @@
1304 return True1143 return True
13051144
13061145
1307# Unregisters this server in TwinDB dispatcher
1308# Returns
1309# True - if server was successfully unregistered
1310# False - if error happened
1311
1312def action_handler_unregister(delete_backups):1146def action_handler_unregister(delete_backups):
1313 global logger1147 """
1314 global init_config1148 Unregisters this server in TwinDB dispatcher
13151149 Returns
1150 True - if server was successfully unregistered
1151 False - if error happened
1152 """
1316 data = {1153 data = {
1317 "type": "unregister",1154 "type": "unregister",
1318 "params": {1155 "params": {
@@ -1322,7 +1159,7 @@
1322 logger.debug("Unregistration request:")1159 logger.debug("Unregistration request:")
1323 logger.debug(data)1160 logger.debug(data)
1324 response = get_response(data)1161 response = get_response(data)
1325 if response <> None:1162 if response:
1326 jd = json.JSONDecoder()1163 jd = json.JSONDecoder()
1327 r = jd.decode(response)1164 r = jd.decode(response)
1328 logger.debug(r)1165 logger.debug(r)
@@ -1330,25 +1167,21 @@
1330 logger.info("The server is successfully unregistered")1167 logger.info("The server is successfully unregistered")
1331 return True1168 return True
1332 else:1169 else:
1333 logger.error("Failed to unregister the agent: "1170 exit_on_error("Failed to unregister the agent: " + jd.decode(decrypt(r["response"]))["error"])
1334 + jd.decode(decrypt(r["response"]))["error"])
1335 sys.exit(2)
1336 del jd
1337 else:1171 else:
1338 logger.error("Empty response from server")1172 exit_on_error("Empty response from server")
1339 sys.exit(2)
1340 return False1173 return False
13411174
13421175
1343# Starts immediate backup job
1344def action_handler_backup():1176def action_handler_backup():
1345 global logger1177 """
13461178 Starts immediate backup job
1179 """
1347 if schedule_backup():1180 if schedule_backup():
1348 config = get_config()1181 config = get_config()
1349 job = get_job()1182 job = get_job()
1350 if job:1183 if job:
1351 if 0 == process_job(config, job):1184 if process_job(config, job) == 0:
1352 return True1185 return True
1353 else:1186 else:
1354 logger.error("Didn't get any job from the dispatcher")1187 logger.error("Didn't get any job from the dispatcher")
@@ -1359,10 +1192,10 @@
1359 return False1192 return False
13601193
13611194
1362# Asks dispatcher to schedule a job for this server
1363def schedule_backup():1195def schedule_backup():
1364 global logger1196 """
13651197 Asks dispatcher to schedule a job for this server
1198 """
1366 data = {1199 data = {
1367 "type": "schedule_backup",1200 "type": "schedule_backup",
1368 "params": {1201 "params": {
@@ -1371,7 +1204,7 @@
1371 logger.debug("Schedule backup request:")1204 logger.debug("Schedule backup request:")
1372 logger.debug(data)1205 logger.debug(data)
1373 response = get_response(data)1206 response = get_response(data)
1374 if response <> None:1207 if response:
1375 jd = json.JSONDecoder()1208 jd = json.JSONDecoder()
1376 r = jd.decode(response)1209 r = jd.decode(response)
1377 logger.debug(r)1210 logger.debug(r)
@@ -1382,45 +1215,43 @@
1382 logger.error("Failed to schedule a job: "1215 logger.error("Failed to schedule a job: "
1383 + jd.decode(decrypt(r["response"]))["error"])1216 + jd.decode(decrypt(r["response"]))["error"])
1384 return False1217 return False
1385 del jd
1386 else:1218 else:
1387 exit_on_error("Empty response from server")1219 exit_on_error("Empty response from server")
13881220
13891221
1390# Checks if it's enough space in TwinDB storage1222def check_space():
1391# Inputs1223 """
1392# config - backup config1224 Checks if there is enough space in TwinDB storage
1393# job - job order1225 Inputs
1394# Returns always True1226 config - backup config
13951227 job - job order
1396def check_space(config, job_id):1228 Always returns True
1229 TODO: implement the function
1230 """
1397 return True1231 return True
13981232
13991233
1400# Checks if MySQL user from backup config has enough privileges to perform backup
1401# Inputs
1402# config - backup config
1403# job - job order
1404# Returns
1405# True - if all necessary privileges are granted
1406# False - if not all necessary privilegers are granted
1407
1408def check_mysql_permissions(config):1234def check_mysql_permissions(config):
1409 global logger1235 """
14101236 Checks if MySQL user from backup config has enough privileges to perform backup
1237 Inputs
1238 config - backup config
1239 Returns
1240 True - if all necessary privileges are granted
1241 False - if not all necessary privileges are granted
1242 """
1411 try:1243 try:
1412 logger.info("Checking if '%s'@'localhost' has enough rights to backup MySQL" % config["mysql_user"])1244 logger.info("Checking if '%s'@'localhost' has enough rights to backup MySQL" % config["mysql_user"])
1413 result = False
1414 con = get_mysql_connection(user=config["mysql_user"], passwd=config["mysql_password"])1245 con = get_mysql_connection(user=config["mysql_user"], passwd=config["mysql_password"])
1415 required_priv = ["RELOAD", "LOCK TABLES", "REPLICATION CLIENT", "SUPER", "CREATE TABLESPACE"]1246 required_priv = ["RELOAD", "LOCK TABLES", "REPLICATION CLIENT", "SUPER", "CREATE TABLESPACE"]
1416 for priv in required_priv:1247 for priv in required_priv:
1417 cursor = con.cursor()1248 cursor = con.cursor()
1418 logger.debug("Checking if privilege %s is granted" % priv)1249 logger.debug("Checking if privilege %s is granted" % priv)
1419 query = """1250 query = """
1420SELECT 1251 SELECT
1421 PRIVILEGE_TYPE 1252 PRIVILEGE_TYPE
1422FROM information_schema.USER_PRIVILEGES 1253 FROM information_schema.USER_PRIVILEGES
1423WHERE GRANTEE = '\'%s\'@\\\'localhost\\\'' AND PRIVILEGE_TYPE = %s"""1254 WHERE GRANTEE = '\'%s\'@\\\'localhost\\\'' AND PRIVILEGE_TYPE = %s"""
1424 logger.debug("Sending query : %s" % query)1255 logger.debug("Sending query : %s" % query)
1425 cursor.execute(query, (config["mysql_user"], priv))1256 cursor.execute(query, (config["mysql_user"], priv))
1426 cursor.fetchall()1257 cursor.fetchall()
@@ -1430,288 +1261,213 @@
1430 logger.debug("Privilege %s is granted to %s@localhost" % (priv, config["mysql_user"]))1261 logger.debug("Privilege %s is granted to %s@localhost" % (priv, config["mysql_user"]))
1431 else:1262 else:
1432 logger.info("%20s ... NOT GRANTED" % priv)1263 logger.info("%20s ... NOT GRANTED" % priv)
1433 logger.error("Privilege %s is not granted to %s@localhost" % ( priv, config["mysql_user"]))1264 logger.error("Privilege %s is not granted to %s@localhost" % (priv, config["mysql_user"]))
1434 logger.info(1265 logger.info("Please execute the following SQL to grant %s to user %s@localhost:"
1435 "Please execute following SQL to grant %s to user %s@localhost:" % ( priv, config["mysql_user"]))1266 % (priv, config["mysql_user"]))
1436 logger.info("GRANT %s ON *.* TO '%s'@'localhost';" % ( priv, config["mysql_user"]))1267 logger.info("GRANT %s ON *.* TO '%s'@'localhost';" % (priv, config["mysql_user"]))
1437 return False1268 return False
1438 cursor.close()1269 cursor.close()
1439 con.close()1270 con.close()
1440 result = True1271 except mysql.connector.Error as err:
1441 except:1272 logger.error("MySQL Error: %s" % err)
1442 logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2])1273 return False
1443 result = False1274 logger.debug("MySQL user %s@'localhost' has enough grants to take backup" % config["mysql_user"])
1444 if result:1275 return True
1445 logger.debug("MySQL user %s@'localhost' has enough grants to take backup" % config["mysql_user"])1276
1446 else:
1447 logger.error("MySQL user %s@'localhost' doesn't have enough grants to take backup" % config["mysql_user"])
1448 return result
1449
1450
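
Note on the GRANTEE matching in check_mysql_permissions: information_schema.USER_PRIVILEGES stores the account with literal quotes (e.g. 'twindb_agent'@'localhost'), which is what the heavily escaped pattern in the query above reproduces. A minimal sketch of the same check with the GRANTEE value built in Python and bound as a single parameter (function and account names here are illustrative, not part of this branch):

    import mysql.connector

    def missing_backup_privileges(user, password):
        """Return the required backup privileges that are not granted to user@localhost."""
        required = set(["RELOAD", "LOCK TABLES", "REPLICATION CLIENT", "SUPER", "CREATE TABLESPACE"])
        grantee = "'%s'@'localhost'" % user  # literal quotes, exactly as stored in USER_PRIVILEGES
        conn = mysql.connector.connect(user=user, password=password)
        cursor = conn.cursor()
        cursor.execute("SELECT PRIVILEGE_TYPE FROM information_schema.USER_PRIVILEGES "
                       "WHERE GRANTEE = %s", (grantee,))
        granted = set(row[0] for row in cursor.fetchall())
        cursor.close()
        conn.close()
        return required - granted

An empty returned set means the user has everything needed to take backups.
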
1451# Meta fuction that calls actual backup fucntion depending on tool in backup config
1452# Inputs
1453# config - backup config
1454# job - job order
1455# Returns what actual backup function returned or -1 if the tool is not supported
14561277
1457def take_backup(config, job):1278def take_backup(config, job):
1458 global logger1279 """
14591280 Meta function that calls the actual backup function depending on the tool in the backup config
1281 :param config: backup config
1282 :param job: job order
1283 :return: what actual backup function returned or -1 if the tool is not supported
1284 """
1460 ret = -11285 ret = -1
1461 logger.info("Starting backup job")1286 logger.info("Starting backup job")
1462 try:1287 notify_params = {"event": "start_job", "job_id": job["job_id"]}
1463 job_id = int(job["job_id"])1288 if log_job_notify(notify_params):
1464 notify_params = {"event": "start_job", "job_id": job_id}1289 ret = take_backup_xtrabackup(config, job)
1465 if log_job_notify(job_id, notify_params):1290 notify_params = {"event": "stop_job", "job_id": job["job_id"], "ret_code": ret}
1466 ret = take_backup_xtrabackup(config, job)1291 log_job_notify(notify_params)
1467 notify_params = {"event": "stop_job", "job_id": job_id, "ret_code": ret}1292 logger.info("Backup job is complete")
1468 log_job_notify(job_id, notify_params)1293 else:
1469 logger.info("Backup job is complete")1294 logger.info("Backup job can not start")
1470 else:
1471 logger.info("Backup job can not start")
1472 except:
1473 logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
1474
1475 return ret1295 return ret
14761296
14771297
1478# Starts SSH process to TwinDB storage and saves input in file backup_name
1479# Inputs
1480# config - backup config
1481# backup_name - file name to save input in
1482# stdin, stderr - respective IO handlers
1483# Returns what SSH process returns
1484
1485def start_ssh_cmd(config, job_params, backup_name, stdin, stderr):1298def start_ssh_cmd(config, job_params, backup_name, stdin, stderr):
1486 global logger1299 """
1487 global ssh_private_key1300 Starts an SSH process to TwinDB storage and saves input in file backup_name
1488 global ssh_port1301 :param config: backup config
14891302 :param job_params: job parameters
1490 p = None1303 :param backup_name: file name to save input in
1304 :param stdin: respective IO handlers
1305 :param stderr: respective IO handlers
1306 :return: what SSH process returns
1307 """
1308 ssh_process = None
1309 ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key_file, "-p", str(ssh_port),
1310 "user_id_%s@%s" % (config["user_id"], job_params["ip"]), "/bin/cat - > %s" % backup_name]
1491 try:1311 try:
1492 ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key]
1493 ssh_cmd.append("-p")
1494 ssh_cmd.append(str(ssh_port))
1495 ssh_cmd.append("user_id_%s@%s" % (config["user_id"], job_params["ip"]))
1496 ssh_cmd.append("/bin/cat - > %s" % backup_name)
1497 logger.debug("Starting SSH process: %r" % ssh_cmd)1312 logger.debug("Starting SSH process: %r" % ssh_cmd)
1498 p = subprocess.Popen(ssh_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr)1313 ssh_process = subprocess.Popen(ssh_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr)
1499 except:1314 except OSError as err:
1500 logger.error(traceback.format_exc())1315 logger.error("Failed to run command %r. %s" % (ssh_cmd, err))
1501 return p1316 return ssh_process
15021317
15031318
1504# Starts GPG process, encrypts STDIN and outputs in into STDOUT1319def start_gpg_cmd(stdin, stderr):
1505# Inputs1320 """
1506# config - backup config1321 Starts GPG process, encrypts STDIN and outputs in into STDOUT
1507# stdin, stderr - respective IO handlers1322 :param stdin: respective IO handlers
1508# Returns what GPG process returns1323 :param stderr: respective IO handlers
15091324 :return: what GPG process returns
1510def start_gpg_cmd(config, stdin, stderr):1325 """
1511 global logger1326 gpg_process = None
1512 global ssh_private_key1327 gpg_cmd = ["gpg", "--encrypt", "--yes", "--batch", "--no-permission-warning", "--quiet", "--recipient", server_id]
1513 global ssh_port
1514 global server_id
1515
1516 p = None
1517 try:1328 try:
1518 gpg_cmd = ["gpg", "--encrypt", "--yes", "--batch", "--no-permission-warning", "--quiet"]
1519 gpg_cmd.append("--recipient")
1520 gpg_cmd.append("%s" % (server_id))
1521
1522 logger.debug("Starting GPG process: %r" % gpg_cmd)1329 logger.debug("Starting GPG process: %r" % gpg_cmd)
1523 p = subprocess.Popen(gpg_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr)1330 gpg_process = subprocess.Popen(gpg_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr)
1524 except:1331 except OSError as err:
1525 logger.error(traceback.format_exc())1332 logger.error("Failed to run command %r. %s" % (gpg_cmd, err))
1526 return p1333 return gpg_process
15271334
1528
1529# Takes backup copy with XtraBackup
1530# Inputs
1531# config - backup config
1532# job - job order
1533# Returns
1534# True - if backup successfully taken
1535# False - if backup failed
15361335
1537def take_backup_xtrabackup(config, job):1336def take_backup_xtrabackup(config, job):
1538 global logger1337 """
1539 global server_id1338 Takes backup copy with XtraBackup
15401339 :param config: backup config
1541 suffix = "tar"1340 :param job: job order
1341 :return: True if backup was successfully taken or False if it has failed
1342 """
1343 suffix = "xbstream"
1542 backup_name = "server_id_%s_%s.%s.gpg" % (server_id, datetime.now().isoformat(), suffix)1344 backup_name = "server_id_%s_%s.%s.gpg" % (server_id, datetime.now().isoformat(), suffix)
1543 extra_config = ""
1544 ret_code = 01345 ret_code = 0
15451346 if "params" not in job:
1347 logger.error("There are no params in the job order")
1348 return -1
1349 # Check that job order has all required parameters
1350 mandatory_params = ["ancestor", "backup_type", "ip", "lsn", "type", "volume_id"]
1351 for param in mandatory_params:
1352 if param not in job["params"]:
1353 logger.error("There is no %s in the job order" % param)
1354 return -1
1355 backup_type = job["params"]["backup_type"]
1356 xtrabackup_cmd = [
1357 "innobackupex",
1358 "--stream=xbstream",
1359 "--user=%s" % config["mysql_user"],
1360 "--password=%s" % config["mysql_password"],
1361 "--socket=%s" % get_unix_socket(),
1362 "--slave-info",
1363 "--safe-slave-backup",
1364 "--safe-slave-backup-timeout=3600"]
1365 if backup_type == 'incremental':
1366 last_lsn = job["params"]["lsn"]
1367 xtrabackup_cmd.append("--incremental")
1368 xtrabackup_cmd.append(".")
1369 xtrabackup_cmd.append("--incremental-lsn=%s" % last_lsn)
1370 else:
1371 xtrabackup_cmd.append(".")
1372 extra_config = gen_extra_config(config)
1373 if extra_config:
1374 xtrabackup_cmd.append("--defaults-extra-file=%s" % extra_config)
1375 err_descriptors = dict()
1376 for desc in ["gpg", "ssh", "xtrabackup"]:
1377 desc_file = ("/tmp/twindb.%s.err" % desc)
1378 try:
1379 err_descriptors[desc] = open(desc_file, "w+")
1380 except IOError as err:
1381 logger.error("Failed to open file %s. %s" % (desc_file, err))
1382 return -1
1546 try:1383 try:
1547 mysql_user = config["mysql_user"]1384 logger.debug("Starting XtraBackup process: %r" % xtrabackup_cmd)
1548 mysql_password = config["mysql_password"]1385 xbk_proc = subprocess.Popen(xtrabackup_cmd, stdout=subprocess.PIPE, stderr=err_descriptors["xtrabackup"])
1549 d = json.JSONDecoder()1386 except OSError as err:
1550 backup_type = job["params"]["backup_type"]1387 logger.error("Failed to run command %r. %s" % (xtrabackup_cmd, err))
1551 xtrabackup_cmd = ["innobackupex"]1388 return -1
1552 xtrabackup_cmd.append("--stream=xbstream")1389 gpg_proc = start_gpg_cmd(xbk_proc.stdout, err_descriptors["gpg"])
1553 xtrabackup_cmd.append("--user=%s" % mysql_user)1390 ssh_proc = start_ssh_cmd(config, job["params"], backup_name, gpg_proc.stdout, err_descriptors["ssh"])
1554 xtrabackup_cmd.append("--password=%s" % mysql_password)1391
1555 xtrabackup_cmd.append("--socket=%s" % get_unix_socket())1392 ssh_proc.wait()
1556 xtrabackup_cmd.append("--slave-info")1393 xbk_proc.wait()
1557 xtrabackup_cmd.append("--safe-slave-backup")1394 gpg_proc.wait()
1558 xtrabackup_cmd.append("--safe-slave-backup-timeout=3600")1395
1559 if backup_type == 'incremental':1396 ret_code_ssh = ssh_proc.returncode
1560 last_lsn = job["params"]["lsn"]1397 ret_code_gpg = gpg_proc.returncode
1561 xtrabackup_cmd.append("--incremental")1398 ret_code_xbk = xbk_proc.returncode
1562 xtrabackup_cmd.append(".")1399
1563 xtrabackup_cmd.append("--incremental-lsn=%s" % last_lsn)1400 err_str = dict()
1564 else:1401 for desc in ["gpg", "ssh", "xtrabackup"]:
1565 xtrabackup_cmd.append(".")1402 err_descriptors[desc].seek(0)
1566 extra_config = gen_extra_config(config)1403 err_str[desc] = err_descriptors[desc].read()
1567 if extra_config:1404 if not err_str[desc]:
1568 xtrabackup_cmd.append("--defaults-extra-file=%s" % extra_config)1405 err_str[desc] = "no output"
15691406
1570 xtr_err = open('/tmp/twindb.xtrabackup.err', "w+")1407 logger.info("XtraBackup stderr: " + err_str["xtrabackup"])
1571 gpg_err = open('/tmp/twindb.gpg.err', "w+")1408 logger.info("GPG stderr: " + err_str["gpg"])
1572 ssh_err = open('/tmp/twindb.ssh.err', "w+")1409 logger.info("SSH stderr: " + err_str["ssh"])
1573 p1 = subprocess.Popen(xtrabackup_cmd, stdout=subprocess.PIPE, stderr=xtr_err)1410
1574 p2 = start_gpg_cmd(config, p1.stdout, gpg_err)1411 if ret_code_xbk == 0 and ret_code_gpg == 0 and ret_code_ssh == 0:
1575 p3 = start_ssh_cmd(config, job["params"], backup_name, p2.stdout, ssh_err)1412 lsn = grep_lsn(err_str["xtrabackup"])
15761413 if not lsn:
1577 p1.stdout.close()1414 logger.error("Could not find LSN in XtraBackup output")
1578 p2.stdout.close()1415 return -1
1579 p3.communicate()1416 file_size = get_backup_size(config, job["params"], backup_name)
15801417 if not file_size:
1581 ret_code_ssh = p3.returncode1418 logger.error("Backup copy size must not be zero")
1582 ret_code_gpg = p2.wait()1419 return -1
1583 ret_code_xbk = p1.wait()1420 volume_id = job["params"]["volume_id"]
15841421 ancestor = job["params"]["ancestor"]
1585 xtr_err.seek(0)1422 if not record_backup(backup_name, volume_id, file_size, lsn, ancestor):
1586 gpg_err.seek(0)1423 logger.error("Failed to save backup copy details")
1587 ssh_err.seek(0)1424 return -1
15881425 else:
1589 xtrabackup_stderr = xtr_err.read()
1590 gpg_stderr = gpg_err.read()
1591 ssh_stderr = ssh_err.read()
1592
1593 if len(xtrabackup_stderr) == 0:
1594 xtrabackup_stderr = "no output"
1595 if len(gpg_stderr) == 0:
1596 gpg_stderr = "no output"
1597 if len(ssh_stderr) == 0:
1598 ssh_stderr = "no output"
1599
1600 logger.info("XtraBackup stderr: " + xtrabackup_stderr)
1601 logger.info("GPG stderr: " + gpg_stderr)
1602 logger.info("SSH stderr: " + ssh_stderr)
1603 ancestor = 0
1604
1605 if ret_code_xbk == 0 and ret_code_gpg == 0 and ret_code_ssh == 0:
1606 lsn = grep_lsn(xtrabackup_stderr)
1607 if lsn == None:
1608 logger.error("Could not find LSN in XtrabBackup output")
1609 return -1
1610 file_size = get_backup_size(config, job["params"], backup_name)
1611 if file_size == 0:
1612 logger.error("Backup copy size must not be zero")
1613 return -1
1614 job_id = job["job_id"]
1615 volume_id = job["params"]["volume_id"]
1616 ancestor = job["params"]["ancestor"]
1617 if not record_backup(job_id, backup_name, volume_id, file_size, lsn, ancestor):
1618 logger.error("Failed to save backup copy details")
1619 return -1
1620 else:
1621 logger.error("Failed to take backup")
1622 ret_code = -1
1623 except:
1624 logger.error("Failed to take backup")1426 logger.error("Failed to take backup")
1625 logger.error(traceback.format_exc())
1626 return -11427 return -1
1627 finally:1428 for f in [extra_config, "/tmp/twindb.xtrabackup.err", "/tmp/twindb.gpg.err", "/tmp/twindb.ssh.err"]:
1628 for f in [extra_config,1429 if os.path.isfile(f):
1629 "/tmp/twindb.xtrabackup.err",1430 try:
1630 "/tmp/twindb.gpg.err",
1631 "/tmp/twindb.ssh.err"]:
1632 if os.path.isfile(f):
1633 os.remove(f)1431 os.remove(f)
1432 except IOError as err:
1433 logger.error("Failed to remove file %s. %s" % (f, err))
1634 return ret_code1434 return ret_code
16351435
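
The backup data path in take_backup_xtrabackup is a three-process pipe: innobackupex streams an xbstream archive to stdout, gpg encrypts it for this server's key, and ssh writes it straight into TwinDB storage, so nothing is staged on local disk. A stripped-down sketch of the chaining, with error handling and the real argument lists left out:

    import subprocess

    def stream_encrypted_backup(xtrabackup_cmd, gpg_cmd, ssh_cmd):
        """Run xtrabackup | gpg | ssh and return the three exit codes."""
        xbk = subprocess.Popen(xtrabackup_cmd, stdout=subprocess.PIPE)
        gpg = subprocess.Popen(gpg_cmd, stdin=xbk.stdout, stdout=subprocess.PIPE)
        ssh = subprocess.Popen(ssh_cmd, stdin=gpg.stdout)
        ssh.wait()
        xbk.wait()
        gpg.wait()
        return xbk.returncode, gpg.returncode, ssh.returncode

One reviewable difference from the removed code: the old version closed the parent's copies of the intermediate stdout pipes, which lets an upstream process receive SIGPIPE if a downstream one dies early; the new version relies on the three wait() calls instead.
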
16361436
1637# Generates MySQL config with datadir option
1638# Inputs
1639# config - backup config
1640# Returns
1641# File name with MySQL config
1642# None - if error happened
1643
1644def gen_extra_config(config):1437def gen_extra_config(config):
1645 global logger1438 """
16461439 Generates MySQL config with datadir option
1647 extra_config = None1440 :param config: backup config
1441 :return: File name with MySQL config or None if error happened
1442 """
1648 try:1443 try:
1649 f, extra_config = tempfile.mkstemp()1444 f, extra_config = tempfile.mkstemp()
1650 os.write(f, "[mysqld]\n")1445 os.write(f, "[mysqld]\n")
1651 con = get_mysql_connection(1446 con = get_mysql_connection(config["mysql_user"], config["mysql_password"])
1652 user=config["mysql_user"],
1653 passwd=config["mysql_password"])
1654 cur = con.cursor()1447 cur = con.cursor()
1655 cur.execute("SELECT @@datadir")1448 cur.execute("SELECT @@datadir")
1656 row = cur.fetchone()1449 row = cur.fetchone()
1657 os.write(f, 'datadir="%s"\n' % row[0])1450 os.write(f, 'datadir="%s"\n' % row[0])
1658 cur.close()1451 cur.close()
1659 os.close(f)1452 os.close(f)
1660 except:1453 except IOError as err:
1661 logger.error("Failed to generate extra defaults file")1454 logger.error("Failed to generate extra defaults file. %s" % err)
1662 logger.error(traceback.format_exc())
1663 extra_config = None1455 extra_config = None
1664 return extra_config1456 return extra_config
16651457
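
gen_extra_config only pins the datadir, so the temporary defaults-extra-file handed to innobackupex ends up looking roughly like this (path is just an example):

    [mysqld]
    datadir="/var/lib/mysql/"
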
16661458
1667# Checks if binary log is enabled in local MySQL
1668# Inputs
1669# config - backup config
1670# Returns
1671# True - binary log enabled
1672# False - binary log disabled
1673
1674def is_binlog_enabled(config):
1675 global logger
1676
1677 result = False
1678 try:
1679 con = MySQLdb.connect(user=config["mysql_user"], passwd=config["mysql_password"])
1680 with con:
1681 cur = con.cursor()
1682 cur.execute("SELECT @@GLOBAL.log_bin")
1683 row = cur.fetchone()
1684 if row[0] == "1":
1685 result = True
1686 else:
1687 result = False
1688 cur.close()
1689 del con
1690 except:
1691 logger.error("Failed to check if binlog is enabled")
1692 logger.error(traceback.format_exc())
1693 return result
1694
1695
1696# Logs in to TwinDB storage and get size of backup
1697# Inputs
1698# config - backup config
1699# backup_name - file name with backup
1700# Returns
1701# Size of backup in bytes
1702# Zero if error happened
1703
1704def get_backup_size(config, job_params, backup_name):1459def get_backup_size(config, job_params, backup_name):
1705 global logger1460 """
1706 global ssh_private_key1461 Logs in to TwinDB storage and get size of backup
1707 global ssh_port1462 :param config: backup config
1708 backup_size = 01463 :param job_params: job parameters
17091464 :param backup_name: file name with backup
1465 :return: size of backup in bytes or zero if error happened
1466 """
1710 logger.debug("Getting size of %s" % backup_name)1467 logger.debug("Getting size of %s" % backup_name)
1468 ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key_file, "-p", str(ssh_port),
1469 "user_id_%s@%s" % (config["user_id"], job_params["ip"]), "/bin/du -b %s" % backup_name]
1711 try:1470 try:
1712 ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key, "-p", str(ssh_port),
1713 "user_id_%s@%s" % (config["user_id"], job_params["ip"]), "/bin/du -b %s" % backup_name]
1714
1715 process = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)1471 process = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1716 cout, cerr = process.communicate()1472 cout, cerr = process.communicate()
17171473
@@ -1726,30 +1482,28 @@
1726 except subprocess.CalledProcessError as e:1482 except subprocess.CalledProcessError as e:
1727 logger.error("Failed to get size of backup %s" % backup_name)1483 logger.error("Failed to get size of backup %s" % backup_name)
1728 logger.error(str(e))1484 logger.error(str(e))
1485 return 0
1729 except OSError as e:1486 except OSError as e:
1730 logger.error("Failed to get size of backup %s" % backup_name)1487 logger.error("Failed to get size of backup %s" % backup_name)
1731 logger.error("Command execution failed: %s" % str(e))1488 logger.error("Failed to run command %r: %s" % (ssh_cmd, e))
1732 except:1489 return 0
1733 logger.error("Failed to get size of backup %s" % backup_name)1490 logger.debug("Size of %s = %d bytes (%s)" % (backup_name, backup_size, h_size(backup_size)))
1734 logger.error(traceback.format_exc())
1735 else:
1736 logger.debug("Size of %s = %d bytes (%s)" % (backup_name, backup_size, h_size(backup_size)))
1737
1738 return backup_size1491 return backup_size
17391492
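
For context on the parsing that this hunk elides: /bin/du -b prints the size in bytes followed by the file name, so the size would be recovered along these lines (a sketch, not necessarily the exact code in this branch):

    # cout is the stdout of "/bin/du -b <backup_name>" run over ssh, e.g.
    # "1073741824\tserver_id_<uuid>_2015-04-07T00:00:00.xbstream.gpg\n"
    backup_size = int(cout.split()[0])
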
17401493
1741def handler_send_key(config, job):1494def handler_send_key(job):
1742 """1495 """
1743 Processes send_key job1496 Processes send_key job
1744 """1497 """
1745 # Get owner of the GPG key1498 # Get owner of the GPG key
1499 cmd_1 = ["gpg", "--list-packets"]
1746 try:1500 try:
1747 gpg_pub_key = job["params"]["gpg_pub_key"]1501 gpg_pub_key = job["params"]["gpg_pub_key"]
1748 if gpg_pub_key:1502 if gpg_pub_key:
1749 cmd_1 = ["gpg", "--list-packets"]
1750 logger.debug("Starting %r" % cmd_1)1503 logger.debug("Starting %r" % cmd_1)
1751 p1 = subprocess.Popen(cmd_1, stdin=subprocess.PIPE, stdout=subprocess.PIPE)1504 p1 = subprocess.Popen(cmd_1, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
1752 cout, cerr = p1.communicate(gpg_pub_key)1505 cout, cerr = p1.communicate(gpg_pub_key)
1506 keyid = "Unknown"
1753 for line in cout.split("\n"):1507 for line in cout.split("\n"):
1754 if "keyid:" in line:1508 if "keyid:" in line:
1755 keyid = line.replace("keyid:", "").strip()1509 keyid = line.replace("keyid:", "").strip()
@@ -1759,420 +1513,388 @@
1759 logger.error("Requestor public key is empty")1513 logger.error("Requestor public key is empty")
1760 return1514 return
1761 except OSError as err:1515 except OSError as err:
1762 logger.error(err)1516 logger.error("Failed to run command %r: %s" % (cmd_1, err))
1763 return1517 return
1764 # Import public GPG key. It's a user public key sent by the dispatcher1518 # Import public GPG key. It's a user public key sent by the dispatcher
1765 try:1519 try:
1766 logger.debug("Importing requestor's key %s" % keyid)1520 logger.debug("Importing requestor's key %s" % keyid)
1767 cmd_1 = ["gpg", "--import"]1521 cmd_1 = ["gpg", "--import"]
1768 p1 = subprocess.Popen(cmd_1, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)1522 p1 = subprocess.Popen(cmd_1, stdin=subprocess.PIPE)
1769 p1.communicate(gpg_pub_key)1523 p1.communicate(gpg_pub_key)
1770 except OSError as err:1524 except OSError as err:
1771 logger.error(err)1525 logger.error("Failed to run command %r: %s" % (cmd_1, err))
1772 return1526 return
1773 # Get private key and encrypt it1527 # Get private key and encrypt it
1774 try:1528 gpg_pub_key = job["params"]["gpg_pub_key"]
1775 gpg_pub_key = job["params"]["gpg_pub_key"]1529 if gpg_pub_key:
1776 if gpg_pub_key:1530 logger.debug("Exporting private key of server %s" % server_id)
1777 logger.debug("Exporting private key of server %s" % server_id)1531 cmd_1 = ["gpg", "--armor", "--export-secret-key", server_id]
1778 cmd_1 = ["gpg", "--armor", "--export-secret-key", server_id]1532 cmd_2 = ["gpg", "--armor", "--encrypt", "--sign", "--batch", "-r", keyid, "--local-user", server_id,
1779 cmd_2 = ["gpg", "--armor", "--encrypt", "--sign", "--batch", "-r", keyid, "--local-user", server_id, "--trust-model", "always"]1533 "--trust-model", "always"]
1534 try:
1780 logger.debug("Starting %r" % cmd_1)1535 logger.debug("Starting %r" % cmd_1)
1781 p1 = subprocess.Popen(cmd_1, stdout=subprocess.PIPE)1536 p1 = subprocess.Popen(cmd_1, stdout=subprocess.PIPE)
1537 except OSError as err:
1538 logger.error("Failed to run command %r: %s" % (cmd_1, err))
1539 return
1540 try:
1782 logger.debug("Starting %r" % cmd_2)1541 logger.debug("Starting %r" % cmd_2)
1783 p2 = subprocess.Popen(cmd_2, stdin=p1.stdout, stdout=subprocess.PIPE)1542 p2 = subprocess.Popen(cmd_2, stdin=p1.stdout, stdout=subprocess.PIPE)
1784 cout, cerr = p2.communicate()1543 cout, cerr = p2.communicate()
1785 enc_private_key = cout1544 enc_private_key = cout
1786 logger.debug("Encrypted private key %s" % enc_private_key)1545 logger.debug("Encrypted private key %s" % enc_private_key)
1787 except OSError as err:1546 except OSError as err:
1788 logger.error(err)1547 logger.error("Failed to run command %r: %s" % (cmd_2, err))
1789 return1548 return
1790 # Now send the private key to dispatcher1549 # Now send the private key to dispatcher
1791 job_id = job["job_id"]1550 data = {
1792 data = {1551 "type": "send_key",
1793 "type": "send_key",1552 "params": {
1794 "params": {1553 "enc_private_key": enc_private_key,
1795 "enc_private_key": enc_private_key,1554 "job_id": job_id
1796 "job_id": job_id1555 }
1797 }1556 }
1798 }1557 get_response(data)
1799 get_response(data)1558 else:
18001559 logger.error("The job order requested send_key, but no public key was provided")
1801# Check if directory is empty1560
1802# Inputs1561
1803# dir - directory name1562def is_dir_empty(directory):
1804# Returns1563 """
1805# True - directory is empty1564 Checks if directory is empty
1806# False - directory is not empty1566 :return: True if the directory is empty or False otherwise
18071566 :return: True if the directory is empty of False otherwise
1808def is_dir_empty(dir):1567 """
1809 return len(os.listdir(dir)) == 01568 return len(os.listdir(directory)) == 0
18101569
1811
1812# Meta fuction that calls actual restore fucntion depending on tool in backup config
1813# Inputs
1814# config - backup config
1815# job - job order
1816# Returns what actual restore function returned or -1 if the tool is not supported
18171570
1818def restore_backup(config, job):1571def restore_backup(config, job):
1819 global logger1572 """
18201573 Meta function that calls the actual restore function depending on the tool in the backup config
1821 ret = -11574 :param config: backup config
1822 logger.info("Starting restore job %r" % job)1575 :param job: job order
1823 log_start_job(int(job["job_id"]))1576 :return: what actual restore function returned or -1 if the tool is not supported
18241577 """
1825 if config["tool"] == "xtrabackup":1578 ret = None
1579 logger.info("Starting restore job: %s"
1580 % json.dumps(job, indent=4, sort_keys=True))
1581 notify_params = {"event": "start_job", "job_id": job["job_id"]}
1582 if log_job_notify(notify_params):
1826 ret = restore_xtrabackup(config, job)1583 ret = restore_xtrabackup(config, job)
1827 elif config["tool"] == "mysqldump":1584 notify_params = {"event": "stop_job", "job_id": job["job_id"], "ret_code": ret}
1828 ret = restore_mysqldump(config, job)1585 log_job_notify(notify_params)
1586 logger.info("Restore job is complete")
1829 else:1587 else:
1830 logger.error("Can't restore backup with unsupported tool %s" % config["tool"])1588 logger.error("Restore job can not start")
1831 ret = -1
1832 log_stop_job(int(job["job_id"]), ret)
1833 logger.info("Restore job is complete")
1834 return ret1589 return ret
18351590
18361591
1837# Extracts an Xtrabackup archive arc in dst_dir
1838# Inputs
1839# config - backup config
1840# arc - file name with archive to extract
1841# dst_dir - local destination directory
1842# Returns
1843# True - if archive is successfully extracted
1844# False - if error happened
1845
1846def extract_archive(config, arc, dst_dir):
1847 global logger
1848 global ssh_private_key
1849 global ssh_port
1850
1851 logger.info("Extracting %s in %s" % (arc, dst_dir))
1852 try:
1853 ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key, "-p", str(ssh_port)]
1854 ssh_cmd.append(config["username"] + "@" + config["dst_ip"])
1855 ssh_cmd.append("/bin/cat %s" % arc)
1856
1857 gpg_cmd = ["gpg", "--decrypt"]
1858
1859 xb_cmd = ["xbstream", "-x"]
1860
1861 xb_err = open('/tmp/twindb.xb.err', "w+")
1862 gpg_err = open('/tmp/twindb.gpg.err', "w+")
1863 ssh_err = open('/tmp/twindb.ssh.err', "w+")
1864 logger.info("Starting: %r" % ssh_cmd)
1865 p1 = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=ssh_err)
1866 logger.info("Starting: %r" % gpg_cmd)
1867 p2 = subprocess.Popen(gpg_cmd, stdin=p1.stdout, stdout=subprocess.PIPE, stderr=gpg_err)
1868 logger.info("Starting: %r" % xb_cmd)
1869 p3 = subprocess.Popen(xb_cmd, stdin=p2.stdout, stdout=subprocess.PIPE, stderr=mysql_err, cwd=dst_dir)
1870 p1.stdout.close()
1871 p2.stdout.close()
1872 p3.wait()
1873 xb_err.seek(0)
1874 gpg_err.seek(0)
1875 ssh_err.seek(0)
1876 logger.info("SSH stderr: " + ssh_err.read())
1877 logger.info("GPG stderr: " + gpg_err.read())
1878 logger.info("xbstream stderr: " + xb_err.read())
1879 if p3.returncode != 0:
1880 logger.info("Failed to extract backup %s into %s" % (arc, dst_dir))
1881 return False
1882 except:
1883 logger.info("Failed to extract backup %s into %s" % (arc, dst_dir))
1884 finally:
1885 todelete = ['/tmp/twindb.xb.err', '/tmp/twindb.gpg.err', '/tmp/twindb.ssh.err']
1886 for f in todelete:
1887 if os.path.isfile(f):
1888 os.remove(f)
1889 logger.info("Extracted successfully %s in %s" % (arc, dst_dir))
1890 return True
1891
1892
1893# Restores backup copy with XtraBackup
1894# Inputs
1895# config - backup config
1896# job - job order
1897# Returns
1898# True - if backup successfully restored
1899# False - if restore job failed
1900
1901def restore_xtrabackup(config, job):1592def restore_xtrabackup(config, job):
1902 global logger1593 """
1903 global ssh_private_key1594 Restores backup copy with XtraBackup
1904 global ssh_port1595
19051596 Fetches the full copy and any incremental copies up to the requested
1906 backup_name = job["restore_backup_copy"]1597 backup_copy_id from TwinDB storage, decrypts them and applies the logs
1907 backup_name_base = os.path.splitext(backup_name)1598 in the restore directory.
1908 dst_dir = job["restore_dir"]1599
1600 :param config: backup config received from the dispatcher
1601 :param job: job order
1602 :return: 0 - if backup was successfully restored
1603 -1 - if restore job failed
1604 """
1605 if "params" not in job:
1606 logger.error("There are no params in the job order")
1607 return -1
1608 # Check that job order has all required parameters
1609 mandatory_params = ["backup_copy_id", "restore_dir", "server_id"]
1610 for param in mandatory_params:
1611 if param not in job["params"]:
1612 logger.error("There is no %s in the job order" % param)
1613 return -1
1614 dst_dir = job["params"]["restore_dir"]
1909 try:1615 try:
1910 if os.path.isdir(dst_dir):1616 if os.path.isdir(dst_dir):
1911 if is_dir_empty(dst_dir):1617 if is_dir_empty(dst_dir):
1912 logger.info("Directory %s exists. But it's empty, so we can restore backup here" % dst_dir)1618 logger.info("Directory %s exists. But it's empty, so we can restore backup here" % dst_dir)
1913 else:1619 else:
1914 logger.error("Directory %s exists and isn't empty, so we can not restore backup here" % dst_dir)1620 logger.error("Directory %s exists and isn't empty, so we can not restore backup here" % dst_dir)
1915 return False1621 return -1
1916 else:1622 else:
1917 os.makedirs(dst_dir)1623 os.makedirs(dst_dir)
1918 except:1624 except IOError as err:
1625 logger.error(err)
1919 logger.error("Can't use directory %s as destination for backup" % dst_dir)1626 logger.error("Can't use directory %s as destination for backup" % dst_dir)
1920 logger.error(traceback.format_exc())1627 return -1
1921 return False1628
1922 full_copy = get_full_of(backup_name)1629 full_copy = get_full_of(job["params"]["backup_copy_id"])
1923 logger.info("Restoring backup %s in %s" % (backup_name, dst_dir))1630 full_copy_id = int(full_copy["backup_copy_id"])
1631 if not full_copy:
1632 logger.error("Failed to get full copy parameters")
1633 return -1
1634 logger.info("Restoring full copy %s in %s" % (full_copy["name"], dst_dir))
1924 try:1635 try:
1925 extract_archive(config, full_copy, dst_dir)1636 if not extract_archive(config, full_copy, dst_dir):
1637 raise JobError("Failed to extract %s" % full_copy["name"])
1926 # We restored last full backup in dst_dir1638 # We restored last full backup in dst_dir
1927 xb_err = open('/tmp/twindb.xb.err', "w+")1639 try:
1928 if full_copy == backup_name:1640 xb_err = open('/tmp/twindb.xb.err', "w+")
1641 except IOError as err:
1642 raise JobError("Failed to open /tmp/twindb.xb.err: %s" % err)
1643 if full_copy["backup_copy_id"] == job["params"]["backup_copy_id"]:
1929 xb_cmd = ["innobackupex", "--apply-log", dst_dir]1644 xb_cmd = ["innobackupex", "--apply-log", dst_dir]
1930 else:1645 else:
1931 xb_cmd = ["innobackupex", "--apply-log", "--redo-only", dst_dir]1646 xb_cmd = ["innobackupex", "--apply-log", "--redo-only", dst_dir]
1932 p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err)1647 try:
1933 p_xb.wait()1648 p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err)
1934 xb_err.seek(0)1649 p_xb.communicate()
1935 logger.info("innobackupex stderr: " + xb_err.read())1650 except OSError as err:
1936 os.remove('/tmp/twindb.xb.err')1651 raise JobError("Failed to run %r: %s" % (xb_cmd, err))
1652 finally:
1653 xb_err.seek(0)
1654 logger.info("innobackupex stderr: " + xb_err.read())
1655 os.remove('/tmp/twindb.xb.err')
1937 if p_xb.returncode != 0:1656 if p_xb.returncode != 0:
1938 logger.info("Failed to apply log on full copy %s" % (full_copy))1657 raise JobError("Failed to apply log on full copy %s" % full_copy["name"])
1939 return False
1940 # if we restore full backup return now1658 # if we restore full backup return now
1941 if full_copy == backup_name:1659 if full_copy_id == job["params"]["backup_copy_id"]:
1942 logger.info("Successfully restored backup %s in %s" % (backup_name, dst_dir))1660 logger.info("Successfully restored backup %s in %s" % (full_copy["name"], dst_dir))
1943 return True1661 return 0
1944 # Now copy all incremental copies and apply then on dst_dir1662 # Now copy all incremental copies and apply then on dst_dir
1945 inc_copy = full_copy1663 backup_copy = full_copy
1664 backup_copy_id = full_copy_id
1946 while True:1665 while True:
1947 inc_copy = get_child_of(full_copy)1666 backup_copy = get_child_of(backup_copy_id)
1667 if not backup_copy:
1668 raise JobError("Failed to get a child copy of backup_copy_id %d" % backup_copy_id)
1669 backup_copy_id = int(backup_copy["backup_copy_id"])
1948 inc_dir = tempfile.mkdtemp()1670 inc_dir = tempfile.mkdtemp()
1949 extract_archive(config, inc_copy, inc_dir)1671 if not extract_archive(config, backup_copy, inc_dir):
19501672 raise JobError("Failed to extract %s in %s" % (backup_copy["name"], inc_dir))
1951 xb_err = open('/tmp/twindb.xb.err', "w+")1673 try:
1952 xb_cmd = ["innobackupex", '--apply-log']1674 xb_err = open("/tmp/twindb.xb.err", "w+")
1953 if inc_copy != backup_name:1675 except IOError as err:
1954 xb_cmd.append('--redo-only')1676 raise JobError("Failed to open /tmp/twindb.xb.err: %s" % err)
1677 xb_cmd = ["innobackupex", "--apply-log"]
1678 if backup_copy_id != job["params"]["backup_copy_id"]:
1679 xb_cmd.append("--redo-only")
1955 xb_cmd.append('--incremental-dir=%s' % inc_dir)1680 xb_cmd.append('--incremental-dir=%s' % inc_dir)
1956 xb_cmd.append(dst_dir)1681 xb_cmd.append(dst_dir)
1957 p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err)1682 try:
1958 p_xb.wait()1683 p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err)
1959 xb_err.seek(0)1684 p_xb.wait()
1960 logger.info("innobackupex stderr: " + xb_err.read())1685 except OSError as err:
1961 os.remove('/tmp/twindb.xb.err')1686 raise JobError("Failed to run %r: %s" % (xb_cmd, err))
1687 finally:
1688 xb_err.seek(0)
1689 logger.info("innobackupex stderr: " + xb_err.read())
1690 os.remove('/tmp/twindb.xb.err')
1962 if p_xb.returncode != 0:1691 if p_xb.returncode != 0:
1963 logger.info("Failed to apply log on full copy %s" % (full_copy))1692 raise JobError("Failed to apply log on copy %s" % backup_copy["name"])
1964 return False1693 shutil.rmtree(inc_dir)
1965 os.removedirs(inc_dir)1694 if backup_copy_id == job["params"]["backup_copy_id"]:
1966 if inc_copy == backup_name:
1967 break1695 break
1968 logger.info("Successfully restored backup %s in %s" % (backup_name, dst_dir))1696 logger.info("Successfully restored backup %s in %s" % (backup_copy["name"], dst_dir))
1969 except:1697 except JobError as err:
1698 logger.error(err)
1970 logger.error("Failed to restore backup")1699 logger.error("Failed to restore backup")
1971 logger.error(traceback.format_exc())1700 return -1
1972 return False
1973 finally:1701 finally:
1974 for f in ["/tmp/twindb.xb.err", "/tmp/twindb.gpg.err", "/tmp/twindb.ssh.err"]:1702 for f in ["/tmp/twindb.xb.err", "/tmp/twindb.gpg.err", "/tmp/twindb.ssh.err"]:
1975 if os.path.isfile(f):1703 if os.path.isfile(f):
1976 os.remove(f)1704 os.remove(f)
1977 return True1705 return 0
19781706
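
The restore logic above boils down to one innobackupex run per copy in the chain: the full copy and every intermediate incremental are prepared with --redo-only, and only the requested copy is applied without it, so the rollback phase runs exactly once at the end. A small sketch of how each command is assembled (helper name is illustrative):

    def apply_log_cmd(dst_dir, inc_dir=None, is_target=False):
        """Build the innobackupex command for one step of the restore chain."""
        cmd = ["innobackupex", "--apply-log"]
        if not is_target:
            cmd.append("--redo-only")  # keep the copy prepared for further increments
        if inc_dir:
            cmd.append("--incremental-dir=%s" % inc_dir)
        cmd.append(dst_dir)
        return cmd
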
19791707
1980# Restores backup copy taken with mysqldump1708def extract_archive(config, arc, dst_dir):
1981# Inputs1709 """
1982# config - backup config1710 Extracts an Xtrabackup archive arc in dst_dir
1983# job - job order1711 :param config: backup config
1984# Returns1712 :param arc: dictionary with archive to extract
1985# True - if backup successfully restored1713 :param dst_dir: local destination directory
1986# False - if restore job failed1714 :return: True - if archive is successfully extracted.
19871715 False - if error happened
1988def restore_mysqldump(config, job):1716 """
1989 global logger1717 mandatory_params = ["backup_copy_id", "name", "ip"]
1990 global ssh_private_key1718 for param in mandatory_params:
1991 global ssh_port1719 if param not in arc:
19921720 logger.error("There is no %s in the archive parameters" % param)
1993 ret = True
1994 backup_name = job["restore_backup_copy"]
1995 backup_name_base = os.path.splitext(backup_name)
1996 dst_dir = job["restore_dir"]
1997 try:
1998 if os.path.isdir(dst_dir):
1999 if is_dir_empty(dst_dir):
2000 logger.info("Directory %s exists. But it's empty, so we can restore backup here" % dst_dir)
2001 else:
2002 logger.error("Directory %s exists and isn't empty, so we can not restore backup here" % dst_dir)
2003 return False
2004 else:
2005 os.makedirs(dst_dir)
2006 except:
2007 logger.error("Can't use directory %s as destination for backup" % dst_dir)
2008 logger.error(traceback.format_exc())
2009 return False
2010 full_copy = get_full_of(backup_name)
2011 # Init mysql datadir
2012 logger.info("Initializing MySQL datadir in %s" % dst_dir)
2013 try:
2014 mysql_socket = dst_dir + "/twindb.sock"
2015 mysql_pid = dst_dir + "/mysqld.pid"
2016 mysql_install_db_cmd = ["mysql_install_db", "--no-defaults"]
2017 mysql_install_db_cmd.append("--datadir=%s" % dst_dir)
2018 mysql_install_db_cmd.append("--user=root")
2019 mysql_install_db_cmd.append("--skip-networking")
2020 mysql_install_db_cmd.append("--log-error=/dev/stdout")
2021 mysql_install_db_cmd.append("--pid-file=%s" % mysql_pid)
2022 mysql_install_db_cmd.append("--socket=%s" % mysql_socket)
2023 p = subprocess.Popen(mysql_install_db_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
2024 out, err = p.communicate()
2025 ret = p.returncode
2026 del p
2027 if ret != 0:
2028 logger.error("Couldn't init MySQL datadir in " + dst_dir)
2029 logger.error("Command: %r" % mysql_install_db_cmd)
2030 logger.error("STDOUT: %s" % out)
2031 logger.error("STDERR: %s" % err)
2032 return False1721 return False
2033 else:1722 logger.info("Extracting %s in %s" % (arc["name"], dst_dir))
2034 logger.info("MySQL datadir in %s is successfully created" % dst_dir)1723 ssh_cmd = [
2035 except:1724 "ssh",
2036 logger.error("Can't init MySQL datadir in " + dst_dir)1725 "-oStrictHostKeyChecking=no",
2037 logger.error(traceback.format_exc())1726 "-i", ssh_private_key_file,
2038 return False1727 "-p", str(ssh_port),
2039 # Start MySQL instance1728 "user_id_%s@%s" % (config["user_id"], arc["ip"]),
2040 logger.info("Starting MySQL instance in %s" % dst_dir)1729 "/bin/cat %s" % arc["name"]
2041 try:1730 ]
2042 mysqld_err = open('/tmp/twindb.mysqld.err', "w+")1731 gpg_cmd = ["gpg", "--decrypt"]
2043 mysqld_cmd = ["mysqld", "--no-defaults"]1732 xb_cmd = ["xbstream", "-x"]
2044 mysqld_cmd.append("--datadir=%s" % dst_dir)1733
2045 mysqld_cmd.append("--user=root")1734 desc_file = None
2046 mysqld_cmd.append("--skip-networking")1735 try:
2047 mysqld_cmd.append("--skip-grant-tables")1736 err_desc = dict()
2048 mysqld_cmd.append("--log-error=/dev/stdout")1737 for desc in ["xb", "gpg", "ssh"]:
2049 mysqld_cmd.append("--pid-file=%s" % mysql_pid)1738 desc_file = ("/tmp/twindb.%s.err" % desc)
2050 mysqld_cmd.append("--socket=%s" % mysql_socket)1739 err_desc[desc] = open(desc_file, "w+")
2051 mysqld_proc = subprocess.Popen(mysqld_cmd, stdout=mysqld_err, stderr=mysqld_err)1740 except IOError as err:
2052 start_timeout = 51741 logger.error("Failed to open %s: %s" % (desc_file, err))
2053 while start_timeout >= 0:1742 return False
2054 if start_timeout == 0:1743
2055 logger.error("Can't start MySQL instance in " + dst_dir)1744 logger.info("Starting: %r" % ssh_cmd)
2056 try:1745 p1 = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=err_desc["ssh"])
2057 mysqld_proc.terminate()1746 logger.info("Starting: %r" % gpg_cmd)
2058 except:1747 p2 = subprocess.Popen(gpg_cmd, stdin=p1.stdout, stdout=subprocess.PIPE,
2059 logger.info("MySQL process is already terminated")1748 stderr=err_desc["gpg"])
2060 finally:1749 logger.info("Starting: %r" % xb_cmd)
2061 if os.path.isfile("/tmp/twindb.mysqld.err"):1750 p3 = subprocess.Popen(xb_cmd, stdin=p2.stdout, stdout=subprocess.PIPE,
2062 os.remove("/tmp/twindb.mysqld.err")1751 stderr=err_desc["xb"], cwd=dst_dir)
2063 mysqld_err.seek(0)1752 p1.wait()
2064 logger.error("mysqld output: " + mysqld_err.read())1753 p2.wait()
2065 mysqld_err.close()1754 p3.wait()
2066 return False1755 for desc in ["xb", "gpg", "ssh"]:
2067 try:1756 err_desc[desc].seek(0)
2068 con = MySQLdb.connect(unix_socket=mysql_socket)1757
2069 logger.info("Started MySQL instance in %s successfully" % dst_dir)1758 logger.info("SSH stderr: " + err_desc["ssh"].read())
2070 start_timeout = -11759 logger.info("GPG stderr: " + err_desc["gpg"].read())
2071 del con1760 logger.info("xbstream stderr: " + err_desc["xb"].read())
2072 except:1761 if p1.returncode != 0 or p2.returncode != 0 or p3.returncode != 0:
2073 start_timeout -= 11762 logger.info("Failed to extract backup %s in %s" % (arc["name"], dst_dir))
2074 time.sleep(1)1763 return False
2075 except:1764 desc_file = None
2076 mysqld_err.seek(0)1765 try:
2077 logger.error("Can't start MySQL instance in " + dst_dir)1766 for desc in ["xb", "gpg", "ssh"]:
2078 logger.error("mysqld output: " + mysqld_err.read())1767 desc_file = ("/tmp/twindb.%s.err" % desc)
2079 logger.error(traceback.format_exc())1768 if os.path.isfile(desc_file):
2080 mysqld_err.close()1769 os.remove(desc_file)
2081 try:1770 except IOError as err:
2082 mysqld_proc.terminate()1771 logger.error("Failed to open %s: %s" % (desc_file, err))
2083 except:1772 logger.info("Extracted successfully %s in %s" % (arc["name"], dst_dir))
2084 logger.info("MySQL process is already terminated")
2085 finally:
2086 if os.path.isfile("/tmp/twindb.mysqld.err"):
2087 os.remove("/tmp/twindb.mysqld.err")
2088 return False
2089 # Copy, decrypt and restore a dump
2090 logger.info("Restoring backup %s in %s" % (backup_name, dst_dir))
2091 try:
2092 ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key, "-p", str(ssh_port)]
2093 ssh_cmd.append(config["username"] + "@" + config["dst_ip"])
2094 ssh_cmd.append("/bin/cat %s" % backup_name)
2095
2096 gpg_cmd = ["gpg", "--decrypt"]
2097
2098 mysql_cmd = ["mysql", "-S", mysql_socket]
2099
2100 mysql_err = open('/tmp/twindb.mysql.err', "w+")
2101 gpg_err = open('/tmp/twindb.gpg.err', "w+")
2102 ssh_err = open('/tmp/twindb.ssh.err', "w+")
2103 logger.info("Starting: %r" % ssh_cmd)
2104 p1 = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=ssh_err)
2105 logger.info("Starting: %r" % gpg_cmd)
2106 p2 = subprocess.Popen(gpg_cmd, stdin=p1.stdout, stdout=subprocess.PIPE, stderr=gpg_err)
2107 logger.info("Starting: %r" % mysql_cmd)
2108 p3 = subprocess.Popen(mysql_cmd, stdin=p2.stdout, stdout=subprocess.PIPE, stderr=mysql_err)
2109 p1.stdout.close()
2110 p2.stdout.close()
2111 out3 = p3.communicate()[0]
2112 mysql_err.seek(0)
2113 gpg_err.seek(0)
2114 ssh_err.seek(0)
2115 logger.info("SSH stderr: " + ssh_err.read())
2116 logger.info("GPG stderr: " + gpg_err.read())
2117 logger.info("mysql stderr: " + mysql_err.read())
2118 logger.info("Successfully restored backup %s in %s" % (backup_name, dst_dir))
2119 ret = p3.returncode
2120 # Stop MySQL
2121 mysqld_proc.terminate()
2122 except:
2123 logger.error("Failed to restore backup")
2124 logger.error(traceback.format_exc())
2125 return False
2126 finally:
2127 for f in ["/tmp/twindb.mysql.err", "/tmp/twindb.gpg.err", "/tmp/twindb.ssh.err"]:
2128 if os.path.isfile(f):
2129 os.remove(f)
2130 try:
2131 mysqld_proc.terminate()
2132 except:
2133 logger.info("MySQL process is already terminated")
2134 return True1773 return True
21351774
21361775
2137# Finds LSN in XtraBackup output1776def get_full_of(backup_copy_id):
21381777 """
2139def grep_lsn(str):1778 Gets full copy of a given backup_copy_id
1779 :param backup_copy_id: backup_copy_id
1780 :return: dictionary with full backup copy parameters:
1781 {
1782 "backup_copy_id": 188,
1783 "name": "server_id_479a41b3-d22d-41a8-b7d3-4e40302622f6_2015-04-06T15:10:46.050984.tar.gp",
1784 "ip": "127.0.0.1"
1785 }
1786 """
1787 logger.debug("Getting full copy parameters of backup_copy_id %d" % backup_copy_id)
1788 response_body = "Empty response"
1789 full_copy = None
1790 try:
1791 data = {
1792 "type": "get_full_copy_params",
1793 "params": {
1794 "backup_copy_id": backup_copy_id
1795 }
1796 }
1797 response_body = get_response(data)
1798 if not response_body:
1799 logger.debug("Empty response from dispatcher")
1800 return None
1801 d = json.JSONDecoder()
1802 response_body_decoded = d.decode(response_body)
1803 if response_body_decoded:
1804 msg_decrypted = decrypt(response_body_decoded["response"])
1805 msg_pt = d.decode(msg_decrypted)
1806 full_copy = msg_pt["data"]
1807 logger.info("Got full copy params:\n%s"
1808 % json.dumps(full_copy, indent=4, sort_keys=True))
1809 if msg_pt["error"]:
1810 logger.error(msg_pt["error"])
1811 except exceptions.KeyError as err:
1812 logger.error("Failed to decode %s" % response_body)
1813 logger.error(err)
1814 return None
1815 return full_copy
1816
1817
1818def get_child_of(backup_copy_id):
1819 """
1820 Returns a child copy params of a given backup copy
1821 :param backup_copy_id:
1822 :return:
1823 """
1824 logger.debug("Getting child copy parameters of backup_copy_id %d" % backup_copy_id)
1825 response_body = "Empty response"
1826 child_copy = None
1827 try:
1828 data = {
1829 "type": "get_child_copy_params",
1830 "params": {
1831 "backup_copy_id": backup_copy_id
1832 }
1833 }
1834 response_body = get_response(data)
1835 if not response_body:
1836 return None
1837 d = json.JSONDecoder()
1838 response_body_decoded = d.decode(response_body)
1839 if response_body_decoded:
1840 msg_decrypted = decrypt(response_body_decoded["response"])
1841 msg_pt = d.decode(msg_decrypted)
1842 child_copy = msg_pt["data"]
1843 logger.info("Got child copy params:\n%s"
1844 % json.dumps(child_copy, indent=4, sort_keys=True))
1845 if msg_pt["error"]:
1846 logger.error(msg_pt["error"])
1847 except exceptions.KeyError as err:
1848 logger.error("Failed to decode %s" % response_body)
1849 logger.error(err)
1850 return None
1851 return child_copy
1852
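get_full_of() and get_child_of() above follow the same request/response envelope with the dispatcher: send a typed request, JSON-decode the reply, decrypt its "response" field, JSON-decode the plaintext and read "data"/"error". A possible consolidation of that pattern, sketched only; call_dispatcher() is a hypothetical helper, not part of this branch, and it assumes get_response(), decrypt() and logger as defined in twindb.py:

    import json

    def call_dispatcher(request_type, params):
        """Send one request to the dispatcher and return the decrypted 'data' field."""
        response_body = get_response({"type": request_type, "params": params})
        if not response_body:
            logger.debug("Empty response from dispatcher")
            return None
        try:
            envelope = json.loads(response_body)
            msg_pt = json.loads(decrypt(envelope["response"]))
        except (ValueError, KeyError) as err:
            logger.error("Failed to decode %s: %s" % (response_body, err))
            return None
        if msg_pt.get("error"):
            logger.error(msg_pt["error"])
        return msg_pt.get("data")

    # With such a helper the two functions above would reduce to:
    # full_copy  = call_dispatcher("get_full_copy_params",  {"backup_copy_id": backup_copy_id})
    # child_copy = call_dispatcher("get_child_copy_params", {"backup_copy_id": backup_copy_id})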
1853
1854def grep_lsn(output):
1855 """
1856 Finds LSN in XtraBackup output
1857 :param output: string with Xtrabackup output
1858 :return: LSN
1859 """
2140 lsn = None1860 lsn = None
2141 for line in str.split("\n"):1861 for line in output.split("\n"):
2142 if line.startswith("xtrabackup: The latest check point (for incremental):"):1862 if line.startswith("xtrabackup: The latest check point (for incremental):"):
2143 lsn = line.split("'")[1]1863 lsn = line.split("'")[1]
2144 return lsn1864 return lsn
21451865
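A quick usage illustration for grep_lsn(), with a made-up LSN value:

    sample_output = (
        "some other xtrabackup output\n"
        "xtrabackup: The latest check point (for incremental): '1626007'\n"
    )
    assert grep_lsn(sample_output) == "1626007"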
21461866
2147# Finds MySQL socket
2148def get_unix_socket():1867def get_unix_socket():
2149 socket = ""1868 """
1869 Finds MySQL socket
1870 :return: path to unix socket or None if not found
1871 """
1872 cmd = ["lsof", "-U", "-c", "/^mysqld$/", "-a", "-F", "n"]
2150 try:1873 try:
2151 cmd = [
2152 "lsof", "-U", "-c", "/^mysqld$/", "-a", "-F", "n"
2153 ]
2154 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)1874 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
2155 cout, cerr = p.communicate()1875 cout, cerr = p.communicate()
2156 # Outputs socket in format1876 # Outputs socket in format
2157 # # lsof -U -c mysqld -a -F n1877 # # lsof -U -c mysqld -a -F n
2158 # p110291878 # p11029
2159 # n/var/lib/mysql/mysql.sock1879 # n/var/lib/mysql/mysql.sock
2160 socket = cout.split()[1][1:]1880 mysql_socket = cout.split()[1][1:]
2161 if not os.path.exists(socket):1881 if not os.path.exists(mysql_socket):
2162 return None1882 return None
2163 except:1883 except OSError as err:
1884 logger.error("Failed to run command %r. %s" % (cmd, err))
2164 return None1885 return None
21651886 return mysql_socket
2166 return socket
21671887
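The socket path returned by get_unix_socket() can be fed straight into mysql.connector; a minimal illustration with placeholder credentials (the agent itself takes them from its config):

    import mysql.connector

    mysql_socket = get_unix_socket()
    if mysql_socket:
        conn = mysql.connector.connect(user="root", password="secret",
                                       unix_socket=mysql_socket)
        cursor = conn.cursor()
        cursor.execute("SELECT @@version")
        print(cursor.fetchone()[0])
        conn.close()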
21681888
2169def pid_exists(pid):1889def pid_exists(pid):
2170 """Check whether pid exists in the current process table."""1890 """
1891 Checks whether pid exists in the current process table.
1892 """
2171 if pid < 0:1893 if pid < 0:
2172 return False1894 return False
2173 try:1895 try:
2174 os.kill(pid, 0)1896 os.kill(pid, 0)
2175 except OSError, e:1897 except OSError as e:
2176 return e.errno == errno.EPERM1898 return e.errno == errno.EPERM
2177 else:1899 else:
2178 return True1900 return True
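
The signal-0 probe in pid_exists() is worth a note: os.kill(pid, 0) performs only the existence and permission checks, so EPERM still means the process is alive. A tiny standalone check, not part of the branch:

    import errno
    import os

    def probe(pid):
        # Signal 0 checks existence/permissions without delivering anything
        try:
            os.kill(pid, 0)
        except OSError as err:
            return err.errno == errno.EPERM  # alive, but owned by another user
        return True

    print(probe(os.getpid()))  # True: our own pid certainly exists
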
@@ -2194,12 +1916,12 @@
2194 Raise TimeoutExpired on timeout expired (if specified).1916 Raise TimeoutExpired on timeout expired (if specified).
2195 """1917 """
21961918
2197 def check_timeout(delay):1919 def check_timeout(timeout_delay):
2198 if timeout is not None:1920 if timeout is not None:
2199 if time.time() >= stop_at:1921 if time.time() >= stop_at:
2200 raise TimeoutExpired1922 raise TimeoutExpired
2201 time.sleep(delay)1923 time.sleep(timeout_delay)
2202 return min(delay * 2, 0.04)1924 return min(timeout_delay * 2, 0.04)
22031925
2204 if timeout is not None:1926 if timeout is not None:
2205 waitcall = lambda: os.waitpid(pid, os.WNOHANG)1927 waitcall = lambda: os.waitpid(pid, os.WNOHANG)
@@ -2247,100 +1969,155 @@
2247 raise RuntimeError("unknown process exit status")1969 raise RuntimeError("unknown process exit status")
22481970
22491971
2250# Removes pid file
2251# Exits if error happened
2252
2253def remove_pid():1972def remove_pid():
2254 global pid_file1973 """
2255 try:1974 Removes pid file
2256 if os.path.isfile(pid_file):1975 :return: nothing
1976 Exits if an error happens
1977 """
1978 if os.path.isfile(pid_file):
1979 try:
2257 os.remove(pid_file)1980 os.remove(pid_file)
2258 except:1981 except IOError as err:
2259 logger.error(traceback.format_exc())1982 exit_on_error("Failed to remove file %s. %s" % (pid_file, err))
2260 sys.exit(2)1983
22611984
22621985def check_pid():
2263# Cleans up when TwinDB agent exists1986 """
1987 Checks if pid file already exists.
1988 If it does, it detects whether the twindb agent is already running.
1989 If the pid file is stale, it removes it.
1990 :return: True if the pid file doesn't exist, or it was stale and has been removed.
1991 False if the twindb agent is running or an error happened
1992 """
1993 if os.path.isfile(pid_file):
1994 pid = read_pid()
1995 if pid_exists(pid):
1996 try:
1997 f = open("/proc/%d/cmdline" % pid, "r")
1998 cmdline = f.readline()
1999 f.close()
2000 if "twindb.py" in cmdline:
2001 # The process is a live twindb agent
2002 return False
2003 else:
2004 # It's some other process, not a twindb agent
2005 return True
2006 except IOError as err:
2007 logger.error("Can't read from file /proc/%d/cmdline:%s " % (pid, err))
2008 return False
2009 else:
2010 try:
2011 os.remove(pid_file)
2012 # It's a stale pid file
2013 return True
2014 except IOError as err:
2015 logger.error("Can't remove file %s: %s" % (pid_file, err))
2016 return False
2017 else:
2018 # pid file doesn't exist
2019 return True
2020
2021
2022def read_pid():
2023 """
2024 Reads pid from pid_file
2025 :return: pid, or zero if the pid file can't be read
2026 """
2027 try:
2028 f = open(pid_file, 'r')
2029 pid = int(f.readline())
2030 f.close()
2031 except IOError as err:
2032 logger.error("Couldn't read from %s: %s" % (pid_file, err))
2033 return 0
2034 return int(pid)
2035
2036
2037def write_pid():
2038 """
2039 Writes pid of the current process to the pid file
2040 :return: nothing.
2041 Exits if an error happens
2042 """
2043 try:
2044 f = open(pid_file, "w")
2045 f.write(str(os.getpid()))
2046 f.close()
2047 except IOError as err:
2048 logger.error("Couldn't save process id in " + pid_file)
2049 exit_on_error(err)
2050
22642051
2265def cleanup(signum, frame):2052def cleanup(signum, frame):
2266 global logger2053 """
2267 global pid_file2054 Cleans up when TwinDB agent exits
2055 :param signum:
2056 :param frame:
2057 :return:
2058 """
22682059
2269 logger.info("Cleaning up on signal " + str(signum))2060 logger.info("Cleaning up on signal " + str(signum))
2270 remove_pid()2061 logger.debug("Frame %r" % frame)
2271 logger.info("TwinDB agent is ready to exit")2062 logger.info("TwinDB agent is ready to exit")
2272 sys.exit(0)2063 sys.exit(0)
22732064
22742065
2275# Reports error removes pid and exits2066def exit_on_error(message):
22762067 """
2277def exit_on_error(tb=None):2068 Reports an error and exits
2278 if tb is not None:2069 :param message: message to display
2279 logger.debug(tb)2070 :return: nothing
2280 remove_pid()2071 Exits the process with code 2
2072 """
2073 logger.error(message)
2074 logger.debug(traceback.format_exc())
2281 sys.exit(2)2075 sys.exit(2)
22822076
22832077
2284# Stops TwinDB agent2078# Stops TwinDB agent
22852079
2286def stop():2080def stop():
2287 global logger2081 """
2288 global pid_file2082 Stops TwinDB agent
2083 :return: nothing
2084 """
22892085
2290 logger.info("Shutting down TwinDB agent")2086 logger.info("Shutting down TwinDB agent")
22912087
2292 if not os.path.exists(pid_file):2088 if not os.path.exists(pid_file):
2293 logger.info("Pid file %s does not exist. Probably twindb agent isn't running" % pid_file)2089 logger.info("Pid file %s does not exist. Probably twindb agent isn't running" % pid_file)
2294 sys.exit(0)2090 sys.exit(0)
22952091 pid = None
2296 try:2092 try:
2297 f = open(pid_file, 'r')2093 f = open(pid_file, 'r')
2298 pid = int(f.readline())2094 pid = int(f.readline())
2299 os.kill(pid, signal.SIGTERM)2095 os.kill(pid, signal.SIGTERM)
2300 wait_pid(pid, 300)2096 wait_pid(pid, 300)
2301 f.close()2097 f.close()
2098 remove_pid()
2302 except OSError as err:2099 except OSError as err:
2303 logger.error("Couldn't kill process %d" % pid)2100 logger.error("Couldn't kill process %d" % pid)
2304 logger.error(err)2101 exit_on_error(err)
2305 exit_on_error(traceback.format_exc())
2306 except IOError as err:2102 except IOError as err:
2307 logger.error("Couldn't read from %s" % pid_file)2103 logger.error("Couldn't read from %s" % pid_file)
2308 logger.error(err)2104 exit_on_error(err)
2309 exit_on_error(traceback.format_exc())
2310 except:
2311 logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
2312 exit_on_error(traceback.format_exc())
2313 finally:
2314 remove_pid()
2315 logger.info("TwinDB agent successfully shut down")2105 logger.info("TwinDB agent successfully shut down")
2316 sys.exit(0)2106 sys.exit(0)
23172107
23182108
2319# Starts TwinDB agent
2320
2321def start():2109def start():
2322 global logger2110 """
2323 global pid_file2111 Starts TwinDB agent
2324 global check_period2112 :return: nothing
23252113 """
2114 global job_id
2326 logger.info("Starting TwinDB agent")2115 logger.info("Starting TwinDB agent")
23272116
2328 if os.path.isfile(pid_file):2117 if check_pid():
2329 logger.error("PID file " + pid_file + " exists");2118 write_pid()
2330 logger.error("Check if TwinDB agent is already running");2119 else:
2331 logger.error("If not, remove file " + pid_file + " manually and restart the agent");2120 exit_on_error("Another instance of TwinDB agent is running. Exiting")
2332 sys.exit(2)
2333 try:
2334 f = open(pid_file, 'w')
2335 f.write(str(os.getpid()))
2336 f.close()
2337 except IOError as err:
2338 logger.error("Couldn't save process id in " + pid_file)
2339 logger.error(err)
2340 exit_on_error(traceback.format_exc())
2341 except:
2342 logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
2343 exit_on_error(traceback.format_exc())
2344 try:2121 try:
2345 while True:2122 while True:
2346 read_config()2123 read_config()
@@ -2348,7 +2125,7 @@
2348 if not config:2125 if not config:
2349 time.sleep(check_period)2126 time.sleep(check_period)
2350 continue2127 continue
2351 report_sss()2128 report_sss(config)
2352 report_agent_privileges(config)2129 report_agent_privileges(config)
2353 job = get_job()2130 job = get_job()
2354 if job:2131 if job:
@@ -2356,130 +2133,111 @@
2356 time.sleep(check_period)2133 time.sleep(check_period)
2357 except SystemExit:2134 except SystemExit:
2358 remove_pid()2135 remove_pid()
2359 except:
2360 exit_on_error(traceback.format_exc())
2361 sys.exit(0)2136 sys.exit(0)
23622137
23632138
2364# Processes job
2365# Inputs
2366# config - backup config
2367# job - job order
2368# Returns what respective job function returns or False if error happens
2369def process_job(config, job):2139def process_job(config, job):
2370 global logger2140 """
2141 Processes job
2142 :param config: backup config
2143 :param job: job order
2144 :return: what respective job function returns or False if error happens
2145 """
2371 global job_id2146 global job_id
2147 # Check to see that the twindb_agent MySQL user has enough privileges
2148 username = config["mysql_user"]
2149 password = config["mysql_password"]
2150
2151 job_id = int(job["job_id"])
23722152
2373 try:2153 try:
2374 # Check to see that the twindb_agent MySQL user has enough privileges
2375 username = config["mysql_user"]
2376 password = config["mysql_password"]
2377
2378 mysql_access_available, missing_mysql_privileges = has_mysql_access(username, password, grant_capability=False)2154 mysql_access_available, missing_mysql_privileges = has_mysql_access(username, password, grant_capability=False)
2379 if not mysql_access_available:2155 if not mysql_access_available:
2380 logger.error("The MySQL user %s does not have all the required privileges." % username)2156 logger.error("The MySQL user %s does not have all the required privileges." % username)
2381 if len(missing_mysql_privileges) > 0:2157 if missing_mysql_privileges:
2382 logger.error("You can grant the required privileges by executing the following SQL: "2158 raise JobError("You can grant the required privileges by executing the following SQL: "
2383 "GRANT %s ON *.* TO '%s'@'localhost'" % (','.join(missing_mysql_privileges), username))2159 "GRANT %s ON *.* TO '%s'@'localhost'" % (','.join(missing_mysql_privileges), username))
23842160 if not job["start_scheduled"]:
2385 return False2161 raise JobError("Job start time isn't set")
2386
2387 if job["start_scheduled"] == None:
2388 logger.error("Job start time isn't set")
2389 return False
2390
2391 start_scheduled = int(job["start_scheduled"])2162 start_scheduled = int(job["start_scheduled"])
2392 now = int(time.time())2163 now = int(time.time())
2393 if now < start_scheduled:2164 if now < start_scheduled:
2394 logger.info("Job is scheduled on %s, now %s"2165 raise JobTooSoonError("Job is scheduled on %s, now %s"
2395 % (time.ctime(start_scheduled), time.ctime(now)))2166 % (time.ctime(start_scheduled), time.ctime(now)))
2396 return False
2397
2398 logger.info("Processing job_id = %d", int(job_id))2167 logger.info("Processing job_id = %d", int(job_id))
2399 if job["type"] == "backup":2168 if job["type"] == "backup":
2400 return take_backup(config, job)2169 return take_backup(config, job)
2401 elif job["type"] == "restore":2170 elif job["type"] == "restore":
2402 #return restore_backup(config, job)2171 return restore_backup(config, job)
2403 return False
2404 elif job["type"] == "send_key":2172 elif job["type"] == "send_key":
2405 return handler_send_key(config, job)2173 return handler_send_key(job)
2406 else:2174 else:
2407 logger.error("Unsupported job type " + job["type"])2175 raise JobError("Unsupported job type " + job["type"])
2408 return False2176 except JobError as err:
2409 except:2177 logger.error("Job error: %s", err)
2410 logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2])2178 return False
2179 except JobTooSoonError as err:
2180 logger.debug(err)
2411 return False2181 return False
2412 finally:2182 finally:
2413 job_id = 02183 job_id = 0
2414 return False
24152184
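process_job() relies on JobError and JobTooSoonError, which are defined elsewhere in this branch. For readers of this hunk, a minimal sketch of what they amount to (an assumption; the real definitions may carry extra context):

    class JobError(Exception):
        """Raised when a job cannot be processed."""
        pass


    class JobTooSoonError(Exception):
        """Raised when a job is scheduled in the future and should be retried later."""
        pass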
24162185
2417def setup_logging():2186def setup_logging():
2187 """
2188 Sets up logging handlers and formats
2189 :return:
2190 """
2418 global logger2191 global logger
24192192
2420 ch = logging.StreamHandler()2193 console_handler = logging.StreamHandler()
2421 sh = logging.handlers.SysLogHandler()2194 remote_handler = RlogHandler()
2422 rh = RlogHandler()2195
2423 fmt_str = '%(name)s: %(levelname)s: %(funcName)s():%(lineno)d: %(message)s'2196 fmt_str = '%(name)s: %(levelname)s: %(funcName)s():%(lineno)d: %(message)s'
2424 sfmt = logging.Formatter(fmt_str)2197 remote_format = logging.Formatter(fmt_str)
2425 cfmt = logging.Formatter("%(asctime)s: " + fmt_str)2198 console_format = logging.Formatter("%(asctime)s: " + fmt_str)
2426 ch.setFormatter(cfmt)2199
2427 sh.setFormatter(sfmt)2200 console_handler.setFormatter(console_format)
2428 rh.setFormatter(sfmt)2201 remote_handler.setFormatter(remote_format)
2429 logger.addHandler(sh)2202
2430 logger.addHandler(ch)2203 logger.addHandler(console_handler)
2431 logger.addHandler(rh)2204 logger.addHandler(remote_handler)
2432 logger.setLevel(logging.INFO)2205 logger.setLevel(logging.INFO)
2433 # syslog handler shouldn't log DEBUG messages2206
2434 sh.setLevel(logging.INFO)
2435
2436
2437# Main function
2438# Parses options, creates log class etc
24392207
2440def main():2208def main():
2209 """
2210 Main function
2211 Parses options, creates log class etc
2212 :return:
2213 """
2214 global server_id
2215 global check_period
2216 global host
2217 global mysql_user
2218 global mysql_password
2219 global debug
2220 global logger
2221
2222 setup_logging()
2223 read_config()
2441 # before we do *anything* we must ensure server_id is generated or read from config2224 # before we do *anything* we must ensure server_id is generated or read from config
2442 global host
2443 global init_config
2444 global server_id
2445 global check_period
2446 global logger
2447 global mysql_user
2448 global mysql_password
2449 global debug
2450
2451 # Read or generate server id2225 # Read or generate server id
2452 if os.path.exists(init_config):2226 if not server_id:
2453 try:
2454 execfile(init_config, globals())
2455 if len(server_id) == 0:
2456 print("Error: Config %s doesn't set server_id"
2457 % init_config, file=sys.stderr)
2458 sys.exit(2)
2459 except:
2460 print(traceback.format_exc())
2461 print("Error: Failed to read from config in %s"
2462 % init_config, file=sys.stderr)
2463 sys.exit(2)
2464 else:
2465 server_id = str(uuid.uuid4())2227 server_id = str(uuid.uuid4())
2466 save_config(server_id)2228 save_config()
24672229
2468 # At this point server_id must be set2230 # At this point server_id must be set
2469 if len(server_id) == 0:2231 if not server_id:
2470 print("Error: Server id is empty. Can not continue operation",2232 print("Error: Server id is empty. Can not continue operation", file=sys.stderr)
2471 file=sys.stderr)
2472 sys.exit(2)2233 sys.exit(2)
2473
2474 setup_logging()
2475
2476 # Signal handlers2234 # Signal handlers
2477 signal.signal(signal.SIGTERM, cleanup)2235 signal.signal(signal.SIGTERM, cleanup)
2478 signal.signal(signal.SIGINT, cleanup)2236 signal.signal(signal.SIGINT, cleanup)
24792237
2480 # Not clear why I ignore SIGCHLD.2238 # Not clear why I ignore SIGCHLD.
2481 # it makes subprocess.call throw an exception, so I keep it commented out for now2239 # it makes subprocess.call throw an exception, so I keep it commented out for now
2482 #signal.signal(signal.SIGCHLD, signal.SIG_IGN)2240 # signal.signal(signal.SIGCHLD, signal.SIG_IGN)
24832241
2484 opts = []2242 opts = []
2485 try:2243 try:
@@ -2489,15 +2247,11 @@
2489 "unregister", "delete-backups", "backup", "is-registered"])2247 "unregister", "delete-backups", "backup", "is-registered"])
2490 except getopt.GetoptError as err:2248 except getopt.GetoptError as err:
2491 logger.error(err)2249 logger.error(err)
2492 exit_on_error(traceback.format_exc())
2493 except:
2494 logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
2495 usage()
2496 exit_on_error(traceback.format_exc())
24972250
2498 # Set options first2251 # Set options first
2499 action = None2252 action = None
2500 delete_backups = False2253 delete_backups = False
2254 regcode = None
2501 for opt, arg in opts:2255 for opt, arg in opts:
2502 if opt == '-i':2256 if opt == '-i':
2503 check_period = int(arg)2257 check_period = int(arg)
@@ -2547,29 +2301,32 @@
2547 check_period = 12301 check_period = 1
25482302
2549 if action == "start":2303 if action == "start":
2550 if is_registered():2304 while True:
2551 start()2305 # It'll keep checking until the agent is registered
2552 else:2306 # Then it starts
2553 logger.error(2307 if is_registered():
2554 "The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n")2308 start()
2555 logger.error("Get your code on https://console.twindb.com/?get_code")2309 else:
2556 sys.exit(2)2310 logger.error("The server must be registered first. Run following command:\n\n"
2311 "twindb --register <registration code>\n")
2312 logger.error("Get your code on https://console.twindb.com/?get_code")
2313 time.sleep(check_period)
2557 elif action == "stop":2314 elif action == "stop":
2558 stop()2315 stop()
2559 elif action == "register":2316 elif action == "register":
2560 action_handler_register(regcode)2317 action_handler_register(regcode)
2561 elif action == "unregister":2318 elif action == "unregister":
2562 if not is_registered():2319 if not is_registered():
2563 logger.error(2320 logger.error("The server must be registered first. Run following command:"
2564 "The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n")2321 "\n\ntwindb --register <registration code>\n")
2565 logger.error("Get your code on https://console.twindb.com/?get_code")2322 logger.error("Get your code on https://console.twindb.com/?get_code")
2566 sys.exit(2)2323 sys.exit(2)
2567 if action_handler_unregister(delete_backups):2324 if action_handler_unregister(delete_backups):
2568 stop()2325 stop()
2569 elif action == "backup":2326 elif action == "backup":
2570 if not is_registered():2327 if not is_registered():
2571 logger.error(2328 logger.error("The server must be registered first. Run following command:"
2572 "The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n")2329 "\n\ntwindb --register <registration code>\n")
2573 logger.error("Get your code on https://console.twindb.com/?get_code")2330 logger.error("Get your code on https://console.twindb.com/?get_code")
2574 sys.exit(2)2331 sys.exit(2)
2575 if not action_handler_backup():2332 if not action_handler_backup():
@@ -2583,9 +2340,8 @@
2583 sys.exit(0)2340 sys.exit(0)
2584 else:2341 else:
2585 # If we got there print usage() and exit2342 # If we got there print usage() and exit
2586 logger.error("Neither --start nor --stop nor --register is specified")2343 logger.error("Failed to parse command line options")
2587 usage()2344 usage()
2588 remove_pid()
2589 sys.exit(2)2345 sys.exit(2)
25902346
25912347
