Merge lp:~akuzminsky/twindb-agent/restore into lp:twindb-agent
- restore
- Merge into trunk
Proposed by
Aleksandr Kuzminsky
Status: | Merged |
---|---|
Merged at revision: | 18 |
Proposed branch: | lp:~akuzminsky/twindb-agent/restore |
Merge into: | lp:twindb-agent |
Diff against target: |
3480 lines (+1215/-1459) 1 file modified
twindb.py (+1215/-1459) |
To merge this branch: | bzr merge lp:~akuzminsky/twindb-agent/restore |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Ovais Tariq | Pending | ||
Review via email: mp+255386@code.launchpad.net |
Commit message
Description of the change
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'twindb.py' |
2 | --- twindb.py 2015-03-16 02:16:03 +0000 |
3 | +++ twindb.py 2015-04-07 16:22:26 +0000 |
4 | @@ -2,31 +2,34 @@ |
5 | from __future__ import print_function |
6 | from __future__ import unicode_literals |
7 | from __future__ import absolute_import |
8 | +import ConfigParser |
9 | from base64 import b64encode, b64decode |
10 | from datetime import datetime |
11 | -import sys |
12 | -import time |
13 | +import errno |
14 | +import exceptions |
15 | import getopt |
16 | +import httplib |
17 | +import json |
18 | +import logging.handlers |
19 | +import os |
20 | +import pwd |
21 | import signal |
22 | -import traceback |
23 | -import exceptions |
24 | -import errno |
25 | +import shutil |
26 | import socket |
27 | -import logging.handlers |
28 | -import os.path |
29 | import subprocess |
30 | -import httplib |
31 | -import json |
32 | +import sys |
33 | +import tempfile |
34 | +import time |
35 | +import traceback |
36 | import urllib |
37 | - |
38 | +import uuid |
39 | |
40 | try: |
41 | import mysql.connector |
42 | except ImportError: |
43 | + # On CentOS 5 mysql.connector is in python 2.6 directory |
44 | sys.path.insert(0, '/usr/lib/python2.6/site-packages') |
45 | import mysql.connector |
46 | -import tempfile |
47 | -import uuid |
48 | |
49 | # global variables |
50 | host = "dispatcher.twindb.com" |
51 | @@ -36,8 +39,8 @@ |
52 | debug_http = True |
53 | debug_gpg = False |
54 | init_config = "/etc/twindb.cfg" |
55 | -ssh_private_key = "/root/.ssh/twindb.key" |
56 | -ssh_public_key = "/root/.ssh/twindb.key.pub" |
57 | +ssh_private_key_file = "/root/.ssh/twindb.key" |
58 | +ssh_public_key_file = "/root/.ssh/twindb.key.pub" |
59 | ssh_port = 4194 |
60 | gpg_homedir = "/root/.gnupg/" |
61 | pid_file = "/var/run/twindb.pid" |
62 | @@ -45,9 +48,10 @@ |
63 | time_zone = "UTC" |
64 | api_email = "api@twindb.com" |
65 | server_id = "" |
66 | -job_id = 0 |
67 | -mysql_user = "root" |
68 | -mysql_password = "" |
69 | +job_id = None |
70 | +mysql_user = None |
71 | +mysql_password = None |
72 | +agent_version = "@@TWINDB_AGENT_VERSION@@" |
73 | api_pub_key = """ |
74 | -----BEGIN PGP PUBLIC KEY BLOCK----- |
75 | Version: GnuPG v1 |
76 | @@ -102,24 +106,24 @@ |
77 | =62IM |
78 | -----END PGP PUBLIC KEY BLOCK----- |
79 | """ |
80 | -agent_version = "@@TWINDB_AGENT_VERSION@@" |
81 | - |
82 | - |
83 | -# Logging handler that logs to remote TwinDB dispatcher |
84 | + |
85 | + |
86 | class RlogHandler(logging.Handler): |
87 | + """ |
88 | + Logging handler that logs to remote TwinDB dispatcher |
89 | + """ |
90 | + log_flag = True |
91 | + |
92 | def __init__(self): |
93 | logging.Handler.__init__(self) |
94 | self.log_flag = True |
95 | |
96 | def emit(self, record): |
97 | - global server_id |
98 | - global job_id |
99 | - global debug |
100 | - |
101 | - request = {} |
102 | - request["type"] = "log" |
103 | - request["params"] = {} |
104 | - if job_id != 0: |
105 | + request = { |
106 | + "type": "log", |
107 | + "params": {} |
108 | + } |
109 | + if job_id: |
110 | request["params"]["job_id"] = job_id |
111 | request["params"]["msg"] = record.getMessage() |
112 | if self.log_flag and not debug: |
113 | @@ -128,83 +132,106 @@ |
114 | self.log_flag = True |
115 | |
116 | |
117 | +class JobError(Exception): |
118 | + def __init__(self, value): |
119 | + self.value = value |
120 | + |
121 | + def __str__(self): |
122 | + return self.value |
123 | + |
124 | + |
125 | +class JobTooSoonError(Exception): |
126 | + def __init__(self, value): |
127 | + self.value = value |
128 | + |
129 | + def __str__(self): |
130 | + return self.value |
131 | + |
132 | + |
133 | def usage(): |
134 | - print("Usage:\n\ |
135 | -twindb --start | --stop | --register <registration code> [-g] [-i interval]\n\ |
136 | - [-u user] [-p password]\n\ |
137 | -\n\ |
138 | - --start Start TwinDB agent.\n\ |
139 | - --stop Stop TwinDB agent.\n\ |
140 | -\n\ |
141 | - --register <code> Register this server in TwinDB.\n\ |
142 | - <code> is string like 7b90ae21ac642f2f8fc0a285c4789147\n\ |
143 | - Get your code on https://console.twindb.com/?get_code\n\ |
144 | -\n\ |
145 | - --unregister [--delete-backups] Unregister this server from TwinDB.\n\ |
146 | - --backup Start backup from this server immediately.\n\ |
147 | - --is-registered Checks whether the server is registered and prints YES or NO.\n\ |
148 | - -g Print debug message.\n\ |
149 | - -i interval Check for a job every interval seconds.\n\ |
150 | - Default 300.\n\ |
151 | - During registration TwinDB agent needs to get some information from MySQL,\n\ |
152 | - (master host etc)\n\ |
153 | - This is credentials to connect to local MySQL instance\n\ |
154 | - -u | --user username MySQL user name\n\ |
155 | - -p | --password password MySQL password") |
156 | + print("""Usage: |
157 | +twindb --start | --stop | --register <registration code> [-g] [-i interval] |
158 | + [-u user] [-p password] |
159 | + |
160 | + --start Start TwinDB agent. |
161 | + --stop Stop TwinDB agent. |
162 | + |
163 | + --register <code> Register this server in TwinDB. |
164 | + <code> is string like 7b90ae21ac642f2f8fc0a285c4789147 |
165 | + Get your code on https://console.twindb.com/?get_code |
166 | + |
167 | + --unregister [--delete-backups] Unregister this server from TwinDB. |
168 | + --backup Start backup from this server immediately. |
169 | + --is-registered Checks whether the server is registered and prints YES or NO. |
170 | + -g Print debug message. |
171 | + -i interval Check for a job every interval seconds. |
172 | + Default {check_period}. |
173 | + During registration TwinDB agent needs to get some information from MySQL, |
174 | + (master host etc) |
175 | + This is credentials to connect to local MySQL instance |
176 | + -u | --user username MySQL user name |
177 | + -p | --password password MySQL password""".format(check_period=check_period)) |
178 | return |
179 | |
180 | |
181 | def get_mysql_connection(user=None, passwd=None): |
182 | - global mysql_user |
183 | - global mysql_password |
184 | - |
185 | + """ |
186 | + Returns connection to the local MySQL instance. |
187 | + If user is passed as an argument it'll be used to connect, |
188 | + otherwise the second choice will be to use mysql_user. |
189 | + If neither user names are set the function will try to use either of MySQL option files |
190 | + (/etc/my.cnf, /etc/mysql/my.cnf, or /root/.my.cnf). If the option files don't exist |
191 | + it'll try to connect as root w/o password. |
192 | + """ |
193 | try: |
194 | - if user is None: |
195 | - user = mysql_user |
196 | - if passwd is None: |
197 | - passwd = mysql_password |
198 | - mysql_option_files = [] |
199 | unix_socket = get_unix_socket() |
200 | - conn = mysql.connector.connect() |
201 | - if user is not None: |
202 | - for f in ['/etc/mysql/my.cnf', '/etc/my.cnf', '/root/.my.cnf']: |
203 | - if os.path.exists(f): |
204 | - mysql_option_files.append(f) |
205 | - if mysql_option_files: |
206 | - conn = mysql.connector.connect(option_files=mysql_option_files, unix_socket=unix_socket) |
207 | - logger.debug( |
208 | - "Connected to MySQL as '%s'@'localhost' with options files %r" % (conn.user, mysql_option_files)) |
209 | - if not conn.is_connected(): |
210 | - conn = mysql.connector.connect(user=user, passwd=passwd, unix_socket=unix_socket) |
211 | - logger.debug("Connected to MySQL as %s@localhost " % conn.user) |
212 | + if not user: |
213 | + if mysql_user: |
214 | + logger.debug('Using MySQL user specified in the command line') |
215 | + user = mysql_user |
216 | + passwd = mysql_password |
217 | + else: |
218 | + for options_file in ["/etc/my.cnf", "/etc/mysql/my.cnf", "/usr/etc/my.cnf", |
219 | + "/root/.my.cnf", "/root/.mylogin.cnf"]: |
220 | + if os.path.exists(options_file): |
221 | + config = ConfigParser.ConfigParser() |
222 | + config.read(options_file) |
223 | + for section in ["client", "twindb"]: |
224 | + if config.has_section(section): |
225 | + if config.has_option(section, "user"): |
226 | + user = config.get(section, "user") |
227 | + if config.has_option(section, "password"): |
228 | + passwd = config.get(section, "user") |
229 | + # If user isn't set by the function argument, global mysql_user |
230 | + # or MySQL options file connect as unix user w/ empty password |
231 | + if not user: |
232 | + user = pwd.getpwuid(os.getuid()).pw_name |
233 | + passwd = "" |
234 | + logger.debug("Connecting to MySQL as unix user %s" % user) |
235 | + conn = mysql.connector.connect(user=user, passwd=passwd, unix_socket=unix_socket) |
236 | + logger.debug("Connected to MySQL as %s@localhost " % conn.user) |
237 | except mysql.connector.Error as err: |
238 | logger.error("Can not connect to local MySQL server") |
239 | logger.error("MySQL said: %s" % err.msg) |
240 | return None |
241 | - except: |
242 | - logger.error("Can not connect to local MySQL server") |
243 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); |
244 | - return None |
245 | return conn |
246 | |
247 | |
248 | -# Checks if TwinDB API public key is installed |
249 | -# Returns |
250 | -# True - if the key is installed |
251 | -# False - if not |
252 | def is_gpg_key_installed(email, key_type="public"): |
253 | - global logger |
254 | - global gpg_homedir |
255 | - global debug_gpg |
256 | - |
257 | + """ |
258 | + Checks if TwinDB API public key is installed |
259 | + Returns |
260 | + True - if the key is installed |
261 | + False - if not |
262 | + """ |
263 | result = False |
264 | - if len(email) == 0: |
265 | + if not email: |
266 | logger.error("Can not check public key of an empty email") |
267 | return False |
268 | if debug_gpg: |
269 | logger.debug("Checking if public key of %s is installed" % email) |
270 | + gpg_cmd = ["gpg", "--homedir", gpg_homedir] |
271 | try: |
272 | - gpg_cmd = ["gpg", "--homedir", gpg_homedir] |
273 | if key_type == "public": |
274 | gpg_cmd.append("-k") |
275 | else: |
276 | @@ -212,63 +239,51 @@ |
277 | gpg_cmd.append(email) |
278 | if debug_gpg: |
279 | logger.debug(gpg_cmd) |
280 | - devnull = open('/dev/null', 'w') |
281 | + devnull = open("/dev/null", "w") |
282 | ret = subprocess.call(gpg_cmd, stdout=devnull, stderr=devnull) |
283 | - file.close(devnull) |
284 | + devnull.close() |
285 | if ret != 0: |
286 | if debug_gpg: |
287 | - logger.debug("GPG returned %d" % ret); |
288 | + logger.debug("GPG returned %d" % ret) |
289 | logger.debug(key_type + " key " + email + " is NOT installed") |
290 | result = False |
291 | else: |
292 | if debug_gpg: |
293 | logger.debug(key_type + " key is already installed") |
294 | result = True |
295 | - except: |
296 | - logger.error("Couldn't get " + key_type + " key of %s" % email) |
297 | - exit_on_error(traceback.format_exc()) |
298 | + except OSError as err: |
299 | + logger.error("Couldn't run command %r. %s" % (gpg_cmd, err)) |
300 | + exit_on_error("Couldn't get %s key of %s." % (key_type, email)) |
301 | return result |
302 | |
303 | |
304 | -# Installs TwinDB public key |
305 | -# Returns |
306 | -# True - if the key is successfully installed |
307 | -# Exits if the key wasn't installed |
308 | - |
309 | def install_api_pub_key(): |
310 | - global logger |
311 | - global api_pub_key |
312 | - global gpg_homedir |
313 | - global debug_gpg |
314 | - |
315 | + """ |
316 | + Installs TwinDB public key |
317 | + Returns |
318 | + True - if the key is successfully installed |
319 | + Exits if the key wasn't installed |
320 | + """ |
321 | logger.info("Installing twindb public key") |
322 | + gpg_cmd = ["gpg", "--homedir", gpg_homedir, "--import"] |
323 | try: |
324 | - p1 = subprocess.Popen(["echo", api_pub_key], stdout=subprocess.PIPE) |
325 | - p2 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--import"], |
326 | - stdin=p1.stdout) |
327 | - p1.stdout.close() |
328 | - except: |
329 | - logger.error("Couldn't install TwinDB public key") |
330 | - exit_on_error(traceback.format_exc()) |
331 | + p = subprocess.Popen(gpg_cmd, stdin=subprocess.PIPE) |
332 | + p.communicate(api_pub_key) |
333 | + except OSError as err: |
334 | + logger.error("Couldn't run command %r. %s" % (gpg_cmd, err)) |
335 | + exit_on_error("Couldn't install TwinDB public key") |
336 | logger.info("Twindb public key successfully installed") |
337 | return True |
338 | |
339 | |
340 | -# Checks if GPG environment is good to start TwinDB agent |
341 | -# Installs TwinDB public key if necessary |
342 | -# Returns |
343 | -# True - if the GPG environment good to proceed |
344 | -# Exits if GPG wasn't configured correctly |
345 | - |
346 | def check_gpg(): |
347 | - global logger |
348 | - global api_email |
349 | - global api_pub_key |
350 | - global gpg_homedir |
351 | - global init_config |
352 | - global server_id |
353 | - global debug_gpg |
354 | - |
355 | + """ |
356 | + Checks if GPG environment is good to start TwinDB agent |
357 | + Installs TwinDB public key if necessary |
358 | + Returns |
359 | + True - if the GPG environment good to proceed |
360 | + Exits if GPG wasn't configured correctly |
361 | + """ |
362 | if debug_gpg: |
363 | logger.debug("Checking if GPG config is initialized") |
364 | if os.path.exists(gpg_homedir): |
365 | @@ -282,13 +297,13 @@ |
366 | try: |
367 | os.mkdir(gpg_homedir, 0700) |
368 | install_api_pub_key() |
369 | - except: |
370 | - logger.error("Couldn't create directory " + gpg_homedir) |
371 | - exit_on_error(traceback.format_exc()) |
372 | - if len(server_id) == 0: |
373 | + except OSError as err: |
374 | + logger.error("Failed to create directory %s. %s" % (gpg_homedir, err)) |
375 | + exit_on_error("Couldn't create directory " + gpg_homedir) |
376 | + if not server_id: |
377 | exit_on_error("Server id is neither read from config nor generated") |
378 | email = "%s@twindb.com" % server_id |
379 | - if not ( is_gpg_key_installed(email) and is_gpg_key_installed(email, "private")): |
380 | + if not (is_gpg_key_installed(email) and is_gpg_key_installed(email, "private")): |
381 | gen_entropy() |
382 | gen_gpg_keypair("%s@twindb.com" % server_id) |
383 | if debug_gpg: |
384 | @@ -296,13 +311,13 @@ |
385 | return True |
386 | |
387 | |
388 | -# Checks the environment if it's OK to start TwinDB agent |
389 | -# Returns |
390 | -# True - if the environment is OK |
391 | -# Exits if the environment is not OK |
392 | - |
393 | def check_env(): |
394 | - global logger |
395 | + """ |
396 | + Checks the environment if it's OK to start TwinDB agent |
397 | + Returns |
398 | + True - if the environment is OK |
399 | + Exits if the environment is not OK |
400 | + """ |
401 | logger.debug("Checking environment") |
402 | if os.getuid() != 0: |
403 | exit_on_error("TwinDB agent must be run by root") |
404 | @@ -311,41 +326,24 @@ |
405 | return True |
406 | |
407 | |
408 | -# Checks how much entropy is available in the system |
409 | -# If not enough, does some disk activity to generate more |
410 | def gen_entropy(): |
411 | + """ |
412 | + Checks how much entropy is available in the system |
413 | + If not enough, does some disk activity to generate more |
414 | + """ |
415 | # Do nothing until I find good way to generate entropy |
416 | return |
417 | - try: |
418 | - f = open("/proc/sys/kernel/random/entropy_avail", "r") |
419 | - entropy_avail = int(f.read()) |
420 | - f.close() |
421 | - i = 0 |
422 | - while entropy_avail < 2048: |
423 | - logger.info("Low entropy level %d. Will run 'find / -name file' to generate more") |
424 | - cmd = ["find", "/", "-name", "file"] |
425 | - p = subprocess.Popen(cmd) |
426 | - p.wait() |
427 | - f = open("/proc/sys/kernel/random/entropy_avail", "r") |
428 | - entropy_avail = int(f.read()) |
429 | - f.close() |
430 | - # Do not run find more than 10 times |
431 | - if i > 10: |
432 | - break |
433 | - i = i + 1 |
434 | - except: |
435 | - logger.error("Failed to generate entropy") |
436 | - return |
437 | - |
438 | - |
439 | -# Formats bytes count to human readable form |
440 | -# Inputs: |
441 | -# num - bytes count |
442 | -# decimals - number of digits to save after point (Default 2) |
443 | -# Returns |
444 | -# human-readable string like "20.33 MB" |
445 | + |
446 | |
447 | def h_size(num, decimals=2): |
448 | + """ |
449 | + Formats bytes count to human readable form |
450 | + Inputs: |
451 | + num - bytes count |
452 | + decimals - number of digits to save after point (Default 2) |
453 | + Returns |
454 | + human-readable string like "20.33 MB" |
455 | + """ |
456 | fmt = "%3." + str(decimals) + "f %s" |
457 | for x in ['bytes', 'kB', 'MB', 'GB', 'TB', 'PB']: |
458 | if num < 1024.0: |
459 | @@ -353,30 +351,19 @@ |
460 | num /= 1024.0 |
461 | |
462 | |
463 | -# Generates random string of bytes good for crypthography |
464 | -# Inputs |
465 | -# n - Number of bytes |
466 | -# Returns |
467 | -# binary string of n bytes size |
468 | - |
469 | -def gen_key(n): |
470 | - return os.urandom(n) |
471 | - |
472 | - |
473 | -# Generates GPG private and public keys for a given recipient |
474 | -# Inputs |
475 | -# email - recipient |
476 | -# Returns |
477 | -# True on success or exits |
478 | - |
479 | def gen_gpg_keypair(email): |
480 | - global server_id |
481 | - global gpg_homedir |
482 | - global debug_gpg |
483 | - |
484 | - if len(email) == 0: |
485 | + """ |
486 | + Generates GPG private and public keys for a given recipient |
487 | + Inputs |
488 | + email - recipient |
489 | + Returns |
490 | + True on success or exits |
491 | + """ |
492 | + if not email: |
493 | exit_on_error("Can not generate GPG keypair for an empty email") |
494 | + gpg_cmd = ["gpg", "--homedir", gpg_homedir, "--batch", "--gen-key"] |
495 | try: |
496 | + logger.info("The agent needs to generate cryptographically strong keys.") |
497 | logger.info("Generating GPG keys pair for %s" % email) |
498 | logger.info("It may take really, really long time. Please be patient.") |
499 | gen_entropy() |
500 | @@ -393,36 +380,25 @@ |
501 | %%commit |
502 | %%echo done |
503 | """ % (server_id, email) |
504 | - p1 = subprocess.Popen(["echo", gpg_script], stdout=subprocess.PIPE) |
505 | - p2 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--batch", |
506 | - "--gen-key"], stdin=p1.stdout) |
507 | - p1.stdout.close() |
508 | - p2.wait() |
509 | - del p1, p2 |
510 | - except: |
511 | - logger.error("Failed to generate GPG keys pair") |
512 | - exit_on_error(traceback.format_exc()) |
513 | + |
514 | + p = subprocess.Popen(gpg_cmd, stdin=subprocess.PIPE) |
515 | + p.communicate(gpg_script) |
516 | + except OSError as err: |
517 | + logger.error("Failed to run command %r. %s" % (gpg_cmd, err)) |
518 | + exit_on_error("Failed to generate GPG keys pair") |
519 | return True |
520 | |
521 | |
522 | -# Encrypts message with TwinDB public key |
523 | -# If server_id is non-zero (which means the server is registered) |
524 | -# signs the message with the server's private key |
525 | -# Inputs |
526 | -# msg - string to encrypt |
527 | -# Returns |
528 | -# 64-base encoded and encrypted message. To read - decrypt and 64-base decode |
529 | -# EDIT: encrypted message |
530 | -# None - if error happens |
531 | - |
532 | def encrypt(msg): |
533 | - global logger |
534 | - global api_email |
535 | - global gpg_homedir |
536 | - global server_id |
537 | - global debug_gpg |
538 | - |
539 | - if len(server_id) == 0: |
540 | + """ |
541 | + Encrypts message with TwinDB public key |
542 | + If server_id is non-zero (which means the server is registered) |
543 | + signs the message with the server's private key |
544 | + :param msg: string to encrypt |
545 | + :return: 64-base encoded and encrypted message or None if error happens. |
546 | + To read the encrypted message - decrypt and 64-base decode |
547 | + """ |
548 | + if not server_id: |
549 | exit_on_error("Trying to encrypt message while server_id is empty") |
550 | server_email = "%s@twindb.com" % server_id |
551 | if not is_gpg_key_installed(api_email): |
552 | @@ -433,12 +409,10 @@ |
553 | logger.debug("Public key of %s is not installed." % server_email) |
554 | logger.debug("Will not encrypt message") |
555 | return None |
556 | - enc_cmd = ["gpg", "--homedir", gpg_homedir, "-r", api_email, "--batch", |
557 | - "--trust-model", "always", "--armor"] |
558 | - enc_cmd.append("--sign") |
559 | - enc_cmd.append("--local-user") |
560 | - enc_cmd.append(server_email) |
561 | - enc_cmd.append("--encrypt") |
562 | + enc_cmd = ["gpg", "--homedir", gpg_homedir, "-r", api_email, "--batch", "--trust-model", "always", "--armor", |
563 | + "--sign", "--local-user", server_email, "--encrypt"] |
564 | + cout = "No output" |
565 | + cerr = "No output" |
566 | try: |
567 | if debug_gpg: |
568 | logger.debug("Encrypting message:") |
569 | @@ -450,11 +424,14 @@ |
570 | stdout=subprocess.PIPE, |
571 | stderr=subprocess.PIPE) |
572 | cout, cerr = p.communicate(msg) |
573 | + if p.returncode != 0: |
574 | + raise OSError(p.returncode) |
575 | ct = cout |
576 | ct_64 = b64encode(ct) |
577 | if debug_gpg: |
578 | logger.debug("Encrypted message: " + ct_64) |
579 | - except: |
580 | + except OSError as err: |
581 | + logger.error("Failed to run command %r. %s" % (enc_cmd, err)) |
582 | logger.error("Failed to encrypt message: " + msg) |
583 | logger.error("STDOUT: " + cout) |
584 | logger.error("STDERR: " + cerr) |
585 | @@ -462,52 +439,34 @@ |
586 | return ct_64 |
587 | |
588 | |
589 | -# Decrypts message with local private key |
590 | -# Inputs |
591 | -# msg - 64-base encoded and encrypted message. |
592 | -# Before encryption the message was 64-base encoded |
593 | -# Returns |
594 | -# String - Plain text message |
595 | -# None - if error happens |
596 | - |
597 | def decrypt(msg_64): |
598 | - global logger |
599 | - global api_email |
600 | - global gpg_homedir |
601 | - global server_id |
602 | - global debug_gpg |
603 | - |
604 | + """ |
605 | + Decrypts message with local private key |
606 | + :param msg_64: 64-base encoded and encrypted message. Before encryption the message was 64-base encoded |
607 | + :return: Plain text message or None if error happens |
608 | + """ |
609 | if not msg_64: |
610 | logger.error("Will not decrypt empty message") |
611 | return None |
612 | - |
613 | - cout = None |
614 | - cerr = None |
615 | - |
616 | + cout = "No output" |
617 | + cerr = "No output" |
618 | + gpg_cmd = ["gpg", "--homedir", gpg_homedir, "-d", "-q"] |
619 | try: |
620 | - dec_cmd = ["gpg", "--homedir", gpg_homedir, "-d", "-q"] |
621 | - debug_gpg = True |
622 | if debug_gpg: |
623 | logger.debug("Decrypting message:") |
624 | logger.debug(msg_64) |
625 | logger.debug("Decryptor command:") |
626 | - logger.debug(dec_cmd) |
627 | - p = subprocess.Popen(dec_cmd, |
628 | - stdin=subprocess.PIPE, |
629 | - stdout=subprocess.PIPE, |
630 | - stderr=subprocess.PIPE) |
631 | + logger.debug(gpg_cmd) |
632 | + p = subprocess.Popen(gpg_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
633 | msg = b64decode(msg_64) |
634 | cout, cerr = p.communicate(msg) |
635 | - |
636 | if p.returncode != 0: |
637 | raise OSError(p.returncode) |
638 | - except: |
639 | + except OSError as err: |
640 | + logger.error("Failed to run command %r. %s" % (gpg_cmd, err)) |
641 | logger.error("Failed to decrypt message: " + msg_64) |
642 | - logger.error("Unexpected error: %s, %s" % sys.exc_info()[:2]); |
643 | - if cout: |
644 | - logger.error("STDOUT: " + cout) |
645 | - if cerr: |
646 | - logger.error("STDERR: " + cerr) |
647 | + logger.error("STDOUT: " + cout) |
648 | + logger.error("STDERR: " + cerr) |
649 | return None |
650 | if debug_gpg: |
651 | logger.debug("Decrypted message:") |
652 | @@ -515,26 +474,20 @@ |
653 | return cout |
654 | |
655 | |
656 | -# Sends HTTP POST request to TwinDB dispatcher |
657 | -# It converts python data structure in "data" variable into JSON string, |
658 | -# then encrypts it and then sends as variable "data" in HTTP request |
659 | -# Inputs |
660 | -# uri - URI to send the request |
661 | -# data - Data structure with variables |
662 | -# Returns |
663 | -# String with body of HTTP response |
664 | -# None - if error happened or empty response |
665 | - |
666 | def get_response(request): |
667 | - global logger |
668 | - global host |
669 | - global proto |
670 | - global api_dir |
671 | - |
672 | + """ |
673 | + Sends HTTP POST request to TwinDB dispatcher |
674 | + It converts python data structure in "data" variable into JSON string, |
675 | + then encrypts it and then sends as variable "data" in HTTP request |
676 | + Inputs |
677 | + uri - URI to send the request |
678 | + data - Data structure with variables |
679 | + Returns |
680 | + String with body of HTTP response |
681 | + None - if error happened or empty response |
682 | + """ |
683 | uri = "api.php" |
684 | response_body = None |
685 | - conn = None |
686 | - |
687 | logger.debug("Enter get_response(uri=" + uri + ")") |
688 | if proto == "http": |
689 | conn = httplib.HTTPConnection(host) |
690 | @@ -542,8 +495,10 @@ |
691 | conn = httplib.HTTPSConnection(host) |
692 | else: |
693 | logger.error("Unsupported protocol " + proto) |
694 | + return None |
695 | + url = proto + "://" + host + "/" + api_dir + "/" + uri |
696 | + http_response = "Empty response" |
697 | try: |
698 | - url = proto + "://" + host + "/" + api_dir + "/" + uri |
699 | conn.connect() |
700 | logger.debug("Sending to " + host + ": %s" % json.dumps(request, indent=4, sort_keys=True)) |
701 | data_json = json.dumps(request) |
702 | @@ -588,46 +543,38 @@ |
703 | logger.error("Please check that DNS server is reachable and works") |
704 | return None |
705 | except exceptions.KeyError as err: |
706 | - logger.error("Failed to decode response from server %s" % http_response) |
707 | + logger.error("Failed to decode response from server: %s" % http_response) |
708 | logger.error("Could not find key %s" % err) |
709 | return None |
710 | - except: |
711 | - logger.error("Couldn't make HTTP request " + url) |
712 | - logger.error("Unexpected error: %s, %s" % sys.exc_info()[:2]); |
713 | - logger.debug(traceback.format_exc()) |
714 | - return None |
715 | finally: |
716 | conn.close() |
717 | return response_body |
718 | |
719 | |
720 | -# Replaces password from config with ******** |
721 | -# Inputs |
722 | -# config - python dictionary with backup config |
723 | -# Returns |
724 | -# Sanitized config |
725 | - |
726 | -def sanitize_config(c): |
727 | - if not c: |
728 | +def sanitize_config(config): |
729 | + """ |
730 | + Replaces password from config with ******** |
731 | + :param config: python dictionary with backup config |
732 | + :return: Sanitized config |
733 | + """ |
734 | + if not config: |
735 | return None |
736 | - cc = dict(c) |
737 | - try: |
738 | - cc["mysql_password"] = "********" |
739 | - except: |
740 | - logger.debug("Given config %r doesn't contain password" % c) |
741 | - return cc |
742 | - |
743 | - |
744 | -# Gets backup config from TwinDB dispatcher |
745 | -# Returns |
746 | -# Backup config |
747 | -# None - if error happened |
748 | + sanitized_config = dict(config) |
749 | + if "mysql_password" in config: |
750 | + sanitized_config["mysql_password"] = "********" |
751 | + else: |
752 | + logger.debug("Given config %r doesn't contain password" % config) |
753 | + return sanitized_config |
754 | + |
755 | |
756 | def get_config(): |
757 | - global logger |
758 | - global server_id |
759 | - |
760 | + """ |
761 | + Gets backup config from TwinDB dispatcher |
762 | + :return: Backup config or None if error happened |
763 | + """ |
764 | logger.debug("Getting config for server_id = %s" % server_id) |
765 | + response_body = "Empty response" |
766 | + config = None |
767 | try: |
768 | data = { |
769 | "type": "get_config", |
770 | @@ -648,114 +595,87 @@ |
771 | if msg_pt["error"]: |
772 | logger.error(msg_pt["error"]) |
773 | except exceptions.KeyError as err: |
774 | - logger.error("Failed to decode %s" % response_body); |
775 | - logger.error(err); |
776 | - return None |
777 | - except: |
778 | - logger.error("Couldn't get backup config") |
779 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); |
780 | - logger.debug(traceback.format_exc()) |
781 | + logger.error("Failed to decode %s" % response_body) |
782 | + logger.error(err) |
783 | return None |
784 | return config |
785 | |
786 | |
787 | -# Reports slave status to TwinDB dispatcher |
788 | -def report_sss(): |
789 | - global logger |
790 | - global server_id |
791 | - |
792 | +def report_sss(config): |
793 | + """ |
794 | + Reports slave status to TwinDB dispatcher |
795 | + :param config: server config received from the dispatcher |
796 | + :return: nothing |
797 | + """ |
798 | logger.debug("Reporting SHOW SLAVE STATUS for server_id = %s" % server_id) |
799 | - try: |
800 | - ss = get_slave_status() |
801 | - data = { |
802 | - "type": "report_sss", |
803 | - "params": { |
804 | - "server_id": server_id, |
805 | - "mysql_server_id": ss["mysql_server_id"], |
806 | - "mysql_master_server_id": ss["mysql_master_server_id"], |
807 | - "mysql_master_host": ss["mysql_master_host"], |
808 | - "mysql_seconds_behind_master": ss["mysql_seconds_behind_master"], |
809 | - "mysql_slave_io_running": ss["mysql_slave_io_running"], |
810 | - "mysql_slave_sql_running": ss["mysql_slave_sql_running"], |
811 | - } |
812 | + ss = get_slave_status(config["mysql_user"], config["mysql_password"]) |
813 | + data = { |
814 | + "type": "report_sss", |
815 | + "params": { |
816 | + "server_id": server_id, |
817 | + "mysql_server_id": ss["mysql_server_id"], |
818 | + "mysql_master_server_id": ss["mysql_master_server_id"], |
819 | + "mysql_master_host": ss["mysql_master_host"], |
820 | + "mysql_seconds_behind_master": ss["mysql_seconds_behind_master"], |
821 | + "mysql_slave_io_running": ss["mysql_slave_io_running"], |
822 | + "mysql_slave_sql_running": ss["mysql_slave_sql_running"], |
823 | } |
824 | - response_body = get_response(data) |
825 | - except: |
826 | - logger.error("Couldn't get report SHOW SLAVE STATUS") |
827 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); |
828 | - logger.debug(traceback.format_exc()) |
829 | - return None |
830 | + } |
831 | + get_response(data) |
832 | return |
833 | |
834 | |
835 | -# Reports what privileges are given to the agent |
836 | def report_agent_privileges(config): |
837 | - global logger |
838 | - global server_id |
839 | - |
840 | + """ |
841 | + Reports what privileges are given to the agent |
842 | + :param config: server config received from the dispatcher |
843 | + :return: nothing |
844 | + """ |
845 | logger.debug("Reporting agent privileges for server_id = %s" % server_id) |
846 | - try: |
847 | - mysql_user = config["mysql_user"] |
848 | - mysql_password = config["mysql_password"] |
849 | - except: |
850 | - logger.error("Failed to read config") |
851 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]) |
852 | - return |
853 | - try: |
854 | - con = get_mysql_connection(user=mysql_user, passwd=mysql_password) |
855 | - cursor = con.cursor() |
856 | - query = "SELECT PRIVILEGE_TYPE FROM information_schema.USER_PRIVILEGES" |
857 | - logger.debug("Sending query : %s" % query) |
858 | - cursor.execute(query) |
859 | - |
860 | - privileges = dict() |
861 | - privileges["Reload_priv"] = "N" |
862 | - privileges["Lock_tables_priv"] = "N" |
863 | - privileges["Repl_client_priv"] = "N" |
864 | - privileges["Super_priv"] = "N" |
865 | - privileges["Create_tablespace_priv"] = "N" |
866 | - |
867 | - for (priv,) in cursor: |
868 | - if priv == "RELOAD": |
869 | - privileges["Reload_priv"] = "Y" |
870 | - elif priv == "LOCK TABLES": |
871 | - privileges["Lock_tables_priv"] = "Y" |
872 | - elif priv == "REPLICATION CLIENT": |
873 | - privileges["Repl_client_priv"] = "Y" |
874 | - elif priv == "SUPER": |
875 | - privileges["Super_priv"] = "Y" |
876 | - elif priv == "CREATE TABLESPACE": |
877 | - privileges["Create_tablespace_priv"] = "Y" |
878 | - data = { |
879 | - "type": "report_agent_privileges", |
880 | - "params": { |
881 | - "Reload_priv": privileges["Reload_priv"], |
882 | - "Lock_tables_priv": privileges["Lock_tables_priv"], |
883 | - "Repl_client_priv": privileges["Repl_client_priv"], |
884 | - "Super_priv": privileges["Super_priv"], |
885 | - "Create_tablespace_priv": privileges["Create_tablespace_priv"] |
886 | - } |
887 | + con = get_mysql_connection(user=config["mysql_user"], passwd=config["mysql_password"]) |
888 | + cursor = con.cursor() |
889 | + query = "SELECT PRIVILEGE_TYPE FROM information_schema.USER_PRIVILEGES" |
890 | + logger.debug("Sending query : %s" % query) |
891 | + cursor.execute(query) |
892 | + |
893 | + privileges = { |
894 | + "Reload_priv": "N", |
895 | + "Lock_tables_priv": "N", |
896 | + "Repl_client_priv": "N", |
897 | + "Super_priv": "N", |
898 | + "Create_tablespace_priv": "N" |
899 | + } |
900 | + for (priv,) in cursor: |
901 | + if priv == "RELOAD": |
902 | + privileges["Reload_priv"] = "Y" |
903 | + elif priv == "LOCK TABLES": |
904 | + privileges["Lock_tables_priv"] = "Y" |
905 | + elif priv == "REPLICATION CLIENT": |
906 | + privileges["Repl_client_priv"] = "Y" |
907 | + elif priv == "SUPER": |
908 | + privileges["Super_priv"] = "Y" |
909 | + elif priv == "CREATE TABLESPACE": |
910 | + privileges["Create_tablespace_priv"] = "Y" |
911 | + data = { |
912 | + "type": "report_agent_privileges", |
913 | + "params": { |
914 | + "Reload_priv": privileges["Reload_priv"], |
915 | + "Lock_tables_priv": privileges["Lock_tables_priv"], |
916 | + "Repl_client_priv": privileges["Repl_client_priv"], |
917 | + "Super_priv": privileges["Super_priv"], |
918 | + "Create_tablespace_priv": privileges["Create_tablespace_priv"] |
919 | } |
920 | - get_response(data) |
921 | - except: |
922 | - logger.error("Couldn't get privileges granted to %s@localhost on server %s" % (mysql_user, server_id)) |
923 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); |
924 | - logger.debug(traceback.format_exc()) |
925 | - return None |
926 | + } |
927 | + get_response(data) |
928 | return |
929 | |
930 | |
931 | -# Gets job order from TwinDB dispatcher |
932 | -# Returns |
933 | -# Job order in python dictionary |
934 | -# None - if error happened |
935 | - |
936 | def get_job(): |
937 | - global logger |
938 | - global server_id |
939 | - global job_id |
940 | + """ |
941 | + Gets job order from TwinDB dispatcher |
942 | + :return: Job order in python dictionary or None if error happened |
943 | + """ |
944 | job = None |
945 | - |
946 | logger.debug("Getting job for server_id = %s" % server_id) |
947 | try: |
948 | d = json.JSONDecoder() |
949 | @@ -764,36 +684,41 @@ |
950 | "params": {} |
951 | } |
952 | response_body = get_response(data) |
953 | + if not response_body: |
954 | + raise JobError("Empty response from dispatcher") |
955 | response_body_decoded = d.decode(response_body) |
956 | + if "response" not in response_body_decoded: |
957 | + raise JobError("There is no 'response' key in the response from dispatcher") |
958 | + if "success" not in response_body_decoded: |
959 | + raise JobError("There is no 'success' key in the response from dispatcher") |
960 | msg_enc = response_body_decoded["response"] |
961 | - if response_body_decoded["success"] == True: |
962 | + if response_body_decoded["success"]: |
963 | job_json = decrypt(msg_enc) |
964 | + logger.debug("job_json = %s" % job_json) |
965 | + if "data" not in job_json: |
966 | + raise JobError("There is no 'data' in decrypted response %s" % job_json) |
967 | job = d.decode(job_json)["data"] |
968 | - job["params"] = d.decode(job["params"]) |
969 | - job_id = job["job_id"] |
970 | - logger.info("Got job:\n%s" % json.dumps(job, indent=4, sort_keys=True)) |
971 | + if job and "params" in job and job["params"]: |
972 | + job["params"] = d.decode(job["params"]) |
973 | + logger.info("Got job:\n%s" % json.dumps(job, indent=4, sort_keys=True)) |
974 | else: |
975 | logger.error("Couldn't get job") |
976 | job_json = decrypt(msg_enc) |
977 | logger.error("Server said: %s" % d.decode(job_json)["error"]) |
978 | - logger.debug("Server said: %s" % d.decode(job_json)["debug"]) |
979 | except exceptions.TypeError as err: |
980 | + logger.error("Failed to get a job: %s" % err) |
981 | return None |
982 | - except: |
983 | - logger.error("Couldn't get job") |
984 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); |
985 | + except JobError as err: |
986 | + logger.error(err) |
987 | return None |
988 | return job |
989 | |
990 | |
991 | -# Checks whether the server is registered or not |
992 | -# Returns |
993 | -# True - registered |
994 | -# False - not so much |
995 | def is_registered(): |
996 | - global logger |
997 | - global server_id |
998 | - |
999 | + """ |
1000 | + Checks whether the server is registered or not |
1001 | + :return: True if registered, False if not so much |
1002 | + """ |
1003 | logger.debug("Getting registration status for server_id = %s" % server_id) |
1004 | |
1005 | twindb_email = "%s@twindb.com" % server_id |
1006 | @@ -801,13 +726,13 @@ |
1007 | |
1008 | enc_public_key = None |
1009 | # Reading the GPG key |
1010 | + gpg_cmd = ["gpg", "--homedir", gpg_homedir, "--armor", "--export", twindb_email] |
1011 | try: |
1012 | - p1 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--armor", "--export", twindb_email], |
1013 | - stdout = subprocess.PIPE) |
1014 | - enc_public_key = p1.stdout.read() |
1015 | - except: |
1016 | - logger.error("Failed to export GPG keys of %s from %s." % (twindb_email, gpg_homedir)) |
1017 | - exit_on_error(traceback.format_exc()) |
1018 | + p = subprocess.Popen(gpg_cmd, stdout=subprocess.PIPE) |
1019 | + enc_public_key = p.communicate()[0] |
1020 | + except OSError as err: |
1021 | + logger.error("Failed to run command %r. %s" % (gpg_cmd, err)) |
1022 | + exit_on_error("Failed to export GPG keys of %s from %s." % (twindb_email, gpg_homedir)) |
1023 | |
1024 | # Call the TwinDB api to check for server registration |
1025 | response_body = None |
1026 | @@ -819,21 +744,17 @@ |
1027 | "enc_public_key": enc_public_key |
1028 | } |
1029 | } |
1030 | - |
1031 | response_body = get_response(data) |
1032 | if not response_body: |
1033 | - return None |
1034 | - |
1035 | + return False |
1036 | json_decoder = json.JSONDecoder() |
1037 | response_body_decoded = json_decoder.decode(response_body) |
1038 | - |
1039 | if response_body_decoded: |
1040 | if "response" in response_body_decoded: |
1041 | msg_decrypted = decrypt(response_body_decoded["response"]) |
1042 | if msg_decrypted is None: |
1043 | logger.debug("No valid response from dispatcher. Consider agent unregistered") |
1044 | return False |
1045 | - |
1046 | msg_pt = json_decoder.decode(msg_decrypted) |
1047 | registration_status = msg_pt["data"] |
1048 | logger.debug("Got registration status:\n%s" % json.dumps(registration_status, indent=4, sort_keys=True)) |
1049 | @@ -844,216 +765,140 @@ |
1050 | else: |
1051 | logger.debug("No valid response from dispatcher. Consider agent unregistered") |
1052 | return False |
1053 | - except exceptions.KeyError: |
1054 | - logger.error("Failed to decode %s" % response_body) |
1055 | - exit_on_error(traceback.format_exc()) |
1056 | - except: |
1057 | - logger.error("Couldn't get backup config") |
1058 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); |
1059 | - exit_on_error(traceback.format_exc()) |
1060 | + except exceptions.KeyError as err: |
1061 | + exit_on_error("Failed to decode response from dispatcher: %s. %s" % (response_body, err)) |
1062 | return False |
1063 | |
1064 | |
1065 | -# Gets name of initial full backup that corresponds to given incremental backup |
1066 | -# Inputs |
1067 | -# f - Valid name of incremental backup |
1068 | -# Returns |
1069 | -# String with name of the initial full backup. |
1070 | -# If full backup was given it will return the same name |
1071 | -# None - if error happened |
1072 | - |
1073 | -def get_full_of(f): |
1074 | - global logger |
1075 | - global server_id |
1076 | - |
1077 | - try: |
1078 | - logger.info("Getting full backup name of %s" % f) |
1079 | - d = json.JSONDecoder() |
1080 | - response = d.decode(get_response("get_full_of.php", {"backup_name": f})) |
1081 | - msg_enc = response["data"] |
1082 | - logger.debug("Reply from server %s" % msg_enc) |
1083 | - full = d.decode(decrypt(msg_enc)) |
1084 | - logger.info("Got full backup name %s" % full["name"]) |
1085 | - except: |
1086 | - logger.error("Couldn't get name of initial full backup of %s" % f) |
1087 | - logger.error(traceback.format_exc()) |
1088 | - return None |
1089 | - return full["name"] |
1090 | - |
1091 | - |
1092 | -# Gets name of child backup copy of the given backup name |
1093 | -# Inputs |
1094 | -# f - Valid name of incremental or full backup |
1095 | -# Returns |
1096 | -# String with name of child backup copy of the given backup copy |
1097 | -# None - if error happened |
1098 | - |
1099 | -def get_child_of(f): |
1100 | - global logger |
1101 | - global server_id |
1102 | - |
1103 | - try: |
1104 | - logger.info("Getting child backup name of %s" % f) |
1105 | - d = json.JSONDecoder() |
1106 | - response = d.decode(get_response("get_child_of.php", |
1107 | - {"backup_name": f})) |
1108 | - msg_enc = response["data"] |
1109 | - logger.debug("Reply from server %s" % msg_enc) |
1110 | - child = d.decode(decrypt(msg_enc)) |
1111 | - logger.info("Got child backup name %s" % child["name"]) |
1112 | - except: |
1113 | - logger.error("Couldn't get name of child backup of %s" % f) |
1114 | - logger.error(traceback.format_exc()) |
1115 | - return None |
1116 | - return child["name"] |
1117 | - |
1118 | - |
1119 | -# Notifies a job event to TwinDB dispatcher |
1120 | -# Inputs |
1121 | -# job_id - id of a job in jobs table |
1122 | -# params - { event: "start_job", job_id: job_id } or |
1123 | -# { event: "stop_job", job_id: job_id, ret_code: ret } |
1124 | -# Returns |
1125 | -# True |
1126 | -# False - if error happened |
1127 | - |
1128 | -def log_job_notify(job_id, params): |
1129 | - global logging |
1130 | - result = False |
1131 | - |
1132 | +def log_job_notify(params): |
1133 | + """ |
1134 | + Notifies a job event to TwinDB dispatcher |
1135 | + :param params: { event: "start_job", job_id: job_id } or |
1136 | + { event: "stop_job", job_id: job_id, ret_code: ret } |
1137 | + :return: True, or False if an error happened |
1138 | + """ |
1139 | logger.info("Sending event notification %s" % params["event"]) |
1140 | - try: |
1141 | - data = { |
1142 | - "type": "notify", |
1143 | - "params": params |
1144 | - } |
1145 | - response_body = get_response(data) |
1146 | - d = json.JSONDecoder() |
1147 | - response_body_decoded = d.decode(response_body) |
1148 | - msg_enc = response_body_decoded["response"] |
1149 | - if response_body_decoded["success"] == True: |
1150 | - logger.debug("Dispatcher acknowledged job_id = %d start" % job_id) |
1151 | - result = True |
1152 | - else: |
1153 | - logger.error("Dispatcher didn't acknowledge job_id = %d start" % job_id) |
1154 | - result = False |
1155 | - except: |
1156 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); |
1157 | + data = { |
1158 | + "type": "notify", |
1159 | + "params": params |
1160 | + } |
1161 | + response_body = get_response(data) |
1162 | + d = json.JSONDecoder() |
1163 | + response_body_decoded = d.decode(response_body) |
1164 | + if response_body_decoded["success"]: |
1165 | + logger.debug("Dispatcher acknowledged job_id = %d notification" % job_id) |
1166 | + result = True |
1167 | + else: |
1168 | + logger.error("Dispatcher didn't acknowledge job_id = %d notification" % job_id) |
1169 | + result = False |
1170 | return result |
1171 | |
1172 | |
1173 | -# Saves details about backup copy in TwinDB dispatcher |
1174 | -# Inputs |
1175 | -# job_id - id of a job in jobs table |
1176 | -# name - name of backup |
1177 | -# size - size of the backup in bytes |
1178 | -# lsn - last LSN if it was incremental backup |
1179 | -# ancestor - Ansector of the backup (not used now) |
1180 | -# Returns |
1181 | -# JSON string with status of the request i.e. { "success": True } |
1182 | -# None - if error happened |
1183 | - |
1184 | -def record_backup(job_id, name, volume_id, size, lsn=None, ancestor=0): |
1185 | - global logger |
1186 | - |
1187 | - try: |
1188 | - logger.info("Saving information about backup:") |
1189 | - logger.info("File name : %s" % name) |
1190 | - logger.info("Volume id : %d" % int(volume_id)) |
1191 | - logger.info("Size : %d (%s)" % (int(size), h_size(size) )) |
1192 | - logger.info("Ancestor : %d" % int(ancestor)) |
1193 | - data = { |
1194 | - "type": "update_backup_data", |
1195 | - "params": { |
1196 | - "job_id": job_id, |
1197 | - "name": name, |
1198 | - "volume_id": volume_id, |
1199 | - "size": size, |
1200 | - "lsn": lsn, |
1201 | - "ancestor": ancestor |
1202 | - } |
1203 | +def record_backup(name, volume_id, size, lsn=None, ancestor=0): |
1204 | + """ |
1205 | + Saves details about backup copy in TwinDB dispatcher |
1206 | + :param name: name of backup |
1207 | + :param volume_id: id of a volume where the backup copy is saved |
1208 | + :param size: size of the backup in bytes |
1209 | + :param lsn: last LSN if it was incremental backup |
1210 | + :param ancestor: Ancestor of the backup (not used now) |
1211 | + :return: JSON string with status of the request i.e. { "success": True } or None if error happened |
1212 | + """ |
1213 | + logger.info("Saving information about backup:") |
1214 | + logger.info("File name : %s" % name) |
1215 | + logger.info("Volume id : %d" % int(volume_id)) |
1216 | + logger.info("Size : %d (%s)" % (int(size), h_size(size))) |
1217 | + logger.info("Ancestor : %d" % int(ancestor)) |
1218 | + data = { |
1219 | + "type": "update_backup_data", |
1220 | + "params": { |
1221 | + "job_id": job_id, |
1222 | + "name": name, |
1223 | + "volume_id": volume_id, |
1224 | + "size": size, |
1225 | + "lsn": lsn, |
1226 | + "ancestor": ancestor |
1227 | } |
1228 | - logger.debug("Saving a record %s" % data) |
1229 | - response = get_response(data) |
1230 | - if response <> None: |
1231 | - jd = json.JSONDecoder() |
1232 | - r = jd.decode(response) |
1233 | - logger.debug(r) |
1234 | - if r["success"]: |
1235 | - logger.info("Saved backup copy details") |
1236 | - return True |
1237 | - else: |
1238 | - logger.error("Failed to save backup copy details: " |
1239 | - + jd.decode(decrypt(r["response"]))["error"]) |
1240 | - return False |
1241 | - del jd |
1242 | + } |
1243 | + logger.debug("Saving a record %s" % data) |
1244 | + response = get_response(data) |
1245 | + if response: |
1246 | + jd = json.JSONDecoder() |
1247 | + r = jd.decode(response) |
1248 | + logger.debug(r) |
1249 | + if r["success"]: |
1250 | + logger.info("Saved backup copy details") |
1251 | + return True |
1252 | else: |
1253 | - logger.error("Empty response from server") |
1254 | + logger.error("Failed to save backup copy details: " |
1255 | + + jd.decode(decrypt(r["response"]))["error"]) |
1256 | return False |
1257 | - except: |
1258 | - logger.error("Failed to save backup copy details") |
1259 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); |
1260 | + else: |
1261 | + logger.error("Empty response from server") |
1262 | return False |
1263 | - return True |
1264 | - |
1265 | - |
1266 | -# Reads config from /etc/twindb.cfg and sets server_id variable |
1267 | -# Returns |
1268 | -# True - if config was successfully read |
1269 | -# Exits if error happened |
1270 | + |
1271 | |
1272 | def read_config(): |
1273 | - global init_config |
1274 | + """ |
1275 | + Reads config from /etc/twindb.cfg and sets server_id variable |
1276 | + :return: True if config was successfully read |
1277 | + Exits if error happened |
1278 | + """ |
1279 | global server_id |
1280 | - |
1281 | try: |
1282 | if os.path.exists(init_config): |
1283 | - execfile(init_config) |
1284 | + ns = {} |
1285 | + execfile(init_config, ns) |
1286 | + if "server_id" in ns: |
1287 | + server_id = ns["server_id"] |
1288 | else: |
1289 | - exit_on_error("Config %s doesn't exist" % init_config) |
1290 | - if len(server_id) == 0: |
1291 | + print("Config %s doesn't exist" % init_config, file=sys.stderr) |
1292 | + return False |
1293 | + if not server_id: |
1294 | exit_on_error("Config %s doesn't set server_id" % init_config) |
1295 | - except: |
1296 | - logger.error(traceback.format_exc()) |
1297 | - exit_on_error("Failed to read from config in %s" % init_config) |
1298 | + except IOError as err: |
1299 | + exit_on_error("Failed to read config file %s. %s" % (init_config, err)) |
1300 | return True |
1301 | |
1302 | |
1303 | -# Saves server_id variable in /etc/twindb.cfg |
1304 | -# Returns |
1305 | -# True - if config was successfully saved |
1306 | -# Exits if error happened |
1307 | - |
1308 | -def save_config(server_id): |
1309 | - global init_config |
1310 | - |
1311 | +def save_config(): |
1312 | + """ |
1313 | + Saves server_id variable in file init_config (/etc/twindb.cfg) |
1314 | + :return: True - if config was successfully saved |
1315 | + Exits if error happened |
1316 | + """ |
1317 | + if not server_id: |
1318 | + exit_on_error("Can not save agent config file because server_id is empty") |
1319 | try: |
1320 | - f = open(init_config, 'w') |
1321 | + f = open(init_config, "w") |
1322 | f.write("server_id='%s'\n" % server_id) |
1323 | f.close() |
1324 | - except: |
1325 | - print("Failed to save new config in %s" % init_config) |
1326 | - sys.exit(2) |
1327 | - return |
1328 | - |
1329 | - |
1330 | -def get_slave_status(): |
1331 | - result = dict() |
1332 | - # Read master host |
1333 | + except IOError as err: |
1334 | + exit_on_error("Failed to save new config in %s. %s" % (init_config, err)) |
1335 | + return True |
1336 | + |
1337 | + |
1338 | +def get_slave_status(user=None, passwd=None): |
1339 | + """ |
1340 | + Reads SHOW SLAVE STATUS from the local server |
1341 | + :param user: user to connect to MySQL |
1342 | + :param passwd: password to connect to MySQL |
1343 | + :return: dictionary with SHOW SLAVE STATUS result |
1344 | + """ |
1345 | + conn = get_mysql_connection(user, passwd) |
1346 | + if conn: |
1347 | + cursor = conn.cursor(dictionary=True) |
1348 | + else: |
1349 | + return None |
1350 | + result = { |
1351 | + "mysql_server_id": None, |
1352 | + "mysql_master_server_id": None, |
1353 | + "mysql_master_host": None, |
1354 | + "mysql_seconds_behind_master": None, |
1355 | + "mysql_slave_io_running": None, |
1356 | + "mysql_slave_sql_running": None |
1357 | + } |
1358 | try: |
1359 | - conn = get_mysql_connection() |
1360 | - if conn: |
1361 | - cursor = conn.cursor(dictionary=True) |
1362 | - else: |
1363 | - return None |
1364 | - |
1365 | - result["mysql_server_id"] = None |
1366 | - result["mysql_master_server_id"] = None |
1367 | - result["mysql_master_host"] = None |
1368 | - result["mysql_seconds_behind_master"] = None |
1369 | - result["mysql_slave_io_running"] = None |
1370 | - result["mysql_slave_sql_running"] = None |
1371 | - |
1372 | cursor.execute("SHOW SLAVE STATUS") |
1373 | for row in cursor: |
1374 | result["mysql_master_server_id"] = row["Master_Server_Id"] |
1375 | @@ -1061,35 +906,44 @@ |
1376 | result["mysql_seconds_behind_master"] = row["Seconds_Behind_Master"] |
1377 | result["mysql_slave_io_running"] = row["Slave_IO_Running"] |
1378 | result["mysql_slave_sql_running"] = row["Slave_SQL_Running"] |
1379 | - |
1380 | + except mysql.connector.Error as err: |
1381 | + logger.error("Could not get SHOW SLAVE STATUS") |
1382 | + logger.error("MySQL Error: %s" % err) |
1383 | + return None |
1384 | + try: |
1385 | cursor.execute("SELECT @@server_id AS server_id") |
1386 | for row in cursor: |
1387 | result["mysql_server_id"] = row["server_id"] |
1388 | - |
1389 | cursor.close() |
1390 | conn.close() |
1391 | - except: |
1392 | - logger.error("Could not read Master_host from MySQL") |
1393 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); |
1394 | + except mysql.connector.Error as err: |
1395 | + logger.error("Could not read server_id") |
1396 | + logger.error("MySQL Error: %s" % err) |
1397 | return None |
1398 | return result |
1399 | |
1400 | |
1401 | def has_mysql_access(username=None, password=None, grant_capability=True): |
1402 | + """ |
1403 | + Reports if a user has all required MySQL privileges |
1404 | + :param username: MySQL user |
1405 | + :param password: MySQL password |
1406 | + :param grant_capability: TODO: document this parameter (presumably whether GRANT OPTION is required — confirm) |
1407 | + :return: a pair of a boolean that tells whether MySQL user has all required privileges |
1408 | + and a list of missing privileges |
1409 | + """ |
1410 | has_required_grants = False |
1411 | |
1412 | # list of missing privileges |
1413 | missing_privileges = [] |
1414 | |
1415 | try: |
1416 | - if username is None or password is None: |
1417 | - conn = get_mysql_connection() |
1418 | - else: |
1419 | - conn = get_mysql_connection(username, password) |
1420 | + conn = get_mysql_connection(username, password) |
1421 | |
1422 | if isinstance(conn, mysql.connector.MySQLConnection): |
1423 | cursor = conn.cursor(dictionary=True) |
1424 | else: |
1425 | + missing_privileges = ['RELOAD', 'SUPER', 'LOCK TABLES', 'REPLICATION CLIENT', 'CREATE TABLESPACE'] |
1426 | return has_required_grants, missing_privileges |
1427 | |
1428 | # Fetch the current user and matching host part as it could either be |
1429 | @@ -1149,95 +1003,84 @@ |
1430 | |
1431 | cursor.close() |
1432 | conn.close() |
1433 | - except: |
1434 | + except mysql.connector.Error as err: |
1435 | logger.error("Could not read the grants information from MySQL") |
1436 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]) |
1437 | - |
1438 | + logger.error("MySQL Error: %s" % err) |
1439 | return has_required_grants, missing_privileges |
1440 | |
1441 | |
1442 | -# Registers this server in TwinDB dispatcher |
1443 | -# Inputs |
1444 | -# code - string with secret registration code |
1445 | -# Returns |
1446 | -# True - if server was successfully registered |
1447 | -# Exits if error happened |
1448 | def action_handler_register(code): |
1449 | - global logger |
1450 | - global init_config |
1451 | - global ssh_private_key |
1452 | - global ssh_public_key |
1453 | - global gpg_homedir |
1454 | - global server_id |
1455 | - global mysql_user |
1456 | - global mysql_password |
1457 | - |
1458 | + """ |
1459 | + Registers this server in TwinDB dispatcher |
1460 | + Inputs |
1461 | + code - string with secret registration code |
1462 | + Returns |
1463 | + True - if server was successfully registered |
1464 | + Exits if error happened |
1465 | + """ |
1466 | # Check early to see that the MySQL user passed to the agent has enough |
1467 | # privileges to create a separate MySQL user needed by TwinDB |
1468 | mysql_access_available, missing_mysql_privileges = has_mysql_access() |
1469 | if not mysql_access_available: |
1470 | logger.error("The MySQL user %s does not have enough privileges" % mysql_user) |
1471 | - if len(missing_mysql_privileges) > 0: |
1472 | + if missing_mysql_privileges: |
1473 | logger.error("Following privileges are missing: %s" % ','.join(missing_mysql_privileges)) |
1474 | - |
1475 | return False |
1476 | |
1477 | logger.info("Registering TwinDB agent with code %s" % code) |
1478 | - logger.info("The agent needs to generate cryptographically strong keys.") |
1479 | - logger.info("It may take really, really long time. Please be patient.") |
1480 | name = os.uname()[1].strip() # un[1] is a hostname |
1481 | |
1482 | twindb_email = "%s@twindb.com" % server_id |
1483 | |
1484 | - # Generate GPG key |
1485 | + # Read GPG public key |
1486 | enc_public_key = None |
1487 | + cmd = ["gpg", "--homedir", gpg_homedir, "--armor", "--export", twindb_email] |
1488 | try: |
1489 | logger.debug("Reading GPG public key of %s." % twindb_email) |
1490 | - p1 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, |
1491 | - "--armor", "--export", twindb_email], |
1492 | - stdout=subprocess.PIPE) |
1493 | - enc_public_key = p1.stdout.read() |
1494 | - except: |
1495 | - logger.error("Failed to export GPG keys of %s from %s." |
1496 | - % (twindb_email, gpg_homedir)) |
1497 | - exit_on_error(traceback.format_exc()) |
1498 | + p1 = subprocess.Popen(cmd, stdout=subprocess.PIPE) |
1499 | + enc_public_key = p1.communicate()[0] |
1500 | + except OSError as err: |
1501 | + logger.error("Failed to run command %r. %s" % (cmd, err)) |
1502 | + exit_on_error("Failed to export GPG keys of %s from %s." % (twindb_email, gpg_homedir)) |
1503 | |
1504 | - # Generate SSH key |
1505 | - try: |
1506 | - if not os.path.isfile(ssh_private_key): |
1507 | + if not os.path.isfile(ssh_private_key_file): |
1508 | + try: |
1509 | logger.info("Generating SSH keys pair.") |
1510 | - subprocess.call(["ssh-keygen", "-N", "", "-f", ssh_private_key]) |
1511 | - except: |
1512 | - logger.error("Failed to generate SSH keys.") |
1513 | - exit_on_error(traceback.format_exc()) |
1514 | - |
1515 | + subprocess.call(["ssh-keygen", "-N", "", "-f", ssh_private_key_file]) |
1516 | + except OSError as err: |
1517 | + logger.error("Failed to run command %r. %s" % (cmd, err)) |
1518 | + exit_on_error("Failed to generate SSH keys.") |
1519 | + ssh_public_key = None |
1520 | try: |
1521 | - logger.info("Reading SSH public key from %s." % ssh_public_key) |
1522 | - f = open(ssh_public_key, 'r') |
1523 | + logger.info("Reading SSH public key from %s." % ssh_public_key_file) |
1524 | + f = open(ssh_public_key_file, 'r') |
1525 | ssh_public_key = f.read() |
1526 | f.close() |
1527 | - except: |
1528 | - logger.error("Failed to read SSH keys.") |
1529 | - exit_on_error(traceback.format_exc()) |
1530 | + except IOError as err: |
1531 | + logger.error("Failed to read from file %s. %s" % (ssh_public_key_file, err)) |
1532 | + exit_on_error("Failed to read SSH keys.") |
1533 | |
1534 | # Read local ip addresses |
1535 | cmd = "ip addr" |
1536 | cmd += "| grep -w inet" |
1537 | cmd += "| awk '{ print $2}'" |
1538 | cmd += "| awk -F/ '{ print $1}'" |
1539 | - ss = get_slave_status() |
1540 | - |
1541 | - if ss is None: |
1542 | - logger.error("Could not get slave status on this server") |
1543 | - exit_on_error() |
1544 | - p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) |
1545 | - |
1546 | + cout = None |
1547 | + logger.debug("Getting list of local IP addresses") |
1548 | + try: |
1549 | + logger.debug("Running: %s" % cmd) |
1550 | + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) |
1551 | + cout, cerr = p.communicate() |
1552 | + logger.debug("STDOUT: %s" % cout) |
1553 | + logger.debug("STDERR: %s" % cerr) |
1554 | + except OSError as err: |
1555 | + logger.error("Failed to run command %r. %s" % (cmd, err)) |
1556 | local_ip = list() |
1557 | - for row in p.stdout: |
1558 | - row = row.rstrip('\n') |
1559 | - if row != "127.0.0.1": |
1560 | + for row in cout.split("\n"): |
1561 | + row = row.rstrip("\n") |
1562 | + if row and row != "127.0.0.1": |
1563 | local_ip.append(row) |
1564 | - |
1565 | + ss = get_slave_status() |
1566 | data = { |
1567 | "type": "register", |
1568 | "params": { |
1569 | @@ -1255,9 +1098,8 @@ |
1570 | "local_ip": local_ip |
1571 | } |
1572 | } |
1573 | - |
1574 | api_response = get_response(data) |
1575 | - if api_response is not None: |
1576 | + if api_response: |
1577 | json_decoder = json.JSONDecoder() |
1578 | response_decoded = json_decoder.decode(api_response) |
1579 | logger.debug(response_decoded) |
1580 | @@ -1274,18 +1116,16 @@ |
1581 | elif "errors" in response_decoded: |
1582 | error_msg = response_decoded["errors"]["msg"] |
1583 | |
1584 | - logger.error("Failed to register the agent: %s" % error_msg) |
1585 | - sys.exit(2) |
1586 | - del json_decoder |
1587 | + exit_on_error("Failed to register the agent: %s" % error_msg) |
1588 | else: |
1589 | exit_on_error("Empty response from server") |
1590 | return True |
1591 | |
1592 | |
1593 | def create_agent_user(): |
1594 | - global mysql_user |
1595 | - global mysql_password |
1596 | - |
1597 | + """ |
1598 | + Creates local MySQL user for twindb agent |
1599 | + """ |
1600 | config = get_config() |
1601 | try: |
1602 | conn = get_mysql_connection() |
1603 | @@ -1294,9 +1134,8 @@ |
1604 | cursor = conn.cursor() |
1605 | cursor.execute(q, (config["mysql_user"], config["mysql_password"])) |
1606 | except mysql.connector.Error as err: |
1607 | - logger.error("Failed to create MySQL user %s@localhost for TwinDB agent" % config["mysql_user"]) |
1608 | - logger.error("MySQL replied: %s" % err.msg) |
1609 | - exit_on_error(traceback.format_exc()) |
1610 | + logger.error("MySQL replied: %s" % err) |
1611 | + exit_on_error("Failed to create MySQL user %s@localhost for TwinDB agent" % config["mysql_user"]) |
1612 | logger.info("Created MySQL user %s@localhost for TwinDB agent" % config["mysql_user"]) |
1613 | logger.info("Congratulations! The server is successfully registered in TwinDB.") |
1614 | logger.info("TwinDB will backup the server accordingly to the default config.") |
1615 | @@ -1304,15 +1143,13 @@ |
1616 | return True |
1617 | |
1618 | |
1619 | -# Unregisters this server in TwinDB dispatcher |
1620 | -# Returns |
1621 | -# True - if server was successfully unregistered |
1622 | -# False - if error happened |
1623 | - |
1624 | def action_handler_unregister(delete_backups): |
1625 | - global logger |
1626 | - global init_config |
1627 | - |
1628 | + """ |
1629 | + Unregisters this server in TwinDB dispatcher |
1630 | + Returns |
1631 | + True - if server was successfully unregistered |
1632 | + False - if error happened |
1633 | + """ |
1634 | data = { |
1635 | "type": "unregister", |
1636 | "params": { |
1637 | @@ -1322,7 +1159,7 @@ |
1638 | logger.debug("Unregistration request:") |
1639 | logger.debug(data) |
1640 | response = get_response(data) |
1641 | - if response <> None: |
1642 | + if response: |
1643 | jd = json.JSONDecoder() |
1644 | r = jd.decode(response) |
1645 | logger.debug(r) |
1646 | @@ -1330,25 +1167,21 @@ |
1647 | logger.info("The server is successfully unregistered") |
1648 | return True |
1649 | else: |
1650 | - logger.error("Failed to unregister the agent: " |
1651 | - + jd.decode(decrypt(r["response"]))["error"]) |
1652 | - sys.exit(2) |
1653 | - del jd |
1654 | + exit_on_error("Failed to unregister the agent: " + jd.decode(decrypt(r["response"]))["error"]) |
1655 | else: |
1656 | - logger.error("Empty response from server") |
1657 | - sys.exit(2) |
1658 | + exit_on_error("Empty response from server") |
1659 | return False |
1660 | |
1661 | |
1662 | -# Starts immediate backup job |
1663 | def action_handler_backup(): |
1664 | - global logger |
1665 | - |
1666 | + """ |
1667 | + Starts immediate backup job |
1668 | + """ |
1669 | if schedule_backup(): |
1670 | config = get_config() |
1671 | job = get_job() |
1672 | if job: |
1673 | - if 0 == process_job(config, job): |
1674 | + if process_job(config, job) == 0: |
1675 | return True |
1676 | else: |
1677 | logger.error("Didn't get any job from the dispatcher") |
1678 | @@ -1359,10 +1192,10 @@ |
1679 | return False |
1680 | |
1681 | |
1682 | -# Asks dispatcher to schedule a job for this server |
1683 | def schedule_backup(): |
1684 | - global logger |
1685 | - |
1686 | + """ |
1687 | + Asks dispatcher to schedule a job for this server |
1688 | + """ |
1689 | data = { |
1690 | "type": "schedule_backup", |
1691 | "params": { |
1692 | @@ -1371,7 +1204,7 @@ |
1693 | logger.debug("Schedule backup request:") |
1694 | logger.debug(data) |
1695 | response = get_response(data) |
1696 | - if response <> None: |
1697 | + if response: |
1698 | jd = json.JSONDecoder() |
1699 | r = jd.decode(response) |
1700 | logger.debug(r) |
1701 | @@ -1382,45 +1215,43 @@ |
1702 | logger.error("Failed to schedule a job: " |
1703 | + jd.decode(decrypt(r["response"]))["error"]) |
1704 | return False |
1705 | - del jd |
1706 | else: |
1707 | exit_on_error("Empty response from server") |
1708 | |
1709 | |
1710 | -# Checks if it's enough space in TwinDB storage |
1711 | -# Inputs |
1712 | -# config - backup config |
1713 | -# job - job order |
1714 | -# Returns always True |
1715 | - |
1716 | -def check_space(config, job_id): |
1717 | +def check_space(): |
1718 | + """ |
1719 | + Checks if it's enough space in TwinDB storage |
1720 | + Inputs |
1721 | + config - backup config |
1722 | + job - job order |
1723 | + Returns always True |
1724 | + TODO: implement the function |
1725 | + """ |
1726 | return True |
1727 | |
1728 | |
1729 | -# Checks if MySQL user from backup config has enough privileges to perform backup |
1730 | -# Inputs |
1731 | -# config - backup config |
1732 | -# job - job order |
1733 | -# Returns |
1734 | -# True - if all necessary privileges are granted |
1735 | -# False - if not all necessary privilegers are granted |
1736 | - |
1737 | def check_mysql_permissions(config): |
1738 | - global logger |
1739 | - |
1740 | + """ |
1741 | + Checks if MySQL user from backup config has enough privileges to perform backup |
1742 | + Inputs |
1743 | + config - backup config |
1744 | + Returns |
1745 | + True - if all necessary privileges are granted |
1746 | + False - if not all necessary privileges are granted |
1747 | + """ |
1748 | try: |
1749 | logger.info("Checking if '%s'@'localhost' has enough rights to backup MySQL" % config["mysql_user"]) |
1750 | - result = False |
1751 | con = get_mysql_connection(user=config["mysql_user"], passwd=config["mysql_password"]) |
1752 | required_priv = ["RELOAD", "LOCK TABLES", "REPLICATION CLIENT", "SUPER", "CREATE TABLESPACE"] |
1753 | for priv in required_priv: |
1754 | cursor = con.cursor() |
1755 | logger.debug("Checking if privilege %s is granted" % priv) |
1756 | query = """ |
1757 | -SELECT |
1758 | - PRIVILEGE_TYPE |
1759 | -FROM information_schema.USER_PRIVILEGES |
1760 | -WHERE GRANTEE = '\'%s\'@\\\'localhost\\\'' AND PRIVILEGE_TYPE = %s""" |
1761 | + SELECT |
1762 | + PRIVILEGE_TYPE |
1763 | + FROM information_schema.USER_PRIVILEGES |
1764 | + WHERE GRANTEE = '\'%s\'@\\\'localhost\\\'' AND PRIVILEGE_TYPE = %s""" |
1765 | logger.debug("Sending query : %s" % query) |
1766 | cursor.execute(query, (config["mysql_user"], priv)) |
1767 | cursor.fetchall() |
1768 | @@ -1430,288 +1261,213 @@ |
1769 | logger.debug("Privilege %s is granted to %s@localhost" % (priv, config["mysql_user"])) |
1770 | else: |
1771 | logger.info("%20s ... NOT GRANTED" % priv) |
1772 | - logger.error("Privilege %s is not granted to %s@localhost" % ( priv, config["mysql_user"])) |
1773 | - logger.info( |
1774 | - "Please execute following SQL to grant %s to user %s@localhost:" % ( priv, config["mysql_user"])) |
1775 | - logger.info("GRANT %s ON *.* TO '%s'@'localhost';" % ( priv, config["mysql_user"])) |
1776 | + logger.error("Privilege %s is not granted to %s@localhost" % (priv, config["mysql_user"])) |
1777 | + logger.info("Please execute following SQL to grant %s to user %s@localhost:" |
1778 | + % (priv, config["mysql_user"])) |
1779 | + logger.info("GRANT %s ON *.* TO '%s'@'localhost';" % (priv, config["mysql_user"])) |
1780 | return False |
1781 | cursor.close() |
1782 | con.close() |
1783 | - result = True |
1784 | - except: |
1785 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]) |
1786 | - result = False |
1787 | - if result: |
1788 | - logger.debug("MySQL user %s@'localhost' has enough grants to take backup" % config["mysql_user"]) |
1789 | - else: |
1790 | - logger.error("MySQL user %s@'localhost' doesn't have enough grants to take backup" % config["mysql_user"]) |
1791 | - return result |
1792 | - |
1793 | - |
1794 | -# Meta fuction that calls actual backup fucntion depending on tool in backup config |
1795 | -# Inputs |
1796 | -# config - backup config |
1797 | -# job - job order |
1798 | -# Returns what actual backup function returned or -1 if the tool is not supported |
1799 | + except mysql.connector.Error as err: |
1800 | + logger.error("MySQL Error: %s" % err) |
1801 | + return False |
1802 | + logger.debug("MySQL user %s@'localhost' has enough grants to take backup" % config["mysql_user"]) |
1803 | + return True |
1804 | + |
1805 | |
1806 | def take_backup(config, job): |
1807 | - global logger |
1808 | - |
1809 | + """ |
1810 | + Meta function that calls actual backup function depending on tool in backup config |
1811 | + :param config: backup config |
1812 | + :param job: job order |
1813 | + :return: what actual backup function returned or -1 if the tool is not supported |
1814 | + """ |
1815 | ret = -1 |
1816 | logger.info("Starting backup job") |
1817 | - try: |
1818 | - job_id = int(job["job_id"]) |
1819 | - notify_params = {"event": "start_job", "job_id": job_id} |
1820 | - if log_job_notify(job_id, notify_params): |
1821 | - ret = take_backup_xtrabackup(config, job) |
1822 | - notify_params = {"event": "stop_job", "job_id": job_id, "ret_code": ret} |
1823 | - log_job_notify(job_id, notify_params) |
1824 | - logger.info("Backup job is complete") |
1825 | - else: |
1826 | - logger.info("Backup job can not start") |
1827 | - except: |
1828 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); |
1829 | - |
1830 | + notify_params = {"event": "start_job", "job_id": job["job_id"]} |
1831 | + if log_job_notify(notify_params): |
1832 | + ret = take_backup_xtrabackup(config, job) |
1833 | + notify_params = {"event": "stop_job", "job_id": job["job_id"], "ret_code": ret} |
1834 | + log_job_notify(notify_params) |
1835 | + logger.info("Backup job is complete") |
1836 | + else: |
1837 | + logger.info("Backup job can not start") |
1838 | return ret |
1839 | |
1840 | |
1841 | -# Starts SSH process to TwinDB storage and saves input in file backup_name |
1842 | -# Inputs |
1843 | -# config - backup config |
1844 | -# backup_name - file name to save input in |
1845 | -# stdin, stderr - respective IO handlers |
1846 | -# Returns what SSH process returns |
1847 | - |
1848 | def start_ssh_cmd(config, job_params, backup_name, stdin, stderr): |
1849 | - global logger |
1850 | - global ssh_private_key |
1851 | - global ssh_port |
1852 | - |
1853 | - p = None |
1854 | + """ |
1855 | + Starts an SSH process to TwinDB storage and saves input in file backup_name |
1856 | + :param config: backup config |
1857 | + :param job_params: job parameters |
1858 | + :param backup_name: file name to save input in |
1859 | + :param stdin: respective IO handlers |
1860 | + :param stderr: respective IO handlers |
1861 | + :return: what SSH process returns |
1862 | + """ |
1863 | + ssh_process = None |
1864 | + ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key_file, "-p", str(ssh_port), |
1865 | + "user_id_%s@%s" % (config["user_id"], job_params["ip"]), "/bin/cat - > %s" % backup_name] |
1866 | try: |
1867 | - ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key] |
1868 | - ssh_cmd.append("-p") |
1869 | - ssh_cmd.append(str(ssh_port)) |
1870 | - ssh_cmd.append("user_id_%s@%s" % (config["user_id"], job_params["ip"])) |
1871 | - ssh_cmd.append("/bin/cat - > %s" % backup_name) |
1872 | logger.debug("Starting SSH process: %r" % ssh_cmd) |
1873 | - p = subprocess.Popen(ssh_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr) |
1874 | - except: |
1875 | - logger.error(traceback.format_exc()) |
1876 | - return p |
1877 | - |
1878 | - |
1879 | -# Starts GPG process, encrypts STDIN and outputs in into STDOUT |
1880 | -# Inputs |
1881 | -# config - backup config |
1882 | -# stdin, stderr - respective IO handlers |
1883 | -# Returns what GPG process returns |
1884 | - |
1885 | -def start_gpg_cmd(config, stdin, stderr): |
1886 | - global logger |
1887 | - global ssh_private_key |
1888 | - global ssh_port |
1889 | - global server_id |
1890 | - |
1891 | - p = None |
1892 | + ssh_process = subprocess.Popen(ssh_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr) |
1893 | + except OSError as err: |
1894 | + logger.error("Failed to run command %r. %s" % (ssh_cmd, err)) |
1895 | + return ssh_process |
1896 | + |
1897 | + |
1898 | +def start_gpg_cmd(stdin, stderr): |
1899 | + """ |
1900 | + Starts GPG process, encrypts STDIN and outputs in into STDOUT |
1901 | + :param stdin: respective IO handlers |
1902 | + :param stderr: respective IO handlers |
1903 | + :return: what GPG process returns |
1904 | + """ |
1905 | + gpg_process = None |
1906 | + gpg_cmd = ["gpg", "--encrypt", "--yes", "--batch", "--no-permission-warning", "--quiet", "--recipient", server_id] |
1907 | try: |
1908 | - gpg_cmd = ["gpg", "--encrypt", "--yes", "--batch", "--no-permission-warning", "--quiet"] |
1909 | - gpg_cmd.append("--recipient") |
1910 | - gpg_cmd.append("%s" % (server_id)) |
1911 | - |
1912 | logger.debug("Starting GPG process: %r" % gpg_cmd) |
1913 | - p = subprocess.Popen(gpg_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr) |
1914 | - except: |
1915 | - logger.error(traceback.format_exc()) |
1916 | - return p |
1917 | - |
1918 | - |
1919 | -# Takes backup copy with XtraBackup |
1920 | -# Inputs |
1921 | -# config - backup config |
1922 | -# job - job order |
1923 | -# Returns |
1924 | -# True - if backup successfully taken |
1925 | -# False - if backup failed |
1926 | + gpg_process = subprocess.Popen(gpg_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr) |
1927 | + except OSError as err: |
1928 | + logger.error("Failed to run command %r. %s" % (gpg_cmd, err)) |
1929 | + return gpg_process |
1930 | + |
1931 | |
1932 | def take_backup_xtrabackup(config, job): |
1933 | - global logger |
1934 | - global server_id |
1935 | - |
1936 | - suffix = "tar" |
1937 | + """ |
1938 | + # Takes backup copy with XtraBackup |
1939 | + :param config: backup config |
1940 | + :param job: job order |
1941 | + :return: True if backup was successfully taken or False if it has failed |
1942 | + """ |
1943 | + suffix = "xbstream" |
1944 | backup_name = "server_id_%s_%s.%s.gpg" % (server_id, datetime.now().isoformat(), suffix) |
1945 | - extra_config = "" |
1946 | ret_code = 0 |
1947 | - |
1948 | + if "params" not in job: |
1949 | + logger.error("There are no params in the job order") |
1950 | + return -1 |
1951 | + # Check that job order has all required parameters |
1952 | + mandatory_params = ["ancestor", "backup_type", "ip", "lsn", "type", "volume_id"] |
1953 | + for param in mandatory_params: |
1954 | + if param not in job["params"]: |
1955 | + logger.error("There is no %s in the job order" % param) |
1956 | + return -1 |
1957 | + backup_type = job["params"]["backup_type"] |
1958 | + xtrabackup_cmd = [ |
1959 | + "innobackupex", |
1960 | + "--stream=xbstream", |
1961 | + "--user=%s" % config["mysql_user"], |
1962 | + "--password=%s" % config["mysql_password"], |
1963 | + "--socket=%s" % get_unix_socket(), |
1964 | + "--slave-info", |
1965 | + "--safe-slave-backup", |
1966 | + "--safe-slave-backup-timeout=3600"] |
1967 | + if backup_type == 'incremental': |
1968 | + last_lsn = job["params"]["lsn"] |
1969 | + xtrabackup_cmd.append("--incremental") |
1970 | + xtrabackup_cmd.append(".") |
1971 | + xtrabackup_cmd.append("--incremental-lsn=%s" % last_lsn) |
1972 | + else: |
1973 | + xtrabackup_cmd.append(".") |
1974 | + extra_config = gen_extra_config(config) |
1975 | + if extra_config: |
1976 | + xtrabackup_cmd.append("--defaults-extra-file=%s" % extra_config) |
1977 | + err_descriptors = dict() |
1978 | + for desc in ["gpg", "ssh", "xtrabackup"]: |
1979 | + desc_file = ("/tmp/twindb.%s.err" % desc) |
1980 | + try: |
1981 | + err_descriptors[desc] = open(desc_file, "w+") |
1982 | + except IOError as err: |
1983 | + logger.error("Failed to open file %s. %s" % (desc_file, err)) |
1984 | + return -1 |
1985 | try: |
1986 | - mysql_user = config["mysql_user"] |
1987 | - mysql_password = config["mysql_password"] |
1988 | - d = json.JSONDecoder() |
1989 | - backup_type = job["params"]["backup_type"] |
1990 | - xtrabackup_cmd = ["innobackupex"] |
1991 | - xtrabackup_cmd.append("--stream=xbstream") |
1992 | - xtrabackup_cmd.append("--user=%s" % mysql_user) |
1993 | - xtrabackup_cmd.append("--password=%s" % mysql_password) |
1994 | - xtrabackup_cmd.append("--socket=%s" % get_unix_socket()) |
1995 | - xtrabackup_cmd.append("--slave-info") |
1996 | - xtrabackup_cmd.append("--safe-slave-backup") |
1997 | - xtrabackup_cmd.append("--safe-slave-backup-timeout=3600") |
1998 | - if backup_type == 'incremental': |
1999 | - last_lsn = job["params"]["lsn"] |
2000 | - xtrabackup_cmd.append("--incremental") |
2001 | - xtrabackup_cmd.append(".") |
2002 | - xtrabackup_cmd.append("--incremental-lsn=%s" % last_lsn) |
2003 | - else: |
2004 | - xtrabackup_cmd.append(".") |
2005 | - extra_config = gen_extra_config(config) |
2006 | - if extra_config: |
2007 | - xtrabackup_cmd.append("--defaults-extra-file=%s" % extra_config) |
2008 | - |
2009 | - xtr_err = open('/tmp/twindb.xtrabackup.err', "w+") |
2010 | - gpg_err = open('/tmp/twindb.gpg.err', "w+") |
2011 | - ssh_err = open('/tmp/twindb.ssh.err', "w+") |
2012 | - p1 = subprocess.Popen(xtrabackup_cmd, stdout=subprocess.PIPE, stderr=xtr_err) |
2013 | - p2 = start_gpg_cmd(config, p1.stdout, gpg_err) |
2014 | - p3 = start_ssh_cmd(config, job["params"], backup_name, p2.stdout, ssh_err) |
2015 | - |
2016 | - p1.stdout.close() |
2017 | - p2.stdout.close() |
2018 | - p3.communicate() |
2019 | - |
2020 | - ret_code_ssh = p3.returncode |
2021 | - ret_code_gpg = p2.wait() |
2022 | - ret_code_xbk = p1.wait() |
2023 | - |
2024 | - xtr_err.seek(0) |
2025 | - gpg_err.seek(0) |
2026 | - ssh_err.seek(0) |
2027 | - |
2028 | - xtrabackup_stderr = xtr_err.read() |
2029 | - gpg_stderr = gpg_err.read() |
2030 | - ssh_stderr = ssh_err.read() |
2031 | - |
2032 | - if len(xtrabackup_stderr) == 0: |
2033 | - xtrabackup_stderr = "no output" |
2034 | - if len(gpg_stderr) == 0: |
2035 | - gpg_stderr = "no output" |
2036 | - if len(ssh_stderr) == 0: |
2037 | - ssh_stderr = "no output" |
2038 | - |
2039 | - logger.info("XtraBackup stderr: " + xtrabackup_stderr) |
2040 | - logger.info("GPG stderr: " + gpg_stderr) |
2041 | - logger.info("SSH stderr: " + ssh_stderr) |
2042 | - ancestor = 0 |
2043 | - |
2044 | - if ret_code_xbk == 0 and ret_code_gpg == 0 and ret_code_ssh == 0: |
2045 | - lsn = grep_lsn(xtrabackup_stderr) |
2046 | - if lsn == None: |
2047 | - logger.error("Could not find LSN in XtrabBackup output") |
2048 | - return -1 |
2049 | - file_size = get_backup_size(config, job["params"], backup_name) |
2050 | - if file_size == 0: |
2051 | - logger.error("Backup copy size must not be zero") |
2052 | - return -1 |
2053 | - job_id = job["job_id"] |
2054 | - volume_id = job["params"]["volume_id"] |
2055 | - ancestor = job["params"]["ancestor"] |
2056 | - if not record_backup(job_id, backup_name, volume_id, file_size, lsn, ancestor): |
2057 | - logger.error("Failed to save backup copy details") |
2058 | - return -1 |
2059 | - else: |
2060 | - logger.error("Failed to take backup") |
2061 | - ret_code = -1 |
2062 | - except: |
2063 | + logger.debug("Starting XtraBackup process: %r" % xtrabackup_cmd) |
2064 | + xbk_proc = subprocess.Popen(xtrabackup_cmd, stdout=subprocess.PIPE, stderr=err_descriptors["xtrabackup"]) |
2065 | + except OSError as err: |
2066 | + logger.error("Failed to run command %r. %s" % (xtrabackup_cmd, err)) |
2067 | + return -1 |
2068 | + gpg_proc = start_gpg_cmd(xbk_proc.stdout, err_descriptors["gpg"]) |
2069 | + ssh_proc = start_ssh_cmd(config, job["params"], backup_name, gpg_proc.stdout, err_descriptors["ssh"]) |
2070 | + |
2071 | + ssh_proc.wait() |
2072 | + xbk_proc.wait() |
2073 | + gpg_proc.wait() |
2074 | + |
2075 | + ret_code_ssh = ssh_proc.returncode |
2076 | + ret_code_gpg = gpg_proc.returncode |
2077 | + ret_code_xbk = xbk_proc.returncode |
2078 | + |
2079 | + err_str = dict() |
2080 | + for desc in ["gpg", "ssh", "xtrabackup"]: |
2081 | + err_descriptors[desc].seek(0) |
2082 | + err_str[desc] = err_descriptors[desc].read() |
2083 | + if not err_str[desc]: |
2084 | + err_str[desc] = "no output" |
2085 | + |
2086 | + logger.info("XtraBackup stderr: " + err_str["xtrabackup"]) |
2087 | + logger.info("GPG stderr: " + err_str["gpg"]) |
2088 | + logger.info("SSH stderr: " + err_str["ssh"]) |
2089 | + |
2090 | + if ret_code_xbk == 0 and ret_code_gpg == 0 and ret_code_ssh == 0: |
2091 | + lsn = grep_lsn(err_str["xtrabackup"]) |
2092 | + if not lsn: |
2093 | + logger.error("Could not find LSN in XtrabBackup output") |
2094 | + return -1 |
2095 | + file_size = get_backup_size(config, job["params"], backup_name) |
2096 | + if not file_size: |
2097 | + logger.error("Backup copy size must not be zero") |
2098 | + return -1 |
2099 | + volume_id = job["params"]["volume_id"] |
2100 | + ancestor = job["params"]["ancestor"] |
2101 | + if not record_backup(backup_name, volume_id, file_size, lsn, ancestor): |
2102 | + logger.error("Failed to save backup copy details") |
2103 | + return -1 |
2104 | + else: |
2105 | logger.error("Failed to take backup") |
2106 | - logger.error(traceback.format_exc()) |
2107 | return -1 |
2108 | - finally: |
2109 | - for f in [extra_config, |
2110 | - "/tmp/twindb.xtrabackup.err", |
2111 | - "/tmp/twindb.gpg.err", |
2112 | - "/tmp/twindb.ssh.err"]: |
2113 | - if os.path.isfile(f): |
2114 | + for f in [extra_config, "/tmp/twindb.xtrabackup.err", "/tmp/twindb.gpg.err", "/tmp/twindb.ssh.err"]: |
2115 | + if os.path.isfile(f): |
2116 | + try: |
2117 | os.remove(f) |
2118 | + except IOError as err: |
2119 | + logger.error("Failed to remove file %s. %s" % (f, err)) |
2120 | return ret_code |
2121 | |
2122 | |
2123 | -# Generates MySQL config with datadir option |
2124 | -# Inputs |
2125 | -# config - backup config |
2126 | -# Returns |
2127 | -# File name with MySQL config |
2128 | -# None - if error happened |
2129 | - |
2130 | def gen_extra_config(config): |
2131 | - global logger |
2132 | - |
2133 | - extra_config = None |
2134 | + """ |
2135 | + Generates MySQL config with datadir option |
2136 | + :param config: backup config |
2137 | + :return: File name with MySQL config or None if error happened |
2138 | + """ |
2139 | try: |
2140 | f, extra_config = tempfile.mkstemp() |
2141 | os.write(f, "[mysqld]\n") |
2142 | - con = get_mysql_connection( |
2143 | - user=config["mysql_user"], |
2144 | - passwd=config["mysql_password"]) |
2145 | + con = get_mysql_connection(config["mysql_user"], config["mysql_password"]) |
2146 | cur = con.cursor() |
2147 | cur.execute("SELECT @@datadir") |
2148 | row = cur.fetchone() |
2149 | os.write(f, 'datadir="%s"\n' % row[0]) |
2150 | cur.close() |
2151 | os.close(f) |
2152 | - except: |
2153 | - logger.error("Failed to generate extra defaults file") |
2154 | - logger.error(traceback.format_exc()) |
2155 | + except IOError as err: |
2156 | + logger.error("Failed to generate extra defaults file. %s" % err) |
2157 | extra_config = None |
2158 | return extra_config |
2159 | |
2160 | |
2161 | -# Checks if binary log is enabled in local MySQL |
2162 | -# Inputs |
2163 | -# config - backup config |
2164 | -# Returns |
2165 | -# True - binary log enabled |
2166 | -# False - binary log disabled |
2167 | - |
2168 | -def is_binlog_enabled(config): |
2169 | - global logger |
2170 | - |
2171 | - result = False |
2172 | - try: |
2173 | - con = MySQLdb.connect(user=config["mysql_user"], passwd=config["mysql_password"]) |
2174 | - with con: |
2175 | - cur = con.cursor() |
2176 | - cur.execute("SELECT @@GLOBAL.log_bin") |
2177 | - row = cur.fetchone() |
2178 | - if row[0] == "1": |
2179 | - result = True |
2180 | - else: |
2181 | - result = False |
2182 | - cur.close() |
2183 | - del con |
2184 | - except: |
2185 | - logger.error("Failed to check if binlog is enabled") |
2186 | - logger.error(traceback.format_exc()) |
2187 | - return result |
2188 | - |
2189 | - |
2190 | -# Logs in to TwinDB storage and get size of backup |
2191 | -# Inputs |
2192 | -# config - backup config |
2193 | -# backup_name - file name with backup |
2194 | -# Returns |
2195 | -# Size of backup in bytes |
2196 | -# Zero if error happened |
2197 | - |
2198 | def get_backup_size(config, job_params, backup_name): |
2199 | - global logger |
2200 | - global ssh_private_key |
2201 | - global ssh_port |
2202 | - backup_size = 0 |
2203 | - |
2204 | + """ |
2205 | + Logs in to TwinDB storage and get size of backup |
2206 | + :param config: backup config |
2207 | + :param job_params: job parameters |
2208 | + :param backup_name: file name with backup |
2209 | + :return: size of backup in bytes or zero if error happened |
2210 | + """ |
2211 | logger.debug("Getting size of %s" % backup_name) |
2212 | + ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key_file, "-p", str(ssh_port), |
2213 | + "user_id_%s@%s" % (config["user_id"], job_params["ip"]), "/bin/du -b %s" % backup_name] |
2214 | try: |
2215 | - ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key, "-p", str(ssh_port), |
2216 | - "user_id_%s@%s" % (config["user_id"], job_params["ip"]), "/bin/du -b %s" % backup_name] |
2217 | - |
2218 | process = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
2219 | cout, cerr = process.communicate() |
2220 | |
2221 | @@ -1726,30 +1482,28 @@ |
2222 | except subprocess.CalledProcessError as e: |
2223 | logger.error("Failed to get size of backup %s" % backup_name) |
2224 | logger.error(str(e)) |
2225 | + return 0 |
2226 | except OSError as e: |
2227 | logger.error("Failed to get size of backup %s" % backup_name) |
2228 | - logger.error("Command execution failed: %s" % str(e)) |
2229 | - except: |
2230 | - logger.error("Failed to get size of backup %s" % backup_name) |
2231 | - logger.error(traceback.format_exc()) |
2232 | - else: |
2233 | - logger.debug("Size of %s = %d bytes (%s)" % (backup_name, backup_size, h_size(backup_size))) |
2234 | - |
2235 | + logger.error("Failed to run command %r: %s" % (ssh_cmd, e)) |
2236 | + return 0 |
2237 | + logger.debug("Size of %s = %d bytes (%s)" % (backup_name, backup_size, h_size(backup_size))) |
2238 | return backup_size |
2239 | |
2240 | |
2241 | -def handler_send_key(config, job): |
2242 | +def handler_send_key(job): |
2243 | """ |
2244 | Processes send_key job |
2245 | """ |
2246 | # Get owner of the GPG key |
2247 | + cmd_1 = ["gpg", "--list-packets"] |
2248 | try: |
2249 | gpg_pub_key = job["params"]["gpg_pub_key"] |
2250 | if gpg_pub_key: |
2251 | - cmd_1 = ["gpg", "--list-packets"] |
2252 | logger.debug("Starting %r" % cmd_1) |
2253 | p1 = subprocess.Popen(cmd_1, stdin=subprocess.PIPE, stdout=subprocess.PIPE) |
2254 | cout, cerr = p1.communicate(gpg_pub_key) |
2255 | + keyid = "Unknown" |
2256 | for line in cout.split("\n"): |
2257 | if "keyid:" in line: |
2258 | keyid = line.replace("keyid:", "").strip() |
2259 | @@ -1759,420 +1513,388 @@ |
2260 | logger.error("Requestor public key is empty") |
2261 | return |
2262 | except OSError as err: |
2263 | - logger.error(err) |
2264 | + logger.error("Failed to run command %r: %s" % (cmd_1, err)) |
2265 | return |
2266 | # Import public GPG key. It's a user public key sent by the dispatcher |
2267 | try: |
2268 | logger.debug("Importing requestor's key %s" % keyid) |
2269 | cmd_1 = ["gpg", "--import"] |
2270 | - p1 = subprocess.Popen(cmd_1, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
2271 | + p1 = subprocess.Popen(cmd_1, stdin=subprocess.PIPE) |
2272 | p1.communicate(gpg_pub_key) |
2273 | except OSError as err: |
2274 | - logger.error(err) |
2275 | + logger.error("Failed to run command %r: %s" % (cmd_1, err)) |
2276 | return |
2277 | # Get private key and encrypt it |
2278 | - try: |
2279 | - gpg_pub_key = job["params"]["gpg_pub_key"] |
2280 | - if gpg_pub_key: |
2281 | - logger.debug("Exporting private key of server %s" % server_id) |
2282 | - cmd_1 = ["gpg", "--armor", "--export-secret-key", server_id] |
2283 | - cmd_2 = ["gpg", "--armor", "--encrypt", "--sign", "--batch", "-r", keyid, "--local-user", server_id, "--trust-model", "always"] |
2284 | + gpg_pub_key = job["params"]["gpg_pub_key"] |
2285 | + if gpg_pub_key: |
2286 | + logger.debug("Exporting private key of server %s" % server_id) |
2287 | + cmd_1 = ["gpg", "--armor", "--export-secret-key", server_id] |
2288 | + cmd_2 = ["gpg", "--armor", "--encrypt", "--sign", "--batch", "-r", keyid, "--local-user", server_id, |
2289 | + "--trust-model", "always"] |
2290 | + try: |
2291 | logger.debug("Starting %r" % cmd_1) |
2292 | p1 = subprocess.Popen(cmd_1, stdout=subprocess.PIPE) |
2293 | + except OSError as err: |
2294 | + logger.error("Failed to run command %r: %s" % (cmd_1, err)) |
2295 | + return |
2296 | + try: |
2297 | logger.debug("Starting %r" % cmd_2) |
2298 | p2 = subprocess.Popen(cmd_2, stdin=p1.stdout, stdout=subprocess.PIPE) |
2299 | cout, cerr = p2.communicate() |
2300 | enc_private_key = cout |
2301 | logger.debug("Encrypted private key %s" % enc_private_key) |
2302 | - except OSError as err: |
2303 | - logger.error(err) |
2304 | - return |
2305 | - # Now send the private key to dispatcher |
2306 | - job_id = job["job_id"] |
2307 | - data = { |
2308 | - "type": "send_key", |
2309 | - "params": { |
2310 | - "enc_private_key": enc_private_key, |
2311 | - "job_id": job_id |
2312 | + except OSError as err: |
2313 | + logger.error("Failed to run command %r: %s" % (cmd_2, err)) |
2314 | + return |
2315 | + # Now send the private key to dispatcher |
2316 | + data = { |
2317 | + "type": "send_key", |
2318 | + "params": { |
2319 | + "enc_private_key": enc_private_key, |
2320 | + "job_id": job_id |
2321 | + } |
2322 | } |
2323 | - } |
2324 | - get_response(data) |
2325 | - |
2326 | -# Check if directory is empty |
2327 | -# Inputs |
2328 | -# dir - directory name |
2329 | -# Returns |
2330 | -# True - directory is empty |
2331 | -# False - directory is not empty |
2332 | - |
2333 | -def is_dir_empty(dir): |
2334 | - return len(os.listdir(dir)) == 0 |
2335 | - |
2336 | - |
2337 | -# Meta fuction that calls actual restore fucntion depending on tool in backup config |
2338 | -# Inputs |
2339 | -# config - backup config |
2340 | -# job - job order |
2341 | -# Returns what actual restore function returned or -1 if the tool is not supported |
2342 | + get_response(data) |
2343 | + else: |
2344 | + logger.error("The job order requested send_key, but no public key was provided") |
2345 | + |
2346 | + |
2347 | +def is_dir_empty(directory): |
2348 | + """ |
2349 | + Checks if directory is empty |
2350 | + :param directory: directory name |
2351 | + :return: True if the directory is empty or False otherwise |
2352 | + """ |
2353 | + return len(os.listdir(directory)) == 0 |
2354 | + |
2355 | |
2356 | def restore_backup(config, job): |
2357 | - global logger |
2358 | - |
2359 | - ret = -1 |
2360 | - logger.info("Starting restore job %r" % job) |
2361 | - log_start_job(int(job["job_id"])) |
2362 | - |
2363 | - if config["tool"] == "xtrabackup": |
2364 | + """ |
2365 | + Meta function that calls actual restore function depending on tool in backup config |
2366 | + :param config: backup config |
2367 | + :param job: job order |
2368 | + :return: what actual restore function returned or -1 if the tool is not supported |
2369 | + """ |
2370 | + ret = None |
2371 | + logger.info("Starting restore job: %s" |
2372 | + % json.dumps(job, indent=4, sort_keys=True)) |
2373 | + notify_params = {"event": "start_job", "job_id": job["job_id"]} |
2374 | + if log_job_notify(notify_params): |
2375 | ret = restore_xtrabackup(config, job) |
2376 | - elif config["tool"] == "mysqldump": |
2377 | - ret = restore_mysqldump(config, job) |
2378 | + notify_params = {"event": "stop_job", "job_id": job["job_id"], "ret_code": ret} |
2379 | + log_job_notify(notify_params) |
2380 | + logger.info("Restore job is complete") |
2381 | else: |
2382 | - logger.error("Can't restore backup with unsupported tool %s" % config["tool"]) |
2383 | - ret = -1 |
2384 | - log_stop_job(int(job["job_id"]), ret) |
2385 | - logger.info("Restore job is complete") |
2386 | + logger.error("Restore job can not start") |
2387 | return ret |
2388 | |
2389 | |
2390 | -# Extracts an Xtrabackup archive arc in dst_dir |
2391 | -# Inputs |
2392 | -# config - backup config |
2393 | -# arc - file name with archive to extract |
2394 | -# dst_dir - local destination directory |
2395 | -# Returns |
2396 | -# True - if archive is successfully extracted |
2397 | -# False - if error happened |
2398 | - |
2399 | -def extract_archive(config, arc, dst_dir): |
2400 | - global logger |
2401 | - global ssh_private_key |
2402 | - global ssh_port |
2403 | - |
2404 | - logger.info("Extracting %s in %s" % (arc, dst_dir)) |
2405 | - try: |
2406 | - ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key, "-p", str(ssh_port)] |
2407 | - ssh_cmd.append(config["username"] + "@" + config["dst_ip"]) |
2408 | - ssh_cmd.append("/bin/cat %s" % arc) |
2409 | - |
2410 | - gpg_cmd = ["gpg", "--decrypt"] |
2411 | - |
2412 | - xb_cmd = ["xbstream", "-x"] |
2413 | - |
2414 | - xb_err = open('/tmp/twindb.xb.err', "w+") |
2415 | - gpg_err = open('/tmp/twindb.gpg.err', "w+") |
2416 | - ssh_err = open('/tmp/twindb.ssh.err', "w+") |
2417 | - logger.info("Starting: %r" % ssh_cmd) |
2418 | - p1 = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=ssh_err) |
2419 | - logger.info("Starting: %r" % gpg_cmd) |
2420 | - p2 = subprocess.Popen(gpg_cmd, stdin=p1.stdout, stdout=subprocess.PIPE, stderr=gpg_err) |
2421 | - logger.info("Starting: %r" % xb_cmd) |
2422 | - p3 = subprocess.Popen(xb_cmd, stdin=p2.stdout, stdout=subprocess.PIPE, stderr=mysql_err, cwd=dst_dir) |
2423 | - p1.stdout.close() |
2424 | - p2.stdout.close() |
2425 | - p3.wait() |
2426 | - xb_err.seek(0) |
2427 | - gpg_err.seek(0) |
2428 | - ssh_err.seek(0) |
2429 | - logger.info("SSH stderr: " + ssh_err.read()) |
2430 | - logger.info("GPG stderr: " + gpg_err.read()) |
2431 | - logger.info("xbstream stderr: " + xb_err.read()) |
2432 | - if p3.returncode != 0: |
2433 | - logger.info("Failed to extract backup %s into %s" % (arc, dst_dir)) |
2434 | - return False |
2435 | - except: |
2436 | - logger.info("Failed to extract backup %s into %s" % (arc, dst_dir)) |
2437 | - finally: |
2438 | - todelete = ['/tmp/twindb.xb.err', '/tmp/twindb.gpg.err', '/tmp/twindb.ssh.err'] |
2439 | - for f in todelete: |
2440 | - if os.path.isfile(f): |
2441 | - os.remove(f) |
2442 | - logger.info("Extracted successfully %s in %s" % (arc, dst_dir)) |
2443 | - return True |
2444 | - |
2445 | - |
2446 | -# Restores backup copy with XtraBackup |
2447 | -# Inputs |
2448 | -# config - backup config |
2449 | -# job - job order |
2450 | -# Returns |
2451 | -# True - if backup successfully restored |
2452 | -# False - if restore job failed |
2453 | - |
2454 | def restore_xtrabackup(config, job): |
2455 | - global logger |
2456 | - global ssh_private_key |
2457 | - global ssh_port |
2458 | - |
2459 | - backup_name = job["restore_backup_copy"] |
2460 | - backup_name_base = os.path.splitext(backup_name) |
2461 | - dst_dir = job["restore_dir"] |
2462 | + """ |
2463 | + # Restores backup copy with XtraBackup |
2464 | + # Inputs |
2465 | + # config - backup config |
2466 | + # job - job order |
2467 | + # Returns |
2468 | + # True - if backup successfully restored |
2469 | + # False - if restore job failed |
2470 | + :param config: backup config received from the dispatcher |
2471 | + :param job: job order |
2472 | + :return: |
2473 | + """ |
2474 | + if "params" not in job: |
2475 | + logger.error("There are no params in the job order") |
2476 | + return -1 |
2477 | + # Check that job order has all required parameters |
2478 | + mandatory_params = ["backup_copy_id", "restore_dir", "server_id"] |
2479 | + for param in mandatory_params: |
2480 | + if param not in job["params"]: |
2481 | + logger.error("There is no %s in the job order" % param) |
2482 | + return -1 |
2483 | + dst_dir = job["params"]["restore_dir"] |
2484 | try: |
2485 | if os.path.isdir(dst_dir): |
2486 | if is_dir_empty(dst_dir): |
2487 | logger.info("Directory %s exists. But it's empty, so we can restore backup here" % dst_dir) |
2488 | else: |
2489 | logger.error("Directory %s exists and isn't empty, so we can not restore backup here" % dst_dir) |
2490 | - return False |
2491 | + return -1 |
2492 | else: |
2493 | os.makedirs(dst_dir) |
2494 | - except: |
2495 | + except IOError as err: |
2496 | + logger.error(err) |
2497 | logger.error("Can't use directory %s as destination for backup" % dst_dir) |
2498 | - logger.error(traceback.format_exc()) |
2499 | - return False |
2500 | - full_copy = get_full_of(backup_name) |
2501 | - logger.info("Restoring backup %s in %s" % (backup_name, dst_dir)) |
2502 | + return -1 |
2503 | + |
2504 | + full_copy = get_full_of(job["params"]["backup_copy_id"]) |
2505 | + full_copy_id = int(full_copy["backup_copy_id"]) |
2506 | + if not full_copy: |
2507 | + logger.error("Failed to get full copy parameters") |
2508 | + return -1 |
2509 | + logger.info("Restoring full copy %s in %s" % (full_copy["name"], dst_dir)) |
2510 | try: |
2511 | - extract_archive(config, full_copy, dst_dir) |
2512 | + if not extract_archive(config, full_copy, dst_dir): |
2513 | + raise JobError("Failed to extract %s" % full_copy["name"]) |
2514 | # We restored last full backup in dst_dir |
2515 | - xb_err = open('/tmp/twindb.xb.err', "w+") |
2516 | - if full_copy == backup_name: |
2517 | + try: |
2518 | + xb_err = open('/tmp/twindb.xb.err', "w+") |
2519 | + except IOError as err: |
2520 | + raise JobError("Failed to open /tmp/twindb.xb.err: %s" % err) |
2521 | + if full_copy["backup_copy_id"] == job["params"]["backup_copy_id"]: |
2522 | xb_cmd = ["innobackupex", "--apply-log", dst_dir] |
2523 | else: |
2524 | xb_cmd = ["innobackupex", "--apply-log", "--redo-only", dst_dir] |
2525 | - p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err) |
2526 | - p_xb.wait() |
2527 | - xb_err.seek(0) |
2528 | - logger.info("innobackupex stderr: " + xb_err.read()) |
2529 | - os.remove('/tmp/twindb.xb.err') |
2530 | + try: |
2531 | + p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err) |
2532 | + p_xb.communicate() |
2533 | + except OSError as err: |
2534 | + raise JobError("Failed to run %r: %s" % (xb_cmd, err)) |
2535 | + finally: |
2536 | + xb_err.seek(0) |
2537 | + logger.info("innobackupex stderr: " + xb_err.read()) |
2538 | + os.remove('/tmp/twindb.xb.err') |
2539 | if p_xb.returncode != 0: |
2540 | - logger.info("Failed to apply log on full copy %s" % (full_copy)) |
2541 | - return False |
2542 | + raise JobError("Failed to apply log on full copy %s" % full_copy["name"]) |
2543 | # if we restore full backup return now |
2544 | - if full_copy == backup_name: |
2545 | - logger.info("Successfully restored backup %s in %s" % (backup_name, dst_dir)) |
2546 | - return True |
2547 | + if full_copy_id == job["params"]["backup_copy_id"]: |
2548 | + logger.info("Successfully restored backup %s in %s" % (full_copy["name"], dst_dir)) |
2549 | + return 0 |
2550 | # Now copy all incremental copies and apply then on dst_dir |
2551 | - inc_copy = full_copy |
2552 | + backup_copy = full_copy |
2553 | + backup_copy_id = full_copy_id |
2554 | while True: |
2555 | - inc_copy = get_child_of(full_copy) |
2556 | + backup_copy = get_child_of(backup_copy_id) |
2557 | + if not backup_copy: |
2558 | + raise JobError("Failed to get a child copy of backup_copy_id %d" % backup_copy_id) |
2559 | + backup_copy_id = int(backup_copy["backup_copy_id"]) |
2560 | inc_dir = tempfile.mkdtemp() |
2561 | - extract_archive(config, inc_copy, inc_dir) |
2562 | - |
2563 | - xb_err = open('/tmp/twindb.xb.err', "w+") |
2564 | - xb_cmd = ["innobackupex", '--apply-log'] |
2565 | - if inc_copy != backup_name: |
2566 | - xb_cmd.append('--redo-only') |
2567 | + if not extract_archive(config, backup_copy, inc_dir): |
2568 | + raise JobError("Failed to extract %s in %s" % (backup_copy["name"], inc_dir)) |
2569 | + try: |
2570 | + xb_err = open("/tmp/twindb.xb.err", "w+") |
2571 | + except IOError as err: |
2572 | + raise JobError("Failed to open /tmp/twindb.xb.err: %s" % err) |
2573 | + xb_cmd = ["innobackupex", "--apply-log"] |
2574 | + if backup_copy_id != job["params"]["backup_copy_id"]: |
2575 | + xb_cmd.append("--redo-only") |
2576 | xb_cmd.append('--incremental-dir=%s' % inc_dir) |
2577 | xb_cmd.append(dst_dir) |
2578 | - p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err) |
2579 | - p_xb.wait() |
2580 | - xb_err.seek(0) |
2581 | - logger.info("innobackupex stderr: " + xb_err.read()) |
2582 | - os.remove('/tmp/twindb.xb.err') |
2583 | + try: |
2584 | + p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err) |
2585 | + p_xb.wait() |
2586 | + except OSError as err: |
2587 | + raise JobError("Failed to run %r: %s" % (xb_cmd, err)) |
2588 | + finally: |
2589 | + xb_err.seek(0) |
2590 | + logger.info("innobackupex stderr: " + xb_err.read()) |
2591 | + os.remove('/tmp/twindb.xb.err') |
2592 | if p_xb.returncode != 0: |
2593 | - logger.info("Failed to apply log on full copy %s" % (full_copy)) |
2594 | - return False |
2595 | - os.removedirs(inc_dir) |
2596 | - if inc_copy == backup_name: |
2597 | + raise JobError("Failed to apply log on copy %s" % backup_copy["name"]) |
2598 | + shutil.rmtree(inc_dir) |
2599 | + if backup_copy_id == job["params"]["backup_copy_id"]: |
2600 | break |
2601 | - logger.info("Successfully restored backup %s in %s" % (backup_name, dst_dir)) |
2602 | - except: |
2603 | + logger.info("Successfully restored backup %s in %s" % (backup_copy["name"], dst_dir)) |
2604 | + except JobError as err: |
2605 | + logger.error(err) |
2606 | logger.error("Failed to restore backup") |
2607 | - logger.error(traceback.format_exc()) |
2608 | - return False |
2609 | + return -1 |
2610 | finally: |
2611 | for f in ["/tmp/twindb.xb.err", "/tmp/twindb.gpg.err", "/tmp/twindb.ssh.err"]: |
2612 | if os.path.isfile(f): |
2613 | os.remove(f) |
2614 | - return True |
2615 | - |
2616 | - |
2617 | -# Restores backup copy taken with mysqldump |
2618 | -# Inputs |
2619 | -# config - backup config |
2620 | -# job - job order |
2621 | -# Returns |
2622 | -# True - if backup successfully restored |
2623 | -# False - if restore job failed |
2624 | - |
2625 | -def restore_mysqldump(config, job): |
2626 | - global logger |
2627 | - global ssh_private_key |
2628 | - global ssh_port |
2629 | - |
2630 | - ret = True |
2631 | - backup_name = job["restore_backup_copy"] |
2632 | - backup_name_base = os.path.splitext(backup_name) |
2633 | - dst_dir = job["restore_dir"] |
2634 | - try: |
2635 | - if os.path.isdir(dst_dir): |
2636 | - if is_dir_empty(dst_dir): |
2637 | - logger.info("Directory %s exists. But it's empty, so we can restore backup here" % dst_dir) |
2638 | - else: |
2639 | - logger.error("Directory %s exists and isn't empty, so we can not restore backup here" % dst_dir) |
2640 | - return False |
2641 | - else: |
2642 | - os.makedirs(dst_dir) |
2643 | - except: |
2644 | - logger.error("Can't use directory %s as destination for backup" % dst_dir) |
2645 | - logger.error(traceback.format_exc()) |
2646 | - return False |
2647 | - full_copy = get_full_of(backup_name) |
2648 | - # Init mysql datadir |
2649 | - logger.info("Initializing MySQL datadir in %s" % dst_dir) |
2650 | - try: |
2651 | - mysql_socket = dst_dir + "/twindb.sock" |
2652 | - mysql_pid = dst_dir + "/mysqld.pid" |
2653 | - mysql_install_db_cmd = ["mysql_install_db", "--no-defaults"] |
2654 | - mysql_install_db_cmd.append("--datadir=%s" % dst_dir) |
2655 | - mysql_install_db_cmd.append("--user=root") |
2656 | - mysql_install_db_cmd.append("--skip-networking") |
2657 | - mysql_install_db_cmd.append("--log-error=/dev/stdout") |
2658 | - mysql_install_db_cmd.append("--pid-file=%s" % mysql_pid) |
2659 | - mysql_install_db_cmd.append("--socket=%s" % mysql_socket) |
2660 | - p = subprocess.Popen(mysql_install_db_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
2661 | - out, err = p.communicate() |
2662 | - ret = p.returncode |
2663 | - del p |
2664 | - if ret != 0: |
2665 | - logger.error("Couldn't init MySQL datadir in " + dst_dir) |
2666 | - logger.error("Command: %r" % mysql_install_db_cmd) |
2667 | - logger.error("STDOUT: %s" % out) |
2668 | - logger.error("STDERR: %s" % err) |
2669 | + return 0 |
2670 | + |
2671 | + |
2672 | +def extract_archive(config, arc, dst_dir): |
2673 | + """ |
2674 | + Extracts an Xtrabackup archive arc in dst_dir |
2675 | + :param config: backup config |
2676 | + :param arc: dictionary with archive to extract |
2677 | + :param dst_dir: local destination directory |
2678 | + :return: True - if archive is successfully extracted. |
2679 | + False - if error happened |
2680 | + """ |
2681 | + mandatory_params = ["backup_copy_id", "name", "ip"] |
2682 | + for param in mandatory_params: |
2683 | + if param not in arc: |
2684 | + logger.error("There is no %s in the archive parameters" % param) |
2685 | return False |
2686 | - else: |
2687 | - logger.info("MySQL datadir in %s is successfully created" % dst_dir) |
2688 | - except: |
2689 | - logger.error("Can't init MySQL datadir in " + dst_dir) |
2690 | - logger.error(traceback.format_exc()) |
2691 | - return False |
2692 | - # Start MySQL instance |
2693 | - logger.info("Starting MySQL instance in %s" % dst_dir) |
2694 | - try: |
2695 | - mysqld_err = open('/tmp/twindb.mysqld.err', "w+") |
2696 | - mysqld_cmd = ["mysqld", "--no-defaults"] |
2697 | - mysqld_cmd.append("--datadir=%s" % dst_dir) |
2698 | - mysqld_cmd.append("--user=root") |
2699 | - mysqld_cmd.append("--skip-networking") |
2700 | - mysqld_cmd.append("--skip-grant-tables") |
2701 | - mysqld_cmd.append("--log-error=/dev/stdout") |
2702 | - mysqld_cmd.append("--pid-file=%s" % mysql_pid) |
2703 | - mysqld_cmd.append("--socket=%s" % mysql_socket) |
2704 | - mysqld_proc = subprocess.Popen(mysqld_cmd, stdout=mysqld_err, stderr=mysqld_err) |
2705 | - start_timeout = 5 |
2706 | - while start_timeout >= 0: |
2707 | - if start_timeout == 0: |
2708 | - logger.error("Can't start MySQL instance in " + dst_dir) |
2709 | - try: |
2710 | - mysqld_proc.terminate() |
2711 | - except: |
2712 | - logger.info("MySQL process is already terminated") |
2713 | - finally: |
2714 | - if os.path.isfile("/tmp/twindb.mysqld.err"): |
2715 | - os.remove("/tmp/twindb.mysqld.err") |
2716 | - mysqld_err.seek(0) |
2717 | - logger.error("mysqld output: " + mysqld_err.read()) |
2718 | - mysqld_err.close() |
2719 | - return False |
2720 | - try: |
2721 | - con = MySQLdb.connect(unix_socket=mysql_socket) |
2722 | - logger.info("Started MySQL instance in %s successfully" % dst_dir) |
2723 | - start_timeout = -1 |
2724 | - del con |
2725 | - except: |
2726 | - start_timeout -= 1 |
2727 | - time.sleep(1) |
2728 | - except: |
2729 | - mysqld_err.seek(0) |
2730 | - logger.error("Can't start MySQL instance in " + dst_dir) |
2731 | - logger.error("mysqld output: " + mysqld_err.read()) |
2732 | - logger.error(traceback.format_exc()) |
2733 | - mysqld_err.close() |
2734 | - try: |
2735 | - mysqld_proc.terminate() |
2736 | - except: |
2737 | - logger.info("MySQL process is already terminated") |
2738 | - finally: |
2739 | - if os.path.isfile("/tmp/twindb.mysqld.err"): |
2740 | - os.remove("/tmp/twindb.mysqld.err") |
2741 | - return False |
2742 | - # Copy, decrypt and restore a dump |
2743 | - logger.info("Restoring backup %s in %s" % (backup_name, dst_dir)) |
2744 | - try: |
2745 | - ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key, "-p", str(ssh_port)] |
2746 | - ssh_cmd.append(config["username"] + "@" + config["dst_ip"]) |
2747 | - ssh_cmd.append("/bin/cat %s" % backup_name) |
2748 | - |
2749 | - gpg_cmd = ["gpg", "--decrypt"] |
2750 | - |
2751 | - mysql_cmd = ["mysql", "-S", mysql_socket] |
2752 | - |
2753 | - mysql_err = open('/tmp/twindb.mysql.err', "w+") |
2754 | - gpg_err = open('/tmp/twindb.gpg.err', "w+") |
2755 | - ssh_err = open('/tmp/twindb.ssh.err', "w+") |
2756 | - logger.info("Starting: %r" % ssh_cmd) |
2757 | - p1 = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=ssh_err) |
2758 | - logger.info("Starting: %r" % gpg_cmd) |
2759 | - p2 = subprocess.Popen(gpg_cmd, stdin=p1.stdout, stdout=subprocess.PIPE, stderr=gpg_err) |
2760 | - logger.info("Starting: %r" % mysql_cmd) |
2761 | - p3 = subprocess.Popen(mysql_cmd, stdin=p2.stdout, stdout=subprocess.PIPE, stderr=mysql_err) |
2762 | - p1.stdout.close() |
2763 | - p2.stdout.close() |
2764 | - out3 = p3.communicate()[0] |
2765 | - mysql_err.seek(0) |
2766 | - gpg_err.seek(0) |
2767 | - ssh_err.seek(0) |
2768 | - logger.info("SSH stderr: " + ssh_err.read()) |
2769 | - logger.info("GPG stderr: " + gpg_err.read()) |
2770 | - logger.info("mysql stderr: " + mysql_err.read()) |
2771 | - logger.info("Successfully restored backup %s in %s" % (backup_name, dst_dir)) |
2772 | - ret = p3.returncode |
2773 | - # Stop MySQL |
2774 | - mysqld_proc.terminate() |
2775 | - except: |
2776 | - logger.error("Failed to restore backup") |
2777 | - logger.error(traceback.format_exc()) |
2778 | - return False |
2779 | - finally: |
2780 | - for f in ["/tmp/twindb.mysql.err", "/tmp/twindb.gpg.err", "/tmp/twindb.ssh.err"]: |
2781 | - if os.path.isfile(f): |
2782 | - os.remove(f) |
2783 | - try: |
2784 | - mysqld_proc.terminate() |
2785 | - except: |
2786 | - logger.info("MySQL process is already terminated") |
2787 | + logger.info("Extracting %s in %s" % (arc["name"], dst_dir)) |
2788 | + ssh_cmd = [ |
2789 | + "ssh", |
2790 | + "-oStrictHostKeyChecking=no", |
2791 | + "-i", ssh_private_key_file, |
2792 | + "-p", str(ssh_port), |
2793 | + "user_id_%s@%s" % (config["user_id"], arc["ip"]), |
2794 | + "/bin/cat %s" % arc["name"] |
2795 | + ] |
2796 | + gpg_cmd = ["gpg", "--decrypt"] |
2797 | + xb_cmd = ["xbstream", "-x"] |
2798 | + |
2799 | + desc_file = None |
2800 | + try: |
2801 | + err_desc = dict() |
2802 | + for desc in ["xb", "gpg", "ssh"]: |
2803 | + desc_file = ("/tmp/twindb.%s.err" % desc) |
2804 | + err_desc[desc] = open(desc_file, "w+") |
2805 | + except IOError as err: |
2806 | + logger.error("Failed to open %s: %s" % (desc_file, err)) |
2807 | + return False |
2808 | + |
2809 | + logger.info("Starting: %r" % ssh_cmd) |
2810 | + p1 = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=err_desc["ssh"]) |
2811 | + logger.info("Starting: %r" % gpg_cmd) |
2812 | + p2 = subprocess.Popen(gpg_cmd, stdin=p1.stdout, stdout=subprocess.PIPE, |
2813 | + stderr=err_desc["gpg"]) |
2814 | + logger.info("Starting: %r" % xb_cmd) |
2815 | + p3 = subprocess.Popen(xb_cmd, stdin=p2.stdout, stdout=subprocess.PIPE, |
2816 | + stderr=err_desc["xb"], cwd=dst_dir) |
2817 | + p1.wait() |
2818 | + p2.wait() |
2819 | + p3.wait() |
2820 | + for desc in ["xb", "gpg", "ssh"]: |
2821 | + err_desc[desc].seek(0) |
2822 | + |
2823 | + logger.info("SSH stderr: " + err_desc["ssh"].read()) |
2824 | + logger.info("GPG stderr: " + err_desc["gpg"].read()) |
2825 | + logger.info("xbstream stderr: " + err_desc["xb"].read()) |
2826 | + if p1.returncode != 0 or p2.returncode != 0 or p3.returncode != 0: |
2827 | + logger.info("Failed to extract backup %s in %s" % (arc["name"], dst_dir)) |
2828 | + return False |
2829 | + desc_file = None |
2830 | + try: |
2831 | + for desc in ["xb", "gpg", "ssh"]: |
2832 | + desc_file = ("/tmp/twindb.%s.err" % desc) |
2833 | + if os.path.isfile(desc_file): |
2834 | + os.remove(desc_file) |
2835 | + except IOError as err: |
2836 | + logger.error("Failed to open %s: %s" % (desc_file, err)) |
2837 | + logger.info("Extracted successfully %s in %s" % (arc["name"], dst_dir)) |
2838 | return True |
2839 | |
2840 | |
2841 | -# Finds LSN in XtraBackup output |
2842 | - |
2843 | -def grep_lsn(str): |
2844 | +def get_full_of(backup_copy_id): |
2845 | + """ |
2846 | + Gets full copy of a given backup_copy_id |
2847 | + :param backup_copy_id: backup_copy_id |
2848 | + :return: dictionary with full backup copy parameters: |
2849 | + { |
2850 | + "backup_copy_id": 188, |
2851 | + "name": "server_id_479a41b3-d22d-41a8-b7d3-4e40302622f6_2015-04-06T15:10:46.050984.tar.gp", |
2852 | + "ip": "127.0.0.1" |
2853 | + } |
2854 | + """ |
2855 | + logger.debug("Getting full copy parameters of backup_copy_id %d" % backup_copy_id) |
2856 | + response_body = "Empty response" |
2857 | + full_copy = None |
2858 | + try: |
2859 | + data = { |
2860 | + "type": "get_full_copy_params", |
2861 | + "params": { |
2862 | + "backup_copy_id": backup_copy_id |
2863 | + } |
2864 | + } |
2865 | + response_body = get_response(data) |
2866 | + if not response_body: |
2867 | + logger.debug("Empty response from dispatcher") |
2868 | + return None |
2869 | + d = json.JSONDecoder() |
2870 | + response_body_decoded = d.decode(response_body) |
2871 | + if response_body_decoded: |
2872 | + msg_decrypted = decrypt(response_body_decoded["response"]) |
2873 | + msg_pt = d.decode(msg_decrypted) |
2874 | + full_copy = msg_pt["data"] |
2875 | + logger.info("Got full copy params:\n%s" |
2876 | + % json.dumps(full_copy, indent=4, sort_keys=True)) |
2877 | + if msg_pt["error"]: |
2878 | + logger.error(msg_pt["error"]) |
2879 | + except exceptions.KeyError as err: |
2880 | + logger.error("Failed to decode %s" % response_body) |
2881 | + logger.error(err) |
2882 | + return None |
2883 | + return full_copy |
2884 | + |
2885 | + |
2886 | +def get_child_of(backup_copy_id): |
2887 | + """ |
2888 | + Returns a child copy params of a given backup copy |
2889 | + :param backup_copy_id: |
2890 | + :return: dictionary with child copy parameters, or None if error happened
2891 | + """ |
2892 | + logger.debug("Getting child copy parameters of backup_copy_id %d" % backup_copy_id) |
2893 | + response_body = "Empty response" |
2894 | + child_copy = None |
2895 | + try: |
2896 | + data = { |
2897 | + "type": "get_child_copy_params", |
2898 | + "params": { |
2899 | + "backup_copy_id": backup_copy_id |
2900 | + } |
2901 | + } |
2902 | + response_body = get_response(data) |
2903 | + if not response_body: |
2904 | + return None |
2905 | + d = json.JSONDecoder() |
2906 | + response_body_decoded = d.decode(response_body) |
2907 | + if response_body_decoded: |
2908 | + msg_decrypted = decrypt(response_body_decoded["response"]) |
2909 | + msg_pt = d.decode(msg_decrypted) |
2910 | + child_copy = msg_pt["data"] |
2911 | + logger.info("Got child copy params:\n%s" |
2912 | + % json.dumps(child_copy, indent=4, sort_keys=True)) |
2913 | + if msg_pt["error"]: |
2914 | + logger.error(msg_pt["error"]) |
2915 | + except exceptions.KeyError as err: |
2916 | + logger.error("Failed to decode %s" % response_body) |
2917 | + logger.error(err) |
2918 | + return None |
2919 | + return child_copy |
2920 | + |
2921 | + |
2922 | +def grep_lsn(output): |
2923 | + """ |
2924 | + Finds LSN in XtraBackup output |
2925 | + :param output: string with Xtrabackup output |
2926 | + :return: LSN |
2927 | + """ |
2928 | lsn = None |
2929 | - for line in str.split("\n"): |
2930 | + for line in output.split("\n"): |
2931 | if line.startswith("xtrabackup: The latest check point (for incremental):"): |
2932 | lsn = line.split("'")[1] |
2933 | return lsn |
2934 | |
2935 | |
2936 | -# Finds MySQL socket |
2937 | def get_unix_socket(): |
2938 | - socket = "" |
2939 | + """ |
2940 | + Finds MySQL socket |
2941 | + :return: path to unix socket or None if not found |
2942 | + """ |
2943 | + cmd = ["lsof", "-U", "-c", "/^mysqld$/", "-a", "-F", "n"] |
2944 | try: |
2945 | - cmd = [ |
2946 | - "lsof", "-U", "-c", "/^mysqld$/", "-a", "-F", "n" |
2947 | - ] |
2948 | p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
2949 | cout, cerr = p.communicate() |
2950 | # Outputs socket in format |
2951 | # # lsof -U -c mysqld -a -F n |
2952 | # p11029 |
2953 | # n/var/lib/mysql/mysql.sock |
2954 | - socket = cout.split()[1][1:] |
2955 | - if not os.path.exists(socket): |
2956 | + mysql_socket = cout.split()[1][1:] |
2957 | + if not os.path.exists(mysql_socket): |
2958 | return None |
2959 | - except: |
2960 | + except OSError as err: |
2961 | + logger.error("Failed to run command %r. %s" % (cmd, err)) |
2962 | return None |
2963 | - |
2964 | - return socket |
2965 | + return mysql_socket |
2966 | |
2967 | |
2968 | def pid_exists(pid): |
2969 | - """Check whether pid exists in the current process table.""" |
2970 | + """ |
2971 | + Checks whether pid exists in the current process table. |
2972 | + """ |
2973 | if pid < 0: |
2974 | return False |
2975 | try: |
2976 | os.kill(pid, 0) |
2977 | - except OSError, e: |
2978 | + except OSError as e: |
2979 | return e.errno == errno.EPERM |
2980 | else: |
2981 | return True |
2982 | @@ -2194,12 +1916,12 @@ |
2983 | Raise TimeoutExpired on timeout expired (if specified). |
2984 | """ |
2985 | |
2986 | - def check_timeout(delay): |
2987 | + def check_timeout(timeout_delay): |
2988 | if timeout is not None: |
2989 | if time.time() >= stop_at: |
2990 | raise TimeoutExpired |
2991 | - time.sleep(delay) |
2992 | - return min(delay * 2, 0.04) |
2993 | + time.sleep(timeout_delay) |
2994 | + return min(timeout_delay * 2, 0.04) |
2995 | |
2996 | if timeout is not None: |
2997 | waitcall = lambda: os.waitpid(pid, os.WNOHANG) |
2998 | @@ -2247,100 +1969,155 @@ |
2999 | raise RuntimeError("unknown process exit status") |
3000 | |
3001 | |
3002 | -# Removes pid file |
3003 | -# Exits if error happened |
3004 | - |
3005 | def remove_pid(): |
3006 | - global pid_file |
3007 | - try: |
3008 | - if os.path.isfile(pid_file): |
3009 | + """ |
3010 | + Removes pid file |
3011 | + :return: nothing |
3012 | + Exits if error happened |
3013 | + """ |
3014 | + if os.path.isfile(pid_file): |
3015 | + try: |
3016 | os.remove(pid_file) |
3017 | - except: |
3018 | - logger.error(traceback.format_exc()) |
3019 | - sys.exit(2) |
3020 | - |
3021 | - |
3022 | -# Cleans up when TwinDB agent exists |
3023 | + except IOError as err: |
3024 | + exit_on_error("Failed to remove file %s. %s" % (pid_file, err)) |
3025 | + |
3026 | + |
3027 | +def check_pid(): |
3028 | + """ |
3029 | + Checks if pid file already exists. |
3030 | + If it does it detects whether twindb agent is already running. |
3031 | + If the pid file is stale it removes it. |
3032 | + :return: True if pid file doesn't exist or was stale and it was removed. |
3033 | + False if twindb agent is running or error happened |
3034 | + """ |
3035 | + if os.path.isfile(pid_file): |
3036 | + pid = read_pid() |
3037 | + if pid_exists(pid): |
3038 | + try: |
3039 | + f = open("/proc/%d/cmdline" % pid, "r") |
3040 | + cmdline = f.readline() |
3041 | + f.close() |
3042 | + if "twindb.py" in cmdline: |
3043 | + # The process is a live twindb agent |
3044 | + return False |
3045 | + else: |
3046 | + # It's some other process, not a twindb agent |
3047 | + return True |
3048 | + except IOError as err: |
3049 | + logger.error("Can't read from file /proc/%d/cmdline:%s " % (pid, err)) |
3050 | + return False |
3051 | + else: |
3052 | + try: |
3053 | + os.remove(pid_file) |
3054 | + # It's a stale pid file |
3055 | + return True |
3056 | + except IOError as err: |
3057 | + logger.error("Can't remove file %s: %s" % (pid_file, err)) |
3058 | + return False |
3059 | + else: |
3060 | + # pid file doesn't exist |
3061 | + return True |
3062 | + |
3063 | + |
3064 | +def read_pid(): |
3065 | + """ |
3066 | + Read pid from pid_file |
3067 | + :return: pid or zero if pid file doesn't exist |
3068 | + """ |
3069 | + try: |
3070 | + f = open(pid_file, 'r') |
3071 | + pid = int(f.readline()) |
3072 | + f.close() |
3073 | + except IOError as err: |
3074 | + logger.error("Couldn't read from %s: %s" % (pid_file, err)) |
3075 | + return 0 |
3076 | + return int(pid) |
3077 | + |
3078 | + |
3079 | +def write_pid(): |
3080 | + """ |
3081 | + Writes pid of the current process to the pid file |
3082 | + :return: nothing. |
3083 | + Exits if error happened
3084 | + """ |
3085 | + try: |
3086 | + f = open(pid_file, "w") |
3087 | + f.write(str(os.getpid())) |
3088 | + f.close() |
3089 | + except IOError as err: |
3090 | + logger.error("Couldn't save process id in " + pid_file) |
3091 | + exit_on_error(err) |
3092 | + |
3093 | |
3094 | def cleanup(signum, frame): |
3095 | - global logger |
3096 | - global pid_file |
3097 | + """ |
3098 | + Cleans up when TwinDB agent exists |
3099 | + :param signum: |
3100 | + :param frame: |
3101 | + :return: |
3102 | + """ |
3103 | |
3104 | logger.info("Cleaning up on signal " + str(signum)) |
3105 | - remove_pid() |
3106 | + logger.debug("Frame %r" % frame) |
3107 | logger.info("TwinDB agent is ready to exit") |
3108 | sys.exit(0) |
3109 | |
3110 | |
3111 | -# Reports error removes pid and exits |
3112 | - |
3113 | -def exit_on_error(tb=None): |
3114 | - if tb is not None: |
3115 | - logger.debug(tb) |
3116 | - remove_pid() |
3117 | +def exit_on_error(message): |
3118 | + """ |
3119 | + Reports error removes pid and exits |
3120 | + :rtype : object |
3121 | + :param message: message to display |
3122 | + :return: |
3123 | + """ |
3124 | + logger.error(message) |
3125 | + logger.debug(traceback.format_exc()) |
3126 | sys.exit(2) |
3127 | |
3128 | |
3129 | # Stops TwinDB agent |
3130 | |
3131 | def stop(): |
3132 | - global logger |
3133 | - global pid_file |
3134 | + """ |
3135 | + Stops TwinDB agent |
3136 | + :return: nothing |
3137 | + """ |
3138 | |
3139 | logger.info("Shutting down TwinDB agent") |
3140 | |
3141 | if not os.path.exists(pid_file): |
3142 | logger.info("Pid file %s does not exist. Probably twindb agent isn't running" % pid_file) |
3143 | sys.exit(0) |
3144 | - |
3145 | + pid = None |
3146 | try: |
3147 | f = open(pid_file, 'r') |
3148 | pid = int(f.readline()) |
3149 | os.kill(pid, signal.SIGTERM) |
3150 | wait_pid(pid, 300) |
3151 | f.close() |
3152 | + remove_pid() |
3153 | except OSError as err: |
3154 | logger.error("Couldn't kill process %d" % pid) |
3155 | - logger.error(err) |
3156 | - exit_on_error(traceback.format_exc()) |
3157 | + exit_on_error(err) |
3158 | except IOError as err: |
3159 | logger.error("Couldn't read from %s" % pid_file) |
3160 | - logger.error(err) |
3161 | - exit_on_error(traceback.format_exc()) |
3162 | - except: |
3163 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); |
3164 | - exit_on_error(traceback.format_exc()) |
3165 | - finally: |
3166 | - remove_pid() |
3167 | + exit_on_error(err) |
3168 | logger.info("TwinDB agent successfully shut down") |
3169 | sys.exit(0) |
3170 | |
3171 | |
3172 | -# Starts TwinDB agent |
3173 | - |
3174 | def start(): |
3175 | - global logger |
3176 | - global pid_file |
3177 | - global check_period |
3178 | - |
3179 | + """ |
3180 | + Starts TwinDB agent |
3181 | + :return: nothing |
3182 | + """ |
3183 | + global job_id |
3184 | logger.info("Starting TwinDB agent") |
3185 | |
3186 | - if os.path.isfile(pid_file): |
3187 | - logger.error("PID file " + pid_file + " exists"); |
3188 | - logger.error("Check if TwinDB agent is already running"); |
3189 | - logger.error("If not, remove file " + pid_file + " manually and restart the agent"); |
3190 | - sys.exit(2) |
3191 | - try: |
3192 | - f = open(pid_file, 'w') |
3193 | - f.write(str(os.getpid())) |
3194 | - f.close() |
3195 | - except IOError as err: |
3196 | - logger.error("Couldn't save process id in " + pid_file) |
3197 | - logger.error(err) |
3198 | - exit_on_error(traceback.format_exc()) |
3199 | - except: |
3200 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); |
3201 | - exit_on_error(traceback.format_exc()) |
3202 | + if check_pid(): |
3203 | + write_pid() |
3204 | + else: |
3205 | + exit_on_error("Another instance of TwinDB agent is running. Exiting") |
3206 | try: |
3207 | while True: |
3208 | read_config() |
3209 | @@ -2348,7 +2125,7 @@ |
3210 | if not config: |
3211 | time.sleep(check_period) |
3212 | continue |
3213 | - report_sss() |
3214 | + report_sss(config) |
3215 | report_agent_privileges(config) |
3216 | job = get_job() |
3217 | if job: |
3218 | @@ -2356,130 +2133,111 @@ |
3219 | time.sleep(check_period) |
3220 | except SystemExit: |
3221 | remove_pid() |
3222 | - except: |
3223 | - exit_on_error(traceback.format_exc()) |
3224 | sys.exit(0) |
3225 | |
3226 | |
3227 | -# Processes job |
3228 | -# Inputs |
3229 | -# config - backup config |
3230 | -# job - job order |
3231 | -# Returns what respective job function returns or False if error happens |
3232 | def process_job(config, job): |
3233 | - global logger |
3234 | + """ |
3235 | + Processes job |
3236 | + :param config: backup config |
3237 | + :param job: job order |
3238 | + :return: what respective job function returns or False if error happens |
3239 | + """ |
3240 | global job_id |
3241 | + # Check to see that the twindb_agent MySQL user has enough privileges |
3242 | + username = config["mysql_user"] |
3243 | + password = config["mysql_password"] |
3244 | + |
3245 | + job_id = int(job["job_id"]) |
3246 | |
3247 | try: |
3248 | - # Check to see that the twindb_agent MySQL user has enough privileges |
3249 | - username = config["mysql_user"] |
3250 | - password = config["mysql_password"] |
3251 | - |
3252 | mysql_access_available, missing_mysql_privileges = has_mysql_access(username, password, grant_capability=False) |
3253 | if not mysql_access_available: |
3254 | logger.error("The MySQL user %s does not have all the required privileges." % username) |
3255 | - if len(missing_mysql_privileges) > 0: |
3256 | - logger.error("You can grant the required privileges by executing the following SQL: " |
3257 | - "GRANT %s ON *.* TO '%s'@'localhost'" % (','.join(missing_mysql_privileges), username)) |
3258 | - |
3259 | - return False |
3260 | - |
3261 | - if job["start_scheduled"] == None: |
3262 | - logger.error("Job start time isn't set") |
3263 | - return False |
3264 | - |
3265 | + if missing_mysql_privileges: |
3266 | + raise JobError("You can grant the required privileges by executing the following SQL: " |
3267 | + "GRANT %s ON *.* TO '%s'@'localhost'" % (','.join(missing_mysql_privileges), username)) |
3268 | + if not job["start_scheduled"]: |
3269 | + raise JobError("Job start time isn't set") |
3270 | start_scheduled = int(job["start_scheduled"]) |
3271 | now = int(time.time()) |
3272 | if now < start_scheduled: |
3273 | - logger.info("Job is scheduled on %s, now %s" |
3274 | - % (time.ctime(start_scheduled), time.ctime(now))) |
3275 | - return False |
3276 | - |
3277 | + raise JobTooSoonError("Job is scheduled on %s, now %s" |
3278 | + % (time.ctime(start_scheduled), time.ctime(now))) |
3279 | logger.info("Processing job_id = %d", int(job_id)) |
3280 | if job["type"] == "backup": |
3281 | return take_backup(config, job) |
3282 | elif job["type"] == "restore": |
3283 | - #return restore_backup(config, job) |
3284 | - return False |
3285 | + return restore_backup(config, job) |
3286 | elif job["type"] == "send_key": |
3287 | - return handler_send_key(config, job) |
3288 | + return handler_send_key(job) |
3289 | else: |
3290 | - logger.error("Unsupported job type " + job["type"]) |
3291 | - return False |
3292 | - except: |
3293 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]) |
3294 | + raise JobError("Unsupported job type " + job["type"]) |
3295 | + except JobError as err: |
3296 | + logger.error("Job error: %s", err) |
3297 | + return False |
3298 | + except JobTooSoonError as err: |
3299 | + logger.debug(err) |
3300 | return False |
3301 | finally: |
3302 | job_id = 0 |
3303 | - return False |
3304 | |
3305 | |
3306 | def setup_logging(): |
3307 | + """ |
3308 | + Sets up logging handlers and formats
3309 | + :return: |
3310 | + """ |
3311 | global logger |
3312 | |
3313 | - ch = logging.StreamHandler() |
3314 | - sh = logging.handlers.SysLogHandler() |
3315 | - rh = RlogHandler() |
3316 | + console_handler = logging.StreamHandler() |
3317 | + remote_handler = RlogHandler() |
3318 | + |
3319 | fmt_str = '%(name)s: %(levelname)s: %(funcName)s():%(lineno)d: %(message)s' |
3320 | - sfmt = logging.Formatter(fmt_str) |
3321 | - cfmt = logging.Formatter("%(asctime)s: " + fmt_str) |
3322 | - ch.setFormatter(cfmt) |
3323 | - sh.setFormatter(sfmt) |
3324 | - rh.setFormatter(sfmt) |
3325 | - logger.addHandler(sh) |
3326 | - logger.addHandler(ch) |
3327 | - logger.addHandler(rh) |
3328 | + remote_format = logging.Formatter(fmt_str) |
3329 | + console_format = logging.Formatter("%(asctime)s: " + fmt_str) |
3330 | + |
3331 | + console_handler.setFormatter(console_format) |
3332 | + remote_handler.setFormatter(remote_format) |
3333 | + |
3334 | + logger.addHandler(console_handler) |
3335 | + logger.addHandler(remote_handler) |
3336 | logger.setLevel(logging.INFO) |
3337 | - # syslog handler shouldn't log DEBUG messages |
3338 | - sh.setLevel(logging.INFO) |
3339 | - |
3340 | - |
3341 | -# Main function |
3342 | -# Parses options, creates log class etc |
3343 | + |
3344 | |
3345 | def main(): |
3346 | + """ |
3347 | + Main function |
3348 | + Parses options, creates log class etc |
3349 | + :return: |
3350 | + """ |
3351 | + global server_id |
3352 | + global check_period |
3353 | + global host |
3354 | + global mysql_user |
3355 | + global mysql_password |
3356 | + global debug |
3357 | + global logger |
3358 | + |
3359 | + setup_logging() |
3360 | + read_config() |
3361 | # before we do *anything* we must ensure server_id is generated or read from config |
3362 | - global host |
3363 | - global init_config |
3364 | - global server_id |
3365 | - global check_period |
3366 | - global logger |
3367 | - global mysql_user |
3368 | - global mysql_password |
3369 | - global debug |
3370 | - |
3371 | # Read or generate server id |
3372 | - if os.path.exists(init_config): |
3373 | - try: |
3374 | - execfile(init_config, globals()) |
3375 | - if len(server_id) == 0: |
3376 | - print("Error: Config %s doesn't set server_id" |
3377 | - % init_config, file=sys.stderr) |
3378 | - sys.exit(2) |
3379 | - except: |
3380 | - print(traceback.format_exc()) |
3381 | - print("Error: Failed to read from config in %s" |
3382 | - % init_config, file=sys.stderr) |
3383 | - sys.exit(2) |
3384 | - else: |
3385 | + if not server_id: |
3386 | server_id = str(uuid.uuid4()) |
3387 | - save_config(server_id) |
3388 | + save_config() |
3389 | |
3390 | # At this point server_id must be set |
3391 | - if len(server_id) == 0: |
3392 | - print("Error: Server id is empty. Can not continue operation", |
3393 | - file=sys.stderr) |
3394 | + if not server_id: |
3395 | + print("Error: Server id is empty. Can not continue operation", file=sys.stderr) |
3396 | sys.exit(2) |
3397 | - |
3398 | - setup_logging() |
3399 | - |
3400 | # Signal handlers |
3401 | signal.signal(signal.SIGTERM, cleanup) |
3402 | signal.signal(signal.SIGINT, cleanup) |
3403 | |
3404 | # Not clear why I ignore SIGCHLD. |
3405 | # it make subprocess.call throw exception, so I comment it out so far |
3406 | - #signal.signal(signal.SIGCHLD, signal.SIG_IGN) |
3407 | + # signal.signal(signal.SIGCHLD, signal.SIG_IGN) |
3408 | |
3409 | opts = [] |
3410 | try: |
3411 | @@ -2489,15 +2247,11 @@ |
3412 | "unregister", "delete-backups", "backup", "is-registered"]) |
3413 | except getopt.GetoptError as err: |
3414 | logger.error(err) |
3415 | - exit_on_error(traceback.format_exc()) |
3416 | - except: |
3417 | - logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); |
3418 | - usage() |
3419 | - exit_on_error(traceback.format_exc()) |
3420 | |
3421 | # Set options first |
3422 | action = None |
3423 | delete_backups = False |
3424 | + regcode = None |
3425 | for opt, arg in opts: |
3426 | if opt == '-i': |
3427 | check_period = int(arg) |
3428 | @@ -2547,29 +2301,32 @@ |
3429 | check_period = 1 |
3430 | |
3431 | if action == "start": |
3432 | - if is_registered(): |
3433 | - start() |
3434 | - else: |
3435 | - logger.error( |
3436 | - "The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n") |
3437 | - logger.error("Get your code on https://console.twindb.com/?get_code") |
3438 | - sys.exit(2) |
3439 | + while True: |
3440 | + # It'll keep checking until the agent is registered |
3441 | + # Then it starts |
3442 | + if is_registered(): |
3443 | + start() |
3444 | + else: |
3445 | + logger.error("The server must be registered first. Run following command:\n\n" |
3446 | + "twindb --register <registration code>\n") |
3447 | + logger.error("Get your code on https://console.twindb.com/?get_code") |
3448 | + time.sleep(check_period) |
3449 | elif action == "stop": |
3450 | stop() |
3451 | elif action == "register": |
3452 | action_handler_register(regcode) |
3453 | elif action == "unregister": |
3454 | if not is_registered(): |
3455 | - logger.error( |
3456 | - "The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n") |
3457 | + logger.error("The server must be registered first. Run following command:" |
3458 | + "\n\ntwindb --register <registration code>\n") |
3459 | logger.error("Get your code on https://console.twindb.com/?get_code") |
3460 | sys.exit(2) |
3461 | if action_handler_unregister(delete_backups): |
3462 | stop() |
3463 | elif action == "backup": |
3464 | if not is_registered(): |
3465 | - logger.error( |
3466 | - "The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n") |
3467 | + logger.error("The server must be registered first. Run following command:" |
3468 | + "\n\ntwindb --register <registration code>\n") |
3469 | logger.error("Get your code on https://console.twindb.com/?get_code") |
3470 | sys.exit(2) |
3471 | if not action_handler_backup(): |
3472 | @@ -2583,9 +2340,8 @@ |
3473 | sys.exit(0) |
3474 | else: |
3475 | # If we got there print usage() and exit |
3476 | - logger.error("Neither --start nor --stop nor --register is specified") |
3477 | + logger.error("Failed to parse command line options") |
3478 | usage() |
3479 | - remove_pid() |
3480 | sys.exit(2) |
3481 | |
3482 |