Merge lp:~pbeaman/akiban-persistit/fix-several-small-bugs into lp:akiban-persistit
- fix-several-small-bugs
- Merge into trunk
Status: | Merged | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Approved by: | Peter Beaman | ||||||||||||
Approved revision: | 387 | ||||||||||||
Merged at revision: | 377 | ||||||||||||
Proposed branch: | lp:~pbeaman/akiban-persistit/fix-several-small-bugs | ||||||||||||
Merge into: | lp:akiban-persistit | ||||||||||||
Diff against target: |
770 lines (+287/-80) 18 files modified
src/main/java/com/persistit/BufferPool.java (+6/-5) src/main/java/com/persistit/CheckpointManager.java (+2/-2) src/main/java/com/persistit/JournalManager.java (+3/-20) src/main/java/com/persistit/JournalManagerBench.java (+2/-1) src/main/java/com/persistit/Persistit.java (+53/-27) src/main/java/com/persistit/SessionId.java (+10/-0) src/main/java/com/persistit/Transaction.java (+0/-1) src/main/java/com/persistit/TransactionIndexBucket.java (+5/-9) src/main/java/com/persistit/TransactionStatus.java (+20/-0) src/main/java/com/persistit/VolumeHeader.java (+9/-1) src/main/java/com/persistit/logging/LogBase.java (+3/-0) src/main/java/com/persistit/util/Util.java (+4/-0) src/test/java/com/persistit/IOFailureTest.java (+2/-0) src/test/java/com/persistit/TransactionTest2.java (+43/-9) src/test/java/com/persistit/stress/AbstractStressTest.java (+3/-1) src/test/java/com/persistit/stress/AbstractSuite.java (+4/-4) src/test/java/com/persistit/stress/StartStop.java (+2/-0) src/test/java/com/persistit/unit/ExchangeTest.java (+116/-0) |
||||||||||||
To merge this branch: | bzr merge lp:~pbeaman/akiban-persistit/fix-several-small-bugs | ||||||||||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Nathan Williams | Pending | ||
Review via email: mp+128568@code.launchpad.net |
This proposal supersedes a proposal from 2012-10-05.
Commit message
Description of the change
Fixes several small unrelated bugs:
https:/
https:/
https:/
Bug #1062315: Assertion failure in TransactionInde
Nathan Williams (nwilliams) wrote : Posted in a previous version of this proposal | # |
Peter Beaman (pbeaman) wrote : Posted in a previous version of this proposal | # |
VolumeStorageV2
ExchangeTest#
Persistit#close() no longer calls stopCopier - yes, there is a change. The JOURNAL_COPIER thread used to be responsible for various cleanup activities and therefore had to be stopped early. That is no longer the case, so JournalManager#
Nathan Williams (nwilliams) wrote : Posted in a previous version of this proposal | # |
Thanks for the tweaks and clarifications.
Peter Beaman (pbeaman) wrote : Posted in a previous version of this proposal | # |
Has a conflict. Marked Rejected. Will fix and re-approve.
Peter Beaman (pbeaman) wrote : | # |
Merge from trunk (now with branch fix-dynamic-
Peter Beaman (pbeaman) wrote : | # |
Approving this since the only changes are mechanical.
Preview Diff
1 | === modified file 'src/main/java/com/persistit/BufferPool.java' |
2 | --- src/main/java/com/persistit/BufferPool.java 2012-09-27 18:22:25 +0000 |
3 | +++ src/main/java/com/persistit/BufferPool.java 2012-10-08 19:12:23 +0000 |
4 | @@ -15,6 +15,8 @@ |
5 | |
6 | package com.persistit; |
7 | |
8 | +import static com.persistit.util.Util.NS_PER_S; |
9 | + |
10 | import java.io.DataOutputStream; |
11 | import java.io.IOException; |
12 | import java.nio.ByteBuffer; |
13 | @@ -97,11 +99,10 @@ |
14 | */ |
15 | private final static int INVENTORY_VERSIONS = 3; |
16 | |
17 | - private final static long NS_PER_SEC = 1000000000; |
18 | /** |
19 | * Preload log message interval, in seconds |
20 | */ |
21 | - private final static long INVENTORY_PRELOAD_LOG_MESSAGE_NS = 60L * NS_PER_SEC; |
22 | + private final static long INVENTORY_PRELOAD_LOG_MESSAGE_NS = 60L * NS_PER_S; |
23 | |
24 | /** |
25 | * The Persistit instance that references this BufferPool. |
26 | @@ -1514,8 +1515,8 @@ |
27 | count++; |
28 | final long now = System.nanoTime(); |
29 | if (now - reportTime >= INVENTORY_PRELOAD_LOG_MESSAGE_NS) { |
30 | - _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime) |
31 | - / NS_PER_SEC); |
32 | + _persistit.getLogBase().bufferInventoryProgress |
33 | + .log(count, total, (now - reportTime) / NS_PER_S); |
34 | reportTime = now; |
35 | } |
36 | if (count >= _bufferCount) { |
37 | @@ -1533,7 +1534,7 @@ |
38 | _persistit.getLogBase().bufferInventoryException.log(e); |
39 | } finally { |
40 | final long now = System.nanoTime(); |
41 | - _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime) / NS_PER_SEC); |
42 | + _persistit.getLogBase().bufferInventoryProgress.log(count, total, (now - reportTime) / NS_PER_S); |
43 | } |
44 | } |
45 | |
46 | |
47 | === modified file 'src/main/java/com/persistit/CheckpointManager.java' |
48 | --- src/main/java/com/persistit/CheckpointManager.java 2012-08-24 14:00:17 +0000 |
49 | +++ src/main/java/com/persistit/CheckpointManager.java 2012-10-08 19:12:23 +0000 |
50 | @@ -15,6 +15,8 @@ |
51 | |
52 | package com.persistit; |
53 | |
54 | +import static com.persistit.util.Util.NS_PER_S; |
55 | + |
56 | import java.text.SimpleDateFormat; |
57 | import java.util.ArrayList; |
58 | import java.util.Date; |
59 | @@ -86,8 +88,6 @@ |
60 | } |
61 | } |
62 | |
63 | - private final static long NS_PER_S = 1000000000L; |
64 | - |
65 | /** |
66 | * Default interval in nanoseconds between checkpoints - two minutes. |
67 | */ |
68 | |
69 | === modified file 'src/main/java/com/persistit/JournalManager.java' |
70 | --- src/main/java/com/persistit/JournalManager.java 2012-10-03 18:19:49 +0000 |
71 | +++ src/main/java/com/persistit/JournalManager.java 2012-10-08 19:12:23 +0000 |
72 | @@ -19,6 +19,7 @@ |
73 | import static com.persistit.util.SequencerConstants.PAGE_MAP_READ_INVALIDATE_A; |
74 | import static com.persistit.util.SequencerConstants.RECOVERY_PRUNING_B; |
75 | import static com.persistit.util.ThreadSequencer.sequence; |
76 | +import static com.persistit.util.Util.NS_PER_MS; |
77 | |
78 | import java.io.File; |
79 | import java.io.IOException; |
80 | @@ -79,7 +80,6 @@ |
81 | final static int HALF_URGENT = 5; |
82 | final static int URGENT_COMMIT_DELAY_MILLIS = 50; |
83 | final static int GENTLE_COMMIT_DELAY_MILLIS = 12; |
84 | - private final static long NS_PER_MS = 1000000L; |
85 | private final static int IO_MEASUREMENT_CYCLES = 8; |
86 | private final static int TOO_MANY_WARN_THRESHOLD = 5; |
87 | private final static int TOO_MANY_ERROR_THRESHOLD = 10; |
88 | @@ -1271,11 +1271,6 @@ |
89 | return address % _blockSize; |
90 | } |
91 | |
92 | - void stopCopier() { |
93 | - _copier.setShouldStop(true); |
94 | - _persistit.waitForIOTaskStop(_copier); |
95 | - } |
96 | - |
97 | void setWriteBufferSize(final int size) { |
98 | if (size < MINIMUM_BUFFER_SIZE || size > MAXIMUM_BUFFER_SIZE) { |
99 | throw new IllegalArgumentException("Invalid write buffer size: " + size); |
100 | @@ -1284,11 +1279,7 @@ |
101 | } |
102 | |
103 | public void close() throws PersistitException { |
104 | - |
105 | - synchronized (this) { |
106 | - _closed.set(true); |
107 | - } |
108 | - |
109 | + _closed.set(true); |
110 | rollover(); |
111 | |
112 | final JournalCopier copier = _copier; |
113 | @@ -2456,10 +2447,7 @@ |
114 | int handle = -1; |
115 | |
116 | for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) { |
117 | - if (_closed.get() && !_copyFast.get() || _appendOnly.get()) { |
118 | - list.clear(); |
119 | - break; |
120 | - } |
121 | + |
122 | final PageNode pageNode = iterator.next(); |
123 | if (pageNode.isInvalid()) { |
124 | iterator.remove(); |
125 | @@ -2520,11 +2508,6 @@ |
126 | final Set<Volume> volumes = new HashSet<Volume>(); |
127 | |
128 | for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) { |
129 | - if (_closed.get() && !_copyFast.get() || _appendOnly.get()) { |
130 | - list.clear(); |
131 | - break; |
132 | - } |
133 | - |
134 | final PageNode pageNode = iterator.next(); |
135 | |
136 | if (pageNode.getVolumeHandle() != handle) { |
137 | |
138 | === modified file 'src/main/java/com/persistit/JournalManagerBench.java' |
139 | --- src/main/java/com/persistit/JournalManagerBench.java 2012-09-12 20:36:27 +0000 |
140 | +++ src/main/java/com/persistit/JournalManagerBench.java 2012-10-08 19:12:23 +0000 |
141 | @@ -15,6 +15,8 @@ |
142 | |
143 | package com.persistit; |
144 | |
145 | +import static com.persistit.util.Util.NS_PER_S; |
146 | + |
147 | import java.io.File; |
148 | import java.io.RandomAccessFile; |
149 | import java.nio.ByteBuffer; |
150 | @@ -44,7 +46,6 @@ |
151 | */ |
152 | public class JournalManagerBench { |
153 | |
154 | - private final long NS_PER_S = 1000000000L; |
155 | private final byte[] NULLS = new byte[65536]; |
156 | |
157 | private final String[] ARG_TEMPLATE = new String[] { "duration|int:10:10:86400|Duration of test in seconds", |
158 | |
159 | === modified file 'src/main/java/com/persistit/Persistit.java' |
160 | --- src/main/java/com/persistit/Persistit.java 2012-10-03 16:04:16 +0000 |
161 | +++ src/main/java/com/persistit/Persistit.java 2012-10-08 19:12:23 +0000 |
162 | @@ -15,6 +15,8 @@ |
163 | |
164 | package com.persistit; |
165 | |
166 | +import static com.persistit.util.Util.NS_PER_S; |
167 | + |
168 | import java.io.BufferedReader; |
169 | import java.io.File; |
170 | import java.io.FileReader; |
171 | @@ -1514,7 +1516,7 @@ |
172 | return _bufferPoolTable; |
173 | } |
174 | |
175 | - public void cleanup() { |
176 | + void cleanup() { |
177 | final Set<SessionId> sessionIds; |
178 | synchronized (_transactionSessionMap) { |
179 | sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet()); |
180 | @@ -1659,12 +1661,8 @@ |
181 | } |
182 | recordBufferPoolInventory(); |
183 | |
184 | - /* |
185 | - * The copier is responsible for background pruning of aborted |
186 | - * transactions. Halt it so Transaction#close() can be called |
187 | - * without being concerned about its state changing. |
188 | - */ |
189 | - _journalManager.stopCopier(); |
190 | + _cleanupManager.close(flush); |
191 | + waitForIOTaskStop(_cleanupManager); |
192 | |
193 | getTransaction().close(); |
194 | cleanup(); |
195 | @@ -1681,9 +1679,6 @@ |
196 | } |
197 | } |
198 | |
199 | - _cleanupManager.close(flush); |
200 | - waitForIOTaskStop(_cleanupManager); |
201 | - |
202 | _checkpointManager.close(flush); |
203 | waitForIOTaskStop(_checkpointManager); |
204 | |
205 | @@ -1693,26 +1688,13 @@ |
206 | pool.close(); |
207 | } |
208 | |
209 | - /* |
210 | - * Close (and abort) all remaining transactions. |
211 | - */ |
212 | - Set<Transaction> transactions; |
213 | - synchronized (_transactionSessionMap) { |
214 | - transactions = new HashSet<Transaction>(_transactionSessionMap.values()); |
215 | - _transactionSessionMap.clear(); |
216 | - } |
217 | - for (final Transaction txn : transactions) { |
218 | - try { |
219 | - txn.close(); |
220 | - } catch (final PersistitException e) { |
221 | - _logBase.exception.log(e); |
222 | - } |
223 | - } |
224 | - |
225 | _journalManager.close(); |
226 | final IOTaskRunnable task = _transactionIndex.close(); |
227 | waitForIOTaskStop(task); |
228 | |
229 | + interruptActiveThreads(SHORT_DELAY); |
230 | + closeZombieTransactions(); |
231 | + |
232 | for (final Volume volume : volumes) { |
233 | volume.close(); |
234 | } |
235 | @@ -1730,6 +1712,50 @@ |
236 | releaseAllResources(); |
237 | } |
238 | |
239 | + private void closeZombieTransactions() { |
240 | + final Set<SessionId> sessionIds; |
241 | + synchronized (_transactionSessionMap) { |
242 | + sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet()); |
243 | + } |
244 | + for (final SessionId sessionId : sessionIds) { |
245 | + Transaction transaction = null; |
246 | + synchronized (_transactionSessionMap) { |
247 | + transaction = _transactionSessionMap.remove(sessionId); |
248 | + } |
249 | + if (!sessionId.isAlive()) { |
250 | + if (transaction != null) { |
251 | + try { |
252 | + transaction.close(); |
253 | + } catch (final Exception e) { |
254 | + _logBase.exception.log(e); |
255 | + } |
256 | + } |
257 | + } |
258 | + } |
259 | + } |
260 | + |
261 | + private void interruptActiveThreads(final long timeout) throws PersistitInterruptedException { |
262 | + final long expires = System.currentTimeMillis() + timeout; |
263 | + boolean remaining = false; |
264 | + do { |
265 | + final Set<SessionId> sessionIds; |
266 | + synchronized (_transactionSessionMap) { |
267 | + sessionIds = new HashSet<SessionId>(_transactionSessionMap.keySet()); |
268 | + } |
269 | + for (final SessionId sessionId : sessionIds) { |
270 | + if (sessionId.isAlive()) { |
271 | + if (sessionId.interrupt()) { |
272 | + _logBase.interruptedAtClose.log(sessionId.ownerName()); |
273 | + } |
274 | + remaining = true; |
275 | + } |
276 | + } |
277 | + if (remaining) { |
278 | + Util.spinSleep(); |
279 | + } |
280 | + } while (remaining && System.currentTimeMillis() < expires); |
281 | + } |
282 | + |
283 | /** |
284 | * Abruptly stop (using {@link Thread#stop()}) the writer and collector |
285 | * processes. This method should be used only by tests. |
286 | @@ -1885,7 +1911,7 @@ |
287 | } |
288 | final long now = System.currentTimeMillis(); |
289 | if (now > _nextCloseTime) { |
290 | - _logBase.waitForClose.log((_nextCloseTime - _beginCloseTime) / 1000); |
291 | + _logBase.waitForClose.log((_nextCloseTime - _beginCloseTime) / NS_PER_S); |
292 | _nextCloseTime += CLOSE_LOG_INTERVAL; |
293 | } |
294 | } |
295 | |
296 | === modified file 'src/main/java/com/persistit/SessionId.java' |
297 | --- src/main/java/com/persistit/SessionId.java 2012-09-26 13:31:13 +0000 |
298 | +++ src/main/java/com/persistit/SessionId.java 2012-10-08 19:12:23 +0000 |
299 | @@ -77,6 +77,16 @@ |
300 | _owner.set(Thread.currentThread()); |
301 | } |
302 | |
303 | + boolean interrupt() { |
304 | + final Thread t = _owner.get(); |
305 | + if (t != null && t != Thread.currentThread()) { |
306 | + t.interrupt(); |
307 | + return true; |
308 | + } else { |
309 | + return false; |
310 | + } |
311 | + } |
312 | + |
313 | public String ownerName() { |
314 | final Thread t = _owner.get(); |
315 | if (t == null) { |
316 | |
317 | === modified file 'src/main/java/com/persistit/Transaction.java' |
318 | --- src/main/java/com/persistit/Transaction.java 2012-08-24 13:57:19 +0000 |
319 | +++ src/main/java/com/persistit/Transaction.java 2012-10-08 19:12:23 +0000 |
320 | @@ -725,7 +725,6 @@ |
321 | _persistit.getTransactionIndex().notifyCompleted(_transactionStatus, |
322 | _persistit.getTimestampAllocator().getCurrentTimestamp()); |
323 | _rollbackCompleted = true; |
324 | - |
325 | } |
326 | } |
327 | } |
328 | |
329 | === modified file 'src/main/java/com/persistit/TransactionIndexBucket.java' |
330 | --- src/main/java/com/persistit/TransactionIndexBucket.java 2012-08-24 13:57:19 +0000 |
331 | +++ src/main/java/com/persistit/TransactionIndexBucket.java 2012-10-08 19:12:23 +0000 |
332 | @@ -170,10 +170,13 @@ |
333 | _lock.unlock(); |
334 | } |
335 | |
336 | - TransactionStatus allocateTransactionStatus() { |
337 | + TransactionStatus allocateTransactionStatus() throws InterruptedException { |
338 | assert _lock.isHeldByCurrentThread(); |
339 | final TransactionStatus status = _free; |
340 | if (status != null) { |
341 | + if (status.isLocked()) { |
342 | + status.briefLock(); |
343 | + } |
344 | assert !status.isLocked(); |
345 | _free = status.getNext(); |
346 | _freeCount--; |
347 | @@ -569,14 +572,7 @@ |
348 | } |
349 | } |
350 | } else if (tc < 0 && tc != ABORTED && -tc < timestamp) { |
351 | - boolean locked = false; |
352 | - try { |
353 | - locked = status.wwLock(TransactionIndex.SHORT_TIMEOUT); |
354 | - } finally { |
355 | - if (locked) { |
356 | - status.wwUnlock(); |
357 | - } |
358 | - } |
359 | + status.briefLock(); |
360 | _transactionIndex.incrementAccumulatorSnapshotRetryCounter(); |
361 | throw RetryException.SINGLE; |
362 | } |
363 | |
364 | === modified file 'src/main/java/com/persistit/TransactionStatus.java' |
365 | --- src/main/java/com/persistit/TransactionStatus.java 2012-08-24 13:57:19 +0000 |
366 | +++ src/main/java/com/persistit/TransactionStatus.java 2012-10-08 19:12:23 +0000 |
367 | @@ -327,6 +327,26 @@ |
368 | } |
369 | |
370 | /** |
371 | + * Block briefly until another thread transiently holding the wwLock |
372 | + * vacates. Times out and returns after |
373 | + * {@value TransactionIndex#SHORT_TIMEOUT} milliseconds. |
374 | + * |
375 | + * @throws InterruptedException |
376 | + */ |
377 | + void briefLock() throws InterruptedException { |
378 | + boolean locked = false; |
379 | + try { |
380 | + locked = wwLock(TransactionIndex.SHORT_TIMEOUT); |
381 | + } catch (final InterruptedException ie) { |
382 | + Thread.currentThread().interrupt(); |
383 | + } finally { |
384 | + if (locked) { |
385 | + wwUnlock(); |
386 | + } |
387 | + } |
388 | + } |
389 | + |
390 | + /** |
391 | * <p> |
392 | * Acquire a lock on this TransactionStatus. This supports the |
393 | * {@link TransactionIndex#wwDependency(long, long, long)} method. While a |
394 | |
395 | === modified file 'src/main/java/com/persistit/VolumeHeader.java' |
396 | --- src/main/java/com/persistit/VolumeHeader.java 2012-08-24 13:57:19 +0000 |
397 | +++ src/main/java/com/persistit/VolumeHeader.java 2012-10-08 19:12:23 +0000 |
398 | @@ -287,11 +287,12 @@ |
399 | */ |
400 | public static boolean verifyVolumeHeader(final VolumeSpecification specification, final long systemTimestamp) |
401 | throws CorruptVolumeException, InvalidVolumeSpecificationException, PersistitIOException { |
402 | + FileInputStream stream = null; |
403 | try { |
404 | final File file = new File(specification.getPath()); |
405 | if (file.exists()) { |
406 | if (file.isFile()) { |
407 | - final FileInputStream stream = new FileInputStream(file); |
408 | + stream = new FileInputStream(file); |
409 | final byte[] bytes = new byte[SIZE]; |
410 | final int readSize = stream.read(bytes); |
411 | if (readSize < SIZE) { |
412 | @@ -336,6 +337,13 @@ |
413 | } |
414 | } catch (final IOException ioe) { |
415 | throw new PersistitIOException(ioe); |
416 | + } finally { |
417 | + if (stream != null) { |
418 | + try { |
419 | + stream.close(); |
420 | + } catch (final IOException e) { |
421 | + } |
422 | + } |
423 | } |
424 | } |
425 | |
426 | |
427 | === modified file 'src/main/java/com/persistit/logging/LogBase.java' |
428 | --- src/main/java/com/persistit/logging/LogBase.java 2012-09-06 20:48:31 +0000 |
429 | +++ src/main/java/com/persistit/logging/LogBase.java 2012-10-08 19:12:23 +0000 |
430 | @@ -253,6 +253,9 @@ |
431 | @Message("WARNING|Exception while writing buffer pool inventory %s") |
432 | public final LogItem bufferInventoryException = PersistitLogMessage.empty(); |
433 | |
434 | + @Message("WARNING|Thread %s interrupted due to shutdown") |
435 | + public final LogItem interruptedAtClose = PersistitLogMessage.empty(); |
436 | + |
437 | public static String recurring(final String message, final int count, final long duration) { |
438 | return String.format(RECURRING, message, count, duration); |
439 | } |
440 | |
441 | === modified file 'src/main/java/com/persistit/util/Util.java' |
442 | --- src/main/java/com/persistit/util/Util.java 2012-08-24 13:57:19 +0000 |
443 | +++ src/main/java/com/persistit/util/Util.java 2012-10-08 19:12:23 +0000 |
444 | @@ -38,6 +38,10 @@ |
445 | public final static String NEW_LINE = System.getProperty("line.separator"); |
446 | private final static String REGEX_QUOTE = "^$*+?()[]."; |
447 | |
448 | + public final static long NS_PER_S = 1000000000L; |
449 | + public final static long MS_PER_S = 1000L; |
450 | + public final static long NS_PER_MS = 1000000L; |
451 | + |
452 | public final static char[] HEX_DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', |
453 | 'E', 'F' }; |
454 | |
455 | |
456 | === modified file 'src/test/java/com/persistit/IOFailureTest.java' |
457 | --- src/test/java/com/persistit/IOFailureTest.java 2012-08-24 14:00:17 +0000 |
458 | +++ src/test/java/com/persistit/IOFailureTest.java 2012-10-08 19:12:23 +0000 |
459 | @@ -308,6 +308,8 @@ |
460 | final long size0 = channel0.size(); |
461 | channel0.truncate(100); |
462 | |
463 | + channel0.close(); |
464 | + |
465 | final File file1 = jman.addressToFile(currentAddress); |
466 | final FileChannel channel1 = new RandomAccessFile(file1, "rw").getChannel(); |
467 | final long size1 = channel1.size(); |
468 | |
469 | === modified file 'src/test/java/com/persistit/TransactionTest2.java' |
470 | --- src/test/java/com/persistit/TransactionTest2.java 2012-08-24 13:57:19 +0000 |
471 | +++ src/test/java/com/persistit/TransactionTest2.java 2012-10-08 19:12:23 +0000 |
472 | @@ -29,6 +29,7 @@ |
473 | import org.junit.Test; |
474 | |
475 | import com.persistit.Transaction.CommitPolicy; |
476 | +import com.persistit.exception.PersistitClosedException; |
477 | import com.persistit.exception.PersistitException; |
478 | import com.persistit.exception.PersistitIOException; |
479 | import com.persistit.exception.PersistitInterruptedException; |
480 | @@ -47,15 +48,17 @@ |
481 | |
482 | final static CommitPolicy policy = CommitPolicy.SOFT; |
483 | |
484 | - final static long TIMEOUT = 20000; // 20 seconds |
485 | + final static long TIMEOUT = 10000; // 10 seconds |
486 | + final static int ITERATIONS_PER_THREAD = 25000; |
487 | |
488 | static int _threadCount = 8; |
489 | - static int _iterationsPerThread = 25000; |
490 | + static int _iterationsPerThread = ITERATIONS_PER_THREAD; |
491 | static int _accounts = 5000; |
492 | |
493 | static AtomicInteger _retriedTransactionCount = new AtomicInteger(); |
494 | static AtomicInteger _completedTransactionCount = new AtomicInteger(); |
495 | static AtomicInteger _failedTransactionCount = new AtomicInteger(); |
496 | + static AtomicInteger _strandedThreads = new AtomicInteger(); |
497 | |
498 | static int _threadCounter = 0; |
499 | |
500 | @@ -118,7 +121,7 @@ |
501 | threadArray[index] = new Thread(new Runnable() { |
502 | @Override |
503 | public void run() { |
504 | - runIt(); |
505 | + runIt(_iterationsPerThread); |
506 | } |
507 | }, "TransactionThread_" + index); |
508 | |
509 | @@ -182,7 +185,7 @@ |
510 | final Thread thread = new Thread(new Runnable() { |
511 | @Override |
512 | public void run() { |
513 | - runIt(); |
514 | + runIt(Integer.MAX_VALUE); |
515 | } |
516 | }, "TransactionThread_" + ++index); |
517 | threads.add(thread); |
518 | @@ -238,19 +241,44 @@ |
519 | txn.begin(); |
520 | } |
521 | |
522 | - public void runIt() { |
523 | + @Test |
524 | + public void transactionsConcurrentWithPersistitClose() throws Exception { |
525 | + new Thread(new Runnable() { |
526 | + @Override |
527 | + public void run() { |
528 | + final Thread[] threadArray = new Thread[_threadCount]; |
529 | + for (int index = 0; index < _threadCount; index++) { |
530 | + threadArray[index] = new Thread(new Runnable() { |
531 | + @Override |
532 | + public void run() { |
533 | + runIt(Integer.MAX_VALUE); |
534 | + } |
535 | + }, "TransactionThread_" + index); |
536 | + } |
537 | + for (int index = 0; index < _threadCount; index++) { |
538 | + threadArray[index].start(); |
539 | + } |
540 | + } |
541 | + }).start(); |
542 | + /* |
543 | + * Let the threads crank up |
544 | + */ |
545 | + Thread.sleep(1000); |
546 | + _persistit.close(); |
547 | + assertEquals("All threads should have exited correctly", 0, _strandedThreads.get()); |
548 | + } |
549 | + |
550 | + public void runIt(final int limit) { |
551 | + _strandedThreads.incrementAndGet(); |
552 | try { |
553 | final Exchange accountEx = _persistit.getExchange("persistit", "account", true); |
554 | // |
555 | final Random random = new Random(); |
556 | - for (int iterations = 1; iterations <= _iterationsPerThread; iterations++) { |
557 | + for (int iterations = 1; iterations <= limit; iterations++) { |
558 | final int accountNo1 = random.nextInt(_accounts); |
559 | - // int accountNo2 = random.nextInt(_accounts - 1); |
560 | - // if (accountNo2 == accountNo1) accountNo2++; |
561 | final int accountNo2 = random.nextInt(_accounts); |
562 | |
563 | final int delta = random.nextInt(10000); |
564 | - // final int delta = 1; |
565 | |
566 | transfer(accountEx, accountNo1, accountNo2, delta); |
567 | _completedTransactionCount.incrementAndGet(); |
568 | @@ -260,10 +288,16 @@ |
569 | System.out.flush(); |
570 | } |
571 | } |
572 | + _strandedThreads.decrementAndGet(); |
573 | } catch (final PersistitInterruptedException exception) { |
574 | + _strandedThreads.decrementAndGet(); |
575 | + // expected |
576 | + } catch (final PersistitClosedException exception) { |
577 | + _strandedThreads.decrementAndGet(); |
578 | // expected |
579 | } catch (final PersistitIOException exception) { |
580 | if (InterruptedIOException.class.equals(exception.getCause().getClass())) { |
581 | + _strandedThreads.decrementAndGet(); |
582 | // expected |
583 | } else { |
584 | exception.printStackTrace(); |
585 | |
586 | === modified file 'src/test/java/com/persistit/stress/AbstractStressTest.java' |
587 | --- src/test/java/com/persistit/stress/AbstractStressTest.java 2012-08-24 13:57:19 +0000 |
588 | +++ src/test/java/com/persistit/stress/AbstractStressTest.java 2012-10-08 19:12:23 +0000 |
589 | @@ -15,6 +15,8 @@ |
590 | |
591 | package com.persistit.stress; |
592 | |
593 | +import static com.persistit.util.Util.NS_PER_S; |
594 | + |
595 | import com.persistit.Persistit; |
596 | |
597 | /** |
598 | @@ -128,7 +130,7 @@ |
599 | _result = new TestResult(false, throwable); |
600 | } |
601 | forceStop(); |
602 | - final long elapsed = (System.nanoTime() - _startTime) / AbstractSuite.NS_PER_S; |
603 | + final long elapsed = (System.nanoTime() - _startTime) / NS_PER_S; |
604 | System.err.printf("\n%s at %,d seconds: %s\n", this, elapsed, _result); |
605 | } |
606 | |
607 | |
608 | === modified file 'src/test/java/com/persistit/stress/AbstractSuite.java' |
609 | --- src/test/java/com/persistit/stress/AbstractSuite.java 2012-08-24 14:00:17 +0000 |
610 | +++ src/test/java/com/persistit/stress/AbstractSuite.java 2012-10-08 19:12:23 +0000 |
611 | @@ -15,6 +15,10 @@ |
612 | |
613 | package com.persistit.stress; |
614 | |
615 | +import static com.persistit.util.Util.MS_PER_S; |
616 | +import static com.persistit.util.Util.NS_PER_MS; |
617 | +import static com.persistit.util.Util.NS_PER_S; |
618 | + |
619 | import java.io.File; |
620 | import java.io.FileWriter; |
621 | import java.io.IOException; |
622 | @@ -42,10 +46,6 @@ |
623 | |
624 | protected final static long PROGRESS_LOG_INTERVAL = 600000; |
625 | |
626 | - protected final static long NS_PER_MS = 1000000; |
627 | - protected final static long MS_PER_S = 1000; |
628 | - protected final static long NS_PER_S = NS_PER_MS * MS_PER_S; |
629 | - |
630 | private long _nextReport; |
631 | private long _accumulatedWork; |
632 | |
633 | |
634 | === modified file 'src/test/java/com/persistit/stress/StartStop.java' |
635 | --- src/test/java/com/persistit/stress/StartStop.java 2012-08-24 13:57:19 +0000 |
636 | +++ src/test/java/com/persistit/stress/StartStop.java 2012-10-08 19:12:23 +0000 |
637 | @@ -15,6 +15,8 @@ |
638 | |
639 | package com.persistit.stress; |
640 | |
641 | +import static com.persistit.util.Util.NS_PER_S; |
642 | + |
643 | import com.persistit.IntegrityCheck; |
644 | import com.persistit.Persistit; |
645 | import com.persistit.Transaction.CommitPolicy; |
646 | |
647 | === modified file 'src/test/java/com/persistit/unit/ExchangeTest.java' |
648 | --- src/test/java/com/persistit/unit/ExchangeTest.java 2012-08-24 13:57:19 +0000 |
649 | +++ src/test/java/com/persistit/unit/ExchangeTest.java 2012-10-08 19:12:23 +0000 |
650 | @@ -368,4 +368,120 @@ |
651 | assertTrue("Not enough methods were tested: " + tested, tested > 10); |
652 | } |
653 | |
654 | + /** |
655 | + * Test for https://bugs.launchpad.net/akiban-persistit/+bug/1023549: |
656 | + * |
657 | + * traverse(EQ, false, 0) returns incorrect result |
658 | + * |
659 | + * This method returns true even when the tree is empty. traverse(EQ, true, |
660 | + * 0) returns the correct value. |
661 | + * |
662 | + * @throws Exception |
663 | + */ |
664 | + @Test |
665 | + public void traverse_EQ_false_0__IsCorrect() throws Exception { |
666 | + traverseCases(false); |
667 | + traverseCases(true); |
668 | + } |
669 | + |
670 | + /** |
671 | + * Test for https://bugs.launchpad.net/akiban-persistit/+bug/1023549: |
672 | + * |
673 | + * traverse(EQ, false, 0) returns incorrect result |
674 | + * |
675 | + * This method returns true even when the tree is empty. traverse(EQ, true, |
676 | + * 0) returns the correct value. |
677 | + * |
678 | + * @throws Exception |
679 | + */ |
680 | + @Test |
681 | + public void traverse_EQ_false_0__IsCorrect_Txn() throws Exception { |
682 | + final Transaction txn = _persistit.getTransaction(); |
683 | + txn.begin(); |
684 | + traverseCases(false); |
685 | + txn.commit(); |
686 | + txn.end(); |
687 | + |
688 | + txn.begin(); |
689 | + traverseCases(true); |
690 | + txn.commit(); |
691 | + txn.end(); |
692 | + } |
693 | + |
694 | + private void traverseCases(final boolean deep) throws Exception { |
695 | + final Exchange ex = _persistit.getExchange("persistit", "gogo", true); |
696 | + |
697 | + ex.removeAll(); |
698 | + ex.clear(); |
699 | + assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, -1)); |
700 | + ex.clear(); |
701 | + assertEquals("Should be false", false, ex.traverse(Key.GTEQ, deep, -1)); |
702 | + ex.clear(); |
703 | + assertEquals("Should be false", false, ex.traverse(Key.GT, deep, -1)); |
704 | + ex.clear(); |
705 | + assertEquals("Should be false", false, ex.traverse(Key.LTEQ, deep, -1)); |
706 | + ex.clear(); |
707 | + assertEquals("Should be false", false, ex.traverse(Key.LT, deep, -1)); |
708 | + ex.clear(); |
709 | + |
710 | + ex.append(1).append(2).store(); |
711 | + ex.clear().append(Key.BEFORE); |
712 | + assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, -1)); |
713 | + assertEquals("Should be true", true, ex.traverse(Key.GTEQ, deep, -1)); |
714 | + assertEquals("Should be true", true, ex.traverse(Key.GTEQ, deep, -1)); |
715 | + ex.clear().append(1); |
716 | + assertEquals("Should be " + !deep, !deep, ex.traverse(Key.EQ, deep, -1)); |
717 | + |
718 | + ex.clear().append(Key.AFTER); |
719 | + assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, -1)); |
720 | + ex.clear().append(Key.AFTER); |
721 | + assertEquals("Should be true", true, ex.traverse(Key.LTEQ, deep, -1)); |
722 | + assertEquals("Should be true", true, ex.traverse(Key.LTEQ, deep, -1)); |
723 | + |
724 | + ex.removeAll(); |
725 | + ex.clear(); |
726 | + assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, 0)); |
727 | + keyCheck(ex, "{{before}}"); |
728 | + ex.clear(); |
729 | + assertEquals("Should be false", false, ex.traverse(Key.GTEQ, deep, 0)); |
730 | + keyCheck(ex, "{{before}}"); |
731 | + ex.clear(); |
732 | + assertEquals("Should be false", false, ex.traverse(Key.GT, deep, 0)); |
733 | + keyCheck(ex, "{{before}}"); |
734 | + ex.clear(); |
735 | + assertEquals("Should be false", false, ex.traverse(Key.LTEQ, deep, 0)); |
736 | + keyCheck(ex, "{{after}}"); |
737 | + ex.clear(); |
738 | + assertEquals("Should be false", false, ex.traverse(Key.LT, deep, 0)); |
739 | + keyCheck(ex, "{{after}}"); |
740 | + ex.clear(); |
741 | + |
742 | + ex.append(1).append(2).store(); |
743 | + ex.clear().append(Key.BEFORE); |
744 | + assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, 0)); |
745 | + keyCheck(ex, "{{before}}"); |
746 | + assertEquals("Should be true", true, ex.traverse(Key.GTEQ, deep, 0)); |
747 | + keyCheck(ex, deep ? "{1,2}" : "{1}"); |
748 | + assertEquals("Should be true", true, ex.traverse(Key.GTEQ, deep, 0)); |
749 | + keyCheck(ex, deep ? "{1,2}" : "{1}"); |
750 | + assertEquals("Should be true", true, ex.traverse(Key.EQ, deep, 0)); |
751 | + keyCheck(ex, deep ? "{1,2}" : "{1}"); |
752 | + |
753 | + ex.clear().append(Key.AFTER); |
754 | + assertEquals("Should be false", false, ex.traverse(Key.EQ, deep, 0)); |
755 | + keyCheck(ex, "{{before}}"); |
756 | + ex.clear().append(Key.AFTER); |
757 | + assertEquals("Should be true", true, ex.traverse(Key.LTEQ, deep, 0)); |
758 | + keyCheck(ex, deep ? "{1,2}" : "{1}"); |
759 | + assertEquals("Should be true", true, ex.traverse(Key.LTEQ, deep, 0)); |
760 | + keyCheck(ex, deep ? "{1,2}" : "{1}"); |
761 | + assertEquals("Should be true", true, ex.traverse(Key.EQ, deep, 0)); |
762 | + keyCheck(ex, deep ? "{1,2}" : "{1}"); |
763 | + |
764 | + } |
765 | + |
766 | + private void keyCheck(final Exchange ex, final String expected) { |
767 | + assertEquals("Key should be " + expected, expected, ex.getKey().toString()); |
768 | + } |
769 | + |
770 | } |
VolumeStorageV2 #updateMetaData () - including timestamp changed in result will make meta data always "dirty" and then written. Is that what we want?
ExchangeTest# traverseEQfalse 0 - The bug has has 'false' in the title and 'true' in the description. Could we add cases for 'true' as well? Additionally, the server use case probably did it inside of a transaction. It should work as is if another test method just calls the new one, but inside of a transaction block. I have no reason to suspect a failure, but simple to add.
Persistit#close() - _journalManager .stopCopier( ) is no longer called. Should it be? The rest of the related changes look plausible and the new test is good.