Merge lp:~nwilliams/akiban-persistit/fix_912514_fetchAndRemove_2 into lp:akiban-persistit

Proposed by Nathan Williams
Status: Merged
Approved by: Peter Beaman
Approved revision: 317
Merged at revision: 311
Proposed branch: lp:~nwilliams/akiban-persistit/fix_912514_fetchAndRemove_2
Merge into: lp:akiban-persistit
Prerequisite: lp:~nwilliams/akiban-persistit/move-core-up
Diff against target: 436 lines (+185/-56)
2 files modified
src/main/java/com/persistit/Exchange.java (+71/-56)
src/test/java/com/persistit/Bug912514Test.java (+114/-0)
To merge this branch: bzr merge lp:~nwilliams/akiban-persistit/fix_912514_fetchAndRemove_2
Reviewer Review Type Date Requested Status
Peter Beaman Approve
Review via email: mp+107863@code.launchpad.net

This proposal supersedes a proposal from 2012-05-29.

Description of the change

Fix Exchange store and fetchAndRemove to work correctly both inside and out of a transaction.

The vast majority of this is Peter's original branch. The last few commits were refactorings, to eliminate duplicate code, that I noticed were possible and suggested on the merge prop.

Original branch and description:
lp:~pbeaman/akiban-persistit/fix_912514_fetchAndRemove

The basic strategy to fix this bug is to used a the ThreadLocal-based cached Value object, rather than Exchange._spareValue, as the MVV holding value. The major change was to remove the "_" from instances of _spareValue in storeInternal. However, there were other changes required to reenable StoreOption.FETCH | StoreOption.MVCC, etc.

New test Bug912514 test tests some of the cases. I will also (and have not yet) run PersistitMapStressTest which is where we first observed the bug.

To post a comment you must log in.
Revision history for this message
Peter Beaman (pbeaman) wrote :

Refactorings look good. Thank you.

