Merge lp:~nwilliams/akiban-persistit/fix_912514_fetchAndRemove_2 into lp:akiban-persistit
- fix_912514_fetchAndRemove_2
- Merge into trunk
Status: | Merged | ||||
---|---|---|---|---|---|
Approved by: | Peter Beaman | ||||
Approved revision: | 317 | ||||
Merged at revision: | 311 | ||||
Proposed branch: | lp:~nwilliams/akiban-persistit/fix_912514_fetchAndRemove_2 | ||||
Merge into: | lp:akiban-persistit | ||||
Prerequisite: | lp:~nwilliams/akiban-persistit/move-core-up | ||||
Diff against target: |
436 lines (+185/-56) 2 files modified
src/main/java/com/persistit/Exchange.java (+71/-56) src/test/java/com/persistit/Bug912514Test.java (+114/-0) |
||||
To merge this branch: | bzr merge lp:~nwilliams/akiban-persistit/fix_912514_fetchAndRemove_2 | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Peter Beaman | Approve | ||
Review via email: mp+107863@code.launchpad.net |
This proposal supersedes a proposal from 2012-05-29.
Commit message
Description of the change
Fix Exchange store and fetchAndRemove to work correctly both inside and out of a transaction.
The vast majority of this is Peter's original branch. The last few commits were refactorings, to eliminate duplicate code, that I noticed were possible and suggested on the merge prop.
Original branch and description:
lp:~pbeaman/akiban-persistit/fix_912514_fetchAndRemove
The basic strategy to fix this bug is to used a the ThreadLocal-based cached Value object, rather than Exchange.
New test Bug912514 test tests some of the cases. I will also (and have not yet) run PersistitMapStr
Peter Beaman (pbeaman) wrote : | # |
Peter Beaman (pbeaman) : | # |
Preview Diff
1 | === modified file 'src/main/java/com/persistit/Exchange.java' |
2 | --- src/main/java/com/persistit/Exchange.java 2012-05-25 18:50:59 +0000 |
3 | +++ src/main/java/com/persistit/Exchange.java 2012-05-29 19:52:29 +0000 |
4 | @@ -1031,7 +1031,6 @@ |
5 | * |
6 | * @return Encoded key location within the data page. The page itself is |
7 | * made valid in the level cache. |
8 | - * @throws PMapException |
9 | */ |
10 | private int search(Key key, boolean writer) throws PersistitException { |
11 | Buffer buffer = null; |
12 | @@ -1099,7 +1098,6 @@ |
13 | * |
14 | * @return Encoded key location within the level. The page itself is valid |
15 | * within the level cache. |
16 | - * @throws PMapException |
17 | */ |
18 | private int searchTree(Key key, int toLevel, boolean writer) throws PersistitException { |
19 | Buffer oldBuffer = null; |
20 | @@ -1198,7 +1196,6 @@ |
21 | * @param currentLevel |
22 | * current level in the tree |
23 | * @return Encoded key location within the page. |
24 | - * @throws PMapException |
25 | */ |
26 | private int searchLevel(Key key, boolean edge, long pageAddress, int currentLevel, boolean writer) |
27 | throws PersistitException { |
28 | @@ -1323,15 +1320,9 @@ |
29 | * uponError |
30 | */ |
31 | boolean storeInternal(Key key, Value value, int level, int options) throws PersistitException { |
32 | - if ((options & StoreOptions.FETCH) > 0 && (options & StoreOptions.MVCC) > 0) { |
33 | - throw new IllegalArgumentException("Both fetch and MVCC not supported"); |
34 | - } |
35 | |
36 | final boolean doMVCC = (options & StoreOptions.MVCC) > 0; |
37 | - final boolean doAnyFetch = (options & StoreOptions.FETCH) > 0 || doMVCC; |
38 | - |
39 | - // spare used for fetch |
40 | - Debug.$assert0.t(!doAnyFetch || value != _spareValue); |
41 | + final boolean doFetch = (options & StoreOptions.FETCH) > 0; |
42 | |
43 | // spares used for new splits/levels |
44 | Debug.$assert0.t(key != _spareKey1); |
45 | @@ -1344,6 +1335,8 @@ |
46 | boolean incrementMVVCount = false; |
47 | |
48 | final int maxSimpleValueSize = maxValueSize(key.getEncodedSize()); |
49 | + final Value spareValue = _persistit.getThreadLocalValue(); |
50 | + assert !(doMVCC & value == spareValue || doFetch && value == _spareValue): "storeInternal may be use the supplied Value: " + value; |
51 | |
52 | // |
53 | // First insert the record in the data page |
54 | @@ -1369,7 +1362,7 @@ |
55 | // This method may delay significantly for I/O and must |
56 | // be called when there are no other claimed resources. |
57 | // |
58 | - newLongRecordPointer = getLongRecordHelper().storeLongRecord(value, _transaction.isActive()); |
59 | + newLongRecordPointer = getLongRecordHelper().storeLongRecord(value, _transaction.isActive()); |
60 | } |
61 | |
62 | if (!_ignoreTransactions && ((options & StoreOptions.DONT_JOURNAL) == 0)) { |
63 | @@ -1396,7 +1389,7 @@ |
64 | if (!committed && newLongRecordPointerMVV != 0) { |
65 | _volume.getStructure().deallocateGarbageChain(newLongRecordPointerMVV, 0); |
66 | newLongRecordPointerMVV = 0; |
67 | - _spareValue.changeLongRecordMode(false); |
68 | + spareValue.changeLongRecordMode(false); |
69 | } |
70 | |
71 | if (treeClaimRequired && !treeClaimAcquired) { |
72 | @@ -1462,37 +1455,38 @@ |
73 | if (keyExisted) { |
74 | oldLongRecordPointer = buffer.fetchLongRecordPointer(foundAt); |
75 | } |
76 | - if (doAnyFetch) { |
77 | - buffer.fetch(foundAt, _spareValue); |
78 | - /* |
79 | - * If we aren't in MVCC we have to un-long-ify as |
80 | - * fetch was requested. Otherwise only do it if it |
81 | - * is a long MVV so as to not-needlessly create one. |
82 | - */ |
83 | - if (!doMVCC) { |
84 | - fetchFixupForLongRecords(_spareValue, Integer.MAX_VALUE); |
85 | - } else if (oldLongRecordPointer != 0) { |
86 | - if (isLongMVV(_spareValue)) { |
87 | + |
88 | + if (doFetch || doMVCC) { |
89 | + buffer.fetch(foundAt, spareValue); |
90 | + if (oldLongRecordPointer != 0) { |
91 | + if (isLongMVV(spareValue)) { |
92 | oldLongRecordPointerMVV = oldLongRecordPointer; |
93 | - fetchFixupForLongRecords(_spareValue, Integer.MAX_VALUE); |
94 | + fetchFixupForLongRecords(spareValue, Integer.MAX_VALUE); |
95 | } |
96 | - /* |
97 | - * If it was a long MVV we saved it into the |
98 | - * variable above. Otherwise it is a primordial |
99 | - * value that we can't get rid of. |
100 | - */ |
101 | - oldLongRecordPointer = 0; |
102 | + } |
103 | + /* |
104 | + * If it was a long MVV we saved it into the |
105 | + * variable above. Otherwise it is a |
106 | + * primordial value that we can't get rid |
107 | + * of. |
108 | + */ |
109 | + oldLongRecordPointer = 0; |
110 | + |
111 | + if (doFetch) { |
112 | + spareValue.copyTo(_spareValue); |
113 | + fetchFromValueInternal(_spareValue, Integer.MAX_VALUE, buffer); |
114 | } |
115 | } |
116 | + |
117 | if (doMVCC) { |
118 | - valueToStore = _spareValue; |
119 | + valueToStore = spareValue; |
120 | int valueSize = value.getEncodedSize(); |
121 | /* |
122 | * If key didn't exist the value is truly |
123 | * non-existent and not just undefined/zero length |
124 | */ |
125 | - byte[] spareBytes = _spareValue.getEncodedBytes(); |
126 | - int spareSize = keyExisted ? _spareValue.getEncodedSize() : -1; |
127 | + byte[] spareBytes = spareValue.getEncodedBytes(); |
128 | + int spareSize = keyExisted ? spareValue.getEncodedSize() : -1; |
129 | spareSize = MVV.prune(spareBytes, 0, spareSize, _persistit.getTransactionIndex(), false, |
130 | prunedVersions); |
131 | |
132 | @@ -1521,8 +1515,8 @@ |
133 | MVV.visitAllVersions(_mvvVisitor, spareBytes, 0, spareSize); |
134 | |
135 | int mvvSize = MVV.estimateRequiredLength(spareBytes, spareSize, valueSize); |
136 | - _spareValue.ensureFit(mvvSize); |
137 | - spareBytes = _spareValue.getEncodedBytes(); |
138 | + spareValue.ensureFit(mvvSize); |
139 | + spareBytes = spareValue.getEncodedBytes(); |
140 | |
141 | long versionHandle = TransactionIndex.tss2vh(_transaction.getStartTimestamp(), tStep); |
142 | int storedLength = MVV.storeVersion(spareBytes, 0, spareSize, spareBytes.length, |
143 | @@ -1530,11 +1524,10 @@ |
144 | |
145 | incrementMVVCount = (storedLength & MVV.STORE_EXISTED_MASK) == 0; |
146 | storedLength &= MVV.STORE_LENGTH_MASK; |
147 | - _spareValue.setEncodedSize(storedLength); |
148 | + spareValue.setEncodedSize(storedLength); |
149 | |
150 | - if (_spareValue.getEncodedSize() > maxSimpleValueSize) { |
151 | - newLongRecordPointerMVV = getLongRecordHelper().storeLongRecord(_spareValue, |
152 | - _transaction.isActive()); |
153 | + if (spareValue.getEncodedSize() > maxSimpleValueSize) { |
154 | + newLongRecordPointerMVV = getLongRecordHelper().storeLongRecord(spareValue, _transaction.isActive()); |
155 | } |
156 | } |
157 | } |
158 | @@ -1673,7 +1666,7 @@ |
159 | } |
160 | |
161 | value.changeLongRecordMode(false); |
162 | - _spareValue.changeLongRecordMode(false); |
163 | + spareValue.changeLongRecordMode(false); |
164 | if (!committed) { |
165 | // |
166 | // We failed to write the new LONG_RECORD. If there was |
167 | @@ -1698,7 +1691,7 @@ |
168 | } |
169 | _volume.getStatistics().bumpStoreCounter(); |
170 | _tree.getStatistics().bumpStoreCounter(); |
171 | - if (doAnyFetch) { |
172 | + if (doFetch || doMVCC) { |
173 | _volume.getStatistics().bumpFetchCounter(); |
174 | _tree.getStatistics().bumpFetchCounter(); |
175 | } |
176 | @@ -1761,7 +1754,6 @@ |
177 | * The encoded insert location. |
178 | * @return <code>true</code> if it necessary to insert a key into the |
179 | * ancestor index page. |
180 | - * @throws PMapException |
181 | */ |
182 | // TODO - Check insertIndexLevel timestamps |
183 | private boolean putLevel(LevelCache lc, Key key, ValueHelper valueWriter, Buffer buffer, int foundAt, |
184 | @@ -2142,7 +2134,7 @@ |
185 | index = _key.getEncodedSize(); |
186 | |
187 | if (matches) { |
188 | - matches = fetchInternal(buffer, outValue, foundAt, minimumBytes); |
189 | + matches = fetchFromBufferInternal(buffer, outValue, foundAt, minimumBytes); |
190 | if (!matches && direction != EQ) { |
191 | nudged = false; |
192 | nudgeForMVCC = (direction == GTEQ || direction == LTEQ); |
193 | @@ -2162,7 +2154,7 @@ |
194 | if (matches) { |
195 | index = _key.nextElementIndex(parentIndex); |
196 | if (index > 0) { |
197 | - boolean isVisibleMatch = fetchInternal(buffer, outValue, foundAt, minimumBytes); |
198 | + boolean isVisibleMatch = fetchFromBufferInternal(buffer, outValue, foundAt, minimumBytes); |
199 | // |
200 | // In any case (matching sibling, child or |
201 | // niece/nephew) we need to ignore this |
202 | @@ -2569,7 +2561,9 @@ |
203 | _persistit.checkClosed(); |
204 | _persistit.checkSuspended(); |
205 | _key.testValidForStoreAndFetch(_volume.getPageSize()); |
206 | - storeInternal(_key, _value, 0, StoreOptions.FETCH | StoreOptions.WAIT); |
207 | + int options = StoreOptions.WAIT | StoreOptions.FETCH; |
208 | + options |= (!_ignoreTransactions && _transaction.isActive()) ? StoreOptions.MVCC : 0; |
209 | + storeInternal(_key, _value, 0, options); |
210 | _spareValue.copyTo(_value); |
211 | return this; |
212 | } |
213 | @@ -2727,7 +2721,7 @@ |
214 | if (minimumBytes < 0) { |
215 | minimumBytes = 0; |
216 | } |
217 | - fetchInternal(value, minimumBytes); |
218 | + searchAndFetchInternal(value, minimumBytes); |
219 | return this; |
220 | } |
221 | |
222 | @@ -2748,9 +2742,29 @@ |
223 | * As thrown from any internal method. |
224 | * @return <code>true</code> if the value was visible. |
225 | */ |
226 | - private boolean fetchInternal(Buffer buffer, Value value, int foundAt, int minimumBytes) throws PersistitException { |
227 | + private boolean fetchFromBufferInternal(Buffer buffer, Value value, int foundAt, int minimumBytes) throws PersistitException { |
228 | + buffer.fetch(foundAt, value); |
229 | + return fetchFromValueInternal(value, minimumBytes, buffer); |
230 | + } |
231 | + |
232 | + /** |
233 | + * Helper for finalizing the value to return from a, potentially, MVV |
234 | + * contained in the given Value. |
235 | + * |
236 | + * @param value |
237 | + * Value to finalize. |
238 | + * @param minimumBytes |
239 | + * Minimum amount of LONG_RECORD to fetch. If <0, the |
240 | + * <code>value</code> will contain just the descriptor portion. |
241 | + * @param bufferForPruning |
242 | + * If not <code>null</code> and <code>Value</code> did contain |
243 | + * an MVV, call {@link Buffer#enqueuePruningAction(int)}. |
244 | + * @throws PersistitException |
245 | + * As thrown from any internal method. |
246 | + * @return <code>true</code> if the value was visible. |
247 | + */ |
248 | + private boolean fetchFromValueInternal(Value value, int minimumBytes, Buffer bufferForPruning) throws PersistitException { |
249 | boolean visible = true; |
250 | - buffer.fetch(foundAt, value); |
251 | /* |
252 | * We must fetch the full LONG_RECORD, if needed, while buffer is |
253 | * claimed from calling code so that it can't be de-allocated as we are |
254 | @@ -2763,7 +2777,9 @@ |
255 | */ |
256 | fetchFixupForLongRecords(value, Integer.MAX_VALUE); |
257 | if (MVV.isArrayMVV(value.getEncodedBytes(), 0, value.getEncodedSize())) { |
258 | - buffer.enqueuePruningAction(_tree.getHandle()); |
259 | + if (bufferForPruning != null) { |
260 | + bufferForPruning.enqueuePruningAction(_tree.getHandle()); |
261 | + } |
262 | visible = mvccFetch(value, minimumBytes); |
263 | fetchFixupForLongRecords(value, minimumBytes); |
264 | } |
265 | @@ -2790,13 +2806,13 @@ |
266 | * @throws PersistitException |
267 | * As thrown from {@link #search(Key, boolean)} |
268 | */ |
269 | - private void fetchInternal(Value value, int minimumBytes) throws PersistitException { |
270 | + private void searchAndFetchInternal(Value value, int minimumBytes) throws PersistitException { |
271 | Buffer buffer = null; |
272 | try { |
273 | int foundAt = search(_key, false); |
274 | LevelCache lc = _levelCache[0]; |
275 | buffer = lc._buffer; |
276 | - fetchInternal(buffer, value, foundAt, minimumBytes); |
277 | + fetchFromBufferInternal(buffer, value, foundAt, minimumBytes); |
278 | _volume.getStatistics().bumpFetchCounter(); |
279 | _tree.getStatistics().bumpFetchCounter(); |
280 | } finally { |
281 | @@ -3063,7 +3079,7 @@ |
282 | |
283 | _value.clear().putAntiValueMVV(); |
284 | final int storeOptions = StoreOptions.MVCC | StoreOptions.WAIT | StoreOptions.ONLY_IF_VISIBLE |
285 | - | StoreOptions.DONT_JOURNAL; |
286 | + | StoreOptions.DONT_JOURNAL | (fetchFirst ? StoreOptions.FETCH : 0); |
287 | |
288 | boolean anyRemoved = false; |
289 | boolean keyIsLessThan = true; |
290 | @@ -3755,8 +3771,7 @@ |
291 | /** |
292 | * Called by Transaction to set up a context for committing updates. |
293 | * |
294 | - * @param volume |
295 | - * @param _treeName |
296 | + * @param tree |
297 | */ |
298 | void setTree(Tree tree) throws PersistitException { |
299 | _persistit.checkClosed(); |
300 | @@ -3944,7 +3959,7 @@ |
301 | boolean savedIgnore = _ignoreMVCCFetch; |
302 | try { |
303 | _ignoreMVCCFetch = true; |
304 | - fetchInternal(_spareValue, -1); |
305 | + searchAndFetchInternal(_spareValue, -1); |
306 | final boolean wasLong = isLongRecord(_spareValue); |
307 | _spareValue.clear(); |
308 | return wasLong; |
309 | @@ -3966,7 +3981,7 @@ |
310 | boolean savedIgnore = _ignoreMVCCFetch; |
311 | try { |
312 | _ignoreMVCCFetch = true; |
313 | - fetchInternal(_spareValue, -1); |
314 | + searchAndFetchInternal(_spareValue, -1); |
315 | final boolean wasLong = isLongMVV(_spareValue); |
316 | _spareValue.clear(); |
317 | return wasLong; |
318 | |
319 | === added file 'src/test/java/com/persistit/Bug912514Test.java' |
320 | --- src/test/java/com/persistit/Bug912514Test.java 1970-01-01 00:00:00 +0000 |
321 | +++ src/test/java/com/persistit/Bug912514Test.java 2012-05-29 19:52:29 +0000 |
322 | @@ -0,0 +1,114 @@ |
323 | +/** |
324 | + * Copyright © 2012 Akiban Technologies, Inc. All rights reserved. |
325 | + * |
326 | + * This program is free software: you can redistribute it and/or modify |
327 | + * it under the terms of the GNU Affero General Public License as |
328 | + * published by the Free Software Foundation, version 3 (only) of the |
329 | + * License. |
330 | + * |
331 | + * This program is distributed in the hope that it will be useful, |
332 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
333 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
334 | + * GNU Affero General Public License for more details. |
335 | + * |
336 | + * You should have received a copy of the GNU Affero General Public License |
337 | + * along with this program. If not, see <http://www.gnu.org/licenses/>. |
338 | + * |
339 | + * This program may also be available under different license terms. For more |
340 | + * information, see www.akiban.com or contact licensing@akiban.com. |
341 | + */ |
342 | + |
343 | +package com.persistit; |
344 | + |
345 | +import static org.junit.Assert.*; |
346 | + |
347 | +import org.junit.Test; |
348 | + |
349 | +import com.persistit.unit.PersistitUnitTestCase; |
350 | + |
351 | +/** |
352 | + * https://bugs.launchpad.net/akiban-persistit/+bug/912514 |
353 | + * |
354 | + * PersistitMapStress1 fails intermittently with messages like this: |
355 | + * |
356 | + * Finished unit=#1 PersistitMapStress test=PersistitMapStress1 main at ts=8036 |
357 | + * - elapsed=5738 - FAILED: value not expected to be null 8036 Failed test |
358 | + * unit=#1 PersistitMapStress test=PersistitMapStress1 main value not expected |
359 | + * to be null |
360 | + * |
361 | + * I didn't track down the responsible code, but I'm pretty sure the contents of |
362 | + * _spareValue have been mangled. |
363 | + * |
364 | + * Since akiban-server does not use this method the bug is no more than medium |
365 | + * priority. But it does need to be fixed before akiban-persistit is released as |
366 | + * a standalone library. |
367 | + */ |
368 | + |
369 | +public class Bug912514Test extends PersistitUnitTestCase { |
370 | + |
371 | + private final static String ROLLBACK = "rollback"; |
372 | + |
373 | + private void fetchAndStoreAndRemoveHelper(final boolean inTxn, final String... sequence) throws Exception { |
374 | + final Transaction txn = _persistit.getTransaction(); |
375 | + final Exchange exchange = _persistit.getExchange("persistit", "Bug912514Test", true); |
376 | + String previous = null; |
377 | + for (String string : sequence) { |
378 | + |
379 | + if (inTxn) { |
380 | + txn.begin(); |
381 | + } |
382 | + |
383 | + if (string == null) { |
384 | + exchange.to(1).fetchAndRemove(); |
385 | + } else { |
386 | + exchange.getValue().put(string); |
387 | + exchange.to(1).fetchAndStore(); |
388 | + } |
389 | + compare(previous, exchange.getValue()); |
390 | + |
391 | + if (inTxn) { |
392 | + if (string.startsWith(ROLLBACK)) { |
393 | + txn.rollback(); |
394 | + } else { |
395 | + txn.commit(); |
396 | + } |
397 | + txn.end(); |
398 | + compare(previous, exchange.getValue()); |
399 | + } |
400 | + |
401 | + if (!inTxn || !string.startsWith(ROLLBACK)) { |
402 | + previous = string; |
403 | + } |
404 | + exchange.fetch(); |
405 | + compare(previous, exchange.getValue()); |
406 | + |
407 | + |
408 | + } |
409 | + } |
410 | + |
411 | + private void compare(final String string, final Value value) { |
412 | + if (string == null) { |
413 | + assertTrue("Value should be undefined", !value.isDefined()); |
414 | + } else { |
415 | + assertEquals("Value should match", string, value.getString()); |
416 | + } |
417 | + } |
418 | + |
419 | + @Test |
420 | + public void fetchAndStoreTxn() throws Exception { |
421 | + fetchAndStoreAndRemoveHelper(true, RED_FOX, createString(100), createString(1000), createString(10000), RED_FOX); |
422 | + } |
423 | + |
424 | + @Test |
425 | + public void fetchAndRemoveNonTxn() throws Exception { |
426 | + fetchAndStoreAndRemoveHelper(false, RED_FOX, null, null, createString(100), null, createString(1000), null, |
427 | + createString(10000), null, null, RED_FOX, null); |
428 | + } |
429 | + |
430 | + @Test |
431 | + public void fetchAndStoreTxnWithRollbacks() throws Exception { |
432 | + fetchAndStoreAndRemoveHelper(true, RED_FOX, createString(100), ROLLBACK, createString(1000), ROLLBACK |
433 | + + createString(10000), createString(10000), RED_FOX); |
434 | + } |
435 | + |
436 | +} |
Refactorings look good. Thank you.