Merge lp:~pbeaman/akiban-persistit/fix-accumulator-checkpoint-failure into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Approved by: Nathan Williams
Approved revision: 384
Merged at revision: 378
Proposed branch: lp:~pbeaman/akiban-persistit/fix-accumulator-checkpoint-failure
Merge into: lp:akiban-persistit
Diff against target: 367 lines (+209/-17)
8 files modified
src/main/java/com/persistit/Accumulator.java (+31/-11)
src/main/java/com/persistit/CheckpointManager.java (+8/-1)
src/main/java/com/persistit/Persistit.java (+3/-3)
src/main/java/com/persistit/RecoveryManager.java (+1/-1)
src/main/java/com/persistit/Transaction.java (+6/-0)
src/main/java/com/persistit/TransactionPlayer.java (+7/-1)
src/main/java/com/persistit/util/SequencerConstants.java (+10/-0)
src/test/java/com/persistit/Bug1064565Test.java (+143/-0)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/fix-accumulator-checkpoint-failure
Reviewers:
Nathan Williams: Approve
Review via email: mp+129243@code.launchpad.net

Description of the change

This proposal fixes https://bugs.launchpad.net/akiban-persistit/+bug/1064565 which is a data-loss bug affecting the state of Accumulator values after system shutdown/restart.

The bug is caused by a subtle race condition in the protocol that determines when Persistit records Accumulator state. The bug mechanism is described in some detail in the bug report.

The central problem is the handling of the _checkpointRef field of the com.persistit.Accumulator.AccumulatorRef inner class. This field is either null or a reference to an Accumulator. It is intended to be null when the Accumulator has not been updated since the last checkpoint, and non-null when there has been an update since the last checkpoint. Checkpoints happen by default every two minutes, so this loose description of the timing is fine for almost all of every two-minute interval. However, the meaning of “since” needs to be carefully defined in the case where a transaction that is performing Accumulator updates becomes concurrent with a checkpoint. That’s when the bug occurs.

The invariant regarding timing is this: if a committed transaction with commit timestamp tc has updated an Accumulator, and the most recent checkpoint has start timestamp ts0, then the _checkpointRef field must be non-null whenever tc > ts0. The converse is not required; it is permissible for _checkpointRef to be non-null even if there are no qualifying updates. The result in that case is that the Accumulator state is recorded redundantly, but not incorrectly.

The reason for this invariant is as follows. Checkpoint C0 will record the snapshot value of the Accumulator as of its start timestamp ts0. By definition of snapshot isolation (SI), updates committed after ts0 will not be part of the checkpoint even if the transaction that created them started before ts0. In the event C0 is the very last checkpoint recorded before recovery, the state is valid because those subsequent updates will be replayed from the journal during recovery. However, if one more checkpoint C1 is written (as it is under normal shutdown) with start timestamp ts1 > tc, recovery will ignore the transactions in the journal that committed before ts1 (by definition of a checkpoint). Therefore C1 must record the updates committed at tc for recovery to be correct after C1 has been written to the journal. It is the value of _checkpointRef that determines this behavior.
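The reasoning above can be illustrated with a toy model of recovery. This is only a sketch under stated assumptions: the class and method names (RecoverySketch, Delta, Checkpoint, recover) are hypothetical and are not part of Persistit, the Accumulator is treated as a simple SUM, and a checkpoint that skips the Accumulator is modeled as carrying the stale value forward.

```java
import java.util.List;

public class RecoverySketch {

    /** Hypothetical journal entry: an Accumulator delta and the commit timestamp of its transaction. */
    static final class Delta {
        final long commitTimestamp;
        final long value;

        Delta(final long commitTimestamp, final long value) {
            this.commitTimestamp = commitTimestamp;
            this.value = value;
        }
    }

    /** Hypothetical checkpoint: its start timestamp and the Accumulator value it recorded. */
    static final class Checkpoint {
        final long timestamp;
        final long recordedValue;

        Checkpoint(final long timestamp, final long recordedValue) {
            this.timestamp = timestamp;
            this.recordedValue = recordedValue;
        }
    }

    /**
     * Toy recovery: start from the value recorded by the last checkpoint and
     * replay only those deltas that committed after the checkpoint timestamp.
     */
    static long recover(final Checkpoint last, final List<Delta> journal) {
        long value = last.recordedValue;
        for (final Delta d : journal) {
            if (d.commitTimestamp > last.timestamp) {
                value += d.value;
            }
        }
        return value;
    }

    public static void main(final String[] args) {
        final List<Delta> journal = List.of(new Delta(150, 42)); // tc = 150

        final Checkpoint c0 = new Checkpoint(100, 0);      // ts0 < tc: the delta is replayed
        final Checkpoint c1Good = new Checkpoint(200, 42); // ts1 > tc and C1 recorded the update
        final Checkpoint c1Bad = new Checkpoint(200, 0);   // ts1 > tc but the Accumulator was skipped

        System.out.println(recover(c0, journal));     // 42
        System.out.println(recover(c1Good, journal)); // 42
        System.out.println(recover(c1Bad, journal));  // 0: the update is lost
    }
}
```

The last case corresponds to the invariant being violated: _checkpointRef was null when C1 ran, so C1 did not record the update, and the journal entry is no longer replayed.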

The bug occurred because this invariant was violated. The following is an informal proof that the proposed changes enforce the invariant.

Changes:

AccumulatorRef now includes a new AtomicLong field called _latestUpdate, which holds the largest commit timestamp of any transaction that has updated the Accumulator. This field is advanced using a lock-free CAS loop in AccumulatorRef#checkpointNeeded(…).

The AccumulatorRef#takeCheckpointRef() method has been modified to receive the start timestamp of the checkpoint transaction that is saving Accumulator state. This method sets _checkpointRef to null only if the checkpoint timestamp is greater than the then-current value of _latestUpdate.

The checkpointNeeded(timestamp) method is called from a different place than before. It is now called during the processing of Transaction#commit(…) at a time when the ultimate commit timestamp of the transaction has been determined.
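The two methods described above can be exercised in isolation. The following is a minimal single-threaded sketch that restates the logic shown in the diff in a standalone class; Acc and Ref are hypothetical stand-ins for Accumulator and AccumulatorRef, and the timestamps are made up. It shows only the sequential behavior and does not demonstrate any concurrent schedule.

```java
import java.util.concurrent.atomic.AtomicLong;

public class AccumulatorRefSketch {

    /** Stand-in for com.persistit.Accumulator; only its identity matters here. */
    static final class Acc {
    }

    /** Stand-in for AccumulatorRef, restating the two methods from the diff. */
    static final class Ref {
        final AtomicLong latestUpdate = new AtomicLong();
        volatile Acc checkpointRef;

        Acc takeCheckpointRef(final long checkpointTimestamp) {
            final Acc result = checkpointRef;
            if (checkpointTimestamp > latestUpdate.get()) {
                checkpointRef = null;
                // Re-read: a commit may have advanced latestUpdate in the meantime.
                if (checkpointTimestamp <= latestUpdate.get()) {
                    checkpointRef = result;
                }
            }
            return result;
        }

        void checkpointNeeded(final Acc acc, final long commitTimestamp) {
            // CAS loop: latestUpdate only ever moves forward.
            while (true) {
                final long latest = latestUpdate.get();
                if (latest > commitTimestamp) {
                    return;
                }
                if (latestUpdate.compareAndSet(latest, commitTimestamp)) {
                    break;
                }
            }
            checkpointRef = acc;
        }
    }

    public static void main(final String[] args) {
        final Acc acc = new Acc();
        final Ref ref = new Ref();

        ref.checkpointNeeded(acc, 100); // a transaction commits at tc = 100

        // Checkpoint at ts = 90 < tc: the reference is returned but not cleared.
        System.out.println(ref.takeCheckpointRef(90) == acc); // true
        System.out.println(ref.checkpointRef == acc);         // true

        // Checkpoint at ts = 110 > tc: the reference is returned and cleared.
        System.out.println(ref.takeCheckpointRef(110) == acc); // true
        System.out.println(ref.checkpointRef == null);         // true
    }
}
```

In the first call the checkpoint started before the commit, so the update is not part of that checkpoint and the reference must survive for the next one; in the second call the checkpoint covers the update, so the reference can be dropped.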

Other minor change:

The Accumulator _baseValue field is now marked volatile because it is set during recovery by one thread and may first be read by a different thread. There is no race condition requiring synchronization, but the Java memory model would permit the second thread to receive stale data.

Informal Proof:

Two threads C and T are executing concurrently; C is calling takeCheckpointRef(ts) (while performing a checkpoint), and T is calling checkpointNeeded(tc) (while committing a transaction). Once both calls are finished we need to prove that under all execution schedules, the value of _checkpointRef is non-null if tc > ts.

(Note that both of these methods are lock-free because they are called frequently.)

These methods modify two shared variables, _checkpointRef and _latestUpdate – for brevity call these R and L, respectively.

T updates L then sets R.
C reads L, possibly clears R, reads L and possibly sets R again.

Under the Java memory model, each of the operations “update”, “set”, “read” and “clear” is atomic (L is an AtomicLong and R is a volatile reference). We’ll use the following abbreviations:

TuL – T updates L
TsR – T sets R
CrL1 – C reads L the first time
CcR – C clears R
CrL2 – C reads L the second time
CsR – C sets R

Clearly all the execution schedules in which T’s operations either precede or follow C’s operations (i.e., TuL, TsR, CrL1, CcR… and CrL1, CcR…, TuL, TsR) are safe.

All schedules in which TuL precedes CrL1 are safe because C will never clear R. The following are the two such possible executions:

TuL, TsR, CrL1
TuL, CrL1, TsR

Now consider schedules in which TuL follows CrL1 but precedes CcR.

CrL1, TuL, TsR, CcR, CrL2, CsR

In this schedule C sees an old value of L and acts by clearing R. In this case the second read CrL2 sees the updated value of L and restores the value of R, leaving R correctly non-null. (This is the case that motivates the CrL2, CsR sequence in takeCheckpointRef.)

CrL1, TuL, CcR, TsR, CrL2, CsR

In this schedule TsR occurs after CcR, so the value of R is already non-null. The final execution of CsR simply sets R redundantly. The same analysis applies to the following cases:

CrL1, TuL, CcR, CrL2, TsR, CsR
CrL1, TuL, CcR, CrL2, CsR, TsR

Finally, all schedules in which TuL follows CcR are safe because in all such cases TsR must follow CcR, e.g.:

CrL1, CcR, TuL, TsR, CrL2, CsR
CrL1, CcR, TuL, CrL2, TsR, CsR
CrL1, CcR, TuL, CrL2, CsR, TsR

Nathan Williams (nwilliams) wrote :

With regards to takeCheckpointRef and checkpointNeeded, the description is very complete and the code looks consistent with it. I believe it is correct as-is.

I am a little wary of diff line 81 though. Unconditionally calling checkpointNeeded() with 0 requires that checkpointNeeded always sets the ref. It does today, but I was about to suggest diff line 47 could be return instead, without loss of correctness, to prevent one case of spurious saves.

It looks like there is exactly 1 caller of updateBaseValue() and a timestamp is readily available. Is there a reason we shouldn't pass the ts down so that we are always dealing with real ones in checkpointNeeded?

Minor though and otherwise looks good!

review: Needs Information
Peter Beaman (pbeaman) wrote :

I think my explanation of the updateBaseValue call is a little off. I'll fix the Javadoc. updateBaseValue is called from RecoveryManager.DefaultRecoveryListener to apply a change from a committed transaction that has not yet been checkpointed. So in fact the setting of _checkpointRef in checkpointNeeded will occur precisely when a checkpoint is in fact needed, and there should be no redundant checkpointing. Re 0 vs. the commit timestamp of the transaction: the choice is immaterial; the intent is to set _checkpointRef so that the first checkpoint cycle of the new epoch will checkpoint the accumulator. If updateBaseValue were engaged in a race with the checkpoint manager then the distinction would matter, but checkpoint manager starts after recovery is done. But upon reflection, given that someday someone might change that, I'll change it to pass commit timestamp.

Re return vs. break at +47: I think that might be okay, but it complicates the "proof." Need to think about that one. As is, I believe the code is correct, but the change could eliminate a redundant set operation on volatile _checkpointRef and that might be a tiny help with highly concurrent transactions.

Nathan Williams (nwilliams) wrote :

To be clear, I wasn't worried about a race for updateBaseValue. I was worried that it was passing zero and expecting that to *always* set the ref. This is purely for future proofing changes to checkpointNeeded and reducing "magic" values. Another option would be a checkpointNeeded that takes no timestamp and just unconditionally sets the ref.

383. By Peter Beaman

Review comments: pass commitTimestamp to updateBaseValue.

384. By Peter Beaman

Review comment: return in diff +47

Peter Beaman (pbeaman) wrote :

Agree about the magic value. Changed it so that updateBaseValue passes the actual commitTimestamp. Also changed break to return in +47. I think that's fine. The important thing is to ensure that _checkpointRef is always set when it needs to be, and I think that property is preserved.

Nathan Williams (nwilliams) wrote :

Thanks for the tweak! I think this is good to go now.

review: Approve

Preview Diff

=== modified file 'src/main/java/com/persistit/Accumulator.java'
--- src/main/java/com/persistit/Accumulator.java 2012-08-24 13:57:19 +0000
+++ src/main/java/com/persistit/Accumulator.java 2012-10-11 20:43:21 +0000
@@ -122,7 +122,7 @@
     /*
      * Check-pointed value read during recovery.
      */
-    private long _baseValue;
+    private volatile long _baseValue;

     /*
      * Snapshot value at the most recent checkpoint
@@ -366,23 +366,37 @@
      */
     final static class AccumulatorRef {
         final WeakReference<Accumulator> _weakRef;
+        final AtomicLong _latestUpdate = new AtomicLong();
         volatile Accumulator _checkpointRef;

         AccumulatorRef(final Accumulator acc) {
             _weakRef = new WeakReference<Accumulator>(acc);
-            _checkpointRef = acc;
         }

-        Accumulator takeCheckpointRef() {
+        Accumulator takeCheckpointRef(final long timestamp) {
             final Accumulator result = _checkpointRef;
-            _checkpointRef = null;
+
+            if (timestamp > _latestUpdate.get()) {
+                _checkpointRef = null;
+                if (timestamp <= _latestUpdate.get()) {
+                    _checkpointRef = result;
+                }
+            }
+
             return result;
         }

-        void checkpointNeeded(final Accumulator acc) {
-            if (_checkpointRef == null) {
-                _checkpointRef = acc;
+        void checkpointNeeded(final Accumulator acc, final long timestamp) {
+            while (true) {
+                final long latest = _latestUpdate.get();
+                if (latest > timestamp) {
+                    return;
+                }
+                if (_latestUpdate.compareAndSet(latest, timestamp)) {
+                    break;
+                }
             }
+            _checkpointRef = acc;
         }

         boolean isLive() {
@@ -448,8 +462,8 @@
         return _accumulatorRef;
     }

-    void checkpointNeeded() {
-        _accumulatorRef.checkpointNeeded(this);
+    void checkpointNeeded(final long timestamp) {
+        _accumulatorRef.checkpointNeeded(this, timestamp);
     }

     long getBucketValue(final int hashIndex) {
@@ -563,9 +577,16 @@
      *
      * @param value
      */
-    void updateBaseValue(final long value) {
+    void updateBaseValue(final long value, final long commitTimestamp) {
         _baseValue = applyValue(_baseValue, value);
         _liveValue.set(_baseValue);
+        /*
+         * This method is called during recovery processing to handle a delta
+         * operation that was part of a transaction that committed after the
+         * keystone checkpoint. That update requires the accumulator to be saved
+         * on the next checkpoint.
+         */
+        checkpointNeeded(commitTimestamp);
     }

     /**
@@ -619,7 +640,6 @@
          */
         final long selectedValue = selectValue(value, updated);
         _transactionIndex.addOrCombineDelta(status, this, step, selectedValue);
-        checkpointNeeded();
         return updated;
     }


=== modified file 'src/main/java/com/persistit/CheckpointManager.java'
--- src/main/java/com/persistit/CheckpointManager.java 2012-10-04 20:40:40 +0000
+++ src/main/java/com/persistit/CheckpointManager.java 2012-10-11 20:43:21 +0000
@@ -15,6 +15,8 @@

 package com.persistit;

+import static com.persistit.util.SequencerConstants.ACCUMULATOR_CHECKPOINT_A;
+import static com.persistit.util.ThreadSequencer.sequence;
 import static com.persistit.util.Util.NS_PER_S;

 import java.text.SimpleDateFormat;
@@ -246,7 +248,12 @@
             txn.beginCheckpoint();
             try {
                 _persistit.flushTransactions(txn.getStartTimestamp());
-                final List<Accumulator> accumulators = _persistit.getCheckpointAccumulators();
+                /*
+                 * Test only: block here while Accumulator update occurs
+                 */
+                sequence(ACCUMULATOR_CHECKPOINT_A);
+
+                final List<Accumulator> accumulators = _persistit.takeCheckpointAccumulators(txn.getStartTimestamp());
                 _persistit.getTransactionIndex().checkpointAccumulatorSnapshots(txn.getStartTimestamp(), accumulators);
                 Accumulator.saveAccumulatorCheckpointValues(accumulators);
                 txn.commit(CommitPolicy.HARD);

=== modified file 'src/main/java/com/persistit/Persistit.java'
--- src/main/java/com/persistit/Persistit.java 2012-10-08 19:10:49 +0000
+++ src/main/java/com/persistit/Persistit.java 2012-10-11 20:43:21 +0000
@@ -2419,7 +2419,7 @@
                 }
             }
         }
-        if ((checkpointCount % ACCUMULATOR_CHECKPOINT_THRESHOLD) == 0) {
+        if (checkpointCount > 0 && (checkpointCount % ACCUMULATOR_CHECKPOINT_THRESHOLD) == 0) {
             try {
                 _checkpointManager.createCheckpoint();
             } catch (final PersistitException e) {
@@ -2441,7 +2441,7 @@
         }
     }

-    List<Accumulator> getCheckpointAccumulators() {
+    List<Accumulator> takeCheckpointAccumulators(final long timestamp) {
         final List<Accumulator> result = new ArrayList<Accumulator>();
         synchronized (_accumulators) {
             for (final Iterator<AccumulatorRef> refIterator = _accumulators.iterator(); refIterator.hasNext();) {
@@ -2449,7 +2449,7 @@
                 if (!ref.isLive()) {
                     refIterator.remove();
                 }
-                final Accumulator acc = ref.takeCheckpointRef();
+                final Accumulator acc = ref.takeCheckpointRef(timestamp);
                 if (acc != null) {
                     result.add(acc);
                 }

=== modified file 'src/main/java/com/persistit/RecoveryManager.java'
--- src/main/java/com/persistit/RecoveryManager.java 2012-10-03 16:04:16 +0000
+++ src/main/java/com/persistit/RecoveryManager.java 2012-10-11 20:43:21 +0000
@@ -246,7 +246,7 @@
                 final int accumulatorTypeOrdinal, final long value) throws PersistitException {
             final Accumulator.Type type = Accumulator.Type.values()[accumulatorTypeOrdinal];
             final Accumulator accumulator = tree.getAccumulator(type, index);
-            accumulator.updateBaseValue(value);
+            accumulator.updateBaseValue(value, timestamp);
         }

         @Override

=== modified file 'src/main/java/com/persistit/Transaction.java'
--- src/main/java/com/persistit/Transaction.java 2012-10-05 19:37:58 +0000
+++ src/main/java/com/persistit/Transaction.java 2012-10-11 20:43:21 +0000
@@ -854,6 +854,12 @@
             _commitTimestamp = _persistit.getTimestampAllocator().updateTimestamp();
             sequence(COMMIT_FLUSH_C);
             long flushedTimetimestamp = 0;
+
+            for (Delta delta = _transactionStatus.getDelta(); delta != null; delta = delta.getNext()) {
+                final Accumulator acc = delta.getAccumulator();
+                acc.checkpointNeeded(_commitTimestamp);
+            }
+
             boolean committed = false;
             try {


=== modified file 'src/main/java/com/persistit/TransactionPlayer.java'
--- src/main/java/com/persistit/TransactionPlayer.java 2012-10-03 16:04:16 +0000
+++ src/main/java/com/persistit/TransactionPlayer.java 2012-10-11 20:43:21 +0000
@@ -225,7 +225,13 @@

             case D0.TYPE: {
                 final Exchange exchange = getExchange(D0.getTreeHandle(bb), address, startTimestamp);
-                listener.delta(address, startTimestamp, exchange.getTree(), D0.getIndex(bb),
+                /*
+                 * Note that the commitTimestamp, not startTimestamp is
+                 * passed to the delta method. The
+                 * Accumulator#updateBaseValue method needs the
+                 * commitTimestamp.
+                 */
+                listener.delta(address, commitTimestamp, exchange.getTree(), D0.getIndex(bb),
                         D0.getAccumulatorTypeOrdinal(bb), 1);
                 appliedUpdates.incrementAndGet();
                 break;

=== modified file 'src/main/java/com/persistit/util/SequencerConstants.java'
--- src/main/java/com/persistit/util/SequencerConstants.java 2012-09-28 21:39:44 +0000
+++ src/main/java/com/persistit/util/SequencerConstants.java 2012-10-11 20:43:21 +0000
@@ -103,4 +103,14 @@
             array(DEALLOCATE_CHAIN_B), array(DEALLOCATE_CHAIN_A, DEALLOCATE_CHAIN_C),
             array(DEALLOCATE_CHAIN_A, DEALLOCATE_CHAIN_C) };

+    /*
+     * Used in testing delete/deallocate sequence in Bug1022567Test
+     */
+    int ACCUMULATOR_CHECKPOINT_A = allocate("ACCUMULATOR_CHECKPOINT_A");
+    int ACCUMULATOR_CHECKPOINT_B = allocate("ACCUMULATOR_CHECKPOINT_B");
+    int ACCUMULATOR_CHECKPOINT_C = allocate("ACCUMULATOR_CHECKPOINT_C");
+    int[][] ACCUMULATOR_CHECKPOINT_SCHEDULED = new int[][] { array(ACCUMULATOR_CHECKPOINT_A, ACCUMULATOR_CHECKPOINT_B),
+            array(ACCUMULATOR_CHECKPOINT_B), array(ACCUMULATOR_CHECKPOINT_A, ACCUMULATOR_CHECKPOINT_C),
+            array(ACCUMULATOR_CHECKPOINT_A, ACCUMULATOR_CHECKPOINT_C) };
+
 }

=== added file 'src/test/java/com/persistit/Bug1064565Test.java'
--- src/test/java/com/persistit/Bug1064565Test.java 1970-01-01 00:00:00 +0000
+++ src/test/java/com/persistit/Bug1064565Test.java 2012-10-11 20:43:21 +0000
@@ -0,0 +1,143 @@
+/**
+ * Copyright © 2012 Akiban Technologies, Inc. All rights reserved.
+ *
+ * This program and the accompanying materials are made available
+ * under the terms of the Eclipse Public License v1.0 which
+ * accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * This program may also be available under different license terms.
+ * For more information, see www.akiban.com or contact licensing@akiban.com.
+ *
+ * Contributors:
+ * Akiban Technologies, Inc.
+ */
+
+package com.persistit;
+
+import static com.persistit.unit.UnitTestProperties.VOLUME_NAME;
+import static com.persistit.util.SequencerConstants.ACCUMULATOR_CHECKPOINT_A;
+import static com.persistit.util.SequencerConstants.ACCUMULATOR_CHECKPOINT_B;
+import static com.persistit.util.SequencerConstants.ACCUMULATOR_CHECKPOINT_C;
+import static com.persistit.util.SequencerConstants.ACCUMULATOR_CHECKPOINT_SCHEDULED;
+import static com.persistit.util.ThreadSequencer.addSchedules;
+import static com.persistit.util.ThreadSequencer.enableSequencer;
+import static com.persistit.util.ThreadSequencer.sequence;
+import static com.persistit.util.ThreadSequencer.setCondition;
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+import com.persistit.exception.PersistitException;
+import com.persistit.util.ThreadSequencer.Condition;
+
+/**
+ * https://bugs.launchpad.net/akiban-persistit/+bug/1064565
+ *
+ * The state of an Accumulator is sometimes incorrect after shutting down and
+ * restarting Persistit and as a result an application can read a count or value
+ * that is inconsistent with the history of committed transactions.
+ *
+ * The bug mechanism is a race between the CheckpointManager#createCheckpoint
+ * method and the Accumulator#update method in which an update which occurs in a
+ * transaction that starts immediately after the checkpoint begins its
+ * transaction can be lost. The probability of failure is low but may be
+ * increased by intense I/O activity.
+ *
+ * This is a data loss error and is therefore critical.
+ */
+
+public class Bug1064565Test extends PersistitUnitTestCase {
+
+    private final static String TREE_NAME = "Bug1064565Test";
+
+    private Exchange getExchange() throws PersistitException {
+        return _persistit.getExchange(VOLUME_NAME, TREE_NAME, true);
+    }
+
+    @Test
+    public void accumulatorRace() throws Exception {
+        enableSequencer(false);
+        addSchedules(ACCUMULATOR_CHECKPOINT_SCHEDULED);
+        final AtomicBoolean once = new AtomicBoolean(true);
+        setCondition(ACCUMULATOR_CHECKPOINT_A, new Condition() {
+            @Override
+            public boolean enabled() {
+                return once.getAndSet(false);
+            }
+        });
+
+        Exchange exchange = getExchange();
+        Transaction txn = exchange.getTransaction();
+        final Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    _persistit.checkpoint();
+                } catch (final PersistitException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+        t.start();
+
+        txn.begin();
+        Accumulator acc = exchange.getTree().getAccumulator(Accumulator.Type.SUM, 0);
+        acc.update(42, txn);
+        sequence(ACCUMULATOR_CHECKPOINT_B);
+        txn.commit();
+        txn.end();
+        sequence(ACCUMULATOR_CHECKPOINT_C);
+
+        _persistit.close();
+
+        final Configuration config = _persistit.getConfiguration();
+        _persistit = new Persistit();
+        _persistit.initialize(config);
+
+        exchange = getExchange();
+        txn = exchange.getTransaction();
+        txn.begin();
+        acc = exchange.getTree().getAccumulator(Accumulator.Type.SUM, 0);
+        assertEquals("Accumulator state should have been checkpointed", 42, acc.getSnapshotValue(txn));
+        txn.commit();
+        txn.end();
+
+        _persistit.checkpoint();
+        _persistit.checkpoint();
+        _persistit.checkpoint();
+    }
+
+    /**
+     * ThreadSequencer is not even needed: this sequence shows how setting
+     * checkpointNeeded inside of the main transaction is not correctly
+     * sequenced against the checkpoint.
+     */
+    @Test
+    public void nathansVersion() throws Exception {
+        Exchange exchange = getExchange();
+        Transaction txn = exchange.getTransaction();
+        txn.begin();
+        Accumulator acc = exchange.getTree().getAccumulator(Accumulator.Type.SUM, 0);
+        acc.update(42, txn);
+        _persistit.checkpoint();
+        txn.commit();
+        txn.end();
+        _persistit.copyBackPages();
+        final Configuration config = _persistit.getConfiguration();
+        _persistit.close();
+        _persistit = new Persistit();
+        _persistit.initialize(config);
+
+        exchange = getExchange();
+        txn = exchange.getTransaction();
+        txn.begin();
+        acc = exchange.getTree().getAccumulator(Accumulator.Type.SUM, 0);
+        assertEquals("Accumulator state should have been checkpointed", 42, acc.getSnapshotValue(txn));
+        txn.commit();
+        txn.end();
+
+    }
+}
