Merge lp:~nwilliams/akiban-persistit/fix_912514_fetchAndRemove_2 into lp:akiban-persistit

Proposed by Nathan Williams
Status: Merged
Approved by: Peter Beaman
Approved revision: 317
Merged at revision: 311
Proposed branch: lp:~nwilliams/akiban-persistit/fix_912514_fetchAndRemove_2
Merge into: lp:akiban-persistit
Prerequisite: lp:~nwilliams/akiban-persistit/move-core-up
Diff against target: 436 lines (+185/-56)
2 files modified
src/main/java/com/persistit/Exchange.java (+71/-56)
src/test/java/com/persistit/Bug912514Test.java (+114/-0)
To merge this branch: bzr merge lp:~nwilliams/akiban-persistit/fix_912514_fetchAndRemove_2
Reviewer Review Type Date Requested Status
Peter Beaman Approve
Review via email: mp+107863@code.launchpad.net

This proposal supersedes a proposal from 2012-05-29.

Description of the change

Fix Exchange store and fetchAndRemove to work correctly both inside and outside of a transaction.

The vast majority of this is Peter's original branch. The last few commits are refactorings to eliminate duplicate code; I noticed they were possible and suggested them on the original merge proposal.

Original branch and description:
lp:~pbeaman/akiban-persistit/fix_912514_fetchAndRemove

The basic strategy to fix this bug is to use the ThreadLocal-based cached Value object, rather than Exchange._spareValue, as the value that holds the MVV. The major change was to remove the "_" from instances of _spareValue in storeInternal. However, other changes were required to re-enable StoreOptions.FETCH | StoreOptions.MVCC, etc.

The new test, Bug912514Test, covers some of the cases. I will also run PersistitMapStressTest (I have not done so yet), which is where we first observed the bug.

To post a comment you must log in.
Revision history for this message
Peter Beaman (pbeaman) wrote :

Refactorings look good. Thank you.

