Merge lp:~ack/landscape-charm/tools-handle-partitioned into lp:~landscape/landscape-charm/tools

Proposed by Alberto Donato
Status: Merged
Merged at revision: 28
Proposed branch: lp:~ack/landscape-charm/tools-handle-partitioned
Merge into: lp:~landscape/landscape-charm/tools
Diff against target: 221 lines (+140/-9)
1 file modified
db-migrator.py (+140/-9)
To merge this branch: bzr merge lp:~ack/landscape-charm/tools-handle-partitioned
Reviewer | Review Type | Date Requested | Status
Free Ekanayaka (community) | | | Approve
Review via email: mp+305689@code.launchpad.net

Description of the change

Update the db import script with Free's changes to skip partitioned resource tables.

To post a comment you must log in.
Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

+1

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'db-migrator.py'
2--- db-migrator.py 2016-09-13 15:04:24 +0000
3+++ db-migrator.py 2016-09-14 08:39:22 +0000
4@@ -6,6 +6,7 @@
5 import subprocess
6 import tempfile
7 import shutil
8+import multiprocessing
9
10 import psycopg2
11
12@@ -13,6 +14,25 @@
13
14 STORES = ["main", "package", "session", "knowledge", "account-1", "resource-1"]
15
16+DEAD_TABLES = {
17+ "main": [
18+ "ec2_region", "ec2_region_id_seq", "lds_trial", "lds_trial_id_seq"],
19+ "account-1": [
20+ "change_package_lock_request", "computer_rule_state_id_seq",
21+ "computer_rule_state_pkey", "computer_rule_state_rule_id_key",
22+ "remove_package_lock_request", "create_package_lock_request"],
23+}
24+
25+INCONSISTENT_SEQUENCES = {
26+ "account-1": [
27+ ("computer_rule_state_id_seq", "rule_state_id_seq"),
28+ ("event_log_id_seq", "event_log_entry_id_seq"),
29+ ],
30+ "resource-1": [
31+ ("metadata_id_seq", "annotation_id_seq"),
32+ ],
33+}
34+
35
36 def parse_args(args):
37 parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
38@@ -117,7 +137,23 @@
39 cursor.execute(
40 "SELECT tablename FROM pg_tables WHERE schemaname='public'")
41 for row in cursor.fetchall():
42- cursor.execute("DROP TABLE IF EXISTS {} CASCADE".format(row[0]))
43+ table = row[0]
44+ if store_name.startswith("resource-"):
45+ # Check if this is a partioned table
46+ if "_" in table:
47+ suffix = table.split("_")[-1]
48+ if len(suffix) == 8:
49+ try:
50+ int(suffix)
51+ except ValueError:
52+ # Not a partioned table
53+ pass
54+ else:
55+ # This is a partioned table, it will
56+ # be dropped by CASCADE on the parent,
57+ # so let's skip it.
58+ continue
59+ cursor.execute("DROP TABLE {} CASCADE".format(table))
60
61 # Drop all extensions
62 cursor.execute(
63@@ -148,26 +184,35 @@
64 connection.close()
65
66
67-def fresh_schema(schema_script):
68+def fresh_schema(schema_script, stores_info):
69 subprocess.check_call(schema_script)
70
71
72+def rename_incosistent_sequences(stores_info):
73+ for store_name, sequences in INCONSISTENT_SEQUENCES.iteritems():
74+ connection = connect(stores_info, store_name, as_admin=True)
75+ cursor = connection.cursor()
76+ for old_name, new_name in sequences:
77+ cursor.execute(
78+ "ALTER TABLE IF EXISTS {} RENAME TO {}".format(
79+ old_name, new_name))
80+ connection.commit()
81+ connection.close()
82+
83+
84 def pg_dump(stores_info, store_name, dump_dir):
85 dump_file = os.path.join(dump_dir, store_name + ".sql")
86 options = [
87 "--host={}".format(stores_info["postgres"]["host"]),
88 "--port={}".format(stores_info["postgres"]["port"]),
89 "--format=custom",
90+ "--data-only",
91 "--no-acl",
92 "--exclude-table=patch",
93 "--file={}".format(dump_file),
94 ]
95- if store_name == "main":
96- options.append("--exclude-table=ec2_region*")
97- if store_name.startswith("account-"):
98- options.append("--exclude-table=create_package_lock_request*")
99- options.append("--exclude-table=remove_package_lock_request*")
100- options.append("--exclude-table=change_package_lock_request*")
101+ for table in DEAD_TABLES.get(store_name, []):
102+ options.append("--exclude-table={}".format(table))
103 user = stores_info["admin"]["user"]
104 if user:
105 options.append("--user={}".format(user))
106@@ -181,9 +226,91 @@
107 subprocess.check_call(cmd, env=env)
108
109
110+def pg_dump_partitioned_schemas(stores_info, dump_dir):
111+ store_name = "resource-1"
112+ dump_file = os.path.join(dump_dir, store_name + "-partioned.sql")
113+
114+ options = [
115+ "--host={}".format(stores_info["postgres"]["host"]),
116+ "--port={}".format(stores_info["postgres"]["port"]),
117+ "--schema-only",
118+ "--no-acl",
119+ "--file={}".format(dump_file),
120+ ]
121+
122+ parents = {}
123+
124+ connection = connect(stores_info, store_name)
125+ cursor = connection.cursor()
126+ cursor.execute(
127+ "SELECT table_name FROM information_schema.tables "
128+ "WHERE table_schema='public' AND table_name ~ E'[a-z_]+_[0-9]{8}'")
129+
130+ for row in cursor.fetchall():
131+ table = row[0]
132+ options.append("--table={}".format(table))
133+ parent = "_".join(table.split("_")[:-1])
134+ if parent not in parents:
135+ triggers = {}
136+ trigger_before = parent + "_insert_trigger"
137+ trigger_after = parent + "_insert_trigger_after"
138+ for trigger in [trigger_before, trigger_after]:
139+ cursor.execute(
140+ "SELECT pg_get_functiondef('{}'::regproc)".format(trigger))
141+ triggers[trigger] = cursor.fetchone()[0]
142+ parents[parent] = triggers
143+ connection.close()
144+
145+ user = stores_info["admin"]["user"]
146+ if user:
147+ options.append("--user={}".format(user))
148+ cmd = ["pg_dump"] + options + [stores_info["stores"][store_name]]
149+ password = stores_info["admin"]["password"]
150+ env = os.environ.copy()
151+ if password:
152+ env["PGPASSWORD"] = password
153+ subprocess.check_call(cmd, env=env)
154+
155+ with open(dump_file, "a") as fd:
156+ for parent, triggers in parents.iteritems():
157+ for trigger, body in triggers.iteritems():
158+ fd.write(body.strip() + ";\n")
159+ trigger_before = parent + "_insert_trigger"
160+ trigger_after = parent + "_insert_trigger_after"
161+ fd.write(
162+ "CREATE TRIGGER {} BEFORE INSERT ON {} "
163+ "FOR EACH ROW EXECUTE PROCEDURE {}();\n".format(
164+ trigger_before, parent, trigger_before))
165+ fd.write(
166+ "CREATE TRIGGER {} AFTER INSERT ON {} "
167+ "FOR EACH ROW EXECUTE PROCEDURE {}();\n".format(
168+ trigger_after, parent, trigger_after))
169+
170+
171+def psql_restore_partitioned_schema(stores_info, dump_dir):
172+ store_name = "resource-1"
173+ dump_file = os.path.join(dump_dir, store_name + "-partioned.sql")
174+
175+ options = [
176+ "--host={}".format(stores_info["postgres"]["host"]),
177+ "--port={}".format(stores_info["postgres"]["port"]),
178+ "--file={}".format(dump_file),
179+ ]
180+ user = stores_info["admin"]["user"]
181+ if user:
182+ options.append("--user={}".format(user))
183+ cmd = ["psql"] + options + [stores_info["stores"][store_name]]
184+ password = stores_info["admin"]["password"]
185+ env = os.environ.copy()
186+ if password:
187+ env["PGPASSWORD"] = password
188+ subprocess.check_call(cmd, env=env)
189+
190+
191 def pg_restore(stores_info, store_name, dump_dir):
192 dump_file = os.path.join(dump_dir, store_name + ".sql")
193 options = [
194+ "--jobs={}".format(multiprocessing.cpu_count()),
195 "--disable-triggers",
196 "--no-owner",
197 "--data-only",
198@@ -209,10 +336,13 @@
199 def action_export(stores_info, tarball_file):
200 try:
201 dump_dir = tempfile.mkdtemp()
202+ rename_incosistent_sequences(stores_info)
203 dump_files = []
204 for store_name in STORES:
205 pg_dump(stores_info, store_name, dump_dir)
206 dump_files.append(store_name + ".sql")
207+ pg_dump_partitioned_schemas(stores_info, dump_dir)
208+ dump_files.append("resource-1-partioned.sql")
209 cmd = ["tar", "cfz", tarball_file, "-C", dump_dir] + dump_files
210 subprocess.check_call(" ".join(cmd), shell=True)
211 finally:
212@@ -225,7 +355,8 @@
213 cmd = ["tar", "xfz", tarball_file, "-C", dump_dir]
214 subprocess.check_call(" ".join(cmd), shell=True)
215 drop_data(stores_info)
216- fresh_schema(schema_script)
217+ fresh_schema(schema_script, stores_info)
218+ psql_restore_partitioned_schema(stores_info, dump_dir)
219 for store_name in STORES:
220 pg_restore(stores_info, store_name, dump_dir)
221 set_root_url(stores_info)

Subscribers

People subscribed via source and target branches

to all changes: