Merge lp:~ovais-tariq/twindb-agent/bug-1426993 into lp:twindb-agent

Proposed by Ovais Tariq
Status: Merged
Approved by: Aleksandr Kuzminsky
Approved revision: 11
Merged at revision: 10
Proposed branch: lp:~ovais-tariq/twindb-agent/bug-1426993
Merge into: lp:twindb-agent
Diff against target: 1800 lines (+446/-272)
1 file modified
twindb.py (+446/-272)
To merge this branch: bzr merge lp:~ovais-tariq/twindb-agent/bug-1426993
Reviewer | Review Type | Date Requested | Status
TwinDB Developers | | | Pending
Review via email: mp+251876@code.launchpad.net
To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'twindb.py'
2--- twindb.py 2015-02-22 01:45:12 +0000
3+++ twindb.py 2015-03-05 07:58:37 +0000
4@@ -1,24 +1,25 @@
5-
6 # modules
7 from __future__ import print_function
8 from __future__ import unicode_literals
9 from __future__ import absolute_import
10-from sys import argv
11 from base64 import b64encode, b64decode
12 from datetime import datetime
13-import sys, time, getopt, signal, traceback
14+import sys
15+import time
16+import getopt
17+import signal
18+import traceback
19 import exceptions
20 import errno
21 import socket
22-import logging
23 import logging.handlers
24 import os.path
25-import binascii
26 import subprocess
27 import httplib
28 import json
29 import urllib
30-import re
31+
32+
33 try:
34 import mysql.connector
35 except ImportError:
36@@ -28,26 +29,26 @@
37 import uuid
38
39 # global variables
40-host = "dispatcher.twindb.com"
41-proto = "http"
42-api_dir = ""
43-debug = False
44-debug_http = True
45-debug_gpg = False
46-init_config = "/etc/twindb.cfg"
47+host = "dispatcher.twindb.com"
48+proto = "http"
49+api_dir = ""
50+debug = False
51+debug_http = True
52+debug_gpg = False
53+init_config = "/etc/twindb.cfg"
54 ssh_private_key = "/root/.ssh/twindb.key"
55-ssh_public_key = "/root/.ssh/twindb.key.pub"
56-ssh_port = 4194
57-gpg_homedir = "/root/.gnupg/"
58-pid_file = "/var/run/twindb.pid"
59-check_period = 300
60-time_zone = "UTC"
61-api_email = "api@twindb.com"
62-server_id = ""
63-job_id = 0
64-mysql_user = "root"
65-mysql_password = ""
66-api_pub_key = """
67+ssh_public_key = "/root/.ssh/twindb.key.pub"
68+ssh_port = 4194
69+gpg_homedir = "/root/.gnupg/"
70+pid_file = "/var/run/twindb.pid"
71+check_period = 300
72+time_zone = "UTC"
73+api_email = "api@twindb.com"
74+server_id = ""
75+job_id = 0
76+mysql_user = "root"
77+mysql_password = ""
78+api_pub_key = """
79 -----BEGIN PGP PUBLIC KEY BLOCK-----
80 Version: GnuPG v1
81
82@@ -101,7 +102,7 @@
83 =62IM
84 -----END PGP PUBLIC KEY BLOCK-----
85 """
86-agent_version = "@@TWINDB_AGENT_VERSION@@"
87+agent_version = "@@TWINDB_AGENT_VERSION@@"
88
89
90 # Logging handler that logs to remote TwiDB dispatcher
91@@ -126,6 +127,7 @@
92 get_response(request)
93 self.log_flag = True
94
95+
96 def usage():
97 print("Usage:\n\
98 twindb --start | --stop | --register <registration code> [-g] [-i interval]\n\
99@@ -152,27 +154,28 @@
100 return
101
102
103-def get_mysql_connection(user = None, passwd = None):
104+def get_mysql_connection(user=None, passwd=None):
105 global mysql_user
106 global mysql_password
107
108 try:
109- if user == None:
110+ if user is None:
111 user = mysql_user
112- if passwd == None:
113+ if passwd is None:
114 passwd = mysql_password
115 mysql_option_files = []
116 unix_socket = get_unix_socket()
117 conn = mysql.connector.connect()
118- if user != None:
119+ if user is not None:
120 for f in ['/etc/mysql/my.cnf', '/etc/my.cnf', '/root/.my.cnf']:
121 if os.path.exists(f):
122 mysql_option_files.append(f)
123 if mysql_option_files:
124- conn = mysql.connector.connect(option_files = mysql_option_files, unix_socket = unix_socket)
125- logger.debug("Connected to MySQL as '%s'@'localhost' with options files %r" % (conn.user, mysql_option_files) )
126+ conn = mysql.connector.connect(option_files=mysql_option_files, unix_socket=unix_socket)
127+ logger.debug(
128+ "Connected to MySQL as '%s'@'localhost' with options files %r" % (conn.user, mysql_option_files))
129 if not conn.is_connected():
130- conn = mysql.connector.connect(user = user, passwd = passwd, unix_socket = unix_socket)
131+ conn = mysql.connector.connect(user=user, passwd=passwd, unix_socket=unix_socket)
132 logger.debug("Connected to MySQL as %s@localhost " % conn.user)
133 except mysql.connector.Error as err:
134 logger.error("Can not connect to local MySQL server")
135@@ -184,11 +187,12 @@
136 return None
137 return conn
138
139+
140 # Checks if TwinDB API public key is installed
141 # Returns
142-# True - if the key is installed
143-# False - if not
144-def is_gpg_key_installed(email, key_type = "public"):
145+# True - if the key is installed
146+# False - if not
147+def is_gpg_key_installed(email, key_type="public"):
148 global logger
149 global gpg_homedir
150 global debug_gpg
151@@ -225,6 +229,7 @@
152 exit_on_error(traceback.format_exc())
153 return result
154
155+
156 # Installs TwinDB public key
157 # Returns
158 # True - if the key is successfully installed
159@@ -238,9 +243,9 @@
160
161 logger.info("Installing twindb public key")
162 try:
163- p1 = subprocess.Popen(["echo", api_pub_key], stdout = subprocess.PIPE)
164- p2 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--import"],
165- stdin = p1.stdout)
166+ p1 = subprocess.Popen(["echo", api_pub_key], stdout=subprocess.PIPE)
167+ p2 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--import"],
168+ stdin=p1.stdout)
169 p1.stdout.close()
170 except:
171 logger.error("Couldn't install TwinDB public key")
172@@ -248,6 +253,7 @@
173 logger.info("Twindb public key successfully installed")
174 return True
175
176+
177 # Checks if GPG environment is good to start TwinDB agent
178 # Installs TwinDB public key if necessary
179 # Returns
180@@ -262,7 +268,7 @@
181 global init_config
182 global server_id
183 global debug_gpg
184-
185+
186 if debug_gpg:
187 logger.debug("Checking if GPG config is initialized")
188 if os.path.exists(gpg_homedir):
189@@ -282,27 +288,29 @@
190 if len(server_id) == 0:
191 exit_on_error("Server id is neither read from config nor generated")
192 email = "%s@twindb.com" % server_id
193- if not ( is_gpg_key_installed(email) and is_gpg_key_installed(email, "private")) :
194+ if not ( is_gpg_key_installed(email) and is_gpg_key_installed(email, "private")):
195 gen_entropy()
196 gen_gpg_keypair("%s@twindb.com" % server_id)
197 if debug_gpg:
198 logger.debug("GPG config is OK")
199 return True
200
201-# Checks the enviroment if it's OK to start TwinDB agent
202+
203+# Checks the environment if it's OK to start TwinDB agent
204 # Returns
205-# True - if the enviroment is OK
206+# True - if the environment is OK
207 # Exits if the environment is not OK
208
209 def check_env():
210 global logger
211- logger.debug("Checking enviroment")
212+ logger.debug("Checking environment")
213 if os.getuid() != 0:
214 exit_on_error("TwinDB agent must be run by root")
215 check_gpg()
216- logger.debug("Enviroment is OK")
217+ logger.debug("Environment is OK")
218 return True
219
220+
221 # Checks how much entropy is available in the system
222 # If not enough, does some disk activity to generate more
223 def gen_entropy():
224@@ -327,7 +335,7 @@
225 i = i + 1
226 except:
227 logger.error("Failed to generate entropy")
228- return
229+ return
230
231
232 # Formats bytes count to human readable form
233@@ -337,13 +345,14 @@
234 # Returns
235 # human-readable string like "20.33 MB"
236
237-def h_size(num, decimals = 2):
238+def h_size(num, decimals=2):
239 fmt = "%3." + str(decimals) + "f %s"
240- for x in ['bytes','kB','MB','GB','TB', 'PB']:
241+ for x in ['bytes', 'kB', 'MB', 'GB', 'TB', 'PB']:
242 if num < 1024.0:
243 return fmt % (num, x)
244 num /= 1024.0
245
246+
247 # Generates random string of bytes good for crypthography
248 # Inputs
249 # n - Number of bytes
250@@ -353,6 +362,7 @@
251 def gen_key(n):
252 return os.urandom(n)
253
254+
255 # Generates GPG private and public keys for a given recipient
256 # Inputs
257 # email - recipient
258@@ -383,17 +393,18 @@
259 %%commit
260 %%echo done
261 """ % (server_id, email)
262- p1 = subprocess.Popen(["echo", gpg_script], stdout = subprocess.PIPE)
263- p2 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--batch",
264- "--gen-key"], stdin = p1.stdout)
265+ p1 = subprocess.Popen(["echo", gpg_script], stdout=subprocess.PIPE)
266+ p2 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--batch",
267+ "--gen-key"], stdin=p1.stdout)
268 p1.stdout.close()
269 p2.wait()
270- del p1,p2
271+ del p1, p2
272 except:
273 logger.error("Failed to generate GPG keys pair")
274 exit_on_error(traceback.format_exc())
275 return True
276
277+
278 # Encrypts message with TwinDB public key
279 # If server_id is non-zero (which means the server is registered)
280 # signs the message with the server's private key
281@@ -423,7 +434,7 @@
282 logger.debug("Will not encrypt message")
283 return None
284 enc_cmd = ["gpg", "--homedir", gpg_homedir, "-r", api_email, "--batch",
285- "--trust-model", "always", "--armor"]
286+ "--trust-model", "always", "--armor"]
287 enc_cmd.append("--sign")
288 enc_cmd.append("--local-user")
289 enc_cmd.append(server_email)
290@@ -434,10 +445,10 @@
291 logger.debug(msg)
292 logger.debug("Encryptor command: ")
293 logger.debug(enc_cmd)
294- p = subprocess.Popen(enc_cmd,
295- stdin = subprocess.PIPE,
296- stdout = subprocess.PIPE,
297- stderr = subprocess.PIPE)
298+ p = subprocess.Popen(enc_cmd,
299+ stdin=subprocess.PIPE,
300+ stdout=subprocess.PIPE,
301+ stderr=subprocess.PIPE)
302 cout, cerr = p.communicate(msg)
303 ct = cout
304 ct_64 = b64encode(ct)
305@@ -450,12 +461,13 @@
306 return None
307 return ct_64
308
309+
310 # Decrypts message with local private key
311 # Inputs
312 # msg - 64-base encoded and encrypted message.
313 # Before encryption the message was 64-base encoded
314 # Returns
315-# Plain text messgae
316+# String - Plain text message
317 # None - if error happens
318
319 def decrypt(msg_64):
320@@ -468,20 +480,27 @@
321 if not msg_64:
322 logger.error("Will not decrypt empty message")
323 return None
324+
325+ cout = None
326+ cerr = None
327+
328 try:
329 dec_cmd = ["gpg", "--homedir", gpg_homedir, "-d", "-q"]
330+ debug_gpg = True
331 if debug_gpg:
332 logger.debug("Decrypting message:")
333 logger.debug(msg_64)
334 logger.debug("Decryptor command:")
335 logger.debug(dec_cmd)
336- p = subprocess.Popen(dec_cmd,
337- stdin = subprocess.PIPE,
338- stdout = subprocess.PIPE,
339- stderr = subprocess.PIPE)
340+ p = subprocess.Popen(dec_cmd,
341+ stdin=subprocess.PIPE,
342+ stdout=subprocess.PIPE,
343+ stderr=subprocess.PIPE)
344 msg = b64decode(msg_64)
345- cout, cerr = p.communicate(msg);
346- pt = cout
347+ cout, cerr = p.communicate(msg)
348+
349+ if p.returncode != 0:
350+ raise OSError(p.returncode)
351 except:
352 logger.error("Failed to decrypt message: " + msg_64)
353 logger.error("Unexpected error: %s, %s" % sys.exc_info()[:2]);
354@@ -492,8 +511,8 @@
355 return None
356 if debug_gpg:
357 logger.debug("Decrypted message:")
358- logger.debug(pt)
359- return pt
360+ logger.debug(cout)
361+ return cout
362
363
364 # Sends HTTP POST request to TwinDB dispatcher
365@@ -511,9 +530,11 @@
366 global host
367 global proto
368 global api_dir
369-
370+
371 uri = "api.php"
372 response_body = None
373+ conn = None
374+
375 logger.debug("Enter get_response(uri=" + uri + ")")
376 if proto == "http":
377 conn = httplib.HTTPConnection(host)
378@@ -528,8 +549,8 @@
379 data_json = json.dumps(request)
380 data_json_enc = encrypt(data_json)
381 data_json_enc_urlenc = urllib.urlencode({'data': data_json_enc})
382- request = conn.putrequest('POST', "/" + api_dir + "/" + uri)
383- headers = {}
384+ conn.putrequest('POST', "/" + api_dir + "/" + uri)
385+ headers = dict()
386 headers['Content-Length'] = "%d" % (len(data_json_enc_urlenc))
387 headers['Content-Type'] = 'application/x-www-form-urlencoded'
388 for k in headers:
389@@ -537,25 +558,26 @@
390 conn.endheaders()
391 conn.send(data_json_enc_urlenc)
392 http_response = conn.getresponse()
393- if(http_response.status == 200):
394+
395+ if http_response.status == 200:
396 response_body = http_response.read()
397- if(len(response_body) == 0):
398+ if len(response_body) == 0:
399 return None
400 url = "%(proto)s://%(host)s/%(api_dir)s/%(uri)s" % {
401- "proto": proto,
402- "host": host,
403- "api_dir": api_dir,
404+ "proto": proto,
405+ "host": host,
406+ "api_dir": api_dir,
407 "uri": uri
408 }
409 d = json.JSONDecoder()
410 try:
411- json_object = json.loads(response_body)
412- except ValueError as e:
413+ json.loads(response_body)
414+ except ValueError:
415 msg = response_body
416 else:
417 msg = json.dumps(d.decode(response_body), indent=4, sort_keys=True)
418 logger.debug("Response from %(url)s : %(resp)s" % {
419- "url": url,
420+ "url": url,
421 "resp": msg
422 })
423 else:
424@@ -566,11 +588,11 @@
425 logger.error("Please check that DNS server is reachable and works")
426 return None
427 except exceptions.KeyError as err:
428- logger.error("Failed to decode response from server %s" % response)
429+ logger.error("Failed to decode response from server %s" % http_response)
430 logger.error("Could not find key %s" % err)
431 return None
432 except:
433- logger.error("Couldn't make HTTP request " + url)
434+ logger.error("Couldn't make HTTP request " + url)
435 logger.error("Unexpected error: %s, %s" % sys.exc_info()[:2]);
436 logger.debug(traceback.format_exc())
437 return None
438@@ -578,6 +600,7 @@
439 conn.close()
440 return response_body
441
442+
443 # Replaces password from config with ********
444 # Inputs
445 # config - python dictionary with backup config
446@@ -594,6 +617,7 @@
447 logger.debug("Given config %r doesn't contain password" % c)
448 return cc
449
450+
451 # Gets backup config from TwinDB dispatcher
452 # Returns
453 # Backup config
454@@ -602,13 +626,13 @@
455 def get_config():
456 global logger
457 global server_id
458-
459+
460 logger.debug("Getting config for server_id = %s" % server_id)
461 try:
462 data = {
463- "type": "get_config",
464- "params": {
465- "server_id": server_id
466+ "type": "get_config",
467+ "params": {
468+ "server_id": server_id
469 }
470 }
471 response_body = get_response(data)
472@@ -617,8 +641,8 @@
473 d = json.JSONDecoder()
474 response_body_decoded = d.decode(response_body)
475 if response_body_decoded:
476- msg_enc = response_body_decoded["response"]
477- msg_pt = d.decode(decrypt(msg_enc))
478+ msg_decrypted = decrypt(response_body_decoded["response"])
479+ msg_pt = d.decode(msg_decrypted)
480 config = msg_pt["data"]
481 logger.info("Got config:\n%s" % json.dumps(sanitize_config(config), indent=4, sort_keys=True))
482 if msg_pt["error"]:
483@@ -634,24 +658,25 @@
484 return None
485 return config
486
487+
488 # Reports slave status to TwinDB dispatcher
489 def report_sss():
490 global logger
491 global server_id
492-
493+
494 logger.debug("Reporting SHOW SLAVE STATUS for server_id = %s" % server_id)
495 try:
496 ss = get_slave_status()
497 data = {
498- "type": "report_sss",
499- "params": {
500- "server_id": server_id,
501- "mysql_server_id": ss["mysql_server_id"],
502- "mysql_master_server_id": ss["mysql_master_server_id"],
503- "mysql_master_host": ss["mysql_master_host"],
504- "mysql_seconds_behind_master": ss["mysql_seconds_behind_master"],
505- "mysql_slave_io_running": ss["mysql_slave_io_running"],
506- "mysql_slave_sql_running": ss["mysql_slave_sql_running"],
507+ "type": "report_sss",
508+ "params": {
509+ "server_id": server_id,
510+ "mysql_server_id": ss["mysql_server_id"],
511+ "mysql_master_server_id": ss["mysql_master_server_id"],
512+ "mysql_master_host": ss["mysql_master_host"],
513+ "mysql_seconds_behind_master": ss["mysql_seconds_behind_master"],
514+ "mysql_slave_io_running": ss["mysql_slave_io_running"],
515+ "mysql_slave_sql_running": ss["mysql_slave_sql_running"],
516 }
517 }
518 response_body = get_response(data)
519@@ -660,13 +685,14 @@
520 logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
521 logger.debug(traceback.format_exc())
522 return None
523- return
524+ return
525+
526
527 # Reports what privileges are given to the agent
528 def report_agent_privileges(config):
529 global logger
530 global server_id
531-
532+
533 logger.debug("Reporting agent privileges for server_id = %s" % server_id)
534 try:
535 mysql_user = config["mysql_user"]
536@@ -681,14 +707,14 @@
537 query = "SELECT PRIVILEGE_TYPE FROM information_schema.USER_PRIVILEGES"
538 logger.debug("Sending query : %s" % query)
539 cursor.execute(query)
540-
541+
542 privileges = dict()
543 privileges["Reload_priv"] = "N"
544 privileges["Lock_tables_priv"] = "N"
545 privileges["Repl_client_priv"] = "N"
546 privileges["Super_priv"] = "N"
547 privileges["Create_tablespace_priv"] = "N"
548-
549+
550 for (priv,) in cursor:
551 if priv == "RELOAD":
552 privileges["Reload_priv"] = "Y"
553@@ -701,13 +727,13 @@
554 elif priv == "CREATE TABLESPACE":
555 privileges["Create_tablespace_priv"] = "Y"
556 data = {
557- "type": "report_agent_privileges",
558- "params": {
559- "Reload_priv": privileges["Reload_priv"],
560- "Lock_tables_priv": privileges["Lock_tables_priv"],
561- "Repl_client_priv": privileges["Repl_client_priv"],
562- "Super_priv": privileges["Super_priv"],
563- "Create_tablespace_priv": privileges["Create_tablespace_priv"]
564+ "type": "report_agent_privileges",
565+ "params": {
566+ "Reload_priv": privileges["Reload_priv"],
567+ "Lock_tables_priv": privileges["Lock_tables_priv"],
568+ "Repl_client_priv": privileges["Repl_client_priv"],
569+ "Super_priv": privileges["Super_priv"],
570+ "Create_tablespace_priv": privileges["Create_tablespace_priv"]
571 }
572 }
573 get_response(data)
574@@ -716,7 +742,8 @@
575 logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
576 logger.debug(traceback.format_exc())
577 return None
578- return
579+ return
580+
581
582 # Gets job order from TwinDB dispatcher
583 # Returns
584@@ -728,13 +755,13 @@
585 global server_id
586 global job_id
587 job = None
588-
589+
590 logger.debug("Getting job for server_id = %s" % server_id)
591 try:
592 d = json.JSONDecoder()
593 data = {
594- "type": "get_job",
595- "params": {}
596+ "type": "get_job",
597+ "params": {}
598 }
599 response_body = get_response(data)
600 response_body_decoded = d.decode(response_body)
601@@ -758,32 +785,56 @@
602 return None
603 return job
604
605+
606 # Checks whether the server is registered or not
607 # Returns
608 # True - registered
609 # False - not so much
610-
611 def is_registered():
612 global logger
613 global server_id
614-
615+
616 logger.debug("Getting registration status for server_id = %s" % server_id)
617+
618+ twindb_email = "%s@twindb.com" % server_id
619+ logger.info("Reading GPG public key of %s." % twindb_email)
620+
621+ enc_public_key = None
622+ # Reading the GPG key
623+ try:
624+ p1 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--armor", "--export", twindb_email],
625+ stdout = subprocess.PIPE)
626+ enc_public_key = p1.stdout.read()
627+ except:
628+ logger.error("Failed to export GPG keys of %s from %s." % (twindb_email, gpg_homedir))
629+ exit_on_error(traceback.format_exc())
630+
631+ # Call the TwinDB api to check for server registration
632+ response_body = None
633 try:
634 data = {
635- "type": "is_registered",
636- "params": {
637- "server_id": server_id
638+ "type": "is_registered",
639+ "params": {
640+ "server_id": server_id,
641+ "enc_public_key": enc_public_key
642 }
643 }
644+
645 response_body = get_response(data)
646 if not response_body:
647 return None
648- d = json.JSONDecoder()
649- response_body_decoded = d.decode(response_body)
650+
651+ json_decoder = json.JSONDecoder()
652+ response_body_decoded = json_decoder.decode(response_body)
653+
654 if response_body_decoded:
655 if "response" in response_body_decoded:
656- msg_enc = response_body_decoded["response"]
657- msg_pt = d.decode(decrypt(msg_enc))
658+ msg_decrypted = decrypt(response_body_decoded["response"])
659+ if msg_decrypted is None:
660+ logger.debug("No valid response from dispatcher. Consider agent unregistered")
661+ return False
662+
663+ msg_pt = json_decoder.decode(msg_decrypted)
664 registration_status = msg_pt["data"]
665 logger.debug("Got registration status:\n%s" % json.dumps(registration_status, indent=4, sort_keys=True))
666 if msg_pt["error"]:
667@@ -791,10 +842,10 @@
668 exit_on_error(msg_pt["error"])
669 return registration_status["registered"]
670 else:
671- logger.debug("No valid response from dispatcher. Conseder agent unregistered")
672+ logger.debug("No valid response from dispatcher. Consider agent unregistered")
673 return False
674- except exceptions.KeyError as err:
675- logger.error("Failed to decode %s" % response_body);
676+ except exceptions.KeyError:
677+ logger.error("Failed to decode %s" % response_body)
678 exit_on_error(traceback.format_exc())
679 except:
680 logger.error("Couldn't get backup config")
681@@ -802,6 +853,7 @@
682 exit_on_error(traceback.format_exc())
683 return False
684
685+
686 # Gets name of initial full backup that corresponds to given incremental backup
687 # Inputs
688 # f - Valid name of incremental backup
689@@ -813,7 +865,7 @@
690 def get_full_of(f):
691 global logger
692 global server_id
693-
694+
695 try:
696 logger.info("Getting full backup name of %s" % f)
697 d = json.JSONDecoder()
698@@ -828,6 +880,7 @@
699 return None
700 return full["name"]
701
702+
703 # Gets name of child backup copy of the given backup name
704 # Inputs
705 # f - Valid name of incremental or full backup
706@@ -838,12 +891,12 @@
707 def get_child_of(f):
708 global logger
709 global server_id
710-
711+
712 try:
713 logger.info("Getting child backup name of %s" % f)
714 d = json.JSONDecoder()
715- response = d.decode(get_response("get_child_of.php",
716- {"backup_name": f }))
717+ response = d.decode(get_response("get_child_of.php",
718+ {"backup_name": f}))
719 msg_enc = response["data"]
720 logger.debug("Reply from server %s" % msg_enc)
721 child = d.decode(decrypt(msg_enc))
722@@ -853,7 +906,8 @@
723 logger.error(traceback.format_exc())
724 return None
725 return child["name"]
726-
727+
728+
729 # Notifies a job event to TwinDB dispatcher
730 # Inputs
731 # job_id - id of a job in jobs table
732@@ -899,24 +953,24 @@
733 # JSON string with status of the request i.e. { "success": True }
734 # None - if error happened
735
736-def record_backup(job_id, name, volume_id, size, lsn = None, ancestor = 0):
737+def record_backup(job_id, name, volume_id, size, lsn=None, ancestor=0):
738 global logger
739-
740+
741 try:
742 logger.info("Saving information about backup:")
743 logger.info("File name : %s" % name)
744- logger.info("Volume id : %d" % int(volume_id) )
745- logger.info("Size : %d (%s)" % (int(size), h_size(size) ) )
746- logger.info("Ancestor : %d" % int(ancestor) )
747+ logger.info("Volume id : %d" % int(volume_id))
748+ logger.info("Size : %d (%s)" % (int(size), h_size(size) ))
749+ logger.info("Ancestor : %d" % int(ancestor))
750 data = {
751 "type": "update_backup_data",
752 "params": {
753- "job_id": job_id,
754- "name": name,
755- "volume_id": volume_id,
756- "size": size,
757- "lsn": lsn,
758- "ancestor": ancestor
759+ "job_id": job_id,
760+ "name": name,
761+ "volume_id": volume_id,
762+ "size": size,
763+ "lsn": lsn,
764+ "ancestor": ancestor
765 }
766 }
767 logger.debug("Saving a record %s" % data)
768@@ -929,19 +983,20 @@
769 logger.info("Saved backup copy details")
770 return True
771 else:
772- logger.error("Failed to save backup copy details: "
773- + jd.decode(decrypt(r["response"]))["error"])
774+ logger.error("Failed to save backup copy details: "
775+ + jd.decode(decrypt(r["response"]))["error"])
776 return False
777 del jd
778 else:
779 logger.error("Empty response from server")
780 return False
781 except:
782- logger.error("Failed to save backup copy details")
783+ logger.error("Failed to save backup copy details")
784 logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
785 return False
786 return True
787
788+
789 # Reads config from /etc/twindb.cfg and sets server_id variable
790 # Returns
791 # True - if config was successfully read
792@@ -962,7 +1017,8 @@
793 logger.error(traceback.format_exc())
794 exit_on_error("Failed to read from config in %s" % init_config)
795 return True
796-
797+
798+
799 # Saves server_id variable in /etc/twindb.cfg
800 # Returns
801 # True - if config was successfully saved
802@@ -980,24 +1036,24 @@
803 sys.exit(2)
804 return
805
806+
807 def get_slave_status():
808-
809 result = dict()
810 # Read master host
811 try:
812- conn = get_mysql_connection()
813+ conn = get_mysql_connection()
814 if conn:
815 cursor = conn.cursor(dictionary=True)
816 else:
817 return None
818-
819+
820 result["mysql_server_id"] = None
821 result["mysql_master_server_id"] = None
822 result["mysql_master_host"] = None
823 result["mysql_seconds_behind_master"] = None
824 result["mysql_slave_io_running"] = None
825 result["mysql_slave_sql_running"] = None
826-
827+
828 cursor.execute("SHOW SLAVE STATUS")
829 for row in cursor:
830 result["mysql_master_server_id"] = row["Master_Server_Id"]
831@@ -1009,7 +1065,7 @@
832 cursor.execute("SELECT @@server_id AS server_id")
833 for row in cursor:
834 result["mysql_server_id"] = row["server_id"]
835-
836+
837 cursor.close()
838 conn.close()
839 except:
840@@ -1019,15 +1075,22 @@
841 return result
842
843
844-def has_mysql_access():
845+def has_mysql_access(username=None, password=None, grant_capability=True):
846 has_required_grants = False
847
848+ # list of missing privileges
849+ missing_privileges = []
850+
851 try:
852- conn = get_mysql_connection()
853- if conn:
854+ if username is None or password is None:
855+ conn = get_mysql_connection()
856+ else:
857+ conn = get_mysql_connection(username, password)
858+
859+ if isinstance(conn, mysql.connector.MySQLConnection):
860 cursor = conn.cursor(dictionary=True)
861 else:
862- return False
863+ return has_required_grants, missing_privileges
864
865 # Fetch the current user and matching host part as it could either be
866 # connecting using localhost or using '%'
867@@ -1035,23 +1098,62 @@
868 row = cursor.fetchone()
869 username, hostname = row['curr_user'].split('@')
870
871- sql = ("SELECT * FROM mysql.user WHERE user='%s' "
872- "AND host='%s'" % (username, hostname))
873+ required_privileges = ['RELOAD', 'SUPER', 'LOCK TABLES', 'REPLICATION CLIENT', 'CREATE TABLESPACE']
874+ quoted_privileges = ','.join("'%s'" % item for item in required_privileges)
875+
876+ sql = ("SELECT privilege_type, is_grantable FROM information_schema.user_privileges "
877+ "WHERE grantee=\"'%s'@'%s'\" AND privilege_type IN (%s)" % (username, hostname, quoted_privileges))
878 cursor.execute(sql)
879- user_info = cursor.fetchone()
880-
881+
882+ user_privileges = []
883+ grantable_privileges = []
884+ for row in cursor:
885+ user_privileges.append(row[u'privilege_type'])
886+ if row[u'is_grantable'] == 'YES':
887+ grantable_privileges.append(row[u'privilege_type'])
888+
889+ # Check that the user has all the required grants
890 has_required_grants = True
891- for privilege in ['Reload_priv', 'Lock_tables_priv',
892- 'Repl_client_priv', 'Super_priv',
893- 'Create_tablespace_priv', 'Grant_priv']:
894- if user_info[privilege] != 'Y':
895+ for privilege in required_privileges:
896+ if privilege in user_privileges:
897+ # If the user should also be able to grant the privilege then we check for the grant capability too
898+ # We consider the privilege not available if its not grantable in such a case
899+ if grant_capability and privilege not in grantable_privileges:
900+ has_required_grants = False
901+ else:
902 has_required_grants = False
903- break
904+ missing_privileges.append(privilege)
905+
906+ if len(missing_privileges) < 1 and grant_capability:
907+ required_privileges = ['INSERT', 'UPDATE']
908+ quoted_privileges = ','.join("'%s'" % item for item in required_privileges)
909+
910+ # If the user should be able to grant privileges too, such as the user that is used to create the
911+ # twindb_agent user then, insert and update privileges are needed on mysql.*
912+ sql = ("SELECT privilege_type FROM information_schema.schema_privileges "
913+ "WHERE grantee=\"'%s'@'%s'\" AND table_schema = 'mysql' AND privilege_type IN (%s)"
914+ "UNION "
915+ "SELECT privilege_type FROM information_schema.user_privileges "
916+ "WHERE grantee=\"'%s'@'%s'\" AND privilege_type IN (%s)"
917+ % (username, hostname, quoted_privileges, username, hostname, quoted_privileges))
918+ cursor.execute(sql)
919+
920+ user_privileges = []
921+ for row in cursor:
922+ user_privileges.append(row[u'privilege_type'])
923+
924+ for privilege in required_privileges:
925+ if privilege not in user_privileges:
926+ has_required_grants = False
927+ break
928+
929+ cursor.close()
930+ conn.close()
931 except:
932 logger.error("Could not read the grants information from MySQL")
933- logger.error(u"Unexpected error: {0:s},{1:s}".format(sys.exc_info()[:2]))
934+ logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2])
935
936- return has_required_grants
937+ return has_required_grants, missing_privileges
938
939
940 # Registers this server in TwinDB dispatcher
941@@ -1070,30 +1172,36 @@
942 global mysql_user
943 global mysql_password
944
945- check_env()
946-
947 # Check early to see that the MySQL user passed to the agent has enough
948 # privileges to create a separate MySQL user needed by TwinDB
949- if not has_mysql_access():
950+ mysql_access_available, missing_mysql_privileges = has_mysql_access()
951+ if not mysql_access_available:
952 logger.error("The MySQL user %s does not have enough privileges" % mysql_user)
953+ if len(missing_mysql_privileges) > 0:
954+ logger.error("Following privileges are missing: %s" % ','.join(missing_mysql_privileges))
955+
956 return False
957
958 logger.info("Registering TwinDB agent with code %s" % code)
959 logger.info("The agent needs to generate cryptographically strong keys.")
960 logger.info("It may take really, really long time. Please be patient.")
961- name = os.uname()[1].strip() # un[1] is a hostname
962+ name = os.uname()[1].strip() # un[1] is a hostname
963+
964+ twindb_email = "%s@twindb.com" % server_id
965+
966 # Generate GPG key
967+ enc_public_key = None
968 try:
969- twindb_email = "%s@twindb.com" % server_id
970 logger.info("Reading GPG public key of %s." % twindb_email)
971- p1 = subprocess.Popen(["gpg", "--homedir", gpg_homedir,
972- "--armor", "--export", twindb_email],
973- stdout = subprocess.PIPE)
974+ p1 = subprocess.Popen(["gpg", "--homedir", gpg_homedir,
975+ "--armor", "--export", twindb_email],
976+ stdout=subprocess.PIPE)
977 enc_public_key = p1.stdout.read()
978 except:
979- logger.error("Failed to export GPG keys of %s from %s."
980- % (twindb_email, gpg_homedir))
981+ logger.error("Failed to export GPG keys of %s from %s."
982+ % (twindb_email, gpg_homedir))
983 exit_on_error(traceback.format_exc())
984+
985 # Generate SSH key
986 try:
987 if not os.path.isfile(ssh_private_key):
988@@ -1102,6 +1210,7 @@
989 except:
990 logger.error("Failed to generate SSH keys.")
991 exit_on_error(traceback.format_exc())
992+
993 try:
994 logger.info("Reading SSH public key from %s." % ssh_public_key)
995 f = open(ssh_public_key, 'r')
996@@ -1110,21 +1219,25 @@
997 except:
998 logger.error("Failed to read SSH keys.")
999 exit_on_error(traceback.format_exc())
1000+
1001 # Read local ip addresses
1002 cmd = "ip addr"
1003 cmd += "| grep -w inet"
1004 cmd += "| awk '{ print $2}'"
1005 cmd += "| awk -F/ '{ print $1}'"
1006 ss = get_slave_status()
1007+
1008 if ss is None:
1009 logger.error("Could not get slave status on this server")
1010 exit_on_error()
1011 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
1012+
1013 local_ip = list()
1014 for row in p.stdout:
1015 row = row.rstrip('\n')
1016 if row != "127.0.0.1":
1017 local_ip.append(row)
1018+
1019 data = {
1020 "type": "register",
1021 "params": {
1022@@ -1140,27 +1253,36 @@
1023 "mysql_slave_io_running": ss["mysql_slave_io_running"],
1024 "mysql_slave_sql_running": ss["mysql_slave_sql_running"],
1025 "local_ip": local_ip
1026- }
1027+ }
1028 }
1029- response = get_response(data)
1030- if response <> None:
1031- jd = json.JSONDecoder()
1032- r = jd.decode(response)
1033- logger.debug(r)
1034- if r["success"]:
1035+
1036+ api_response = get_response(data)
1037+ if api_response is not None:
1038+ json_decoder = json.JSONDecoder()
1039+ response_decoded = json_decoder.decode(api_response)
1040+ logger.debug(response_decoded)
1041+
1042+ error_msg = "Unknown error"
1043+ if response_decoded["success"]:
1044 logger.info("Received successful response to register an agent")
1045 create_agent_user()
1046 else:
1047- logger.error("Failed to register the agent: "
1048- + jd.decode(decrypt(r["response"]))["error"])
1049+ if "response" in response_decoded:
1050+ msg_decrypted = decrypt(response_decoded["response"])
1051+ if msg_decrypted is not None:
1052+ error_msg = json_decoder.decode(msg_decrypted)["error"]
1053+ elif "errors" in response_decoded:
1054+ error_msg = response_decoded["errors"]["msg"]
1055+
1056+ logger.error("Failed to register the agent: %s" % error_msg)
1057 sys.exit(2)
1058- del jd
1059+ del json_decoder
1060 else:
1061- exit_on_error("Empty response from server" )
1062+ exit_on_error("Empty response from server")
1063 return True
1064
1065+
1066 def create_agent_user():
1067-
1068 global mysql_user
1069 global mysql_password
1070
1071@@ -1181,6 +1303,7 @@
1072 logger.info("You can change the schedule and retention policy on https://console.twindb.com/")
1073 return True
1074
1075+
1076 # Unregisters this server in TwinDB dispatcher
1077 # Returns
1078 # True - if server was successfully unregistered
1079@@ -1190,7 +1313,6 @@
1080 global logger
1081 global init_config
1082
1083- check_env()
1084 data = {
1085 "type": "unregister",
1086 "params": {
1087@@ -1208,21 +1330,20 @@
1088 logger.info("The server is successfully unregistered")
1089 return True
1090 else:
1091- logger.error("Failed to unregister the agent: "
1092- + jd.decode(decrypt(r["response"]))["error"])
1093+ logger.error("Failed to unregister the agent: "
1094+ + jd.decode(decrypt(r["response"]))["error"])
1095 sys.exit(2)
1096 del jd
1097 else:
1098- logger.error("Empty response from server" )
1099+ logger.error("Empty response from server")
1100 sys.exit(2)
1101 return False
1102
1103+
1104 # Starts immediate backup job
1105 def action_handler_backup():
1106 global logger
1107
1108- check_env()
1109-
1110 if schedule_backup():
1111 config = get_config()
1112 job = get_job()
1113@@ -1237,6 +1358,7 @@
1114 return False
1115 return False
1116
1117+
1118 # Asks dispatcher to schedule a job for this server
1119 def schedule_backup():
1120 global logger
1121@@ -1257,12 +1379,12 @@
1122 logger.info("A backup job is successfully registered")
1123 return True
1124 else:
1125- logger.error("Failed to schedule a job: "
1126- + jd.decode(decrypt(r["response"]))["error"])
1127+ logger.error("Failed to schedule a job: "
1128+ + jd.decode(decrypt(r["response"]))["error"])
1129 return False
1130 del jd
1131 else:
1132- exit_on_error("Empty response from server" )
1133+ exit_on_error("Empty response from server")
1134
1135
1136 # Checks if there is enough space in TwinDB storage
1137@@ -1274,6 +1396,7 @@
1138 def check_space(config, job_id):
1139 return True
1140
1141+
1142 # Checks if MySQL user from backup config has enough privileges to perform backup
1143 # Inputs
1144 # config - backup config
1145@@ -1301,15 +1424,16 @@
1146 logger.debug("Sending query : %s" % query)
1147 cursor.execute(query, (config["mysql_user"], priv))
1148 cursor.fetchall()
1149-
1150+
1151 if cursor.rowcount > 0:
1152 logger.info("%20s ... GRANTED" % priv)
1153 logger.debug("Privilege %s is granted to %s@localhost" % (priv, config["mysql_user"]))
1154 else:
1155 logger.info("%20s ... NOT GRANTED" % priv)
1156- logger.error("Privilege %s is not granted to %s@localhost" % ( priv, config["mysql_user"]) )
1157- logger.info("Please execute following SQL to grant %s to user %s@localhost:" % ( priv, config["mysql_user"]) )
1158- logger.info("GRANT %s ON *.* TO '%s'@'localhost';" % ( priv, config["mysql_user"]) )
1159+ logger.error("Privilege %s is not granted to %s@localhost" % ( priv, config["mysql_user"]))
1160+ logger.info(
1161+ "Please execute following SQL to grant %s to user %s@localhost:" % ( priv, config["mysql_user"]))
1162+ logger.info("GRANT %s ON *.* TO '%s'@'localhost';" % ( priv, config["mysql_user"]))
1163 return False
1164 cursor.close()
1165 con.close()
1166@@ -1323,6 +1447,7 @@
1167 logger.error("MySQL user %s@'localhost' doesn't have enough grants to take backup" % config["mysql_user"])
1168 return result
1169
1170+
1171 # Meta function that calls actual backup function depending on tool in backup config
1172 # Inputs
1173 # config - backup config
1174@@ -1336,19 +1461,20 @@
1175 logger.info("Starting backup job")
1176 try:
1177 job_id = int(job["job_id"])
1178- notify_params = { "event": "start_job", "job_id": job_id }
1179+ notify_params = {"event": "start_job", "job_id": job_id}
1180 if log_job_notify(job_id, notify_params):
1181 ret = take_backup_xtrabackup(config, job)
1182- notify_params = { "event": "stop_job", "job_id": job_id, "ret_code": ret }
1183+ notify_params = {"event": "stop_job", "job_id": job_id, "ret_code": ret}
1184 log_job_notify(job_id, notify_params)
1185 logger.info("Backup job is complete")
1186 else:
1187 logger.info("Backup job can not start")
1188 except:
1189 logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
1190-
1191+
1192 return ret
1193
1194+
1195 # Starts SSH process to TwinDB storage and saves input in file backup_name
1196 # Inputs
1197 # config - backup config
1198@@ -1360,7 +1486,7 @@
1199 global logger
1200 global ssh_private_key
1201 global ssh_port
1202-
1203+
1204 p = None
1205 try:
1206 ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key]
1207@@ -1369,11 +1495,12 @@
1208 ssh_cmd.append("user_id_%s@%s" % (config["user_id"], job_params["ip"]))
1209 ssh_cmd.append("/bin/cat - > %s" % backup_name)
1210 logger.debug("Starting SSH process: %r" % ssh_cmd)
1211- p = subprocess.Popen(ssh_cmd, stdin = stdin, stdout = subprocess.PIPE, stderr = stderr)
1212+ p = subprocess.Popen(ssh_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr)
1213 except:
1214 logger.error(traceback.format_exc())
1215 return p
1216
1217+
1218 # Starts GPG process, encrypts STDIN and outputs it into STDOUT
1219 # Inputs
1220 # config - backup config
1221@@ -1393,11 +1520,12 @@
1222 gpg_cmd.append("%s" % (server_id))
1223
1224 logger.debug("Starting GPG process: %r" % gpg_cmd)
1225- p = subprocess.Popen(gpg_cmd, stdin = stdin, stdout = subprocess.PIPE, stderr = stderr)
1226+ p = subprocess.Popen(gpg_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr)
1227 except:
1228 logger.error(traceback.format_exc())
1229 return p
1230
1231+
1232 # Takes backup copy with XtraBackup
1233 # Inputs
1234 # config - backup config
1235@@ -1409,9 +1537,9 @@
1236 def take_backup_xtrabackup(config, job):
1237 global logger
1238 global server_id
1239-
1240+
1241 suffix = "tar"
1242- backup_name="server_id_%s_%s.%s.gpg" % (server_id, datetime.now().isoformat(), suffix)
1243+ backup_name = "server_id_%s_%s.%s.gpg" % (server_id, datetime.now().isoformat(), suffix)
1244 extra_config = ""
1245 ret_code = 0
1246
1247@@ -1438,18 +1566,18 @@
1248 extra_config = gen_extra_config(config)
1249 if extra_config:
1250 xtrabackup_cmd.append("--defaults-extra-file=%s" % extra_config)
1251-
1252+
1253 xtr_err = open('/tmp/twindb.xtrabackup.err', "w+")
1254 gpg_err = open('/tmp/twindb.gpg.err', "w+")
1255 ssh_err = open('/tmp/twindb.ssh.err', "w+")
1256- p1 = subprocess.Popen(xtrabackup_cmd, stdout = subprocess.PIPE, stderr = xtr_err)
1257+ p1 = subprocess.Popen(xtrabackup_cmd, stdout=subprocess.PIPE, stderr=xtr_err)
1258 p2 = start_gpg_cmd(config, p1.stdout, gpg_err)
1259 p3 = start_ssh_cmd(config, job["params"], backup_name, p2.stdout, ssh_err)
1260-
1261+
1262 p1.stdout.close()
1263 p2.stdout.close()
1264 p3.communicate()
1265-
1266+
1267 ret_code_ssh = p3.returncode
1268 ret_code_gpg = p2.wait()
1269 ret_code_xbk = p1.wait()
1270@@ -1497,14 +1625,15 @@
1271 logger.error(traceback.format_exc())
1272 return -1
1273 finally:
1274- for f in [extra_config,
1275- "/tmp/twindb.xtrabackup.err",
1276- "/tmp/twindb.gpg.err",
1277- "/tmp/twindb.ssh.err"]:
1278+ for f in [extra_config,
1279+ "/tmp/twindb.xtrabackup.err",
1280+ "/tmp/twindb.gpg.err",
1281+ "/tmp/twindb.ssh.err"]:
1282 if os.path.isfile(f):
1283 os.remove(f)
1284 return ret_code
1285
1286+
1287 # Generates MySQL config with datadir option
1288 # Inputs
1289 # config - backup config
1290@@ -1520,8 +1649,8 @@
1291 f, extra_config = tempfile.mkstemp()
1292 os.write(f, "[mysqld]\n")
1293 con = get_mysql_connection(
1294- user = config["mysql_user"],
1295- passwd = config["mysql_password"])
1296+ user=config["mysql_user"],
1297+ passwd=config["mysql_password"])
1298 cur = con.cursor()
1299 cur.execute("SELECT @@datadir")
1300 row = cur.fetchone()
1301@@ -1533,7 +1662,8 @@
1302 logger.error(traceback.format_exc())
1303 extra_config = None
1304 return extra_config
1305-
1306+
1307+
1308 # Checks if binary log is enabled in local MySQL
1309 # Inputs
1310 # config - backup config
1311@@ -1561,7 +1691,7 @@
1312 logger.error("Failed to check if binlog is enabled")
1313 logger.error(traceback.format_exc())
1314 return result
1315-
1316+
1317
1318 # Logs in to TwinDB storage and get size of backup
1319 # Inputs
1320@@ -1576,17 +1706,17 @@
1321 global ssh_private_key
1322 global ssh_port
1323 size = 0
1324-
1325+
1326 logger.debug("Getting size of %s" % backup_name)
1327 try:
1328 ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key, "-p", str(ssh_port)]
1329 ssh_cmd.append("user_id_" + config["user_id"] + "@" + job_params["ip"])
1330 ssh_cmd.append("/bin/du -b %s" % backup_name)
1331- p = subprocess.Popen(ssh_cmd,
1332- stdout = subprocess.PIPE,
1333- stderr = subprocess.PIPE)
1334+ p = subprocess.Popen(ssh_cmd,
1335+ stdout=subprocess.PIPE,
1336+ stderr=subprocess.PIPE)
1337 cout, cerr = p.communicate()
1338- size = int( cout.split()[0] )
1339+ size = int(cout.split()[0])
1340 except:
1341 logger.error("Failed to get size of backup %s" % backup_name)
1342 logger.error(traceback.format_exc())
1343@@ -1594,16 +1724,18 @@
1344 logger.debug("Size of %s = %d bytes (%s)" % (backup_name, size, h_size(size)))
1345 return size
1346
1347+
1348 # Check if directory is empty
1349 # Inputs
1350 # dir - directory name
1351 # Returns
1352 # True - directory is empty
1353 # False - directory is not empty
1354-
1355-def is_dir_empty(dir) :
1356+
1357+def is_dir_empty(dir):
1358 return len(os.listdir(dir)) == 0
1359
1360+
1361 # Meta function that calls actual restore function depending on tool in backup config
1362 # Inputs
1363 # config - backup config
1364@@ -1612,11 +1744,11 @@
1365
1366 def restore_backup(config, job):
1367 global logger
1368-
1369+
1370 ret = -1
1371 logger.info("Starting restore job %r" % job)
1372 log_start_job(int(job["job_id"]))
1373-
1374+
1375 if config["tool"] == "xtrabackup":
1376 ret = restore_xtrabackup(config, job)
1377 elif config["tool"] == "mysqldump":
1378@@ -1628,6 +1760,7 @@
1379 logger.info("Restore job is complete")
1380 return ret
1381
1382+
1383 # Extracts an Xtrabackup archive arc in dst_dir
1384 # Inputs
1385 # config - backup config
1386@@ -1647,22 +1780,22 @@
1387 ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key, "-p", str(ssh_port)]
1388 ssh_cmd.append(config["username"] + "@" + config["dst_ip"])
1389 ssh_cmd.append("/bin/cat %s" % arc)
1390-
1391+
1392 gpg_cmd = ["gpg", "--decrypt"]
1393-
1394+
1395 xb_cmd = ["xbstream", "-x"]
1396-
1397+
1398 xb_err = open('/tmp/twindb.xb.err', "w+")
1399 gpg_err = open('/tmp/twindb.gpg.err', "w+")
1400 ssh_err = open('/tmp/twindb.ssh.err', "w+")
1401 logger.info("Starting: %r" % ssh_cmd)
1402- p1 = subprocess.Popen(ssh_cmd, stdout = subprocess.PIPE, stderr = ssh_err)
1403+ p1 = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=ssh_err)
1404 logger.info("Starting: %r" % gpg_cmd)
1405- p2 = subprocess.Popen(gpg_cmd, stdin = p1.stdout, stdout = subprocess.PIPE, stderr = gpg_err)
1406+ p2 = subprocess.Popen(gpg_cmd, stdin=p1.stdout, stdout=subprocess.PIPE, stderr=gpg_err)
1407 logger.info("Starting: %r" % xb_cmd)
1408- p3 = subprocess.Popen(xb_cmd, stdin = p2.stdout, stdout = subprocess.PIPE, stderr = mysql_err, cwd=dst_dir)
1409+ p3 = subprocess.Popen(xb_cmd, stdin=p2.stdout, stdout=subprocess.PIPE, stderr=mysql_err, cwd=dst_dir)
1410 p1.stdout.close()
1411- p2.stdout.close()
1412+ p2.stdout.close()
1413 p3.wait()
1414 xb_err.seek(0)
1415 gpg_err.seek(0)
1416@@ -1683,6 +1816,7 @@
1417 logger.info("Extracted successfully %s in %s" % (arc, dst_dir))
1418 return True
1419
1420+
1421 # Restores backup copy with XtraBackup
1422 # Inputs
1423 # config - backup config
1424@@ -1695,7 +1829,7 @@
1425 global logger
1426 global ssh_private_key
1427 global ssh_port
1428-
1429+
1430 backup_name = job["restore_backup_copy"]
1431 backup_name_base = os.path.splitext(backup_name)
1432 dst_dir = job["restore_dir"]
1433@@ -1722,7 +1856,7 @@
1434 xb_cmd = ["innobackupex", "--apply-log", dst_dir]
1435 else:
1436 xb_cmd = ["innobackupex", "--apply-log", "--redo-only", dst_dir]
1437- p_xb = subprocess.Popen(xb_cmd, stdout = xb_err, stderr = xb_err)
1438+ p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err)
1439 p_xb.wait()
1440 xb_err.seek(0)
1441 logger.info("innobackupex stderr: " + xb_err.read())
1442@@ -1740,14 +1874,14 @@
1443 inc_copy = get_child_of(full_copy)
1444 inc_dir = tempfile.mkdtemp()
1445 extract_archive(config, inc_copy, inc_dir)
1446-
1447+
1448 xb_err = open('/tmp/twindb.xb.err', "w+")
1449 xb_cmd = ["innobackupex", '--apply-log']
1450 if inc_copy != backup_name:
1451 xb_cmd.append('--redo-only')
1452 xb_cmd.append('--incremental-dir=%s' % inc_dir)
1453 xb_cmd.append(dst_dir)
1454- p_xb = subprocess.Popen(xb_cmd, stdout = xb_err, stderr = xb_err)
1455+ p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err)
1456 p_xb.wait()
1457 xb_err.seek(0)
1458 logger.info("innobackupex stderr: " + xb_err.read())
1459@@ -1768,7 +1902,8 @@
1460 if os.path.isfile(f):
1461 os.remove(f)
1462 return True
1463-
1464+
1465+
1466 # Restores backup copy taken with mysqldump
1467 # Inputs
1468 # config - backup config
1469@@ -1884,33 +2019,33 @@
1470 ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key, "-p", str(ssh_port)]
1471 ssh_cmd.append(config["username"] + "@" + config["dst_ip"])
1472 ssh_cmd.append("/bin/cat %s" % backup_name)
1473-
1474+
1475 gpg_cmd = ["gpg", "--decrypt"]
1476-
1477+
1478 mysql_cmd = ["mysql", "-S", mysql_socket]
1479
1480 mysql_err = open('/tmp/twindb.mysql.err', "w+")
1481 gpg_err = open('/tmp/twindb.gpg.err', "w+")
1482 ssh_err = open('/tmp/twindb.ssh.err', "w+")
1483 logger.info("Starting: %r" % ssh_cmd)
1484- p1 = subprocess.Popen(ssh_cmd, stdout = subprocess.PIPE, stderr = ssh_err)
1485+ p1 = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=ssh_err)
1486 logger.info("Starting: %r" % gpg_cmd)
1487- p2 = subprocess.Popen(gpg_cmd, stdin = p1.stdout, stdout = subprocess.PIPE, stderr = gpg_err)
1488+ p2 = subprocess.Popen(gpg_cmd, stdin=p1.stdout, stdout=subprocess.PIPE, stderr=gpg_err)
1489 logger.info("Starting: %r" % mysql_cmd)
1490- p3 = subprocess.Popen(mysql_cmd, stdin = p2.stdout, stdout = subprocess.PIPE, stderr = mysql_err)
1491+ p3 = subprocess.Popen(mysql_cmd, stdin=p2.stdout, stdout=subprocess.PIPE, stderr=mysql_err)
1492 p1.stdout.close()
1493- p2.stdout.close()
1494+ p2.stdout.close()
1495 out3 = p3.communicate()[0]
1496 mysql_err.seek(0)
1497 gpg_err.seek(0)
1498 ssh_err.seek(0)
1499 logger.info("SSH stderr: " + ssh_err.read())
1500 logger.info("GPG stderr: " + gpg_err.read())
1501- logger.info("mysql stderr: " + mysql_err.read())
1502+ logger.info("mysql stderr: " + mysql_err.read())
1503 logger.info("Successfully restored backup %s in %s" % (backup_name, dst_dir))
1504 ret = p3.returncode
1505 # Stop MySQL
1506- mysqld_proc.terminate()
1507+ mysqld_proc.terminate()
1508 except:
1509 logger.error("Failed to restore backup")
1510 logger.error(traceback.format_exc())
1511@@ -1925,25 +2060,25 @@
1512 logger.info("MySQL process is already terminated")
1513 return True
1514
1515+
1516 # Finds LSN in XtraBackup output
1517
1518 def grep_lsn(str):
1519-
1520 lsn = None
1521 for line in str.split("\n"):
1522 if line.startswith("xtrabackup: The latest check point (for incremental):"):
1523 lsn = line.split("'")[1]
1524 return lsn
1525
1526-# Finds MySQL socket
1527+
1528+# Finds MySQL socket
1529 def get_unix_socket():
1530-
1531 socket = ""
1532 try:
1533 cmd = [
1534 "lsof", "-U", "-c", "/^mysqld$/", "-a", "-F", "n"
1535 ]
1536- p = subprocess.Popen(cmd, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
1537+ p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1538 cout, cerr = p.communicate()
1539 # Outputs socket in format
1540 # # lsof -U -c mysqld -a -F n
1541@@ -1957,6 +2092,7 @@
1542
1543 return socket
1544
1545+
1546 def pid_exists(pid):
1547 """Check whether pid exists in the current process table."""
1548 if pid < 0:
1549@@ -1968,9 +2104,11 @@
1550 else:
1551 return True
1552
1553+
1554 class TimeoutExpired(Exception):
1555 pass
1556
1557+
1558 def wait_pid(pid, timeout=None):
1559 """Wait for process with pid 'pid' to terminate and return its
1560 exit status code as an integer.
1561@@ -1982,6 +2120,7 @@
1562
1563 Raise TimeoutExpired on timeout expired (if specified).
1564 """
1565+
1566 def check_timeout(delay):
1567 if timeout is not None:
1568 if time.time() >= stop_at:
1569@@ -2033,6 +2172,8 @@
1570 else:
1571 # should never happen
1572 raise RuntimeError("unknown process exit status")
1573+
1574+
1575 # Removes pid file
1576 # Exits if error happened
1577
1578@@ -2045,6 +2186,7 @@
1579 logger.error(traceback.format_exc())
1580 sys.exit(2)
1581
1582+
1583 # Cleans up when TwinDB agent exits
1584
1585 def cleanup(signum, frame):
1586@@ -2056,14 +2198,16 @@
1587 logger.info("TwinDB agent is ready to exit")
1588 sys.exit(0)
1589
1590+
1591 # Reports error removes pid and exits
1592
1593-def exit_on_error(tb = None):
1594+def exit_on_error(tb=None):
1595 if tb is not None:
1596 logger.debug(tb)
1597 remove_pid()
1598 sys.exit(2)
1599
1600+
1601 # Stops TwinDB agent
1602
1603 def stop():
1604@@ -2075,7 +2219,7 @@
1605 if not os.path.exists(pid_file):
1606 logger.info("Pid file %s does not exist. Probably twindb agent isn't running" % pid_file)
1607 sys.exit(0)
1608-
1609+
1610 try:
1611 f = open(pid_file, 'r')
1612 pid = int(f.readline())
1613@@ -2098,6 +2242,7 @@
1614 logger.info("TwinDB agent successfully shut down")
1615 sys.exit(0)
1616
1617+
1618 # Starts TwinDB agent
1619
1620 def start():
1621@@ -2106,7 +2251,7 @@
1622 global check_period
1623
1624 logger.info("Starting TwinDB agent")
1625- check_env()
1626+
1627 if os.path.isfile(pid_file):
1628 logger.error("PID file " + pid_file + " exists");
1629 logger.error("Check if TwinDB agent is already running");
1630@@ -2142,6 +2287,7 @@
1631 exit_on_error(traceback.format_exc())
1632 sys.exit(0)
1633
1634+
1635 # Processes job
1636 # Inputs
1637 # config - backup config
1638@@ -2152,8 +2298,19 @@
1639 global job_id
1640
1641 try:
1642- if not check_mysql_permissions(config):
1643+ # Check to see that the twindb_agent MySQL user has enough privileges
1644+ username = config["mysql_user"]
1645+ password = config["mysql_password"]
1646+
1647+ mysql_access_available, missing_mysql_privileges = has_mysql_access(username, password, grant_capability=False)
1648+ if not mysql_access_available:
1649+ logger.error("The MySQL user %s does not have all the required privileges." % username)
1650+ if len(missing_mysql_privileges) > 0:
1651+ logger.error("You can grant the required privileges by executing the following SQL: "
1652+ "GRANT %s ON *.* TO '%s'@'localhost'" % (','.join(missing_mysql_privileges), username))
1653+
1654 return False
1655+
1656 if job["start_scheduled"] == None:
1657 logger.error("Job start time isn't set")
1658 return False
1659@@ -2161,8 +2318,8 @@
1660 start_scheduled = int(job["start_scheduled"])
1661 now = int(time.time())
1662 if now < start_scheduled:
1663- logger.info("Job is scheduled on %s, now %s"
1664- % (time.ctime(start_scheduled), time.ctime(now)))
1665+ logger.info("Job is scheduled on %s, now %s"
1666+ % (time.ctime(start_scheduled), time.ctime(now)))
1667 return False
1668
1669 logger.info("Processing job_id = %d", int(job_id))
1670@@ -2181,6 +2338,7 @@
1671 job_id = 0
1672 return False
1673
1674+
1675 def setup_logging():
1676 global logger
1677
1678@@ -2200,6 +2358,7 @@
1679 # syslog handler shouldn't log DEBUG messages
1680 sh.setLevel(logging.INFO)
1681
1682+
1683 # Main function
1684 # Parses options, creates log class etc
1685
1686@@ -2219,36 +2378,40 @@
1687 try:
1688 execfile(init_config, globals())
1689 if len(server_id) == 0:
1690- print("Error: Config %s doesn't set server_id"
1691- % init_config, file=sys.stderr)
1692+ print("Error: Config %s doesn't set server_id"
1693+ % init_config, file=sys.stderr)
1694 sys.exit(2)
1695 except:
1696 print(traceback.format_exc())
1697- print("Error: Failed to read from config in %s"
1698- % init_config, file=sys.stderr)
1699+ print("Error: Failed to read from config in %s"
1700+ % init_config, file=sys.stderr)
1701 sys.exit(2)
1702 else:
1703 server_id = str(uuid.uuid4())
1704 save_config(server_id)
1705+
1706 # At this point server_id must be set
1707 if len(server_id) == 0:
1708- print("Error: Server id is empty. Can not continue operation",
1709- file=sys.stderr)
1710+ print("Error: Server id is empty. Can not continue operation",
1711+ file=sys.stderr)
1712 sys.exit(2)
1713-
1714+
1715 setup_logging()
1716
1717 # Signal handlers
1718 signal.signal(signal.SIGTERM, cleanup)
1719 signal.signal(signal.SIGINT, cleanup)
1720+
1721 # Not clear why I ignore SIGCHLD.
1722- # it make subprocess.call thow exception, so I comment it out so far
1723+ # it make subprocess.call throw exception, so I comment it out so far
1724 #signal.signal(signal.SIGCHLD, signal.SIG_IGN)
1725-
1726+
1727+ opts = []
1728 try:
1729- opts, args = getopt.getopt(sys.argv[1:], 'd:i:ghu:p:',
1730- ["start", "stop", "help", "register=", "dispatcher=", "user=", "password=", "version",
1731- "unregister", "delete-backups", "backup", "is-registered"])
1732+ opts, args = getopt.getopt(sys.argv[1:], 'd:i:ghu:p:',
1733+ ["start", "stop", "help", "register=", "dispatcher=", "user=", "password=",
1734+ "version",
1735+ "unregister", "delete-backups", "backup", "is-registered"])
1736 except getopt.GetoptError as err:
1737 logger.error(err)
1738 exit_on_error(traceback.format_exc())
1739@@ -2256,6 +2419,7 @@
1740 logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]);
1741 usage()
1742 exit_on_error(traceback.format_exc())
1743+
1744 # Set options first
1745 action = None
1746 delete_backups = False
1747@@ -2295,16 +2459,24 @@
1748 else:
1749 usage()
1750 sys.exit(2)
1751+
1752+ # Check that the option combinations have been used correctly
1753 if delete_backups and action != "unregister":
1754 logger.error("Hey, --delete-backups can be used only with --unregister")
1755 sys.exit(2)
1756+
1757+ # Check the environment and set it up correctly if it's not already set
1758+ check_env()
1759+
1760 if check_period < 1:
1761 check_period = 1
1762+
1763 if action == "start":
1764 if is_registered():
1765 start()
1766 else:
1767- logger.error("The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n")
1768+ logger.error(
1769+ "The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n")
1770 logger.error("Get your code on https://console.twindb.com/?get_code")
1771 sys.exit(2)
1772 elif action == "stop":
1773@@ -2313,14 +2485,16 @@
1774 action_handler_register(regcode)
1775 elif action == "unregister":
1776 if not is_registered():
1777- logger.error("The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n")
1778+ logger.error(
1779+ "The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n")
1780 logger.error("Get your code on https://console.twindb.com/?get_code")
1781 sys.exit(2)
1782 if action_handler_unregister(delete_backups):
1783 stop()
1784 elif action == "backup":
1785 if not is_registered():
1786- logger.error("The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n")
1787+ logger.error(
1788+ "The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n")
1789 logger.error("Get your code on https://console.twindb.com/?get_code")
1790 sys.exit(2)
1791 if not action_handler_backup():
1792@@ -2339,7 +2513,7 @@
1793 remove_pid()
1794 sys.exit(2)
1795
1796+
1797 if __name__ == "__main__":
1798-
1799 logger = logging.getLogger('twindb')
1800 main()

Subscribers

People subscribed via source and target branches