Merge lp:~nwilliams/akiban-server/fix-bug1210234 into lp:~akiban-technologies/akiban-server/trunk

Proposed by Nathan Williams
Status: Merged
Approved by: Thomas Jones-Low
Approved revision: 2737
Merged at revision: 2733
Proposed branch: lp:~nwilliams/akiban-server/fix-bug1210234
Merge into: lp:~akiban-technologies/akiban-server/trunk
Diff against target: 263 lines (+79/-51)
7 files modified
src/main/java/com/akiban/server/service/dxl/AlterTableHelper.java (+1/-1)
src/main/java/com/akiban/server/service/dxl/BasicDDLFunctions.java (+2/-2)
src/main/java/com/akiban/server/store/AbstractSchemaManager.java (+51/-39)
src/main/java/com/akiban/server/store/PersistitStoreSchemaManager.java (+0/-1)
src/main/java/com/akiban/server/store/SchemaManager.java (+1/-7)
src/test/java/com/akiban/server/service/is/BasicInfoSchemaTablesServiceImplTest.java (+1/-1)
src/test/resources/com/akiban/sql/pg/yaml/bugs/test-bug-1210234.yaml (+23/-0)
To merge this branch: bzr merge lp:~nwilliams/akiban-server/fix-bug1210234
Reviewer Review Type Date Requested Status
Thomas Jones-Low Approve
Review via email: mp+179267@code.launchpad.net

Description of the change

Ensure a table version is bumped once, at most, during DDL.

Additionally, look in the staged list of changes when attempting to determine if a table is affected by concurrent DDL.

This bug was the other side of the coin to bug1199111. With a table 'changing' multiple times in an ALTER where a GI is defined, the version was getting confused in the session map. Instead of propagating the isTemporary flag more places, track the bumped version and use that for subsequent bumps within the same query.

To post a comment you must log in.
2737. By Nathan Williams

Use helper creator

Revision history for this message
Thomas Jones-Low (tjoneslo) wrote :

