Merge lp:~akuzminsky/twindb-agent/restore into lp:twindb-agent
- restore
- Merge into trunk
Proposed by
Aleksandr Kuzminsky
Status: | Merged |
---|---|
Merged at revision: | 18 |
Proposed branch: | lp:~akuzminsky/twindb-agent/restore |
Merge into: | lp:twindb-agent |
Diff against target: |
3480 lines (+1215/-1459) 1 file modified
twindb.py (+1215/-1459) |
To merge this branch: | bzr merge lp:~akuzminsky/twindb-agent/restore |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Ovais Tariq | Pending | ||
Review via email: mp+255386@code.launchpad.net |
Commit message
Description of the change
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'twindb.py' | |||
2 | --- twindb.py 2015-03-16 02:16:03 +0000 | |||
3 | +++ twindb.py 2015-04-07 16:22:26 +0000 | |||
4 | @@ -2,31 +2,34 @@ | |||
5 | 2 | from __future__ import print_function | 2 | from __future__ import print_function |
6 | 3 | from __future__ import unicode_literals | 3 | from __future__ import unicode_literals |
7 | 4 | from __future__ import absolute_import | 4 | from __future__ import absolute_import |
8 | 5 | import ConfigParser | ||
9 | 5 | from base64 import b64encode, b64decode | 6 | from base64 import b64encode, b64decode |
10 | 6 | from datetime import datetime | 7 | from datetime import datetime |
13 | 7 | import sys | 8 | import errno |
14 | 8 | import time | 9 | import exceptions |
15 | 9 | import getopt | 10 | import getopt |
16 | 11 | import httplib | ||
17 | 12 | import json | ||
18 | 13 | import logging.handlers | ||
19 | 14 | import os | ||
20 | 15 | import pwd | ||
21 | 10 | import signal | 16 | import signal |
25 | 11 | import traceback | 17 | import shutil |
23 | 12 | import exceptions | ||
24 | 13 | import errno | ||
26 | 14 | import socket | 18 | import socket |
27 | 15 | import logging.handlers | ||
28 | 16 | import os.path | ||
29 | 17 | import subprocess | 19 | import subprocess |
32 | 18 | import httplib | 20 | import sys |
33 | 19 | import json | 21 | import tempfile |
34 | 22 | import time | ||
35 | 23 | import traceback | ||
36 | 20 | import urllib | 24 | import urllib |
38 | 21 | 25 | import uuid | |
39 | 22 | 26 | ||
40 | 23 | try: | 27 | try: |
41 | 24 | import mysql.connector | 28 | import mysql.connector |
42 | 25 | except ImportError: | 29 | except ImportError: |
43 | 30 | # On CentOS 5 mysql.connector is in python 2.6 directory | ||
44 | 26 | sys.path.insert(0, '/usr/lib/python2.6/site-packages') | 31 | sys.path.insert(0, '/usr/lib/python2.6/site-packages') |
45 | 27 | import mysql.connector | 32 | import mysql.connector |
46 | 28 | import tempfile | ||
47 | 29 | import uuid | ||
48 | 30 | 33 | ||
49 | 31 | # global variables | 34 | # global variables |
50 | 32 | host = "dispatcher.twindb.com" | 35 | host = "dispatcher.twindb.com" |
51 | @@ -36,8 +39,8 @@ | |||
52 | 36 | debug_http = True | 39 | debug_http = True |
53 | 37 | debug_gpg = False | 40 | debug_gpg = False |
54 | 38 | init_config = "/etc/twindb.cfg" | 41 | init_config = "/etc/twindb.cfg" |
57 | 39 | ssh_private_key = "/root/.ssh/twindb.key" | 42 | ssh_private_key_file = "/root/.ssh/twindb.key" |
58 | 40 | ssh_public_key = "/root/.ssh/twindb.key.pub" | 43 | ssh_public_key_file = "/root/.ssh/twindb.key.pub" |
59 | 41 | ssh_port = 4194 | 44 | ssh_port = 4194 |
60 | 42 | gpg_homedir = "/root/.gnupg/" | 45 | gpg_homedir = "/root/.gnupg/" |
61 | 43 | pid_file = "/var/run/twindb.pid" | 46 | pid_file = "/var/run/twindb.pid" |
62 | @@ -45,9 +48,10 @@ | |||
63 | 45 | time_zone = "UTC" | 48 | time_zone = "UTC" |
64 | 46 | api_email = "api@twindb.com" | 49 | api_email = "api@twindb.com" |
65 | 47 | server_id = "" | 50 | server_id = "" |
69 | 48 | job_id = 0 | 51 | job_id = None |
70 | 49 | mysql_user = "root" | 52 | mysql_user = None |
71 | 50 | mysql_password = "" | 53 | mysql_password = None |
72 | 54 | agent_version = "@@TWINDB_AGENT_VERSION@@" | ||
73 | 51 | api_pub_key = """ | 55 | api_pub_key = """ |
74 | 52 | -----BEGIN PGP PUBLIC KEY BLOCK----- | 56 | -----BEGIN PGP PUBLIC KEY BLOCK----- |
75 | 53 | Version: GnuPG v1 | 57 | Version: GnuPG v1 |
76 | @@ -102,24 +106,24 @@ | |||
77 | 102 | =62IM | 106 | =62IM |
78 | 103 | -----END PGP PUBLIC KEY BLOCK----- | 107 | -----END PGP PUBLIC KEY BLOCK----- |
79 | 104 | """ | 108 | """ |
84 | 105 | agent_version = "@@TWINDB_AGENT_VERSION@@" | 109 | |
85 | 106 | 110 | ||
82 | 107 | |||
83 | 108 | # Logging handler that logs to remote TwinDB dispatcher | ||
86 | 109 | class RlogHandler(logging.Handler): | 111 | class RlogHandler(logging.Handler): |
87 | 112 | """ | ||
88 | 113 | Logging handler that logs to remote TwinDB dispatcher | ||
89 | 114 | """ | ||
90 | 115 | log_flag = True | ||
91 | 116 | |||
92 | 110 | def __init__(self): | 117 | def __init__(self): |
93 | 111 | logging.Handler.__init__(self) | 118 | logging.Handler.__init__(self) |
94 | 112 | self.log_flag = True | 119 | self.log_flag = True |
95 | 113 | 120 | ||
96 | 114 | def emit(self, record): | 121 | def emit(self, record): |
105 | 115 | global server_id | 122 | request = { |
106 | 116 | global job_id | 123 | "type": "log", |
107 | 117 | global debug | 124 | "params": {} |
108 | 118 | 125 | } | |
109 | 119 | request = {} | 126 | if job_id: |
102 | 120 | request["type"] = "log" | ||
103 | 121 | request["params"] = {} | ||
104 | 122 | if job_id != 0: | ||
110 | 123 | request["params"]["job_id"] = job_id | 127 | request["params"]["job_id"] = job_id |
111 | 124 | request["params"]["msg"] = record.getMessage() | 128 | request["params"]["msg"] = record.getMessage() |
112 | 125 | if self.log_flag and not debug: | 129 | if self.log_flag and not debug: |
113 | @@ -128,83 +132,106 @@ | |||
114 | 128 | self.log_flag = True | 132 | self.log_flag = True |
115 | 129 | 133 | ||
116 | 130 | 134 | ||
117 | 135 | class JobError(Exception): | ||
118 | 136 | def __init__(self, value): | ||
119 | 137 | self.value = value | ||
120 | 138 | |||
121 | 139 | def __str__(self): | ||
122 | 140 | return self.value | ||
123 | 141 | |||
124 | 142 | |||
125 | 143 | class JobTooSoonError(Exception): | ||
126 | 144 | def __init__(self, value): | ||
127 | 145 | self.value = value | ||
128 | 146 | |||
129 | 147 | def __str__(self): | ||
130 | 148 | return self.value | ||
131 | 149 | |||
132 | 150 | |||
133 | 131 | def usage(): | 151 | def usage(): |
156 | 132 | print("Usage:\n\ | 152 | print("""Usage: |
157 | 133 | twindb --start | --stop | --register <registration code> [-g] [-i interval]\n\ | 153 | twindb --start | --stop | --register <registration code> [-g] [-i interval] |
158 | 134 | [-u user] [-p password]\n\ | 154 | [-u user] [-p password] |
159 | 135 | \n\ | 155 | |
160 | 136 | --start Start TwinDB agent.\n\ | 156 | --start Start TwinDB agent. |
161 | 137 | --stop Stop TwinDB agent.\n\ | 157 | --stop Stop TwinDB agent. |
162 | 138 | \n\ | 158 | |
163 | 139 | --register <code> Register this server in TwinDB.\n\ | 159 | --register <code> Register this server in TwinDB. |
164 | 140 | <code> is string like 7b90ae21ac642f2f8fc0a285c4789147\n\ | 160 | <code> is string like 7b90ae21ac642f2f8fc0a285c4789147 |
165 | 141 | Get your code on https://console.twindb.com/?get_code\n\ | 161 | Get your code on https://console.twindb.com/?get_code |
166 | 142 | \n\ | 162 | |
167 | 143 | --unregister [--delete-backups] Unregister this server from TwinDB.\n\ | 163 | --unregister [--delete-backups] Unregister this server from TwinDB. |
168 | 144 | --backup Start backup from this server immediately.\n\ | 164 | --backup Start backup from this server immediately. |
169 | 145 | --is-registered Checks whether the server is registered and prints YES or NO.\n\ | 165 | --is-registered Checks whether the server is registered and prints YES or NO. |
170 | 146 | -g Print debug message.\n\ | 166 | -g Print debug message. |
171 | 147 | -i interval Check for a job every interval seconds.\n\ | 167 | -i interval Check for a job every interval seconds. |
172 | 148 | Default 300.\n\ | 168 | Default {check_period}. |
173 | 149 | During registration TwinDB agent needs to get some information from MySQL,\n\ | 169 | During registration TwinDB agent needs to get some information from MySQL, |
174 | 150 | (master host etc)\n\ | 170 | (master host etc) |
175 | 151 | This is credentials to connect to local MySQL instance\n\ | 171 | This is credentials to connect to local MySQL instance |
176 | 152 | -u | --user username MySQL user name\n\ | 172 | -u | --user username MySQL user name |
177 | 153 | -p | --password password MySQL password") | 173 | -p | --password password MySQL password""".format(check_period=check_period)) |
178 | 154 | return | 174 | return |
179 | 155 | 175 | ||
180 | 156 | 176 | ||
181 | 157 | def get_mysql_connection(user=None, passwd=None): | 177 | def get_mysql_connection(user=None, passwd=None): |
185 | 158 | global mysql_user | 178 | """ |
186 | 159 | global mysql_password | 179 | Returns connection to the local MySQL instance. |
187 | 160 | 180 | If user is passed as an argument it'll be used to connect, | |
188 | 181 | otherwise the second choice will be to use mysql_user. | ||
189 | 182 | If neither user names are set the function will try to use either of MySQL option files | ||
190 | 183 | (/etc/my.cnf, /etc/mysql/my.cnf, or /root/.my.cnf). If the option files don't exist | ||
191 | 184 | it'll try to connect as root w/o password. | ||
192 | 185 | """ | ||
193 | 161 | try: | 186 | try: |
194 | 162 | if user is None: | ||
195 | 163 | user = mysql_user | ||
196 | 164 | if passwd is None: | ||
197 | 165 | passwd = mysql_password | ||
198 | 166 | mysql_option_files = [] | ||
199 | 167 | unix_socket = get_unix_socket() | 187 | unix_socket = get_unix_socket() |
212 | 168 | conn = mysql.connector.connect() | 188 | if not user: |
213 | 169 | if user is not None: | 189 | if mysql_user: |
214 | 170 | for f in ['/etc/mysql/my.cnf', '/etc/my.cnf', '/root/.my.cnf']: | 190 | logger.debug('Using MySQL user specified in the command line') |
215 | 171 | if os.path.exists(f): | 191 | user = mysql_user |
216 | 172 | mysql_option_files.append(f) | 192 | passwd = mysql_password |
217 | 173 | if mysql_option_files: | 193 | else: |
218 | 174 | conn = mysql.connector.connect(option_files=mysql_option_files, unix_socket=unix_socket) | 194 | for options_file in ["/etc/my.cnf", "/etc/mysql/my.cnf", "/usr/etc/my.cnf", |
219 | 175 | logger.debug( | 195 | "/root/.my.cnf", "/root/.mylogin.cnf"]: |
220 | 176 | "Connected to MySQL as '%s'@'localhost' with options files %r" % (conn.user, mysql_option_files)) | 196 | if os.path.exists(options_file): |
221 | 177 | if not conn.is_connected(): | 197 | config = ConfigParser.ConfigParser() |
222 | 178 | conn = mysql.connector.connect(user=user, passwd=passwd, unix_socket=unix_socket) | 198 | config.read(options_file) |
223 | 179 | logger.debug("Connected to MySQL as %s@localhost " % conn.user) | 199 | for section in ["client", "twindb"]: |
224 | 200 | if config.has_section(section): | ||
225 | 201 | if config.has_option(section, "user"): | ||
226 | 202 | user = config.get(section, "user") | ||
227 | 203 | if config.has_option(section, "password"): | ||
228 | 204 | passwd = config.get(section, "user") | ||
229 | 205 | # If user isn't set by the function argument, global mysql_user | ||
230 | 206 | # or MySQL options file connect as unix user w/ empty password | ||
231 | 207 | if not user: | ||
232 | 208 | user = pwd.getpwuid(os.getuid()).pw_name | ||
233 | 209 | passwd = "" | ||
234 | 210 | logger.debug("Connecting to MySQL as unix user %s" % user) | ||
235 | 211 | conn = mysql.connector.connect(user=user, passwd=passwd, unix_socket=unix_socket) | ||
236 | 212 | logger.debug("Connected to MySQL as %s@localhost " % conn.user) | ||
237 | 180 | except mysql.connector.Error as err: | 213 | except mysql.connector.Error as err: |
238 | 181 | logger.error("Can not connect to local MySQL server") | 214 | logger.error("Can not connect to local MySQL server") |
239 | 182 | logger.error("MySQL said: %s" % err.msg) | 215 | logger.error("MySQL said: %s" % err.msg) |
240 | 183 | return None | 216 | return None |
241 | 184 | except: | ||
242 | 185 | logger.error("Can not connect to local MySQL server") | ||
243 | 186 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); | ||
244 | 187 | return None | ||
245 | 188 | return conn | 217 | return conn |
246 | 189 | 218 | ||
247 | 190 | 219 | ||
248 | 191 | # Checks if TwinDB API public key is installed | ||
249 | 192 | # Returns | ||
250 | 193 | # True - if the key is installed | ||
251 | 194 | # False - if not | ||
252 | 195 | def is_gpg_key_installed(email, key_type="public"): | 220 | def is_gpg_key_installed(email, key_type="public"): |
257 | 196 | global logger | 221 | """ |
258 | 197 | global gpg_homedir | 222 | Checks if TwinDB API public key is installed |
259 | 198 | global debug_gpg | 223 | Returns |
260 | 199 | 224 | True - if the key is installed | |
261 | 225 | False - if not | ||
262 | 226 | """ | ||
263 | 200 | result = False | 227 | result = False |
265 | 201 | if len(email) == 0: | 228 | if not email: |
266 | 202 | logger.error("Can not check public key of an empty email") | 229 | logger.error("Can not check public key of an empty email") |
267 | 203 | return False | 230 | return False |
268 | 204 | if debug_gpg: | 231 | if debug_gpg: |
269 | 205 | logger.debug("Checking if public key of %s is installed" % email) | 232 | logger.debug("Checking if public key of %s is installed" % email) |
270 | 233 | gpg_cmd = ["gpg", "--homedir", gpg_homedir] | ||
271 | 206 | try: | 234 | try: |
272 | 207 | gpg_cmd = ["gpg", "--homedir", gpg_homedir] | ||
273 | 208 | if key_type == "public": | 235 | if key_type == "public": |
274 | 209 | gpg_cmd.append("-k") | 236 | gpg_cmd.append("-k") |
275 | 210 | else: | 237 | else: |
276 | @@ -212,63 +239,51 @@ | |||
277 | 212 | gpg_cmd.append(email) | 239 | gpg_cmd.append(email) |
278 | 213 | if debug_gpg: | 240 | if debug_gpg: |
279 | 214 | logger.debug(gpg_cmd) | 241 | logger.debug(gpg_cmd) |
281 | 215 | devnull = open('/dev/null', 'w') | 242 | devnull = open("/dev/null", "w") |
282 | 216 | ret = subprocess.call(gpg_cmd, stdout=devnull, stderr=devnull) | 243 | ret = subprocess.call(gpg_cmd, stdout=devnull, stderr=devnull) |
284 | 217 | file.close(devnull) | 244 | devnull.close() |
285 | 218 | if ret != 0: | 245 | if ret != 0: |
286 | 219 | if debug_gpg: | 246 | if debug_gpg: |
288 | 220 | logger.debug("GPG returned %d" % ret); | 247 | logger.debug("GPG returned %d" % ret) |
289 | 221 | logger.debug(key_type + " key " + email + " is NOT installed") | 248 | logger.debug(key_type + " key " + email + " is NOT installed") |
290 | 222 | result = False | 249 | result = False |
291 | 223 | else: | 250 | else: |
292 | 224 | if debug_gpg: | 251 | if debug_gpg: |
293 | 225 | logger.debug(key_type + " key is already installed") | 252 | logger.debug(key_type + " key is already installed") |
294 | 226 | result = True | 253 | result = True |
298 | 227 | except: | 254 | except OSError as err: |
299 | 228 | logger.error("Couldn't get " + key_type + " key of %s" % email) | 255 | logger.error("Couldn't run command %r. %s" % (gpg_cmd, err)) |
300 | 229 | exit_on_error(traceback.format_exc()) | 256 | exit_on_error("Couldn't get %s key of %s." % (key_type, email)) |
301 | 230 | return result | 257 | return result |
302 | 231 | 258 | ||
303 | 232 | 259 | ||
304 | 233 | # Installs TwinDB public key | ||
305 | 234 | # Returns | ||
306 | 235 | # True - if the key is successfully installed | ||
307 | 236 | # Exits if the key wasn't installed | ||
308 | 237 | |||
309 | 238 | def install_api_pub_key(): | 260 | def install_api_pub_key(): |
315 | 239 | global logger | 261 | """ |
316 | 240 | global api_pub_key | 262 | Installs TwinDB public key |
317 | 241 | global gpg_homedir | 263 | Returns |
318 | 242 | global debug_gpg | 264 | True - if the key is successfully installed |
319 | 243 | 265 | Exits if the key wasn't installed | |
320 | 266 | """ | ||
321 | 244 | logger.info("Installing twindb public key") | 267 | logger.info("Installing twindb public key") |
322 | 268 | gpg_cmd = ["gpg", "--homedir", gpg_homedir, "--import"] | ||
323 | 245 | try: | 269 | try: |
331 | 246 | p1 = subprocess.Popen(["echo", api_pub_key], stdout=subprocess.PIPE) | 270 | p = subprocess.Popen(gpg_cmd, stdin=subprocess.PIPE) |
332 | 247 | p2 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--import"], | 271 | p.communicate(api_pub_key) |
333 | 248 | stdin=p1.stdout) | 272 | except OSError as err: |
334 | 249 | p1.stdout.close() | 273 | logger.error("Couldn't run command %r. %s" % (gpg_cmd, err)) |
335 | 250 | except: | 274 | exit_on_error("Couldn't install TwinDB public key") |
329 | 251 | logger.error("Couldn't install TwinDB public key") | ||
330 | 252 | exit_on_error(traceback.format_exc()) | ||
336 | 253 | logger.info("Twindb public key successfully installed") | 275 | logger.info("Twindb public key successfully installed") |
337 | 254 | return True | 276 | return True |
338 | 255 | 277 | ||
339 | 256 | 278 | ||
340 | 257 | # Checks if GPG environment is good to start TwinDB agent | ||
341 | 258 | # Installs TwinDB public key if necessary | ||
342 | 259 | # Returns | ||
343 | 260 | # True - if the GPG environment good to proceed | ||
344 | 261 | # Exits if GPG wasn't configured correctly | ||
345 | 262 | |||
346 | 263 | def check_gpg(): | 279 | def check_gpg(): |
355 | 264 | global logger | 280 | """ |
356 | 265 | global api_email | 281 | Checks if GPG environment is good to start TwinDB agent |
357 | 266 | global api_pub_key | 282 | Installs TwinDB public key if necessary |
358 | 267 | global gpg_homedir | 283 | Returns |
359 | 268 | global init_config | 284 | True - if the GPG environment good to proceed |
360 | 269 | global server_id | 285 | Exits if GPG wasn't configured correctly |
361 | 270 | global debug_gpg | 286 | """ |
354 | 271 | |||
362 | 272 | if debug_gpg: | 287 | if debug_gpg: |
363 | 273 | logger.debug("Checking if GPG config is initialized") | 288 | logger.debug("Checking if GPG config is initialized") |
364 | 274 | if os.path.exists(gpg_homedir): | 289 | if os.path.exists(gpg_homedir): |
365 | @@ -282,13 +297,13 @@ | |||
366 | 282 | try: | 297 | try: |
367 | 283 | os.mkdir(gpg_homedir, 0700) | 298 | os.mkdir(gpg_homedir, 0700) |
368 | 284 | install_api_pub_key() | 299 | install_api_pub_key() |
373 | 285 | except: | 300 | except OSError as err: |
374 | 286 | logger.error("Couldn't create directory " + gpg_homedir) | 301 | logger.error("Failed to create directory %s. %s" % (gpg_homedir, err)) |
375 | 287 | exit_on_error(traceback.format_exc()) | 302 | exit_on_error("Couldn't create directory " + gpg_homedir) |
376 | 288 | if len(server_id) == 0: | 303 | if not server_id: |
377 | 289 | exit_on_error("Server id is neither read from config nor generated") | 304 | exit_on_error("Server id is neither read from config nor generated") |
378 | 290 | email = "%s@twindb.com" % server_id | 305 | email = "%s@twindb.com" % server_id |
380 | 291 | if not ( is_gpg_key_installed(email) and is_gpg_key_installed(email, "private")): | 306 | if not (is_gpg_key_installed(email) and is_gpg_key_installed(email, "private")): |
381 | 292 | gen_entropy() | 307 | gen_entropy() |
382 | 293 | gen_gpg_keypair("%s@twindb.com" % server_id) | 308 | gen_gpg_keypair("%s@twindb.com" % server_id) |
383 | 294 | if debug_gpg: | 309 | if debug_gpg: |
384 | @@ -296,13 +311,13 @@ | |||
385 | 296 | return True | 311 | return True |
386 | 297 | 312 | ||
387 | 298 | 313 | ||
388 | 299 | # Checks the environment if it's OK to start TwinDB agent | ||
389 | 300 | # Returns | ||
390 | 301 | # True - if the environment is OK | ||
391 | 302 | # Exits if the environment is not OK | ||
392 | 303 | |||
393 | 304 | def check_env(): | 314 | def check_env(): |
395 | 305 | global logger | 315 | """ |
396 | 316 | Checks the environment if it's OK to start TwinDB agent | ||
397 | 317 | Returns | ||
398 | 318 | True - if the environment is OK | ||
399 | 319 | Exits if the environment is not OK | ||
400 | 320 | """ | ||
401 | 306 | logger.debug("Checking environment") | 321 | logger.debug("Checking environment") |
402 | 307 | if os.getuid() != 0: | 322 | if os.getuid() != 0: |
403 | 308 | exit_on_error("TwinDB agent must be run by root") | 323 | exit_on_error("TwinDB agent must be run by root") |
404 | @@ -311,41 +326,24 @@ | |||
405 | 311 | return True | 326 | return True |
406 | 312 | 327 | ||
407 | 313 | 328 | ||
408 | 314 | # Checks how much entropy is available in the system | ||
409 | 315 | # If not enough, does some disk activity to generate more | ||
410 | 316 | def gen_entropy(): | 329 | def gen_entropy(): |
411 | 330 | """ | ||
412 | 331 | Checks how much entropy is available in the system | ||
413 | 332 | If not enough, does some disk activity to generate more | ||
414 | 333 | """ | ||
415 | 317 | # Do nothing until I find good way to generate entropy | 334 | # Do nothing until I find good way to generate entropy |
416 | 318 | return | 335 | return |
445 | 319 | try: | 336 | |
418 | 320 | f = open("/proc/sys/kernel/random/entropy_avail", "r") | ||
419 | 321 | entropy_avail = int(f.read()) | ||
420 | 322 | f.close() | ||
421 | 323 | i = 0 | ||
422 | 324 | while entropy_avail < 2048: | ||
423 | 325 | logger.info("Low entropy level %d. Will run 'find / -name file' to generate more") | ||
424 | 326 | cmd = ["find", "/", "-name", "file"] | ||
425 | 327 | p = subprocess.Popen(cmd) | ||
426 | 328 | p.wait() | ||
427 | 329 | f = open("/proc/sys/kernel/random/entropy_avail", "r") | ||
428 | 330 | entropy_avail = int(f.read()) | ||
429 | 331 | f.close() | ||
430 | 332 | # Do not run find more than 10 times | ||
431 | 333 | if i > 10: | ||
432 | 334 | break | ||
433 | 335 | i = i + 1 | ||
434 | 336 | except: | ||
435 | 337 | logger.error("Failed to generate entropy") | ||
436 | 338 | return | ||
437 | 339 | |||
438 | 340 | |||
439 | 341 | # Formats bytes count to human readable form | ||
440 | 342 | # Inputs: | ||
441 | 343 | # num - bytes count | ||
442 | 344 | # decimals - number of digits to save after point (Default 2) | ||
443 | 345 | # Returns | ||
444 | 346 | # human-readable string like "20.33 MB" | ||
446 | 347 | 337 | ||
447 | 348 | def h_size(num, decimals=2): | 338 | def h_size(num, decimals=2): |
448 | 339 | """ | ||
449 | 340 | Formats bytes count to human readable form | ||
450 | 341 | Inputs: | ||
451 | 342 | num - bytes count | ||
452 | 343 | decimals - number of digits to save after point (Default 2) | ||
453 | 344 | Returns | ||
454 | 345 | human-readable string like "20.33 MB" | ||
455 | 346 | """ | ||
456 | 349 | fmt = "%3." + str(decimals) + "f %s" | 347 | fmt = "%3." + str(decimals) + "f %s" |
457 | 350 | for x in ['bytes', 'kB', 'MB', 'GB', 'TB', 'PB']: | 348 | for x in ['bytes', 'kB', 'MB', 'GB', 'TB', 'PB']: |
458 | 351 | if num < 1024.0: | 349 | if num < 1024.0: |
459 | @@ -353,30 +351,19 @@ | |||
460 | 353 | num /= 1024.0 | 351 | num /= 1024.0 |
461 | 354 | 352 | ||
462 | 355 | 353 | ||
463 | 356 | # Generates random string of bytes good for crypthography | ||
464 | 357 | # Inputs | ||
465 | 358 | # n - Number of bytes | ||
466 | 359 | # Returns | ||
467 | 360 | # binary string of n bytes size | ||
468 | 361 | |||
469 | 362 | def gen_key(n): | ||
470 | 363 | return os.urandom(n) | ||
471 | 364 | |||
472 | 365 | |||
473 | 366 | # Generates GPG private and public keys for a given recipient | ||
474 | 367 | # Inputs | ||
475 | 368 | # email - recipient | ||
476 | 369 | # Returns | ||
477 | 370 | # True on success or exits | ||
478 | 371 | |||
479 | 372 | def gen_gpg_keypair(email): | 354 | def gen_gpg_keypair(email): |
485 | 373 | global server_id | 355 | """ |
486 | 374 | global gpg_homedir | 356 | Generates GPG private and public keys for a given recipient |
487 | 375 | global debug_gpg | 357 | Inputs |
488 | 376 | 358 | email - recipient | |
489 | 377 | if len(email) == 0: | 359 | Returns |
490 | 360 | True on success or exits | ||
491 | 361 | """ | ||
492 | 362 | if not email: | ||
493 | 378 | exit_on_error("Can not generate GPG keypair for an empty email") | 363 | exit_on_error("Can not generate GPG keypair for an empty email") |
494 | 364 | gpg_cmd = ["gpg", "--homedir", gpg_homedir, "--batch", "--gen-key"] | ||
495 | 379 | try: | 365 | try: |
496 | 366 | logger.info("The agent needs to generate cryptographically strong keys.") | ||
497 | 380 | logger.info("Generating GPG keys pair for %s" % email) | 367 | logger.info("Generating GPG keys pair for %s" % email) |
498 | 381 | logger.info("It may take really, really long time. Please be patient.") | 368 | logger.info("It may take really, really long time. Please be patient.") |
499 | 382 | gen_entropy() | 369 | gen_entropy() |
500 | @@ -393,36 +380,25 @@ | |||
501 | 393 | %%commit | 380 | %%commit |
502 | 394 | %%echo done | 381 | %%echo done |
503 | 395 | """ % (server_id, email) | 382 | """ % (server_id, email) |
513 | 396 | p1 = subprocess.Popen(["echo", gpg_script], stdout=subprocess.PIPE) | 383 | |
514 | 397 | p2 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--batch", | 384 | p = subprocess.Popen(gpg_cmd, stdin=subprocess.PIPE) |
515 | 398 | "--gen-key"], stdin=p1.stdout) | 385 | p.communicate(gpg_script) |
516 | 399 | p1.stdout.close() | 386 | except OSError as err: |
517 | 400 | p2.wait() | 387 | logger.error("Failed to run command %r. %s" % (gpg_cmd, err)) |
518 | 401 | del p1, p2 | 388 | exit_on_error("Failed to generate GPG keys pair") |
510 | 402 | except: | ||
511 | 403 | logger.error("Failed to generate GPG keys pair") | ||
512 | 404 | exit_on_error(traceback.format_exc()) | ||
519 | 405 | return True | 389 | return True |
520 | 406 | 390 | ||
521 | 407 | 391 | ||
522 | 408 | # Encrypts message with TwinDB public key | ||
523 | 409 | # If server_id is non-zero (which means the server is registered) | ||
524 | 410 | # signs the message with the server's private key | ||
525 | 411 | # Inputs | ||
526 | 412 | # msg - string to encrypt | ||
527 | 413 | # Returns | ||
528 | 414 | # 64-base encoded and encrypted message. To read - decrypt and 64-base decode | ||
529 | 415 | # EDIT: encrypted message | ||
530 | 416 | # None - if error happens | ||
531 | 417 | |||
532 | 418 | def encrypt(msg): | 392 | def encrypt(msg): |
540 | 419 | global logger | 393 | """ |
541 | 420 | global api_email | 394 | Encrypts message with TwinDB public key |
542 | 421 | global gpg_homedir | 395 | If server_id is non-zero (which means the server is registered) |
543 | 422 | global server_id | 396 | signs the message with the server's private key |
544 | 423 | global debug_gpg | 397 | :param msg: string to encrypt |
545 | 424 | 398 | :return: 64-base encoded and encrypted message or None if error happens. | |
546 | 425 | if len(server_id) == 0: | 399 | To read the encrypted message - decrypt and 64-base decode |
547 | 400 | """ | ||
548 | 401 | if not server_id: | ||
549 | 426 | exit_on_error("Trying to encrypt message while server_id is empty") | 402 | exit_on_error("Trying to encrypt message while server_id is empty") |
550 | 427 | server_email = "%s@twindb.com" % server_id | 403 | server_email = "%s@twindb.com" % server_id |
551 | 428 | if not is_gpg_key_installed(api_email): | 404 | if not is_gpg_key_installed(api_email): |
552 | @@ -433,12 +409,10 @@ | |||
553 | 433 | logger.debug("Public key of %s is not installed." % server_email) | 409 | logger.debug("Public key of %s is not installed." % server_email) |
554 | 434 | logger.debug("Will not encrypt message") | 410 | logger.debug("Will not encrypt message") |
555 | 435 | return None | 411 | return None |
562 | 436 | enc_cmd = ["gpg", "--homedir", gpg_homedir, "-r", api_email, "--batch", | 412 | enc_cmd = ["gpg", "--homedir", gpg_homedir, "-r", api_email, "--batch", "--trust-model", "always", "--armor", |
563 | 437 | "--trust-model", "always", "--armor"] | 413 | "--sign", "--local-user", server_email, "--encrypt"] |
564 | 438 | enc_cmd.append("--sign") | 414 | cout = "No output" |
565 | 439 | enc_cmd.append("--local-user") | 415 | cerr = "No output" |
560 | 440 | enc_cmd.append(server_email) | ||
561 | 441 | enc_cmd.append("--encrypt") | ||
566 | 442 | try: | 416 | try: |
567 | 443 | if debug_gpg: | 417 | if debug_gpg: |
568 | 444 | logger.debug("Encrypting message:") | 418 | logger.debug("Encrypting message:") |
569 | @@ -450,11 +424,14 @@ | |||
570 | 450 | stdout=subprocess.PIPE, | 424 | stdout=subprocess.PIPE, |
571 | 451 | stderr=subprocess.PIPE) | 425 | stderr=subprocess.PIPE) |
572 | 452 | cout, cerr = p.communicate(msg) | 426 | cout, cerr = p.communicate(msg) |
573 | 427 | if p.returncode != 0: | ||
574 | 428 | raise OSError(p.returncode) | ||
575 | 453 | ct = cout | 429 | ct = cout |
576 | 454 | ct_64 = b64encode(ct) | 430 | ct_64 = b64encode(ct) |
577 | 455 | if debug_gpg: | 431 | if debug_gpg: |
578 | 456 | logger.debug("Encrypted message: " + ct_64) | 432 | logger.debug("Encrypted message: " + ct_64) |
580 | 457 | except: | 433 | except OSError as err: |
581 | 434 | logger.error("Failed to run command %r. %s" % (enc_cmd, err)) | ||
582 | 458 | logger.error("Failed to encrypt message: " + msg) | 435 | logger.error("Failed to encrypt message: " + msg) |
583 | 459 | logger.error("STDOUT: " + cout) | 436 | logger.error("STDOUT: " + cout) |
584 | 460 | logger.error("STDERR: " + cerr) | 437 | logger.error("STDERR: " + cerr) |
585 | @@ -462,52 +439,34 @@ | |||
586 | 462 | return ct_64 | 439 | return ct_64 |
587 | 463 | 440 | ||
588 | 464 | 441 | ||
589 | 465 | # Decrypts message with local private key | ||
590 | 466 | # Inputs | ||
591 | 467 | # msg - 64-base encoded and encrypted message. | ||
592 | 468 | # Before encryption the message was 64-base encoded | ||
593 | 469 | # Returns | ||
594 | 470 | # String - Plain text message | ||
595 | 471 | # None - if error happens | ||
596 | 472 | |||
597 | 473 | def decrypt(msg_64): | 442 | def decrypt(msg_64): |
604 | 474 | global logger | 443 | """ |
605 | 475 | global api_email | 444 | Decrypts message with local private key |
606 | 476 | global gpg_homedir | 445 | :param msg_64: 64-base encoded and encrypted message. Before encryption the message was 64-base encoded |
607 | 477 | global server_id | 446 | :return: Plain text message or None if error happens |
608 | 478 | global debug_gpg | 447 | """ |
603 | 479 | |||
609 | 480 | if not msg_64: | 448 | if not msg_64: |
610 | 481 | logger.error("Will not decrypt empty message") | 449 | logger.error("Will not decrypt empty message") |
611 | 482 | return None | 450 | return None |
616 | 483 | 451 | cout = "No output" | |
617 | 484 | cout = None | 452 | cerr = "No output" |
618 | 485 | cerr = None | 453 | gpg_cmd = ["gpg", "--homedir", gpg_homedir, "-d", "-q"] |
615 | 486 | |||
619 | 487 | try: | 454 | try: |
620 | 488 | dec_cmd = ["gpg", "--homedir", gpg_homedir, "-d", "-q"] | ||
621 | 489 | debug_gpg = True | ||
622 | 490 | if debug_gpg: | 455 | if debug_gpg: |
623 | 491 | logger.debug("Decrypting message:") | 456 | logger.debug("Decrypting message:") |
624 | 492 | logger.debug(msg_64) | 457 | logger.debug(msg_64) |
625 | 493 | logger.debug("Decryptor command:") | 458 | logger.debug("Decryptor command:") |
631 | 494 | logger.debug(dec_cmd) | 459 | logger.debug(gpg_cmd) |
632 | 495 | p = subprocess.Popen(dec_cmd, | 460 | p = subprocess.Popen(gpg_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
628 | 496 | stdin=subprocess.PIPE, | ||
629 | 497 | stdout=subprocess.PIPE, | ||
630 | 498 | stderr=subprocess.PIPE) | ||
633 | 499 | msg = b64decode(msg_64) | 461 | msg = b64decode(msg_64) |
634 | 500 | cout, cerr = p.communicate(msg) | 462 | cout, cerr = p.communicate(msg) |
635 | 501 | |||
636 | 502 | if p.returncode != 0: | 463 | if p.returncode != 0: |
637 | 503 | raise OSError(p.returncode) | 464 | raise OSError(p.returncode) |
639 | 504 | except: | 465 | except OSError as err: |
640 | 466 | logger.error("Failed to run command %r. %s" % (gpg_cmd, err)) | ||
641 | 505 | logger.error("Failed to decrypt message: " + msg_64) | 467 | logger.error("Failed to decrypt message: " + msg_64) |
647 | 506 | logger.error("Unexpected error: %s, %s" % sys.exc_info()[:2]); | 468 | logger.error("STDOUT: " + cout) |
648 | 507 | if cout: | 469 | logger.error("STDERR: " + cerr) |
644 | 508 | logger.error("STDOUT: " + cout) | ||
645 | 509 | if cerr: | ||
646 | 510 | logger.error("STDERR: " + cerr) | ||
649 | 511 | return None | 470 | return None |
650 | 512 | if debug_gpg: | 471 | if debug_gpg: |
651 | 513 | logger.debug("Decrypted message:") | 472 | logger.debug("Decrypted message:") |
652 | @@ -515,26 +474,20 @@ | |||
653 | 515 | return cout | 474 | return cout |
654 | 516 | 475 | ||
655 | 517 | 476 | ||
656 | 518 | # Sends HTTP POST request to TwinDB dispatcher | ||
657 | 519 | # It converts python data structure in "data" variable into JSON string, | ||
658 | 520 | # then encrypts it and then sends as variable "data" in HTTP request | ||
659 | 521 | # Inputs | ||
660 | 522 | # uri - URI to send the request | ||
661 | 523 | # data - Data structure with variables | ||
662 | 524 | # Returns | ||
663 | 525 | # String with body of HTTP response | ||
664 | 526 | # None - if error happened or empty response | ||
665 | 527 | |||
666 | 528 | def get_response(request): | 477 | def get_response(request): |
672 | 529 | global logger | 478 | """ |
673 | 530 | global host | 479 | Sends HTTP POST request to TwinDB dispatcher |
674 | 531 | global proto | 480 | It converts python data structure in "data" variable into JSON string, |
675 | 532 | global api_dir | 481 | then encrypts it and then sends as variable "data" in HTTP request |
676 | 533 | 482 | Inputs | |
677 | 483 | uri - URI to send the request | ||
678 | 484 | data - Data structure with variables | ||
679 | 485 | Returns | ||
680 | 486 | String with body of HTTP response | ||
681 | 487 | None - if error happened or empty response | ||
682 | 488 | """ | ||
683 | 534 | uri = "api.php" | 489 | uri = "api.php" |
684 | 535 | response_body = None | 490 | response_body = None |
685 | 536 | conn = None | ||
686 | 537 | |||
687 | 538 | logger.debug("Enter get_response(uri=" + uri + ")") | 491 | logger.debug("Enter get_response(uri=" + uri + ")") |
688 | 539 | if proto == "http": | 492 | if proto == "http": |
689 | 540 | conn = httplib.HTTPConnection(host) | 493 | conn = httplib.HTTPConnection(host) |
690 | @@ -542,8 +495,10 @@ | |||
691 | 542 | conn = httplib.HTTPSConnection(host) | 495 | conn = httplib.HTTPSConnection(host) |
692 | 543 | else: | 496 | else: |
693 | 544 | logger.error("Unsupported protocol " + proto) | 497 | logger.error("Unsupported protocol " + proto) |
694 | 498 | return None | ||
695 | 499 | url = proto + "://" + host + "/" + api_dir + "/" + uri | ||
696 | 500 | http_response = "Empty response" | ||
697 | 545 | try: | 501 | try: |
698 | 546 | url = proto + "://" + host + "/" + api_dir + "/" + uri | ||
699 | 547 | conn.connect() | 502 | conn.connect() |
700 | 548 | logger.debug("Sending to " + host + ": %s" % json.dumps(request, indent=4, sort_keys=True)) | 503 | logger.debug("Sending to " + host + ": %s" % json.dumps(request, indent=4, sort_keys=True)) |
701 | 549 | data_json = json.dumps(request) | 504 | data_json = json.dumps(request) |
702 | @@ -588,46 +543,38 @@ | |||
703 | 588 | logger.error("Please check that DNS server is reachable and works") | 543 | logger.error("Please check that DNS server is reachable and works") |
704 | 589 | return None | 544 | return None |
705 | 590 | except exceptions.KeyError as err: | 545 | except exceptions.KeyError as err: |
707 | 591 | logger.error("Failed to decode response from server %s" % http_response) | 546 | logger.error("Failed to decode response from server: %s" % http_response) |
708 | 592 | logger.error("Could not find key %s" % err) | 547 | logger.error("Could not find key %s" % err) |
709 | 593 | return None | 548 | return None |
710 | 594 | except: | ||
711 | 595 | logger.error("Couldn't make HTTP request " + url) | ||
712 | 596 | logger.error("Unexpected error: %s, %s" % sys.exc_info()[:2]); | ||
713 | 597 | logger.debug(traceback.format_exc()) | ||
714 | 598 | return None | ||
715 | 599 | finally: | 549 | finally: |
716 | 600 | conn.close() | 550 | conn.close() |
717 | 601 | return response_body | 551 | return response_body |
718 | 602 | 552 | ||
719 | 603 | 553 | ||
728 | 604 | # Replaces password from config with ******** | 554 | def sanitize_config(config): |
729 | 605 | # Inputs | 555 | """ |
730 | 606 | # config - python dictionary with backup config | 556 | Replaces password from config with ******** |
731 | 607 | # Returns | 557 | :param config: python dictionary with backup config |
732 | 608 | # Sanitized config | 558 | :return: Sanitized config |
733 | 609 | 559 | """ | |
734 | 610 | def sanitize_config(c): | 560 | if not config: |
727 | 611 | if not c: | ||
735 | 612 | return None | 561 | return None |
748 | 613 | cc = dict(c) | 562 | sanitized_config = dict(config) |
749 | 614 | try: | 563 | if "mysql_password" in config: |
750 | 615 | cc["mysql_password"] = "********" | 564 | sanitized_config["mysql_password"] = "********" |
751 | 616 | except: | 565 | else: |
752 | 617 | logger.debug("Given config %r doesn't contain password" % c) | 566 | logger.debug("Given config %r doesn't contain password" % config) |
753 | 618 | return cc | 567 | return sanitized_config |
754 | 619 | 568 | ||
743 | 620 | |||
744 | 621 | # Gets backup config from TwinDB dispatcher | ||
745 | 622 | # Returns | ||
746 | 623 | # Backup config | ||
747 | 624 | # None - if error happened | ||
755 | 625 | 569 | ||
756 | 626 | def get_config(): | 570 | def get_config(): |
760 | 627 | global logger | 571 | """ |
761 | 628 | global server_id | 572 | Gets backup config from TwinDB dispatcher |
762 | 629 | 573 | :return: Backup config or None if error happened | |
763 | 574 | """ | ||
764 | 630 | logger.debug("Getting config for server_id = %s" % server_id) | 575 | logger.debug("Getting config for server_id = %s" % server_id) |
765 | 576 | response_body = "Empty response" | ||
766 | 577 | config = None | ||
767 | 631 | try: | 578 | try: |
768 | 632 | data = { | 579 | data = { |
769 | 633 | "type": "get_config", | 580 | "type": "get_config", |
770 | @@ -648,114 +595,87 @@ | |||
771 | 648 | if msg_pt["error"]: | 595 | if msg_pt["error"]: |
772 | 649 | logger.error(msg_pt["error"]) | 596 | logger.error(msg_pt["error"]) |
773 | 650 | except exceptions.KeyError as err: | 597 | except exceptions.KeyError as err: |
781 | 651 | logger.error("Failed to decode %s" % response_body); | 598 | logger.error("Failed to decode %s" % response_body) |
782 | 652 | logger.error(err); | 599 | logger.error(err) |
776 | 653 | return None | ||
777 | 654 | except: | ||
778 | 655 | logger.error("Couldn't get backup config") | ||
779 | 656 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); | ||
780 | 657 | logger.debug(traceback.format_exc()) | ||
783 | 658 | return None | 600 | return None |
784 | 659 | return config | 601 | return config |
785 | 660 | 602 | ||
786 | 661 | 603 | ||
792 | 662 | # Reports slave status to TwinDB dispatcher | 604 | def report_sss(config): |
793 | 663 | def report_sss(): | 605 | """ |
794 | 664 | global logger | 606 | Reports slave status to TwinDB dispatcher |
795 | 665 | global server_id | 607 | :param config: server config received from the dispatcher |
796 | 666 | 608 | :return: nothing | |
797 | 609 | """ | ||
798 | 667 | logger.debug("Reporting SHOW SLAVE STATUS for server_id = %s" % server_id) | 610 | logger.debug("Reporting SHOW SLAVE STATUS for server_id = %s" % server_id) |
812 | 668 | try: | 611 | ss = get_slave_status(config["mysql_user"], config["mysql_password"]) |
813 | 669 | ss = get_slave_status() | 612 | data = { |
814 | 670 | data = { | 613 | "type": "report_sss", |
815 | 671 | "type": "report_sss", | 614 | "params": { |
816 | 672 | "params": { | 615 | "server_id": server_id, |
817 | 673 | "server_id": server_id, | 616 | "mysql_server_id": ss["mysql_server_id"], |
818 | 674 | "mysql_server_id": ss["mysql_server_id"], | 617 | "mysql_master_server_id": ss["mysql_master_server_id"], |
819 | 675 | "mysql_master_server_id": ss["mysql_master_server_id"], | 618 | "mysql_master_host": ss["mysql_master_host"], |
820 | 676 | "mysql_master_host": ss["mysql_master_host"], | 619 | "mysql_seconds_behind_master": ss["mysql_seconds_behind_master"], |
821 | 677 | "mysql_seconds_behind_master": ss["mysql_seconds_behind_master"], | 620 | "mysql_slave_io_running": ss["mysql_slave_io_running"], |
822 | 678 | "mysql_slave_io_running": ss["mysql_slave_io_running"], | 621 | "mysql_slave_sql_running": ss["mysql_slave_sql_running"], |
810 | 679 | "mysql_slave_sql_running": ss["mysql_slave_sql_running"], | ||
811 | 680 | } | ||
823 | 681 | } | 622 | } |
830 | 682 | response_body = get_response(data) | 623 | } |
831 | 683 | except: | 624 | get_response(data) |
826 | 684 | logger.error("Couldn't get report SHOW SLAVE STATUS") | ||
827 | 685 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); | ||
828 | 686 | logger.debug(traceback.format_exc()) | ||
829 | 687 | return None | ||
832 | 688 | return | 625 | return |
833 | 689 | 626 | ||
834 | 690 | 627 | ||
835 | 691 | # Reports what privileges are given to the agent | ||
836 | 692 | def report_agent_privileges(config): | 628 | def report_agent_privileges(config): |
840 | 693 | global logger | 629 | """ |
841 | 694 | global server_id | 630 | Reports what privileges are given to the agent |
842 | 695 | 631 | :param config: server config received from the dispatcher | |
843 | 632 | :return: nothing | ||
844 | 633 | """ | ||
845 | 696 | logger.debug("Reporting agent privileges for server_id = %s" % server_id) | 634 | logger.debug("Reporting agent privileges for server_id = %s" % server_id) |
887 | 697 | try: | 635 | con = get_mysql_connection(user=config["mysql_user"], passwd=config["mysql_password"]) |
888 | 698 | mysql_user = config["mysql_user"] | 636 | cursor = con.cursor() |
889 | 699 | mysql_password = config["mysql_password"] | 637 | query = "SELECT PRIVILEGE_TYPE FROM information_schema.USER_PRIVILEGES" |
890 | 700 | except: | 638 | logger.debug("Sending query : %s" % query) |
891 | 701 | logger.error("Failed to read config") | 639 | cursor.execute(query) |
892 | 702 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]) | 640 | |
893 | 703 | return | 641 | privileges = { |
894 | 704 | try: | 642 | "Reload_priv": "N", |
895 | 705 | con = get_mysql_connection(user=mysql_user, passwd=mysql_password) | 643 | "Lock_tables_priv": "N", |
896 | 706 | cursor = con.cursor() | 644 | "Repl_client_priv": "N", |
897 | 707 | query = "SELECT PRIVILEGE_TYPE FROM information_schema.USER_PRIVILEGES" | 645 | "Super_priv": "N", |
898 | 708 | logger.debug("Sending query : %s" % query) | 646 | "Create_tablespace_priv": "N" |
899 | 709 | cursor.execute(query) | 647 | } |
900 | 710 | 648 | for (priv,) in cursor: | |
901 | 711 | privileges = dict() | 649 | if priv == "RELOAD": |
902 | 712 | privileges["Reload_priv"] = "N" | 650 | privileges["Reload_priv"] = "Y" |
903 | 713 | privileges["Lock_tables_priv"] = "N" | 651 | elif priv == "LOCK TABLES": |
904 | 714 | privileges["Repl_client_priv"] = "N" | 652 | privileges["Lock_tables_priv"] = "Y" |
905 | 715 | privileges["Super_priv"] = "N" | 653 | elif priv == "REPLICATION CLIENT": |
906 | 716 | privileges["Create_tablespace_priv"] = "N" | 654 | privileges["Repl_client_priv"] = "Y" |
907 | 717 | 655 | elif priv == "SUPER": | |
908 | 718 | for (priv,) in cursor: | 656 | privileges["Super_priv"] = "Y" |
909 | 719 | if priv == "RELOAD": | 657 | elif priv == "CREATE TABLESPACE": |
910 | 720 | privileges["Reload_priv"] = "Y" | 658 | privileges["Create_tablespace_priv"] = "Y" |
911 | 721 | elif priv == "LOCK TABLES": | 659 | data = { |
912 | 722 | privileges["Lock_tables_priv"] = "Y" | 660 | "type": "report_agent_privileges", |
913 | 723 | elif priv == "REPLICATION CLIENT": | 661 | "params": { |
914 | 724 | privileges["Repl_client_priv"] = "Y" | 662 | "Reload_priv": privileges["Reload_priv"], |
915 | 725 | elif priv == "SUPER": | 663 | "Lock_tables_priv": privileges["Lock_tables_priv"], |
916 | 726 | privileges["Super_priv"] = "Y" | 664 | "Repl_client_priv": privileges["Repl_client_priv"], |
917 | 727 | elif priv == "CREATE TABLESPACE": | 665 | "Super_priv": privileges["Super_priv"], |
918 | 728 | privileges["Create_tablespace_priv"] = "Y" | 666 | "Create_tablespace_priv": privileges["Create_tablespace_priv"] |
878 | 729 | data = { | ||
879 | 730 | "type": "report_agent_privileges", | ||
880 | 731 | "params": { | ||
881 | 732 | "Reload_priv": privileges["Reload_priv"], | ||
882 | 733 | "Lock_tables_priv": privileges["Lock_tables_priv"], | ||
883 | 734 | "Repl_client_priv": privileges["Repl_client_priv"], | ||
884 | 735 | "Super_priv": privileges["Super_priv"], | ||
885 | 736 | "Create_tablespace_priv": privileges["Create_tablespace_priv"] | ||
886 | 737 | } | ||
919 | 738 | } | 667 | } |
926 | 739 | get_response(data) | 668 | } |
927 | 740 | except: | 669 | get_response(data) |
922 | 741 | logger.error("Couldn't get privileges granted to %s@localhost on server %s" % (mysql_user, server_id)) | ||
923 | 742 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); | ||
924 | 743 | logger.debug(traceback.format_exc()) | ||
925 | 744 | return None | ||
928 | 745 | return | 670 | return |
929 | 746 | 671 | ||
930 | 747 | 672 | ||
931 | 748 | # Gets job order from TwinDB dispatcher | ||
932 | 749 | # Returns | ||
933 | 750 | # Job order in python dictionary | ||
934 | 751 | # None - if error happened | ||
935 | 752 | |||
936 | 753 | def get_job(): | 673 | def get_job(): |
940 | 754 | global logger | 674 | """ |
941 | 755 | global server_id | 675 | Gets job order from TwinDB dispatcher |
942 | 756 | global job_id | 676 | :return: Job order in python dictionary or None if error happened |
943 | 677 | """ | ||
944 | 757 | job = None | 678 | job = None |
945 | 758 | |||
946 | 759 | logger.debug("Getting job for server_id = %s" % server_id) | 679 | logger.debug("Getting job for server_id = %s" % server_id) |
947 | 760 | try: | 680 | try: |
948 | 761 | d = json.JSONDecoder() | 681 | d = json.JSONDecoder() |
949 | @@ -764,36 +684,41 @@ | |||
950 | 764 | "params": {} | 684 | "params": {} |
951 | 765 | } | 685 | } |
952 | 766 | response_body = get_response(data) | 686 | response_body = get_response(data) |
953 | 687 | if not response_body: | ||
954 | 688 | raise JobError("Empty response from dispatcher") | ||
955 | 767 | response_body_decoded = d.decode(response_body) | 689 | response_body_decoded = d.decode(response_body) |
956 | 690 | if "response" not in response_body_decoded: | ||
957 | 691 | raise JobError("There is no 'response' key in the response from dispatcher") | ||
958 | 692 | if "success" not in response_body_decoded: | ||
959 | 693 | raise JobError("There is no 'success' key in the response from dispatcher") | ||
960 | 768 | msg_enc = response_body_decoded["response"] | 694 | msg_enc = response_body_decoded["response"] |
962 | 769 | if response_body_decoded["success"] == True: | 695 | if response_body_decoded["success"]: |
963 | 770 | job_json = decrypt(msg_enc) | 696 | job_json = decrypt(msg_enc) |
964 | 697 | logger.debug("job_json = %s" % job_json) | ||
965 | 698 | if "data" not in job_json: | ||
966 | 699 | raise JobError("There is no 'data' in decrypted response %s" % job_json) | ||
967 | 771 | job = d.decode(job_json)["data"] | 700 | job = d.decode(job_json)["data"] |
971 | 772 | job["params"] = d.decode(job["params"]) | 701 | if job and "params" in job and job["params"]: |
972 | 773 | job_id = job["job_id"] | 702 | job["params"] = d.decode(job["params"]) |
973 | 774 | logger.info("Got job:\n%s" % json.dumps(job, indent=4, sort_keys=True)) | 703 | logger.info("Got job:\n%s" % json.dumps(job, indent=4, sort_keys=True)) |
974 | 775 | else: | 704 | else: |
975 | 776 | logger.error("Couldn't get job") | 705 | logger.error("Couldn't get job") |
976 | 777 | job_json = decrypt(msg_enc) | 706 | job_json = decrypt(msg_enc) |
977 | 778 | logger.error("Server said: %s" % d.decode(job_json)["error"]) | 707 | logger.error("Server said: %s" % d.decode(job_json)["error"]) |
978 | 779 | logger.debug("Server said: %s" % d.decode(job_json)["debug"]) | ||
979 | 780 | except exceptions.TypeError as err: | 708 | except exceptions.TypeError as err: |
980 | 709 | logger.error("Failed to get a job: %s" % err) | ||
981 | 781 | return None | 710 | return None |
985 | 782 | except: | 711 | except JobError as err: |
986 | 783 | logger.error("Couldn't get job") | 712 | logger.error(err) |
984 | 784 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); | ||
987 | 785 | return None | 713 | return None |
988 | 786 | return job | 714 | return job |
989 | 787 | 715 | ||
990 | 788 | 716 | ||
991 | 789 | # Checks whether the server is registered or not | ||
992 | 790 | # Returns | ||
993 | 791 | # True - registered | ||
994 | 792 | # False - not so much | ||
995 | 793 | def is_registered(): | 717 | def is_registered(): |
999 | 794 | global logger | 718 | """ |
1000 | 795 | global server_id | 719 | Checks whether the server is registered or not |
1001 | 796 | 720 | :return: True if registered, False if not so much | |
1002 | 721 | """ | ||
1003 | 797 | logger.debug("Getting registration status for server_id = %s" % server_id) | 722 | logger.debug("Getting registration status for server_id = %s" % server_id) |
1004 | 798 | 723 | ||
1005 | 799 | twindb_email = "%s@twindb.com" % server_id | 724 | twindb_email = "%s@twindb.com" % server_id |
1006 | @@ -801,13 +726,13 @@ | |||
1007 | 801 | 726 | ||
1008 | 802 | enc_public_key = None | 727 | enc_public_key = None |
1009 | 803 | # Reading the GPG key | 728 | # Reading the GPG key |
1010 | 729 | gpg_cmd = ["gpg", "--homedir", gpg_homedir, "--armor", "--export", twindb_email] | ||
1011 | 804 | try: | 730 | try: |
1018 | 805 | p1 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, "--armor", "--export", twindb_email], | 731 | p = subprocess.Popen(gpg_cmd, stdout=subprocess.PIPE) |
1019 | 806 | stdout = subprocess.PIPE) | 732 | enc_public_key = p.communicate()[0] |
1020 | 807 | enc_public_key = p1.stdout.read() | 733 | except OSError as err: |
1021 | 808 | except: | 734 | logger.error("Failed to run command %r. %s" % (gpg_cmd, err)) |
1022 | 809 | logger.error("Failed to export GPG keys of %s from %s." % (twindb_email, gpg_homedir)) | 735 | exit_on_error("Failed to export GPG keys of %s from %s." % (twindb_email, gpg_homedir)) |
1017 | 810 | exit_on_error(traceback.format_exc()) | ||
1023 | 811 | 736 | ||
1024 | 812 | # Call the TwinDB api to check for server registration | 737 | # Call the TwinDB api to check for server registration |
1025 | 813 | response_body = None | 738 | response_body = None |
1026 | @@ -819,21 +744,17 @@ | |||
1027 | 819 | "enc_public_key": enc_public_key | 744 | "enc_public_key": enc_public_key |
1028 | 820 | } | 745 | } |
1029 | 821 | } | 746 | } |
1030 | 822 | |||
1031 | 823 | response_body = get_response(data) | 747 | response_body = get_response(data) |
1032 | 824 | if not response_body: | 748 | if not response_body: |
1035 | 825 | return None | 749 | return False |
1034 | 826 | |||
1036 | 827 | json_decoder = json.JSONDecoder() | 750 | json_decoder = json.JSONDecoder() |
1037 | 828 | response_body_decoded = json_decoder.decode(response_body) | 751 | response_body_decoded = json_decoder.decode(response_body) |
1038 | 829 | |||
1039 | 830 | if response_body_decoded: | 752 | if response_body_decoded: |
1040 | 831 | if "response" in response_body_decoded: | 753 | if "response" in response_body_decoded: |
1041 | 832 | msg_decrypted = decrypt(response_body_decoded["response"]) | 754 | msg_decrypted = decrypt(response_body_decoded["response"]) |
1042 | 833 | if msg_decrypted is None: | 755 | if msg_decrypted is None: |
1043 | 834 | logger.debug("No valid response from dispatcher. Consider agent unregistered") | 756 | logger.debug("No valid response from dispatcher. Consider agent unregistered") |
1044 | 835 | return False | 757 | return False |
1045 | 836 | |||
1046 | 837 | msg_pt = json_decoder.decode(msg_decrypted) | 758 | msg_pt = json_decoder.decode(msg_decrypted) |
1047 | 838 | registration_status = msg_pt["data"] | 759 | registration_status = msg_pt["data"] |
1048 | 839 | logger.debug("Got registration status:\n%s" % json.dumps(registration_status, indent=4, sort_keys=True)) | 760 | logger.debug("Got registration status:\n%s" % json.dumps(registration_status, indent=4, sort_keys=True)) |
1049 | @@ -844,216 +765,140 @@ | |||
1050 | 844 | else: | 765 | else: |
1051 | 845 | logger.debug("No valid response from dispatcher. Consider agent unregistered") | 766 | logger.debug("No valid response from dispatcher. Consider agent unregistered") |
1052 | 846 | return False | 767 | return False |
1060 | 847 | except exceptions.KeyError: | 768 | except exceptions.KeyError as err: |
1061 | 848 | logger.error("Failed to decode %s" % response_body) | 769 | exit_on_error("Failed to decode response from dispatcher: %s. %s" % (response_body, err)) |
1055 | 849 | exit_on_error(traceback.format_exc()) | ||
1056 | 850 | except: | ||
1057 | 851 | logger.error("Couldn't get backup config") | ||
1058 | 852 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); | ||
1059 | 853 | exit_on_error(traceback.format_exc()) | ||
1062 | 854 | return False | 770 | return False |
1063 | 855 | 771 | ||
1064 | 856 | 772 | ||
1132 | 857 | # Gets name of initial full backup that corresponds to given incremental backup | 773 | def log_job_notify(params): |
1133 | 858 | # Inputs | 774 | """ |
1134 | 859 | # f - Valid name of incremental backup | 775 | Notifies a job event to TwinDB dispatcher |
1135 | 860 | # Returns | 776 | :param params: { event: "start_job", job_id: job_id } or |
1136 | 861 | # String with name of the initial full backup. | 777 | { event: "stop_job", job_id: job_id, ret_code: ret } |
1137 | 862 | # If full backup was given it will return the same name | 778 | :return: True of False if error happened |
1138 | 863 | # None - if error happened | 779 | """ |
1072 | 864 | |||
1073 | 865 | def get_full_of(f): | ||
1074 | 866 | global logger | ||
1075 | 867 | global server_id | ||
1076 | 868 | |||
1077 | 869 | try: | ||
1078 | 870 | logger.info("Getting full backup name of %s" % f) | ||
1079 | 871 | d = json.JSONDecoder() | ||
1080 | 872 | response = d.decode(get_response("get_full_of.php", {"backup_name": f})) | ||
1081 | 873 | msg_enc = response["data"] | ||
1082 | 874 | logger.debug("Reply from server %s" % msg_enc) | ||
1083 | 875 | full = d.decode(decrypt(msg_enc)) | ||
1084 | 876 | logger.info("Got full backup name %s" % full["name"]) | ||
1085 | 877 | except: | ||
1086 | 878 | logger.error("Couldn't get name of initial full backup of %s" % f) | ||
1087 | 879 | logger.error(traceback.format_exc()) | ||
1088 | 880 | return None | ||
1089 | 881 | return full["name"] | ||
1090 | 882 | |||
1091 | 883 | |||
1092 | 884 | # Gets name of child backup copy of the given backup name | ||
1093 | 885 | # Inputs | ||
1094 | 886 | # f - Valid name of incremental or full backup | ||
1095 | 887 | # Returns | ||
1096 | 888 | # String with name of child backup copy of the given backup copy | ||
1097 | 889 | # None - if error happened | ||
1098 | 890 | |||
1099 | 891 | def get_child_of(f): | ||
1100 | 892 | global logger | ||
1101 | 893 | global server_id | ||
1102 | 894 | |||
1103 | 895 | try: | ||
1104 | 896 | logger.info("Getting child backup name of %s" % f) | ||
1105 | 897 | d = json.JSONDecoder() | ||
1106 | 898 | response = d.decode(get_response("get_child_of.php", | ||
1107 | 899 | {"backup_name": f})) | ||
1108 | 900 | msg_enc = response["data"] | ||
1109 | 901 | logger.debug("Reply from server %s" % msg_enc) | ||
1110 | 902 | child = d.decode(decrypt(msg_enc)) | ||
1111 | 903 | logger.info("Got child backup name %s" % child["name"]) | ||
1112 | 904 | except: | ||
1113 | 905 | logger.error("Couldn't get name of child backup of %s" % f) | ||
1114 | 906 | logger.error(traceback.format_exc()) | ||
1115 | 907 | return None | ||
1116 | 908 | return child["name"] | ||
1117 | 909 | |||
1118 | 910 | |||
1119 | 911 | # Notifies a job event to TwinDB dispatcher | ||
1120 | 912 | # Inputs | ||
1121 | 913 | # job_id - id of a job in jobs table | ||
1122 | 914 | # params - { event: "start_job", job_id: job_id } or | ||
1123 | 915 | # { event: "stop_job", job_id: job_id, ret_code: ret } | ||
1124 | 916 | # Returns | ||
1125 | 917 | # True | ||
1126 | 918 | # False - if error happened | ||
1127 | 919 | |||
1128 | 920 | def log_job_notify(job_id, params): | ||
1129 | 921 | global logging | ||
1130 | 922 | result = False | ||
1131 | 923 | |||
1139 | 924 | logger.info("Sending event notification %s" % params["event"]) | 780 | logger.info("Sending event notification %s" % params["event"]) |
1157 | 925 | try: | 781 | data = { |
1158 | 926 | data = { | 782 | "type": "notify", |
1159 | 927 | "type": "notify", | 783 | "params": params |
1160 | 928 | "params": params | 784 | } |
1161 | 929 | } | 785 | response_body = get_response(data) |
1162 | 930 | response_body = get_response(data) | 786 | d = json.JSONDecoder() |
1163 | 931 | d = json.JSONDecoder() | 787 | response_body_decoded = d.decode(response_body) |
1164 | 932 | response_body_decoded = d.decode(response_body) | 788 | if response_body_decoded["success"]: |
1165 | 933 | msg_enc = response_body_decoded["response"] | 789 | logger.debug("Dispatcher acknowledged job_id = %d notification" % job_id) |
1166 | 934 | if response_body_decoded["success"] == True: | 790 | result = True |
1167 | 935 | logger.debug("Dispatcher acknowledged job_id = %d start" % job_id) | 791 | else: |
1168 | 936 | result = True | 792 | logger.error("Dispatcher didn't acknowledge job_id = %d notification" % job_id) |
1169 | 937 | else: | 793 | result = False |
1153 | 938 | logger.error("Dispatcher didn't acknowledge job_id = %d start" % job_id) | ||
1154 | 939 | result = False | ||
1155 | 940 | except: | ||
1156 | 941 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); | ||
1170 | 942 | return result | 794 | return result |
1171 | 943 | 795 | ||
1172 | 944 | 796 | ||
1203 | 945 | # Saves details about backup copy in TwinDB dispatcher | 797 | def record_backup(name, volume_id, size, lsn=None, ancestor=0): |
1204 | 946 | # Inputs | 798 | """ |
1205 | 947 | # job_id - id of a job in jobs table | 799 | Saves details about backup copy in TwinDB dispatcher |
1206 | 948 | # name - name of backup | 800 | :param name: name of backup |
1207 | 949 | # size - size of the backup in bytes | 801 | :param volume_id: id of a volume where the backup copy is saved |
1208 | 950 | # lsn - last LSN if it was incremental backup | 802 | :param size: size of the backup in bytes |
1209 | 951 | # ancestor - Ansector of the backup (not used now) | 803 | :param lsn: last LSN if it was incremental backup |
1210 | 952 | # Returns | 804 | :param ancestor: Ansector of the backup (not used now) |
1211 | 953 | # JSON string with status of the request i.e. { "success": True } | 805 | :return: JSON string with status of the request i.e. { "success": True } or None if error happened |
1212 | 954 | # None - if error happened | 806 | """ |
1213 | 955 | 807 | logger.info("Saving information about backup:") | |
1214 | 956 | def record_backup(job_id, name, volume_id, size, lsn=None, ancestor=0): | 808 | logger.info("File name : %s" % name) |
1215 | 957 | global logger | 809 | logger.info("Volume id : %d" % int(volume_id)) |
1216 | 958 | 810 | logger.info("Size : %d (%s)" % (int(size), h_size(size))) | |
1217 | 959 | try: | 811 | logger.info("Ancestor : %d" % int(ancestor)) |
1218 | 960 | logger.info("Saving information about backup:") | 812 | data = { |
1219 | 961 | logger.info("File name : %s" % name) | 813 | "type": "update_backup_data", |
1220 | 962 | logger.info("Volume id : %d" % int(volume_id)) | 814 | "params": { |
1221 | 963 | logger.info("Size : %d (%s)" % (int(size), h_size(size) )) | 815 | "job_id": job_id, |
1222 | 964 | logger.info("Ancestor : %d" % int(ancestor)) | 816 | "name": name, |
1223 | 965 | data = { | 817 | "volume_id": volume_id, |
1224 | 966 | "type": "update_backup_data", | 818 | "size": size, |
1225 | 967 | "params": { | 819 | "lsn": lsn, |
1226 | 968 | "job_id": job_id, | 820 | "ancestor": ancestor |
1197 | 969 | "name": name, | ||
1198 | 970 | "volume_id": volume_id, | ||
1199 | 971 | "size": size, | ||
1200 | 972 | "lsn": lsn, | ||
1201 | 973 | "ancestor": ancestor | ||
1202 | 974 | } | ||
1227 | 975 | } | 821 | } |
1242 | 976 | logger.debug("Saving a record %s" % data) | 822 | } |
1243 | 977 | response = get_response(data) | 823 | logger.debug("Saving a record %s" % data) |
1244 | 978 | if response <> None: | 824 | response = get_response(data) |
1245 | 979 | jd = json.JSONDecoder() | 825 | if response: |
1246 | 980 | r = jd.decode(response) | 826 | jd = json.JSONDecoder() |
1247 | 981 | logger.debug(r) | 827 | r = jd.decode(response) |
1248 | 982 | if r["success"]: | 828 | logger.debug(r) |
1249 | 983 | logger.info("Saved backup copy details") | 829 | if r["success"]: |
1250 | 984 | return True | 830 | logger.info("Saved backup copy details") |
1251 | 985 | else: | 831 | return True |
1238 | 986 | logger.error("Failed to save backup copy details: " | ||
1239 | 987 | + jd.decode(decrypt(r["response"]))["error"]) | ||
1240 | 988 | return False | ||
1241 | 989 | del jd | ||
1252 | 990 | else: | 832 | else: |
1254 | 991 | logger.error("Empty response from server") | 833 | logger.error("Failed to save backup copy details: " |
1255 | 834 | + jd.decode(decrypt(r["response"]))["error"]) | ||
1256 | 992 | return False | 835 | return False |
1260 | 993 | except: | 836 | else: |
1261 | 994 | logger.error("Failed to save backup copy details") | 837 | logger.error("Empty response from server") |
1259 | 995 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); | ||
1262 | 996 | return False | 838 | return False |
1270 | 997 | return True | 839 | |
1264 | 998 | |||
1265 | 999 | |||
1266 | 1000 | # Reads config from /etc/twindb.cfg and sets server_id variable | ||
1267 | 1001 | # Returns | ||
1268 | 1002 | # True - if config was successfully read | ||
1269 | 1003 | # Exits if error happened | ||
1271 | 1004 | 840 | ||
1272 | 1005 | def read_config(): | 841 | def read_config(): |
1274 | 1006 | global init_config | 842 | """ |
1275 | 843 | Reads config from /etc/twindb.cfg and sets server_id variable | ||
1276 | 844 | :return: True if config was successfully read | ||
1277 | 845 | Exits if error happened | ||
1278 | 846 | """ | ||
1279 | 1007 | global server_id | 847 | global server_id |
1280 | 1008 | |||
1281 | 1009 | try: | 848 | try: |
1282 | 1010 | if os.path.exists(init_config): | 849 | if os.path.exists(init_config): |
1284 | 1011 | execfile(init_config) | 850 | ns = {} |
1285 | 851 | execfile(init_config, ns) | ||
1286 | 852 | if "server_id" in ns: | ||
1287 | 853 | server_id = ns["server_id"] | ||
1288 | 1012 | else: | 854 | else: |
1291 | 1013 | exit_on_error("Config %s doesn't exist" % init_config) | 855 | print("Config %s doesn't exist" % init_config, file=sys.stderr) |
1292 | 1014 | if len(server_id) == 0: | 856 | return False |
1293 | 857 | if not server_id: | ||
1294 | 1015 | exit_on_error("Config %s doesn't set server_id" % init_config) | 858 | exit_on_error("Config %s doesn't set server_id" % init_config) |
1298 | 1016 | except: | 859 | except IOError as err: |
1299 | 1017 | logger.error(traceback.format_exc()) | 860 | exit_on_error("Failed to read config file %s. %s" % (init_config, err)) |
1297 | 1018 | exit_on_error("Failed to read from config in %s" % init_config) | ||
1300 | 1019 | return True | 861 | return True |
1301 | 1020 | 862 | ||
1302 | 1021 | 863 | ||
1311 | 1022 | # Saves server_id variable in /etc/twindb.cfg | 864 | def save_config(): |
1312 | 1023 | # Returns | 865 | """ |
1313 | 1024 | # True - if config was successfully saved | 866 | Saves server_id variable in file init_config (/etc/twindb.cfg) |
1314 | 1025 | # Exits if error happened | 867 | :return: True - if config was successfully saved |
1315 | 1026 | 868 | Exits if error happened | |
1316 | 1027 | def save_config(server_id): | 869 | """ |
1317 | 1028 | global init_config | 870 | if not server_id: |
1318 | 1029 | 871 | exit_on_error("Can not save agent config file because server_id is empty") | |
1319 | 1030 | try: | 872 | try: |
1321 | 1031 | f = open(init_config, 'w') | 873 | f = open(init_config, "w") |
1322 | 1032 | f.write("server_id='%s'\n" % server_id) | 874 | f.write("server_id='%s'\n" % server_id) |
1323 | 1033 | f.close() | 875 | f.close() |
1333 | 1034 | except: | 876 | except IOError as err: |
1334 | 1035 | print("Failed to save new config in %s" % init_config) | 877 | exit_on_error("Failed to save new config in %s. %s" % (init_config, err)) |
1335 | 1036 | sys.exit(2) | 878 | return True |
1336 | 1037 | return | 879 | |
1337 | 1038 | 880 | ||
1338 | 1039 | 881 | def get_slave_status(user=None, passwd=None): | |
1339 | 1040 | def get_slave_status(): | 882 | """ |
1340 | 1041 | result = dict() | 883 | Reads SHOW SLAVE STATUS from the local server |
1341 | 1042 | # Read master host | 884 | :param user: user to connect to MySQL |
1342 | 885 | :param passwd: password to connect to MySQL | ||
1343 | 886 | :return: dictionary with SHOW SLAVE STATUS result | ||
1344 | 887 | """ | ||
1345 | 888 | conn = get_mysql_connection(user, passwd) | ||
1346 | 889 | if conn: | ||
1347 | 890 | cursor = conn.cursor(dictionary=True) | ||
1348 | 891 | else: | ||
1349 | 892 | return None | ||
1350 | 893 | result = { | ||
1351 | 894 | "mysql_server_id": None, | ||
1352 | 895 | "mysql_master_server_id": None, | ||
1353 | 896 | "mysql_master_host": None, | ||
1354 | 897 | "mysql_seconds_behind_master": None, | ||
1355 | 898 | "mysql_slave_io_running": None, | ||
1356 | 899 | "mysql_slave_sql_running": None | ||
1357 | 900 | } | ||
1358 | 1043 | try: | 901 | try: |
1359 | 1044 | conn = get_mysql_connection() | ||
1360 | 1045 | if conn: | ||
1361 | 1046 | cursor = conn.cursor(dictionary=True) | ||
1362 | 1047 | else: | ||
1363 | 1048 | return None | ||
1364 | 1049 | |||
1365 | 1050 | result["mysql_server_id"] = None | ||
1366 | 1051 | result["mysql_master_server_id"] = None | ||
1367 | 1052 | result["mysql_master_host"] = None | ||
1368 | 1053 | result["mysql_seconds_behind_master"] = None | ||
1369 | 1054 | result["mysql_slave_io_running"] = None | ||
1370 | 1055 | result["mysql_slave_sql_running"] = None | ||
1371 | 1056 | |||
1372 | 1057 | cursor.execute("SHOW SLAVE STATUS") | 902 | cursor.execute("SHOW SLAVE STATUS") |
1373 | 1058 | for row in cursor: | 903 | for row in cursor: |
1374 | 1059 | result["mysql_master_server_id"] = row["Master_Server_Id"] | 904 | result["mysql_master_server_id"] = row["Master_Server_Id"] |
1375 | @@ -1061,35 +906,44 @@ | |||
1376 | 1061 | result["mysql_seconds_behind_master"] = row["Seconds_Behind_Master"] | 906 | result["mysql_seconds_behind_master"] = row["Seconds_Behind_Master"] |
1377 | 1062 | result["mysql_slave_io_running"] = row["Slave_IO_Running"] | 907 | result["mysql_slave_io_running"] = row["Slave_IO_Running"] |
1378 | 1063 | result["mysql_slave_sql_running"] = row["Slave_SQL_Running"] | 908 | result["mysql_slave_sql_running"] = row["Slave_SQL_Running"] |
1380 | 1064 | 909 | except mysql.connector.Error as err: | |
1381 | 910 | logger.error("Could get SHOW SLAVE STATUS") | ||
1382 | 911 | logger.error("MySQL Error: " % err) | ||
1383 | 912 | return None | ||
1384 | 913 | try: | ||
1385 | 1065 | cursor.execute("SELECT @@server_id AS server_id") | 914 | cursor.execute("SELECT @@server_id AS server_id") |
1386 | 1066 | for row in cursor: | 915 | for row in cursor: |
1387 | 1067 | result["mysql_server_id"] = row["server_id"] | 916 | result["mysql_server_id"] = row["server_id"] |
1388 | 1068 | |||
1389 | 1069 | cursor.close() | 917 | cursor.close() |
1390 | 1070 | conn.close() | 918 | conn.close() |
1394 | 1071 | except: | 919 | except mysql.connector.Error as err: |
1395 | 1072 | logger.error("Could not read Master_host from MySQL") | 920 | logger.error("Could not read server_id") |
1396 | 1073 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); | 921 | logger.error("MySQL Error: " % err) |
1397 | 1074 | return None | 922 | return None |
1398 | 1075 | return result | 923 | return result |
1399 | 1076 | 924 | ||
1400 | 1077 | 925 | ||
1401 | 1078 | def has_mysql_access(username=None, password=None, grant_capability=True): | 926 | def has_mysql_access(username=None, password=None, grant_capability=True): |
1402 | 927 | """ | ||
1403 | 928 | Reports if a user has all required MySQL privileges | ||
1404 | 929 | :param username: MySQL user | ||
1405 | 930 | :param password: MySQL password | ||
1406 | 931 | :param grant_capability: TODO to | ||
1407 | 932 | :return: a pair of a boolean that tells whether MySQL user has all required privileges | ||
1408 | 933 | and a list of missing privileges | ||
1409 | 934 | """ | ||
1410 | 1079 | has_required_grants = False | 935 | has_required_grants = False |
1411 | 1080 | 936 | ||
1412 | 1081 | # list of missing privileges | 937 | # list of missing privileges |
1413 | 1082 | missing_privileges = [] | 938 | missing_privileges = [] |
1414 | 1083 | 939 | ||
1415 | 1084 | try: | 940 | try: |
1420 | 1085 | if username is None or password is None: | 941 | conn = get_mysql_connection(username, password) |
1417 | 1086 | conn = get_mysql_connection() | ||
1418 | 1087 | else: | ||
1419 | 1088 | conn = get_mysql_connection(username, password) | ||
1421 | 1089 | 942 | ||
1422 | 1090 | if isinstance(conn, mysql.connector.MySQLConnection): | 943 | if isinstance(conn, mysql.connector.MySQLConnection): |
1423 | 1091 | cursor = conn.cursor(dictionary=True) | 944 | cursor = conn.cursor(dictionary=True) |
1424 | 1092 | else: | 945 | else: |
1425 | 946 | missing_privileges = ['RELOAD', 'SUPER', 'LOCK TABLES', 'REPLICATION CLIENT', 'CREATE TABLESPACE'] | ||
1426 | 1093 | return has_required_grants, missing_privileges | 947 | return has_required_grants, missing_privileges |
1427 | 1094 | 948 | ||
1428 | 1095 | # Fetch the current user and matching host part as it could either be | 949 | # Fetch the current user and matching host part as it could either be |
1429 | @@ -1149,95 +1003,84 @@ | |||
1430 | 1149 | 1003 | ||
1431 | 1150 | cursor.close() | 1004 | cursor.close() |
1432 | 1151 | conn.close() | 1005 | conn.close() |
1434 | 1152 | except: | 1006 | except mysql.connector.Error as err: |
1435 | 1153 | logger.error("Could not read the grants information from MySQL") | 1007 | logger.error("Could not read the grants information from MySQL") |
1438 | 1154 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]) | 1008 | logger.error("MySQL Error: %s" % err) |
1437 | 1155 | |||
1439 | 1156 | return has_required_grants, missing_privileges | 1009 | return has_required_grants, missing_privileges |
1440 | 1157 | 1010 | ||
1441 | 1158 | 1011 | ||
1442 | 1159 | # Registers this server in TwinDB dispatcher | ||
1443 | 1160 | # Inputs | ||
1444 | 1161 | # code - string with secret registration code | ||
1445 | 1162 | # Returns | ||
1446 | 1163 | # True - if server was successfully registered | ||
1447 | 1164 | # Exits if error happened | ||
1448 | 1165 | def action_handler_register(code): | 1012 | def action_handler_register(code): |
1458 | 1166 | global logger | 1013 | """ |
1459 | 1167 | global init_config | 1014 | Registers this server in TwinDB dispatcher |
1460 | 1168 | global ssh_private_key | 1015 | Inputs |
1461 | 1169 | global ssh_public_key | 1016 | code - string with secret registration code |
1462 | 1170 | global gpg_homedir | 1017 | Returns |
1463 | 1171 | global server_id | 1018 | True - if server was successfully registered |
1464 | 1172 | global mysql_user | 1019 | Exits if error happened |
1465 | 1173 | global mysql_password | 1020 | """ |
1457 | 1174 | |||
1466 | 1175 | # Check early to see that the MySQL user passed to the agent has enough | 1021 | # Check early to see that the MySQL user passed to the agent has enough |
1467 | 1176 | # privileges to create a separate MySQL user needed by TwinDB | 1022 | # privileges to create a separate MySQL user needed by TwinDB |
1468 | 1177 | mysql_access_available, missing_mysql_privileges = has_mysql_access() | 1023 | mysql_access_available, missing_mysql_privileges = has_mysql_access() |
1469 | 1178 | if not mysql_access_available: | 1024 | if not mysql_access_available: |
1470 | 1179 | logger.error("The MySQL user %s does not have enough privileges" % mysql_user) | 1025 | logger.error("The MySQL user %s does not have enough privileges" % mysql_user) |
1472 | 1180 | if len(missing_mysql_privileges) > 0: | 1026 | if missing_mysql_privileges: |
1473 | 1181 | logger.error("Following privileges are missing: %s" % ','.join(missing_mysql_privileges)) | 1027 | logger.error("Following privileges are missing: %s" % ','.join(missing_mysql_privileges)) |
1474 | 1182 | |||
1475 | 1183 | return False | 1028 | return False |
1476 | 1184 | 1029 | ||
1477 | 1185 | logger.info("Registering TwinDB agent with code %s" % code) | 1030 | logger.info("Registering TwinDB agent with code %s" % code) |
1478 | 1186 | logger.info("The agent needs to generate cryptographically strong keys.") | ||
1479 | 1187 | logger.info("It may take really, really long time. Please be patient.") | ||
1480 | 1188 | name = os.uname()[1].strip() # un[1] is a hostname | 1031 | name = os.uname()[1].strip() # un[1] is a hostname |
1481 | 1189 | 1032 | ||
1482 | 1190 | twindb_email = "%s@twindb.com" % server_id | 1033 | twindb_email = "%s@twindb.com" % server_id |
1483 | 1191 | 1034 | ||
1485 | 1192 | # Generate GPG key | 1035 | # Read GPG public key |
1486 | 1193 | enc_public_key = None | 1036 | enc_public_key = None |
1487 | 1037 | cmd = ["gpg", "--homedir", gpg_homedir, "--armor", "--export", twindb_email] | ||
1488 | 1194 | try: | 1038 | try: |
1489 | 1195 | logger.debug("Reading GPG public key of %s." % twindb_email) | 1039 | logger.debug("Reading GPG public key of %s." % twindb_email) |
1498 | 1196 | p1 = subprocess.Popen(["gpg", "--homedir", gpg_homedir, | 1040 | p1 = subprocess.Popen(cmd, stdout=subprocess.PIPE) |
1499 | 1197 | "--armor", "--export", twindb_email], | 1041 | enc_public_key = p1.communicate()[0] |
1500 | 1198 | stdout=subprocess.PIPE) | 1042 | except OSError as err: |
1501 | 1199 | enc_public_key = p1.stdout.read() | 1043 | logger.error("Failed to run command %r. %s" % (cmd, err)) |
1502 | 1200 | except: | 1044 | exit_on_error("Failed to export GPG keys of %s from %s." % (twindb_email, gpg_homedir)) |
1495 | 1201 | logger.error("Failed to export GPG keys of %s from %s." | ||
1496 | 1202 | % (twindb_email, gpg_homedir)) | ||
1497 | 1203 | exit_on_error(traceback.format_exc()) | ||
1503 | 1204 | 1045 | ||
1507 | 1205 | # Generate SSH key | 1046 | if not os.path.isfile(ssh_private_key_file): |
1508 | 1206 | try: | 1047 | try: |
1506 | 1207 | if not os.path.isfile(ssh_private_key): | ||
1509 | 1208 | logger.info("Generating SSH keys pair.") | 1048 | logger.info("Generating SSH keys pair.") |
1515 | 1209 | subprocess.call(["ssh-keygen", "-N", "", "-f", ssh_private_key]) | 1049 | subprocess.call(["ssh-keygen", "-N", "", "-f", ssh_private_key_file]) |
1516 | 1210 | except: | 1050 | except OSError as err: |
1517 | 1211 | logger.error("Failed to generate SSH keys.") | 1051 | logger.error("Failed to run command %r. %s" % (cmd, err)) |
1518 | 1212 | exit_on_error(traceback.format_exc()) | 1052 | exit_on_error("Failed to generate SSH keys.") |
1519 | 1213 | 1053 | ssh_public_key = None | |
1520 | 1214 | try: | 1054 | try: |
1523 | 1215 | logger.info("Reading SSH public key from %s." % ssh_public_key) | 1055 | logger.info("Reading SSH public key from %s." % ssh_public_key_file) |
1524 | 1216 | f = open(ssh_public_key, 'r') | 1056 | f = open(ssh_public_key_file, 'r') |
1525 | 1217 | ssh_public_key = f.read() | 1057 | ssh_public_key = f.read() |
1526 | 1218 | f.close() | 1058 | f.close() |
1530 | 1219 | except: | 1059 | except IOError as err: |
1531 | 1220 | logger.error("Failed to read SSH keys.") | 1060 | logger.error("Failed to read from file %s. %s" % (ssh_public_key_file, err)) |
1532 | 1221 | exit_on_error(traceback.format_exc()) | 1061 | exit_on_error("Failed to read SSH keys.") |
1533 | 1222 | 1062 | ||
1534 | 1223 | # Read local ip addresses | 1063 | # Read local ip addresses |
1535 | 1224 | cmd = "ip addr" | 1064 | cmd = "ip addr" |
1536 | 1225 | cmd += "| grep -w inet" | 1065 | cmd += "| grep -w inet" |
1537 | 1226 | cmd += "| awk '{ print $2}'" | 1066 | cmd += "| awk '{ print $2}'" |
1538 | 1227 | cmd += "| awk -F/ '{ print $1}'" | 1067 | cmd += "| awk -F/ '{ print $1}'" |
1546 | 1228 | ss = get_slave_status() | 1068 | cout = None |
1547 | 1229 | 1069 | logger.debug("Getting list of local IP addresses") | |
1548 | 1230 | if ss is None: | 1070 | try: |
1549 | 1231 | logger.error("Could not get slave status on this server") | 1071 | logger.debug("Running: %s" % cmd) |
1550 | 1232 | exit_on_error() | 1072 | p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) |
1551 | 1233 | p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True) | 1073 | cout, cerr = p.communicate() |
1552 | 1234 | 1074 | logger.debug("STDOUT: %s" % cout) | |
1553 | 1075 | logger.debug("STDERR: %s" % cerr) | ||
1554 | 1076 | except OSError as err: | ||
1555 | 1077 | logger.error("Failed to run command %r. %s" % (cmd, err)) | ||
1556 | 1235 | local_ip = list() | 1078 | local_ip = list() |
1560 | 1236 | for row in p.stdout: | 1079 | for row in cout.split("\n"): |
1561 | 1237 | row = row.rstrip('\n') | 1080 | row = row.rstrip("\n") |
1562 | 1238 | if row != "127.0.0.1": | 1081 | if row and row != "127.0.0.1": |
1563 | 1239 | local_ip.append(row) | 1082 | local_ip.append(row) |
1565 | 1240 | 1083 | ss = get_slave_status() | |
1566 | 1241 | data = { | 1084 | data = { |
1567 | 1242 | "type": "register", | 1085 | "type": "register", |
1568 | 1243 | "params": { | 1086 | "params": { |
1569 | @@ -1255,9 +1098,8 @@ | |||
1570 | 1255 | "local_ip": local_ip | 1098 | "local_ip": local_ip |
1571 | 1256 | } | 1099 | } |
1572 | 1257 | } | 1100 | } |
1573 | 1258 | |||
1574 | 1259 | api_response = get_response(data) | 1101 | api_response = get_response(data) |
1576 | 1260 | if api_response is not None: | 1102 | if api_response: |
1577 | 1261 | json_decoder = json.JSONDecoder() | 1103 | json_decoder = json.JSONDecoder() |
1578 | 1262 | response_decoded = json_decoder.decode(api_response) | 1104 | response_decoded = json_decoder.decode(api_response) |
1579 | 1263 | logger.debug(response_decoded) | 1105 | logger.debug(response_decoded) |
1580 | @@ -1274,18 +1116,16 @@ | |||
1581 | 1274 | elif "errors" in response_decoded: | 1116 | elif "errors" in response_decoded: |
1582 | 1275 | error_msg = response_decoded["errors"]["msg"] | 1117 | error_msg = response_decoded["errors"]["msg"] |
1583 | 1276 | 1118 | ||
1587 | 1277 | logger.error("Failed to register the agent: %s" % error_msg) | 1119 | exit_on_error("Failed to register the agent: %s" % error_msg) |
1585 | 1278 | sys.exit(2) | ||
1586 | 1279 | del json_decoder | ||
1588 | 1280 | else: | 1120 | else: |
1589 | 1281 | exit_on_error("Empty response from server") | 1121 | exit_on_error("Empty response from server") |
1590 | 1282 | return True | 1122 | return True |
1591 | 1283 | 1123 | ||
1592 | 1284 | 1124 | ||
1593 | 1285 | def create_agent_user(): | 1125 | def create_agent_user(): |
1597 | 1286 | global mysql_user | 1126 | """ |
1598 | 1287 | global mysql_password | 1127 | Creates local MySQL user for twindb agent |
1599 | 1288 | 1128 | """ | |
1600 | 1289 | config = get_config() | 1129 | config = get_config() |
1601 | 1290 | try: | 1130 | try: |
1602 | 1291 | conn = get_mysql_connection() | 1131 | conn = get_mysql_connection() |
1603 | @@ -1294,9 +1134,8 @@ | |||
1604 | 1294 | cursor = conn.cursor() | 1134 | cursor = conn.cursor() |
1605 | 1295 | cursor.execute(q, (config["mysql_user"], config["mysql_password"])) | 1135 | cursor.execute(q, (config["mysql_user"], config["mysql_password"])) |
1606 | 1296 | except mysql.connector.Error as err: | 1136 | except mysql.connector.Error as err: |
1610 | 1297 | logger.error("Failed to create MySQL user %s@localhost for TwinDB agent" % config["mysql_user"]) | 1137 | logger.error("MySQL replied: %s" % err) |
1611 | 1298 | logger.error("MySQL replied: %s" % err.msg) | 1138 | exit_on_error("Failed to create MySQL user %s@localhost for TwinDB agent" % config["mysql_user"]) |
1609 | 1299 | exit_on_error(traceback.format_exc()) | ||
1612 | 1300 | logger.info("Created MySQL user %s@localhost for TwinDB agent" % config["mysql_user"]) | 1139 | logger.info("Created MySQL user %s@localhost for TwinDB agent" % config["mysql_user"]) |
1613 | 1301 | logger.info("Congratulations! The server is successfully registered in TwinDB.") | 1140 | logger.info("Congratulations! The server is successfully registered in TwinDB.") |
1614 | 1302 | logger.info("TwinDB will backup the server accordingly to the default config.") | 1141 | logger.info("TwinDB will backup the server accordingly to the default config.") |
1615 | @@ -1304,15 +1143,13 @@ | |||
1616 | 1304 | return True | 1143 | return True |
1617 | 1305 | 1144 | ||
1618 | 1306 | 1145 | ||
1619 | 1307 | # Unregisters this server in TwinDB dispatcher | ||
1620 | 1308 | # Returns | ||
1621 | 1309 | # True - if server was successfully unregistered | ||
1622 | 1310 | # False - if error happened | ||
1623 | 1311 | |||
1624 | 1312 | def action_handler_unregister(delete_backups): | 1146 | def action_handler_unregister(delete_backups): |
1628 | 1313 | global logger | 1147 | """ |
1629 | 1314 | global init_config | 1148 | Unregisters this server in TwinDB dispatcher |
1630 | 1315 | 1149 | Returns | |
1631 | 1150 | True - if server was successfully unregistered | ||
1632 | 1151 | False - if error happened | ||
1633 | 1152 | """ | ||
1634 | 1316 | data = { | 1153 | data = { |
1635 | 1317 | "type": "unregister", | 1154 | "type": "unregister", |
1636 | 1318 | "params": { | 1155 | "params": { |
1637 | @@ -1322,7 +1159,7 @@ | |||
1638 | 1322 | logger.debug("Unregistration request:") | 1159 | logger.debug("Unregistration request:") |
1639 | 1323 | logger.debug(data) | 1160 | logger.debug(data) |
1640 | 1324 | response = get_response(data) | 1161 | response = get_response(data) |
1642 | 1325 | if response <> None: | 1162 | if response: |
1643 | 1326 | jd = json.JSONDecoder() | 1163 | jd = json.JSONDecoder() |
1644 | 1327 | r = jd.decode(response) | 1164 | r = jd.decode(response) |
1645 | 1328 | logger.debug(r) | 1165 | logger.debug(r) |
1646 | @@ -1330,25 +1167,21 @@ | |||
1647 | 1330 | logger.info("The server is successfully unregistered") | 1167 | logger.info("The server is successfully unregistered") |
1648 | 1331 | return True | 1168 | return True |
1649 | 1332 | else: | 1169 | else: |
1654 | 1333 | logger.error("Failed to unregister the agent: " | 1170 | exit_on_error("Failed to unregister the agent: " + jd.decode(decrypt(r["response"]))["error"]) |
1651 | 1334 | + jd.decode(decrypt(r["response"]))["error"]) | ||
1652 | 1335 | sys.exit(2) | ||
1653 | 1336 | del jd | ||
1655 | 1337 | else: | 1171 | else: |
1658 | 1338 | logger.error("Empty response from server") | 1172 | exit_on_error("Empty response from server") |
1657 | 1339 | sys.exit(2) | ||
1659 | 1340 | return False | 1173 | return False |
1660 | 1341 | 1174 | ||
1661 | 1342 | 1175 | ||
1662 | 1343 | # Starts immediate backup job | ||
1663 | 1344 | def action_handler_backup(): | 1176 | def action_handler_backup(): |
1666 | 1345 | global logger | 1177 | """ |
1667 | 1346 | 1178 | Starts immediate backup job | |
1668 | 1179 | """ | ||
1669 | 1347 | if schedule_backup(): | 1180 | if schedule_backup(): |
1670 | 1348 | config = get_config() | 1181 | config = get_config() |
1671 | 1349 | job = get_job() | 1182 | job = get_job() |
1672 | 1350 | if job: | 1183 | if job: |
1674 | 1351 | if 0 == process_job(config, job): | 1184 | if process_job(config, job) == 0: |
1675 | 1352 | return True | 1185 | return True |
1676 | 1353 | else: | 1186 | else: |
1677 | 1354 | logger.error("Didn't get any job from the dispatcher") | 1187 | logger.error("Didn't get any job from the dispatcher") |
1678 | @@ -1359,10 +1192,10 @@ | |||
1679 | 1359 | return False | 1192 | return False |
1680 | 1360 | 1193 | ||
1681 | 1361 | 1194 | ||
1682 | 1362 | # Asks dispatcher to schedule a job for this server | ||
1683 | 1363 | def schedule_backup(): | 1195 | def schedule_backup(): |
1686 | 1364 | global logger | 1196 | """ |
1687 | 1365 | 1197 | Asks dispatcher to schedule a job for this server | |
1688 | 1198 | """ | ||
1689 | 1366 | data = { | 1199 | data = { |
1690 | 1367 | "type": "schedule_backup", | 1200 | "type": "schedule_backup", |
1691 | 1368 | "params": { | 1201 | "params": { |
1692 | @@ -1371,7 +1204,7 @@ | |||
1693 | 1371 | logger.debug("Schedule backup request:") | 1204 | logger.debug("Schedule backup request:") |
1694 | 1372 | logger.debug(data) | 1205 | logger.debug(data) |
1695 | 1373 | response = get_response(data) | 1206 | response = get_response(data) |
1697 | 1374 | if response <> None: | 1207 | if response: |
1698 | 1375 | jd = json.JSONDecoder() | 1208 | jd = json.JSONDecoder() |
1699 | 1376 | r = jd.decode(response) | 1209 | r = jd.decode(response) |
1700 | 1377 | logger.debug(r) | 1210 | logger.debug(r) |
1701 | @@ -1382,45 +1215,43 @@ | |||
1702 | 1382 | logger.error("Failed to schedule a job: " | 1215 | logger.error("Failed to schedule a job: " |
1703 | 1383 | + jd.decode(decrypt(r["response"]))["error"]) | 1216 | + jd.decode(decrypt(r["response"]))["error"]) |
1704 | 1384 | return False | 1217 | return False |
1705 | 1385 | del jd | ||
1706 | 1386 | else: | 1218 | else: |
1707 | 1387 | exit_on_error("Empty response from server") | 1219 | exit_on_error("Empty response from server") |
1708 | 1388 | 1220 | ||
1709 | 1389 | 1221 | ||
1717 | 1390 | # Checks if it's enough space in TwinDB storage | 1222 | def check_space(): |
1718 | 1391 | # Inputs | 1223 | """ |
1719 | 1392 | # config - backup config | 1224 | Checks if it's enough space in TwinDB storage |
1720 | 1393 | # job - job order | 1225 | Inputs |
1721 | 1394 | # Returns always True | 1226 | config - backup config |
1722 | 1395 | 1227 | job - job order | |
1723 | 1396 | def check_space(config, job_id): | 1228 | Returns always True |
1724 | 1229 | TODO: implement the fucntion | ||
1725 | 1230 | """ | ||
1726 | 1397 | return True | 1231 | return True |
1727 | 1398 | 1232 | ||
1728 | 1399 | 1233 | ||
1729 | 1400 | # Checks if MySQL user from backup config has enough privileges to perform backup | ||
1730 | 1401 | # Inputs | ||
1731 | 1402 | # config - backup config | ||
1732 | 1403 | # job - job order | ||
1733 | 1404 | # Returns | ||
1734 | 1405 | # True - if all necessary privileges are granted | ||
1735 | 1406 | # False - if not all necessary privilegers are granted | ||
1736 | 1407 | |||
1737 | 1408 | def check_mysql_permissions(config): | 1234 | def check_mysql_permissions(config): |
1740 | 1409 | global logger | 1235 | """ |
1741 | 1410 | 1236 | Checks if MySQL user from backup config has enough privileges to perform backup | |
1742 | 1237 | Inputs | ||
1743 | 1238 | config - backup config | ||
1744 | 1239 | Returns | ||
1745 | 1240 | True - if all necessary privileges are granted | ||
1746 | 1241 | False - if not all necessary privilegers are granted | ||
1747 | 1242 | """ | ||
1748 | 1411 | try: | 1243 | try: |
1749 | 1412 | logger.info("Checking if '%s'@'localhost' has enough rights to backup MySQL" % config["mysql_user"]) | 1244 | logger.info("Checking if '%s'@'localhost' has enough rights to backup MySQL" % config["mysql_user"]) |
1750 | 1413 | result = False | ||
1751 | 1414 | con = get_mysql_connection(user=config["mysql_user"], passwd=config["mysql_password"]) | 1245 | con = get_mysql_connection(user=config["mysql_user"], passwd=config["mysql_password"]) |
1752 | 1415 | required_priv = ["RELOAD", "LOCK TABLES", "REPLICATION CLIENT", "SUPER", "CREATE TABLESPACE"] | 1246 | required_priv = ["RELOAD", "LOCK TABLES", "REPLICATION CLIENT", "SUPER", "CREATE TABLESPACE"] |
1753 | 1416 | for priv in required_priv: | 1247 | for priv in required_priv: |
1754 | 1417 | cursor = con.cursor() | 1248 | cursor = con.cursor() |
1755 | 1418 | logger.debug("Checking if privilege %s is granted" % priv) | 1249 | logger.debug("Checking if privilege %s is granted" % priv) |
1756 | 1419 | query = """ | 1250 | query = """ |
1761 | 1420 | SELECT | 1251 | SELECT |
1762 | 1421 | PRIVILEGE_TYPE | 1252 | PRIVILEGE_TYPE |
1763 | 1422 | FROM information_schema.USER_PRIVILEGES | 1253 | FROM information_schema.USER_PRIVILEGES |
1764 | 1423 | WHERE GRANTEE = '\'%s\'@\\\'localhost\\\'' AND PRIVILEGE_TYPE = %s""" | 1254 | WHERE GRANTEE = '\'%s\'@\\\'localhost\\\'' AND PRIVILEGE_TYPE = %s""" |
1765 | 1424 | logger.debug("Sending query : %s" % query) | 1255 | logger.debug("Sending query : %s" % query) |
1766 | 1425 | cursor.execute(query, (config["mysql_user"], priv)) | 1256 | cursor.execute(query, (config["mysql_user"], priv)) |
1767 | 1426 | cursor.fetchall() | 1257 | cursor.fetchall() |
1768 | @@ -1430,288 +1261,213 @@ | |||
1769 | 1430 | logger.debug("Privilege %s is granted to %s@localhost" % (priv, config["mysql_user"])) | 1261 | logger.debug("Privilege %s is granted to %s@localhost" % (priv, config["mysql_user"])) |
1770 | 1431 | else: | 1262 | else: |
1771 | 1432 | logger.info("%20s ... NOT GRANTED" % priv) | 1263 | logger.info("%20s ... NOT GRANTED" % priv) |
1776 | 1433 | logger.error("Privilege %s is not granted to %s@localhost" % ( priv, config["mysql_user"])) | 1264 | logger.error("Privilege %s is not granted to %s@localhost" % (priv, config["mysql_user"])) |
1777 | 1434 | logger.info( | 1265 | logger.info("Please execute following SQL to grant %s to user %s@localhost:" |
1778 | 1435 | "Please execute following SQL to grant %s to user %s@localhost:" % ( priv, config["mysql_user"])) | 1266 | % (priv, config["mysql_user"])) |
1779 | 1436 | logger.info("GRANT %s ON *.* TO '%s'@'localhost';" % ( priv, config["mysql_user"])) | 1267 | logger.info("GRANT %s ON *.* TO '%s'@'localhost';" % (priv, config["mysql_user"])) |
1780 | 1437 | return False | 1268 | return False |
1781 | 1438 | cursor.close() | 1269 | cursor.close() |
1782 | 1439 | con.close() | 1270 | con.close() |
1799 | 1440 | result = True | 1271 | except mysql.connector.Error as err: |
1800 | 1441 | except: | 1272 | logger.error("MySQL Error: %s" % err) |
1801 | 1442 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]) | 1273 | return False |
1802 | 1443 | result = False | 1274 | logger.debug("MySQL user %s@'localhost' has enough grants to take backup" % config["mysql_user"]) |
1803 | 1444 | if result: | 1275 | return True |
1804 | 1445 | logger.debug("MySQL user %s@'localhost' has enough grants to take backup" % config["mysql_user"]) | 1276 | |
1789 | 1446 | else: | ||
1790 | 1447 | logger.error("MySQL user %s@'localhost' doesn't have enough grants to take backup" % config["mysql_user"]) | ||
1791 | 1448 | return result | ||
1792 | 1449 | |||
1793 | 1450 | |||
1794 | 1451 | # Meta fuction that calls actual backup fucntion depending on tool in backup config | ||
1795 | 1452 | # Inputs | ||
1796 | 1453 | # config - backup config | ||
1797 | 1454 | # job - job order | ||
1798 | 1455 | # Returns what actual backup function returned or -1 if the tool is not supported | ||
1805 | 1456 | 1277 | ||
1806 | 1457 | def take_backup(config, job): | 1278 | def take_backup(config, job): |
1809 | 1458 | global logger | 1279 | """ |
1810 | 1459 | 1280 | Meta function that calls actual backup fucntion depending on tool in backup config | |
1811 | 1281 | :param config: backup config | ||
1812 | 1282 | :param job: job order | ||
1813 | 1283 | :return: what actual backup function returned or -1 if the tool is not supported | ||
1814 | 1284 | """ | ||
1815 | 1460 | ret = -1 | 1285 | ret = -1 |
1816 | 1461 | logger.info("Starting backup job") | 1286 | logger.info("Starting backup job") |
1830 | 1462 | try: | 1287 | notify_params = {"event": "start_job", "job_id": job["job_id"]} |
1831 | 1463 | job_id = int(job["job_id"]) | 1288 | if log_job_notify(notify_params): |
1832 | 1464 | notify_params = {"event": "start_job", "job_id": job_id} | 1289 | ret = take_backup_xtrabackup(config, job) |
1833 | 1465 | if log_job_notify(job_id, notify_params): | 1290 | notify_params = {"event": "stop_job", "job_id": job["job_id"], "ret_code": ret} |
1834 | 1466 | ret = take_backup_xtrabackup(config, job) | 1291 | log_job_notify(notify_params) |
1835 | 1467 | notify_params = {"event": "stop_job", "job_id": job_id, "ret_code": ret} | 1292 | logger.info("Backup job is complete") |
1836 | 1468 | log_job_notify(job_id, notify_params) | 1293 | else: |
1837 | 1469 | logger.info("Backup job is complete") | 1294 | logger.info("Backup job can not start") |
1825 | 1470 | else: | ||
1826 | 1471 | logger.info("Backup job can not start") | ||
1827 | 1472 | except: | ||
1828 | 1473 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); | ||
1829 | 1474 | |||
1838 | 1475 | return ret | 1295 | return ret |
1839 | 1476 | 1296 | ||
1840 | 1477 | 1297 | ||
1841 | 1478 | # Starts SSH process to TwinDB storage and saves input in file backup_name | ||
1842 | 1479 | # Inputs | ||
1843 | 1480 | # config - backup config | ||
1844 | 1481 | # backup_name - file name to save input in | ||
1845 | 1482 | # stdin, stderr - respective IO handlers | ||
1846 | 1483 | # Returns what SSH process returns | ||
1847 | 1484 | |||
1848 | 1485 | def start_ssh_cmd(config, job_params, backup_name, stdin, stderr): | 1298 | def start_ssh_cmd(config, job_params, backup_name, stdin, stderr): |
1854 | 1486 | global logger | 1299 | """ |
1855 | 1487 | global ssh_private_key | 1300 | Starts an SSH process to TwinDB storage and saves input in file backup_name |
1856 | 1488 | global ssh_port | 1301 | :param config: backup config |
1857 | 1489 | 1302 | :param job_params: job parameters | |
1858 | 1490 | p = None | 1303 | :param backup_name: file name to save input in |
1859 | 1304 | :param stdin: respective IO handlers | ||
1860 | 1305 | :param stderr: respective IO handlers | ||
1861 | 1306 | :return: what SSH process returns | ||
1862 | 1307 | """ | ||
1863 | 1308 | ssh_process = None | ||
1864 | 1309 | ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key_file, "-p", str(ssh_port), | ||
1865 | 1310 | "user_id_%s@%s" % (config["user_id"], job_params["ip"]), "/bin/cat - > %s" % backup_name] | ||
1866 | 1491 | try: | 1311 | try: |
1867 | 1492 | ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key] | ||
1868 | 1493 | ssh_cmd.append("-p") | ||
1869 | 1494 | ssh_cmd.append(str(ssh_port)) | ||
1870 | 1495 | ssh_cmd.append("user_id_%s@%s" % (config["user_id"], job_params["ip"])) | ||
1871 | 1496 | ssh_cmd.append("/bin/cat - > %s" % backup_name) | ||
1872 | 1497 | logger.debug("Starting SSH process: %r" % ssh_cmd) | 1312 | logger.debug("Starting SSH process: %r" % ssh_cmd) |
1892 | 1498 | p = subprocess.Popen(ssh_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr) | 1313 | ssh_process = subprocess.Popen(ssh_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr) |
1893 | 1499 | except: | 1314 | except OSError as err: |
1894 | 1500 | logger.error(traceback.format_exc()) | 1315 | logger.error("Failed to run command %r. %s" % (ssh_cmd, err)) |
1895 | 1501 | return p | 1316 | return ssh_process |
1896 | 1502 | 1317 | ||
1897 | 1503 | 1318 | ||
1898 | 1504 | # Starts GPG process, encrypts STDIN and outputs in into STDOUT | 1319 | def start_gpg_cmd(stdin, stderr): |
1899 | 1505 | # Inputs | 1320 | """ |
1900 | 1506 | # config - backup config | 1321 | Starts GPG process, encrypts STDIN and outputs in into STDOUT |
1901 | 1507 | # stdin, stderr - respective IO handlers | 1322 | :param stdin: respective IO handlers |
1902 | 1508 | # Returns what GPG process returns | 1323 | :param stderr: respective IO handlers |
1903 | 1509 | 1324 | :return: what GPG process returns | |
1904 | 1510 | def start_gpg_cmd(config, stdin, stderr): | 1325 | """ |
1905 | 1511 | global logger | 1326 | gpg_process = None |
1906 | 1512 | global ssh_private_key | 1327 | gpg_cmd = ["gpg", "--encrypt", "--yes", "--batch", "--no-permission-warning", "--quiet", "--recipient", server_id] |
1888 | 1513 | global ssh_port | ||
1889 | 1514 | global server_id | ||
1890 | 1515 | |||
1891 | 1516 | p = None | ||
1907 | 1517 | try: | 1328 | try: |
1908 | 1518 | gpg_cmd = ["gpg", "--encrypt", "--yes", "--batch", "--no-permission-warning", "--quiet"] | ||
1909 | 1519 | gpg_cmd.append("--recipient") | ||
1910 | 1520 | gpg_cmd.append("%s" % (server_id)) | ||
1911 | 1521 | |||
1912 | 1522 | logger.debug("Starting GPG process: %r" % gpg_cmd) | 1329 | logger.debug("Starting GPG process: %r" % gpg_cmd) |
1926 | 1523 | p = subprocess.Popen(gpg_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr) | 1330 | gpg_process = subprocess.Popen(gpg_cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr) |
1927 | 1524 | except: | 1331 | except OSError as err: |
1928 | 1525 | logger.error(traceback.format_exc()) | 1332 | logger.error("Failed to run command %r. %s" % (gpg_cmd, err)) |
1929 | 1526 | return p | 1333 | return gpg_process |
1930 | 1527 | 1334 | ||
1918 | 1528 | |||
1919 | 1529 | # Takes backup copy with XtraBackup | ||
1920 | 1530 | # Inputs | ||
1921 | 1531 | # config - backup config | ||
1922 | 1532 | # job - job order | ||
1923 | 1533 | # Returns | ||
1924 | 1534 | # True - if backup successfully taken | ||
1925 | 1535 | # False - if backup failed | ||
1931 | 1536 | 1335 | ||
1932 | 1537 | def take_backup_xtrabackup(config, job): | 1336 | def take_backup_xtrabackup(config, job): |
1937 | 1538 | global logger | 1337 | """ |
1938 | 1539 | global server_id | 1338 | # Takes backup copy with XtraBackup |
1939 | 1540 | 1339 | :param config: backup config | |
1940 | 1541 | suffix = "tar" | 1340 | :param job: job order |
1941 | 1341 | :return: True if backup was successfully taken or False if it has failed | ||
1942 | 1342 | """ | ||
1943 | 1343 | suffix = "xbstream" | ||
1944 | 1542 | backup_name = "server_id_%s_%s.%s.gpg" % (server_id, datetime.now().isoformat(), suffix) | 1344 | backup_name = "server_id_%s_%s.%s.gpg" % (server_id, datetime.now().isoformat(), suffix) |
1945 | 1543 | extra_config = "" | ||
1946 | 1544 | ret_code = 0 | 1345 | ret_code = 0 |
1948 | 1545 | 1346 | if "params" not in job: | |
1949 | 1347 | logger.error("There are no params in the job order") | ||
1950 | 1348 | return -1 | ||
1951 | 1349 | # Check that job order has all required parameters | ||
1952 | 1350 | mandatory_params = ["ancestor", "backup_type", "ip", "lsn", "type", "volume_id"] | ||
1953 | 1351 | for param in mandatory_params: | ||
1954 | 1352 | if param not in job["params"]: | ||
1955 | 1353 | logger.error("There is no %s in the job order" % param) | ||
1956 | 1354 | return -1 | ||
1957 | 1355 | backup_type = job["params"]["backup_type"] | ||
1958 | 1356 | xtrabackup_cmd = [ | ||
1959 | 1357 | "innobackupex", | ||
1960 | 1358 | "--stream=xbstream", | ||
1961 | 1359 | "--user=%s" % config["mysql_user"], | ||
1962 | 1360 | "--password=%s" % config["mysql_password"], | ||
1963 | 1361 | "--socket=%s" % get_unix_socket(), | ||
1964 | 1362 | "--slave-info", | ||
1965 | 1363 | "--safe-slave-backup", | ||
1966 | 1364 | "--safe-slave-backup-timeout=3600"] | ||
1967 | 1365 | if backup_type == 'incremental': | ||
1968 | 1366 | last_lsn = job["params"]["lsn"] | ||
1969 | 1367 | xtrabackup_cmd.append("--incremental") | ||
1970 | 1368 | xtrabackup_cmd.append(".") | ||
1971 | 1369 | xtrabackup_cmd.append("--incremental-lsn=%s" % last_lsn) | ||
1972 | 1370 | else: | ||
1973 | 1371 | xtrabackup_cmd.append(".") | ||
1974 | 1372 | extra_config = gen_extra_config(config) | ||
1975 | 1373 | if extra_config: | ||
1976 | 1374 | xtrabackup_cmd.append("--defaults-extra-file=%s" % extra_config) | ||
1977 | 1375 | err_descriptors = dict() | ||
1978 | 1376 | for desc in ["gpg", "ssh", "xtrabackup"]: | ||
1979 | 1377 | desc_file = ("/tmp/twindb.%s.err" % desc) | ||
1980 | 1378 | try: | ||
1981 | 1379 | err_descriptors[desc] = open(desc_file, "w+") | ||
1982 | 1380 | except IOError as err: | ||
1983 | 1381 | logger.error("Failed to open file %s. %s" % (desc_file, err)) | ||
1984 | 1382 | return -1 | ||
1985 | 1546 | try: | 1383 | try: |
2063 | 1547 | mysql_user = config["mysql_user"] | 1384 | logger.debug("Starting XtraBackup process: %r" % xtrabackup_cmd) |
2064 | 1548 | mysql_password = config["mysql_password"] | 1385 | xbk_proc = subprocess.Popen(xtrabackup_cmd, stdout=subprocess.PIPE, stderr=err_descriptors["xtrabackup"]) |
2065 | 1549 | d = json.JSONDecoder() | 1386 | except OSError as err: |
2066 | 1550 | backup_type = job["params"]["backup_type"] | 1387 | logger.error("Failed to run command %r. %s" % (xtrabackup_cmd, err)) |
2067 | 1551 | xtrabackup_cmd = ["innobackupex"] | 1388 | return -1 |
2068 | 1552 | xtrabackup_cmd.append("--stream=xbstream") | 1389 | gpg_proc = start_gpg_cmd(xbk_proc.stdout, err_descriptors["gpg"]) |
2069 | 1553 | xtrabackup_cmd.append("--user=%s" % mysql_user) | 1390 | ssh_proc = start_ssh_cmd(config, job["params"], backup_name, gpg_proc.stdout, err_descriptors["ssh"]) |
2070 | 1554 | xtrabackup_cmd.append("--password=%s" % mysql_password) | 1391 | |
2071 | 1555 | xtrabackup_cmd.append("--socket=%s" % get_unix_socket()) | 1392 | ssh_proc.wait() |
2072 | 1556 | xtrabackup_cmd.append("--slave-info") | 1393 | xbk_proc.wait() |
2073 | 1557 | xtrabackup_cmd.append("--safe-slave-backup") | 1394 | gpg_proc.wait() |
2074 | 1558 | xtrabackup_cmd.append("--safe-slave-backup-timeout=3600") | 1395 | |
2075 | 1559 | if backup_type == 'incremental': | 1396 | ret_code_ssh = ssh_proc.returncode |
2076 | 1560 | last_lsn = job["params"]["lsn"] | 1397 | ret_code_gpg = gpg_proc.returncode |
2077 | 1561 | xtrabackup_cmd.append("--incremental") | 1398 | ret_code_xbk = xbk_proc.returncode |
2078 | 1562 | xtrabackup_cmd.append(".") | 1399 | |
2079 | 1563 | xtrabackup_cmd.append("--incremental-lsn=%s" % last_lsn) | 1400 | err_str = dict() |
2080 | 1564 | else: | 1401 | for desc in ["gpg", "ssh", "xtrabackup"]: |
2081 | 1565 | xtrabackup_cmd.append(".") | 1402 | err_descriptors[desc].seek(0) |
2082 | 1566 | extra_config = gen_extra_config(config) | 1403 | err_str[desc] = err_descriptors[desc].read() |
2083 | 1567 | if extra_config: | 1404 | if not err_str[desc]: |
2084 | 1568 | xtrabackup_cmd.append("--defaults-extra-file=%s" % extra_config) | 1405 | err_str[desc] = "no output" |
2085 | 1569 | 1406 | ||
2086 | 1570 | xtr_err = open('/tmp/twindb.xtrabackup.err', "w+") | 1407 | logger.info("XtraBackup stderr: " + err_str["xtrabackup"]) |
2087 | 1571 | gpg_err = open('/tmp/twindb.gpg.err', "w+") | 1408 | logger.info("GPG stderr: " + err_str["gpg"]) |
2088 | 1572 | ssh_err = open('/tmp/twindb.ssh.err', "w+") | 1409 | logger.info("SSH stderr: " + err_str["ssh"]) |
2089 | 1573 | p1 = subprocess.Popen(xtrabackup_cmd, stdout=subprocess.PIPE, stderr=xtr_err) | 1410 | |
2090 | 1574 | p2 = start_gpg_cmd(config, p1.stdout, gpg_err) | 1411 | if ret_code_xbk == 0 and ret_code_gpg == 0 and ret_code_ssh == 0: |
2091 | 1575 | p3 = start_ssh_cmd(config, job["params"], backup_name, p2.stdout, ssh_err) | 1412 | lsn = grep_lsn(err_str["xtrabackup"]) |
2092 | 1576 | 1413 | if not lsn: | |
2093 | 1577 | p1.stdout.close() | 1414 | logger.error("Could not find LSN in XtrabBackup output") |
2094 | 1578 | p2.stdout.close() | 1415 | return -1 |
2095 | 1579 | p3.communicate() | 1416 | file_size = get_backup_size(config, job["params"], backup_name) |
2096 | 1580 | 1417 | if not file_size: | |
2097 | 1581 | ret_code_ssh = p3.returncode | 1418 | logger.error("Backup copy size must not be zero") |
2098 | 1582 | ret_code_gpg = p2.wait() | 1419 | return -1 |
2099 | 1583 | ret_code_xbk = p1.wait() | 1420 | volume_id = job["params"]["volume_id"] |
2100 | 1584 | 1421 | ancestor = job["params"]["ancestor"] | |
2101 | 1585 | xtr_err.seek(0) | 1422 | if not record_backup(backup_name, volume_id, file_size, lsn, ancestor): |
2102 | 1586 | gpg_err.seek(0) | 1423 | logger.error("Failed to save backup copy details") |
2103 | 1587 | ssh_err.seek(0) | 1424 | return -1 |
2104 | 1588 | 1425 | else: | |
2028 | 1589 | xtrabackup_stderr = xtr_err.read() | ||
2029 | 1590 | gpg_stderr = gpg_err.read() | ||
2030 | 1591 | ssh_stderr = ssh_err.read() | ||
2031 | 1592 | |||
2032 | 1593 | if len(xtrabackup_stderr) == 0: | ||
2033 | 1594 | xtrabackup_stderr = "no output" | ||
2034 | 1595 | if len(gpg_stderr) == 0: | ||
2035 | 1596 | gpg_stderr = "no output" | ||
2036 | 1597 | if len(ssh_stderr) == 0: | ||
2037 | 1598 | ssh_stderr = "no output" | ||
2038 | 1599 | |||
2039 | 1600 | logger.info("XtraBackup stderr: " + xtrabackup_stderr) | ||
2040 | 1601 | logger.info("GPG stderr: " + gpg_stderr) | ||
2041 | 1602 | logger.info("SSH stderr: " + ssh_stderr) | ||
2042 | 1603 | ancestor = 0 | ||
2043 | 1604 | |||
2044 | 1605 | if ret_code_xbk == 0 and ret_code_gpg == 0 and ret_code_ssh == 0: | ||
2045 | 1606 | lsn = grep_lsn(xtrabackup_stderr) | ||
2046 | 1607 | if lsn == None: | ||
2047 | 1608 | logger.error("Could not find LSN in XtrabBackup output") | ||
2048 | 1609 | return -1 | ||
2049 | 1610 | file_size = get_backup_size(config, job["params"], backup_name) | ||
2050 | 1611 | if file_size == 0: | ||
2051 | 1612 | logger.error("Backup copy size must not be zero") | ||
2052 | 1613 | return -1 | ||
2053 | 1614 | job_id = job["job_id"] | ||
2054 | 1615 | volume_id = job["params"]["volume_id"] | ||
2055 | 1616 | ancestor = job["params"]["ancestor"] | ||
2056 | 1617 | if not record_backup(job_id, backup_name, volume_id, file_size, lsn, ancestor): | ||
2057 | 1618 | logger.error("Failed to save backup copy details") | ||
2058 | 1619 | return -1 | ||
2059 | 1620 | else: | ||
2060 | 1621 | logger.error("Failed to take backup") | ||
2061 | 1622 | ret_code = -1 | ||
2062 | 1623 | except: | ||
2105 | 1624 | logger.error("Failed to take backup") | 1426 | logger.error("Failed to take backup") |
2106 | 1625 | logger.error(traceback.format_exc()) | ||
2107 | 1626 | return -1 | 1427 | return -1 |
2114 | 1627 | finally: | 1428 | for f in [extra_config, "/tmp/twindb.xtrabackup.err", "/tmp/twindb.gpg.err", "/tmp/twindb.ssh.err"]: |
2115 | 1628 | for f in [extra_config, | 1429 | if os.path.isfile(f): |
2116 | 1629 | "/tmp/twindb.xtrabackup.err", | 1430 | try: |
2111 | 1630 | "/tmp/twindb.gpg.err", | ||
2112 | 1631 | "/tmp/twindb.ssh.err"]: | ||
2113 | 1632 | if os.path.isfile(f): | ||
2117 | 1633 | os.remove(f) | 1431 | os.remove(f) |
2118 | 1432 | except IOError as err: | ||
2119 | 1433 | logger.error("Failed to remove file %s. %s" % (f, err)) | ||
2120 | 1634 | return ret_code | 1434 | return ret_code |
2121 | 1635 | 1435 | ||
2122 | 1636 | 1436 | ||
2123 | 1637 | # Generates MySQL config with datadir option | ||
2124 | 1638 | # Inputs | ||
2125 | 1639 | # config - backup config | ||
2126 | 1640 | # Returns | ||
2127 | 1641 | # File name with MySQL config | ||
2128 | 1642 | # None - if error happened | ||
2129 | 1643 | |||
2130 | 1644 | def gen_extra_config(config): | 1437 | def gen_extra_config(config): |
2134 | 1645 | global logger | 1438 | """ |
2135 | 1646 | 1439 | Generates MySQL config with datadir option | |
2136 | 1647 | extra_config = None | 1440 | :param config: backup config |
2137 | 1441 | :return: File name with MySQL config or None if error happened | ||
2138 | 1442 | """ | ||
2139 | 1648 | try: | 1443 | try: |
2140 | 1649 | f, extra_config = tempfile.mkstemp() | 1444 | f, extra_config = tempfile.mkstemp() |
2141 | 1650 | os.write(f, "[mysqld]\n") | 1445 | os.write(f, "[mysqld]\n") |
2145 | 1651 | con = get_mysql_connection( | 1446 | con = get_mysql_connection(config["mysql_user"], config["mysql_password"]) |
2143 | 1652 | user=config["mysql_user"], | ||
2144 | 1653 | passwd=config["mysql_password"]) | ||
2146 | 1654 | cur = con.cursor() | 1447 | cur = con.cursor() |
2147 | 1655 | cur.execute("SELECT @@datadir") | 1448 | cur.execute("SELECT @@datadir") |
2148 | 1656 | row = cur.fetchone() | 1449 | row = cur.fetchone() |
2149 | 1657 | os.write(f, 'datadir="%s"\n' % row[0]) | 1450 | os.write(f, 'datadir="%s"\n' % row[0]) |
2150 | 1658 | cur.close() | 1451 | cur.close() |
2151 | 1659 | os.close(f) | 1452 | os.close(f) |
2155 | 1660 | except: | 1453 | except IOError as err: |
2156 | 1661 | logger.error("Failed to generate extra defaults file") | 1454 | logger.error("Failed to generate extra defaults file. %s" % err) |
2154 | 1662 | logger.error(traceback.format_exc()) | ||
2157 | 1663 | extra_config = None | 1455 | extra_config = None |
2158 | 1664 | return extra_config | 1456 | return extra_config |
2159 | 1665 | 1457 | ||
2160 | 1666 | 1458 | ||
2161 | 1667 | # Checks if binary log is enabled in local MySQL | ||
2162 | 1668 | # Inputs | ||
2163 | 1669 | # config - backup config | ||
2164 | 1670 | # Returns | ||
2165 | 1671 | # True - binary log enabled | ||
2166 | 1672 | # False - binary log disabled | ||
2167 | 1673 | |||
2168 | 1674 | def is_binlog_enabled(config): | ||
2169 | 1675 | global logger | ||
2170 | 1676 | |||
2171 | 1677 | result = False | ||
2172 | 1678 | try: | ||
2173 | 1679 | con = MySQLdb.connect(user=config["mysql_user"], passwd=config["mysql_password"]) | ||
2174 | 1680 | with con: | ||
2175 | 1681 | cur = con.cursor() | ||
2176 | 1682 | cur.execute("SELECT @@GLOBAL.log_bin") | ||
2177 | 1683 | row = cur.fetchone() | ||
2178 | 1684 | if row[0] == "1": | ||
2179 | 1685 | result = True | ||
2180 | 1686 | else: | ||
2181 | 1687 | result = False | ||
2182 | 1688 | cur.close() | ||
2183 | 1689 | del con | ||
2184 | 1690 | except: | ||
2185 | 1691 | logger.error("Failed to check if binlog is enabled") | ||
2186 | 1692 | logger.error(traceback.format_exc()) | ||
2187 | 1693 | return result | ||
2188 | 1694 | |||
2189 | 1695 | |||
2190 | 1696 | # Logs in to TwinDB storage and get size of backup | ||
2191 | 1697 | # Inputs | ||
2192 | 1698 | # config - backup config | ||
2193 | 1699 | # backup_name - file name with backup | ||
2194 | 1700 | # Returns | ||
2195 | 1701 | # Size of backup in bytes | ||
2196 | 1702 | # Zero if error happened | ||
2197 | 1703 | |||
2198 | 1704 | def get_backup_size(config, job_params, backup_name): | 1459 | def get_backup_size(config, job_params, backup_name): |
2204 | 1705 | global logger | 1460 | """ |
2205 | 1706 | global ssh_private_key | 1461 | Logs in to TwinDB storage and get size of backup |
2206 | 1707 | global ssh_port | 1462 | :param config: backup config |
2207 | 1708 | backup_size = 0 | 1463 | :param job_params: job parameters |
2208 | 1709 | 1464 | :param backup_name: file name with backup | |
2209 | 1465 | :return: size of backup in bytes or zeor if error happened | ||
2210 | 1466 | """ | ||
2211 | 1710 | logger.debug("Getting size of %s" % backup_name) | 1467 | logger.debug("Getting size of %s" % backup_name) |
2212 | 1468 | ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key_file, "-p", str(ssh_port), | ||
2213 | 1469 | "user_id_%s@%s" % (config["user_id"], job_params["ip"]), "/bin/du -b %s" % backup_name] | ||
2214 | 1711 | try: | 1470 | try: |
2215 | 1712 | ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key, "-p", str(ssh_port), | ||
2216 | 1713 | "user_id_%s@%s" % (config["user_id"], job_params["ip"]), "/bin/du -b %s" % backup_name] | ||
2217 | 1714 | |||
2218 | 1715 | process = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | 1471 | process = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
2219 | 1716 | cout, cerr = process.communicate() | 1472 | cout, cerr = process.communicate() |
2220 | 1717 | 1473 | ||
2221 | @@ -1726,30 +1482,28 @@ | |||
2222 | 1726 | except subprocess.CalledProcessError as e: | 1482 | except subprocess.CalledProcessError as e: |
2223 | 1727 | logger.error("Failed to get size of backup %s" % backup_name) | 1483 | logger.error("Failed to get size of backup %s" % backup_name) |
2224 | 1728 | logger.error(str(e)) | 1484 | logger.error(str(e)) |
2225 | 1485 | return 0 | ||
2226 | 1729 | except OSError as e: | 1486 | except OSError as e: |
2227 | 1730 | logger.error("Failed to get size of backup %s" % backup_name) | 1487 | logger.error("Failed to get size of backup %s" % backup_name) |
2235 | 1731 | logger.error("Command execution failed: %s" % str(e)) | 1488 | logger.error("Failed to run command %r: %s" % (ssh_cmd, e)) |
2236 | 1732 | except: | 1489 | return 0 |
2237 | 1733 | logger.error("Failed to get size of backup %s" % backup_name) | 1490 | logger.debug("Size of %s = %d bytes (%s)" % (backup_name, backup_size, h_size(backup_size))) |
2231 | 1734 | logger.error(traceback.format_exc()) | ||
2232 | 1735 | else: | ||
2233 | 1736 | logger.debug("Size of %s = %d bytes (%s)" % (backup_name, backup_size, h_size(backup_size))) | ||
2234 | 1737 | |||
2238 | 1738 | return backup_size | 1491 | return backup_size |
2239 | 1739 | 1492 | ||
2240 | 1740 | 1493 | ||
2242 | 1741 | def handler_send_key(config, job): | 1494 | def handler_send_key(job): |
2243 | 1742 | """ | 1495 | """ |
2244 | 1743 | Processes send_key job | 1496 | Processes send_key job |
2245 | 1744 | """ | 1497 | """ |
2246 | 1745 | # Get owner of the GPG key | 1498 | # Get owner of the GPG key |
2247 | 1499 | cmd_1 = ["gpg", "--list-packets"] | ||
2248 | 1746 | try: | 1500 | try: |
2249 | 1747 | gpg_pub_key = job["params"]["gpg_pub_key"] | 1501 | gpg_pub_key = job["params"]["gpg_pub_key"] |
2250 | 1748 | if gpg_pub_key: | 1502 | if gpg_pub_key: |
2251 | 1749 | cmd_1 = ["gpg", "--list-packets"] | ||
2252 | 1750 | logger.debug("Starting %r" % cmd_1) | 1503 | logger.debug("Starting %r" % cmd_1) |
2253 | 1751 | p1 = subprocess.Popen(cmd_1, stdin=subprocess.PIPE, stdout=subprocess.PIPE) | 1504 | p1 = subprocess.Popen(cmd_1, stdin=subprocess.PIPE, stdout=subprocess.PIPE) |
2254 | 1752 | cout, cerr = p1.communicate(gpg_pub_key) | 1505 | cout, cerr = p1.communicate(gpg_pub_key) |
2255 | 1506 | keyid = "Unknown" | ||
2256 | 1753 | for line in cout.split("\n"): | 1507 | for line in cout.split("\n"): |
2257 | 1754 | if "keyid:" in line: | 1508 | if "keyid:" in line: |
2258 | 1755 | keyid = line.replace("keyid:", "").strip() | 1509 | keyid = line.replace("keyid:", "").strip() |
2259 | @@ -1759,420 +1513,388 @@ | |||
2260 | 1759 | logger.error("Requestor public key is empty") | 1513 | logger.error("Requestor public key is empty") |
2261 | 1760 | return | 1514 | return |
2262 | 1761 | except OSError as err: | 1515 | except OSError as err: |
2264 | 1762 | logger.error(err) | 1516 | logger.error("Failed to run command %r: %s" % (cmd_1, err)) |
2265 | 1763 | return | 1517 | return |
2266 | 1764 | # Import public GPG key. It's a user public key sent by the dispatcher | 1518 | # Import public GPG key. It's a user public key sent by the dispatcher |
2267 | 1765 | try: | 1519 | try: |
2268 | 1766 | logger.debug("Importing requestor's key %s" % keyid) | 1520 | logger.debug("Importing requestor's key %s" % keyid) |
2269 | 1767 | cmd_1 = ["gpg", "--import"] | 1521 | cmd_1 = ["gpg", "--import"] |
2271 | 1768 | p1 = subprocess.Popen(cmd_1, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | 1522 | p1 = subprocess.Popen(cmd_1, stdin=subprocess.PIPE) |
2272 | 1769 | p1.communicate(gpg_pub_key) | 1523 | p1.communicate(gpg_pub_key) |
2273 | 1770 | except OSError as err: | 1524 | except OSError as err: |
2275 | 1771 | logger.error(err) | 1525 | logger.error("Failed to run command %r: %s" % (cmd_1, err)) |
2276 | 1772 | return | 1526 | return |
2277 | 1773 | # Get private key and encrypt it | 1527 | # Get private key and encrypt it |
2284 | 1774 | try: | 1528 | gpg_pub_key = job["params"]["gpg_pub_key"] |
2285 | 1775 | gpg_pub_key = job["params"]["gpg_pub_key"] | 1529 | if gpg_pub_key: |
2286 | 1776 | if gpg_pub_key: | 1530 | logger.debug("Exporting private key of server %s" % server_id) |
2287 | 1777 | logger.debug("Exporting private key of server %s" % server_id) | 1531 | cmd_1 = ["gpg", "--armor", "--export-secret-key", server_id] |
2288 | 1778 | cmd_1 = ["gpg", "--armor", "--export-secret-key", server_id] | 1532 | cmd_2 = ["gpg", "--armor", "--encrypt", "--sign", "--batch", "-r", keyid, "--local-user", server_id, |
2289 | 1779 | cmd_2 = ["gpg", "--armor", "--encrypt", "--sign", "--batch", "-r", keyid, "--local-user", server_id, "--trust-model", "always"] | 1533 | "--trust-model", "always"] |
2290 | 1534 | try: | ||
2291 | 1780 | logger.debug("Starting %r" % cmd_1) | 1535 | logger.debug("Starting %r" % cmd_1) |
2292 | 1781 | p1 = subprocess.Popen(cmd_1, stdout=subprocess.PIPE) | 1536 | p1 = subprocess.Popen(cmd_1, stdout=subprocess.PIPE) |
2293 | 1537 | except OSError as err: | ||
2294 | 1538 | logger.error("Failed to run command %r: %s" % (cmd_1, err)) | ||
2295 | 1539 | return | ||
2296 | 1540 | try: | ||
2297 | 1782 | logger.debug("Starting %r" % cmd_2) | 1541 | logger.debug("Starting %r" % cmd_2) |
2298 | 1783 | p2 = subprocess.Popen(cmd_2, stdin=p1.stdout, stdout=subprocess.PIPE) | 1542 | p2 = subprocess.Popen(cmd_2, stdin=p1.stdout, stdout=subprocess.PIPE) |
2299 | 1784 | cout, cerr = p2.communicate() | 1543 | cout, cerr = p2.communicate() |
2300 | 1785 | enc_private_key = cout | 1544 | enc_private_key = cout |
2301 | 1786 | logger.debug("Encrypted private key %s" % enc_private_key) | 1545 | logger.debug("Encrypted private key %s" % enc_private_key) |
2312 | 1787 | except OSError as err: | 1546 | except OSError as err: |
2313 | 1788 | logger.error(err) | 1547 | logger.error("Failed to run command %r: %s" % (cmd_2, err)) |
2314 | 1789 | return | 1548 | return |
2315 | 1790 | # Now send the private key to dispatcher | 1549 | # Now send the private key to dispatcher |
2316 | 1791 | job_id = job["job_id"] | 1550 | data = { |
2317 | 1792 | data = { | 1551 | "type": "send_key", |
2318 | 1793 | "type": "send_key", | 1552 | "params": { |
2319 | 1794 | "params": { | 1553 | "enc_private_key": enc_private_key, |
2320 | 1795 | "enc_private_key": enc_private_key, | 1554 | "job_id": job_id |
2321 | 1796 | "job_id": job_id | 1555 | } |
2322 | 1797 | } | 1556 | } |
2342 | 1798 | } | 1557 | get_response(data) |
2343 | 1799 | get_response(data) | 1558 | else: |
2344 | 1800 | 1559 | logger.error("The job order requested send_key, but no public key was provided") | |
2345 | 1801 | # Check if directory is empty | 1560 | |
2346 | 1802 | # Inputs | 1561 | |
2347 | 1803 | # dir - directory name | 1562 | def is_dir_empty(directory): |
2348 | 1804 | # Returns | 1563 | """ |
2349 | 1805 | # True - directory is empty | 1564 | Checks if directory is empty |
2350 | 1806 | # False - directory is not empty | 1565 | :param directory: directory name |
2351 | 1807 | 1566 | :return: True if the directory is empty of False otherwise | |
2352 | 1808 | def is_dir_empty(dir): | 1567 | """ |
2353 | 1809 | return len(os.listdir(dir)) == 0 | 1568 | return len(os.listdir(directory)) == 0 |
2354 | 1810 | 1569 | ||
2336 | 1811 | |||
2337 | 1812 | # Meta fuction that calls actual restore fucntion depending on tool in backup config | ||
2338 | 1813 | # Inputs | ||
2339 | 1814 | # config - backup config | ||
2340 | 1815 | # job - job order | ||
2341 | 1816 | # Returns what actual restore function returned or -1 if the tool is not supported | ||
2355 | 1817 | 1570 | ||
2356 | 1818 | def restore_backup(config, job): | 1571 | def restore_backup(config, job): |
2364 | 1819 | global logger | 1572 | """ |
2365 | 1820 | 1573 | Meta function that calls actual restore fucntion depending on tool in backup config | |
2366 | 1821 | ret = -1 | 1574 | :param config: backup config |
2367 | 1822 | logger.info("Starting restore job %r" % job) | 1575 | :param job: job order |
2368 | 1823 | log_start_job(int(job["job_id"])) | 1576 | :return: what actual restore function returned or -1 if the tool is not supported |
2369 | 1824 | 1577 | """ | |
2370 | 1825 | if config["tool"] == "xtrabackup": | 1578 | ret = None |
2371 | 1579 | logger.info("Starting restore job: %s" | ||
2372 | 1580 | % json.dumps(job, indent=4, sort_keys=True)) | ||
2373 | 1581 | notify_params = {"event": "start_job", "job_id": job["job_id"]} | ||
2374 | 1582 | if log_job_notify(notify_params): | ||
2375 | 1826 | ret = restore_xtrabackup(config, job) | 1583 | ret = restore_xtrabackup(config, job) |
2378 | 1827 | elif config["tool"] == "mysqldump": | 1584 | notify_params = {"event": "stop_job", "job_id": job["job_id"], "ret_code": ret} |
2379 | 1828 | ret = restore_mysqldump(config, job) | 1585 | log_job_notify(notify_params) |
2380 | 1586 | logger.info("Restore job is complete") | ||
2381 | 1829 | else: | 1587 | else: |
2386 | 1830 | logger.error("Can't restore backup with unsupported tool %s" % config["tool"]) | 1588 | logger.error("Restore job can not start") |
2383 | 1831 | ret = -1 | ||
2384 | 1832 | log_stop_job(int(job["job_id"]), ret) | ||
2385 | 1833 | logger.info("Restore job is complete") | ||
2387 | 1834 | return ret | 1589 | return ret |
2388 | 1835 | 1590 | ||
2389 | 1836 | 1591 | ||
2390 | 1837 | # Extracts an Xtrabackup archive arc in dst_dir | ||
2391 | 1838 | # Inputs | ||
2392 | 1839 | # config - backup config | ||
2393 | 1840 | # arc - file name with archive to extract | ||
2394 | 1841 | # dst_dir - local destination directory | ||
2395 | 1842 | # Returns | ||
2396 | 1843 | # True - if archive is successfully extracted | ||
2397 | 1844 | # False - if error happened | ||
2398 | 1845 | |||
2399 | 1846 | def extract_archive(config, arc, dst_dir): | ||
2400 | 1847 | global logger | ||
2401 | 1848 | global ssh_private_key | ||
2402 | 1849 | global ssh_port | ||
2403 | 1850 | |||
2404 | 1851 | logger.info("Extracting %s in %s" % (arc, dst_dir)) | ||
2405 | 1852 | try: | ||
2406 | 1853 | ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key, "-p", str(ssh_port)] | ||
2407 | 1854 | ssh_cmd.append(config["username"] + "@" + config["dst_ip"]) | ||
2408 | 1855 | ssh_cmd.append("/bin/cat %s" % arc) | ||
2409 | 1856 | |||
2410 | 1857 | gpg_cmd = ["gpg", "--decrypt"] | ||
2411 | 1858 | |||
2412 | 1859 | xb_cmd = ["xbstream", "-x"] | ||
2413 | 1860 | |||
2414 | 1861 | xb_err = open('/tmp/twindb.xb.err', "w+") | ||
2415 | 1862 | gpg_err = open('/tmp/twindb.gpg.err', "w+") | ||
2416 | 1863 | ssh_err = open('/tmp/twindb.ssh.err', "w+") | ||
2417 | 1864 | logger.info("Starting: %r" % ssh_cmd) | ||
2418 | 1865 | p1 = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=ssh_err) | ||
2419 | 1866 | logger.info("Starting: %r" % gpg_cmd) | ||
2420 | 1867 | p2 = subprocess.Popen(gpg_cmd, stdin=p1.stdout, stdout=subprocess.PIPE, stderr=gpg_err) | ||
2421 | 1868 | logger.info("Starting: %r" % xb_cmd) | ||
2422 | 1869 | p3 = subprocess.Popen(xb_cmd, stdin=p2.stdout, stdout=subprocess.PIPE, stderr=mysql_err, cwd=dst_dir) | ||
2423 | 1870 | p1.stdout.close() | ||
2424 | 1871 | p2.stdout.close() | ||
2425 | 1872 | p3.wait() | ||
2426 | 1873 | xb_err.seek(0) | ||
2427 | 1874 | gpg_err.seek(0) | ||
2428 | 1875 | ssh_err.seek(0) | ||
2429 | 1876 | logger.info("SSH stderr: " + ssh_err.read()) | ||
2430 | 1877 | logger.info("GPG stderr: " + gpg_err.read()) | ||
2431 | 1878 | logger.info("xbstream stderr: " + xb_err.read()) | ||
2432 | 1879 | if p3.returncode != 0: | ||
2433 | 1880 | logger.info("Failed to extract backup %s into %s" % (arc, dst_dir)) | ||
2434 | 1881 | return False | ||
2435 | 1882 | except: | ||
2436 | 1883 | logger.info("Failed to extract backup %s into %s" % (arc, dst_dir)) | ||
2437 | 1884 | finally: | ||
2438 | 1885 | todelete = ['/tmp/twindb.xb.err', '/tmp/twindb.gpg.err', '/tmp/twindb.ssh.err'] | ||
2439 | 1886 | for f in todelete: | ||
2440 | 1887 | if os.path.isfile(f): | ||
2441 | 1888 | os.remove(f) | ||
2442 | 1889 | logger.info("Extracted successfully %s in %s" % (arc, dst_dir)) | ||
2443 | 1890 | return True | ||
2444 | 1891 | |||
2445 | 1892 | |||
2446 | 1893 | # Restores backup copy with XtraBackup | ||
2447 | 1894 | # Inputs | ||
2448 | 1895 | # config - backup config | ||
2449 | 1896 | # job - job order | ||
2450 | 1897 | # Returns | ||
2451 | 1898 | # True - if backup successfully restored | ||
2452 | 1899 | # False - if restore job failed | ||
2453 | 1900 | |||
2454 | 1901 | def restore_xtrabackup(config, job): | 1592 | def restore_xtrabackup(config, job): |
2462 | 1902 | global logger | 1593 | """ |
2463 | 1903 | global ssh_private_key | 1594 | # Restores backup copy with XtraBackup |
2464 | 1904 | global ssh_port | 1595 | # Inputs |
2465 | 1905 | 1596 | # config - backup config | |
2466 | 1906 | backup_name = job["restore_backup_copy"] | 1597 | # job - job order |
2467 | 1907 | backup_name_base = os.path.splitext(backup_name) | 1598 | # Returns |
2468 | 1908 | dst_dir = job["restore_dir"] | 1599 | # True - if backup successfully restored |
2469 | 1600 | # False - if restore job failed | ||
2470 | 1601 | :param config: backup config received from the dispatcher | ||
2471 | 1602 | :param job: job order | ||
2472 | 1603 | :return: | ||
2473 | 1604 | """ | ||
2474 | 1605 | if "params" not in job: | ||
2475 | 1606 | logger.error("There are no params in the job order") | ||
2476 | 1607 | return -1 | ||
2477 | 1608 | # Check that job order has all required parameters | ||
2478 | 1609 | mandatory_params = ["backup_copy_id", "restore_dir", "server_id"] | ||
2479 | 1610 | for param in mandatory_params: | ||
2480 | 1611 | if param not in job["params"]: | ||
2481 | 1612 | logger.error("There is no %s in the job order" % param) | ||
2482 | 1613 | return -1 | ||
2483 | 1614 | dst_dir = job["params"]["restore_dir"] | ||
2484 | 1909 | try: | 1615 | try: |
2485 | 1910 | if os.path.isdir(dst_dir): | 1616 | if os.path.isdir(dst_dir): |
2486 | 1911 | if is_dir_empty(dst_dir): | 1617 | if is_dir_empty(dst_dir): |
2487 | 1912 | logger.info("Directory %s exists. But it's empty, so we can restore backup here" % dst_dir) | 1618 | logger.info("Directory %s exists. But it's empty, so we can restore backup here" % dst_dir) |
2488 | 1913 | else: | 1619 | else: |
2489 | 1914 | logger.error("Directory %s exists and isn't empty, so we can not restore backup here" % dst_dir) | 1620 | logger.error("Directory %s exists and isn't empty, so we can not restore backup here" % dst_dir) |
2491 | 1915 | return False | 1621 | return -1 |
2492 | 1916 | else: | 1622 | else: |
2493 | 1917 | os.makedirs(dst_dir) | 1623 | os.makedirs(dst_dir) |
2495 | 1918 | except: | 1624 | except IOError as err: |
2496 | 1625 | logger.error(err) | ||
2497 | 1919 | logger.error("Can't use directory %s as destination for backup" % dst_dir) | 1626 | logger.error("Can't use directory %s as destination for backup" % dst_dir) |
2502 | 1920 | logger.error(traceback.format_exc()) | 1627 | return -1 |
2503 | 1921 | return False | 1628 | |
2504 | 1922 | full_copy = get_full_of(backup_name) | 1629 | full_copy = get_full_of(job["params"]["backup_copy_id"]) |
2505 | 1923 | logger.info("Restoring backup %s in %s" % (backup_name, dst_dir)) | 1630 | full_copy_id = int(full_copy["backup_copy_id"]) |
2506 | 1631 | if not full_copy: | ||
2507 | 1632 | logger.error("Failed to get full copy parameters") | ||
2508 | 1633 | return -1 | ||
2509 | 1634 | logger.info("Restoring full copy %s in %s" % (full_copy["name"], dst_dir)) | ||
2510 | 1924 | try: | 1635 | try: |
2512 | 1925 | extract_archive(config, full_copy, dst_dir) | 1636 | if not extract_archive(config, full_copy, dst_dir): |
2513 | 1637 | raise JobError("Failed to extract %s" % full_copy["name"]) | ||
2514 | 1926 | # We restored last full backup in dst_dir | 1638 | # We restored last full backup in dst_dir |
2517 | 1927 | xb_err = open('/tmp/twindb.xb.err', "w+") | 1639 | try: |
2518 | 1928 | if full_copy == backup_name: | 1640 | xb_err = open('/tmp/twindb.xb.err', "w+") |
2519 | 1641 | except IOError as err: | ||
2520 | 1642 | raise JobError("Failed to open /tmp/twindb.xb.err: %s" % err) | ||
2521 | 1643 | if full_copy["backup_copy_id"] == job["params"]["backup_copy_id"]: | ||
2522 | 1929 | xb_cmd = ["innobackupex", "--apply-log", dst_dir] | 1644 | xb_cmd = ["innobackupex", "--apply-log", dst_dir] |
2523 | 1930 | else: | 1645 | else: |
2524 | 1931 | xb_cmd = ["innobackupex", "--apply-log", "--redo-only", dst_dir] | 1646 | xb_cmd = ["innobackupex", "--apply-log", "--redo-only", dst_dir] |
2530 | 1932 | p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err) | 1647 | try: |
2531 | 1933 | p_xb.wait() | 1648 | p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err) |
2532 | 1934 | xb_err.seek(0) | 1649 | p_xb.communicate() |
2533 | 1935 | logger.info("innobackupex stderr: " + xb_err.read()) | 1650 | except OSError as err: |
2534 | 1936 | os.remove('/tmp/twindb.xb.err') | 1651 | raise JobError("Failed to run %r: %s" % (xb_cmd, err)) |
2535 | 1652 | finally: | ||
2536 | 1653 | xb_err.seek(0) | ||
2537 | 1654 | logger.info("innobackupex stderr: " + xb_err.read()) | ||
2538 | 1655 | os.remove('/tmp/twindb.xb.err') | ||
2539 | 1937 | if p_xb.returncode != 0: | 1656 | if p_xb.returncode != 0: |
2542 | 1938 | logger.info("Failed to apply log on full copy %s" % (full_copy)) | 1657 | raise JobError("Failed to apply log on full copy %s" % full_copy["name"]) |
2541 | 1939 | return False | ||
2543 | 1940 | # if we restore full backup return now | 1658 | # if we restore full backup return now |
2547 | 1941 | if full_copy == backup_name: | 1659 | if full_copy_id == job["params"]["backup_copy_id"]: |
2548 | 1942 | logger.info("Successfully restored backup %s in %s" % (backup_name, dst_dir)) | 1660 | logger.info("Successfully restored backup %s in %s" % (full_copy["name"], dst_dir)) |
2549 | 1943 | return True | 1661 | return 0 |
2550 | 1944 | # Now copy all incremental copies and apply then on dst_dir | 1662 | # Now copy all incremental copies and apply then on dst_dir |
2552 | 1945 | inc_copy = full_copy | 1663 | backup_copy = full_copy |
2553 | 1664 | backup_copy_id = full_copy_id | ||
2554 | 1946 | while True: | 1665 | while True: |
2556 | 1947 | inc_copy = get_child_of(full_copy) | 1666 | backup_copy = get_child_of(backup_copy_id) |
2557 | 1667 | if not backup_copy: | ||
2558 | 1668 | raise JobError("Failed to get a child copy of backup_copy_id %d" % backup_copy_id) | ||
2559 | 1669 | backup_copy_id = int(backup_copy["backup_copy_id"]) | ||
2560 | 1948 | inc_dir = tempfile.mkdtemp() | 1670 | inc_dir = tempfile.mkdtemp() |
2567 | 1949 | extract_archive(config, inc_copy, inc_dir) | 1671 | if not extract_archive(config, backup_copy, inc_dir): |
2568 | 1950 | 1672 | raise JobError("Failed to extract %s in %s" % (backup_copy["name"], inc_dir)) | |
2569 | 1951 | xb_err = open('/tmp/twindb.xb.err', "w+") | 1673 | try: |
2570 | 1952 | xb_cmd = ["innobackupex", '--apply-log'] | 1674 | xb_err = open("/tmp/twindb.xb.err", "w+") |
2571 | 1953 | if inc_copy != backup_name: | 1675 | except IOError as err: |
2572 | 1954 | xb_cmd.append('--redo-only') | 1676 | raise JobError("Failed to open /tmp/twindb.xb.err: %s" % err) |
2573 | 1677 | xb_cmd = ["innobackupex", "--apply-log"] | ||
2574 | 1678 | if backup_copy_id != job["params"]["backup_copy_id"]: | ||
2575 | 1679 | xb_cmd.append("--redo-only") | ||
2576 | 1955 | xb_cmd.append('--incremental-dir=%s' % inc_dir) | 1680 | xb_cmd.append('--incremental-dir=%s' % inc_dir) |
2577 | 1956 | xb_cmd.append(dst_dir) | 1681 | xb_cmd.append(dst_dir) |
2583 | 1957 | p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err) | 1682 | try: |
2584 | 1958 | p_xb.wait() | 1683 | p_xb = subprocess.Popen(xb_cmd, stdout=xb_err, stderr=xb_err) |
2585 | 1959 | xb_err.seek(0) | 1684 | p_xb.wait() |
2586 | 1960 | logger.info("innobackupex stderr: " + xb_err.read()) | 1685 | except OSError as err: |
2587 | 1961 | os.remove('/tmp/twindb.xb.err') | 1686 | raise JobError("Failed to run %r: %s" % (xb_cmd, err)) |
2588 | 1687 | finally: | ||
2589 | 1688 | xb_err.seek(0) | ||
2590 | 1689 | logger.info("innobackupex stderr: " + xb_err.read()) | ||
2591 | 1690 | os.remove('/tmp/twindb.xb.err') | ||
2592 | 1962 | if p_xb.returncode != 0: | 1691 | if p_xb.returncode != 0: |
2597 | 1963 | logger.info("Failed to apply log on full copy %s" % (full_copy)) | 1692 | raise JobError("Failed to apply log on copy %s" % backup_copy["name"]) |
2598 | 1964 | return False | 1693 | shutil.rmtree(inc_dir) |
2599 | 1965 | os.removedirs(inc_dir) | 1694 | if backup_copy_id == job["params"]["backup_copy_id"]: |
2596 | 1966 | if inc_copy == backup_name: | ||
2600 | 1967 | break | 1695 | break |
2603 | 1968 | logger.info("Successfully restored backup %s in %s" % (backup_name, dst_dir)) | 1696 | logger.info("Successfully restored backup %s in %s" % (backup_copy["name"], dst_dir)) |
2604 | 1969 | except: | 1697 | except JobError as err: |
2605 | 1698 | logger.error(err) | ||
2606 | 1970 | logger.error("Failed to restore backup") | 1699 | logger.error("Failed to restore backup") |
2609 | 1971 | logger.error(traceback.format_exc()) | 1700 | return -1 |
2608 | 1972 | return False | ||
2610 | 1973 | finally: | 1701 | finally: |
2611 | 1974 | for f in ["/tmp/twindb.xb.err", "/tmp/twindb.gpg.err", "/tmp/twindb.ssh.err"]: | 1702 | for f in ["/tmp/twindb.xb.err", "/tmp/twindb.gpg.err", "/tmp/twindb.ssh.err"]: |
2612 | 1975 | if os.path.isfile(f): | 1703 | if os.path.isfile(f): |
2613 | 1976 | os.remove(f) | 1704 | os.remove(f) |
2669 | 1977 | return True | 1705 | return 0 |
2670 | 1978 | 1706 | ||
2671 | 1979 | 1707 | ||
2672 | 1980 | # Restores backup copy taken with mysqldump | 1708 | def extract_archive(config, arc, dst_dir): |
2673 | 1981 | # Inputs | 1709 | """ |
2674 | 1982 | # config - backup config | 1710 | Extracts an Xtrabackup archive arc in dst_dir |
2675 | 1983 | # job - job order | 1711 | :param config: backup config |
2676 | 1984 | # Returns | 1712 | :param arc: dictionary with archive to extract |
2677 | 1985 | # True - if backup successfully restored | 1713 | :param dst_dir: local destination directory |
2678 | 1986 | # False - if restore job failed | 1714 | :return: True - if archive is successfully extracted. |
2679 | 1987 | 1715 | False - if error happened | |
2680 | 1988 | def restore_mysqldump(config, job): | 1716 | """ |
2681 | 1989 | global logger | 1717 | mandatory_params = ["backup_copy_id", "name", "ip"] |
2682 | 1990 | global ssh_private_key | 1718 | for param in mandatory_params: |
2683 | 1991 | global ssh_port | 1719 | if param not in arc: |
2684 | 1992 | 1720 | logger.error("There is no %s in the archive parameters" % param) | |
2630 | 1993 | ret = True | ||
2631 | 1994 | backup_name = job["restore_backup_copy"] | ||
2632 | 1995 | backup_name_base = os.path.splitext(backup_name) | ||
2633 | 1996 | dst_dir = job["restore_dir"] | ||
2634 | 1997 | try: | ||
2635 | 1998 | if os.path.isdir(dst_dir): | ||
2636 | 1999 | if is_dir_empty(dst_dir): | ||
2637 | 2000 | logger.info("Directory %s exists. But it's empty, so we can restore backup here" % dst_dir) | ||
2638 | 2001 | else: | ||
2639 | 2002 | logger.error("Directory %s exists and isn't empty, so we can not restore backup here" % dst_dir) | ||
2640 | 2003 | return False | ||
2641 | 2004 | else: | ||
2642 | 2005 | os.makedirs(dst_dir) | ||
2643 | 2006 | except: | ||
2644 | 2007 | logger.error("Can't use directory %s as destination for backup" % dst_dir) | ||
2645 | 2008 | logger.error(traceback.format_exc()) | ||
2646 | 2009 | return False | ||
2647 | 2010 | full_copy = get_full_of(backup_name) | ||
2648 | 2011 | # Init mysql datadir | ||
2649 | 2012 | logger.info("Initializing MySQL datadir in %s" % dst_dir) | ||
2650 | 2013 | try: | ||
2651 | 2014 | mysql_socket = dst_dir + "/twindb.sock" | ||
2652 | 2015 | mysql_pid = dst_dir + "/mysqld.pid" | ||
2653 | 2016 | mysql_install_db_cmd = ["mysql_install_db", "--no-defaults"] | ||
2654 | 2017 | mysql_install_db_cmd.append("--datadir=%s" % dst_dir) | ||
2655 | 2018 | mysql_install_db_cmd.append("--user=root") | ||
2656 | 2019 | mysql_install_db_cmd.append("--skip-networking") | ||
2657 | 2020 | mysql_install_db_cmd.append("--log-error=/dev/stdout") | ||
2658 | 2021 | mysql_install_db_cmd.append("--pid-file=%s" % mysql_pid) | ||
2659 | 2022 | mysql_install_db_cmd.append("--socket=%s" % mysql_socket) | ||
2660 | 2023 | p = subprocess.Popen(mysql_install_db_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | ||
2661 | 2024 | out, err = p.communicate() | ||
2662 | 2025 | ret = p.returncode | ||
2663 | 2026 | del p | ||
2664 | 2027 | if ret != 0: | ||
2665 | 2028 | logger.error("Couldn't init MySQL datadir in " + dst_dir) | ||
2666 | 2029 | logger.error("Command: %r" % mysql_install_db_cmd) | ||
2667 | 2030 | logger.error("STDOUT: %s" % out) | ||
2668 | 2031 | logger.error("STDERR: %s" % err) | ||
2685 | 2032 | return False | 1721 | return False |
2787 | 2033 | else: | 1722 | logger.info("Extracting %s in %s" % (arc["name"], dst_dir)) |
2788 | 2034 | logger.info("MySQL datadir in %s is successfully created" % dst_dir) | 1723 | ssh_cmd = [ |
2789 | 2035 | except: | 1724 | "ssh", |
2790 | 2036 | logger.error("Can't init MySQL datadir in " + dst_dir) | 1725 | "-oStrictHostKeyChecking=no", |
2791 | 2037 | logger.error(traceback.format_exc()) | 1726 | "-i", ssh_private_key_file, |
2792 | 2038 | return False | 1727 | "-p", str(ssh_port), |
2793 | 2039 | # Start MySQL instance | 1728 | "user_id_%s@%s" % (config["user_id"], arc["ip"]), |
2794 | 2040 | logger.info("Starting MySQL instance in %s" % dst_dir) | 1729 | "/bin/cat %s" % arc["name"] |
2795 | 2041 | try: | 1730 | ] |
2796 | 2042 | mysqld_err = open('/tmp/twindb.mysqld.err', "w+") | 1731 | gpg_cmd = ["gpg", "--decrypt"] |
2797 | 2043 | mysqld_cmd = ["mysqld", "--no-defaults"] | 1732 | xb_cmd = ["xbstream", "-x"] |
2798 | 2044 | mysqld_cmd.append("--datadir=%s" % dst_dir) | 1733 | |
2799 | 2045 | mysqld_cmd.append("--user=root") | 1734 | desc_file = None |
2800 | 2046 | mysqld_cmd.append("--skip-networking") | 1735 | try: |
2801 | 2047 | mysqld_cmd.append("--skip-grant-tables") | 1736 | err_desc = dict() |
2802 | 2048 | mysqld_cmd.append("--log-error=/dev/stdout") | 1737 | for desc in ["xb", "gpg", "ssh"]: |
2803 | 2049 | mysqld_cmd.append("--pid-file=%s" % mysql_pid) | 1738 | desc_file = ("/tmp/twindb.%s.err" % desc) |
2804 | 2050 | mysqld_cmd.append("--socket=%s" % mysql_socket) | 1739 | err_desc[desc] = open(desc_file, "w+") |
2805 | 2051 | mysqld_proc = subprocess.Popen(mysqld_cmd, stdout=mysqld_err, stderr=mysqld_err) | 1740 | except IOError as err: |
2806 | 2052 | start_timeout = 5 | 1741 | logger.error("Failed to open %s: %s" % (desc_file, err)) |
2807 | 2053 | while start_timeout >= 0: | 1742 | return False |
2808 | 2054 | if start_timeout == 0: | 1743 | |
2809 | 2055 | logger.error("Can't start MySQL instance in " + dst_dir) | 1744 | logger.info("Starting: %r" % ssh_cmd) |
2810 | 2056 | try: | 1745 | p1 = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=err_desc["ssh"]) |
2811 | 2057 | mysqld_proc.terminate() | 1746 | logger.info("Starting: %r" % gpg_cmd) |
2812 | 2058 | except: | 1747 | p2 = subprocess.Popen(gpg_cmd, stdin=p1.stdout, stdout=subprocess.PIPE, |
2813 | 2059 | logger.info("MySQL process is already terminated") | 1748 | stderr=err_desc["gpg"]) |
2814 | 2060 | finally: | 1749 | logger.info("Starting: %r" % xb_cmd) |
2815 | 2061 | if os.path.isfile("/tmp/twindb.mysqld.err"): | 1750 | p3 = subprocess.Popen(xb_cmd, stdin=p2.stdout, stdout=subprocess.PIPE, |
2816 | 2062 | os.remove("/tmp/twindb.mysqld.err") | 1751 | stderr=err_desc["xb"], cwd=dst_dir) |
2817 | 2063 | mysqld_err.seek(0) | 1752 | p1.wait() |
2818 | 2064 | logger.error("mysqld output: " + mysqld_err.read()) | 1753 | p2.wait() |
2819 | 2065 | mysqld_err.close() | 1754 | p3.wait() |
2820 | 2066 | return False | 1755 | for desc in ["xb", "gpg", "ssh"]: |
2821 | 2067 | try: | 1756 | err_desc[desc].seek(0) |
2822 | 2068 | con = MySQLdb.connect(unix_socket=mysql_socket) | 1757 | |
2823 | 2069 | logger.info("Started MySQL instance in %s successfully" % dst_dir) | 1758 | logger.info("SSH stderr: " + err_desc["ssh"].read()) |
2824 | 2070 | start_timeout = -1 | 1759 | logger.info("GPG stderr: " + err_desc["gpg"].read()) |
2825 | 2071 | del con | 1760 | logger.info("xbstream stderr: " + err_desc["xb"].read()) |
2826 | 2072 | except: | 1761 | if p1.returncode != 0 or p2.returncode != 0 or p3.returncode != 0: |
2827 | 2073 | start_timeout -= 1 | 1762 | logger.info("Failed to extract backup %s in %s" % (arc["name"], dst_dir)) |
2828 | 2074 | time.sleep(1) | 1763 | return False |
2829 | 2075 | except: | 1764 | desc_file = None |
2830 | 2076 | mysqld_err.seek(0) | 1765 | try: |
2831 | 2077 | logger.error("Can't start MySQL instance in " + dst_dir) | 1766 | for desc in ["xb", "gpg", "ssh"]: |
2832 | 2078 | logger.error("mysqld output: " + mysqld_err.read()) | 1767 | desc_file = ("/tmp/twindb.%s.err" % desc) |
2833 | 2079 | logger.error(traceback.format_exc()) | 1768 | if os.path.isfile(desc_file): |
2834 | 2080 | mysqld_err.close() | 1769 | os.remove(desc_file) |
2835 | 2081 | try: | 1770 | except IOError as err: |
2836 | 2082 | mysqld_proc.terminate() | 1771 | logger.error("Failed to open %s: %s" % (desc_file, err)) |
2837 | 2083 | except: | 1772 | logger.info("Extracted successfully %s in %s" % (arc["name"], dst_dir)) |
2737 | 2084 | logger.info("MySQL process is already terminated") | ||
2738 | 2085 | finally: | ||
2739 | 2086 | if os.path.isfile("/tmp/twindb.mysqld.err"): | ||
2740 | 2087 | os.remove("/tmp/twindb.mysqld.err") | ||
2741 | 2088 | return False | ||
2742 | 2089 | # Copy, decrypt and restore a dump | ||
2743 | 2090 | logger.info("Restoring backup %s in %s" % (backup_name, dst_dir)) | ||
2744 | 2091 | try: | ||
2745 | 2092 | ssh_cmd = ["ssh", "-oStrictHostKeyChecking=no", "-i", ssh_private_key, "-p", str(ssh_port)] | ||
2746 | 2093 | ssh_cmd.append(config["username"] + "@" + config["dst_ip"]) | ||
2747 | 2094 | ssh_cmd.append("/bin/cat %s" % backup_name) | ||
2748 | 2095 | |||
2749 | 2096 | gpg_cmd = ["gpg", "--decrypt"] | ||
2750 | 2097 | |||
2751 | 2098 | mysql_cmd = ["mysql", "-S", mysql_socket] | ||
2752 | 2099 | |||
2753 | 2100 | mysql_err = open('/tmp/twindb.mysql.err', "w+") | ||
2754 | 2101 | gpg_err = open('/tmp/twindb.gpg.err', "w+") | ||
2755 | 2102 | ssh_err = open('/tmp/twindb.ssh.err', "w+") | ||
2756 | 2103 | logger.info("Starting: %r" % ssh_cmd) | ||
2757 | 2104 | p1 = subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=ssh_err) | ||
2758 | 2105 | logger.info("Starting: %r" % gpg_cmd) | ||
2759 | 2106 | p2 = subprocess.Popen(gpg_cmd, stdin=p1.stdout, stdout=subprocess.PIPE, stderr=gpg_err) | ||
2760 | 2107 | logger.info("Starting: %r" % mysql_cmd) | ||
2761 | 2108 | p3 = subprocess.Popen(mysql_cmd, stdin=p2.stdout, stdout=subprocess.PIPE, stderr=mysql_err) | ||
2762 | 2109 | p1.stdout.close() | ||
2763 | 2110 | p2.stdout.close() | ||
2764 | 2111 | out3 = p3.communicate()[0] | ||
2765 | 2112 | mysql_err.seek(0) | ||
2766 | 2113 | gpg_err.seek(0) | ||
2767 | 2114 | ssh_err.seek(0) | ||
2768 | 2115 | logger.info("SSH stderr: " + ssh_err.read()) | ||
2769 | 2116 | logger.info("GPG stderr: " + gpg_err.read()) | ||
2770 | 2117 | logger.info("mysql stderr: " + mysql_err.read()) | ||
2771 | 2118 | logger.info("Successfully restored backup %s in %s" % (backup_name, dst_dir)) | ||
2772 | 2119 | ret = p3.returncode | ||
2773 | 2120 | # Stop MySQL | ||
2774 | 2121 | mysqld_proc.terminate() | ||
2775 | 2122 | except: | ||
2776 | 2123 | logger.error("Failed to restore backup") | ||
2777 | 2124 | logger.error(traceback.format_exc()) | ||
2778 | 2125 | return False | ||
2779 | 2126 | finally: | ||
2780 | 2127 | for f in ["/tmp/twindb.mysql.err", "/tmp/twindb.gpg.err", "/tmp/twindb.ssh.err"]: | ||
2781 | 2128 | if os.path.isfile(f): | ||
2782 | 2129 | os.remove(f) | ||
2783 | 2130 | try: | ||
2784 | 2131 | mysqld_proc.terminate() | ||
2785 | 2132 | except: | ||
2786 | 2133 | logger.info("MySQL process is already terminated") | ||
2838 | 2134 | return True | 1773 | return True |
2839 | 2135 | 1774 | ||
2840 | 2136 | 1775 | ||
2844 | 2137 | # Finds LSN in XtraBackup output | 1776 | def get_full_of(backup_copy_id): |
2845 | 2138 | 1777 | """ | |
2846 | 2139 | def grep_lsn(str): | 1778 | Gets full copy of a given backup_copy_id |
2847 | 1779 | :param backup_copy_id: backup_copy_id | ||
2848 | 1780 | :return: dictionary with full backup copy parameters: | ||
2849 | 1781 | { | ||
2850 | 1782 | "backup_copy_id": 188, | ||
2851 | 1783 | "name": "server_id_479a41b3-d22d-41a8-b7d3-4e40302622f6_2015-04-06T15:10:46.050984.tar.gp", | ||
2852 | 1784 | "ip": "127.0.0.1" | ||
2853 | 1785 | } | ||
2854 | 1786 | """ | ||
2855 | 1787 | logger.debug("Getting full copy parameters of backup_copy_id %d" % backup_copy_id) | ||
2856 | 1788 | response_body = "Empty response" | ||
2857 | 1789 | full_copy = None | ||
2858 | 1790 | try: | ||
2859 | 1791 | data = { | ||
2860 | 1792 | "type": "get_full_copy_params", | ||
2861 | 1793 | "params": { | ||
2862 | 1794 | "backup_copy_id": backup_copy_id | ||
2863 | 1795 | } | ||
2864 | 1796 | } | ||
2865 | 1797 | response_body = get_response(data) | ||
2866 | 1798 | if not response_body: | ||
2867 | 1799 | logger.debug("Empty response from dispatcher") | ||
2868 | 1800 | return None | ||
2869 | 1801 | d = json.JSONDecoder() | ||
2870 | 1802 | response_body_decoded = d.decode(response_body) | ||
2871 | 1803 | if response_body_decoded: | ||
2872 | 1804 | msg_decrypted = decrypt(response_body_decoded["response"]) | ||
2873 | 1805 | msg_pt = d.decode(msg_decrypted) | ||
2874 | 1806 | full_copy = msg_pt["data"] | ||
2875 | 1807 | logger.info("Got full copy params:\n%s" | ||
2876 | 1808 | % json.dumps(full_copy, indent=4, sort_keys=True)) | ||
2877 | 1809 | if msg_pt["error"]: | ||
2878 | 1810 | logger.error(msg_pt["error"]) | ||
2879 | 1811 | except exceptions.KeyError as err: | ||
2880 | 1812 | logger.error("Failed to decode %s" % response_body) | ||
2881 | 1813 | logger.error(err) | ||
2882 | 1814 | return None | ||
2883 | 1815 | return full_copy | ||
2884 | 1816 | |||
2885 | 1817 | |||
2886 | 1818 | def get_child_of(backup_copy_id): | ||
2887 | 1819 | """ | ||
2888 | 1820 | Returns a child copy params of a given backup copy | ||
2889 | 1821 | :param backup_copy_id: | ||
2890 | 1822 | :return: | ||
2891 | 1823 | """ | ||
2892 | 1824 | logger.debug("Getting child copy parameters of backup_copy_id %d" % backup_copy_id) | ||
2893 | 1825 | response_body = "Empty response" | ||
2894 | 1826 | child_copy = None | ||
2895 | 1827 | try: | ||
2896 | 1828 | data = { | ||
2897 | 1829 | "type": "get_child_copy_params", | ||
2898 | 1830 | "params": { | ||
2899 | 1831 | "backup_copy_id": backup_copy_id | ||
2900 | 1832 | } | ||
2901 | 1833 | } | ||
2902 | 1834 | response_body = get_response(data) | ||
2903 | 1835 | if not response_body: | ||
2904 | 1836 | return None | ||
2905 | 1837 | d = json.JSONDecoder() | ||
2906 | 1838 | response_body_decoded = d.decode(response_body) | ||
2907 | 1839 | if response_body_decoded: | ||
2908 | 1840 | msg_decrypted = decrypt(response_body_decoded["response"]) | ||
2909 | 1841 | msg_pt = d.decode(msg_decrypted) | ||
2910 | 1842 | child_copy = msg_pt["data"] | ||
2911 | 1843 | logger.info("Got child copy params:\n%s" | ||
2912 | 1844 | % json.dumps(child_copy, indent=4, sort_keys=True)) | ||
2913 | 1845 | if msg_pt["error"]: | ||
2914 | 1846 | logger.error(msg_pt["error"]) | ||
2915 | 1847 | except exceptions.KeyError as err: | ||
2916 | 1848 | logger.error("Failed to decode %s" % response_body) | ||
2917 | 1849 | logger.error(err) | ||
2918 | 1850 | return None | ||
2919 | 1851 | return child_copy | ||
2920 | 1852 | |||
2921 | 1853 | |||
2922 | 1854 | def grep_lsn(output): | ||
2923 | 1855 | """ | ||
2924 | 1856 | Finds LSN in XtraBackup output | ||
2925 | 1857 | :param output: string with Xtrabackup output | ||
2926 | 1858 | :return: LSN | ||
2927 | 1859 | """ | ||
2928 | 2140 | lsn = None | 1860 | lsn = None |
2930 | 2141 | for line in str.split("\n"): | 1861 | for line in output.split("\n"): |
2931 | 2142 | if line.startswith("xtrabackup: The latest check point (for incremental):"): | 1862 | if line.startswith("xtrabackup: The latest check point (for incremental):"): |
2932 | 2143 | lsn = line.split("'")[1] | 1863 | lsn = line.split("'")[1] |
2933 | 2144 | return lsn | 1864 | return lsn |
2934 | 2145 | 1865 | ||
2935 | 2146 | 1866 | ||
2936 | 2147 | # Finds MySQL socket | ||
2937 | 2148 | def get_unix_socket(): | 1867 | def get_unix_socket(): |
2939 | 2149 | socket = "" | 1868 | """ |
2940 | 1869 | Finds MySQL socket | ||
2941 | 1870 | :return: path to unix socket or None if not found | ||
2942 | 1871 | """ | ||
2943 | 1872 | cmd = ["lsof", "-U", "-c", "/^mysqld$/", "-a", "-F", "n"] | ||
2944 | 2150 | try: | 1873 | try: |
2945 | 2151 | cmd = [ | ||
2946 | 2152 | "lsof", "-U", "-c", "/^mysqld$/", "-a", "-F", "n" | ||
2947 | 2153 | ] | ||
2948 | 2154 | p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | 1874 | p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
2949 | 2155 | cout, cerr = p.communicate() | 1875 | cout, cerr = p.communicate() |
2950 | 2156 | # Outputs socket in format | 1876 | # Outputs socket in format |
2951 | 2157 | # # lsof -U -c mysqld -a -F n | 1877 | # # lsof -U -c mysqld -a -F n |
2952 | 2158 | # p11029 | 1878 | # p11029 |
2953 | 2159 | # n/var/lib/mysql/mysql.sock | 1879 | # n/var/lib/mysql/mysql.sock |
2956 | 2160 | socket = cout.split()[1][1:] | 1880 | mysql_socket = cout.split()[1][1:] |
2957 | 2161 | if not os.path.exists(socket): | 1881 | if not os.path.exists(mysql_socket): |
2958 | 2162 | return None | 1882 | return None |
2960 | 2163 | except: | 1883 | except OSError as err: |
2961 | 1884 | logger.error("Failed to run command %r. %s" % (cmd, err)) | ||
2962 | 2164 | return None | 1885 | return None |
2965 | 2165 | 1886 | return mysql_socket | |
2964 | 2166 | return socket | ||
2966 | 2167 | 1887 | ||
2967 | 2168 | 1888 | ||
2968 | 2169 | def pid_exists(pid): | 1889 | def pid_exists(pid): |
2970 | 2170 | """Check whether pid exists in the current process table.""" | 1890 | """ |
2971 | 1891 | Checks whether pid exists in the current process table. | ||
2972 | 1892 | """ | ||
2973 | 2171 | if pid < 0: | 1893 | if pid < 0: |
2974 | 2172 | return False | 1894 | return False |
2975 | 2173 | try: | 1895 | try: |
2976 | 2174 | os.kill(pid, 0) | 1896 | os.kill(pid, 0) |
2978 | 2175 | except OSError, e: | 1897 | except OSError as e: |
2979 | 2176 | return e.errno == errno.EPERM | 1898 | return e.errno == errno.EPERM |
2980 | 2177 | else: | 1899 | else: |
2981 | 2178 | return True | 1900 | return True |
2982 | @@ -2194,12 +1916,12 @@ | |||
2983 | 2194 | Raise TimeoutExpired on timeout expired (if specified). | 1916 | Raise TimeoutExpired on timeout expired (if specified). |
2984 | 2195 | """ | 1917 | """ |
2985 | 2196 | 1918 | ||
2987 | 2197 | def check_timeout(delay): | 1919 | def check_timeout(timeout_delay): |
2988 | 2198 | if timeout is not None: | 1920 | if timeout is not None: |
2989 | 2199 | if time.time() >= stop_at: | 1921 | if time.time() >= stop_at: |
2990 | 2200 | raise TimeoutExpired | 1922 | raise TimeoutExpired |
2993 | 2201 | time.sleep(delay) | 1923 | time.sleep(timeout_delay) |
2994 | 2202 | return min(delay * 2, 0.04) | 1924 | return min(timeout_delay * 2, 0.04) |
2995 | 2203 | 1925 | ||
2996 | 2204 | if timeout is not None: | 1926 | if timeout is not None: |
2997 | 2205 | waitcall = lambda: os.waitpid(pid, os.WNOHANG) | 1927 | waitcall = lambda: os.waitpid(pid, os.WNOHANG) |
2998 | @@ -2247,100 +1969,155 @@ | |||
2999 | 2247 | raise RuntimeError("unknown process exit status") | 1969 | raise RuntimeError("unknown process exit status") |
3000 | 2248 | 1970 | ||
3001 | 2249 | 1971 | ||
3002 | 2250 | # Removes pid file | ||
3003 | 2251 | # Exits if error happened | ||
3004 | 2252 | |||
3005 | 2253 | def remove_pid(): | 1972 | def remove_pid(): |
3009 | 2254 | global pid_file | 1973 | """ |
3010 | 2255 | try: | 1974 | Removes pid file |
3011 | 2256 | if os.path.isfile(pid_file): | 1975 | :return: nothing |
3012 | 1976 | Exits if error happened | ||
3013 | 1977 | """ | ||
3014 | 1978 | if os.path.isfile(pid_file): | ||
3015 | 1979 | try: | ||
3016 | 2257 | os.remove(pid_file) | 1980 | os.remove(pid_file) |
3023 | 2258 | except: | 1981 | except IOError as err: |
3024 | 2259 | logger.error(traceback.format_exc()) | 1982 | exit_on_error("Failed to remove file %s. %s" % (pid_file, err)) |
3025 | 2260 | sys.exit(2) | 1983 | |
3026 | 2261 | 1984 | ||
3027 | 2262 | 1985 | def check_pid(): | |
3028 | 2263 | # Cleans up when TwinDB agent exists | 1986 | """ |
3029 | 1987 | Checks if pid file already exists. | ||
3030 | 1988 | If it does it detects whether twindb agent is already running. | ||
3031 | 1989 | If the pid file is stale it removes it. | ||
3032 | 1990 | :return: True if pid file doesn't exist or was stale and it was removed. | ||
3033 | 1991 | False if twindb agent is running or error happened | ||
3034 | 1992 | """ | ||
3035 | 1993 | if os.path.isfile(pid_file): | ||
3036 | 1994 | pid = read_pid() | ||
3037 | 1995 | if pid_exists(pid): | ||
3038 | 1996 | try: | ||
3039 | 1997 | f = open("/proc/%d/cmdline" % pid, "r") | ||
3040 | 1998 | cmdline = f.readline() | ||
3041 | 1999 | f.close() | ||
3042 | 2000 | if "twindb.py" in cmdline: | ||
3043 | 2001 | # The process is a live twindb agent | ||
3044 | 2002 | return False | ||
3045 | 2003 | else: | ||
3046 | 2004 | # It's some other process, not a twindb agent | ||
3047 | 2005 | return True | ||
3048 | 2006 | except IOError as err: | ||
3049 | 2007 | logger.error("Can't read from file /proc/%d/cmdline:%s " % (pid, err)) | ||
3050 | 2008 | return False | ||
3051 | 2009 | else: | ||
3052 | 2010 | try: | ||
3053 | 2011 | os.remove(pid_file) | ||
3054 | 2012 | # It's a stale pid file | ||
3055 | 2013 | return True | ||
3056 | 2014 | except IOError as err: | ||
3057 | 2015 | logger.error("Can't remove file %s: %s" % (pid_file, err)) | ||
3058 | 2016 | return False | ||
3059 | 2017 | else: | ||
3060 | 2018 | # pid file doesn't exist | ||
3061 | 2019 | return True | ||
3062 | 2020 | |||
3063 | 2021 | |||
3064 | 2022 | def read_pid(): | ||
3065 | 2023 | """ | ||
3066 | 2024 | Read pid from pid_file | ||
3067 | 2025 | :return: pid or zero if pid file doesn't exist | ||
3068 | 2026 | """ | ||
3069 | 2027 | try: | ||
3070 | 2028 | f = open(pid_file, 'r') | ||
3071 | 2029 | pid = int(f.readline()) | ||
3072 | 2030 | f.close() | ||
3073 | 2031 | except IOError as err: | ||
3074 | 2032 | logger.error("Couldn't read from %s: %s" % (pid_file, err)) | ||
3075 | 2033 | return 0 | ||
3076 | 2034 | return int(pid) | ||
3077 | 2035 | |||
3078 | 2036 | |||
3079 | 2037 | def write_pid(): | ||
3080 | 2038 | """ | ||
3081 | 2039 | Writes pid of the current process to the pid file | ||
3082 | 2040 | :return: nothing. | ||
3083 | 2041 | Exists if error happened | ||
3084 | 2042 | """ | ||
3085 | 2043 | try: | ||
3086 | 2044 | f = open(pid_file, "w") | ||
3087 | 2045 | f.write(str(os.getpid())) | ||
3088 | 2046 | f.close() | ||
3089 | 2047 | except IOError as err: | ||
3090 | 2048 | logger.error("Couldn't save process id in " + pid_file) | ||
3091 | 2049 | exit_on_error(err) | ||
3092 | 2050 | |||
3093 | 2264 | 2051 | ||
3094 | 2265 | def cleanup(signum, frame): | 2052 | def cleanup(signum, frame): |
3097 | 2266 | global logger | 2053 | """ |
3098 | 2267 | global pid_file | 2054 | Cleans up when TwinDB agent exists |
3099 | 2055 | :param signum: | ||
3100 | 2056 | :param frame: | ||
3101 | 2057 | :return: | ||
3102 | 2058 | """ | ||
3103 | 2268 | 2059 | ||
3104 | 2269 | logger.info("Cleaning up on signal " + str(signum)) | 2060 | logger.info("Cleaning up on signal " + str(signum)) |
3106 | 2270 | remove_pid() | 2061 | logger.debug("Frame %r" % frame) |
3107 | 2271 | logger.info("TwinDB agent is ready to exit") | 2062 | logger.info("TwinDB agent is ready to exit") |
3108 | 2272 | sys.exit(0) | 2063 | sys.exit(0) |
3109 | 2273 | 2064 | ||
3110 | 2274 | 2065 | ||
3117 | 2275 | # Reports error removes pid and exits | 2066 | def exit_on_error(message): |
3118 | 2276 | 2067 | """ | |
3119 | 2277 | def exit_on_error(tb=None): | 2068 | Reports error removes pid and exits |
3120 | 2278 | if tb is not None: | 2069 | :rtype : object |
3121 | 2279 | logger.debug(tb) | 2070 | :param message: message to display |
3122 | 2280 | remove_pid() | 2071 | :return: |
3123 | 2072 | """ | ||
3124 | 2073 | logger.error(message) | ||
3125 | 2074 | logger.debug(traceback.format_exc()) | ||
3126 | 2281 | sys.exit(2) | 2075 | sys.exit(2) |
3127 | 2282 | 2076 | ||
3128 | 2283 | 2077 | ||
3129 | 2284 | # Stops TwinDB agent | 2078 | # Stops TwinDB agent |
3130 | 2285 | 2079 | ||
3131 | 2286 | def stop(): | 2080 | def stop(): |
3134 | 2287 | global logger | 2081 | """ |
3135 | 2288 | global pid_file | 2082 | Stops TwinDB agent |
3136 | 2083 | :return: nothing | ||
3137 | 2084 | """ | ||
3138 | 2289 | 2085 | ||
3139 | 2290 | logger.info("Shutting down TwinDB agent") | 2086 | logger.info("Shutting down TwinDB agent") |
3140 | 2291 | 2087 | ||
3141 | 2292 | if not os.path.exists(pid_file): | 2088 | if not os.path.exists(pid_file): |
3142 | 2293 | logger.info("Pid file %s does not exist. Probably twindb agent isn't running" % pid_file) | 2089 | logger.info("Pid file %s does not exist. Probably twindb agent isn't running" % pid_file) |
3143 | 2294 | sys.exit(0) | 2090 | sys.exit(0) |
3145 | 2295 | 2091 | pid = None | |
3146 | 2296 | try: | 2092 | try: |
3147 | 2297 | f = open(pid_file, 'r') | 2093 | f = open(pid_file, 'r') |
3148 | 2298 | pid = int(f.readline()) | 2094 | pid = int(f.readline()) |
3149 | 2299 | os.kill(pid, signal.SIGTERM) | 2095 | os.kill(pid, signal.SIGTERM) |
3150 | 2300 | wait_pid(pid, 300) | 2096 | wait_pid(pid, 300) |
3151 | 2301 | f.close() | 2097 | f.close() |
3152 | 2098 | remove_pid() | ||
3153 | 2302 | except OSError as err: | 2099 | except OSError as err: |
3154 | 2303 | logger.error("Couldn't kill process %d" % pid) | 2100 | logger.error("Couldn't kill process %d" % pid) |
3157 | 2304 | logger.error(err) | 2101 | exit_on_error(err) |
3156 | 2305 | exit_on_error(traceback.format_exc()) | ||
3158 | 2306 | except IOError as err: | 2102 | except IOError as err: |
3159 | 2307 | logger.error("Couldn't read from %s" % pid_file) | 2103 | logger.error("Couldn't read from %s" % pid_file) |
3167 | 2308 | logger.error(err) | 2104 | exit_on_error(err) |
3161 | 2309 | exit_on_error(traceback.format_exc()) | ||
3162 | 2310 | except: | ||
3163 | 2311 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); | ||
3164 | 2312 | exit_on_error(traceback.format_exc()) | ||
3165 | 2313 | finally: | ||
3166 | 2314 | remove_pid() | ||
3168 | 2315 | logger.info("TwinDB agent successfully shut down") | 2105 | logger.info("TwinDB agent successfully shut down") |
3169 | 2316 | sys.exit(0) | 2106 | sys.exit(0) |
3170 | 2317 | 2107 | ||
3171 | 2318 | 2108 | ||
3172 | 2319 | # Starts TwinDB agent | ||
3173 | 2320 | |||
3174 | 2321 | def start(): | 2109 | def start(): |
3179 | 2322 | global logger | 2110 | """ |
3180 | 2323 | global pid_file | 2111 | Starts TwinDB agent |
3181 | 2324 | global check_period | 2112 | :return: nothing |
3182 | 2325 | 2113 | """ | |
3183 | 2114 | global job_id | ||
3184 | 2326 | logger.info("Starting TwinDB agent") | 2115 | logger.info("Starting TwinDB agent") |
3185 | 2327 | 2116 | ||
3202 | 2328 | if os.path.isfile(pid_file): | 2117 | if check_pid(): |
3203 | 2329 | logger.error("PID file " + pid_file + " exists"); | 2118 | write_pid() |
3204 | 2330 | logger.error("Check if TwinDB agent is already running"); | 2119 | else: |
3205 | 2331 | logger.error("If not, remove file " + pid_file + " manually and restart the agent"); | 2120 | exit_on_error("Another instance of TwinDB agent is running. Exiting") |
3190 | 2332 | sys.exit(2) | ||
3191 | 2333 | try: | ||
3192 | 2334 | f = open(pid_file, 'w') | ||
3193 | 2335 | f.write(str(os.getpid())) | ||
3194 | 2336 | f.close() | ||
3195 | 2337 | except IOError as err: | ||
3196 | 2338 | logger.error("Couldn't save process id in " + pid_file) | ||
3197 | 2339 | logger.error(err) | ||
3198 | 2340 | exit_on_error(traceback.format_exc()) | ||
3199 | 2341 | except: | ||
3200 | 2342 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); | ||
3201 | 2343 | exit_on_error(traceback.format_exc()) | ||
3206 | 2344 | try: | 2121 | try: |
3207 | 2345 | while True: | 2122 | while True: |
3208 | 2346 | read_config() | 2123 | read_config() |
3209 | @@ -2348,7 +2125,7 @@ | |||
3210 | 2348 | if not config: | 2125 | if not config: |
3211 | 2349 | time.sleep(check_period) | 2126 | time.sleep(check_period) |
3212 | 2350 | continue | 2127 | continue |
3214 | 2351 | report_sss() | 2128 | report_sss(config) |
3215 | 2352 | report_agent_privileges(config) | 2129 | report_agent_privileges(config) |
3216 | 2353 | job = get_job() | 2130 | job = get_job() |
3217 | 2354 | if job: | 2131 | if job: |
3218 | @@ -2356,130 +2133,111 @@ | |||
3219 | 2356 | time.sleep(check_period) | 2133 | time.sleep(check_period) |
3220 | 2357 | except SystemExit: | 2134 | except SystemExit: |
3221 | 2358 | remove_pid() | 2135 | remove_pid() |
3222 | 2359 | except: | ||
3223 | 2360 | exit_on_error(traceback.format_exc()) | ||
3224 | 2361 | sys.exit(0) | 2136 | sys.exit(0) |
3225 | 2362 | 2137 | ||
3226 | 2363 | 2138 | ||
3227 | 2364 | # Processes job | ||
3228 | 2365 | # Inputs | ||
3229 | 2366 | # config - backup config | ||
3230 | 2367 | # job - job order | ||
3231 | 2368 | # Returns what respective job function returns or False if error happens | ||
3232 | 2369 | def process_job(config, job): | 2139 | def process_job(config, job): |
3234 | 2370 | global logger | 2140 | """ |
3235 | 2141 | Processes job | ||
3236 | 2142 | :param config: backup config | ||
3237 | 2143 | :param job: job order | ||
3238 | 2144 | :return: what respective job function returns or False if error happens | ||
3239 | 2145 | """ | ||
3240 | 2371 | global job_id | 2146 | global job_id |
3241 | 2147 | # Check to see that the twindb_agent MySQL user has enough privileges | ||
3242 | 2148 | username = config["mysql_user"] | ||
3243 | 2149 | password = config["mysql_password"] | ||
3244 | 2150 | |||
3245 | 2151 | job_id = int(job["job_id"]) | ||
3246 | 2372 | 2152 | ||
3247 | 2373 | try: | 2153 | try: |
3248 | 2374 | # Check to see that the twindb_agent MySQL user has enough privileges | ||
3249 | 2375 | username = config["mysql_user"] | ||
3250 | 2376 | password = config["mysql_password"] | ||
3251 | 2377 | |||
3252 | 2378 | mysql_access_available, missing_mysql_privileges = has_mysql_access(username, password, grant_capability=False) | 2154 | mysql_access_available, missing_mysql_privileges = has_mysql_access(username, password, grant_capability=False) |
3253 | 2379 | if not mysql_access_available: | 2155 | if not mysql_access_available: |
3254 | 2380 | logger.error("The MySQL user %s does not have all the required privileges." % username) | 2156 | logger.error("The MySQL user %s does not have all the required privileges." % username) |
3265 | 2381 | if len(missing_mysql_privileges) > 0: | 2157 | if missing_mysql_privileges: |
3266 | 2382 | logger.error("You can grant the required privileges by executing the following SQL: " | 2158 | raise JobError("You can grant the required privileges by executing the following SQL: " |
3267 | 2383 | "GRANT %s ON *.* TO '%s'@'localhost'" % (','.join(missing_mysql_privileges), username)) | 2159 | "GRANT %s ON *.* TO '%s'@'localhost'" % (','.join(missing_mysql_privileges), username)) |
3268 | 2384 | 2160 | if not job["start_scheduled"]: | |
3269 | 2385 | return False | 2161 | raise JobError("Job start time isn't set") |
3260 | 2386 | |||
3261 | 2387 | if job["start_scheduled"] == None: | ||
3262 | 2388 | logger.error("Job start time isn't set") | ||
3263 | 2389 | return False | ||
3264 | 2390 | |||
3270 | 2391 | start_scheduled = int(job["start_scheduled"]) | 2162 | start_scheduled = int(job["start_scheduled"]) |
3271 | 2392 | now = int(time.time()) | 2163 | now = int(time.time()) |
3272 | 2393 | if now < start_scheduled: | 2164 | if now < start_scheduled: |
3277 | 2394 | logger.info("Job is scheduled on %s, now %s" | 2165 | raise JobTooSoonError("Job is scheduled on %s, now %s" |
3278 | 2395 | % (time.ctime(start_scheduled), time.ctime(now))) | 2166 | % (time.ctime(start_scheduled), time.ctime(now))) |
3275 | 2396 | return False | ||
3276 | 2397 | |||
3279 | 2398 | logger.info("Processing job_id = %d", int(job_id)) | 2167 | logger.info("Processing job_id = %d", int(job_id)) |
3280 | 2399 | if job["type"] == "backup": | 2168 | if job["type"] == "backup": |
3281 | 2400 | return take_backup(config, job) | 2169 | return take_backup(config, job) |
3282 | 2401 | elif job["type"] == "restore": | 2170 | elif job["type"] == "restore": |
3285 | 2402 | #return restore_backup(config, job) | 2171 | return restore_backup(config, job) |
3284 | 2403 | return False | ||
3286 | 2404 | elif job["type"] == "send_key": | 2172 | elif job["type"] == "send_key": |
3288 | 2405 | return handler_send_key(config, job) | 2173 | return handler_send_key(job) |
3289 | 2406 | else: | 2174 | else: |
3294 | 2407 | logger.error("Unsupported job type " + job["type"]) | 2175 | raise JobError("Unsupported job type " + job["type"]) |
3295 | 2408 | return False | 2176 | except JobError as err: |
3296 | 2409 | except: | 2177 | logger.error("Job error: %s", err) |
3297 | 2410 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]) | 2178 | return False |
3298 | 2179 | except JobTooSoonError as err: | ||
3299 | 2180 | logger.debug(err) | ||
3300 | 2411 | return False | 2181 | return False |
3301 | 2412 | finally: | 2182 | finally: |
3302 | 2413 | job_id = 0 | 2183 | job_id = 0 |
3303 | 2414 | return False | ||
3304 | 2415 | 2184 | ||
3305 | 2416 | 2185 | ||
3306 | 2417 | def setup_logging(): | 2186 | def setup_logging(): |
3307 | 2187 | """ | ||
3308 | 2188 | Setups logging handlers, formats | ||
3309 | 2189 | :return: | ||
3310 | 2190 | """ | ||
3311 | 2418 | global logger | 2191 | global logger |
3312 | 2419 | 2192 | ||
3316 | 2420 | ch = logging.StreamHandler() | 2193 | console_handler = logging.StreamHandler() |
3317 | 2421 | sh = logging.handlers.SysLogHandler() | 2194 | remote_handler = RlogHandler() |
3318 | 2422 | rh = RlogHandler() | 2195 | |
3319 | 2423 | fmt_str = '%(name)s: %(levelname)s: %(funcName)s():%(lineno)d: %(message)s' | 2196 | fmt_str = '%(name)s: %(levelname)s: %(funcName)s():%(lineno)d: %(message)s' |
3328 | 2424 | sfmt = logging.Formatter(fmt_str) | 2197 | remote_format = logging.Formatter(fmt_str) |
3329 | 2425 | cfmt = logging.Formatter("%(asctime)s: " + fmt_str) | 2198 | console_format = logging.Formatter("%(asctime)s: " + fmt_str) |
3330 | 2426 | ch.setFormatter(cfmt) | 2199 | |
3331 | 2427 | sh.setFormatter(sfmt) | 2200 | console_handler.setFormatter(console_format) |
3332 | 2428 | rh.setFormatter(sfmt) | 2201 | remote_handler.setFormatter(remote_format) |
3333 | 2429 | logger.addHandler(sh) | 2202 | |
3334 | 2430 | logger.addHandler(ch) | 2203 | logger.addHandler(console_handler) |
3335 | 2431 | logger.addHandler(rh) | 2204 | logger.addHandler(remote_handler) |
3336 | 2432 | logger.setLevel(logging.INFO) | 2205 | logger.setLevel(logging.INFO) |
3343 | 2433 | # syslog handler shouldn't log DEBUG messages | 2206 | |
3338 | 2434 | sh.setLevel(logging.INFO) | ||
3339 | 2435 | |||
3340 | 2436 | |||
3341 | 2437 | # Main function | ||
3342 | 2438 | # Parses options, creates log class etc | ||
3344 | 2439 | 2207 | ||
3345 | 2440 | def main(): | 2208 | def main(): |
3346 | 2209 | """ | ||
3347 | 2210 | Main function | ||
3348 | 2211 | Parses options, creates log class etc | ||
3349 | 2212 | :return: | ||
3350 | 2213 | """ | ||
3351 | 2214 | global server_id | ||
3352 | 2215 | global check_period | ||
3353 | 2216 | global host | ||
3354 | 2217 | global mysql_user | ||
3355 | 2218 | global mysql_password | ||
3356 | 2219 | global debug | ||
3357 | 2220 | global logger | ||
3358 | 2221 | |||
3359 | 2222 | setup_logging() | ||
3360 | 2223 | read_config() | ||
3361 | 2441 | # before we do *anything* we must ensure server_id is generated or read from config | 2224 | # before we do *anything* we must ensure server_id is generated or read from config |
3362 | 2442 | global host | ||
3363 | 2443 | global init_config | ||
3364 | 2444 | global server_id | ||
3365 | 2445 | global check_period | ||
3366 | 2446 | global logger | ||
3367 | 2447 | global mysql_user | ||
3368 | 2448 | global mysql_password | ||
3369 | 2449 | global debug | ||
3370 | 2450 | |||
3371 | 2451 | # Read or generate server id | 2225 | # Read or generate server id |
3385 | 2452 | if os.path.exists(init_config): | 2226 | if not server_id: |
3373 | 2453 | try: | ||
3374 | 2454 | execfile(init_config, globals()) | ||
3375 | 2455 | if len(server_id) == 0: | ||
3376 | 2456 | print("Error: Config %s doesn't set server_id" | ||
3377 | 2457 | % init_config, file=sys.stderr) | ||
3378 | 2458 | sys.exit(2) | ||
3379 | 2459 | except: | ||
3380 | 2460 | print(traceback.format_exc()) | ||
3381 | 2461 | print("Error: Failed to read from config in %s" | ||
3382 | 2462 | % init_config, file=sys.stderr) | ||
3383 | 2463 | sys.exit(2) | ||
3384 | 2464 | else: | ||
3386 | 2465 | server_id = str(uuid.uuid4()) | 2227 | server_id = str(uuid.uuid4()) |
3388 | 2466 | save_config(server_id) | 2228 | save_config() |
3389 | 2467 | 2229 | ||
3390 | 2468 | # At this point server_id must be set | 2230 | # At this point server_id must be set |
3394 | 2469 | if len(server_id) == 0: | 2231 | if not server_id: |
3395 | 2470 | print("Error: Server id is empty. Can not continue operation", | 2232 | print("Error: Server id is empty. Can not continue operation", file=sys.stderr) |
3393 | 2471 | file=sys.stderr) | ||
3396 | 2472 | sys.exit(2) | 2233 | sys.exit(2) |
3397 | 2473 | |||
3398 | 2474 | setup_logging() | ||
3399 | 2475 | |||
3400 | 2476 | # Signal handlers | 2234 | # Signal handlers |
3401 | 2477 | signal.signal(signal.SIGTERM, cleanup) | 2235 | signal.signal(signal.SIGTERM, cleanup) |
3402 | 2478 | signal.signal(signal.SIGINT, cleanup) | 2236 | signal.signal(signal.SIGINT, cleanup) |
3403 | 2479 | 2237 | ||
3404 | 2480 | # Not clear why I ignore SIGCHLD. | 2238 | # Not clear why I ignore SIGCHLD. |
3405 | 2481 | # it make subprocess.call throw exception, so I comment it out so far | 2239 | # it make subprocess.call throw exception, so I comment it out so far |
3407 | 2482 | #signal.signal(signal.SIGCHLD, signal.SIG_IGN) | 2240 | # signal.signal(signal.SIGCHLD, signal.SIG_IGN) |
3408 | 2483 | 2241 | ||
3409 | 2484 | opts = [] | 2242 | opts = [] |
3410 | 2485 | try: | 2243 | try: |
3411 | @@ -2489,15 +2247,11 @@ | |||
3412 | 2489 | "unregister", "delete-backups", "backup", "is-registered"]) | 2247 | "unregister", "delete-backups", "backup", "is-registered"]) |
3413 | 2490 | except getopt.GetoptError as err: | 2248 | except getopt.GetoptError as err: |
3414 | 2491 | logger.error(err) | 2249 | logger.error(err) |
3415 | 2492 | exit_on_error(traceback.format_exc()) | ||
3416 | 2493 | except: | ||
3417 | 2494 | logger.error("Unexpected error: %s,%s" % sys.exc_info()[:2]); | ||
3418 | 2495 | usage() | ||
3419 | 2496 | exit_on_error(traceback.format_exc()) | ||
3420 | 2497 | 2250 | ||
3421 | 2498 | # Set options first | 2251 | # Set options first |
3422 | 2499 | action = None | 2252 | action = None |
3423 | 2500 | delete_backups = False | 2253 | delete_backups = False |
3424 | 2254 | regcode = None | ||
3425 | 2501 | for opt, arg in opts: | 2255 | for opt, arg in opts: |
3426 | 2502 | if opt == '-i': | 2256 | if opt == '-i': |
3427 | 2503 | check_period = int(arg) | 2257 | check_period = int(arg) |
3428 | @@ -2547,29 +2301,32 @@ | |||
3429 | 2547 | check_period = 1 | 2301 | check_period = 1 |
3430 | 2548 | 2302 | ||
3431 | 2549 | if action == "start": | 2303 | if action == "start": |
3439 | 2550 | if is_registered(): | 2304 | while True: |
3440 | 2551 | start() | 2305 | # It'll keep checking until the agent is registered |
3441 | 2552 | else: | 2306 | # Then it starts |
3442 | 2553 | logger.error( | 2307 | if is_registered(): |
3443 | 2554 | "The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n") | 2308 | start() |
3444 | 2555 | logger.error("Get your code on https://console.twindb.com/?get_code") | 2309 | else: |
3445 | 2556 | sys.exit(2) | 2310 | logger.error("The server must be registered first. Run following command:\n\n" |
3446 | 2311 | "twindb --register <registration code>\n") | ||
3447 | 2312 | logger.error("Get your code on https://console.twindb.com/?get_code") | ||
3448 | 2313 | time.sleep(check_period) | ||
3449 | 2557 | elif action == "stop": | 2314 | elif action == "stop": |
3450 | 2558 | stop() | 2315 | stop() |
3451 | 2559 | elif action == "register": | 2316 | elif action == "register": |
3452 | 2560 | action_handler_register(regcode) | 2317 | action_handler_register(regcode) |
3453 | 2561 | elif action == "unregister": | 2318 | elif action == "unregister": |
3454 | 2562 | if not is_registered(): | 2319 | if not is_registered(): |
3457 | 2563 | logger.error( | 2320 | logger.error("The server must be registered first. Run following command:" |
3458 | 2564 | "The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n") | 2321 | "\n\ntwindb --register <registration code>\n") |
3459 | 2565 | logger.error("Get your code on https://console.twindb.com/?get_code") | 2322 | logger.error("Get your code on https://console.twindb.com/?get_code") |
3460 | 2566 | sys.exit(2) | 2323 | sys.exit(2) |
3461 | 2567 | if action_handler_unregister(delete_backups): | 2324 | if action_handler_unregister(delete_backups): |
3462 | 2568 | stop() | 2325 | stop() |
3463 | 2569 | elif action == "backup": | 2326 | elif action == "backup": |
3464 | 2570 | if not is_registered(): | 2327 | if not is_registered(): |
3467 | 2571 | logger.error( | 2328 | logger.error("The server must be registered first. Run following command:" |
3468 | 2572 | "The server must be registered first. Run following command:\n\ntwindb --register <registration code>\n") | 2329 | "\n\ntwindb --register <registration code>\n") |
3469 | 2573 | logger.error("Get your code on https://console.twindb.com/?get_code") | 2330 | logger.error("Get your code on https://console.twindb.com/?get_code") |
3470 | 2574 | sys.exit(2) | 2331 | sys.exit(2) |
3471 | 2575 | if not action_handler_backup(): | 2332 | if not action_handler_backup(): |
3472 | @@ -2583,9 +2340,8 @@ | |||
3473 | 2583 | sys.exit(0) | 2340 | sys.exit(0) |
3474 | 2584 | else: | 2341 | else: |
3475 | 2585 | # If we got there print usage() and exit | 2342 | # If we got there print usage() and exit |
3477 | 2586 | logger.error("Neither --start nor --stop nor --register is specified") | 2343 | logger.error("Failed to parse command line options") |
3478 | 2587 | usage() | 2344 | usage() |
3479 | 2588 | remove_pid() | ||
3480 | 2589 | sys.exit(2) | 2345 | sys.exit(2) |
3481 | 2590 | 2346 | ||
3482 | 2591 | 2347 |