Revision history for this message
Peter Beaman (pbeaman) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'src/main/java/com/persistit/Exchange.java'
--- src/main/java/com/persistit/Exchange.java 2012-05-25 18:50:59 +0000
+++ src/main/java/com/persistit/Exchange.java 2012-05-29 19:52:29 +0000
@@ -1031,7 +1031,6 @@
1031 * 1031 *
1032 * @return Encoded key location within the data page. The page itself is1032 * @return Encoded key location within the data page. The page itself is
1033 * made valid in the level cache.1033 * made valid in the level cache.
1034 * @throws PMapException
1035 */1034 */
1036 private int search(Key key, boolean writer) throws PersistitException {1035 private int search(Key key, boolean writer) throws PersistitException {
1037 Buffer buffer = null;1036 Buffer buffer = null;
@@ -1099,7 +1098,6 @@
1099 * 1098 *
1100 * @return Encoded key location within the level. The page itself is valid1099 * @return Encoded key location within the level. The page itself is valid
1101 * within the level cache.1100 * within the level cache.
1102 * @throws PMapException
1103 */1101 */
1104 private int searchTree(Key key, int toLevel, boolean writer) throws PersistitException {1102 private int searchTree(Key key, int toLevel, boolean writer) throws PersistitException {
1105 Buffer oldBuffer = null;1103 Buffer oldBuffer = null;
@@ -1198,7 +1196,6 @@
1198 * @param currentLevel1196 * @param currentLevel
1199 * current level in the tree1197 * current level in the tree
1200 * @return Encoded key location within the page.1198 * @return Encoded key location within the page.
1201 * @throws PMapException
1202 */1199 */
1203 private int searchLevel(Key key, boolean edge, long pageAddress, int currentLevel, boolean writer)1200 private int searchLevel(Key key, boolean edge, long pageAddress, int currentLevel, boolean writer)
1204 throws PersistitException {1201 throws PersistitException {
@@ -1323,15 +1320,9 @@
1323 * uponError1320 * uponError
1324 */1321 */
1325 boolean storeInternal(Key key, Value value, int level, int options) throws PersistitException {1322 boolean storeInternal(Key key, Value value, int level, int options) throws PersistitException {
1326 if ((options & StoreOptions.FETCH) > 0 && (options & StoreOptions.MVCC) > 0) {
1327 throw new IllegalArgumentException("Both fetch and MVCC not supported");
1328 }
13291323
1330 final boolean doMVCC = (options & StoreOptions.MVCC) > 0;1324 final boolean doMVCC = (options & StoreOptions.MVCC) > 0;
1331 final boolean doAnyFetch = (options & StoreOptions.FETCH) > 0 || doMVCC;1325 final boolean doFetch = (options & StoreOptions.FETCH) > 0;
1332
1333 // spare used for fetch
1334 Debug.$assert0.t(!doAnyFetch || value != _spareValue);
13351326
1336 // spares used for new splits/levels1327 // spares used for new splits/levels
1337 Debug.$assert0.t(key != _spareKey1);1328 Debug.$assert0.t(key != _spareKey1);
@@ -1344,6 +1335,8 @@
1344 boolean incrementMVVCount = false;1335 boolean incrementMVVCount = false;
13451336
1346 final int maxSimpleValueSize = maxValueSize(key.getEncodedSize());1337 final int maxSimpleValueSize = maxValueSize(key.getEncodedSize());
1338 final Value spareValue = _persistit.getThreadLocalValue();
1339 assert !(doMVCC & value == spareValue || doFetch && value == _spareValue): "storeInternal may be use the supplied Value: " + value;
13471340
1348 //1341 //
1349 // First insert the record in the data page1342 // First insert the record in the data page
@@ -1369,7 +1362,7 @@
1369 // This method may delay significantly for I/O and must1362 // This method may delay significantly for I/O and must
1370 // be called when there are no other claimed resources.1363 // be called when there are no other claimed resources.
1371 //1364 //
1372 newLongRecordPointer = getLongRecordHelper().storeLongRecord(value, _transaction.isActive());1365 newLongRecordPointer = getLongRecordHelper().storeLongRecord(value, _transaction.isActive());
1373 }1366 }
13741367
1375 if (!_ignoreTransactions && ((options & StoreOptions.DONT_JOURNAL) == 0)) {1368 if (!_ignoreTransactions && ((options & StoreOptions.DONT_JOURNAL) == 0)) {
@@ -1396,7 +1389,7 @@
1396 if (!committed && newLongRecordPointerMVV != 0) {1389 if (!committed && newLongRecordPointerMVV != 0) {
1397 _volume.getStructure().deallocateGarbageChain(newLongRecordPointerMVV, 0);1390 _volume.getStructure().deallocateGarbageChain(newLongRecordPointerMVV, 0);
1398 newLongRecordPointerMVV = 0;1391 newLongRecordPointerMVV = 0;
1399 _spareValue.changeLongRecordMode(false);1392 spareValue.changeLongRecordMode(false);
1400 }1393 }
14011394
1402 if (treeClaimRequired && !treeClaimAcquired) {1395 if (treeClaimRequired && !treeClaimAcquired) {
@@ -1462,37 +1455,38 @@
1462 if (keyExisted) {1455 if (keyExisted) {
1463 oldLongRecordPointer = buffer.fetchLongRecordPointer(foundAt);1456 oldLongRecordPointer = buffer.fetchLongRecordPointer(foundAt);
1464 }1457 }
1465 if (doAnyFetch) {1458
1466 buffer.fetch(foundAt, _spareValue);1459 if (doFetch || doMVCC) {
1467 /*1460 buffer.fetch(foundAt, spareValue);
1468 * If we aren't in MVCC we have to un-long-ify as1461 if (oldLongRecordPointer != 0) {
1469 * fetch was requested. Otherwise only do it if it1462 if (isLongMVV(spareValue)) {
1470 * is a long MVV so as to not-needlessly create one.
1471 */
1472 if (!doMVCC) {
1473 fetchFixupForLongRecords(_spareValue, Integer.MAX_VALUE);
1474 } else if (oldLongRecordPointer != 0) {
1475 if (isLongMVV(_spareValue)) {
1476 oldLongRecordPointerMVV = oldLongRecordPointer;1463 oldLongRecordPointerMVV = oldLongRecordPointer;
1477 fetchFixupForLongRecords(_spareValue, Integer.MAX_VALUE);1464 fetchFixupForLongRecords(spareValue, Integer.MAX_VALUE);
1478 }1465 }
1479 /*1466 }
1480 * If it was a long MVV we saved it into the1467 /*
1481 * variable above. Otherwise it is a primordial1468 * If it was a long MVV we saved it into the
1482 * value that we can't get rid of.1469 * variable above. Otherwise it is a
1483 */1470 * primordial value that we can't get rid
1484 oldLongRecordPointer = 0;1471 * of.
1472 */
1473 oldLongRecordPointer = 0;
1474
1475 if (doFetch) {
1476 spareValue.copyTo(_spareValue);
1477 fetchFromValueInternal(_spareValue, Integer.MAX_VALUE, buffer);
1485 }1478 }
1486 }1479 }
1480
1487 if (doMVCC) {1481 if (doMVCC) {
1488 valueToStore = _spareValue;1482 valueToStore = spareValue;
1489 int valueSize = value.getEncodedSize();1483 int valueSize = value.getEncodedSize();
1490 /*1484 /*
1491 * If key didn't exist the value is truly1485 * If key didn't exist the value is truly
1492 * non-existent and not just undefined/zero length1486 * non-existent and not just undefined/zero length
1493 */1487 */
1494 byte[] spareBytes = _spareValue.getEncodedBytes();1488 byte[] spareBytes = spareValue.getEncodedBytes();
1495 int spareSize = keyExisted ? _spareValue.getEncodedSize() : -1;1489 int spareSize = keyExisted ? spareValue.getEncodedSize() : -1;
1496 spareSize = MVV.prune(spareBytes, 0, spareSize, _persistit.getTransactionIndex(), false,1490 spareSize = MVV.prune(spareBytes, 0, spareSize, _persistit.getTransactionIndex(), false,
1497 prunedVersions);1491 prunedVersions);
14981492
@@ -1521,8 +1515,8 @@
1521 MVV.visitAllVersions(_mvvVisitor, spareBytes, 0, spareSize);1515 MVV.visitAllVersions(_mvvVisitor, spareBytes, 0, spareSize);
15221516
1523 int mvvSize = MVV.estimateRequiredLength(spareBytes, spareSize, valueSize);1517 int mvvSize = MVV.estimateRequiredLength(spareBytes, spareSize, valueSize);
1524 _spareValue.ensureFit(mvvSize);1518 spareValue.ensureFit(mvvSize);
1525 spareBytes = _spareValue.getEncodedBytes();1519 spareBytes = spareValue.getEncodedBytes();
15261520
1527 long versionHandle = TransactionIndex.tss2vh(_transaction.getStartTimestamp(), tStep);1521 long versionHandle = TransactionIndex.tss2vh(_transaction.getStartTimestamp(), tStep);
1528 int storedLength = MVV.storeVersion(spareBytes, 0, spareSize, spareBytes.length,1522 int storedLength = MVV.storeVersion(spareBytes, 0, spareSize, spareBytes.length,
@@ -1530,11 +1524,10 @@
15301524
1531 incrementMVVCount = (storedLength & MVV.STORE_EXISTED_MASK) == 0;1525 incrementMVVCount = (storedLength & MVV.STORE_EXISTED_MASK) == 0;
1532 storedLength &= MVV.STORE_LENGTH_MASK;1526 storedLength &= MVV.STORE_LENGTH_MASK;
1533 _spareValue.setEncodedSize(storedLength);1527 spareValue.setEncodedSize(storedLength);
15341528
1535 if (_spareValue.getEncodedSize() > maxSimpleValueSize) {1529 if (spareValue.getEncodedSize() > maxSimpleValueSize) {
1536 newLongRecordPointerMVV = getLongRecordHelper().storeLongRecord(_spareValue,1530 newLongRecordPointerMVV = getLongRecordHelper().storeLongRecord(spareValue, _transaction.isActive());
1537 _transaction.isActive());
1538 }1531 }
1539 }1532 }
1540 }1533 }
@@ -1673,7 +1666,7 @@
1673 }1666 }
16741667
1675 value.changeLongRecordMode(false);1668 value.changeLongRecordMode(false);
1676 _spareValue.changeLongRecordMode(false);1669 spareValue.changeLongRecordMode(false);
1677 if (!committed) {1670 if (!committed) {
1678 //1671 //
1679 // We failed to write the new LONG_RECORD. If there was1672 // We failed to write the new LONG_RECORD. If there was
@@ -1698,7 +1691,7 @@
1698 }1691 }
1699 _volume.getStatistics().bumpStoreCounter();1692 _volume.getStatistics().bumpStoreCounter();
1700 _tree.getStatistics().bumpStoreCounter();1693 _tree.getStatistics().bumpStoreCounter();
1701 if (doAnyFetch) {1694 if (doFetch || doMVCC) {
1702 _volume.getStatistics().bumpFetchCounter();1695 _volume.getStatistics().bumpFetchCounter();
1703 _tree.getStatistics().bumpFetchCounter();1696 _tree.getStatistics().bumpFetchCounter();
1704 }1697 }
@@ -1761,7 +1754,6 @@
1761 * The encoded insert location.1754 * The encoded insert location.
1762 * @return <code>true</code> if it necessary to insert a key into the1755 * @return <code>true</code> if it necessary to insert a key into the
1763 * ancestor index page.1756 * ancestor index page.
1764 * @throws PMapException
1765 */1757 */
1766 // TODO - Check insertIndexLevel timestamps1758 // TODO - Check insertIndexLevel timestamps
1767 private boolean putLevel(LevelCache lc, Key key, ValueHelper valueWriter, Buffer buffer, int foundAt,1759 private boolean putLevel(LevelCache lc, Key key, ValueHelper valueWriter, Buffer buffer, int foundAt,
@@ -2142,7 +2134,7 @@
2142 index = _key.getEncodedSize();2134 index = _key.getEncodedSize();
21432135
2144 if (matches) {2136 if (matches) {
2145 matches = fetchInternal(buffer, outValue, foundAt, minimumBytes);2137 matches = fetchFromBufferInternal(buffer, outValue, foundAt, minimumBytes);
2146 if (!matches && direction != EQ) {2138 if (!matches && direction != EQ) {
2147 nudged = false;2139 nudged = false;
2148 nudgeForMVCC = (direction == GTEQ || direction == LTEQ);2140 nudgeForMVCC = (direction == GTEQ || direction == LTEQ);
@@ -2162,7 +2154,7 @@
2162 if (matches) {2154 if (matches) {
2163 index = _key.nextElementIndex(parentIndex);2155 index = _key.nextElementIndex(parentIndex);
2164 if (index > 0) {2156 if (index > 0) {
2165 boolean isVisibleMatch = fetchInternal(buffer, outValue, foundAt, minimumBytes);2157 boolean isVisibleMatch = fetchFromBufferInternal(buffer, outValue, foundAt, minimumBytes);
2166 //2158 //
2167 // In any case (matching sibling, child or2159 // In any case (matching sibling, child or
2168 // niece/nephew) we need to ignore this2160 // niece/nephew) we need to ignore this
@@ -2569,7 +2561,9 @@
2569 _persistit.checkClosed();2561 _persistit.checkClosed();
2570 _persistit.checkSuspended();2562 _persistit.checkSuspended();
2571 _key.testValidForStoreAndFetch(_volume.getPageSize());2563 _key.testValidForStoreAndFetch(_volume.getPageSize());
2572 storeInternal(_key, _value, 0, StoreOptions.FETCH | StoreOptions.WAIT);2564 int options = StoreOptions.WAIT | StoreOptions.FETCH;
2565 options |= (!_ignoreTransactions && _transaction.isActive()) ? StoreOptions.MVCC : 0;
2566 storeInternal(_key, _value, 0, options);
2573 _spareValue.copyTo(_value);2567 _spareValue.copyTo(_value);
2574 return this;2568 return this;
2575 }2569 }
@@ -2727,7 +2721,7 @@
2727 if (minimumBytes < 0) {2721 if (minimumBytes < 0) {
2728 minimumBytes = 0;2722 minimumBytes = 0;
2729 }2723 }
2730 fetchInternal(value, minimumBytes);2724 searchAndFetchInternal(value, minimumBytes);
2731 return this;2725 return this;
2732 }2726 }
27332727
@@ -2748,9 +2742,29 @@
2748 * As thrown from any internal method.2742 * As thrown from any internal method.
2749 * @return <code>true</code> if the value was visible.2743 * @return <code>true</code> if the value was visible.
2750 */2744 */
2751 private boolean fetchInternal(Buffer buffer, Value value, int foundAt, int minimumBytes) throws PersistitException {2745 private boolean fetchFromBufferInternal(Buffer buffer, Value value, int foundAt, int minimumBytes) throws PersistitException {
2746 buffer.fetch(foundAt, value);
2747 return fetchFromValueInternal(value, minimumBytes, buffer);
2748 }
2749
2750 /**
2751 * Helper for finalizing the value to return from a, potentially, MVV
2752 * contained in the given Value.
2753 *
2754 * @param value
2755 * Value to finalize.
2756 * @param minimumBytes
2757 * Minimum amount of LONG_RECORD to fetch. If &lt;0, the
2758 * <code>value</code> will contain just the descriptor portion.
2759 * @param bufferForPruning
2760 * If not <code>null</code> and <code>Value</code> did contain
2761 * an MVV, call {@link Buffer#enqueuePruningAction(int)}.
2762 * @throws PersistitException
2763 * As thrown from any internal method.
2764 * @return <code>true</code> if the value was visible.
2765 */
2766 private boolean fetchFromValueInternal(Value value, int minimumBytes, Buffer bufferForPruning) throws PersistitException {
2752 boolean visible = true;2767 boolean visible = true;
2753 buffer.fetch(foundAt, value);
2754 /*2768 /*
2755 * We must fetch the full LONG_RECORD, if needed, while buffer is2769 * We must fetch the full LONG_RECORD, if needed, while buffer is
2756 * claimed from calling code so that it can't be de-allocated as we are2770 * claimed from calling code so that it can't be de-allocated as we are
@@ -2763,7 +2777,9 @@
2763 */2777 */
2764 fetchFixupForLongRecords(value, Integer.MAX_VALUE);2778 fetchFixupForLongRecords(value, Integer.MAX_VALUE);
2765 if (MVV.isArrayMVV(value.getEncodedBytes(), 0, value.getEncodedSize())) {2779 if (MVV.isArrayMVV(value.getEncodedBytes(), 0, value.getEncodedSize())) {
2766 buffer.enqueuePruningAction(_tree.getHandle());2780 if (bufferForPruning != null) {
2781 bufferForPruning.enqueuePruningAction(_tree.getHandle());
2782 }
2767 visible = mvccFetch(value, minimumBytes);2783 visible = mvccFetch(value, minimumBytes);
2768 fetchFixupForLongRecords(value, minimumBytes);2784 fetchFixupForLongRecords(value, minimumBytes);
2769 }2785 }
@@ -2790,13 +2806,13 @@
2790 * @throws PersistitException2806 * @throws PersistitException
2791 * As thrown from {@link #search(Key, boolean)}2807 * As thrown from {@link #search(Key, boolean)}
2792 */2808 */
2793 private void fetchInternal(Value value, int minimumBytes) throws PersistitException {2809 private void searchAndFetchInternal(Value value, int minimumBytes) throws PersistitException {
2794 Buffer buffer = null;2810 Buffer buffer = null;
2795 try {2811 try {
2796 int foundAt = search(_key, false);2812 int foundAt = search(_key, false);
2797 LevelCache lc = _levelCache[0];2813 LevelCache lc = _levelCache[0];
2798 buffer = lc._buffer;2814 buffer = lc._buffer;
2799 fetchInternal(buffer, value, foundAt, minimumBytes);2815 fetchFromBufferInternal(buffer, value, foundAt, minimumBytes);
2800 _volume.getStatistics().bumpFetchCounter();2816 _volume.getStatistics().bumpFetchCounter();
2801 _tree.getStatistics().bumpFetchCounter();2817 _tree.getStatistics().bumpFetchCounter();
2802 } finally {2818 } finally {
@@ -3063,7 +3079,7 @@
30633079
3064 _value.clear().putAntiValueMVV();3080 _value.clear().putAntiValueMVV();
3065 final int storeOptions = StoreOptions.MVCC | StoreOptions.WAIT | StoreOptions.ONLY_IF_VISIBLE3081 final int storeOptions = StoreOptions.MVCC | StoreOptions.WAIT | StoreOptions.ONLY_IF_VISIBLE
3066 | StoreOptions.DONT_JOURNAL;3082 | StoreOptions.DONT_JOURNAL | (fetchFirst ? StoreOptions.FETCH : 0);
30673083
3068 boolean anyRemoved = false;3084 boolean anyRemoved = false;
3069 boolean keyIsLessThan = true;3085 boolean keyIsLessThan = true;
@@ -3755,8 +3771,7 @@
3755 /**3771 /**
3756 * Called by Transaction to set up a context for committing updates.3772 * Called by Transaction to set up a context for committing updates.
3757 * 3773 *
3758 * @param volume3774 * @param tree
3759 * @param _treeName
3760 */3775 */
3761 void setTree(Tree tree) throws PersistitException {3776 void setTree(Tree tree) throws PersistitException {
3762 _persistit.checkClosed();3777 _persistit.checkClosed();
@@ -3944,7 +3959,7 @@
3944 boolean savedIgnore = _ignoreMVCCFetch;3959 boolean savedIgnore = _ignoreMVCCFetch;
3945 try {3960 try {
3946 _ignoreMVCCFetch = true;3961 _ignoreMVCCFetch = true;
3947 fetchInternal(_spareValue, -1);3962 searchAndFetchInternal(_spareValue, -1);
3948 final boolean wasLong = isLongRecord(_spareValue);3963 final boolean wasLong = isLongRecord(_spareValue);
3949 _spareValue.clear();3964 _spareValue.clear();
3950 return wasLong;3965 return wasLong;
@@ -3966,7 +3981,7 @@
3966 boolean savedIgnore = _ignoreMVCCFetch;3981 boolean savedIgnore = _ignoreMVCCFetch;
3967 try {3982 try {
3968 _ignoreMVCCFetch = true;3983 _ignoreMVCCFetch = true;
3969 fetchInternal(_spareValue, -1);3984 searchAndFetchInternal(_spareValue, -1);
3970 final boolean wasLong = isLongMVV(_spareValue);3985 final boolean wasLong = isLongMVV(_spareValue);
3971 _spareValue.clear();3986 _spareValue.clear();
3972 return wasLong;3987 return wasLong;
39733988
=== added file 'src/test/java/com/persistit/Bug912514Test.java'
--- src/test/java/com/persistit/Bug912514Test.java 1970-01-01 00:00:00 +0000
+++ src/test/java/com/persistit/Bug912514Test.java 2012-05-29 19:52:29 +0000
@@ -0,0 +1,114 @@
1/**
2 * Copyright © 2012 Akiban Technologies, Inc. All rights reserved.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU Affero General Public License as
6 * published by the Free Software Foundation, version 3 (only) of the
7 * License.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU Affero General Public License for more details.
13 *
14 * You should have received a copy of the GNU Affero General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
16 *
17 * This program may also be available under different license terms. For more
18 * information, see www.akiban.com or contact licensing@akiban.com.
19 */
20
21package com.persistit;
22
23import static org.junit.Assert.*;
24
25import org.junit.Test;
26
27import com.persistit.unit.PersistitUnitTestCase;
28
29/**
30 * https://bugs.launchpad.net/akiban-persistit/+bug/912514
31 *
32 * PersistitMapStress1 fails intermittently with messages like this:
33 *
34 * Finished unit=#1 PersistitMapStress test=PersistitMapStress1 main at ts=8036
35 * - elapsed=5738 - FAILED: value not expected to be null 8036 Failed test
36 * unit=#1 PersistitMapStress test=PersistitMapStress1 main value not expected
37 * to be null
38 *
39 * I didn't track down the responsible code, but I'm pretty sure the contents of
40 * _spareValue have been mangled.
41 *
42 * Since akiban-server does not use this method the bug is no more than medium
43 * priority. But it does need to be fixed before akiban-persistit is released as
44 * a standalone library.
45 */
46
47public class Bug912514Test extends PersistitUnitTestCase {
48
49 private final static String ROLLBACK = "rollback";
50
51 private void fetchAndStoreAndRemoveHelper(final boolean inTxn, final String... sequence) throws Exception {
52 final Transaction txn = _persistit.getTransaction();
53 final Exchange exchange = _persistit.getExchange("persistit", "Bug912514Test", true);
54 String previous = null;
55 for (String string : sequence) {
56
57 if (inTxn) {
58 txn.begin();
59 }
60
61 if (string == null) {
62 exchange.to(1).fetchAndRemove();
63 } else {
64 exchange.getValue().put(string);
65 exchange.to(1).fetchAndStore();
66 }
67 compare(previous, exchange.getValue());
68
69 if (inTxn) {
70 if (string.startsWith(ROLLBACK)) {
71 txn.rollback();
72 } else {
73 txn.commit();
74 }
75 txn.end();
76 compare(previous, exchange.getValue());
77 }
78
79 if (!inTxn || !string.startsWith(ROLLBACK)) {
80 previous = string;
81 }
82 exchange.fetch();
83 compare(previous, exchange.getValue());
84
85
86 }
87 }
88
89 private void compare(final String string, final Value value) {
90 if (string == null) {
91 assertTrue("Value should be undefined", !value.isDefined());
92 } else {
93 assertEquals("Value should match", string, value.getString());
94 }
95 }
96
97 @Test
98 public void fetchAndStoreTxn() throws Exception {
99 fetchAndStoreAndRemoveHelper(true, RED_FOX, createString(100), createString(1000), createString(10000), RED_FOX);
100 }
101
102 @Test
103 public void fetchAndRemoveNonTxn() throws Exception {
104 fetchAndStoreAndRemoveHelper(false, RED_FOX, null, null, createString(100), null, createString(1000), null,
105 createString(10000), null, null, RED_FOX, null);
106 }
107
108 @Test
109 public void fetchAndStoreTxnWithRollbacks() throws Exception {
110 fetchAndStoreAndRemoveHelper(true, RED_FOX, createString(100), ROLLBACK, createString(1000), ROLLBACK
111 + createString(10000), createString(10000), RED_FOX);
112 }
113
114}

Subscribers

People subscribed via source and target branches