Good, You got rid of my temporary index drop hack.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/main/java/com/akiban/server/service/dxl/AlterTableHelper.java'
2--- src/main/java/com/akiban/server/service/dxl/AlterTableHelper.java 2013-07-27 00:43:54 +0000
3+++ src/main/java/com/akiban/server/service/dxl/AlterTableHelper.java 2013-08-08 19:36:54 +0000
4@@ -110,7 +110,7 @@
5 for(IndexName name : affectedGroupIndexes.keySet()) {
6 groupIndexes.add(origTable.getGroup().getIndex(name.getName()));
7 }
8- ddl.schemaManager().dropIndexes(session, groupIndexes, true);
9+ ddl.schemaManager().dropIndexes(session, groupIndexes);
10 }
11
12 public void createAffectedGroupIndexes(Session session, BasicDDLFunctions ddl, UserTable origTable, UserTable newTable, boolean dataChange) {
13
14=== modified file 'src/main/java/com/akiban/server/service/dxl/BasicDDLFunctions.java'
15--- src/main/java/com/akiban/server/service/dxl/BasicDDLFunctions.java 2013-07-27 00:45:55 +0000
16+++ src/main/java/com/akiban/server/service/dxl/BasicDDLFunctions.java 2013-08-08 19:36:54 +0000
17@@ -1143,7 +1143,7 @@
18 }
19 indexes.add(index);
20 }
21- schemaManager().dropIndexes(session, indexes, false);
22+ schemaManager().dropIndexes(session, indexes);
23 store().deleteIndexes(session, indexes);
24 for(TableListener listener : listenerService.getTableListeners()) {
25 listener.onDropIndex(session, indexes);
26@@ -1195,7 +1195,7 @@
27 }
28 indexes.add(index);
29 }
30- schemaManager().dropIndexes(session, indexes, false);
31+ schemaManager().dropIndexes(session, indexes);
32 store().deleteIndexes(session, indexes);
33 for(TableListener listener : listenerService.getTableListeners()) {
34 listener.onDropIndex(session, indexes);
35
36=== modified file 'src/main/java/com/akiban/server/store/AbstractSchemaManager.java'
37--- src/main/java/com/akiban/server/store/AbstractSchemaManager.java 2013-07-30 19:23:14 +0000
38+++ src/main/java/com/akiban/server/store/AbstractSchemaManager.java 2013-08-08 19:36:54 +0000
39@@ -77,6 +77,7 @@
40 import java.util.HashSet;
41 import java.util.List;
42 import java.util.Map;
43+import java.util.Map.Entry;
44 import java.util.Set;
45 import java.util.SortedMap;
46 import java.util.TreeMap;
47@@ -289,7 +290,7 @@
48 }
49
50 @Override
51- public void dropIndexes(Session session, final Collection<? extends Index> indexesToDrop, boolean temporary) {
52+ public void dropIndexes(Session session, final Collection<? extends Index> indexesToDrop) {
53 final AkibanInformationSchema newAIS = AISCloner.clone(
54 getAis(session),
55 new ProtobufWriter.TableSelector() {
56@@ -304,13 +305,12 @@
57 }
58 });
59
60- if (!temporary) {
61- Collection<Integer> tableIDs = new ArrayList<>(indexesToDrop.size());
62- for(Index index : indexesToDrop) {
63- tableIDs.addAll(index.getAllTableIDs());
64- }
65- trackBumpTableVersion(session, newAIS, tableIDs);
66+ Collection<Integer> tableIDs = new ArrayList<>(indexesToDrop.size());
67+ for(Index index : indexesToDrop) {
68+ tableIDs.addAll(index.getAllTableIDs());
69 }
70+ trackBumpTableVersion(session, newAIS, tableIDs);
71+
72 final Set<String> schemas = new HashSet<>();
73 for(Index index : indexesToDrop) {
74 schemas.add(DefaultNameGenerator.schemaNameForIndex(index));
75@@ -521,7 +521,17 @@
76 if(table == null) {
77 throw new IllegalStateException("Unknown table: " + tableID);
78 }
79- Integer curVer = tableVersionMap.get(tableID);
80+ // May be changed by ongoing DDL
81+ Map<Integer,Integer> changedVersions = session.get(TABLE_VERSIONS);
82+ Integer curVer = null;
83+ if(changedVersions != null) {
84+ curVer = changedVersions.get(tableID);
85+ }
86+ // Or not
87+ if(curVer == null) {
88+ curVer = tableVersionMap.get(tableID);
89+ }
90+ // Current may be null in pre-1.4.3 volumes
91 Integer tableVer = table.getVersion();
92 if(curVer == null) {
93 return tableVer != null;
94@@ -694,34 +704,34 @@
95 );
96 }
97
98- protected void trackBumpTableVersion (Session session, AkibanInformationSchema newAIS, Collection<Integer> allTableIDs)
99+ protected void trackBumpTableVersion (Session session, AkibanInformationSchema newAIS, Collection<Integer> affectedIDs)
100 {
101+ // Schedule the update for the tableVersionMap version number on commit.
102+ // A single DDL may trigger multiple tracks (e.g. ALTER affecting a GI), so only bump once per DDL.
103+ Map<Integer,Integer> tableAndVersions = session.get(TABLE_VERSIONS);
104+ if(tableAndVersions == null) {
105+ tableAndVersions = new HashMap<>();
106+ session.put(TABLE_VERSIONS, tableAndVersions);
107+ txnService.addCallback(session, TransactionService.CallbackType.COMMIT, bumpTableVersionCommit);
108+ txnService.addCallback(session, TransactionService.CallbackType.END, cleanTableVersion);
109+ }
110+
111 // Set the new table version for tables in the NewAIS
112- for(Integer tableID : allTableIDs) {
113- Integer current = tableVersionMap.get(tableID);
114- Integer update = (current == null) ? 1 : current + 1;
115+ for(Integer tableID : affectedIDs) {
116+ Integer newVersion = tableAndVersions.get(tableID);
117+ if(newVersion == null) {
118+ Integer current = tableVersionMap.get(tableID);
119+ newVersion = (current == null) ? 1 : current + 1;
120+ tableAndVersions.put(tableID, newVersion);
121+ }
122 UserTable table = newAIS.getUserTable(tableID);
123- if(table != null) { // From drop
124- table.setVersion(update);
125+ if(table != null) { // From a drop
126+ table.setVersion(newVersion);
127 }
128 }
129- // Schedule the update for the tableVersionMap version number on commit.
130- // Replace any existing map as we only should have one at at time.
131- // for one AIS.
132- // There may be two of these, the first for an alter table,
133- // the second for group indexes affected by the change.
134- Map<AkibanInformationSchema,Collection<Integer>> map = session.get(TABLE_VERSIONS);
135- if(map == null) {
136- map = new HashMap<>();
137- session.put(TABLE_VERSIONS, map);
138- }
139- map.put(newAIS, allTableIDs);
140- txnService.addCallback(session, TransactionService.CallbackType.COMMIT, bumpTableVersionCommit);
141- txnService.addCallback(session, TransactionService.CallbackType.END, cleanTableVersion);
142 }
143
144- protected final static Session.MapKey<AkibanInformationSchema, Collection<Integer>> TABLE_VERSIONS =
145- Session.MapKey.mapNamed("TABLE_VERSIONS");
146+ protected final static Session.MapKey<Integer,Integer> TABLE_VERSIONS = Session.MapKey.mapNamed("TABLE_VERSIONS");
147
148 // If the Alter table fails, make sure to clean up the TABLE_VERSION change list on end
149 // If the Alter succeeds, the bumpTableVersionCommit process will clean up, and this does nothing.
150@@ -737,20 +747,22 @@
151 protected final TransactionService.Callback bumpTableVersionCommit = new TransactionService.Callback() {
152 @Override
153 public void run(Session session, long timestamp) {
154- Map<AkibanInformationSchema,Collection<Integer>> tableIDs = session.remove(TABLE_VERSIONS);
155- if (tableIDs != null) {
156- for(Map.Entry<AkibanInformationSchema, Collection<Integer>> entry : tableIDs.entrySet()) {
157- bumpTableVersions(entry.getKey(), entry.getValue());
158- }
159+ Map<Integer,Integer> tableAndVersions = session.remove(TABLE_VERSIONS);
160+ if(tableAndVersions != null) {
161+ bumpTableVersions(tableAndVersions);
162 }
163 }
164 };
165
166- private void bumpTableVersions(AkibanInformationSchema newAIS, Collection<Integer> allTableIDs) {
167- for(Integer tableID : allTableIDs) {
168- Integer current = tableVersionMap.get(tableID);
169- Integer update = (current == null) ? 1 : current + 1;
170- boolean success = tableVersionMap.compareAndSet(tableID, current, update);
171+ private void bumpTableVersions(Map<Integer,Integer> tableAndVersions) {
172+ for(Entry<Integer, Integer> entry : tableAndVersions.entrySet()) {
173+ int tableID = entry.getKey();
174+ int newVersion = entry.getValue();
175+ Integer current = tableVersionMap.get(entry.getKey());
176+ if(current != null && current >= newVersion) {
177+ throw new IllegalStateException("Current not < new: " + current + "," + newVersion);
178+ }
179+ boolean success = tableVersionMap.compareAndSet(tableID, current, newVersion);
180 // Failed CAS would indicate concurrent DDL on this table, which should not be possible
181 if(!success) {
182 throw new IllegalStateException("Unexpected concurrent DDL on table: " + tableID);
183
184=== modified file 'src/main/java/com/akiban/server/store/PersistitStoreSchemaManager.java'
185--- src/main/java/com/akiban/server/store/PersistitStoreSchemaManager.java 2013-07-29 15:22:41 +0000
186+++ src/main/java/com/akiban/server/store/PersistitStoreSchemaManager.java 2013-08-08 19:36:54 +0000
187@@ -355,7 +355,6 @@
188
189 this.nameGenerator = SynchronizedNameGenerator.wrap(new DefaultNameGenerator(newAIS));
190 this.delayedTreeIDGenerator = new AtomicLong();
191- this.tableVersionMap = ReadWriteMap.wrapNonFair(new HashMap<Integer,Integer>());
192 for(UserTable table : newAIS.getUserTables().values()) {
193 // Note: table.getVersion may be null (pre-1.4.3 volumes)
194 tableVersionMap.put(table.getTableId(), table.getVersion());
195
196=== modified file 'src/main/java/com/akiban/server/store/SchemaManager.java'
197--- src/main/java/com/akiban/server/store/SchemaManager.java 2013-07-30 19:23:14 +0000
198+++ src/main/java/com/akiban/server/store/SchemaManager.java 2013-08-08 19:36:54 +0000
199@@ -18,10 +18,7 @@
200 package com.akiban.server.store;
201
202 import java.util.Collection;
203-import java.util.List;
204-import java.util.Map;
205 import java.util.Set;
206-import java.util.SortedMap;
207
208 import com.akiban.ais.model.AkibanInformationSchema;
209 import com.akiban.ais.model.Index;
210@@ -118,11 +115,8 @@
211 * supported through this interface.
212 * @param session Session to operate under.
213 * @param indexes List of indexes to drop.
214- * @param index drop is temporary - True means indexes will be recreated in
215- * the same transaction as part of an alter. False means this is the only
216- * change.
217 */
218- void dropIndexes(Session session, Collection<? extends Index> indexes, boolean temporary);
219+ void dropIndexes(Session session, Collection<? extends Index> indexes);
220
221 /**
222 * Delete the definition of the table with the given name. Throws
223
224=== modified file 'src/test/java/com/akiban/server/service/is/BasicInfoSchemaTablesServiceImplTest.java'
225--- src/test/java/com/akiban/server/service/is/BasicInfoSchemaTablesServiceImplTest.java 2013-07-30 19:23:14 +0000
226+++ src/test/java/com/akiban/server/service/is/BasicInfoSchemaTablesServiceImplTest.java 2013-08-08 19:36:54 +0000
227@@ -703,7 +703,7 @@
228 }
229
230 @Override
231- public void dropIndexes(Session session, Collection<? extends Index> indexes, boolean temporary) {
232+ public void dropIndexes(Session session, Collection<? extends Index> indexes) {
233 throw new UnsupportedOperationException();
234 }
235
236
237=== added file 'src/test/resources/com/akiban/sql/pg/yaml/bugs/test-bug-1210234.yaml'
238--- src/test/resources/com/akiban/sql/pg/yaml/bugs/test-bug-1210234.yaml 1970-01-01 00:00:00 +0000
239+++ src/test/resources/com/akiban/sql/pg/yaml/bugs/test-bug-1210234.yaml 2013-08-08 19:36:54 +0000
240@@ -0,0 +1,23 @@
241+#
242+# bug1210234: Second ALTER on a table within a group fails with concurrent DDL exception if a GI is defined
243+#
244+---
245+- CreateTable: c(cid INT NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL)
246+---
247+- CreateTable: o(oid INT NOT NULL PRIMARY KEY, cid INT NOT NULL, odate date NOT NULL, priority VARCHAR(15),
248+ GROUPING FOREIGN KEY(cid) REFERENCES c(cid))
249+---
250+- CreateTable: i(iid INT NOT NULL PRIMARY KEY, oid INT NOT NULL, discount FLOAT,
251+ GROUPING FOREIGN KEY(oid) REFERENCES o(oid))
252+---
253+- Statement: CREATE INDEX gi_ooi ON test.c (o.priority, o.odate, i.discount) USING LEFT JOIN
254+---
255+- Statement: INSERT INTO c VALUES (1, 'John')
256+---
257+- Statement: INSERT INTO o VALUES (1, 1, '1995-02-01', 'Prior 2')
258+---
259+- Statement: INSERT INTO i VALUES (1, 1, 0.4)
260+---
261+- Statement: ALTER TABLE i ALTER COLUMN discount SET DATA TYPE INT
262+---
263+- Statement: ALTER TABLE o ALTER COLUMN odate SET DATA TYPE CHAR(8)

Subscribers

People subscribed via source and target branches