Merge lp:~akuzminsky/twindb-agent/restore into lp:twindb-agent

Proposed by Aleksandr Kuzminsky
Status: Merged
Merged at revision: 18
Proposed branch: lp:~akuzminsky/twindb-agent/restore
Merge into: lp:twindb-agent
Diff against target: 3480 lines (+1215/-1459)
1 file modified
twindb.py (+1215/-1459)
To merge this branch: bzr merge lp:~akuzminsky/twindb-agent/restore
Reviewer Review Type Date Requested Status
Ovais Tariq Pending
Review via email: mp+255386@code.launchpad.net
To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'twindb.py'
2--- twindb.py 2015-03-16 02:16:03 +0000
3+++ twindb.py 2015-04-07 16:22:26 +0000
4@@ -2,31 +2,34 @@
5 from __future__ import print_function
6 from __future__ import unicode_literals
7 from __future__ import absolute_import
8+import ConfigParser
9 from base64 import b64encode, b64decode
10 from datetime import datetime
11-import sys
12-import time
13+import errno
14+import exceptions
15 import getopt
16+import httplib
17+import json
18+import logging.handlers
19+import os
20+import pwd
21 import signal
22-import traceback
23-import exceptions
24-import errno
25+import shutil
26 import socket
27-import logging.handlers
28-import os.path
29 import subprocess
30-import httplib
31-import json
32+import sys
33+import tempfile
34+import time
35+import traceback
36 import urllib
37-
38+import uuid
39
40 try:
41 import mysql.connector
42 except ImportError:
43+ # On CentOS 5 mysql.connector is in python 2.6 directory
44 sys.path.insert(0, '/usr/lib/python2.6/site-packages')
45 import mysql.connector
46-import tempfile
47-import uuid
48
49 # global variables
50 host = "dispatcher.twindb.com"
51@@ -36,8 +39,8 @@
52 debug_http = True
53 debug_gpg = False
54 init_config = "/etc/twindb.cfg"
55-ssh_private_key = "/root/.ssh/twindb.key"
56-ssh_public_key = "/root/.ssh/twindb.key.pub"
57+ssh_private_key_file = "/root/.ssh/twindb.key"
58+ssh_public_key_file = "/root/.ssh/twindb.key.pub"
59 ssh_port = 4194
60 gpg_homedir = "/root/.gnupg/"
61 pid_file = "/var/run/twindb.pid"
62@@ -45,9 +48,10 @@
63 time_zone = "UTC"
64 api_email = "api@twindb.com"
65 server_id = ""
66-job_id = 0
67-mysql_user = "root"
68-mysql_password = ""
69+job_id = None
70+mysql_user = None
71+mysql_password = None
72+agent_version = "@@TWINDB_AGENT_VERSION@@"
73 api_pub_key = """
74 -----BEGIN PGP PUBLIC KEY BLOCK-----
75 Version: GnuPG v1
76@@ -102,24 +106,24 @@
77 =62IM
78 -----END PGP PUBLIC KEY BLOCK-----
79 """
80-agent_version = "@@TWINDB_AGENT_VERSION@@"
81-
82-
83-# Logging handler that logs to remote TwiDB dispatcher
84+
85+
86 class RlogHandler(logging.Handler):
87+ """
88+ Logging handler that logs to remote TwinDB dispatcher
89+ """
90+ log_flag = True
91+
92 def __init__(self):
93 logging.Handler.__init__(self)
94 self.log_flag = True
95
96 def emit(self, record):
97- global server_id
98- global job_id
99- global debug
100-
101- request = {}
102- request["type"] = "log"
103- request["params"] = {}
104- if job_id != 0:
105+ request = {
106+ "type": "log",
107+ "params": {}
108+ }
109+ if job_id:
110 request["params"]["job_id"] = job_id
111 request["params"]["msg"] = record.getMessage()
112 if self.log_flag and not debug:
113@@ -128,83 +132,106 @@
114 self.log_flag = True
115
116
117+class JobError(Exception):
118+ def __init__(self, value):
119+ self.value = value
120+
121+ def __str__(self):
122+ return self.value
123+
124+
125+class JobTooSoonError(Exception):
126+ def __init__(self, value):
127+ self.value = value
128+
129+ def __str__(self):
130+ return self.value
131+
132+
133 def usage():
134- print("Usage:\n\
135-twindb --start | --stop | --register <registration code> [-g] [-i interval]\n\
136- [-u user] [-p password]\n\
137-\n\
138- --start Start TwinDB agent.\n\
139- --stop Stop TwinDB agent.\n\
140-\n\
141- --register <code> Register this server in TwinDB.\n\
142- <code> is string like 7b90ae21ac642f2f8fc0a285c4789147\n\
143- Get your code on https://console.twindb.com/?get_code\n\
144-\n\
145- --unregister [--delete-backups] Unregister this server from TwinDB.\n\
146- --backup Start backup from this server immediately.\n\
147- --is-registered Checks whether the server is registered and prints YES or NO.\n\
148- -g Print debug message.\n\
149- -i interval Check for a job every interval seconds.\n\
150- Default 300.\n\
151- During registration TwinDB agent needs to get some information from MySQL,\n\
152- (master host etc)\n\
153- This is credentials to connect to local MySQL instance\n\
154- -u | --user username MySQL user name\n\
155- -p | --password password MySQL password")
156+ print("""Usage:
157+twindb --start | --stop | --register <registration code> [-g] [-i interval]
158+ [-u user] [-p password]
159+
160+ --start Start TwinDB agent.
161+ --stop Stop TwinDB agent.
162+
163+ --register <code> Register this server in TwinDB.
164+ <code> is string like 7b90ae21ac642f2f8fc0a285c4789147
165+ Get your code on https://console.twindb.com/?get_code
166+
167+ --unregister [--delete-backups] Unregister this server from TwinDB.
168+ --backup Start backup from this server immediately.
169+ --is-registered Checks whether the server is registered and prints YES or NO.
170+ -g Print debug message.
171+ -i interval Check for a job every interval seconds.
172+ Default {check_period}.
173+ During registration TwinDB agent needs to get some information from MySQL,
174+ (master host etc)
175+ This is credentials to connect to local MySQL instance
176+ -u | --user username MySQL user name
177+ -p | --password password MySQL password""".format(check_period=check_period))
178 return
179
180
181 def get_mysql_connection(user=None, passwd=None):
182- global mysql_user
183- global mysql_password
184-
185+ """
186+ Returns connection to the local MySQL instance.
187+ If user is passed as an argument it'll be used to connect,
188+ otherwise the second choice will be to use mysql_user.
189+ If neither user names are set the function will try to use either of MySQL option files
190+ (/etc/my.cnf, /etc/mysql/my.cnf, or /root/.my.cnf). If the option files don't exist
191+ it'll try to connect as root w/o password.
192+ """
193 try:
194- if user is None:
195- user = mysql_user
196- if passwd is None:
197- passwd = mysql_password
198- mysql_option_files = []
199 unix_socket = get_unix_socket()
200- conn = mysql.connector.connect()
201- if user is not None:
202- for f in ['/etc/mysql/my.cnf', '/etc/my.cnf', '/root/.my.cnf']:
203- if os.path.exists(f):
204- mysql_option_files.append(f)
205- if mysql_option_files:
206- conn = mysql.connector.connect(option_files=mysql_option_files, unix_socket=unix_socket)
207- logger.debug(
208- "Connected to MySQL as '%s'@'localhost' with options files %r" % (conn.user, mysql_option_files))
209- if not conn.is_connected():
210- conn = mysql.connector.connect(user=user, passwd=passwd, unix_socket=unix_socket)
211- logger.debug("Connected to MySQL as %s@localhost " % conn.user)
212+ if not user:
213+ if mysql_user:
214+ logger.debug('Using MySQL user specified in the command line')
215+ user = mysql_user
216+ passwd = mysql_password
217+ else:
218+ for options_file in ["/etc/my.cnf", "/etc/mysql/my.cnf", "/usr/etc/my.cnf",
219+ "/root/.my.cnf", "/root/.mylogin.cnf"]:
220+ if os.path.exists(options_file):
221+ config = ConfigParser.ConfigParser()
222+ config.read(options_file)
223+ for section in ["client", "twindb"]:
224+ if config.has_section(section):
225+ if config.has_option(section, "user"):
226+ user = config.get(section, "user")
227+ if config.has_option(section, "password"):
228+ passwd = config.get(section, "password")
229+ # If user isn't set by the function argument, global mysql_user
230+ # or MySQL options file connect as unix user w/ empty password
231+ if not user:
232+ user = pwd.getpwuid(os.getuid()).pw_name
233+ passwd = ""
234+ logger.debug("Connecting to MySQL as unix user %s" % user)
235+ conn = mysql.connector.connect(user=user, passwd=passwd, unix_socket=unix_socket)
236+ logger.debug("Connected to MySQL as %s@localhost " % conn.user)
237 except mysql.connector.Error as err:
238 logger.error("Can not connect to local MySQL server")
239 logger.error("MySQL said: %s" % err.msg)
240 return None
241- except:
242- logger.error("Can not connect to local MySQL server")
243- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
244- return None
245 return conn
246
247
248-# Checks if TwinDB API public key is installed
249-# Returns
250-# True - if the key is installed
251-# False - if not
252 def is_gpg_key_installed(email, key_type="public"):
253- global logger
254- global gpg_homedir
255- global debug_gpg
256-
257+ """
258+ Checks if TwinDB API public key is installed
259+ Returns
260+ True - if the key is installed
261+ False - if not
262+ """
263 result = False
264- if len(email) == 0:
265+ if not email:
266 logger.error("Can not check public key of an empty email")
267 return False
268 if debug_gpg:
269 logger.debug("Checking if public key of %s is installed" % email)
270+ gpg_cmd = ["gpg", "--homedir", gpg_homedir]
271 try:
272- gpg_cmd = ["gpg", "--homedir", gpg_homedir]
273 if key_type == "public":
274 gpg_cmd.append("-k")
275 else:
276@@ -212,63 +239,51 @@
277 gpg_cmd.append(email)
278 if debug_gpg:
279 logger.debug(gpg_cmd)
280- devnull = open('/dev/null', 'w')
281+ devnull = open("/dev/null", "w")
282 ret = subprocess.call(gpg_cmd, stdout=devnull, stderr=devnull)
283- file.close(devnull)
284+ devnull.close()
285 if ret != 0:
286 if debug_gpg:
287- logger.debug("GPG returned %d" % ret);
288+ logger.debug("GPG returned %d" % ret)
289 logger.debug(key_type + " key " + email + " is NOT installed")
290 result = False
291 else:
292 if debug_gpg:
293 logger.debug(key_type + " key is already installed")
294 result = True
295- except:
296- logger.error("Couldn't get " + key_type + " key of %s" % email)
297- exit_on_error(traceback.format_exc())
298+ except OSError as err:
299+ logger.error("Couldn't run command %r. %s" % (gpg_cmd, err))
300+ exit_on_error("Couldn't get %s key of %s." % (key_type, email))
301 return result
302
303
304-# Installs TwinDB public key
305-# Returns
306-# True - if the key is successfully installed
307-# Exits if the key wasn't installed
308-
309 def install_api_pub_key():
310- global logger
311- global api_pub_key
312- global gpg_homedir
313- global debug_gpg
314-
315+ """
316+ Installs TwinDB public key
317+ Returns
318+ True - if the key is successfully installed
319+ Exits if the key wasn't installed
320+ """
321 logger.info("Installing twindb public key")
322+ gpg_cmd = ["gpg", "--homedir", gpg_homedir, "--import"]
323 try:
324- p1 = subprocess.Popen(["echo", api_pub_key], stdout=subprocess.PIPE)
325- p2 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--import"],
326- stdin=p1.stdout)
327- p1.stdout.close()
328- except:
329- logger.error("Couldn't install TwinDB public key")
330- exit_on_error(traceback.format_exc())
331+ p = subprocess.Popen(gpg_cmd, stdin=subprocess.PIPE)
332+ p.communicate(api_pub_key)
333+ except OSError as err:
334+ logger.error("Couldn't run command %r. %s" % (gpg_cmd, err))
335+ exit_on_error("Couldn't install TwinDB public key")
336 logger.info("Twindb public key successfully installed")
337 return True
338
339
340-# Checks if GPG environment is good to start TwinDB agent
341-# Installs TwinDB public key if necessary
342-# Returns
343-# True - if the GPG environment good to proceed
344-# Exits if GPG wasn't configured correctly
345-
346 def check_gpg():
347- global logger
348- global api_email
349- global api_pub_key
350- global gpg_homedir
351- global init_config
352- global server_id
353- global debug_gpg
354-
355+ """
356+ Checks if GPG environment is good to start TwinDB agent
357+ Installs TwinDB public key if necessary
358+ Returns
359+ True - if the GPG environment good to proceed
360+ Exits if GPG wasn't configured correctly
361+ """
362 if debug_gpg:
363 logger.debug("Checking if GPG config is initialized")
364 if os.path.exists(gpg_homedir):
365@@ -282,13 +297,13 @@
366 try:
367 os.mkdir(gpg_homedir, 0700)
368 install_api_pub_key()
369- except:
370- logger.error("Couldn't create directory " + gpg_homedir)
371- exit_on_error(traceback.format_exc())
372- if len(server_id) == 0:
373+ except OSError as err:
374+ logger.error("Failed to create directory %s. %s" % (gpg_homedir, err))
375+ exit_on_error("Couldn't create directory " + gpg_homedir)
376+ if not server_id:
377 exit_on_error("Server id is neither read from config nor generated")
378 email = "%s@twindb.com" % server_id
379- if not ( is_gpg_key_installed(email) and is_gpg_key_installed(email, "private")):
380+ if not (is_gpg_key_installed(email) and is_gpg_key_installed(email, "private")):
381 gen_entropy()
382 gen_gpg_keypair("%s@twindb.com" % server_id)
383 if debug_gpg:
384@@ -296,13 +311,13 @@
385 return True
386
387
388-# Checks the environment if it's OK to start TwinDB agent
389-# Returns
390-# True - if the environment is OK
391-# Exits if the environment is not OK
392-
393 def check_env():
394- global logger
395+ """
396+ Checks the environment if it's OK to start TwinDB agent
397+ Returns
398+ True - if the environment is OK
399+ Exits if the environment is not OK
400+ """
401 logger.debug("Checking environment")
402 if os.getuid() != 0:
403 exit_on_error("TwinDB agent must be run by root")
404@@ -311,41 +326,24 @@
405 return True
406
407
408-# Checks how much entropy is available in the system
409-# If not enough, does some disk activity to generate more
410 def gen_entropy():
411+ """
412+ Checks how much entropy is available in the system
413+ If not enough, does some disk activity to generate more
414+ """
415 # Do nothing until I find good way to generate entropy
416 return
417- try:
418- f = open("/proc/sys/kernel/random/entropy_avail", "r")
419- entropy_avail = int(f.read())
420- f.close()
421- i = 0
422- while entropy_avail < 2048:
423- logger.info("Low entropy level %d. Will run 'find / -name file' to generate more")
424- cmd = ["find", "/", "-name", "file"]
425- p = subprocess.Popen(cmd)
426- p.wait()
427- f = open("/proc/sys/kernel/random/entropy_avail", "r")
428- entropy_avail = int(f.read())
429- f.close()
430- # Do not run find more than 10 times
431- if i > 10:
432- break
433- i = i + 1
434- except:
435- logger.error("Failed to generate entropy")
436- return
437-
438-
439-# Formats bytes count to human readable form
440-# Inputs:
441-# num - bytes count
442-# decimals - number of digits to save after point (Default 2)
443-# Returns
444-# human-readable string like "20.33 MB"
445+
446
447 def h_size(num, decimals=2):
448+ """
449+ Formats bytes count to human readable form
450+ Inputs:
451+ num - bytes count
452+ decimals - number of digits to save after point (Default 2)
453+ Returns
454+ human-readable string like "20.33 MB"
455+ """
456 fmt = "%3." + str(decimals) + "f %s"
457 for x in ['bytes', 'kB', 'MB', 'GB', 'TB', 'PB']:
458 if num < 1024.0:
459@@ -353,30 +351,19 @@
460 num /= 1024.0
461
462
463-# Generates random string of bytes good for crypthography
464-# Inputs
465-# n - Number of bytes
466-# Returns
467-# binary string of n bytes size
468-
469-def gen_key(n):
470- return os.urandom(n)
471-
472-
473-# Generates GPG private and public keys for a given recipient
474-# Inputs
475-# email - recipient
476-# Returns
477-# True on success or exits
478-
479 def gen_gpg_keypair(email):
480- global server_id
481- global gpg_homedir
482- global debug_gpg
483-
484- if len(email) == 0:
485+ """
486+ Generates GPG private and public keys for a given recipient
487+ Inputs
488+ email - recipient
489+ Returns
490+ True on success or exits
491+ """
492+ if not email:
493 exit_on_error("Can not generate GPG keypair for an empty email")
494+ gpg_cmd = ["gpg", "--homedir", gpg_homedir, "--batch", "--gen-key"]
495 try:
496+ logger.info("The agent needs to generate cryptographically strong keys.")
497 logger.info("Generating GPG keys pair for %s" % email)
498 logger.info("It may take really, really long time. Please be patient.")
499 gen_entropy()
500@@ -393,36 +380,25 @@
501 %%commit
502 %%echo done
503 """ % (server_id, email)
504- p1 = subprocess.Popen(["echo", gpg_script], stdout=subprocess.PIPE)
505- p2 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--batch",
506- "--gen-key"], stdin=p1.stdout)
507- p1.stdout.close()
508- p2.wait()
509- del p1, p2
510- except:
511- logger.error("Failed to generate GPG keys pair")
512- exit_on_error(traceback.format_exc())
513+
514+ p = subprocess.Popen(gpg_cmd, stdin=subprocess.PIPE)
515+ p.communicate(gpg_script)
516+ except OSError as err:
517+ logger.error("Failed to run command %r. %s" % (gpg_cmd, err))
518+ exit_on_error("Failed to generate GPG keys pair")
519 return True
520
521
522-# Encrypts message with TwinDB public key
523-# If server_id is non-zero (which means the server is registered)
524-# signs the message with the server's private key
525-# Inputs
526-# msg - string to encrypt
527-# Returns
528-# 64-base encoded and encrypted message. To read - decrypt and 64-base decode
529-# EDIT: encrypted message
530-# None - if error happens
531-
532 def encrypt(msg):
533- global logger
534- global api_email
535- global gpg_homedir
536- global server_id
537- global debug_gpg
538-
539- if len(server_id) == 0:
540+ """
541+ Encrypts message with TwinDB public key
542+ If server_id is non-zero (which means the server is registered)
543+ signs the message with the server's private key
544+ :param msg: string to encrypt
545+ :return: 64-base encoded and encrypted message or None if error happens.
546+ To read the encrypted message - decrypt and 64-base decode
547+ """
548+ if not server_id:
549 exit_on_error("Trying to encrypt message while server_id is empty")
550 server_email = "%s@twindb.com" % server_id
551 if not is_gpg_key_installed(api_email):
552@@ -433,12 +409,10 @@
553 logger.debug("Public key of %s is not installed." % server_email)
554 logger.debug("Will not encrypt message")
555 return None
556- enc_cmd = ["gpg", "--homedir", gpg_homedir, "-r", api_email, "--batch",
557- "--trust-model", "always", "--armor"]
558- enc_cmd.append("--sign")
559- enc_cmd.append("--local-user")
560- enc_cmd.append(server_email)
561- enc_cmd.append("--encrypt")
562+ enc_cmd = ["gpg", "--homedir", gpg_homedir, "-r", api_email, "--batch", "--trust-model", "always", "--armor",
563+ "--sign", "--local-user", server_email, "--encrypt"]
564+ cout = "No output"
565+ cerr = "No output"
566 try:
567 if debug_gpg:
568 logger.debug("Encrypting message:")
569@@ -450,11 +424,14 @@
570 stdout=subprocess.PIPE,
571 stderr=subprocess.PIPE)
572 cout, cerr = p.communicate(msg)
573+ if p.returncode != 0:
574+ raise OSError(p.returncode)
575 ct = cout
576 ct_64 = b64encode(ct)
577 if debug_gpg:
578 logger.debug("Encrypted message: " + ct_64)
579- except:
580+ except OSError as err:
581+ logger.error("Failed to run command %r. %s" % (enc_cmd, err))
582 logger.error("Failed to encrypt message: " + msg)
583 logger.error("STDOUT: " + cout)
584 logger.error("STDERR: " + cerr)
585@@ -462,52 +439,34 @@
586 return ct_64
587
588
589-# Decrypts message with local private key
590-# Inputs
591-# msg - 64-base encoded and encrypted message.
592-# Before encryption the message was 64-base encoded
593-# Returns
594-# String - Plain text message
595-# None - if error happens
596-
597 def decrypt(msg_64):
598- global logger
599- global api_email
600- global gpg_homedir
601- global server_id
602- global debug_gpg
603-
604+ """
605+ Decrypts message with local private key
606+ :param msg_64: 64-base encoded and encrypted message. Before encryption the message was 64-base encoded
607+ :return: Plain text message or None if error happens
608+ """
609 if not msg_64:
610 logger.error("Will not decrypt empty message")
611 return None
612-
613- cout = None
614- cerr = None
615-
616+ cout = "No output"
617+ cerr = "No output"
618+ gpg_cmd = ["gpg", "--homedir", gpg_homedir, "-d", "-q"]
619 try:
620- dec_cmd = ["gpg", "--homedir", gpg_homedir, "-d", "-q"]
621- debug_gpg = True
622 if debug_gpg:
623 logger.debug("Decrypting message:")
624 logger.debug(msg_64)
625 logger.debug("Decryptor command:")
626- logger.debug(dec_cmd)
627- p = subprocess.Popen(dec_cmd,
628- stdin=subprocess.PIPE,
629- stdout=subprocess.PIPE,
630- stderr=subprocess.PIPE)
631+ logger.debug(gpg_cmd)
632+ p = subprocess.Popen(gpg_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
633 msg = b64decode(msg_64)
634 cout, cerr = p.communicate(msg)
635-
636 if p.returncode != 0:
637 raise OSError(p.returncode)
638- except:
639+ except OSError as err:
640+ logger.error("Failed to run command %r. %s" % (gpg_cmd, err))
641 logger.error("Failed to decrypt message: " + msg_64)
642- logger.error("Unexpected error: %s, %s" % sys.exc_info()[:2]);
643- if cout:
644- logger.error("STDOUT: " + cout)
645- if cerr:
646- logger.error("STDERR: " + cerr)
647+ logger.error("STDOUT: " + cout)
648+ logger.error("STDERR: " + cerr)
649 return None
650 if debug_gpg:
651 logger.debug("Decrypted message:")
652@@ -515,26 +474,20 @@
653 return cout
654
655
656-# Sends HTTP POST request to TwinDB dispatcher
657-# It converts python data structure in "data" variable into JSON string,
658-# then encrypts it and then sends as variable "data" in HTTP request
659-# Inputs
660-# uri - URI to send the request
661-# data - Data structure with variables
662-# Returns
663-# String with body of HTTP response
664-# None - if error happened or empty response
665-
666 def get_response(request):
667- global logger
668- global host
669- global proto
670- global api_dir
671-
672+ """
673+ Sends HTTP POST request to TwinDB dispatcher
674+ It converts python data structure in "data" variable into JSON string,
675+ then encrypts it and then sends as variable "data" in HTTP request
676+ Inputs
677+ uri - URI to send the request
678+ data - Data structure with variables
679+ Returns
680+ String with body of HTTP response
681+ None - if error happened or empty response
682+ """
683 uri = "api.php"
684 response_body = None
685- conn = None
686-
687 logger.debug("Enter get_response(uri=" + uri + ")")
688 if proto == "http":
689 conn = httplib.HTTPConnection(host)
690@@ -542,8 +495,10 @@
691 conn = httplib.HTTPSConnection(host)
692 else:
693 logger.error("Unsupported protocol " + proto)
694+ return None
695+ url = proto + "://" + host + "/" + api_dir + "/" + uri
696+ http_response = "Empty response"
697 try:
698- url = proto + "://" + host + "/" + api_dir + "/" + uri
699 conn.connect()
700 logger.debug("Sending to " + host + ": %s" % json.dumps(request, indent=4, sort_keys=True))
701 data_json = json.dumps(request)
702@@ -588,46 +543,38 @@
703 logger.error("Please check that DNS server is reachable and works")
704 return None
705 except exceptions.KeyError as err:
706- logger.error("Failed to decode response from server %s" % http_response)
707+ logger.error("Failed to decode response from server: %s" % http_response)
708 logger.error("Could not find key %s" % err)
709 return None
710- except:
711- logger.error("Couldn't make HTTP request " + url)
712- logger.error("Unexpected error: %s, %s" % sys.exc_info()[:2]);
713- logger.debug(traceback.format_exc())
714- return None
715 finally:
716 conn.close()
717 return response_body
718
719
720-# Replaces password from config with ********
721-# Inputs
722-# config - python dictionary with backup config
723-# Returns
724-# Sanitized config
725-
726-def sanitize_config(c):
727- if not c:
728+def sanitize_config(config):
729+ """
730+ Replaces password from config with ********
731+ :param config: python dictionary with backup config
732+ :return: Sanitized config
733+ """
734+ if not config:
735 return None
736- cc = dict(c)
737- try:
738- cc["mysql_password"] = "********"
739- except:
740- logger.debug("Given config %r doesn't contain password" % c)
741- return cc
742-
743-
744-# Gets backup config from TwinDB dispatcher
745-# Returns
746-# Backup config
747-# None - if error happened
748+ sanitized_config = dict(config)
749+ if "mysql_password" in config:
750+ sanitized_config["mysql_password"] = "********"
751+ else:
752+ logger.debug("Given config %r doesn't contain password" % config)
753+ return sanitized_config
754+
755
756 def get_config():
757- global logger
758- global server_id
759-
760+ """
761+ Gets backup config from TwinDB dispatcher
762+ :return: Backup config or None if error happened
763+ """
764 logger.debug("Getting config for server_id = %s" % server_id)
765+ response_body = "Empty response"
766+ config = None
767 try:
768 data = {
769 "type": "get_config",
770@@ -648,114 +595,87 @@
771 if msg_pt["error"]:
772 logger.error(msg_pt["error"])
773 except exceptions.KeyError as err:
774- logger.error("Failed to decode %s" % response_body);
775- logger.error(err);
776- return None
777- except:
778- logger.error("Couldn't get backup config")
779- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
780- logger.debug(traceback.format_exc())
781+ logger.error("Failed to decode %s" % response_body)
782+ logger.error(err)
783 return None
784 return config
785
786
787-# Reports slave status to TwinDB dispatcher
788-def report_sss():
789- global logger
790- global server_id
791-
792+def report_sss(config):
793+ """
794+ Reports slave status to TwinDB dispatcher
795+ :param config: server config received from the dispatcher
796+ :return: nothing
797+ """
798 logger.debug("Reporting SHOW SLAVE STATUS for server_id = %s" % server_id)
799- try:
800- ss = get_slave_status()
801- data = {
802- "type": "report_sss",
803- "params": {
804- "server_id": server_id,
805- "mysql_server_id": ss["mysql_server_id"],
806- "mysql_master_server_id": ss["mysql_master_server_id"],
807- "mysql_master_host": ss["mysql_master_host"],
808- "mysql_seconds_behind_master": ss["mysql_seconds_behind_master"],
809- "mysql_slave_io_running": ss["mysql_slave_io_running"],
810- "mysql_slave_sql_running": ss["mysql_slave_sql_running"],
811- }
812+ ss = get_slave_status(config["mysql_user"], config["mysql_password"])
813+ data = {
814+ "type": "report_sss",
815+ "params": {
816+ "server_id": server_id,
817+ "mysql_server_id": ss["mysql_server_id"],
818+ "mysql_master_server_id": ss["mysql_master_server_id"],
819+ "mysql_master_host": ss["mysql_master_host"],
820+ "mysql_seconds_behind_master": ss["mysql_seconds_behind_master"],
821+ "mysql_slave_io_running": ss["mysql_slave_io_running"],
822+ "mysql_slave_sql_running": ss["mysql_slave_sql_running"],
823 }
824- response_body = get_response(data)
825- except:
826- logger.error("Couldn't get report SHOW SLAVE STATUS")
827- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
828- logger.debug(traceback.format_exc())
829- return None
830+ }
831+ get_response(data)
832 return
833
834
835-# Reports what privileges are given to the agent
836 def report_agent_privileges(config):
837- global logger
838- global server_id
839-
840+ """
841+ Reports what privileges are given to the agent
842+ :param config: server config received from the dispatcher
843+ :return: nothing
844+ """
845 logger.debug("Reporting agent privileges for server_id = %s" % server_id)
846- try:
847- mysql_user = config["mysql_user"]
848- mysql_password = config["mysql_password"]
849- except:
850- logger.error("Failed to read config")
851- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2])
852- return
853- try:
854- con = get_mysql_connection(user=mysql_user, passwd=mysql_password)
855- cursor = con.cursor()
856- query = "SELECT PRIVILEGE_TYPE FROM information_schema.USER_PRIVILEGES"
857- logger.debug("Sending query : %s" % query)
858- cursor.execute(query)
859-
860- privileges = dict()
861- privileges["Reload_priv"] = "N"
862- privileges["Lock_tables_priv"] = "N"
863- privileges["Repl_client_priv"] = "N"
864- privileges["Super_priv"] = "N"
865- privileges["Create_tablespace_priv"] = "N"
866-
867- for (priv,) in cursor:
868- if priv == "RELOAD":
869- privileges["Reload_priv"] = "Y"
870- elif priv == "LOCK TABLES":
871- privileges["Lock_tables_priv"] = "Y"
872- elif priv == "REPLICATION CLIENT":
873- privileges["Repl_client_priv"] = "Y"
874- elif priv == "SUPER":
875- privileges["Super_priv"] = "Y"
876- elif priv == "CREATE TABLESPACE":
877- privileges["Create_tablespace_priv"] = "Y"
878- data = {
879- "type": "report_agent_privileges",
880- "params": {
881- "Reload_priv": privileges["Reload_priv"],
882- "Lock_tables_priv": privileges["Lock_tables_priv"],
883- "Repl_client_priv": privileges["Repl_client_priv"],
884- "Super_priv": privileges["Super_priv"],
885- "Create_tablespace_priv": privileges["Create_tablespace_priv"]
886- }
887+ con = get_mysql_connection(user=config["mysql_user"], passwd=config["mysql_password"])
888+ cursor = con.cursor()
889+ query = "SELECT PRIVILEGE_TYPE FROM information_schema.USER_PRIVILEGES"
890+ logger.debug("Sending query : %s" % query)
891+ cursor.execute(query)
892+
893+ privileges = {
894+ "Reload_priv": "N",
895+ "Lock_tables_priv": "N",
896+ "Repl_client_priv": "N",
897+ "Super_priv": "N",
898+ "Create_tablespace_priv": "N"
899+ }
900+ for (priv,) in cursor:
901+ if priv == "RELOAD":
902+ privileges["Reload_priv"] = "Y"
903+ elif priv == "LOCK TABLES":
904+ privileges["Lock_tables_priv"] = "Y"
905+ elif priv == "REPLICATION CLIENT":
906+ privileges["Repl_client_priv"] = "Y"
907+ elif priv == "SUPER":
908+ privileges["Super_priv"] = "Y"
909+ elif priv == "CREATE TABLESPACE":
910+ privileges["Create_tablespace_priv"] = "Y"
911+ data = {
912+ "type": "report_agent_privileges",
913+ "params": {
914+ "Reload_priv": privileges["Reload_priv"],
915+ "Lock_tables_priv": privileges["Lock_tables_priv"],
916+ "Repl_client_priv": privileges["Repl_client_priv"],
917+ "Super_priv": privileges["Super_priv"],
918+ "Create_tablespace_priv": privileges["Create_tablespace_priv"]
919 }
920- get_response(data)
921- except:
922- logger.error("Couldn't get privileges granted to %s@localhost on server %s" % (mysql_user, server_id))
923- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
924- logger.debug(traceback.format_exc())
925- return None
926+ }
927+ get_response(data)
928 return
929
930
931-# Gets job order from TwinDB dispatcher
932-# Returns
933-# Job order in python dictionary
934-# None - if error happened
935-
936 def get_job():
937- global logger
938- global server_id
939- global job_id
940+ """
941+ Gets job order from TwinDB dispatcher
942+ :return: Job order in python dictionary or None if error happened
943+ """
944 job = None
945-
946 logger.debug("Getting job for server_id = %s" % server_id)
947 try:
948 d = json.JSONDecoder()
949@@ -764,36 +684,41 @@
950 "params": {}
951 }
952 response_body = get_response(data)
953+ if not response_body:
954+ raise JobError("Empty response from dispatcher")
955 response_body_decoded = d.decode(response_body)
956+ if "response" not in response_body_decoded:
957+ raise JobError("There is no 'response' key in the response from dispatcher")
958+ if "success" not in response_body_decoded:
959+ raise JobError("There is no 'success' key in the response from dispatcher")
960 msg_enc = response_body_decoded["response"]
961- if response_body_decoded["success"] == True:
962+ if response_body_decoded["success"]:
963 job_json = decrypt(msg_enc)
964+ logger.debug("job_json = %s" % job_json)
965+ if "data" not in job_json:
966+ raise JobError("There is no 'data' in decrypted response %s" % job_json)
967 job = d.decode(job_json)["data"]
968- job["params"] = d.decode(job["params"])
969- job_id = job["job_id"]
970- logger.info("Got job:\n%s" % json.dumps(job, indent=4, sort_keys=True))
971+ if job and "params" in job and job["params"]:
972+ job["params"] = d.decode(job["params"])
973+ logger.info("Got job:\n%s" % json.dumps(job, indent=4, sort_keys=True))
974 else:
975 logger.error("Couldn't get job")
976 job_json = decrypt(msg_enc)
977 logger.error("Server said: %s" % d.decode(job_json)["error"])
978- logger.debug("Server said: %s" % d.decode(job_json)["debug"])
979 except exceptions.TypeError as err:
980+ logger.error("Failed to get a job: %s" % err)
981 return None
982- except:
983- logger.error("Couldn't get job")
984- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
985+ except JobError as err:
986+ logger.error(err)
987 return None
988 return job
989
990
991-# Checks whether the server is registered or not
992-# Returns
993-# True - registered
994-# False - not so much
995 def is_registered():
996- global logger
997- global server_id
998-
999+ """
1000+ Checks whether the server is registered or not
1001+ :return: True if registered, False if not so much
1002+ """
1003 logger.debug("Getting registration status for server_id = %s" % server_id)
1004
1005 twindb_email = "%s@twindb.com" % server_id
1006@@ -801,13 +726,13 @@
1007
1008 enc_public_key = None
1009 # Reading the GPG key
1010+ gpg_cmd = ["gpg", "--homedir", gpg_homedir, "--armor", "--export", twindb_email]
1011 try:
1012- p1 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--armor", "--export", twindb_email],
1013- stdout = subprocess.PIPE)
1014- enc_public_key = p1.stdout.read()
1015- except:
1016- logger.error("Failed to export GPG keys of %s from %s." % (twindb_email, gpg_homedir))
1017- exit_on_error(traceback.format_exc())
1018+ p = subprocess.Popen(gpg_cmd, stdout=subprocess.PIPE)
1019+ enc_public_key = p.communicate()[0]
1020+ except OSError as err:
1021+ logger.error("Failed to run command %r. %s" % (gpg_cmd, err))
1022+ exit_on_error("Failed to export GPG keys of %s from %s." % (twindb_email, gpg_homedir))
1023
1024 # Call the TwinDB api to check for server registration
1025 response_body = None
1026@@ -819,21 +744,17 @@
1027 "enc_public_key": enc_public_key
1028 }
1029 }
1030-
1031 response_body = get_response(data)
1032 if not response_body:
1033- return None
1034-
1035+ return False
1036 json_decoder = json.JSONDecoder()
1037 response_body_decoded = json_decoder.decode(response_body)
1038-
1039 if response_body_decoded:
1040 if "response" in response_body_decoded:
1041 msg_decrypted = decrypt(response_body_decoded["response"])
1042 if msg_decrypted is None:
1043 logger.debug("No valid response from dispatcher. Consider agent unregistered")
1044 return False
1045-
1046 msg_pt = json_decoder.decode(msg_decrypted)
1047 registration_status = msg_pt["data"]
1048 logger.debug("Got registration status:\n%s" % json.dumps(registration_status, indent=4, sort_keys=True))
1049@@ -844,216 +765,140 @@
1050 else:
1051 logger.debug("No valid response from dispatcher. Consider agent unregistered")
1052 return False
1053- except exceptions.KeyError:
1054- logger.error("Failed to decode %s" % response_body)
1055- exit_on_error(traceback.format_exc())
1056- except:
1057- logger.error("Couldn't get backup config")
1058- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
1059- exit_on_error(traceback.format_exc())
1060+ except exceptions.KeyError as err:
1061+ exit_on_error("Failed to decode response from dispatcher: %s. %s" % (response_body, err))
1062 return False
1063
1064
1065-# Gets name of initial full backup that corresponds to given incremental backup
1066-# Inputs
1067-# f - Valid name of incremental backup
1068-# Returns
1069-# String with name of the initial full backup.
1070-# If full backup was given it will return the same name
1071-# None - if error happened
1072-
1073-def get_full_of(f):
1074- global logger
1075- global server_id
1076-
1077- try:
1078- logger.info("Getting full backup name of %s" % f)
1079- d = json.JSONDecoder()
1080- response = d.decode(get_response("get_full_of.php", {"backup_name": f}))
1081- msg_enc = response["data"]
1082- logger.debug("Reply from server %s" % msg_enc)
1083- full = d.decode(decrypt(msg_enc))
1084- logger.info("Got full backup name %s" % full["name"])
1085- except:
1086- logger.error("Couldn't get name of initial full backup of %s" % f)
1087- logger.error(traceback.format_exc())
1088- return None
1089- return full["name"]
1090-
1091-
1092-# Gets name of child backup copy of the given backup name
1093-# Inputs
1094-# f - Valid name of incremental or full backup
1095-# Returns
1096-# String with name of child backup copy of the given backup copy
1097-# None - if error happened
1098-
1099-def get_child_of(f):
1100- global logger
1101- global server_id
1102-
1103- try:
1104- logger.info("Getting child backup name of %s" % f)
1105- d = json.JSONDecoder()
1106- response = d.decode(get_response("get_child_of.php",
1107- {"backup_name": f}))
1108- msg_enc = response["data"]
1109- logger.debug("Reply from server %s" % msg_enc)
1110- child = d.decode(decrypt(msg_enc))
1111- logger.info("Got child backup name %s" % child["name"])
1112- except:
1113- logger.error("Couldn't get name of child backup of %s" % f)
1114- logger.error(traceback.format_exc())
1115- return None
1116- return child["name"]
1117-
1118-
1119-# Notifies a job event to TwinDB dispatcher
1120-# Inputs
1121-# job_id - id of a job in jobs table
1122-# params - { event: "start_job", job_id: job_id } or
1123-# { event: "stop_job", job_id: job_id, ret_code: ret }
1124-# Returns
1125-# True
1126-# False - if error happened
1127-
1128-def log_job_notify(job_id, params):
1129- global logging
1130- result = False
1131-
1132+def log_job_notify(params):
1133+ """
1134+ Notifies a job event to TwinDB dispatcher
1135+ :param params: { event: "start_job", job_id: job_id } or
1136+ { event: "stop_job", job_id: job_id, ret_code: ret }
1137+ :return: True or False if error happened
1138+ """
1139 logger.info("Sending event notification %s" % params["event"])
1140- try:
1141- data = {
1142- "type": "notify",
1143- "params": params
1144- }
1145- response_body = get_response(data)
1146- d = json.JSONDecoder()
1147- response_body_decoded = d.decode(response_body)
1148- msg_enc = response_body_decoded["response"]
1149- if response_body_decoded["success"] == True:
1150- logger.debug("Dispatcher acknowledged job_id = %d start" % job_id)
1151- result = True
1152- else:
1153- logger.error("Dispatcher didn't acknowledge job_id = %d start" % job_id)
1154- result = False
1155- except:
1156- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
1157+ data = {
1158+ "type": "notify",
1159+ "params": params
1160+ }
1161+ response_body = get_response(data)
1162+ d = json.JSONDecoder()
1163+ response_body_decoded = d.decode(response_body)
1164+ if response_body_decoded["success"]:
1165+ logger.debug("Dispatcher acknowledged job_id = %d notification" % job_id)
1166+ result = True
1167+ else:
1168+ logger.error("Dispatcher didn't acknowledge job_id = %d notification" % job_id)
1169+ result = False
1170 return result
1171
1172
1173-# Saves details about backup copy in TwinDB dispatcher
1174-# Inputs
1175-# job_id - id of a job in jobs table
1176-# name - name of backup
1177-# size - size of the backup in bytes
1178-# lsn - last LSN if it was incremental backup
1179-# ancestor - Ansector of the backup (not used now)
1180-# Returns
1181-# JSON string with status of the request i.e. { "success": True }
1182-# None - if error happened
1183-
1184-def record_backup(job_id, name, volume_id, size, lsn=None, ancestor=0):
1185- global logger
1186-
1187- try:
1188- logger.info("Saving information about backup:")
1189- logger.info("File name : %s" % name)
1190- logger.info("Volume id : %d" % int(volume_id))
1191- logger.info("Size : %d (%s)" % (int(size), h_size(size) ))
1192- logger.info("Ancestor : %d" % int(ancestor))
1193- data = {
1194- "type": "update_backup_data",
1195- "params": {
1196- "job_id": job_id,
1197- "name": name,
1198- "volume_id": volume_id,
1199- "size": size,
1200- "lsn": lsn,
1201- "ancestor": ancestor
1202- }
1203+def record_backup(name, volume_id, size, lsn=None, ancestor=0):
1204+ """
1205+ Saves details about backup copy in TwinDB dispatcher
1206+ :param name: name of backup
1207+ :param volume_id: id of a volume where the backup copy is saved
1208+ :param size: size of the backup in bytes
1209+ :param lsn: last LSN if it was incremental backup
1210+ :param ancestor: Ancestor of the backup (not used now)
1211+ :return: JSON string with status of the request i.e. { "success": True } or None if error happened
1212+ """
1213+ logger.info("Saving information about backup:")
1214+ logger.info("File name : %s" % name)
1215+ logger.info("Volume id : %d" % int(volume_id))
1216+ logger.info("Size : %d (%s)" % (int(size), h_size(size)))
1217+ logger.info("Ancestor : %d" % int(ancestor))
1218+ data = {
1219+ "type": "update_backup_data",
1220+ "params": {
1221+ "job_id": job_id,
1222+ "name": name,
1223+ "volume_id": volume_id,
1224+ "size": size,
1225+ "lsn": lsn,
1226+ "ancestor": ancestor
1227 }
1228- logger.debug("Saving a record %s" % data)
1229- response = get_response(data)
1230- if response <> None:
1231- jd = json.JSONDecoder()
1232- r = jd.decode(response)
1233- logger.debug(r)
1234- if r["success"]:
1235- logger.info("Saved backup copy details")
1236- return True
1237- else:
1238- logger.error("Failed to save backup copy details: "
1239- + jd.decode(decrypt(r["response"]))["error"])
1240- return False
1241- del jd
1242+ }
1243+ logger.debug("Saving a record %s" % data)
1244+ response = get_response(data)
1245+ if response:
1246+ jd = json.JSONDecoder()
1247+ r = jd.decode(response)
1248+ logger.debug(r)
1249+ if r["success"]:
1250+ logger.info("Saved backup copy details")
1251+ return True
1252 else:
1253- logger.error("Empty response from server")
1254+ logger.error("Failed to save backup copy details: "
1255+ + jd.decode(decrypt(r["response"]))["error"])
1256 return False
1257- except:
1258- logger.error("Failed to save backup copy details")
1259- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
1260+ else:
1261+ logger.error("Empty response from server")
1262 return False
1263- return True
1264-
1265-
1266-# Reads config from /etc/twindb.cfg and sets server_id variable
1267-# Returns
1268-# True - if config was successfully read
1269-# Exits if error happened
1270+
1271
1272 def read_config():
1273- global init_config
1274+ """
1275+ Reads config from /etc/twindb.cfg and sets server_id variable
1276+ :return: True if config was successfully read
1277+ Exits if error happened
1278+ """
1279 global server_id
1280-
1281 try:
1282 if os.path.exists(init_config):
1283- execfile(init_config)
1284+ ns = {}
1285+ execfile(init_config, ns)
1286+ if "server_id" in ns:
1287+ server_id = ns["server_id"]
1288 else:
1289- exit_on_error("Config %s doesn't exist" % init_config)
1290- if len(server_id) == 0:
1291+ print("Config %s doesn't exist" % init_config, file=sys.stderr)
1292+ return False
1293+ if not server_id:
1294 exit_on_error("Config %s doesn't set server_id" % init_config)
1295- except:
1296- logger.error(traceback.format_exc())
1297- exit_on_error("Failed to read from config in %s" % init_config)
1298+ except IOError as err:
1299+ exit_on_error("Failed to read config file %s. %s" % (init_config, err))
1300 return True
1301
1302
1303-# Saves server_id variable in /etc/twindb.cfg
1304-# Returns
1305-# True - if config was successfully saved
1306-# Exits if error happened
1307-
1308-def save_config(server_id):
1309- global init_config
1310-
1311+def save_config():
1312+ """
1313+ Saves server_id variable in file init_config (/etc/twindb.cfg)
1314+ :return: True - if config was successfully saved
1315+ Exits if error happened
1316+ """
1317+ if not server_id:
1318+ exit_on_error("Can not save agent config file because server_id is empty")
1319 try:
1320- f = open(init_config, 'w')
1321+ f = open(init_config, "w")
1322 f.write("server_id='%s'\n" % server_id)
1323 f.close()
1324- except:
1325- print("Failed to save new config in %s" % init_config)
1326- sys.exit(2)
1327- return
1328-
1329-
1330-def get_slave_status():
1331- result = dict()
1332- # Read master host
1333+ except IOError as err:
1334+ exit_on_error("Failed to save new config in %s. %s" % (init_config, err))
1335+ return True
1336+
1337+
1338+def get_slave_status(user=None, passwd=None):
1339+ """
1340+ Reads SHOW SLAVE STATUS from the local server
1341+ :param user: user to connect to MySQL
1342+ :param passwd: password to connect to MySQL
1343+ :return: dictionary with SHOW SLAVE STATUS result
1344+ """
1345+ conn = get_mysql_connection(user, passwd)
1346+ if conn:
1347+ cursor = conn.cursor(dictionary=True)
1348+ else:
1349+ return None
1350+ result = {
1351+ "mysql_server_id": None,
1352+ "mysql_master_server_id": None,
1353+ "mysql_master_host": None,
1354+ "mysql_seconds_behind_master": None,
1355+ "mysql_slave_io_running": None,
1356+ "mysql_slave_sql_running": None
1357+ }
1358 try:
1359- conn = get_mysql_connection()
1360- if conn:
1361- cursor = conn.cursor(dictionary=True)
1362- else:
1363- return None
1364-
1365- result["mysql_server_id"] = None
1366- result["mysql_master_server_id"] = None
1367- result["mysql_master_host"] = None
1368- result["mysql_seconds_behind_master"] = None
1369- result["mysql_slave_io_running"] = None
1370- result["mysql_slave_sql_running"] = None
1371-
1372 cursor.execute("SHOW SLAVE STATUS")
1373 for row in cursor:
1374 result["mysql_master_server_id"] = row["Master_Server_Id"]
1375@@ -1061,35 +906,44 @@
1376 result["mysql_seconds_behind_master"] = row["Seconds_Behind_Master"]
1377 result["mysql_slave_io_running"] = row["Slave_IO_Running"]
1378 result["mysql_slave_sql_running"] = row["Slave_SQL_Running"]
1379-
1380+ except mysql.connector.Error as err:
1381+ logger.error("Could get SHOW SLAVE STATUS")
1382+ logger.error("MySQL Error: " % err)
1383+ return None
1384+ try:
1385 cursor.execute("SELECT @@server_id AS server_id")
1386 for row in cursor:
1387 result["mysql_server_id"] = row["server_id"]
1388-
1389 cursor.close()
1390 conn.close()
1391- except:
1392- logger.error("Could not read Master_host from MySQL")
1393- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
1394+ except mysql.connector.Error as err:
1395+ logger.error("Could not read server_id")
1396+ logger.error("MySQL Error: " % err)
1397 return None
1398 return result
1399
1400
1401 def has_mysql_access(username=None, password=None, grant_capability=True):
1402+ """
1403+ Reports if a user has all required MySQL privileges
1404+ :param username: MySQL user
1405+ :param password: MySQL password
1406+ :param grant_capability: TODO: document this parameter
1407+ :return: a pair of a boolean that tells whether MySQL user has all required privileges
1408+ and a list of missing privileges
1409+ """
1410 has_required_grants = False
1411
1412 # list of missing privileges
1413 missing_privileges = []
1414
1415 try:
1416- if username is None or password is None:
1417- conn = get_mysql_connection()
1418- else:
1419- conn = get_mysql_connection(username, password)
1420+ conn = get_mysql_connection(username, password)
1421
1422 if isinstance(conn, mysql.connector.MySQLConnection):
1423 cursor = conn.cursor(dictionary=True)
1424 else:
1425+ missing_privileges = ['RELOAD', 'SUPER', 'LOCK TABLES', 'REPLICATION CLIENT', 'CREATE TABLESPACE']
1426 return has_required_grants, missing_privileges
1427
1428 # Fetch the current user and matching host part as it could either be
1429@@ -1149,95 +1003,84 @@
1430
1431 cursor.close()
1432 conn.close()
1433- except:
1434+ except mysql.connector.Error as err:
1435 logger.error("Could not read the grants information from MySQL")
1436- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2])
1437-
1438+ logger.error("MySQL Error: %s" % err)
1439 return has_required_grants, missing_privileges
1440
1441
1442-# Registers this server in TwinDB dispatcher
1443-# Inputs
1444-# code - string with secret registration code
1445-# Returns
1446-# True - if server was successfully registered
1447-# Exits if error happened
1448 def action_handler_register(code):
1449- global logger
1450- global init_config
1451- global ssh_private_key
1452- global ssh_public_key
1453- global gpg_homedir
1454- global server_id
1455- global mysql_user
1456- global mysql_password
1457-
1458+ """
1459+ Registers this server in TwinDB dispatcher
1460+ Inputs
1461+ code - string with secret registration code
1462+ Returns
1463+ True - if server was successfully registered
1464+ Exits if error happened
1465+ """
1466 # Check early to see that the MySQL user passed to the agent has enough
1467 # privileges to create a separate MySQL user needed by TwinDB
1468 mysql_access_available, missing_mysql_privileges = has_mysql_access()
1469 if not mysql_access_available:
1470 logger.error("The MySQL user %s does not have enough privileges" % mysql_user)
1471- if len(missing_mysql_privileges) > 0:
1472+ if missing_mysql_privileges:
1473 logger.error("Following privileges are missing: %s" % ','.join(missing_mysql_privileges))
1474-
1475 return False
1476
1477 logger.info("Registering TwinDB agent with code %s" % code)
1478- logger.info("The agent needs to generate cryptographically strong keys.")
1479- logger.info("It may take really, really long time. Please be patient.")
1480 name = os.uname()[1].strip() # un[1] is a hostname
1481
1482 twindb_email = "%s@twindb.com" % server_id
1483
1484- # Generate GPG key
1485+ # Read GPG public key
1486 enc_public_key = None
1487+ cmd = ["gpg", "--homedir", gpg_homedir, "--armor", "--export", twindb_email]
1488 try:
1489 logger.debug("Reading GPG public key of %s." % twindb_email)
1490- p1 = subprocess.Popen(["gpg", "--homedir", gpg_homedir,
1491- "--armor", "--export", twindb_email],
1492- stdout=subprocess.PIPE)
1493- enc_public_key = p1.stdout.read()
1494- except:
1495- logger.error("Failed to export GPG keys of %s from %s."
1496- % (twindb_email, gpg_homedir))
1497- exit_on_error(traceback.format_exc())
1498+ p1 = subprocess.Popen(cmd, stdout=subprocess.PIPE)
1499+ enc_public_key = p1.communicate()[0]
1500+ except OSError as err:
1501+ logger.error("Failed to run command %r. %s" % (cmd, err))
1502+ exit_on_error("Failed to export GPG keys of %s from %s." % (twindb_email, gpg_homedir))
1503
1504- # Generate SSH key
1505- try:
1506- if not os.path.isfile(ssh_private_key):
1507+ if not os.path.isfile(ssh_private_key_file):
1508+ try:
1509 logger.info("Generating SSH keys pair.")
1510- subprocess.call(["ssh-keygen", "-N", "", "-f", ssh_private_key])
1511- except:
1512- logger.error("Failed to generate SSH keys.")
1513- exit_on_error(traceback.format_exc())
1514-
1515+ subprocess.call(["ssh-keygen", "-N", "", "-f", ssh_private_key_file])
1516+ except OSError as err:
1517+ logger.error("Failed to run command %r. %s" % (cmd, err))
1518+ exit_on_error("Failed to generate SSH keys.")
1519+ ssh_public_key = None
1520 try:
1521- logger.info("Reading SSH public key from %s." % ssh_public_key)
1522- f = open(ssh_public_key, 'r')
1523+ logger.info("Reading SSH public key from %s." % ssh_public_key_file)
1524+ f = open(ssh_public_key_file, 'r')
1525 ssh_public_key = f.read()
1526 f.close()
1527- except:
1528- logger.error("Failed to read SSH keys.")
1529- exit_on_error(traceback.format_exc())
1530+ except IOError as err:
1531+ logger.error("Failed to read from file %s. %s" % (ssh_public_key_file, err))
1532+ exit_on_error("Failed to read SSH keys.")
1533
1534 # Read local ip addresses
1535 cmd = "ip addr"
1536 cmd += "| grep -w inet"
1537 cmd += "| awk '{ print $2}'"
1538 cmd += "| awk -F/ '{ print $1}'"
1539- ss = get_slave_status()
1540-
1541- if ss is None:
1542- logger.error("Could not get slave status on this server")
1543- exit_on_error()
1544- p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
1545-
1546+ cout = None
1547+ logger.debug("Getting list of local IP addresses")
1548+ try:
1549+ logger.debug("Running: %s" % cmd)
1550+ p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
1551+ cout, cerr = p.communicate()
1552+ logger.debug("STDOUT: %s" % cout)
1553+ logger.debug("STDERR: %s" % cerr)
1554+ except OSError as err:
1555+ logger.error("Failed to run command %r. %s" % (cmd, err))
1556 local_ip = list()
1557- for row in p.stdout:
1558- row = row.rstrip('\n')
1559- if row != "127.0.0.1":
1560+ for row in cout.split("\n"):
1561+ row = row.rstrip("\n")
1562+ if row and row != "127.0.0.1":
1563 local_ip.append(row)
1564-
1565+ ss = get_slave_status()
1566 data = {
1567 "type": "register",
1568 "params": {
1569@@ -1255,9 +1098,8 @@
1570 "local_ip": local_ip
1571 }
1572 }
1573-
1574 api_response = get_response(data)
1575- if api_response is not None:
1576+ if api_response:
1577 json_decoder = json.JSONDecoder()
1578 response_decoded = json_decoder.decode(api_response)
1579 logger.debug(response_decoded)
1580@@ -1274,18 +1116,16 @@
1581 elif "errors" in response_decoded:
1582 error_msg = response_decoded["errors"]["msg"]
1583
1584- logger.error("Failed to register the agent: %s" % error_msg)
1585- sys.exit(2)
1586- del json_decoder
1587+ exit_on_error("Failed to register the agent: %s" % error_msg)
1588 else:
1589 exit_on_error("Empty response from server")
1590 return True
1591
1592
1593 def create_agent_user():
1594- global mysql_user
1595- global mysql_password
1596-
1597+ """
1598+ Creates local MySQL user for twindb agent
1599+ """
1600 config = get_config()
1601 try:
1602 conn = get_mysql_connection()
1603@@ -1294,9 +1134,8 @@
1604 cursor = conn.cursor()
1605 cursor.execute(q, (config["mysql_user"], config["mysql_password"]))
1606 except mysql.connector.Error as err:
1607- logger.error("Failed to create MySQL user %s@localhost for TwinDB agent" % config["mysql_user"])
1608- logger.error("MySQL replied: %s" % err.msg)
1609- exit_on_error(traceback.format_exc())
1610+ logger.error("MySQL replied: %s" % err)
1611+ exit_on_error("Failed to create MySQL user %s@localhost for TwinDB agent" % config["mysql_user"])
1612 logger.info("Created MySQL user %s@localhost for TwinDB agent" % config["mysql_user"])
1613 logger.info("Congratulations! The server is successfully registered in TwinDB.")
1614 logger.info("TwinDB will backup the server accordingly to the default config.")
1615@@ -1304,15 +1143,13 @@
1616 return True
1617
1618
1619-# Unregisters this server in TwinDB dispatcher
1620-# Returns
1621-# True - if server was successfully unregistered
1622-# False - if error happened
1623-
1624 def action_handler_unregister(delete_backups):
1625- global logger
1626- global init_config
1627-
1628+ """
1629+ Unregisters this server in TwinDB dispatcher
1630+ Returns
1631+ True - if server was successfully unregistered
1632+ False - if error happened
1633+ """
1634 data = {
1635 "type": "unregister",
1636 "params": {
1637@@ -1322,7 +1159,7 @@
1638 logger.debug("Unregistration request:")
1639 logger.debug(data)
1640 response = get_response(data)
1641- if response <> None:
1642+ if response:
1643 jd = json.JSONDecoder()
1644 r = jd.decode(response)
1645 logger.debug(r)
1646@@ -1330,25 +1167,21 @@
1647 logger.info("The server is successfully unregistered")
1648 return True
1649 else:
1650- logger.error("Failed to unregister the agent: "
1651- + jd.decode(decrypt(r["response"]))["error"])
1652- sys.exit(2)
1653- del jd
1654+ exit_on_error("Failed to unregister the agent: " + jd.decode(decrypt(r["response"]))["error"])
1655 else:
1656- logger.error("Empty response from server")
1657- sys.exit(2)
1658+ exit_on_error("Empty response from server")
1659 return False
1660
1661
1662-# Starts immediate backup job
1663 def action_handler_backup():
1664- global logger
1665-
1666+ """
1667+ Starts immediate backup job
1668+ """
1669 if schedule_backup():
1670 config = get_config()
1671 job = get_job()
1672 if job:
1673- if 0 == process_job(config, job):
1674+ if process_job(config, job) == 0:
1675 return True
1676 else:
1677 logger.error("Didn't get any job from the dispatcher")
1678@@ -1359,10 +1192,10 @@
1679 return False
1680
1681
1682-# Asks dispatcher to schedule a job for this server
1683 def schedule_backup():
1684- global logger
1685-
1686+ """
1687+ Asks dispatcher to schedule a job for this server
1688+ """
1689 data = {
1690 "type": "schedule_backup",
1691 "params": {
1692@@ -1371,7 +1204,7 @@
1693 logger.debug("Schedule backup request:")
1694 logger.debug(data)
1695 response = get_response(data)
1696- if response <> None:
1697+ if response:
1698 jd = json.JSONDecoder()
1699 r = jd.decode(response)
1700 logger.debug(r)
1701@@ -1382,45 +1215,43 @@
1702 logger.error("Failed to schedule a job: "
1703 + jd.decode(decrypt(r["response"]))["error"])
1704 return False
1705- del jd
1706 else:
1707 exit_on_error("Empty response from server")
1708
1709
1710-# Checks if it's enough space in TwinDB storage
1711-# Inputs
1712-# config - backup config
1713-# job - job order
1714-# Returns always True
1715-
1716-def check_space(config, job_id):
1717+def check_space():
1718+ """
1719+ Checks if it's enough space in TwinDB storage
1720+ Inputs
1721+ config - backup config
1722+ job - job order
1723+ Returns always True
1724+ TODO: implement the function
1725+ """
1726 return True
1727
1728
1729-# Checks if MySQL user from backup config has enough privileges to perform backup
1730-# Inputs
1731-# config - backup config
1732-# job - job order
1733-# Returns
1734-# True - if all necessary privileges are granted
1735-# False - if not all necessary privilegers are granted
1736-
1737 def check_mysql_permissions(config):
1738- global logger
1739-
1740+ """
1741+ Checks if MySQL user from backup config has enough privileges to perform backup
1742+ Inputs
1743+ config - backup config
1744+ Returns
1745+ True - if all necessary privileges are granted
1746+ False - if not all necessary privileges are granted
1747+ """
1748 try:
1749 logger.info("Checking if '%s'@'localhost' has enough rights to backup MySQL" % config["mysql_user"])
1750- result = False
1751 con = get_mysql_connection(user=config["mysql_user"], passwd=config["mysql_password"])
1752 required_priv = ["RELOAD", "LOCK TABLES", "REPLICATION CLIENT", "SUPER", "CREATE TABLESPACE"]
1753 for priv in required_priv:
1754 cursor = con.cursor()
1755 logger.debug("Checking if privilege %s is granted" % priv)
1756 query = """
1757-SELECT
1758- PRIVILEGE_TYPE
1759-FROM information_schema.USER_PRIVILEGES
1760-WHERE GRANTEE = '\'%s\'@\\\'localhost\\\'' AND PRIVILEGE_TYPE = %s"""
1761+ SELECT
1762+ PRIVILEGE_TYPE
1763+ FROM information_schema.USER_PRIVILEGES
1764+ WHERE GRANTEE = '\'%s\'@\\\'localhost\\\'' AND PRIVILEGE_TYPE = %s"""
1765 logger.debug("Sending query : %s" % query)
1766 cursor.execute(query, (config["mysql_user"], priv))
1767 cursor.fetchall()
1768@@ -1430,288 +1261,213 @@
1769 logger.debug("Privilege %s is granted to %s@localhost" % (priv, config["mysql_user"]))
1770 else:
1771 logger.info("%20s ... NOT GRANTED" % priv)
1772- logger.error("Privilege %s is not granted to %s@localhost" % ( priv, config["mysql_user"]))
1773- logger.info(
1774- "Please execute following SQL to grant %s to user %s@localhost:" % ( priv, config["mysql_user"]))
1775- logger.info("GRANT %s ON *.* TO '%s'@'localhost';" % ( priv, config["mysql_user"]))
1776+ logger.error("Privilege %s is not granted to %s@localhost" % (priv, config["mysql_user"]))
1777+ logger.info("Please execute following SQL to grant %s to user %s@localhost:"
1778+ % (priv, config["mysql_user"]))
1779+ logger.info("GRANT %s ON *.* TO '%s'@'localhost';" % (priv, config["mysql_user"]))
1780 return False
1781 cursor.close()
1782 con.close()
1783- result = True
1784- except:
1785- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2])
1786- result = False
1787- if result:
1788- logger.debug("MySQL user %s@'localhost' has enough grants to take backup" % config["mysql_user"])
1789- else:
1790- logger.error("MySQL user %s@'localhost' doesn't have enough grants to take backup" % config["mysql_user"])
1791- return result
1792-
1793-
1794-# Meta fuction that calls actual backup fucntion depending on tool in backup config
1795-# Inputs
1796-# config - backup config
1797-# job - job order
1798-# Returns what actual backup function returned or -1 if the tool is not supported
1799+ except mysql.connector.Error as err:
1800+ logger.error("MySQL Error: %s" % err)
1801+ return False
1802+ logger.debug("MySQL user %s@'localhost' has enough grants to take backup" % config["mysql_user"])
1803+ return True
1804+
1805
1806 def take_backup(config, job):
1807- global logger
1808-
1809+ """
1810+ Meta function that calls actual backup function depending on tool in backup config
1811+ :param config: backup config
1812+ :param job: job order
1813+ :return: what actual backup function returned or -1 if the tool is not supported
1814+ """
1815 ret = -1
1816 logger.info("Starting backup job")
1817- try:
1818- job_id = int(job["job_id"])
1819- notify_params = {"event": "start_job", "job_id": job_id}
1820- if log_job_notify(job_id, notify_params):
1821- ret = take_backup_xtrabackup(config, job)
1822- notify_params = {"event": "stop_job", "job_id": job_id, "ret_code": ret}
1823- log_job_notify(job_id, notify_params)
1824- logger.info("Backup job is complete")
1825- else:
1826- logger.info("Backup job can not start")
1827- except:
1828- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
1829-
1830+ notify_params = {"event": "start_job", "job_id": job["job_id"]}
1831+ if log_job_notify(notify_params):
1832+ ret = take_backup_xtrabackup(config, job)
1833+ notify_params = {"event": "stop_job", "job_id": job["job_id"], "ret_code": ret}
1834+ log_job_notify(notify_params)
1835+ logger.info("Backup job is complete")
1836+ else:
1837+ logger.info("Backup job can not start")
1838 return ret
1839
1840
1841-# Starts SSH process to TwinDB storage and saves input in file backup_name
1842-# Inputs
1843-# config - backup config
1844-# backup_name - file name to save input in
1845-# stdin, stderr - respective IO handlers
1846-# Returns what SSH process returns
1847-
1848 def start_ssh_cmd(config, job_params, backup_name, stdin, stderr):
1849- global logger
1850- global ssh_private_key
1851- global ssh_port
1852-
1853- p = None
1854+ """
1855+ Starts an SSH process to TwinDB storage and saves input in file backup_name
1856+ :param config: backup config
1857+ :param job_params: job parameters
1858+ :param backup_name: file name to save input in
1859+ :param stdin: respective IO handlers
1860+ :param stderr: respective IO handlers
1861+ :return: what SSH process returns
1862+ """
1863+ ssh_process = None
1864+ ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key_file, "-p", str(ssh_port),
1865+ "user_id_%s@%s" % (config["user_id"], job_params["ip"]), "/bin/cat - > %s" % backup_name]
1866 try:
1867- ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key]
1868- ssh_cmd.append("-p")
1869- ssh_cmd.append(str(ssh_port))
1870- ssh_cmd.append("user_id_%s@%s" % (config["user_id"], job_params["ip"]))
1871- ssh_cmd.append("/bin/cat - > %s" % backup_name)
1872 logger.debug("Starting SSH process: %r" % ssh_cmd)
1873- p = subprocess.Popen(ssh_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr)
1874- except:
1875- logger.error(traceback.format_exc())
1876- return p
1877-
1878-
1879-# Starts GPG process, encrypts STDIN and outputs in into STDOUT
1880-# Inputs
1881-# config - backup config
1882-# stdin, stderr - respective IO handlers
1883-# Returns what GPG process returns
1884-
1885-def start_gpg_cmd(config, stdin, stderr):
1886- global logger
1887- global ssh_private_key
1888- global ssh_port
1889- global server_id
1890-
1891- p = None
1892+ ssh_process = subprocess.Popen(ssh_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr)
1893+ except OSError as err:
1894+ logger.error("Failed to run command %r. %s" % (ssh_cmd, err))
1895+ return ssh_process
1896+
1897+
1898+def start_gpg_cmd(stdin, stderr):
1899+ """
1900+ Starts GPG process, encrypts STDIN and outputs in into STDOUT
1901+ :param stdin: respective IO handlers
1902+ :param stderr: respective IO handlers
1903+ :return: what GPG process returns
1904+ """
1905+ gpg_process = None
1906+ gpg_cmd = ["gpg", "--encrypt", "--yes", "--batch", "--no-permission-warning", "--quiet", "--recipient", server_id]
1907 try:
1908- gpg_cmd = ["gpg", "--encrypt", "--yes", "--batch", "--no-permission-warning", "--quiet"]
1909- gpg_cmd.append("--recipient")
1910- gpg_cmd.append("%s" % (server_id))
1911-
1912 logger.debug("Starting GPG process: %r" % gpg_cmd)
1913- p = subprocess.Popen(gpg_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr)
1914- except:
1915- logger.error(traceback.format_exc())
1916- return p
1917-
1918-
1919-# Takes backup copy with XtraBackup
1920-# Inputs
1921-# config - backup config
1922-# job - job order
1923-# Returns
1924-# True - if backup successfully taken
1925-# False - if backup failed
1926+ gpg_process = subprocess.Popen(gpg_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr)
1927+ except OSError as err:
1928+ logger.error("Failed to run command %r. %s" % (gpg_cmd, err))
1929+ return gpg_process
1930+
1931
1932 def take_backup_xtrabackup(config, job):
1933- global logger
1934- global server_id
1935-
1936- suffix = "tar"
1937+ """
1938+ Takes backup copy with XtraBackup
1939+ :param config: backup config
1940+ :param job: job order
1941+ :return: 0 if backup was successfully taken or -1 if it has failed
1942+ """
1943+ suffix = "xbstream"
1944 backup_name = "server_id_%s_%s.%s.gpg" % (server_id, datetime.now().isoformat(), suffix)
1945- extra_config = ""
1946 ret_code = 0
1947-
1948+ if "params" not in job:
1949+ logger.error("There are no params in the job order")
1950+ return -1
1951+ # Check that job order has all required parameters
1952+ mandatory_params = ["ancestor", "backup_type", "ip", "lsn", "type", "volume_id"]
1953+ for param in mandatory_params:
1954+ if param not in job["params"]:
1955+ logger.error("There is no %s in the job order" % param)
1956+ return -1
1957+ backup_type = job["params"]["backup_type"]
1958+ xtrabackup_cmd = [
1959+ "innobackupex",
1960+ "--stream=xbstream",
1961+ "--user=%s" % config["mysql_user"],
1962+ "--password=%s" % config["mysql_password"],
1963+ "--socket=%s" % get_unix_socket(),
1964+ "--slave-info",
1965+ "--safe-slave-backup",
1966+ "--safe-slave-backup-timeout=3600"]
1967+ if backup_type == 'incremental':
1968+ last_lsn = job["params"]["lsn"]
1969+ xtrabackup_cmd.append("--incremental")
1970+ xtrabackup_cmd.append(".")
1971+ xtrabackup_cmd.append("--incremental-lsn=%s" % last_lsn)
1972+ else:
1973+ xtrabackup_cmd.append(".")
1974+ extra_config = gen_extra_config(config)
1975+ if extra_config:
1976+ xtrabackup_cmd.append("--defaults-extra-file=%s" % extra_config)
1977+ err_descriptors = dict()
1978+ for desc in ["gpg", "ssh", "xtrabackup"]:
1979+ desc_file = ("/tmp/twindb.%s.err" % desc)
1980+ try:
1981+ err_descriptors[desc] = open(desc_file, "w+")
1982+ except IOError as err:
1983+ logger.error("Failed to open file %s. %s" % (desc_file, err))
1984+ return -1
1985 try:
1986- mysql_user = config["mysql_user"]
1987- mysql_password = config["mysql_password"]
1988- d = json.JSONDecoder()
1989- backup_type = job["params"]["backup_type"]
1990- xtrabackup_cmd = ["innobackupex"]
1991- xtrabackup_cmd.append("--stream=xbstream")
1992- xtrabackup_cmd.append("--user=%s" % mysql_user)
1993- xtrabackup_cmd.append("--password=%s" % mysql_password)
1994- xtrabackup_cmd.append("--socket=%s" % get_unix_socket())
1995- xtrabackup_cmd.append("--slave-info")
1996- xtrabackup_cmd.append("--safe-slave-backup")
1997- xtrabackup_cmd.append("--safe-slave-backup-timeout=3600")
1998- if backup_type == 'incremental':
1999- last_lsn = job["params"]["lsn"]
2000- xtrabackup_cmd.append("--incremental")
2001- xtrabackup_cmd.append(".")
2002- xtrabackup_cmd.append("--incremental-lsn=%s" % last_lsn)
2003- else:
2004- xtrabackup_cmd.append(".")
2005- extra_config = gen_extra_config(config)
2006- if extra_config:
2007- xtrabackup_cmd.append("--defaults-extra-file=%s" % extra_config)
2008-
2009- xtr_err = open('/tmp/twindb.xtrabackup.err', "w+")
2010- gpg_err = open('/tmp/twindb.gpg.err', "w+")
2011- ssh_err = open('/tmp/twindb.ssh.err', "w+")
2012- p1 = subprocess.Popen(xtrabackup_cmd, stdout=subprocess.PIPE, stderr=xtr_err)
2013- p2 = start_gpg_cmd(config, p1.stdout, gpg_err)
2014- p3 = start_ssh_cmd(config, job["params"], backup_name, p2.stdout, ssh_err)
2015-
2016- p1.stdout.close()
2017- p2.stdout.close()
2018- p3.communicate()
2019-
2020- ret_code_ssh = p3.returncode
2021- ret_code_gpg = p2.wait()
2022- ret_code_xbk = p1.wait()
2023-
2024- xtr_err.seek(0)
2025- gpg_err.seek(0)
2026- ssh_err.seek(0)
2027-
2028- xtrabackup_stderr = xtr_err.read()
2029- gpg_stderr = gpg_err.read()
2030- ssh_stderr = ssh_err.read()
2031-
2032- if len(xtrabackup_stderr) == 0:
2033- xtrabackup_stderr = "no output"
2034- if len(gpg_stderr) == 0:
2035- gpg_stderr = "no output"
2036- if len(ssh_stderr) == 0:
2037- ssh_stderr = "no output"
2038-
2039- logger.info("XtraBackup stderr: " + xtrabackup_stderr)
2040- logger.info("GPG stderr: " + gpg_stderr)
2041- logger.info("SSH stderr: " + ssh_stderr)
2042- ancestor = 0
2043-
2044- if ret_code_xbk == 0 and ret_code_gpg == 0 and ret_code_ssh == 0:
2045- lsn = grep_lsn(xtrabackup_stderr)
2046- if lsn == None:
2047- logger.error("Could not find LSN in XtrabBackup output")
2048- return -1
2049- file_size = get_backup_size(config, job["params"], backup_name)
2050- if file_size == 0:
2051- logger.error("Backup copy size must not be zero")
2052- return -1
2053- job_id = job["job_id"]
2054- volume_id = job["params"]["volume_id"]
2055- ancestor = job["params"]["ancestor"]
2056- if not record_backup(job_id, backup_name, volume_id, file_size, lsn, ancestor):
2057- logger.error("Failed to save backup copy details")
2058- return -1
2059- else:
2060- logger.error("Failed to take backup")
2061- ret_code = -1
2062- except:
2063+ logger.debug("Starting XtraBackup process: %r" % xtrabackup_cmd)
2064+ xbk_proc = subprocess.Popen(xtrabackup_cmd, stdout=subprocess.PIPE, stderr=err_descriptors["xtrabackup"])
2065+ except OSError as err:
2066+ logger.error("Failed to run command %r. %s" % (xtrabackup_cmd, err))
2067+ return -1
2068+ gpg_proc = start_gpg_cmd(xbk_proc.stdout, err_descriptors["gpg"])
2069+ ssh_proc = start_ssh_cmd(config, job["params"], backup_name, gpg_proc.stdout, err_descriptors["ssh"])
2070+
2071+ ssh_proc.wait()
2072+ xbk_proc.wait()
2073+ gpg_proc.wait()
2074+
2075+ ret_code_ssh = ssh_proc.returncode
2076+ ret_code_gpg = gpg_proc.returncode
2077+ ret_code_xbk = xbk_proc.returncode
2078+
2079+ err_str = dict()
2080+ for desc in ["gpg", "ssh", "xtrabackup"]:
2081+ err_descriptors[desc].seek(0)
2082+ err_str[desc] = err_descriptors[desc].read()
2083+ if not err_str[desc]:
2084+ err_str[desc] = "no output"
2085+
2086+ logger.info("XtraBackup stderr: " + err_str["xtrabackup"])
2087+ logger.info("GPG stderr: " + err_str["gpg"])
2088+ logger.info("SSH stderr: " + err_str["ssh"])
2089+
2090+ if ret_code_xbk == 0 and ret_code_gpg == 0 and ret_code_ssh == 0:
2091+ lsn = grep_lsn(err_str["xtrabackup"])
2092+ if not lsn:
2093+ logger.error("Could not find LSN in XtrabBackup output")
2094+ return -1
2095+ file_size = get_backup_size(config, job["params"], backup_name)
2096+ if not file_size:
2097+ logger.error("Backup copy size must not be zero")
2098+ return -1
2099+ volume_id = job["params"]["volume_id"]
2100+ ancestor = job["params"]["ancestor"]
2101+ if not record_backup(backup_name, volume_id, file_size, lsn, ancestor):
2102+ logger.error("Failed to save backup copy details")
2103+ return -1
2104+ else:
2105 logger.error("Failed to take backup")
2106- logger.error(traceback.format_exc())
2107 return -1
2108- finally:
2109- for f in [extra_config,
2110- "/tmp/twindb.xtrabackup.err",
2111- "/tmp/twindb.gpg.err",
2112- "/tmp/twindb.ssh.err"]:
2113- if os.path.isfile(f):
2114+ for f in [extra_config, "/tmp/twindb.xtrabackup.err", "/tmp/twindb.gpg.err", "/tmp/twindb.ssh.err"]:
2115+ if os.path.isfile(f):
2116+ try:
2117 os.remove(f)
2118+ except IOError as err:
2119+ logger.error("Failed to remove file %s. %s" % (f, err))
2120 return ret_code
2121
2122
2123-# Generates MySQL config with datadir option
2124-# Inputs
2125-# config - backup config
2126-# Returns
2127-# File name with MySQL config
2128-# None - if error happened
2129-
2130 def gen_extra_config(config):
2131- global logger
2132-
2133- extra_config = None
2134+ """
2135+ Generates MySQL config with datadir option
2136+ :param config: backup config
2137+ :return: File name with MySQL config or None if error happened
2138+ """
2139 try:
2140 f, extra_config = tempfile.mkstemp()
2141 os.write(f, "[mysqld]\n")
2142- con = get_mysql_connection(
2143- user=config["mysql_user"],
2144- passwd=config["mysql_password"])
2145+ con = get_mysql_connection(config["mysql_user"], config["mysql_password"])
2146 cur = con.cursor()
2147 cur.execute("SELECT @@datadir")
2148 row = cur.fetchone()
2149 os.write(f, 'datadir="%s"\n' % row[0])
2150 cur.close()
2151 os.close(f)
2152- except:
2153- logger.error("Failed to generate extra defaults file")
2154- logger.error(traceback.format_exc())
2155+ except IOError as err:
2156+ logger.error("Failed to generate extra defaults file. %s" % err)
2157 extra_config = None
2158 return extra_config
2159
2160
2161-# Checks if binary log is enabled in local MySQL
2162-# Inputs
2163-# config - backup config
2164-# Returns
2165-# True - binary log enabled
2166-# False - binary log disabled
2167-
2168-def is_binlog_enabled(config):
2169- global logger
2170-
2171- result = False
2172- try:
2173- con = MySQLdb.connect(user=config["mysql_user"], passwd=config["mysql_password"])
2174- with con:
2175- cur = con.cursor()
2176- cur.execute("SELECT @@GLOBAL.log_bin")
2177- row = cur.fetchone()
2178- if row[0] == "1":
2179- result = True
2180- else:
2181- result = False
2182- cur.close()
2183- del con
2184- except:
2185- logger.error("Failed to check if binlog is enabled")
2186- logger.error(traceback.format_exc())
2187- return result
2188-
2189-
2190-# Logs in to TwinDB storage and get size of backup
2191-# Inputs
2192-# config - backup config
2193-# backup_name - file name with backup
2194-# Returns
2195-# Size of backup in bytes
2196-# Zero if error happened
2197-
2198 def get_backup_size(config, job_params, backup_name):
2199- global logger
2200- global ssh_private_key
2201- global ssh_port
2202- backup_size = 0
2203-
2204+ """
2205+ Logs in to TwinDB storage and get size of backup
2206+ :param config: backup config
2207+ :param job_params: job parameters
2208+ :param backup_name: file name with backup
2209+ :return: size of backup in bytes or zero if error happened
2210+ """
2211 logger.debug("Getting size of %s" % backup_name)
2212+ ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key_file, "-p", str(ssh_port),
2213+ "user_id_%s@%s" % (config["user_id"], job_params["ip"]), "/bin/du -b %s" % backup_name]
2214 try:
2215- ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key, "-p", str(ssh_port),
2216- "user_id_%s@%s" % (config["user_id"], job_params["ip"]), "/bin/du -b %s" % backup_name]
2217-
2218 process = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
2219 cout, cerr = process.communicate()
2220
2221@@ -1726,30 +1482,28 @@
2222 except subprocess.CalledProcessError as e:
2223 logger.error("Failed to get size of backup %s" % backup_name)
2224 logger.error(str(e))
2225+ return 0
2226 except OSError as e:
2227 logger.error("Failed to get size of backup %s" % backup_name)
2228- logger.error("Command execution failed: %s" % str(e))
2229- except:
2230- logger.error("Failed to get size of backup %s" % backup_name)
2231- logger.error(traceback.format_exc())
2232- else:
2233- logger.debug("Size of %s = %d bytes (%s)" % (backup_name, backup_size, h_size(backup_size)))
2234-
2235+ logger.error("Failed to run command %r: %s" % (ssh_cmd, e))
2236+ return 0
2237+ logger.debug("Size of %s = %d bytes (%s)" % (backup_name, backup_size, h_size(backup_size)))
2238 return backup_size
2239
2240
2241-def handler_send_key(config, job):
2242+def handler_send_key(job):
2243 """
2244 Processes send_key job
2245 """
2246 # Get owner of the GPG key
2247+ cmd_1 = ["gpg", "--list-packets"]
2248 try:
2249 gpg_pub_key = job["params"]["gpg_pub_key"]
2250 if gpg_pub_key:
2251- cmd_1 = ["gpg", "--list-packets"]
2252 logger.debug("Starting %r" % cmd_1)
2253 p1 = subprocess.Popen(cmd_1, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
2254 cout, cerr = p1.communicate(gpg_pub_key)
2255+ keyid = "Unknown"
2256 for line in cout.split("\n"):
2257 if "keyid:" in line:
2258 keyid = line.replace("keyid:", "").strip()
2259@@ -1759,420 +1513,388 @@
2260 logger.error("Requestor public key is empty")
2261 return
2262 except OSError as err:
2263- logger.error(err)
2264+ logger.error("Failed to run command %r: %s" % (cmd_1, err))
2265 return
2266 # Import public GPG key. It's a user public key sent by the dispatcher
2267 try:
2268 logger.debug("Importing requestor's key %s" % keyid)
2269 cmd_1 = ["gpg", "--import"]
2270- p1 = subprocess.Popen(cmd_1, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
2271+ p1 = subprocess.Popen(cmd_1, stdin=subprocess.PIPE)
2272 p1.communicate(gpg_pub_key)
2273 except OSError as err:
2274- logger.error(err)
2275+ logger.error("Failed to run command %r: %s" % (cmd_1, err))
2276 return
2277 # Get private key and encrypt it
2278- try:
2279- gpg_pub_key = job["params"]["gpg_pub_key"]
2280- if gpg_pub_key:
2281- logger.debug("Exporting private key of server %s" % server_id)
2282- cmd_1 = ["gpg", "--armor", "--export-secret-key", server_id]
2283- cmd_2 = ["gpg", "--armor", "--encrypt", "--sign", "--batch", "-r", keyid, "--local-user", server_id, "--trust-model", "always"]
2284+ gpg_pub_key = job["params"]["gpg_pub_key"]
2285+ if gpg_pub_key:
2286+ logger.debug("Exporting private key of server %s" % server_id)
2287+ cmd_1 = ["gpg", "--armor", "--export-secret-key", server_id]
2288+ cmd_2 = ["gpg", "--armor", "--encrypt", "--sign", "--batch", "-r", keyid, "--local-user", server_id,
2289+ "--trust-model", "always"]
2290+ try:
2291 logger.debug("Starting %r" % cmd_1)
2292 p1 = subprocess.Popen(cmd_1, stdout=subprocess.PIPE)
2293+ except OSError as err:
2294+ logger.error("Failed to run command %r: %s" % (cmd_1, err))
2295+ return
2296+ try:
2297 logger.debug("Starting %r" % cmd_2)
2298 p2 = subprocess.Popen(cmd_2, stdin=p1.stdout, stdout=subprocess.PIPE)
2299 cout, cerr = p2.communicate()
2300 enc_private_key = cout
2301 logger.debug("Encrypted private key %s" % enc_private_key)
2302- except OSError as err:
2303- logger.error(err)
2304- return
2305- # Now send the private key to dispatcher
2306- job_id = job["job_id"]
2307- data = {
2308- "type": "send_key",
2309- "params": {
2310- "enc_private_key": enc_private_key,
2311- "job_id": job_id
2312+ except OSError as err:
2313+ logger.error("Failed to run command %r: %s" % (cmd_2, err))
2314+ return
2315+ # Now send the private key to dispatcher
2316+ data = {
2317+ "type": "send_key",
2318+ "params": {
2319+ "enc_private_key": enc_private_key,
2320+ "job_id": job_id
2321+ }
2322 }
2323- }
2324- get_response(data)
2325-
2326-# Check if directory is empty
2327-# Inputs
2328-# dir - directory name
2329-# Returns
2330-# True - directory is empty
2331-# False - directory is not empty
2332-
2333-def is_dir_empty(dir):
2334- return len(os.listdir(dir)) == 0
2335-
2336-
2337-# Meta fuction that calls actual restore fucntion depending on tool in backup config
2338-# Inputs
2339-# config - backup config
2340-# job - job order
2341-# Returns what actual restore function returned or -1 if the tool is not supported
2342+ get_response(data)
2343+ else:
2344+ logger.error("The job order requested send_key, but no public key was provided")
2345+
2346+
2347+def is_dir_empty(directory):
2348+ """
2349+ Checks if directory is empty
2350+ :param directory: directory name
2351+ :return: True if the directory is empty or False otherwise
2352+ """
2353+ return len(os.listdir(directory)) == 0
2354+
2355
2356 def restore_backup(config, job):
2357- global logger
2358-
2359- ret = -1
2360- logger.info("Starting restore job %r" % job)
2361- log_start_job(int(job["job_id"]))
2362-
2363- if config["tool"] == "xtrabackup":
2364+ """
2365+ Meta function that calls actual restore function depending on tool in backup config
2366+ :param config: backup config
2367+ :param job: job order
2368+ :return: what actual restore function returned or -1 if the tool is not supported
2369+ """
2370+ ret = None
2371+ logger.info("Starting restore job: %s"
2372+ % json.dumps(job, indent=4, sort_keys=True))
2373+ notify_params = {"event": "start_job", "job_id": job["job_id"]}
2374+ if log_job_notify(notify_params):
2375 ret = restore_xtrabackup(config, job)
2376- elif config["tool"] == "mysqldump":
2377- ret = restore_mysqldump(config, job)
2378+ notify_params = {"event": "stop_job", "job_id": job["job_id"], "ret_code": ret}
2379+ log_job_notify(notify_params)
2380+ logger.info("Restore job is complete")
2381 else:
2382- logger.error("Can't restore backup with unsupported tool %s" % config["tool"])
2383- ret = -1
2384- log_stop_job(int(job["job_id"]), ret)
2385- logger.info("Restore job is complete")
2386+ logger.error("Restore job can not start")
2387 return ret
2388
2389
2390-# Extracts an Xtrabackup archive arc in dst_dir
2391-# Inputs
2392-# config - backup config
2393-# arc - file name with archive to extract
2394-# dst_dir - local destination directory
2395-# Returns
2396-# True - if archive is successfully extracted
2397-# False - if error happened
2398-
2399-def extract_archive(config, arc, dst_dir):
2400- global logger
2401- global ssh_private_key
2402- global ssh_port
2403-
2404- logger.info("Extracting %s in %s" % (arc, dst_dir))
2405- try:
2406- ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key, "-p", str(ssh_port)]
2407- ssh_cmd.append(config["username"] + "@" + config["dst_ip"])
2408- ssh_cmd.append("/bin/cat %s" % arc)
2409-
2410- gpg_cmd = ["gpg", "--decrypt"]
2411-
2412- xb_cmd = ["xbstream", "-x"]
2413-
2414- xb_err = open('/tmp/twindb.xb.err', "w+")
2415- gpg_err = open('/tmp/twindb.gpg.err', "w+")
2416- ssh_err = open('/tmp/twindb.ssh.err', "w+")
2417- logger.info("Starting: %r" % ssh_cmd)
2418- p1 = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=ssh_err)
2419- logger.info("Starting: %r" % gpg_cmd)
2420- p2 = subprocess.Popen(gpg_cmd, stdin=p1.stdout, stdout=subprocess.PIPE, stderr=gpg_err)
2421- logger.info("Starting: %r" % xb_cmd)
2422- p3 = subprocess.Popen(xb_cmd, stdin=p2.stdout, stdout=subprocess.PIPE, stderr=mysql_err, cwd=dst_dir)
2423- p1.stdout.close()
2424- p2.stdout.close()
2425- p3.wait()
2426- xb_err.seek(0)
2427- gpg_err.seek(0)
2428- ssh_err.seek(0)
2429- logger.info("SSH stderr: " + ssh_err.read())
2430- logger.info("GPG stderr: " + gpg_err.read())
2431- logger.info("xbstream stderr: " + xb_err.read())
2432- if p3.returncode != 0:
2433- logger.info("Failed to extract backup %s into %s" % (arc, dst_dir))
2434- return False
2435- except:
2436- logger.info("Failed to extract backup %s into %s" % (arc, dst_dir))
2437- finally:
2438- todelete = ['/tmp/twindb.xb.err', '/tmp/twindb.gpg.err', '/tmp/twindb.ssh.err']
2439- for f in todelete:
2440- if os.path.isfile(f):
2441- os.remove(f)
2442- logger.info("Extracted successfully %s in %s" % (arc, dst_dir))
2443- return True
2444-
2445-
2446-# Restores backup copy with XtraBackup
2447-# Inputs
2448-# config - backup config
2449-# job - job order
2450-# Returns
2451-# True - if backup successfully restored
2452-# False - if restore job failed
2453-
2454 def restore_xtrabackup(config, job):
2455- global logger
2456- global ssh_private_key
2457- global ssh_port
2458-
2459- backup_name = job["restore_backup_copy"]
2460- backup_name_base = os.path.splitext(backup_name)
2461- dst_dir = job["restore_dir"]
2462+ """
2463+ Restores backup copy with XtraBackup
2464+ Inputs
2465+ config - backup config
2466+ job - job order
2467+ Returns
2468+ 0 - if backup successfully restored
2469+ -1 - if restore job failed
2470+ :param config: backup config received from the dispatcher
2471+ :param job: job order
2472+ :return: 0 on success or -1 on failure
2473+ """
2474+ if "params" not in job:
2475+ logger.error("There are no params in the job order")
2476+ return -1
2477+ # Check that job order has all required parameters
2478+ mandatory_params = ["backup_copy_id", "restore_dir", "server_id"]
2479+ for param in mandatory_params:
2480+ if param not in job["params"]:
2481+ logger.error("There is no %s in the job order" % param)
2482+ return -1
2483+ dst_dir = job["params"]["restore_dir"]
2484 try:
2485 if os.path.isdir(dst_dir):
2486 if is_dir_empty(dst_dir):
2487 logger.info("Directory %s exists. But it's empty, so we can restore backup here" % dst_dir)
2488 else:
2489 logger.error("Directory %s exists and isn't empty, so we can not restore backup here" % dst_dir)
2490- return False
2491+ return -1
2492 else:
2493 os.makedirs(dst_dir)
2494- except:
2495+ except IOError as err:
2496+ logger.error(err)
2497 logger.error("Can't use directory %s as destination for backup" % dst_dir)
2498- logger.error(traceback.format_exc())
2499- return False
2500- full_copy = get_full_of(backup_name)
2501- logger.info("Restoring backup %s in %s" % (backup_name, dst_dir))
2502+ return -1
2503+
2504+ full_copy = get_full_of(job["params"]["backup_copy_id"])
2505+ full_copy_id = int(full_copy["backup_copy_id"])
2506+ if not full_copy:
2507+ logger.error("Failed to get full copy parameters")
2508+ return -1
2509+ logger.info("Restoring full copy %s in %s" % (full_copy["name"], dst_dir))
2510 try:
2511- extract_archive(config, full_copy, dst_dir)
2512+ if not extract_archive(config, full_copy, dst_dir):
2513+ raise JobError("Failed to extract %s" % full_copy["name"])
2514 # We restored last full backup in dst_dir
2515- xb_err = open('/tmp/twindb.xb.err', "w+")
2516- if full_copy == backup_name:
2517+ try:
2518+ xb_err = open('/tmp/twindb.xb.err', "w+")
2519+ except IOError as err:
2520+ raise JobError("Failed to open /tmp/twindb.xb.err: %s" % err)
2521+ if full_copy["backup_copy_id"] == job["params"]["backup_copy_id"]:
2522 xb_cmd = ["innobackupex", "--apply-log", dst_dir]
2523 else:
2524 xb_cmd = ["innobackupex", "--apply-log", "--redo-only", dst_dir]
2525- p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err)
2526- p_xb.wait()
2527- xb_err.seek(0)
2528- logger.info("innobackupex stderr: " + xb_err.read())
2529- os.remove('/tmp/twindb.xb.err')
2530+ try:
2531+ p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err)
2532+ p_xb.communicate()
2533+ except OSError as err:
2534+ raise JobError("Failed to run %r: %s" % (xb_cmd, err))
2535+ finally:
2536+ xb_err.seek(0)
2537+ logger.info("innobackupex stderr: " + xb_err.read())
2538+ os.remove('/tmp/twindb.xb.err')
2539 if p_xb.returncode != 0:
2540- logger.info("Failed to apply log on full copy %s" % (full_copy))
2541- return False
2542+ raise JobError("Failed to apply log on full copy %s" % full_copy["name"])
2543 # if we restore full backup return now
2544- if full_copy == backup_name:
2545- logger.info("Successfully restored backup %s in %s" % (backup_name, dst_dir))
2546- return True
2547+ if full_copy_id == job["params"]["backup_copy_id"]:
2548+ logger.info("Successfully restored backup %s in %s" % (full_copy["name"], dst_dir))
2549+ return 0
2550 # Now copy all incremental copies and apply then on dst_dir
2551- inc_copy = full_copy
2552+ backup_copy = full_copy
2553+ backup_copy_id = full_copy_id
2554 while True:
2555- inc_copy = get_child_of(full_copy)
2556+ backup_copy = get_child_of(backup_copy_id)
2557+ if not backup_copy:
2558+ raise JobError("Failed to get a child copy of backup_copy_id %d" % backup_copy_id)
2559+ backup_copy_id = int(backup_copy["backup_copy_id"])
2560 inc_dir = tempfile.mkdtemp()
2561- extract_archive(config, inc_copy, inc_dir)
2562-
2563- xb_err = open('/tmp/twindb.xb.err', "w+")
2564- xb_cmd = ["innobackupex", '--apply-log']
2565- if inc_copy != backup_name:
2566- xb_cmd.append('--redo-only')
2567+ if not extract_archive(config, backup_copy, inc_dir):
2568+ raise JobError("Failed to extract %s in %s" % (backup_copy["name"], inc_dir))
2569+ try:
2570+ xb_err = open("/tmp/twindb.xb.err", "w+")
2571+ except IOError as err:
2572+ raise JobError("Failed to open /tmp/twindb.xb.err: %s" % err)
2573+ xb_cmd = ["innobackupex", "--apply-log"]
2574+ if backup_copy_id != job["params"]["backup_copy_id"]:
2575+ xb_cmd.append("--redo-only")
2576 xb_cmd.append('--incremental-dir=%s' % inc_dir)
2577 xb_cmd.append(dst_dir)
2578- p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err)
2579- p_xb.wait()
2580- xb_err.seek(0)
2581- logger.info("innobackupex stderr: " + xb_err.read())
2582- os.remove('/tmp/twindb.xb.err')
2583+ try:
2584+ p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err)
2585+ p_xb.wait()
2586+ except OSError as err:
2587+ raise JobError("Failed to run %r: %s" % (xb_cmd, err))
2588+ finally:
2589+ xb_err.seek(0)
2590+ logger.info("innobackupex stderr: " + xb_err.read())
2591+ os.remove('/tmp/twindb.xb.err')
2592 if p_xb.returncode != 0:
2593- logger.info("Failed to apply log on full copy %s" % (full_copy))
2594- return False
2595- os.removedirs(inc_dir)
2596- if inc_copy == backup_name:
2597+ raise JobError("Failed to apply log on copy %s" % backup_copy["name"])
2598+ shutil.rmtree(inc_dir)
2599+ if backup_copy_id == job["params"]["backup_copy_id"]:
2600 break
2601- logger.info("Successfully restored backup %s in %s" % (backup_name, dst_dir))
2602- except:
2603+ logger.info("Successfully restored backup %s in %s" % (backup_copy["name"], dst_dir))
2604+ except JobError as err:
2605+ logger.error(err)
2606 logger.error("Failed to restore backup")
2607- logger.error(traceback.format_exc())
2608- return False
2609+ return -1
2610 finally:
2611 for f in ["/tmp/twindb.xb.err", "/tmp/twindb.gpg.err", "/tmp/twindb.ssh.err"]:
2612 if os.path.isfile(f):
2613 os.remove(f)
2614- return True
2615-
2616-
2617-# Restores backup copy taken with mysqldump
2618-# Inputs
2619-# config - backup config
2620-# job - job order
2621-# Returns
2622-# True - if backup successfully restored
2623-# False - if restore job failed
2624-
2625-def restore_mysqldump(config, job):
2626- global logger
2627- global ssh_private_key
2628- global ssh_port
2629-
2630- ret = True
2631- backup_name = job["restore_backup_copy"]
2632- backup_name_base = os.path.splitext(backup_name)
2633- dst_dir = job["restore_dir"]
2634- try:
2635- if os.path.isdir(dst_dir):
2636- if is_dir_empty(dst_dir):
2637- logger.info("Directory %s exists. But it's empty, so we can restore backup here" % dst_dir)
2638- else:
2639- logger.error("Directory %s exists and isn't empty, so we can not restore backup here" % dst_dir)
2640- return False
2641- else:
2642- os.makedirs(dst_dir)
2643- except:
2644- logger.error("Can't use directory %s as destination for backup" % dst_dir)
2645- logger.error(traceback.format_exc())
2646- return False
2647- full_copy = get_full_of(backup_name)
2648- # Init mysql datadir
2649- logger.info("Initializing MySQL datadir in %s" % dst_dir)
2650- try:
2651- mysql_socket = dst_dir + "/twindb.sock"
2652- mysql_pid = dst_dir + "/mysqld.pid"
2653- mysql_install_db_cmd = ["mysql_install_db", "--no-defaults"]
2654- mysql_install_db_cmd.append("--datadir=%s" % dst_dir)
2655- mysql_install_db_cmd.append("--user=root")
2656- mysql_install_db_cmd.append("--skip-networking")
2657- mysql_install_db_cmd.append("--log-error=/dev/stdout")
2658- mysql_install_db_cmd.append("--pid-file=%s" % mysql_pid)
2659- mysql_install_db_cmd.append("--socket=%s" % mysql_socket)
2660- p = subprocess.Popen(mysql_install_db_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
2661- out, err = p.communicate()
2662- ret = p.returncode
2663- del p
2664- if ret != 0:
2665- logger.error("Couldn't init MySQL datadir in " + dst_dir)
2666- logger.error("Command: %r" % mysql_install_db_cmd)
2667- logger.error("STDOUT: %s" % out)
2668- logger.error("STDERR: %s" % err)
2669+ return 0
2670+
2671+
2672+def extract_archive(config, arc, dst_dir):
2673+ """
2674+ Extracts an Xtrabackup archive arc in dst_dir
2675+ :param config: backup config
2676+ :param arc: dictionary with archive to extract
2677+ :param dst_dir: local destination directory
2678+ :return: True - if archive is successfully extracted.
2679+ False - if error happened
2680+ """
2681+ mandatory_params = ["backup_copy_id", "name", "ip"]
2682+ for param in mandatory_params:
2683+ if param not in arc:
2684+ logger.error("There is no %s in the archive parameters" % param)
2685 return False
2686- else:
2687- logger.info("MySQL datadir in %s is successfully created" % dst_dir)
2688- except:
2689- logger.error("Can't init MySQL datadir in " + dst_dir)
2690- logger.error(traceback.format_exc())
2691- return False
2692- # Start MySQL instance
2693- logger.info("Starting MySQL instance in %s" % dst_dir)
2694- try:
2695- mysqld_err = open('/tmp/twindb.mysqld.err', "w+")
2696- mysqld_cmd = ["mysqld", "--no-defaults"]
2697- mysqld_cmd.append("--datadir=%s" % dst_dir)
2698- mysqld_cmd.append("--user=root")
2699- mysqld_cmd.append("--skip-networking")
2700- mysqld_cmd.append("--skip-grant-tables")
2701- mysqld_cmd.append("--log-error=/dev/stdout")
2702- mysqld_cmd.append("--pid-file=%s" % mysql_pid)
2703- mysqld_cmd.append("--socket=%s" % mysql_socket)
2704- mysqld_proc = subprocess.Popen(mysqld_cmd, stdout=mysqld_err, stderr=mysqld_err)
2705- start_timeout = 5
2706- while start_timeout >= 0:
2707- if start_timeout == 0:
2708- logger.error("Can't start MySQL instance in " + dst_dir)
2709- try:
2710- mysqld_proc.terminate()
2711- except:
2712- logger.info("MySQL process is already terminated")
2713- finally:
2714- if os.path.isfile("/tmp/twindb.mysqld.err"):
2715- os.remove("/tmp/twindb.mysqld.err")
2716- mysqld_err.seek(0)
2717- logger.error("mysqld output: " + mysqld_err.read())
2718- mysqld_err.close()
2719- return False
2720- try:
2721- con = MySQLdb.connect(unix_socket=mysql_socket)
2722- logger.info("Started MySQL instance in %s successfully" % dst_dir)
2723- start_timeout = -1
2724- del con
2725- except:
2726- start_timeout -= 1
2727- time.sleep(1)
2728- except:
2729- mysqld_err.seek(0)
2730- logger.error("Can't start MySQL instance in " + dst_dir)
2731- logger.error("mysqld output: " + mysqld_err.read())
2732- logger.error(traceback.format_exc())
2733- mysqld_err.close()
2734- try:
2735- mysqld_proc.terminate()
2736- except:
2737- logger.info("MySQL process is already terminated")
2738- finally:
2739- if os.path.isfile("/tmp/twindb.mysqld.err"):
2740- os.remove("/tmp/twindb.mysqld.err")
2741- return False
2742- # Copy, decrypt and restore a dump
2743- logger.info("Restoring backup %s in %s" % (backup_name, dst_dir))
2744- try:
2745- ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key, "-p", str(ssh_port)]
2746- ssh_cmd.append(config["username"] + "@" + config["dst_ip"])
2747- ssh_cmd.append("/bin/cat %s" % backup_name)
2748-
2749- gpg_cmd = ["gpg", "--decrypt"]
2750-
2751- mysql_cmd = ["mysql", "-S", mysql_socket]
2752-
2753- mysql_err = open('/tmp/twindb.mysql.err', "w+")
2754- gpg_err = open('/tmp/twindb.gpg.err', "w+")
2755- ssh_err = open('/tmp/twindb.ssh.err', "w+")
2756- logger.info("Starting: %r" % ssh_cmd)
2757- p1 = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=ssh_err)
2758- logger.info("Starting: %r" % gpg_cmd)
2759- p2 = subprocess.Popen(gpg_cmd, stdin=p1.stdout, stdout=subprocess.PIPE, stderr=gpg_err)
2760- logger.info("Starting: %r" % mysql_cmd)
2761- p3 = subprocess.Popen(mysql_cmd, stdin=p2.stdout, stdout=subprocess.PIPE, stderr=mysql_err)
2762- p1.stdout.close()
2763- p2.stdout.close()
2764- out3 = p3.communicate()[0]
2765- mysql_err.seek(0)
2766- gpg_err.seek(0)
2767- ssh_err.seek(0)
2768- logger.info("SSH stderr: " + ssh_err.read())
2769- logger.info("GPG stderr: " + gpg_err.read())
2770- logger.info("mysql stderr: " + mysql_err.read())
2771- logger.info("Successfully restored backup %s in %s" % (backup_name, dst_dir))
2772- ret = p3.returncode
2773- # Stop MySQL
2774- mysqld_proc.terminate()
2775- except:
2776- logger.error("Failed to restore backup")
2777- logger.error(traceback.format_exc())
2778- return False
2779- finally:
2780- for f in ["/tmp/twindb.mysql.err", "/tmp/twindb.gpg.err", "/tmp/twindb.ssh.err"]:
2781- if os.path.isfile(f):
2782- os.remove(f)
2783- try:
2784- mysqld_proc.terminate()
2785- except:
2786- logger.info("MySQL process is already terminated")
2787+ logger.info("Extracting %s in %s" % (arc["name"], dst_dir))
2788+ ssh_cmd = [
2789+ "ssh",
2790+ "-oStrictHostKeyChecking=no",
2791+ "-i", ssh_private_key_file,
2792+ "-p", str(ssh_port),
2793+ "user_id_%s@%s" % (config["user_id"], arc["ip"]),
2794+ "/bin/cat %s" % arc["name"]
2795+ ]
2796+ gpg_cmd = ["gpg", "--decrypt"]
2797+ xb_cmd = ["xbstream", "-x"]
2798+
2799+ desc_file = None
2800+ try:
2801+ err_desc = dict()
2802+ for desc in ["xb", "gpg", "ssh"]:
2803+ desc_file = ("/tmp/twindb.%s.err" % desc)
2804+ err_desc[desc] = open(desc_file, "w+")
2805+ except IOError as err:
2806+ logger.error("Failed to open %s: %s" % (desc_file, err))
2807+ return False
2808+
2809+ logger.info("Starting: %r" % ssh_cmd)
2810+ p1 = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=err_desc["ssh"])
2811+ logger.info("Starting: %r" % gpg_cmd)
2812+ p2 = subprocess.Popen(gpg_cmd, stdin=p1.stdout, stdout=subprocess.PIPE,
2813+ stderr=err_desc["gpg"])
2814+ logger.info("Starting: %r" % xb_cmd)
2815+ p3 = subprocess.Popen(xb_cmd, stdin=p2.stdout, stdout=subprocess.PIPE,
2816+ stderr=err_desc["xb"], cwd=dst_dir)
2817+ p1.wait()
2818+ p2.wait()
2819+ p3.wait()
2820+ for desc in ["xb", "gpg", "ssh"]:
2821+ err_desc[desc].seek(0)
2822+
2823+ logger.info("SSH stderr: " + err_desc["ssh"].read())
2824+ logger.info("GPG stderr: " + err_desc["gpg"].read())
2825+ logger.info("xbstream stderr: " + err_desc["xb"].read())
2826+ if p1.returncode != 0 or p2.returncode != 0 or p3.returncode != 0:
2827+ logger.info("Failed to extract backup %s in %s" % (arc["name"], dst_dir))
2828+ return False
2829+ desc_file = None
2830+ try:
2831+ for desc in ["xb", "gpg", "ssh"]:
2832+ desc_file = ("/tmp/twindb.%s.err" % desc)
2833+ if os.path.isfile(desc_file):
2834+ os.remove(desc_file)
2835+ except IOError as err:
2836+ logger.error("Failed to open %s: %s" % (desc_file, err))
2837+ logger.info("Extracted successfully %s in %s" % (arc["name"], dst_dir))
2838 return True
2839
2840
2841-# Finds LSN in XtraBackup output
2842-
2843-def grep_lsn(str):
2844+def get_full_of(backup_copy_id):
2845+ """
2846+ Gets full copy of a given backup_copy_id
2847+ :param backup_copy_id: backup_copy_id
2848+ :return: dictionary with full backup copy parameters:
2849+ {
2850+ "backup_copy_id": 188,
2851+ "name": "server_id_479a41b3-d22d-41a8-b7d3-4e40302622f6_2015-04-06T15:10:46.050984.tar.gp",
2852+ "ip": "127.0.0.1"
2853+ }
2854+ """
2855+ logger.debug("Getting full copy parameters of backup_copy_id %d" % backup_copy_id)
2856+ response_body = "Empty response"
2857+ full_copy = None
2858+ try:
2859+ data = {
2860+ "type": "get_full_copy_params",
2861+ "params": {
2862+ "backup_copy_id": backup_copy_id
2863+ }
2864+ }
2865+ response_body = get_response(data)
2866+ if not response_body:
2867+ logger.debug("Empty response from dispatcher")
2868+ return None
2869+ d = json.JSONDecoder()
2870+ response_body_decoded = d.decode(response_body)
2871+ if response_body_decoded:
2872+ msg_decrypted = decrypt(response_body_decoded["response"])
2873+ msg_pt = d.decode(msg_decrypted)
2874+ full_copy = msg_pt["data"]
2875+ logger.info("Got full copy params:\n%s"
2876+ % json.dumps(full_copy, indent=4, sort_keys=True))
2877+ if msg_pt["error"]:
2878+ logger.error(msg_pt["error"])
2879+ except exceptions.KeyError as err:
2880+ logger.error("Failed to decode %s" % response_body)
2881+ logger.error(err)
2882+ return None
2883+ return full_copy
2884+
2885+
2886+def get_child_of(backup_copy_id):
2887+ """
2888+ Returns a child copy params of a given backup copy
2889+ :param backup_copy_id:
2890+ :return:
2891+ """
2892+ logger.debug("Getting child copy parameters of backup_copy_id %d" % backup_copy_id)
2893+ response_body = "Empty response"
2894+ child_copy = None
2895+ try:
2896+ data = {
2897+ "type": "get_child_copy_params",
2898+ "params": {
2899+ "backup_copy_id": backup_copy_id
2900+ }
2901+ }
2902+ response_body = get_response(data)
2903+ if not response_body:
2904+ return None
2905+ d = json.JSONDecoder()
2906+ response_body_decoded = d.decode(response_body)
2907+ if response_body_decoded:
2908+ msg_decrypted = decrypt(response_body_decoded["response"])
2909+ msg_pt = d.decode(msg_decrypted)
2910+ child_copy = msg_pt["data"]
2911+ logger.info("Got child copy params:\n%s"
2912+ % json.dumps(child_copy, indent=4, sort_keys=True))
2913+ if msg_pt["error"]:
2914+ logger.error(msg_pt["error"])
2915+ except exceptions.KeyError as err:
2916+ logger.error("Failed to decode %s" % response_body)
2917+ logger.error(err)
2918+ return None
2919+ return child_copy
2920+
2921+
2922+def grep_lsn(output):
2923+ """
2924+ Finds LSN in XtraBackup output
2925+ :param output: string with Xtrabackup output
2926+ :return: LSN
2927+ """
2928 lsn = None
2929- for line in str.split("\n"):
2930+ for line in output.split("\n"):
2931 if line.startswith("xtrabackup: The latest check point (for incremental):"):
2932 lsn = line.split("'")[1]
2933 return lsn
2934
2935
2936-# Finds MySQL socket
2937 def get_unix_socket():
2938- socket = ""
2939+ """
2940+ Finds MySQL socket
2941+ :return: path to unix socket or None if not found
2942+ """
2943+ cmd = ["lsof", "-U", "-c", "/^mysqld$/", "-a", "-F", "n"]
2944 try:
2945- cmd = [
2946- "lsof", "-U", "-c", "/^mysqld$/", "-a", "-F", "n"
2947- ]
2948 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
2949 cout, cerr = p.communicate()
2950 # Outputs socket in format
2951 # # lsof -U -c mysqld -a -F n
2952 # p11029
2953 # n/var/lib/mysql/mysql.sock
2954- socket = cout.split()[1][1:]
2955- if not os.path.exists(socket):
2956+ mysql_socket = cout.split()[1][1:]
2957+ if not os.path.exists(mysql_socket):
2958 return None
2959- except:
2960+ except OSError as err:
2961+ logger.error("Failed to run command %r. %s" % (cmd, err))
2962 return None
2963-
2964- return socket
2965+ return mysql_socket
2966
2967
2968 def pid_exists(pid):
2969- """Check whether pid exists in the current process table."""
2970+ """
2971+ Checks whether pid exists in the current process table.
2972+ """
2973 if pid < 0:
2974 return False
2975 try:
2976 os.kill(pid, 0)
2977- except OSError, e:
2978+ except OSError as e:
2979 return e.errno == errno.EPERM
2980 else:
2981 return True
2982@@ -2194,12 +1916,12 @@
2983 Raise TimeoutExpired on timeout expired (if specified).
2984 """
2985
2986- def check_timeout(delay):
2987+ def check_timeout(timeout_delay):
2988 if timeout is not None:
2989 if time.time() >= stop_at:
2990 raise TimeoutExpired
2991- time.sleep(delay)
2992- return min(delay * 2, 0.04)
2993+ time.sleep(timeout_delay)
2994+ return min(timeout_delay * 2, 0.04)
2995
2996 if timeout is not None:
2997 waitcall = lambda: os.waitpid(pid, os.WNOHANG)
2998@@ -2247,100 +1969,155 @@
2999 raise RuntimeError("unknown process exit status")
3000
3001
3002-# Removes pid file
3003-# Exits if error happened
3004-
3005 def remove_pid():
3006- global pid_file
3007- try:
3008- if os.path.isfile(pid_file):
3009+ """
3010+ Removes pid file
3011+ :return: nothing
3012+ Exits if error happened
3013+ """
3014+ if os.path.isfile(pid_file):
3015+ try:
3016 os.remove(pid_file)
3017- except:
3018- logger.error(traceback.format_exc())
3019- sys.exit(2)
3020-
3021-
3022-# Cleans up when TwinDB agent exists
3023+ except IOError as err:
3024+ exit_on_error("Failed to remove file %s. %s" % (pid_file, err))
3025+
3026+
3027+def check_pid():
3028+ """
3029+ Checks if pid file already exists.
3030+ If it does it detects whether twindb agent is already running.
3031+ If the pid file is stale it removes it.
3032+ :return: True if pid file doesn't exist or was stale and it was removed.
3033+ False if twindb agent is running or error happened
3034+ """
3035+ if os.path.isfile(pid_file):
3036+ pid = read_pid()
3037+ if pid_exists(pid):
3038+ try:
3039+ f = open("/proc/%d/cmdline" % pid, "r")
3040+ cmdline = f.readline()
3041+ f.close()
3042+ if "twindb.py" in cmdline:
3043+ # The process is a live twindb agent
3044+ return False
3045+ else:
3046+ # It's some other process, not a twindb agent
3047+ return True
3048+ except IOError as err:
3049+ logger.error("Can't read from file /proc/%d/cmdline:%s " % (pid, err))
3050+ return False
3051+ else:
3052+ try:
3053+ os.remove(pid_file)
3054+ # It's a stale pid file
3055+ return True
3056+ except IOError as err:
3057+ logger.error("Can't remove file %s: %s" % (pid_file, err))
3058+ return False
3059+ else:
3060+ # pid file doesn't exist
3061+ return True
3062+
3063+
3064+def read_pid():
3065+ """
3066+ Read pid from pid_file
3067+ :return: pid or zero if pid file doesn't exist
3068+ """
3069+ try:
3070+ f = open(pid_file, 'r')
3071+ pid = int(f.readline())
3072+ f.close()
3073+ except IOError as err:
3074+ logger.error("Couldn't read from %s: %s" % (pid_file, err))
3075+ return 0
3076+ return int(pid)
3077+
3078+
3079+def write_pid():
3080+ """
3081+ Writes pid of the current process to the pid file
3082+ :return: nothing.
3083+ Exits if error happened
3084+ """
3085+ try:
3086+ f = open(pid_file, "w")
3087+ f.write(str(os.getpid()))
3088+ f.close()
3089+ except IOError as err:
3090+ logger.error("Couldn't save process id in " + pid_file)
3091+ exit_on_error(err)
3092+
3093
3094 def cleanup(signum, frame):
3095- global logger
3096- global pid_file
3097+ """
3098+ Cleans up when TwinDB agent exits
3099+ :param signum:
3100+ :param frame:
3101+ :return:
3102+ """
3103
3104 logger.info("Cleaning up on signal " + str(signum))
3105- remove_pid()
3106+ logger.debug("Frame %r" % frame)
3107 logger.info("TwinDB agent is ready to exit")
3108 sys.exit(0)
3109
3110
3111-# Reports error removes pid and exits
3112-
3113-def exit_on_error(tb=None):
3114- if tb is not None:
3115- logger.debug(tb)
3116- remove_pid()
3117+def exit_on_error(message):
3118+ """
3119+ Reports error removes pid and exits
3120+ :rtype : object
3121+ :param message: message to display
3122+ :return:
3123+ """
3124+ logger.error(message)
3125+ logger.debug(traceback.format_exc())
3126 sys.exit(2)
3127
3128
3129 # Stops TwinDB agent
3130
3131 def stop():
3132- global logger
3133- global pid_file
3134+ """
3135+ Stops TwinDB agent
3136+ :return: nothing
3137+ """
3138
3139 logger.info("Shutting down TwinDB agent")
3140
3141 if not os.path.exists(pid_file):
3142 logger.info("Pid file %s does not exist. Probably twindb agent isn't running" % pid_file)
3143 sys.exit(0)
3144-
3145+ pid = None
3146 try:
3147 f = open(pid_file, 'r')
3148 pid = int(f.readline())
3149 os.kill(pid, signal.SIGTERM)
3150 wait_pid(pid, 300)
3151 f.close()
3152+ remove_pid()
3153 except OSError as err:
3154 logger.error("Couldn't kill process %d" % pid)
3155- logger.error(err)
3156- exit_on_error(traceback.format_exc())
3157+ exit_on_error(err)
3158 except IOError as err:
3159 logger.error("Couldn't read from %s" % pid_file)
3160- logger.error(err)
3161- exit_on_error(traceback.format_exc())
3162- except:
3163- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
3164- exit_on_error(traceback.format_exc())
3165- finally:
3166- remove_pid()
3167+ exit_on_error(err)
3168 logger.info("TwinDB agent successfully shut down")
3169 sys.exit(0)
3170
3171
3172-# Starts TwinDB agent
3173-
3174 def start():
3175- global logger
3176- global pid_file
3177- global check_period
3178-
3179+ """
3180+ Starts TwinDB agent
3181+ :return: nothing
3182+ """
3183+ global job_id
3184 logger.info("Starting TwinDB agent")
3185
3186- if os.path.isfile(pid_file):
3187- logger.error("PID file " + pid_file + " exists");
3188- logger.error("Check if TwinDB agent is already running");
3189- logger.error("If not, remove file " + pid_file + " manually and restart the agent");
3190- sys.exit(2)
3191- try:
3192- f = open(pid_file, 'w')
3193- f.write(str(os.getpid()))
3194- f.close()
3195- except IOError as err:
3196- logger.error("Couldn't save process id in " + pid_file)
3197- logger.error(err)
3198- exit_on_error(traceback.format_exc())
3199- except:
3200- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
3201- exit_on_error(traceback.format_exc())
3202+ if check_pid():
3203+ write_pid()
3204+ else:
3205+ exit_on_error("Another instance of TwinDB agent is running. Exiting")
3206 try:
3207 while True:
3208 read_config()
3209@@ -2348,7 +2125,7 @@
3210 if not config:
3211 time.sleep(check_period)
3212 continue
3213- report_sss()
3214+ report_sss(config)
3215 report_agent_privileges(config)
3216 job = get_job()
3217 if job:
3218@@ -2356,130 +2133,111 @@
3219 time.sleep(check_period)
3220 except SystemExit:
3221 remove_pid()
3222- except:
3223- exit_on_error(traceback.format_exc())
3224 sys.exit(0)
3225
3226
3227-# Processes job
3228-# Inputs
3229-# config - backup config
3230-# job - job order
3231-# Returns what respective job function returns or False if error happens
3232 def process_job(config, job):
3233- global logger
3234+ """
3235+ Processes job
3236+ :param config: backup config
3237+ :param job: job order
3238+ :return: what respective job function returns or False if error happens
3239+ """
3240 global job_id
3241+ # Check to see that the twindb_agent MySQL user has enough privileges
3242+ username = config["mysql_user"]
3243+ password = config["mysql_password"]
3244+
3245+ job_id = int(job["job_id"])
3246
3247 try:
3248- # Check to see that the twindb_agent MySQL user has enough privileges
3249- username = config["mysql_user"]
3250- password = config["mysql_password"]
3251-
3252 mysql_access_available, missing_mysql_privileges = has_mysql_access(username, password, grant_capability=False)
3253 if not mysql_access_available:
3254 logger.error("The MySQL user %s does not have all the required privileges." % username)
3255- if len(missing_mysql_privileges) > 0:
3256- logger.error("You can grant the required privileges by executing the following SQL: "
3257- "GRANT %s ON *.* TO '%s'@'localhost'" % (','.join(missing_mysql_privileges), username))
3258-
3259- return False
3260-
3261- if job["start_scheduled"] == None:
3262- logger.error("Job start time isn't set")
3263- return False
3264-
3265+ if missing_mysql_privileges:
3266+ raise JobError("You can grant the required privileges by executing the following SQL: "
3267+ "GRANT %s ON *.* TO '%s'@'localhost'" % (','.join(missing_mysql_privileges), username))
3268+ if not job["start_scheduled"]:
3269+ raise JobError("Job start time isn't set")
3270 start_scheduled = int(job["start_scheduled"])
3271 now = int(time.time())
3272 if now < start_scheduled:
3273- logger.info("Job is scheduled on %s, now %s"
3274- % (time.ctime(start_scheduled), time.ctime(now)))
3275- return False
3276-
3277+ raise JobTooSoonError("Job is scheduled on %s, now %s"
3278+ % (time.ctime(start_scheduled), time.ctime(now)))
3279 logger.info("Processing job_id = %d", int(job_id))
3280 if job["type"] == "backup":
3281 return take_backup(config, job)
3282 elif job["type"] == "restore":
3283- #return restore_backup(config, job)
3284- return False
3285+ return restore_backup(config, job)
3286 elif job["type"] == "send_key":
3287- return handler_send_key(config, job)
3288+ return handler_send_key(job)
3289 else:
3290- logger.error("Unsupported job type " + job["type"])
3291- return False
3292- except:
3293- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2])
3294+ raise JobError("Unsupported job type " + job["type"])
3295+ except JobError as err:
3296+ logger.error("Job error: %s", err)
3297+ return False
3298+ except JobTooSoonError as err:
3299+ logger.debug(err)
3300 return False
3301 finally:
3302 job_id = 0
3303- return False
3304
3305
3306 def setup_logging():
3307+ """
3308+ Setups logging handlers, formats
3309+ :return:
3310+ """
3311 global logger
3312
3313- ch = logging.StreamHandler()
3314- sh = logging.handlers.SysLogHandler()
3315- rh = RlogHandler()
3316+ console_handler = logging.StreamHandler()
3317+ remote_handler = RlogHandler()
3318+
3319 fmt_str = '%(name)s: %(levelname)s: %(funcName)s():%(lineno)d: %(message)s'
3320- sfmt = logging.Formatter(fmt_str)
3321- cfmt = logging.Formatter("%(asctime)s: " + fmt_str)
3322- ch.setFormatter(cfmt)
3323- sh.setFormatter(sfmt)
3324- rh.setFormatter(sfmt)
3325- logger.addHandler(sh)
3326- logger.addHandler(ch)
3327- logger.addHandler(rh)
3328+ remote_format = logging.Formatter(fmt_str)
3329+ console_format = logging.Formatter("%(asctime)s: " + fmt_str)
3330+
3331+ console_handler.setFormatter(console_format)
3332+ remote_handler.setFormatter(remote_format)
3333+
3334+ logger.addHandler(console_handler)
3335+ logger.addHandler(remote_handler)
3336 logger.setLevel(logging.INFO)
3337- # syslog handler shouldn't log DEBUG messages
3338- sh.setLevel(logging.INFO)
3339-
3340-
3341-# Main function
3342-# Parses options, creates log class etc
3343+
3344
3345 def main():
3346+ """
3347+ Main function
3348+ Parses options, creates log class etc
3349+ :return:
3350+ """
3351+ global server_id
3352+ global check_period
3353+ global host
3354+ global mysql_user
3355+ global mysql_password
3356+ global debug
3357+ global logger
3358+
3359+ setup_logging()
3360+ read_config()
3361 # before we do *anything* we must ensure server_id is generated or read from config
3362- global host
3363- global init_config
3364- global server_id
3365- global check_period
3366- global logger
3367- global mysql_user
3368- global mysql_password
3369- global debug
3370-
3371 # Read or generate server id
3372- if os.path.exists(init_config):
3373- try:
3374- execfile(init_config, globals())
3375- if len(server_id) == 0:
3376- print("Error: Config %s doesn't set server_id"
3377- % init_config, file=sys.stderr)
3378- sys.exit(2)
3379- except:
3380- print(traceback.format_exc())
3381- print("Error: Failed to read from config in %s"
3382- % init_config, file=sys.stderr)
3383- sys.exit(2)
3384- else:
3385+ if not server_id:
3386 server_id = str(uuid.uuid4())
3387- save_config(server_id)
3388+ save_config()
3389
3390 # At this point server_id must be set
3391- if len(server_id) == 0:
3392- print("Error: Server id is empty. Can not continue operation",
3393- file=sys.stderr)
3394+ if not server_id:
3395+ print("Error: Server id is empty. Can not continue operation", file=sys.stderr)
3396 sys.exit(2)
3397-
3398- setup_logging()
3399-
3400 # Signal handlers
3401 signal.signal(signal.SIGTERM, cleanup)
3402 signal.signal(signal.SIGINT, cleanup)
3403
3404 # Not clear why I ignore SIGCHLD.
3405 # it make subprocess.call throw exception, so I comment it out so far
3406- #signal.signal(signal.SIGCHLD, signal.SIG_IGN)
3407+ # signal.signal(signal.SIGCHLD, signal.SIG_IGN)
3408
3409 opts = []
3410 try:
3411@@ -2489,15 +2247,11 @@
3412 "unregister", "delete-backups", "backup", "is-registered"])
3413 except getopt.GetoptError as err:
3414 logger.error(err)
3415- exit_on_error(traceback.format_exc())
3416- except:
3417- logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
3418- usage()
3419- exit_on_error(traceback.format_exc())
3420
3421 # Set options first
3422 action = None
3423 delete_backups = False
3424+ regcode = None
3425 for opt, arg in opts:
3426 if opt == '-i':
3427 check_period = int(arg)
3428@@ -2547,29 +2301,32 @@
3429 check_period = 1
3430
3431 if action == "start":
3432- if is_registered():
3433- start()
3434- else:
3435- logger.error(
3436- "The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n")
3437- logger.error("Get your code on https://console.twindb.com/?get_code")
3438- sys.exit(2)
3439+ while True:
3440+ # It'll keep checking until the agent is registered
3441+ # Then it starts
3442+ if is_registered():
3443+ start()
3444+ else:
3445+ logger.error("The server must be registered first. Run following command:\n\n"
3446+ "twindb --register <registration code>\n")
3447+ logger.error("Get your code on https://console.twindb.com/?get_code")
3448+ time.sleep(check_period)
3449 elif action == "stop":
3450 stop()
3451 elif action == "register":
3452 action_handler_register(regcode)
3453 elif action == "unregister":
3454 if not is_registered():
3455- logger.error(
3456- "The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n")
3457+ logger.error("The server must be registered first. Run following command:"
3458+ "\n\ntwindb --register <registration code>\n")
3459 logger.error("Get your code on https://console.twindb.com/?get_code")
3460 sys.exit(2)
3461 if action_handler_unregister(delete_backups):
3462 stop()
3463 elif action == "backup":
3464 if not is_registered():
3465- logger.error(
3466- "The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n")
3467+ logger.error("The server must be registered first. Run following command:"
3468+ "\n\ntwindb --register <registration code>\n")
3469 logger.error("Get your code on https://console.twindb.com/?get_code")
3470 sys.exit(2)
3471 if not action_handler_backup():
3472@@ -2583,9 +2340,8 @@
3473 sys.exit(0)
3474 else:
3475 # If we got there print usage() and exit
3476- logger.error("Neither --start nor --stop nor --register is specified")
3477+ logger.error("Failed to parse command line options")
3478 usage()
3479- remove_pid()
3480 sys.exit(2)
3481
3482

Subscribers

People subscribed via source and target branches

to all changes: