Merge lp:~pbeaman/akiban-persistit/fix-several-small-bugs into lp:akiban-persistit

Proposed by Peter Beaman
Status: Superseded
Proposed branch: lp:~pbeaman/akiban-persistit/fix-several-small-bugs
Merge into: lp:akiban-persistit
Diff against target: 782 lines (+287/-81)
19 files modified
src/main/java/com/persistit/BufferPool.java (+6/-5)
src/main/java/com/persistit/CheckpointManager.java (+2/-2)
src/main/java/com/persistit/Exchange.java (+1/-1)
src/main/java/com/persistit/JournalManager.java (+2/-20)
src/main/java/com/persistit/JournalManagerBench.java (+2/-1)
src/main/java/com/persistit/Persistit.java (+53/-27)
src/main/java/com/persistit/SessionId.java (+10/-0)
src/main/java/com/persistit/Transaction.java (+0/-1)
src/main/java/com/persistit/TransactionIndexBucket.java (+5/-9)
src/main/java/com/persistit/TransactionStatus.java (+20/-0)
src/main/java/com/persistit/VolumeHeader.java (+9/-1)
src/main/java/com/persistit/logging/LogBase.java (+3/-0)
src/main/java/com/persistit/util/Util.java (+4/-0)
src/test/java/com/persistit/IOFailureTest.java (+2/-0)
src/test/java/com/persistit/TransactionTest2.java (+43/-9)
src/test/java/com/persistit/stress/AbstractStressTest.java (+3/-1)
src/test/java/com/persistit/stress/AbstractSuite.java (+4/-4)
src/test/java/com/persistit/stress/StartStop.java (+2/-0)
src/test/java/com/persistit/unit/ExchangeTest.java (+116/-0)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/fix-several-small-bugs
Reviewer Review Type Date Requested Status
Nathan Williams Approve
Review via email: mp+128309@code.launchpad.net

This proposal has been superseded by a proposal from 2012-10-08.

Description of the change

Fixes several small unrelated bugs:

https://bugs.launchpad.net/akiban-persistit/+bug/1023549 - as written, is invalid, and this proposal includes a new unit test to prove it.

https://bugs.launchpad.net/akiban-persistit/+bug/1013259 - cleans up Persistit shut-down handling. Changes ensure that concurrent transactions are interrupted and closed properly. The IllegalMonitorStateException occurred because two threads were racing to close a transaction on shutdown; this condition is now eliminated. A new unit test was added to TransactionTest2 to verify correct shutdown behavior.

https://bugs.launchpad.net/akiban-persistit/+bug/1029942 - incorrect constant was used in the log statement. Constants NS_PER_S, NS_PER_MS and MS_PER_S were consolidated into the com.persistit.util.Util class.

Bug #1062315: Assertion failure in TransactionIndexBucket#allocateTransactionStatus - a small change to avoid asserting in the case of a benign race condition was added.

To post a comment you must log in.
Revision history for this message
Nathan Williams (nwilliams) wrote :

VolumeStorageV2#updateMetaData() - including timestamp changed in result will make meta data always "dirty" and then written. Is that what we want?

ExchangeTest#traverseEQfalse0 - The bug has has 'false' in the title and 'true' in the description. Could we add cases for 'true' as well? Additionally, the server use case probably did it inside of a transaction. It should work as is if another test method just calls the new one, but inside of a transaction block. I have no reason to suspect a failure, but simple to add.

Persistit#close() - _journalManager.stopCopier() is no longer called. Should it be? The rest of the related changes look plausible and the new test is good.

review: Needs Information
Revision history for this message
Peter Beaman (pbeaman) wrote :

VolumeStorageV2#updateMetaData - good catch - forgot to change it back after an experiment.

ExchangeTest#traverseEQfalse0 - good catch on not testing the transaction case. Changed the names, added a transactional version, and also tests for both true and false values of the 'deep' parameter.

Persistit#close() no longer calls stopCopier - yes, there is a change. The JOURNAL_COPIER thread used to be responsible for various cleanup activities and therefore had to be stopped early. That is no longer the case, so JournalManager#close() is where the copier now gets stopped.

Revision history for this message
Nathan Williams (nwilliams) wrote :

Thanks for the tweaks and clarifications.

review: Approve
Revision history for this message
Peter Beaman (pbeaman) wrote :

Has a conflict. Marked Rejected. Will fix and re-approve.

387. By Peter Beaman

Merge from trunk (with fix-dynamic-volumes) and fix conflict in JournalManager

Unmerged revisions

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/main/java/com/persistit/BufferPool.java'
2--- src/main/java/com/persistit/BufferPool.java 2012-09-27 18:22:25 +0000
3+++ src/main/java/com/persistit/BufferPool.java 2012-10-08 18:01:33 +0000
4@@ -15,6 +15,8 @@
5
6 package com.persistit;
7
8+import static com.persistit.util.Util.NS_PER_S;
9+
10 import java.io.DataOutputStream;
11 import java.io.IOException;
12 import java.nio.ByteBuffer;
13@@ -97,11 +99,10 @@
14 */
15 private final static int INVENTORY_VERSIONS = 3;
16
17- private final static long NS_PER_SEC = 1000000000;
18 /**
19 * Preload log message interval, in seconds
20 */
21- private final static long INVENTORY_PRELOAD_LOG_MESSAGE_NS = 60L * NS_PER_SEC;
22+ private final static long INVENTORY_PRELOAD_LOG_MESSAGE_NS = 60L * NS_PER_S;
23
24 /**
25 * The Persistit instance that references this BufferPool.
26@@ -1514,8 +1515,8 @@
27 count++;
28 final long now = System.nanoTime();
29 if (now - reportTime >= INVENTORY_PRELOAD_LOG_MESSAGE_NS) {
30- _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime)
31- / NS_PER_SEC);
32+ _persistit.getLogBase().bufferInventoryProgress
33+ .log(count, total, (now - reportTime) / NS_PER_S);
34 reportTime = now;
35 }
36 if (count >= _bufferCount) {
37@@ -1533,7 +1534,7 @@
38 _persistit.getLogBase().bufferInventoryException.log(e);
39 } finally {
40 final long now = System.nanoTime();
41- _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime) / NS_PER_SEC);
42+ _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime) / NS_PER_S);
43 }
44 }
45
46
47=== modified file 'src/main/java/com/persistit/CheckpointManager.java'
48--- src/main/java/com/persistit/CheckpointManager.java 2012-08-24 14:00:17 +0000
49+++ src/main/java/com/persistit/CheckpointManager.java 2012-10-08 18:01:33 +0000
50@@ -15,6 +15,8 @@
51
52 package com.persistit;
53
54+import static com.persistit.util.Util.NS_PER_S;
55+
56 import java.text.SimpleDateFormat;
57 import java.util.ArrayList;
58 import java.util.Date;
59@@ -86,8 +88,6 @@
60 }
61 }
62
63- private final static long NS_PER_S = 1000000000L;
64-
65 /**
66 * Default interval in nanoseconds between checkpoints - two minutes.
67 */
68
69=== modified file 'src/main/java/com/persistit/Exchange.java'
70--- src/main/java/com/persistit/Exchange.java 2012-10-02 20:37:24 +0000
71+++ src/main/java/com/persistit/Exchange.java 2012-10-08 18:01:33 +0000
72@@ -1492,7 +1492,7 @@
73 if (doMVCC) {
74 valueToStore = spareValue;
75 final int valueSize = value.getEncodedSize();
76- int retries = VERSIONS_OUT_OF_ORDER_RETRY_COUNT;
77+ final int retries = VERSIONS_OUT_OF_ORDER_RETRY_COUNT;
78
79 for (;;) {
80 try {
81
82=== modified file 'src/main/java/com/persistit/JournalManager.java'
83--- src/main/java/com/persistit/JournalManager.java 2012-09-27 22:17:34 +0000
84+++ src/main/java/com/persistit/JournalManager.java 2012-10-08 18:01:33 +0000
85@@ -19,6 +19,7 @@
86 import static com.persistit.util.SequencerConstants.PAGE_MAP_READ_INVALIDATE_A;
87 import static com.persistit.util.SequencerConstants.RECOVERY_PRUNING_B;
88 import static com.persistit.util.ThreadSequencer.sequence;
89+import static com.persistit.util.Util.NS_PER_MS;
90
91 import java.io.File;
92 import java.io.IOException;
93@@ -78,7 +79,6 @@
94 final static int HALF_URGENT = 5;
95 final static int URGENT_COMMIT_DELAY_MILLIS = 50;
96 final static int GENTLE_COMMIT_DELAY_MILLIS = 12;
97- private final static long NS_PER_MS = 1000000L;
98 private final static int IO_MEASUREMENT_CYCLES = 8;
99 private final static int TOO_MANY_WARN_THRESHOLD = 5;
100 private final static int TOO_MANY_ERROR_THRESHOLD = 10;
101@@ -1255,11 +1255,6 @@
102 return address % _blockSize;
103 }
104
105- void stopCopier() {
106- _copier.setShouldStop(true);
107- _persistit.waitForIOTaskStop(_copier);
108- }
109-
110 void setWriteBufferSize(final int size) {
111 if (size < MINIMUM_BUFFER_SIZE || size > MAXIMUM_BUFFER_SIZE) {
112 throw new IllegalArgumentException("Invalid write buffer size: " + size);
113@@ -1268,11 +1263,7 @@
114 }
115
116 public void close() throws PersistitException {
117-
118- synchronized (this) {
119- _closed.set(true);
120- }
121-
122+ _closed.set(true);
123 rollover();
124
125 final JournalCopier copier = _copier;
126@@ -2442,10 +2433,6 @@
127 for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) {
128 Volume volumeRef = null;
129
130- if (_closed.get() && !_copyFast.get() || _appendOnly.get()) {
131- list.clear();
132- break;
133- }
134 final PageNode pageNode = iterator.next();
135 if (pageNode.isInvalid()) {
136 iterator.remove();
137@@ -2507,11 +2494,6 @@
138 final Set<Volume> volumes = new HashSet<Volume>();
139
140 for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) {
141- if (_closed.get() && !_copyFast.get() || _appendOnly.get()) {
142- list.clear();
143- break;
144- }
145-
146 final PageNode pageNode = iterator.next();
147
148 if (pageNode.getVolumeHandle() != handle) {
149
150=== modified file 'src/main/java/com/persistit/JournalManagerBench.java'
151--- src/main/java/com/persistit/JournalManagerBench.java 2012-09-12 20:36:27 +0000
152+++ src/main/java/com/persistit/JournalManagerBench.java 2012-10-08 18:01:33 +0000
153@@ -15,6 +15,8 @@
154
155 package com.persistit;
156
157+import static com.persistit.util.Util.NS_PER_S;
158+
159 import java.io.File;
160 import java.io.RandomAccessFile;
161 import java.nio.ByteBuffer;
162@@ -44,7 +46,6 @@
163 */
164 public class JournalManagerBench {
165
166- private final long NS_PER_S = 1000000000L;
167 private final byte[] NULLS = new byte[65536];
168
169 private final String[] ARG_TEMPLATE = new String[] { "duration|int:10:10:86400|Duration of test in seconds",
170
171=== modified file 'src/main/java/com/persistit/Persistit.java'
172--- src/main/java/com/persistit/Persistit.java 2012-09-12 22:02:22 +0000
173+++ src/main/java/com/persistit/Persistit.java 2012-10-08 18:01:33 +0000
174@@ -15,6 +15,8 @@
175
176 package com.persistit;
177
178+import static com.persistit.util.Util.NS_PER_S;
179+
180 import java.io.BufferedReader;
181 import java.io.File;
182 import java.io.FileReader;
183@@ -1509,7 +1511,7 @@
184 return _bufferPoolTable;
185 }
186
187- public void cleanup() {
188+ void cleanup() {
189 final Set<SessionId> sessionIds;
190 synchronized (_transactionSessionMap) {
191 sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet());
192@@ -1654,12 +1656,8 @@
193 }
194 recordBufferPoolInventory();
195
196- /*
197- * The copier is responsible for background pruning of aborted
198- * transactions. Halt it so Transaction#close() can be called
199- * without being concerned about its state changing.
200- */
201- _journalManager.stopCopier();
202+ _cleanupManager.close(flush);
203+ waitForIOTaskStop(_cleanupManager);
204
205 getTransaction().close();
206 cleanup();
207@@ -1676,9 +1674,6 @@
208 }
209 }
210
211- _cleanupManager.close(flush);
212- waitForIOTaskStop(_cleanupManager);
213-
214 _checkpointManager.close(flush);
215 waitForIOTaskStop(_checkpointManager);
216
217@@ -1688,26 +1683,13 @@
218 pool.close();
219 }
220
221- /*
222- * Close (and abort) all remaining transactions.
223- */
224- Set<Transaction> transactions;
225- synchronized (_transactionSessionMap) {
226- transactions = new HashSet<Transaction>(_transactionSessionMap.values());
227- _transactionSessionMap.clear();
228- }
229- for (final Transaction txn : transactions) {
230- try {
231- txn.close();
232- } catch (final PersistitException e) {
233- _logBase.exception.log(e);
234- }
235- }
236-
237 _journalManager.close();
238 final IOTaskRunnable task = _transactionIndex.close();
239 waitForIOTaskStop(task);
240
241+ interruptActiveThreads(SHORT_DELAY);
242+ closeZombieTransactions();
243+
244 for (final Volume volume : volumes) {
245 volume.close();
246 }
247@@ -1725,6 +1707,50 @@
248 releaseAllResources();
249 }
250
251+ private void closeZombieTransactions() {
252+ final Set<SessionId> sessionIds;
253+ synchronized (_transactionSessionMap) {
254+ sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet());
255+ }
256+ for (final SessionId sessionId : sessionIds) {
257+ Transaction transaction = null;
258+ synchronized (_transactionSessionMap) {
259+ transaction = _transactionSessionMap.remove(sessionId);
260+ }
261+ if (!sessionId.isAlive()) {
262+ if (transaction != null) {
263+ try {
264+ transaction.close();
265+ } catch (final Exception e) {
266+ _logBase.exception.log(e);
267+ }
268+ }
269+ }
270+ }
271+ }
272+
273+ private void interruptActiveThreads(final long timeout) throws PersistitInterruptedException {
274+ final long expires = System.currentTimeMillis() + timeout;
275+ boolean remaining = false;
276+ do {
277+ final Set<SessionId> sessionIds;
278+ synchronized (_transactionSessionMap) {
279+ sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet());
280+ }
281+ for (final SessionId sessionId : sessionIds) {
282+ if (sessionId.isAlive()) {
283+ if (sessionId.interrupt()) {
284+ _logBase.interruptedAtClose.log(sessionId.ownerName());
285+ }
286+ remaining = true;
287+ }
288+ }
289+ if (remaining) {
290+ Util.spinSleep();
291+ }
292+ } while (remaining && System.currentTimeMillis() < expires);
293+ }
294+
295 /**
296 * Abruptly stop (using {@link Thread#stop()}) the writer and collector
297 * processes. This method should be used only by tests.
298@@ -1880,7 +1906,7 @@
299 }
300 final long now = System.currentTimeMillis();
301 if (now > _nextCloseTime) {
302- _logBase.waitForClose.log((_nextCloseTime - _beginCloseTime) / 1000);
303+ _logBase.waitForClose.log((_nextCloseTime - _beginCloseTime) / NS_PER_S);
304 _nextCloseTime += CLOSE_LOG_INTERVAL;
305 }
306 }
307
308=== modified file 'src/main/java/com/persistit/SessionId.java'
309--- src/main/java/com/persistit/SessionId.java 2012-09-26 13:31:13 +0000
310+++ src/main/java/com/persistit/SessionId.java 2012-10-08 18:01:33 +0000
311@@ -77,6 +77,16 @@
312 _owner.set(Thread.currentThread());
313 }
314
315+ boolean interrupt() {
316+ final Thread t = _owner.get();
317+ if (t != null && t != Thread.currentThread()) {
318+ t.interrupt();
319+ return true;
320+ } else {
321+ return false;
322+ }
323+ }
324+
325 public String ownerName() {
326 final Thread t = _owner.get();
327 if (t == null) {
328
329=== modified file 'src/main/java/com/persistit/Transaction.java'
330--- src/main/java/com/persistit/Transaction.java 2012-08-24 13:57:19 +0000
331+++ src/main/java/com/persistit/Transaction.java 2012-10-08 18:01:33 +0000
332@@ -725,7 +725,6 @@
333 _persistit.getTransactionIndex().notifyCompleted(_transactionStatus,
334 _persistit.getTimestampAllocator().getCurrentTimestamp());
335 _rollbackCompleted = true;
336-
337 }
338 }
339 }
340
341=== modified file 'src/main/java/com/persistit/TransactionIndexBucket.java'
342--- src/main/java/com/persistit/TransactionIndexBucket.java 2012-08-24 13:57:19 +0000
343+++ src/main/java/com/persistit/TransactionIndexBucket.java 2012-10-08 18:01:33 +0000
344@@ -170,10 +170,13 @@
345 _lock.unlock();
346 }
347
348- TransactionStatus allocateTransactionStatus() {
349+ TransactionStatus allocateTransactionStatus() throws InterruptedException {
350 assert _lock.isHeldByCurrentThread();
351 final TransactionStatus status = _free;
352 if (status != null) {
353+ if (status.isLocked()) {
354+ status.briefLock();
355+ }
356 assert !status.isLocked();
357 _free = status.getNext();
358 _freeCount--;
359@@ -569,14 +572,7 @@
360 }
361 }
362 } else if (tc < 0 && tc != ABORTED && -tc < timestamp) {
363- boolean locked = false;
364- try {
365- locked = status.wwLock(TransactionIndex.SHORT_TIMEOUT);
366- } finally {
367- if (locked) {
368- status.wwUnlock();
369- }
370- }
371+ status.briefLock();
372 _transactionIndex.incrementAccumulatorSnapshotRetryCounter();
373 throw RetryException.SINGLE;
374 }
375
376=== modified file 'src/main/java/com/persistit/TransactionStatus.java'
377--- src/main/java/com/persistit/TransactionStatus.java 2012-08-24 13:57:19 +0000
378+++ src/main/java/com/persistit/TransactionStatus.java 2012-10-08 18:01:33 +0000
379@@ -327,6 +327,26 @@
380 }
381
382 /**
383+ * Block briefly until another thread transiently holding the wwLock
384+ * vacates. Times out and returns after
385+ * {@value TransactionIndex#SHORT_TIMEOUT} milliseconds.
386+ *
387+ * @throws InterruptedException
388+ */
389+ void briefLock() throws InterruptedException {
390+ boolean locked = false;
391+ try {
392+ locked = wwLock(TransactionIndex.SHORT_TIMEOUT);
393+ } catch (final InterruptedException ie) {
394+ Thread.currentThread().interrupt();
395+ } finally {
396+ if (locked) {
397+ wwUnlock();
398+ }
399+ }
400+ }
401+
402+ /**
403 * <p>
404 * Acquire a lock on this TransactionStatus. This supports the
405 * {@link TransactionIndex#wwDependency(long, long, long)} method. While a
406
407=== modified file 'src/main/java/com/persistit/VolumeHeader.java'
408--- src/main/java/com/persistit/VolumeHeader.java 2012-08-24 13:57:19 +0000
409+++ src/main/java/com/persistit/VolumeHeader.java 2012-10-08 18:01:33 +0000
410@@ -287,11 +287,12 @@
411 */
412 public static boolean verifyVolumeHeader(final VolumeSpecification specification, final long systemTimestamp)
413 throws CorruptVolumeException, InvalidVolumeSpecificationException, PersistitIOException {
414+ FileInputStream stream = null;
415 try {
416 final File file = new File(specification.getPath());
417 if (file.exists()) {
418 if (file.isFile()) {
419- final FileInputStream stream = new FileInputStream(file);
420+ stream = new FileInputStream(file);
421 final byte[] bytes = new byte[SIZE];
422 final int readSize = stream.read(bytes);
423 if (readSize < SIZE) {
424@@ -336,6 +337,13 @@
425 }
426 } catch (final IOException ioe) {
427 throw new PersistitIOException(ioe);
428+ } finally {
429+ if (stream != null) {
430+ try {
431+ stream.close();
432+ } catch (final IOException e) {
433+ }
434+ }
435 }
436 }
437
438
439=== modified file 'src/main/java/com/persistit/logging/LogBase.java'
440--- src/main/java/com/persistit/logging/LogBase.java 2012-09-06 20:48:31 +0000
441+++ src/main/java/com/persistit/logging/LogBase.java 2012-10-08 18:01:33 +0000
442@@ -253,6 +253,9 @@
443 @Message("WARNING|Exception while writing buffer pool inventory %s")
444 public final LogItem bufferInventoryException = PersistitLogMessage.empty();
445
446+ @Message("WARNING|Thread %s interrupted due to shutdown")
447+ public final LogItem interruptedAtClose = PersistitLogMessage.empty();
448+
449 public static String recurring(final String message, final int count, final long duration) {
450 return String.format(RECURRING, message, count, duration);
451 }
452
453=== modified file 'src/main/java/com/persistit/util/Util.java'
454--- src/main/java/com/persistit/util/Util.java 2012-08-24 13:57:19 +0000
455+++ src/main/java/com/persistit/util/Util.java 2012-10-08 18:01:33 +0000
456@@ -38,6 +38,10 @@
457 public final static String NEW_LINE = System.getProperty("line.separator");
458 private final static String REGEX_QUOTE = "^$*+?()[].";
459
460+ public final static long NS_PER_S = 1000000000L;
461+ public final static long MS_PER_S = 1000L;
462+ public final static long NS_PER_MS = 1000000L;
463+
464 public final static char[] HEX_DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D',
465 'E', 'F' };
466
467
468=== modified file 'src/test/java/com/persistit/IOFailureTest.java'
469--- src/test/java/com/persistit/IOFailureTest.java 2012-08-24 14:00:17 +0000
470+++ src/test/java/com/persistit/IOFailureTest.java 2012-10-08 18:01:33 +0000
471@@ -308,6 +308,8 @@
472 final long size0 = channel0.size();
473 channel0.truncate(100);
474
475+ channel0.close();
476+
477 final File file1 = jman.addressToFile(currentAddress);
478 final FileChannel channel1 = new RandomAccessFile(file1, "rw").getChannel();
479 final long size1 = channel1.size();
480
481=== modified file 'src/test/java/com/persistit/TransactionTest2.java'
482--- src/test/java/com/persistit/TransactionTest2.java 2012-08-24 13:57:19 +0000
483+++ src/test/java/com/persistit/TransactionTest2.java 2012-10-08 18:01:33 +0000
484@@ -29,6 +29,7 @@
485 import org.junit.Test;
486
487 import com.persistit.Transaction.CommitPolicy;
488+import com.persistit.exception.PersistitClosedException;
489 import com.persistit.exception.PersistitException;
490 import com.persistit.exception.PersistitIOException;
491 import com.persistit.exception.PersistitInterruptedException;
492@@ -47,15 +48,17 @@
493
494 final static CommitPolicy policy = CommitPolicy.SOFT;
495
496- final static long TIMEOUT = 20000; // 20 seconds
497+ final static long TIMEOUT = 10000; // 10 seconds
498+ final static int ITERATIONS_PER_THREAD = 25000;
499
500 static int _threadCount = 8;
501- static int _iterationsPerThread = 25000;
502+ static int _iterationsPerThread = ITERATIONS_PER_THREAD;
503 static int _accounts = 5000;
504
505 static AtomicInteger _retriedTransactionCount = new AtomicInteger();
506 static AtomicInteger _completedTransactionCount = new AtomicInteger();
507 static AtomicInteger _failedTransactionCount = new AtomicInteger();
508+ static AtomicInteger _strandedThreads = new AtomicInteger();
509
510 static int _threadCounter = 0;
511
512@@ -118,7 +121,7 @@
513 threadArray[index] = new Thread(new Runnable() {
514 @Override
515 public void run() {
516- runIt();
517+ runIt(_iterationsPerThread);
518 }
519 }, "TransactionThread_" + index);
520
521@@ -182,7 +185,7 @@
522 final Thread thread = new Thread(new Runnable() {
523 @Override
524 public void run() {
525- runIt();
526+ runIt(Integer.MAX_VALUE);
527 }
528 }, "TransactionThread_" + ++index);
529 threads.add(thread);
530@@ -238,19 +241,44 @@
531 txn.begin();
532 }
533
534- public void runIt() {
535+ @Test
536+ public void transactionsConcurrentWithPersistitClose() throws Exception {
537+ new Thread(new Runnable() {
538+ @Override
539+ public void run() {
540+ final Thread[] threadArray = new Thread[_threadCount];
541+ for (int index = 0; index < _threadCount; index++) {
542+ threadArray[index] = new Thread(new Runnable() {
543+ @Override
544+ public void run() {
545+ runIt(Integer.MAX_VALUE);
546+ }
547+ }, "TransactionThread_" + index);
548+ }
549+ for (int index = 0; index < _threadCount; index++) {
550+ threadArray[index].start();
551+ }
552+ }
553+ }).start();
554+ /*
555+ * Let the threads crank up
556+ */
557+ Thread.sleep(1000);
558+ _persistit.close();
559+ assertEquals("All threads should have exited correctly", 0, _strandedThreads.get());
560+ }
561+
562+ public void runIt(final int limit) {
563+ _strandedThreads.incrementAndGet();
564 try {
565 final Exchange accountEx = _persistit.getExchange("persistit", "account", true);
566 //
567 final Random random = new Random();
568- for (int iterations = 1; iterations <= _iterationsPerThread; iterations++) {
569+ for (int iterations = 1; iterations <= limit; iterations++) {
570 final int accountNo1 = random.nextInt(_accounts);
571- // int accountNo2 = random.nextInt(_accounts - 1);
572- // if (accountNo2 == accountNo1) accountNo2++;
573 final int accountNo2 = random.nextInt(_accounts);
574
575 final int delta = random.nextInt(10000);
576- // final int delta = 1;
577
578 transfer(accountEx, accountNo1, accountNo2, delta);
579 _completedTransactionCount.incrementAndGet();
580@@ -260,10 +288,16 @@
581 System.out.flush();
582 }
583 }
584+ _strandedThreads.decrementAndGet();
585 } catch (final PersistitInterruptedException exception) {
586+ _strandedThreads.decrementAndGet();
587+ // expected
588+ } catch (final PersistitClosedException exception) {
589+ _strandedThreads.decrementAndGet();
590 // expected
591 } catch (final PersistitIOException exception) {
592 if (InterruptedIOException.class.equals(exception.getCause().getClass())) {
593+ _strandedThreads.decrementAndGet();
594 // expected
595 } else {
596 exception.printStackTrace();
597
598=== modified file 'src/test/java/com/persistit/stress/AbstractStressTest.java'
599--- src/test/java/com/persistit/stress/AbstractStressTest.java 2012-08-24 13:57:19 +0000
600+++ src/test/java/com/persistit/stress/AbstractStressTest.java 2012-10-08 18:01:33 +0000
601@@ -15,6 +15,8 @@
602
603 package com.persistit.stress;
604
605+import static com.persistit.util.Util.NS_PER_S;
606+
607 import com.persistit.Persistit;
608
609 /**
610@@ -128,7 +130,7 @@
611 _result = new TestResult(false, throwable);
612 }
613 forceStop();
614- final long elapsed = (System.nanoTime() - _startTime) / AbstractSuite.NS_PER_S;
615+ final long elapsed = (System.nanoTime() - _startTime) / NS_PER_S;
616 System.err.printf("\n%s at %,d seconds: %s\n", this, elapsed, _result);
617 }
618
619
620=== modified file 'src/test/java/com/persistit/stress/AbstractSuite.java'
621--- src/test/java/com/persistit/stress/AbstractSuite.java 2012-08-24 14:00:17 +0000
622+++ src/test/java/com/persistit/stress/AbstractSuite.java 2012-10-08 18:01:33 +0000
623@@ -15,6 +15,10 @@
624
625 package com.persistit.stress;
626
627+import static com.persistit.util.Util.MS_PER_S;
628+import static com.persistit.util.Util.NS_PER_MS;
629+import static com.persistit.util.Util.NS_PER_S;
630+
631 import java.io.File;
632 import java.io.FileWriter;
633 import java.io.IOException;
634@@ -42,10 +46,6 @@
635
636 protected final static long PROGRESS_LOG_INTERVAL = 600000;
637
638- protected final static long NS_PER_MS = 1000000;
639- protected final static long MS_PER_S = 1000;
640- protected final static long NS_PER_S = NS_PER_MS * MS_PER_S;
641-
642 private long _nextReport;
643 private long _accumulatedWork;
644
645
646=== modified file 'src/test/java/com/persistit/stress/StartStop.java'
647--- src/test/java/com/persistit/stress/StartStop.java 2012-08-24 13:57:19 +0000
648+++ src/test/java/com/persistit/stress/StartStop.java 2012-10-08 18:01:33 +0000
649@@ -15,6 +15,8 @@
650
651 package com.persistit.stress;
652
653+import static com.persistit.util.Util.NS_PER_S;
654+
655 import com.persistit.IntegrityCheck;
656 import com.persistit.Persistit;
657 import com.persistit.Transaction.CommitPolicy;
658
659=== modified file 'src/test/java/com/persistit/unit/ExchangeTest.java'
660--- src/test/java/com/persistit/unit/ExchangeTest.java 2012-08-24 13:57:19 +0000
661+++ src/test/java/com/persistit/unit/ExchangeTest.java 2012-10-08 18:01:33 +0000
662@@ -368,4 +368,120 @@
663 assertTrue("Not enough methods were tested: " + tested, tested > 10);
664 }
665
666+ /**
667+ * Test for https://bugs.launchpad.net/akiban-persistit/+bug/1023549:
668+ *
669+ * traverse(EQ, false, 0) returns incorrect result
670+ *
671+ * This method returns true even when the tree is empty. traverse(EQ, true,
672+ * 0) returns the correct value.
673+ *
674+ * @throws Exception
675+ */
676+ @Test
677+ public void traverse_EQ_false_0__IsCorrect() throws Exception {
678+ traverseCases(false);
679+ traverseCases(true);
680+ }
681+
682+ /**
683+ * Test for https://bugs.launchpad.net/akiban-persistit/+bug/1023549:
684+ *
685+ * traverse(EQ, false, 0) returns incorrect result
686+ *
687+ * This method returns true even when the tree is empty. traverse(EQ, true,
688+ * 0) returns the correct value.
689+ *
690+ * @throws Exception
691+ */
692+ @Test
693+ public void traverse_EQ_false_0__IsCorrect_Txn() throws Exception {
694+ final Transaction txn = _persistit.getTransaction();
695+ txn.begin();
696+ traverseCases(false);
697+ txn.commit();
698+ txn.end();
699+
700+ txn.begin();
701+ traverseCases(true);
702+ txn.commit();
703+ txn.end();
704+ }
705+
706+ private void traverseCases(final boolean deep) throws Exception {
707+ final Exchange ex = _persistit.getExchange("persistit", "gogo", true);
708+
709+ ex.removeAll();
710+ ex.clear();
711+ assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, -1));
712+ ex.clear();
713+ assertEquals("Should be false", false, ex.traverse(Key.GTEQ, deep, -1));
714+ ex.clear();
715+ assertEquals("Should be false", false, ex.traverse(Key.GT, deep, -1));
716+ ex.clear();
717+ assertEquals("Should be false", false, ex.traverse(Key.LTEQ, deep, -1));
718+ ex.clear();
719+ assertEquals("Should be false", false, ex.traverse(Key.LT, deep, -1));
720+ ex.clear();
721+
722+ ex.append(1).append(2).store();
723+ ex.clear().append(Key.BEFORE);
724+ assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, -1));
725+ assertEquals("Should be true", true, ex.traverse(Key.GTEQ, deep, -1));
726+ assertEquals("Should be true", true, ex.traverse(Key.GTEQ, deep, -1));
727+ ex.clear().append(1);
728+ assertEquals("Should be " + !deep, !deep, ex.traverse(Key.EQ, deep, -1));
729+
730+ ex.clear().append(Key.AFTER);
731+ assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, -1));
732+ ex.clear().append(Key.AFTER);
733+ assertEquals("Should be true", true, ex.traverse(Key.LTEQ, deep, -1));
734+ assertEquals("Should be true", true, ex.traverse(Key.LTEQ, deep, -1));
735+
736+ ex.removeAll();
737+ ex.clear();
738+ assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, 0));
739+ keyCheck(ex, "{{before}}");
740+ ex.clear();
741+ assertEquals("Should be false", false, ex.traverse(Key.GTEQ, deep, 0));
742+ keyCheck(ex, "{{before}}");
743+ ex.clear();
744+ assertEquals("Should be false", false, ex.traverse(Key.GT, deep, 0));
745+ keyCheck(ex, "{{before}}");
746+ ex.clear();
747+ assertEquals("Should be false", false, ex.traverse(Key.LTEQ, deep, 0));
748+ keyCheck(ex, "{{after}}");
749+ ex.clear();
750+ assertEquals("Should be false", false, ex.traverse(Key.LT, deep, 0));
751+ keyCheck(ex, "{{after}}");
752+ ex.clear();
753+
754+ ex.append(1).append(2).store();
755+ ex.clear().append(Key.BEFORE);
756+ assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, 0));
757+ keyCheck(ex, "{{before}}");
758+ assertEquals("Should be true", true, ex.traverse(Key.GTEQ, deep, 0));
759+ keyCheck(ex, deep ? "{1,2}" : "{1}");
760+ assertEquals("Should be true", true, ex.traverse(Key.GTEQ, deep, 0));
761+ keyCheck(ex, deep ? "{1,2}" : "{1}");
762+ assertEquals("Should be true", true, ex.traverse(Key.EQ, deep, 0));
763+ keyCheck(ex, deep ? "{1,2}" : "{1}");
764+
765+ ex.clear().append(Key.AFTER);
766+ assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, 0));
767+ keyCheck(ex, "{{before}}");
768+ ex.clear().append(Key.AFTER);
769+ assertEquals("Should be true", true, ex.traverse(Key.LTEQ, deep, 0));
770+ keyCheck(ex, deep ? "{1,2}" : "{1}");
771+ assertEquals("Should be true", true, ex.traverse(Key.LTEQ, deep, 0));
772+ keyCheck(ex, deep ? "{1,2}" : "{1}");
773+ assertEquals("Should be true", true, ex.traverse(Key.EQ, deep, 0));
774+ keyCheck(ex, deep ? "{1,2}" : "{1}");
775+
776+ }
777+
778+ private void keyCheck(final Exchange ex, final String expected) {
779+ assertEquals("Key should be " + expected, expected, ex.getKey().toString());
780+ }
781+
782 }

Subscribers

People subscribed via source and target branches