Revision history for this message
Peter Beaman (pbeaman) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/main/java/com/persistit/Exchange.java'
2--- src/main/java/com/persistit/Exchange.java 2012-05-25 18:50:59 +0000
3+++ src/main/java/com/persistit/Exchange.java 2012-05-29 19:52:29 +0000
4@@ -1031,7 +1031,6 @@
5 *
6 * @return Encoded key location within the data page. The page itself is
7 * made valid in the level cache.
8- * @throws PMapException
9 */
10 private int search(Key key, boolean writer) throws PersistitException {
11 Buffer buffer = null;
12@@ -1099,7 +1098,6 @@
13 *
14 * @return Encoded key location within the level. The page itself is valid
15 * within the level cache.
16- * @throws PMapException
17 */
18 private int searchTree(Key key, int toLevel, boolean writer) throws PersistitException {
19 Buffer oldBuffer = null;
20@@ -1198,7 +1196,6 @@
21 * @param currentLevel
22 * current level in the tree
23 * @return Encoded key location within the page.
24- * @throws PMapException
25 */
26 private int searchLevel(Key key, boolean edge, long pageAddress, int currentLevel, boolean writer)
27 throws PersistitException {
28@@ -1323,15 +1320,9 @@
29 * uponError
30 */
31 boolean storeInternal(Key key, Value value, int level, int options) throws PersistitException {
32- if ((options & StoreOptions.FETCH) > 0 && (options & StoreOptions.MVCC) > 0) {
33- throw new IllegalArgumentException("Both fetch and MVCC not supported");
34- }
35
36 final boolean doMVCC = (options & StoreOptions.MVCC) > 0;
37- final boolean doAnyFetch = (options & StoreOptions.FETCH) > 0 || doMVCC;
38-
39- // spare used for fetch
40- Debug.$assert0.t(!doAnyFetch || value != _spareValue);
41+ final boolean doFetch = (options & StoreOptions.FETCH) > 0;
42
43 // spares used for new splits/levels
44 Debug.$assert0.t(key != _spareKey1);
45@@ -1344,6 +1335,8 @@
46 boolean incrementMVVCount = false;
47
48 final int maxSimpleValueSize = maxValueSize(key.getEncodedSize());
49+ final Value spareValue = _persistit.getThreadLocalValue();
50+ assert !(doMVCC & value == spareValue || doFetch && value == _spareValue): "storeInternal may be use the supplied Value: " + value;
51
52 //
53 // First insert the record in the data page
54@@ -1369,7 +1362,7 @@
55 // This method may delay significantly for I/O and must
56 // be called when there are no other claimed resources.
57 //
58- newLongRecordPointer = getLongRecordHelper().storeLongRecord(value, _transaction.isActive());
59+ newLongRecordPointer = getLongRecordHelper().storeLongRecord(value, _transaction.isActive());
60 }
61
62 if (!_ignoreTransactions && ((options & StoreOptions.DONT_JOURNAL) == 0)) {
63@@ -1396,7 +1389,7 @@
64 if (!committed && newLongRecordPointerMVV != 0) {
65 _volume.getStructure().deallocateGarbageChain(newLongRecordPointerMVV, 0);
66 newLongRecordPointerMVV = 0;
67- _spareValue.changeLongRecordMode(false);
68+ spareValue.changeLongRecordMode(false);
69 }
70
71 if (treeClaimRequired && !treeClaimAcquired) {
72@@ -1462,37 +1455,38 @@
73 if (keyExisted) {
74 oldLongRecordPointer = buffer.fetchLongRecordPointer(foundAt);
75 }
76- if (doAnyFetch) {
77- buffer.fetch(foundAt, _spareValue);
78- /*
79- * If we aren't in MVCC we have to un-long-ify as
80- * fetch was requested. Otherwise only do it if it
81- * is a long MVV so as to not-needlessly create one.
82- */
83- if (!doMVCC) {
84- fetchFixupForLongRecords(_spareValue, Integer.MAX_VALUE);
85- } else if (oldLongRecordPointer != 0) {
86- if (isLongMVV(_spareValue)) {
87+
88+ if (doFetch || doMVCC) {
89+ buffer.fetch(foundAt, spareValue);
90+ if (oldLongRecordPointer != 0) {
91+ if (isLongMVV(spareValue)) {
92 oldLongRecordPointerMVV = oldLongRecordPointer;
93- fetchFixupForLongRecords(_spareValue, Integer.MAX_VALUE);
94+ fetchFixupForLongRecords(spareValue, Integer.MAX_VALUE);
95 }
96- /*
97- * If it was a long MVV we saved it into the
98- * variable above. Otherwise it is a primordial
99- * value that we can't get rid of.
100- */
101- oldLongRecordPointer = 0;
102+ }
103+ /*
104+ * If it was a long MVV we saved it into the
105+ * variable above. Otherwise it is a
106+ * primordial value that we can't get rid
107+ * of.
108+ */
109+ oldLongRecordPointer = 0;
110+
111+ if (doFetch) {
112+ spareValue.copyTo(_spareValue);
113+ fetchFromValueInternal(_spareValue, Integer.MAX_VALUE, buffer);
114 }
115 }
116+
117 if (doMVCC) {
118- valueToStore = _spareValue;
119+ valueToStore = spareValue;
120 int valueSize = value.getEncodedSize();
121 /*
122 * If key didn't exist the value is truly
123 * non-existent and not just undefined/zero length
124 */
125- byte[] spareBytes = _spareValue.getEncodedBytes();
126- int spareSize = keyExisted ? _spareValue.getEncodedSize() : -1;
127+ byte[] spareBytes = spareValue.getEncodedBytes();
128+ int spareSize = keyExisted ? spareValue.getEncodedSize() : -1;
129 spareSize = MVV.prune(spareBytes, 0, spareSize, _persistit.getTransactionIndex(), false,
130 prunedVersions);
131
132@@ -1521,8 +1515,8 @@
133 MVV.visitAllVersions(_mvvVisitor, spareBytes, 0, spareSize);
134
135 int mvvSize = MVV.estimateRequiredLength(spareBytes, spareSize, valueSize);
136- _spareValue.ensureFit(mvvSize);
137- spareBytes = _spareValue.getEncodedBytes();
138+ spareValue.ensureFit(mvvSize);
139+ spareBytes = spareValue.getEncodedBytes();
140
141 long versionHandle = TransactionIndex.tss2vh(_transaction.getStartTimestamp(), tStep);
142 int storedLength = MVV.storeVersion(spareBytes, 0, spareSize, spareBytes.length,
143@@ -1530,11 +1524,10 @@
144
145 incrementMVVCount = (storedLength & MVV.STORE_EXISTED_MASK) == 0;
146 storedLength &= MVV.STORE_LENGTH_MASK;
147- _spareValue.setEncodedSize(storedLength);
148+ spareValue.setEncodedSize(storedLength);
149
150- if (_spareValue.getEncodedSize() > maxSimpleValueSize) {
151- newLongRecordPointerMVV = getLongRecordHelper().storeLongRecord(_spareValue,
152- _transaction.isActive());
153+ if (spareValue.getEncodedSize() > maxSimpleValueSize) {
154+ newLongRecordPointerMVV = getLongRecordHelper().storeLongRecord(spareValue, _transaction.isActive());
155 }
156 }
157 }
158@@ -1673,7 +1666,7 @@
159 }
160
161 value.changeLongRecordMode(false);
162- _spareValue.changeLongRecordMode(false);
163+ spareValue.changeLongRecordMode(false);
164 if (!committed) {
165 //
166 // We failed to write the new LONG_RECORD. If there was
167@@ -1698,7 +1691,7 @@
168 }
169 _volume.getStatistics().bumpStoreCounter();
170 _tree.getStatistics().bumpStoreCounter();
171- if (doAnyFetch) {
172+ if (doFetch || doMVCC) {
173 _volume.getStatistics().bumpFetchCounter();
174 _tree.getStatistics().bumpFetchCounter();
175 }
176@@ -1761,7 +1754,6 @@
177 * The encoded insert location.
178 * @return <code>true</code> if it necessary to insert a key into the
179 * ancestor index page.
180- * @throws PMapException
181 */
182 // TODO - Check insertIndexLevel timestamps
183 private boolean putLevel(LevelCache lc, Key key, ValueHelper valueWriter, Buffer buffer, int foundAt,
184@@ -2142,7 +2134,7 @@
185 index = _key.getEncodedSize();
186
187 if (matches) {
188- matches = fetchInternal(buffer, outValue, foundAt, minimumBytes);
189+ matches = fetchFromBufferInternal(buffer, outValue, foundAt, minimumBytes);
190 if (!matches && direction != EQ) {
191 nudged = false;
192 nudgeForMVCC = (direction == GTEQ || direction == LTEQ);
193@@ -2162,7 +2154,7 @@
194 if (matches) {
195 index = _key.nextElementIndex(parentIndex);
196 if (index > 0) {
197- boolean isVisibleMatch = fetchInternal(buffer, outValue, foundAt, minimumBytes);
198+ boolean isVisibleMatch = fetchFromBufferInternal(buffer, outValue, foundAt, minimumBytes);
199 //
200 // In any case (matching sibling, child or
201 // niece/nephew) we need to ignore this
202@@ -2569,7 +2561,9 @@
203 _persistit.checkClosed();
204 _persistit.checkSuspended();
205 _key.testValidForStoreAndFetch(_volume.getPageSize());
206- storeInternal(_key, _value, 0, StoreOptions.FETCH | StoreOptions.WAIT);
207+ int options = StoreOptions.WAIT | StoreOptions.FETCH;
208+ options |= (!_ignoreTransactions && _transaction.isActive()) ? StoreOptions.MVCC : 0;
209+ storeInternal(_key, _value, 0, options);
210 _spareValue.copyTo(_value);
211 return this;
212 }
213@@ -2727,7 +2721,7 @@
214 if (minimumBytes < 0) {
215 minimumBytes = 0;
216 }
217- fetchInternal(value, minimumBytes);
218+ searchAndFetchInternal(value, minimumBytes);
219 return this;
220 }
221
222@@ -2748,9 +2742,29 @@
223 * As thrown from any internal method.
224 * @return <code>true</code> if the value was visible.
225 */
226- private boolean fetchInternal(Buffer buffer, Value value, int foundAt, int minimumBytes) throws PersistitException {
227+ private boolean fetchFromBufferInternal(Buffer buffer, Value value, int foundAt, int minimumBytes) throws PersistitException {
228+ buffer.fetch(foundAt, value);
229+ return fetchFromValueInternal(value, minimumBytes, buffer);
230+ }
231+
232+ /**
233+ * Helper for finalizing the value to return from a, potentially, MVV
234+ * contained in the given Value.
235+ *
236+ * @param value
237+ * Value to finalize.
238+ * @param minimumBytes
239+ * Minimum amount of LONG_RECORD to fetch. If &lt;0, the
240+ * <code>value</code> will contain just the descriptor portion.
241+ * @param bufferForPruning
242+ * If not <code>null</code> and <code>Value</code> did contain
243+ * an MVV, call {@link Buffer#enqueuePruningAction(int)}.
244+ * @throws PersistitException
245+ * As thrown from any internal method.
246+ * @return <code>true</code> if the value was visible.
247+ */
248+ private boolean fetchFromValueInternal(Value value, int minimumBytes, Buffer bufferForPruning) throws PersistitException {
249 boolean visible = true;
250- buffer.fetch(foundAt, value);
251 /*
252 * We must fetch the full LONG_RECORD, if needed, while buffer is
253 * claimed from calling code so that it can't be de-allocated as we are
254@@ -2763,7 +2777,9 @@
255 */
256 fetchFixupForLongRecords(value, Integer.MAX_VALUE);
257 if (MVV.isArrayMVV(value.getEncodedBytes(), 0, value.getEncodedSize())) {
258- buffer.enqueuePruningAction(_tree.getHandle());
259+ if (bufferForPruning != null) {
260+ bufferForPruning.enqueuePruningAction(_tree.getHandle());
261+ }
262 visible = mvccFetch(value, minimumBytes);
263 fetchFixupForLongRecords(value, minimumBytes);
264 }
265@@ -2790,13 +2806,13 @@
266 * @throws PersistitException
267 * As thrown from {@link #search(Key, boolean)}
268 */
269- private void fetchInternal(Value value, int minimumBytes) throws PersistitException {
270+ private void searchAndFetchInternal(Value value, int minimumBytes) throws PersistitException {
271 Buffer buffer = null;
272 try {
273 int foundAt = search(_key, false);
274 LevelCache lc = _levelCache[0];
275 buffer = lc._buffer;
276- fetchInternal(buffer, value, foundAt, minimumBytes);
277+ fetchFromBufferInternal(buffer, value, foundAt, minimumBytes);
278 _volume.getStatistics().bumpFetchCounter();
279 _tree.getStatistics().bumpFetchCounter();
280 } finally {
281@@ -3063,7 +3079,7 @@
282
283 _value.clear().putAntiValueMVV();
284 final int storeOptions = StoreOptions.MVCC | StoreOptions.WAIT | StoreOptions.ONLY_IF_VISIBLE
285- | StoreOptions.DONT_JOURNAL;
286+ | StoreOptions.DONT_JOURNAL | (fetchFirst ? StoreOptions.FETCH : 0);
287
288 boolean anyRemoved = false;
289 boolean keyIsLessThan = true;
290@@ -3755,8 +3771,7 @@
291 /**
292 * Called by Transaction to set up a context for committing updates.
293 *
294- * @param volume
295- * @param _treeName
296+ * @param tree
297 */
298 void setTree(Tree tree) throws PersistitException {
299 _persistit.checkClosed();
300@@ -3944,7 +3959,7 @@
301 boolean savedIgnore = _ignoreMVCCFetch;
302 try {
303 _ignoreMVCCFetch = true;
304- fetchInternal(_spareValue, -1);
305+ searchAndFetchInternal(_spareValue, -1);
306 final boolean wasLong = isLongRecord(_spareValue);
307 _spareValue.clear();
308 return wasLong;
309@@ -3966,7 +3981,7 @@
310 boolean savedIgnore = _ignoreMVCCFetch;
311 try {
312 _ignoreMVCCFetch = true;
313- fetchInternal(_spareValue, -1);
314+ searchAndFetchInternal(_spareValue, -1);
315 final boolean wasLong = isLongMVV(_spareValue);
316 _spareValue.clear();
317 return wasLong;
318
319=== added file 'src/test/java/com/persistit/Bug912514Test.java'
320--- src/test/java/com/persistit/Bug912514Test.java 1970-01-01 00:00:00 +0000
321+++ src/test/java/com/persistit/Bug912514Test.java 2012-05-29 19:52:29 +0000
322@@ -0,0 +1,114 @@
323+/**
324+ * Copyright © 2012 Akiban Technologies, Inc. All rights reserved.
325+ *
326+ * This program is free software: you can redistribute it and/or modify
327+ * it under the terms of the GNU Affero General Public License as
328+ * published by the Free Software Foundation, version 3 (only) of the
329+ * License.
330+ *
331+ * This program is distributed in the hope that it will be useful,
332+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
333+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
334+ * GNU Affero General Public License for more details.
335+ *
336+ * You should have received a copy of the GNU Affero General Public License
337+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
338+ *
339+ * This program may also be available under different license terms. For more
340+ * information, see www.akiban.com or contact licensing@akiban.com.
341+ */
342+
343+package com.persistit;
344+
345+import static org.junit.Assert.*;
346+
347+import org.junit.Test;
348+
349+import com.persistit.unit.PersistitUnitTestCase;
350+
351+/**
352+ * https://bugs.launchpad.net/akiban-persistit/+bug/912514
353+ *
354+ * PersistitMapStress1 fails intermittently with messages like this:
355+ *
356+ * Finished unit=#1 PersistitMapStress test=PersistitMapStress1 main at ts=8036
357+ * - elapsed=5738 - FAILED: value not expected to be null 8036 Failed test
358+ * unit=#1 PersistitMapStress test=PersistitMapStress1 main value not expected
359+ * to be null
360+ *
361+ * I didn't track down the responsible code, but I'm pretty sure the contents of
362+ * _spareValue have been mangled.
363+ *
364+ * Since akiban-server does not use this method the bug is no more than medium
365+ * priority. But it does need to be fixed before akiban-persistit is released as
366+ * a standalone library.
367+ */
368+
369+public class Bug912514Test extends PersistitUnitTestCase {
370+
371+ private final static String ROLLBACK = "rollback";
372+
373+ private void fetchAndStoreAndRemoveHelper(final boolean inTxn, final String... sequence) throws Exception {
374+ final Transaction txn = _persistit.getTransaction();
375+ final Exchange exchange = _persistit.getExchange("persistit", "Bug912514Test", true);
376+ String previous = null;
377+ for (String string : sequence) {
378+
379+ if (inTxn) {
380+ txn.begin();
381+ }
382+
383+ if (string == null) {
384+ exchange.to(1).fetchAndRemove();
385+ } else {
386+ exchange.getValue().put(string);
387+ exchange.to(1).fetchAndStore();
388+ }
389+ compare(previous, exchange.getValue());
390+
391+ if (inTxn) {
392+ if (string.startsWith(ROLLBACK)) {
393+ txn.rollback();
394+ } else {
395+ txn.commit();
396+ }
397+ txn.end();
398+ compare(previous, exchange.getValue());
399+ }
400+
401+ if (!inTxn || !string.startsWith(ROLLBACK)) {
402+ previous = string;
403+ }
404+ exchange.fetch();
405+ compare(previous, exchange.getValue());
406+
407+
408+ }
409+ }
410+
411+ private void compare(final String string, final Value value) {
412+ if (string == null) {
413+ assertTrue("Value should be undefined", !value.isDefined());
414+ } else {
415+ assertEquals("Value should match", string, value.getString());
416+ }
417+ }
418+
419+ @Test
420+ public void fetchAndStoreTxn() throws Exception {
421+ fetchAndStoreAndRemoveHelper(true, RED_FOX, createString(100), createString(1000), createString(10000), RED_FOX);
422+ }
423+
424+ @Test
425+ public void fetchAndRemoveNonTxn() throws Exception {
426+ fetchAndStoreAndRemoveHelper(false, RED_FOX, null, null, createString(100), null, createString(1000), null,
427+ createString(10000), null, null, RED_FOX, null);
428+ }
429+
430+ @Test
431+ public void fetchAndStoreTxnWithRollbacks() throws Exception {
432+ fetchAndStoreAndRemoveHelper(true, RED_FOX, createString(100), ROLLBACK, createString(1000), ROLLBACK
433+ + createString(10000), createString(10000), RED_FOX);
434+ }
435+
436+}

Subscribers

People subscribed via source and target branches