Merge lp:~pbeaman/akiban-persistit/add-lock into lp:akiban-persistit

Proposed by Peter Beaman
Status: Superseded
Proposed branch: lp:~pbeaman/akiban-persistit/add-lock
Merge into: lp:akiban-persistit
Diff against target: 1031 lines (+557/-42)
20 files modified
doc/Transactions.rst (+2/-0)
src/main/java/com/persistit/Accumulator.java (+2/-2)
src/main/java/com/persistit/AlertMonitor.java (+1/-1)
src/main/java/com/persistit/Buffer.java (+8/-7)
src/main/java/com/persistit/BufferPool.java (+31/-2)
src/main/java/com/persistit/CLI.java (+1/-1)
src/main/java/com/persistit/Configuration.java (+1/-1)
src/main/java/com/persistit/Exchange.java (+153/-9)
src/main/java/com/persistit/JournalManager.java (+12/-3)
src/main/java/com/persistit/Key.java (+1/-1)
src/main/java/com/persistit/KeyFilter.java (+1/-1)
src/main/java/com/persistit/KeyParser.java (+1/-1)
src/main/java/com/persistit/LongRecordHelper.java (+3/-2)
src/main/java/com/persistit/Persistit.java (+14/-0)
src/main/java/com/persistit/Tree.java (+3/-3)
src/main/java/com/persistit/TreeBuilder.java (+2/-1)
src/main/java/com/persistit/Volume.java (+7/-1)
src/test/java/com/persistit/ExchangeLockTest.java (+313/-0)
src/test/java/com/persistit/MediatedFileChannelTest.java (+1/-1)
src/test/java/com/persistit/unit/UnitTestProperties.java (+0/-5)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/add-lock
Reviewer: Akiban Technologies
Review Type: (none)
Date Requested: (none)
Status: Pending
Review via email: mp+143979@code.launchpad.net

This proposal has been superseded by a proposal from 2013-01-22.

Description of the change

Adds a new method, Exchange#lock. This method causes a write-write conflict when two concurrent transactions attempt to lock the same key. An application could achieve the same effect by storing a value under the same key, but the lock method is more efficient for two reasons: it uses a temporary tree that seldom causes disk I/O, and the value written is an AntiValue, which causes default pruning to remove the key as soon as it is no longer needed.
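
As a rough illustration of the behaviour described above, the following toy model shows how two concurrent transactions that lock the same key end up with one of them rolled back and retried. It is not Persistit code: LockTable, Rollback, and the integer transaction ids are hypothetical names invented for this sketch. Unlike the real method, which waits for the other transaction to commit or abort, the sketch fails immediately.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model only, NOT Persistit code: a shared key turns a hidden conflict into an explicit one. */
public class LockConflictSketch {

    /** Hypothetical stand-in for a rollback signal raised on a write-write conflict. */
    static class Rollback extends RuntimeException {
        Rollback(String message) {
            super(message);
        }
    }

    /** Hypothetical in-memory lock table: key -> id of the active transaction that wrote it. */
    static class LockTable {
        private final Map<String, Integer> owners = new HashMap<>();

        /** Records the key for txnId, or throws if another active transaction already holds it. */
        synchronized void lock(int txnId, String key) {
            Integer owner = owners.get(key);
            if (owner != null && owner != txnId) {
                throw new Rollback("key " + key + " already locked by txn " + owner);
            }
            owners.put(key, txnId);
        }

        /** Called when a transaction commits or rolls back; plays the role of pruning. */
        synchronized void release(int txnId) {
            owners.values().removeIf(owner -> owner == txnId);
        }

        synchronized int size() {
            return owners.size();
        }
    }

    public static void main(String[] args) {
        LockTable table = new LockTable();
        table.lock(1, "account-42"); // txn 1 locks the key
        try {
            table.lock(2, "account-42"); // concurrent txn 2 conflicts
        } catch (Rollback e) {
            System.out.println("txn 2 rolled back: " + e.getMessage());
        }
        table.release(1); // txn 1 commits, its entry is removed
        table.lock(2, "account-42"); // the retry by txn 2 now succeeds
        table.release(2);
        System.out.println("entries left: " + table.size());
    }
}
```

The point of the sketch is only the shape of the protocol: the key need not correspond to any stored record, and the entry disappears once no transaction can still conflict with it.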

To support this we now have a designated temporary volume, the lock volume, maintained by Persistit. It is created lazily on the first call to Persistit#getLockVolume(). So that various operations, including pruning, work correctly, the lock volume is given a reserved volume handle (Integer.MAX_VALUE) and its trees are managed by the JournalManager's treeHandleMap even though no entries for its trees are written to the journal.
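
The lazy-creation pattern described above can be sketched in isolation as follows. This is a minimal stand-alone model, assuming a simplified TempVolume class and a createdCount() helper that exist only for this illustration; the real Persistit and Volume classes do considerably more.

```java
/** Stand-alone sketch of lazily creating one reserved volume; not the actual Persistit class. */
public class LazyLockVolumeSketch {

    /** Reserved handle value, mirroring the Integer.MAX_VALUE choice in the description. */
    static final int LOCK_VOLUME_HANDLE = Integer.MAX_VALUE;

    /** Hypothetical, simplified stand-in for a temporary volume. */
    static class TempVolume {
        private int handle; // 0 means "no handle assigned yet"

        int getHandle() {
            return handle;
        }

        void setHandle(int newHandle) {
            if (handle != 0) {
                throw new IllegalStateException("handle already set");
            }
            handle = newHandle;
        }

        boolean isLockVolume() {
            return handle == LOCK_VOLUME_HANDLE;
        }
    }

    private volatile TempVolume lockVolume;
    private int created; // counts creations; used only to demonstrate laziness

    /** Creates the lock volume on first use and returns the same instance afterwards. */
    synchronized TempVolume getLockVolume() {
        if (lockVolume == null) {
            TempVolume volume = new TempVolume();
            volume.setHandle(LOCK_VOLUME_HANDLE);
            lockVolume = volume;
            created++;
        }
        return lockVolume;
    }

    synchronized int createdCount() {
        return created;
    }

    public static void main(String[] args) {
        LazyLockVolumeSketch persistit = new LazyLockVolumeSketch();
        TempVolume first = persistit.getLockVolume();
        TempVolume second = persistit.getLockVolume();
        System.out.println("same instance: " + (first == second));
        System.out.println("is lock volume: " + first.isLockVolume());
    }
}
```

Because the handle is fixed in advance, code that resolves handles back to volumes can special-case the reserved value without consulting the journal, which is the design choice the description relies on.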

Also in this update are changes in documentation and a new set of tests.

413. By Peter Beaman

Fix up javadoc, make Exchange#setTimoutMillis public, and add test

414. By Peter Beaman

Fix up javadoc, make Exchange#setTimoutMillis public, and add test

Peter Beaman (pbeaman) wrote :

Ran into some build issues with this in Java 7. Specifically, javadoc is a little fussier (needs @return tags to have text).

415. By Peter Beaman

Fix ExchangeTest unit test

416. By Peter Beaman

zero timeout means default

417. By Peter Beaman

Tweak a variable name

418. By Peter Beaman

Merge from trunk. Merge from java7-issues

419. By Peter Beaman

Rev version because server should depend on new lock API

420. By Peter Beaman

Merge from trunk and modify for review comments

421. By Peter Beaman

Clarify JavaDoc

422. By Peter Beaman

Source code cleanup

423. By Peter Beaman

Add extra prune cycle in ExchangeLockTest#lockTablePruning

424. By Peter Beaman

Explicit call to pruneObsoleteTransactions in ExchangeLockTest

425. By Peter Beaman

Merge 3.2.6-upgrade

426. By Peter Beaman

Source code cleanup

427. By Peter Beaman

Another attempt at fixing ExchangeLockTest

Unmerged revisions

Preview Diff

1=== modified file 'doc/Transactions.rst'
2--- doc/Transactions.rst 2012-06-18 17:49:04 +0000
3+++ doc/Transactions.rst 2013-01-22 16:32:21 +0000
4@@ -116,6 +116,8 @@
5
6 Note that many common transaction patterns, including those defined by the TPC-C benchmark, do not experience write-skew and therefore *are* serializable under SI.
7
8+In the unusual case where transactions are susceptible to write skew, the Exchange#lock method offers a way for applications to create a write-write dependency explicitly. This mechanism provides an efficient mechanism for ensuring serializable behavior.
9+
10 Durability Options: ``CommitPolicy``
11 ------------------------------------
12
13
14=== modified file 'src/main/java/com/persistit/Accumulator.java'
15--- src/main/java/com/persistit/Accumulator.java 2012-10-11 20:37:43 +0000
16+++ src/main/java/com/persistit/Accumulator.java 2013-01-22 16:32:21 +0000
17@@ -426,7 +426,7 @@
18 *
19 * @param a
20 * @param b
21- * @return
22+ * @return the result of the commutative operation on a and b
23 */
24 abstract long applyValue(long a, long b);
25
26@@ -439,7 +439,7 @@
27 *
28 * @param a
29 * @param b
30- * @return
31+ * @return the result of the commutative operation on a and b
32 */
33 abstract long updateValue(long a, long b);
34
35
36=== modified file 'src/main/java/com/persistit/AlertMonitor.java'
37--- src/main/java/com/persistit/AlertMonitor.java 2012-08-24 13:57:19 +0000
38+++ src/main/java/com/persistit/AlertMonitor.java 2013-01-22 16:32:21 +0000
39@@ -794,7 +794,7 @@
40 * </p>
41 *
42 * @param event
43- * @return
44+ * @return formatted event
45 */
46 private String format(final Event event) {
47 return event == null ? "null" : event.toString();
48
49=== modified file 'src/main/java/com/persistit/Buffer.java'
50--- src/main/java/com/persistit/Buffer.java 2012-11-26 17:19:45 +0000
51+++ src/main/java/com/persistit/Buffer.java 2013-01-22 16:32:21 +0000
52@@ -1219,7 +1219,7 @@
53 * @param key
54 * @param mode
55 * @param foundAt
56- * @return
57+ * @return foundAt value
58 */
59 int traverse(final Key key, final Key.Direction mode, final int foundAt) {
60 final boolean exactMatch = (foundAt & EXACT_MASK) > 0;
61@@ -1239,7 +1239,7 @@
62 *
63 * @param key
64 * @param foundAt
65- * @return
66+ * @return foundAt value
67 */
68 int previousKey(final Key key, final int foundAt) {
69 int p = (foundAt & P_MASK) - KEYBLOCK_LENGTH;
70@@ -1312,7 +1312,7 @@
71 *
72 * @param key
73 * @param foundAt
74- * @return
75+ * @return foundAt value
76 */
77 int nextKey(final Key key, final int foundAt) {
78 int p = foundAt & P_MASK;
79@@ -1347,7 +1347,7 @@
80 *
81 * @param value
82 * @param foundAt
83- * @return
84+ * @return foundAt value
85 */
86 int nextLongRecord(final Value value, final int foundAt) {
87 Debug.$assert0.t(isDataPage());
88@@ -1568,7 +1568,7 @@
89 * been inserted or removed.
90 *
91 * @param p
92- * @return
93+ * @return when key is adjacent
94 */
95 private boolean adjacentKeyCheck(int p) {
96 p &= P_MASK;
97@@ -3595,7 +3595,8 @@
98 *
99 * @param tree
100 * @param spareKey
101- * @return
102+ * @return <code>true</code> if this method changed the contents of the
103+ * buffer
104 * @throws PersistitException
105 */
106 boolean pruneMvvValues(final Tree tree, final boolean pruneLongMVVs) throws PersistitException {
107@@ -3814,7 +3815,7 @@
108 System.arraycopy(bytes, offset, value.getEncodedBytes(), 0, oldSize);
109 value.setEncodedSize(oldSize);
110 final LongRecordHelper helper = new LongRecordHelper(_persistit, _vol);
111- helper.fetchLongRecord(value, Integer.MAX_VALUE);
112+ helper.fetchLongRecord(value, Integer.MAX_VALUE, SharedResource.DEFAULT_MAX_WAIT_TIME);
113 final byte[] rawBytes = value.getEncodedBytes();
114 final int oldLongSize = value.getEncodedSize();
115 // TODO - perhaps remove. Done as a precaution for now.
116
117=== modified file 'src/main/java/com/persistit/BufferPool.java'
118--- src/main/java/com/persistit/BufferPool.java 2013-01-02 22:41:20 +0000
119+++ src/main/java/com/persistit/BufferPool.java 2013-01-22 16:32:21 +0000
120@@ -697,6 +697,30 @@
121 /**
122 * Find or load a page given its Volume and address. The returned page has a
123 * reader or a writer lock, depending on whether the writer parameter is
124+ * true on entry. Waits up to {@value SharedResource#DEFAULT_MAX_WAIT_TIME}
125+ * milliseconds to acquire the desired lock on the page
126+ *
127+ * @param vol
128+ * The Volume
129+ * @param page
130+ * The address of the page
131+ * @param writer
132+ * <i>true</i> if a write lock is required.
133+ * @param wantRead
134+ * <i>true</i> if the caller wants the page read from disk.
135+ * <i>false</i> to allocate a new blank page.)
136+ * @return Buffer The Buffer describing the buffer containing the page.
137+ * @throws InUseException
138+ * if the specific lock could not be acquired
139+ */
140+ Buffer get(final Volume vol, final long page, final boolean writer, final boolean wantRead)
141+ throws PersistitException {
142+ return get(vol, page, writer, wantRead, SharedResource.DEFAULT_MAX_WAIT_TIME);
143+ }
144+
145+ /**
146+ * Find or load a page given its Volume and address. The returned page has a
147+ * reader or a writer lock, depending on whether the writer parameter is
148 * true on entry.
149 *
150 * @param vol
151@@ -708,10 +732,15 @@
152 * @param wantRead
153 * <i>true</i> if the caller wants the page read from disk.
154 * <i>false</i> to allocate a new blank page.)
155+ * @param timeout
156+ * maximum time to wait for the page to become available before
157+ * throwing an InUseException
158 * @return Buffer The Buffer describing the buffer containing the page.
159 * @throws InUseException
160+ * if the specific lock could not be acquired within the
161+ * specified timeout
162 */
163- Buffer get(final Volume vol, final long page, final boolean writer, final boolean wantRead)
164+ Buffer get(final Volume vol, final long page, final boolean writer, final boolean wantRead, final long timeout)
165 throws PersistitException {
166 final int hash = hashIndex(vol, page);
167 Buffer buffer = null;
168@@ -776,7 +805,7 @@
169 if (mustClaim) {
170 boolean claimed = false;
171 boolean same = true;
172- final long expires = System.currentTimeMillis() + SharedResource.DEFAULT_MAX_WAIT_TIME;
173+ final long expires = System.currentTimeMillis() + timeout;
174 while (same && !claimed && System.currentTimeMillis() < expires) {
175 /*
176 * We're here because we found the page we want, but another
177
178=== modified file 'src/main/java/com/persistit/CLI.java'
179--- src/main/java/com/persistit/CLI.java 2012-11-20 17:45:51 +0000
180+++ src/main/java/com/persistit/CLI.java 2013-01-22 16:32:21 +0000
181@@ -683,7 +683,7 @@
182 * @param volumepath
183 * @param rmiport
184 * @param y
185- * @return
186+ * @return Result description
187 * @throws Exception
188 */
189 @Cmd("open")
190
191=== modified file 'src/main/java/com/persistit/Configuration.java'
192--- src/main/java/com/persistit/Configuration.java 2012-11-30 23:06:38 +0000
193+++ src/main/java/com/persistit/Configuration.java 2013-01-22 16:32:21 +0000
194@@ -909,7 +909,7 @@
195 * Properties containing substitution values
196 * @param depth
197 * Count of recursive calls - maximum depth is 20.
198- * @return
199+ * @return String value resulting from substitutions
200 */
201 String substituteProperties(String text, final Properties properties, final int depth) {
202 int p = text.indexOf("${");
203
204=== modified file 'src/main/java/com/persistit/Exchange.java'
205--- src/main/java/com/persistit/Exchange.java 2013-01-07 23:10:42 +0000
206+++ src/main/java/com/persistit/Exchange.java 2013-01-22 16:32:21 +0000
207@@ -266,6 +266,7 @@
208 private BufferPool _pool;
209 private Volume _volume;
210 private Tree _tree;
211+ private long _timeoutMillis = SharedResource.DEFAULT_MAX_WAIT_TIME;
212
213 private volatile long _cachedTreeGeneration = -1;
214 private volatile int _cacheDepth = 0;
215@@ -300,7 +301,7 @@
216
217 private volatile Thread _thread;
218
219- private Exchange(final Persistit persistit) {
220+ Exchange(final Persistit persistit) {
221 _persistit = persistit;
222 _key = new Key(_persistit);
223 _spareKey1 = new Key(_persistit);
224@@ -1127,7 +1128,7 @@
225 * @param buffer
226 * @param key
227 * @param lc
228- * @return
229+ * @return foundAt value
230 * @throws PersistitInterruptedException
231 */
232 private int findKey(final Buffer buffer, final Key key, final LevelCache lc) throws PersistitInterruptedException {
233@@ -1281,7 +1282,7 @@
234 }
235
236 if (buffer == null) {
237- buffer = _pool.get(_volume, pageAddress, writer, true);
238+ buffer = _pool.get(_volume, pageAddress, writer, true, _timeoutMillis);
239 }
240 checkPageType(buffer, currentLevel + PAGE_TYPE_DATA, true);
241
242@@ -1714,8 +1715,7 @@
243 try {
244 sequence(WRITE_WRITE_STORE_A);
245 final long depends = _persistit.getTransactionIndex().wwDependency(re.getVersionHandle(),
246- // TODO - timeout?
247- _transaction.getTransactionStatus(), SharedResource.DEFAULT_MAX_WAIT_TIME);
248+ _transaction.getTransactionStatus(), _timeoutMillis);
249 if (depends != 0 && depends != TransactionStatus.ABORTED) {
250 // version is from concurrent txn that already
251 // committed
252@@ -1738,7 +1738,7 @@
253 treeClaimAcquired = false;
254 }
255 final boolean doWait = (options & StoreOptions.WAIT) != 0;
256- treeClaimAcquired = _treeHolder.claim(true, doWait ? SharedResource.DEFAULT_MAX_WAIT_TIME : 0);
257+ treeClaimAcquired = _treeHolder.claim(true, doWait ? _timeoutMillis : 0);
258 if (!treeClaimAcquired) {
259 if (!doWait) {
260 throw re;
261@@ -2175,7 +2175,8 @@
262
263 Debug.$assert0.t(rightSiblingPage >= 0 && rightSiblingPage <= MAX_VALID_PAGE_ADDR);
264 if (rightSiblingPage > 0) {
265- final Buffer rightSibling = _pool.get(_volume, rightSiblingPage, false, true);
266+ final Buffer rightSibling = _pool.get(_volume, rightSiblingPage, false, true,
267+ _timeoutMillis);
268 buffer.releaseTouched();
269 //
270 // Reset foundAtNext to point to the first key block
271@@ -2741,6 +2742,105 @@
272 }
273
274 /**
275+ * Invoke {@link #lock(Key, long)} with the current key and a default
276+ * timeout value of
277+ * {@value com.persistit.SharedResource#DEFAULT_MAX_WAIT_TIME} milliseconds.
278+ *
279+ * @throws PersistitException
280+ */
281+ public void lock() throws PersistitException {
282+ lock(_key, SharedResource.DEFAULT_MAX_WAIT_TIME);
283+ }
284+
285+ /**
286+ * <p>
287+ * Within a transaction, enforces a constraint that no other concurrent
288+ * transaction also successfully locks the same key. This method must run
289+ * within the scope of an active transaction.
290+ * </p>
291+ * <p>
292+ * This method is designed to help applications overcome problems with
293+ * "write skew" which is a type of isolation anomaly permitted by Snapshot
294+ * Isolation. See, for example,
295+ * http://en.wikipedia.org/wiki/Snapshot_isolation for a concise explanation
296+ * of Snapshot Isolation and the write skew anomaly.
297+ * <p>
298+ * </p>
299+ * To use this facility an application specifies a key which may or may not
300+ * be associated with an actual storage location, but which is designed to
301+ * conflict with any other transaction that could participate in a write
302+ * skew. Thus the operation serves as a way of ensuring serializable
303+ * execution of transactions that could otherwise experience write skew.
304+ * </p>
305+ * <p>
306+ * This method does not actually use any locking mechanism; rather, it
307+ * creates a write-write conflict with another transaction when both
308+ * transactions are concurrent and when both transactions attempt to lock
309+ * the same key. The result in that case is that one of the transactions
310+ * receives a {@link RollbackException}. An application using this facility
311+ * simply retries the transaction, at which point it is likely to
312+ * successfully execute the call to {@link #lock()}.
313+ * </p>
314+ * <p>
315+ * This method works by writing a short value associated with the provided
316+ * key into a temporary volume (accessible through the
317+ * {@link Persistit#getLockVolume()} method). The value is removed through
318+ * the normal pruning process soon after the all potentially conflicting
319+ * transactions have either rolled back or committed.
320+ * </p>
321+ * <p>
322+ * All of these interactions are performed through the normal MVCC
323+ * transaction mechanism. This method differs from the {@link #store()}
324+ * method only in that the {@link Tree} to which a value is written is
325+ * located in a reserved temporary volume and is therefore normally not
326+ * written to disk. The key is removed by pruning once there is are no
327+ * longer any concurrent transactions that could conflict with it.
328+ * </p>
329+ * <p>
330+ * As part of the normal MVCC process, if this method detects a potentially
331+ * conflicting lock written by another active concurrent transaction, this
332+ * transaction waits until the other transaction either commits or aborts.
333+ * To prevent an unbounded wait time this method accepts a timeout value in
334+ * milliseconds. If the potentially conflicting transaction neither commits
335+ * nor aborts during the timeout interval, this method throws a
336+ * <code>RollbackException</code>. In the event this method attempts to
337+ * enter a deadlock state with another current transaction; the potential
338+ * deadlock is detected immediately and this method immediately throws a
339+ * <code>RollbackException</code>.
340+ * </p>
341+ *
342+ * @param lockKey
343+ * the source Key
344+ * @param timeout
345+ * timeout interval in milliseconds, zero for default timeout
346+ * @throws PersistitException
347+ * @throws RollbackException
348+ * in the specific case that another concurrent transaction has
349+ * also locked the same key
350+ * @throws IllegalStateException
351+ * if this Thread does not have an active transaction scope
352+ * @see Transaction
353+ */
354+ public void lock(final Key lockKey, final long timeout) throws PersistitException {
355+ assertCorrectThread(true);
356+ _persistit.checkClosed();
357+ if (!_transaction.isActive()) {
358+ throw new IllegalStateException("No active transaction scope");
359+ }
360+ final Exchange lockExchange = _persistit.getExchange(_persistit.getLockVolume(), _tree.getName(), true);
361+ /**
362+ * Lock table trees need tree handles for pruning
363+ */
364+ _persistit.getJournalManager().handleForTree(lockExchange.getTree());
365+ lockExchange.setTimeoutMillis(timeout == 0 ? getTimeoutMillis() : timeout);
366+ lockKey.copyTo(lockExchange.getKey());
367+ lockExchange.getKey().testValidForStoreAndFetch(_pool.getBufferSize());
368+ lockExchange.getValue().clear().putAntiValueMVV();
369+ final int options = StoreOptions.WAIT | StoreOptions.DONT_JOURNAL | StoreOptions.MVCC;
370+ lockExchange.storeInternal(lockExchange.getKey(), lockExchange.getValue(), 0, options);
371+ }
372+
373+ /**
374 * Fetches the value associated with the <code>Key</code>, then inserts or
375 * updates the value. Effectively this swaps the content of
376 * <code>Value</code> with the database record associated with the current
377@@ -3051,7 +3151,7 @@
378 // claim is held for the duration to prevent a non-atomic
379 // update.
380 //
381- getLongRecordHelper().fetchLongRecord(value, minimumBytes);
382+ getLongRecordHelper().fetchLongRecord(value, minimumBytes, _timeoutMillis);
383 }
384 }
385
386@@ -4116,7 +4216,7 @@
387 if (buffer.isAfterRightEdge(foundAt)) {
388 final long rightSiblingPage = buffer.getRightSibling();
389 if (rightSiblingPage > 0) {
390- final Buffer rightSibling = _pool.get(_volume, rightSiblingPage, false, true);
391+ final Buffer rightSibling = _pool.get(_volume, rightSiblingPage, false, true, _timeoutMillis);
392 buffer.releaseTouched();
393 //
394 // Reset foundAtNext to point to the first key block
395@@ -4179,6 +4279,50 @@
396 }
397
398 /**
399+ * @return The standard timeout setting in milliseconds for this
400+ * <code>Exchange</code>
401+ * @see Exchange#setTimeoutMillis(long)
402+ */
403+ public long getTimeoutMillis() {
404+ assertCorrectThread(true);
405+ return _timeoutMillis;
406+ }
407+
408+ /**
409+ * <p>
410+ * Set the standard timeout for this <code>Exchange</code>. The timeout
411+ * value represents an approximate upper bound on the wait time for various
412+ * methods that wait for actions by other threads. For example, if a thread
413+ * needs to read a value from a {@link Buffer} that is currently be updated
414+ * by another thread, the read operation waits up to <code>timeout</code>
415+ * milliseconds for the other thread to release the <code>Buffer</code>.
416+ * </p>
417+ * <p>
418+ * The timeout value is advisory, and some operations may stall for a longer
419+ * period of time than specified. Setting a timeout does not guarantee
420+ * real-time behavior.
421+ * </p>
422+ * <p>
423+ * The supplied value must be non-negative. If it is zero, the default
424+ * timeout value {@value com.persistit.SharedResource#DEFAULT_MAX_WAIT_TIME}
425+ * milliseconds is set.
426+ * </p>
427+ *
428+ * @param timeout
429+ * Standard timeout setting, in milliseconds, for operations that
430+ * wait.
431+ */
432+ public void setTimeoutMillis(final long timeout) {
433+ if (timeout == 0) {
434+ _timeoutMillis = SharedResource.DEFAULT_MAX_WAIT_TIME;
435+ } else {
436+ // Clipping this to avoid potential overflows when computing
437+ // intervals
438+ _timeoutMillis = Util.rangeCheck(timeout, 0, Math.max(timeout, Long.MAX_VALUE / 2));
439+ }
440+ }
441+
442+ /**
443 * Returns a copy of either the data page or a page on the index path to the
444 * data page containing the current key. This method looks up the current
445 * key, then copies and returns the page at the specified tree level in a
446
447=== modified file 'src/main/java/com/persistit/JournalManager.java'
448--- src/main/java/com/persistit/JournalManager.java 2012-10-26 19:44:17 +0000
449+++ src/main/java/com/persistit/JournalManager.java 2013-01-22 16:32:21 +0000
450@@ -587,6 +587,9 @@
451 }
452
453 int handleForVolume(final Volume volume) throws PersistitException {
454+ if (volume.getHandle() != 0) {
455+ return volume.getHandle();
456+ }
457 if (!_allowHandlesForTempVolumesAndTrees && volume.isTemporary()) {
458 throw new IllegalStateException("Creating handle for temporary volume " + volume);
459 }
460@@ -621,7 +624,9 @@
461 }
462 handle = Integer.valueOf(++_handleCounter);
463 Debug.$assert0.t(!_handleToTreeMap.containsKey(handle));
464- writeTreeHandleToJournal(td, handle.intValue());
465+ if (td.getVolumeHandle() != Volume.LOCK_VOLUME_HANDLE) {
466+ writeTreeHandleToJournal(td, handle.intValue());
467+ }
468 _treeToHandleMap.put(td, handle);
469 _handleToTreeMap.put(handle, td);
470 }
471@@ -629,7 +634,7 @@
472 }
473
474 int handleForTree(final Tree tree) throws PersistitException {
475- if (!_allowHandlesForTempVolumesAndTrees && tree.getVolume().isTemporary()) {
476+ if (!_allowHandlesForTempVolumesAndTrees && tree.getVolume().isTemporary() && !tree.getVolume().isLockVolume()) {
477 throw new IllegalStateException("Creating handle for temporary tree " + tree);
478 }
479 if (tree.getHandle() != 0) {
480@@ -659,7 +664,11 @@
481 Volume volumeForHandle(final int handle) throws PersistitException {
482 final Volume volume = lookupVolumeHandle(handle);
483 if (volume == null) {
484- return null;
485+ if (handle == Volume.LOCK_VOLUME_HANDLE) {
486+ return _persistit.getLockVolume();
487+ } else {
488+ return null;
489+ }
490 }
491 if (!volume.isOpened()) {
492 volume.open(_persistit);
493
494=== modified file 'src/main/java/com/persistit/Key.java'
495--- src/main/java/com/persistit/Key.java 2012-11-07 14:03:51 +0000
496+++ src/main/java/com/persistit/Key.java 2013-01-22 16:32:21 +0000
497@@ -1748,7 +1748,7 @@
498 * @param v
499 * @param bytes
500 * @param offset
501- * @return
502+ * @return size of appended segment
503 */
504 private int appendIntInternal(final int v) {
505 int size = _size;
506
507=== modified file 'src/main/java/com/persistit/KeyFilter.java'
508--- src/main/java/com/persistit/KeyFilter.java 2012-10-19 14:10:24 +0000
509+++ src/main/java/com/persistit/KeyFilter.java 2013-01-22 16:32:21 +0000
510@@ -1524,7 +1524,7 @@
511 * @param level
512 * @param forward
513 * @param eq
514- * @return
515+ * @return whether there may be more matching keys
516 */
517 private boolean next(final Key key, final int index, final int level, final boolean forward, final boolean eq) {
518
519
520=== modified file 'src/main/java/com/persistit/KeyParser.java'
521--- src/main/java/com/persistit/KeyParser.java 2012-08-24 13:57:19 +0000
522+++ src/main/java/com/persistit/KeyParser.java 2013-01-22 16:32:21 +0000
523@@ -218,7 +218,7 @@
524 * successful, append the segment to the key.
525 *
526 * @param key
527- * @return
528+ * @return <code>true</code> a valid key segment was parsed
529 */
530 private boolean parseKeySegment(final Key key) {
531 final int index = _index;
532
533=== modified file 'src/main/java/com/persistit/LongRecordHelper.java'
534--- src/main/java/com/persistit/LongRecordHelper.java 2012-08-24 13:57:19 +0000
535+++ src/main/java/com/persistit/LongRecordHelper.java 2013-01-22 16:32:21 +0000
536@@ -60,7 +60,8 @@
537 * @param minimumBytesToFetch
538 * @throws PersistitException
539 */
540- void fetchLongRecord(final Value value, final int minimumBytesToFetch) throws PersistitException {
541+ void fetchLongRecord(final Value value, final int minimumBytesToFetch, final long timeout)
542+ throws PersistitException {
543
544 Buffer buffer = null;
545
546@@ -93,7 +94,7 @@
547 corrupt("Invalid LONG_RECORD remaining size=" + remainingSize + " of " + rawSize + " in page "
548 + page);
549 }
550- buffer = _volume.getPool().get(_volume, page, false, true);
551+ buffer = _volume.getPool().get(_volume, page, false, true, timeout);
552 if (buffer.getPageType() != PAGE_TYPE_LONG_RECORD) {
553 corrupt("LONG_RECORD chain is invalid at page " + page + " - invalid page type: " + buffer);
554 }
555
556=== modified file 'src/main/java/com/persistit/Persistit.java'
557--- src/main/java/com/persistit/Persistit.java 2012-12-31 01:47:18 +0000
558+++ src/main/java/com/persistit/Persistit.java 2013-01-22 16:32:21 +0000
559@@ -284,6 +284,8 @@
560
561 private final ThreadLocal<SoftReference<Value>> _valueThreadLocal = new ThreadLocal<SoftReference<Value>>();
562
563+ private volatile Volume _lockVolume;
564+
565 /**
566 * Construct a hollow Persistit instance. To be useful, the instance must
567 * receive a <code>Configuration</code> through one of the methods
568@@ -1284,6 +1286,18 @@
569 }
570
571 /**
572+ * @return reserved temporary volume for locks
573+ * @throws PersistitException
574+ */
575+ public synchronized Volume getLockVolume() throws PersistitException {
576+ if (_lockVolume == null) {
577+ _lockVolume = createTemporaryVolume();
578+ _lockVolume.setHandle(Volume.LOCK_VOLUME_HANDLE);
579+ }
580+ return _lockVolume;
581+ }
582+
583+ /**
584 * @return The {@link SplitPolicy} that will by applied by default to newly
585 * created or allocated {@link Exchange}s.
586 */
587
588=== modified file 'src/main/java/com/persistit/Tree.java'
589--- src/main/java/com/persistit/Tree.java 2012-09-27 18:22:25 +0000
590+++ src/main/java/com/persistit/Tree.java 2013-01-22 16:32:21 +0000
591@@ -274,8 +274,8 @@
592 }
593
594 /**
595- * Assign are set the tree handle. The tree must may not be a member of a
596- * temporary volume.
597+ * Set the tree handle. The tree must may not be a member of a temporary
598+ * volume.
599 *
600 * @throws PersistitException
601 */
602@@ -332,7 +332,7 @@
603 * only once.
604 *
605 * @param handle
606- * @return
607+ * @return the handle
608 * @throws IllegalStateException
609 * if the handle has already been set
610 */
611
612=== modified file 'src/main/java/com/persistit/TreeBuilder.java'
613--- src/main/java/com/persistit/TreeBuilder.java 2013-01-04 00:16:13 +0000
614+++ src/main/java/com/persistit/TreeBuilder.java 2013-01-22 16:32:21 +0000
615@@ -657,7 +657,8 @@
616 * @param key
617 * @param v1
618 * @param v2
619- * @return
620+ * @return If <code>true</code>, the resulting value is v2. If
621+ * <code>false</code>, the resulting value is v1.
622 * @throws DuplicateKeyException
623 * if a key being inserted or merged matches a key already
624 * exists
625
626=== modified file 'src/main/java/com/persistit/Volume.java'
627--- src/main/java/com/persistit/Volume.java 2013-01-02 22:41:20 +0000
628+++ src/main/java/com/persistit/Volume.java 2013-01-22 16:32:21 +0000
629@@ -48,6 +48,8 @@
630 */
631 public class Volume {
632
633+ final static int LOCK_VOLUME_HANDLE = Integer.MAX_VALUE;
634+
635 private final String _name;
636 private long _id;
637 private final AtomicBoolean _closing = new AtomicBoolean();
638@@ -336,6 +338,10 @@
639 return getStorage().isTemp();
640 }
641
642+ boolean isLockVolume() {
643+ return getHandle() == LOCK_VOLUME_HANDLE;
644+ }
645+
646 /**
647 * @return The size in bytes of one page in this <code>Volume</code>.
648 */
649@@ -546,7 +552,7 @@
650 * only once.
651 *
652 * @param handle
653- * @return
654+ * @return the handle
655 * @throws IllegalStateException
656 * if the handle has already been set
657 */
658
659=== added file 'src/test/java/com/persistit/ExchangeLockTest.java'
660--- src/test/java/com/persistit/ExchangeLockTest.java 1970-01-01 00:00:00 +0000
661+++ src/test/java/com/persistit/ExchangeLockTest.java 2013-01-22 16:32:21 +0000
662@@ -0,0 +1,313 @@
663+/**
664+ * Copyright © 2012 Akiban Technologies, Inc. All rights reserved.
665+ *
666+ * This program and the accompanying materials are made available
667+ * under the terms of the Eclipse Public License v1.0 which
668+ * accompanies this distribution, and is available at
669+ * http://www.eclipse.org/legal/epl-v10.html
670+ *
671+ * This program may also be available under different license terms.
672+ * For more information, see www.akiban.com or contact licensing@akiban.com.
673+ *
674+ * Contributors:
675+ * Akiban Technologies, Inc.
676+ */
677+
678+package com.persistit;
679+
680+import static org.junit.Assert.assertEquals;
681+import static org.junit.Assert.assertTrue;
682+import static org.junit.Assert.fail;
683+
684+import java.util.concurrent.Semaphore;
685+
686+import org.junit.Test;
687+
688+import com.persistit.exception.InUseException;
689+import com.persistit.exception.InvalidKeyException;
690+import com.persistit.exception.PersistitException;
691+
692+public class ExchangeLockTest extends PersistitUnitTestCase {
693+ private final static long DMILLIS = SharedResource.DEFAULT_MAX_WAIT_TIME;
694+ private final Semaphore _coordinator = new Semaphore(0);
695+
696+ @Test
697+ public void singleThreadedLock() throws Exception {
698+ final Exchange ex = _persistit.getExchange("persistit", "ExchangeLockTest", true);
699+ final Transaction txn = ex.getTransaction();
700+ try {
701+ ex.lock();
702+ fail("Expected to fail");
703+ } catch (final IllegalStateException e) {
704+ // expected
705+ }
706+ txn.begin();
707+ try {
708+ try {
709+ ex.lock();
710+ fail("Expected to fail");
711+ } catch (final InvalidKeyException e) {
712+ // expected
713+ }
714+ ex.append("motor");
715+ ex.lock();
716+ final Tree tree = _persistit.getLockVolume().getTree("ExchangeLockTest", false);
717+ assertTrue("Expected tree to be defined", tree != null);
718+ final Exchange ex2 = new Exchange(tree);
719+ ex2.ignoreMVCCFetch(true);
720+ assertTrue("Expect a key in the temp volume", ex2.next(true));
721+ txn.commit();
722+ } catch (final Exception e) {
723+ e.printStackTrace();
724+ } finally {
725+ txn.end();
726+ }
727+
728+ txn.begin();
729+ try {
730+ ex.lock();
731+ txn.commit();
732+ } finally {
733+ txn.end();
734+ }
735+ }
736+
737+ private class Locker implements Runnable {
738+ final Semaphore _semaphore = new Semaphore(0);
739+ final long _timeout;
740+ final int[] _sequence;
741+ Exception _exception;
742+ volatile int _expectedReleases;
743+
744+ volatile boolean _committed;
745+
746+ private Locker(final long timeout, final int... sequence) {
747+ _timeout = timeout;
748+ _sequence = sequence;
749+ }
750+
751+ private void go(final int waitFor) throws InterruptedException {
752+ _semaphore.release();
753+ _coordinator.acquire(waitFor);
754+ }
755+
756+ private int cycles() {
757+ return _sequence.length + 2;
758+ }
759+
760+ @Override
761+ public void run() {
762+ try {
763+ final Exchange ex = _persistit.getExchange("persistit", "ExchangeLockTest", true);
764+ final Transaction txn = ex.getTransaction();
765+ _expectedReleases = cycles();
766+ txn.begin();
767+ try {
768+ for (final int k : _sequence) {
769+ _semaphore.acquire();
770+ ex.clear().append(k).lock(ex.getKey(), _timeout);
771+ _coordinator.release();
772+ _expectedReleases--;
773+ }
774+ _semaphore.acquire();
775+ txn.commit();
776+ _committed = true;
777+ _coordinator.release();
778+ _expectedReleases--;
779+ } catch (final Exception e) {
780+ _exception = e;
781+ txn.rollback();
782+ } finally {
783+ if (_expectedReleases > 0) {
784+ _coordinator.release(_expectedReleases);
785+ }
786+ _semaphore.acquire();
787+ txn.end();
788+ _coordinator.release();
789+ _expectedReleases--;
790+ }
791+
792+ } catch (final Exception e) {
793+ e.printStackTrace();
794+ }
795+ }
796+ }
797+
798+ private Thread[] start(final Locker... lockers) {
799+ final Thread[] threads = new Thread[lockers.length];
800+ int count = 0;
801+ for (final Locker locker : lockers) {
802+ final Thread t = new Thread(locker);
803+ t.start();
804+ threads[count++] = t;
805+ }
806+ return threads;
807+ }
808+
809+ private void join(final Thread[] threads) throws InterruptedException {
810+ for (final Thread t : threads) {
811+ t.join();
812+ }
813+ }
814+
815+ @Test
816+ public void nonConflictingLocks() throws Exception {
817+ final Locker a = new Locker(DMILLIS, 1, 5);
818+ final Locker b = new Locker(DMILLIS, 2, 6);
819+ final Locker c = new Locker(DMILLIS, 3, 7);
820+ final Locker d = new Locker(DMILLIS, 4, 8);
821+ final Thread[] threads = start(a, b, c, d);
822+ for (int i = 0; i < a.cycles(); i++) {
823+ a.go(1);
824+ b.go(1);
825+ c.go(1);
826+ d.go(1);
827+ }
828+ join(threads);
829+ assertTrue(a._committed);
830+ assertTrue(b._committed);
831+ assertTrue(c._committed);
832+ assertTrue(d._committed);
833+ }
834+
835+ @Test
836+ public void simpleConflictingLocks() throws Exception {
837+ final Locker a = new Locker(1000, 1);
838+ final Locker b = new Locker(1000, 1);
839+ final Thread[] threads = start(a, b);
840+ final long start = System.currentTimeMillis();
841+ for (int i = 0; i < a.cycles(); i++) {
842+ a.go(1);
843+ b.go(1);
844+ }
845+ join(threads);
846+ final long end = System.currentTimeMillis();
847+ assertTrue(end - start >= 1000);
848+ assertTrue(a._committed ^ b._committed);
849+ }
850+
851+ @Test
852+ public void deadlock() throws Exception {
853+ final Locker a = new Locker(DMILLIS, 1, 2);
854+ final Locker b = new Locker(DMILLIS, 2, 1);
855+ final Thread[] threads = start(a, b);
856+ final long start = System.currentTimeMillis();
857+ for (int i = 0; i < a.cycles(); i++) {
858+ a.go(i == 0 ? 1 : 0);
859+ b.go(i == 0 ? 1 : 0);
860+ }
861+ join(threads);
862+ final long end = System.currentTimeMillis();
863+ assertTrue(end - start < DMILLIS);
864+ assertTrue(a._committed ^ b._committed);
865+ }
866+
867+ @Test
868+ public void multiWayDeadlock() throws Exception {
869+ final Locker a = new Locker(DMILLIS, 1, 2, 3, 4, 5);
870+ final Locker b = new Locker(DMILLIS, 2, 3, 4, 5, 1);
871+ final Locker c = new Locker(DMILLIS, 3, 4, 5, 1, 2);
872+ final Locker d = new Locker(DMILLIS, 4, 5, 1, 2, 3);
873+ final Locker e = new Locker(DMILLIS, 5, 1, 2, 3, 4);
874+ final Thread[] threads = start(a, b, c, d, e);
875+ final long start = System.currentTimeMillis();
876+ for (int i = 0; i < a.cycles(); i++) {
877+ final int w = (i == 0) ? 1 : 0;
878+ a.go(w);
879+ b.go(w);
880+ c.go(w);
881+ d.go(w);
882+ e.go(w);
883+ }
884+ join(threads);
885+ final long end = System.currentTimeMillis();
886+ assertTrue(end - start < DMILLIS);
887+ int succeeded = 0;
888+ for (final Locker l : new Locker[] { a, b, c, d, e }) {
889+ if (l._committed) {
890+ succeeded++;
891+ }
892+ }
893+ assertEquals(1, succeeded);
894+ }
895+
896+ @Test
897+ public void lockTablePruning() throws Exception {
898+ final Exchange ex = _persistit.getExchange("persistit", "ExchangeLockTest", true);
899+ final Transaction txn = ex.getTransaction();
900+ txn.begin();
901+ for (int i = 0; i < 10000; i++) {
902+ ex.clear().append(i).append(RED_FOX).lock();
903+ }
904+ txn.commit();
905+ txn.end();
906+
907+ final Exchange lockExchange = new Exchange(_persistit.getLockVolume().getTree("ExchangeLockTest", false));
908+ lockExchange.ignoreMVCCFetch(true);
909+ final int count1 = keyCount(lockExchange);
910+ assertTrue(count1 > 0);
911+ for (int i = 0; i < 10000; i++) {
912+ lockExchange.clear().append(i).append(RED_FOX).prune();
913+ }
914+
915+ final int count2 = keyCount(lockExchange);
916+ assertTrue(count2 < count1);
917+ _persistit.getTransactionIndex().updateActiveTransactionCache();
918+ _persistit.getCleanupManager().poll();
919+
920+ final int count3 = keyCount(lockExchange);
921+ assertEquals(0, count3);
922+
923+ }
924+
925+ private int keyCount(final Exchange ex) throws PersistitException {
926+ int count = 0;
927+ ex.clear();
928+ while (ex.next(true)) {
929+ count++;
930+ }
931+ return count;
932+ }
933+
934+ @Test
935+ public void timeout() throws Exception {
936+ /*
937+ * A cursory check of the Exchange timeout value
938+ */
939+ final Exchange ex = _persistit.getExchange("persistit", "gogo", true);
940+ ex.to("a").store();
941+ final long latchedPage = ex.fetchBufferCopy(0).getPageAddress();
942+ final BufferPool pool = ex.getBufferPool();
943+ final Volume volume = ex.getVolume();
944+ ex.setTimeoutMillis(1000);
945+ final long start = System.currentTimeMillis();
946+ final Semaphore a = new Semaphore(0);
947+ final Semaphore b = new Semaphore(0);
948+
949+ new Thread(new Runnable() {
950+ @Override
951+ public void run() {
952+ try {
953+ final Buffer buffer = pool.get(volume, latchedPage, true, true);
954+ a.release();
955+ b.acquire();
956+ buffer.release();
957+ } catch (final Exception e) {
958+ e.printStackTrace();
959+ }
960+ }
961+ }).start();
962+ a.acquire();
963+ try {
964+ ex.to("a").store();
965+ fail("Expected an InUseException");
966+
967+ } catch (final InUseException e) {
968+ // expected
969+ }
970+ final long end = System.currentTimeMillis();
971+ b.release();
972+ final long interval = end - start;
973+ assertTrue("Should have waited about 1 second", interval >= 1000 && interval < 2000);
974+ }
975+}
976
977=== modified file 'src/test/java/com/persistit/MediatedFileChannelTest.java'
978--- src/test/java/com/persistit/MediatedFileChannelTest.java 2012-08-24 13:57:19 +0000
979+++ src/test/java/com/persistit/MediatedFileChannelTest.java 2013-01-22 16:32:21 +0000
980@@ -84,7 +84,6 @@
981 for (int i = 0; i < 65536; i++) {
982 bb.array()[i] = (byte) (i % 32 + 64);
983 }
984-
985 final long start = System.nanoTime();
986 while (errors == 0 && System.nanoTime() - start < TIME) {
987 try {
988@@ -94,6 +93,7 @@
989 fc.force(true);
990 count++;
991 } catch (final InterruptedIOException e) {
992+ bb.position(0);
993 // ignore -- expected
994 interrupts++;
995 // need to clear the interrupted status
996
997=== modified file 'src/test/java/com/persistit/unit/UnitTestProperties.java'
998--- src/test/java/com/persistit/unit/UnitTestProperties.java 2012-08-10 16:18:11 +0000
999+++ src/test/java/com/persistit/unit/UnitTestProperties.java 2013-01-22 16:32:21 +0000
1000@@ -41,7 +41,6 @@
1001 p.setProperty("journalpath", "${datapath}/persistit_journal");
1002 p.setProperty("logfile", "${datapath}/persistit_${timestamp}.log");
1003 p.setProperty("tmpvoldir", "${datapath}");
1004- p.setProperty("rmiport", System.getProperty("rmiport", "8081"));
1005 p.setProperty("jmx", "true");
1006 return p;
1007 }
1008@@ -62,7 +61,6 @@
1009 p.setProperty("journalpath", "${datapath}/persistit_journal");
1010 p.setProperty("logfile", "${datapath}/persistit_${timestamp}.log");
1011 p.setProperty("tmpvoldir", "${datapath}");
1012- p.setProperty("rmiport", System.getProperty("rmiport", "8081"));
1013 return p;
1014 }
1015
1016@@ -78,7 +76,6 @@
1017 p.setProperty("journalpath", "${datapath}/persistit_alt_journal");
1018 p.setProperty("logfile", "${datapath}/persistit_${timestamp}.log");
1019 p.setProperty("tmpvoldir", "${datapath}");
1020- p.setProperty("rmiport", System.getProperty("rmiport", "8081"));
1021 return p;
1022 }
1023
1024@@ -94,8 +91,6 @@
1025 p.setProperty("journalpath", "${datapath}/persistit_journal");
1026 p.setProperty("logfile", "${datapath}/persistit_${timestamp}.log");
1027 p.setProperty("tmpvoldir", "${datapath}");
1028- p.setProperty("rmiport", System.getProperty("rmiport", "8081"));
1029- p.setProperty("jmx", "true");
1030 return p;
1031 }
1032
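
Note on the test harness: the Locker class in ExchangeLockTest uses a pair of semaphores so that the driving test thread decides exactly when each worker performs its next step. The following is a minimal, standalone sketch of that handshake only. All names in it (LockStepSketch, Worker, runInterleaved) are hypothetical and not part of Persistit, and the log entry stands in for the real call to Exchange#lock. It assumes one permit per step, which corresponds to go(1) in the tests above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;

// Hypothetical sketch; none of these names exist in Persistit.
final class LockStepSketch {

    static final class Worker implements Runnable {
        // Driver -> worker: one permit allows one step.
        private final Semaphore permit = new Semaphore(0);
        // Worker -> driver: one permit reports one finished step.
        private final Semaphore coordinator;
        private final List<String> log;
        private final String name;
        private final int steps;

        Worker(final Semaphore coordinator, final List<String> log, final String name, final int steps) {
            this.coordinator = coordinator;
            this.log = log;
            this.name = name;
            this.steps = steps;
        }

        // Called by the driver: allow one step, then block until it is done.
        void go() throws InterruptedException {
            permit.release();
            coordinator.acquire();
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < steps; i++) {
                    permit.acquire();      // wait for the driver
                    log.add(name + i);     // stand-in for the real work
                    coordinator.release(); // report completion
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Runs two workers in strict alternation and returns the order of steps.
    static List<String> runInterleaved() {
        final Semaphore coordinator = new Semaphore(0);
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        final Worker a = new Worker(coordinator, log, "a", 2);
        final Worker b = new Worker(coordinator, log, "b", 2);
        final Thread ta = new Thread(a);
        final Thread tb = new Thread(b);
        ta.start();
        tb.start();
        try {
            for (int i = 0; i < 2; i++) {
                a.go();
                b.go();
            }
            ta.join();
            tb.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(log);
    }

    public static void main(final String[] args) {
        System.out.println(runInterleaved()); // prints [a0, b0, a1, b1]
    }
}
```

Because a worker can only proceed after the driver releases its permit, and the driver only continues after the worker reports back, the interleaving is deterministic. The deadlock tests above deliberately pass 0 to go() after the first cycle so the driver does not wait for a step that may be blocked on a lock.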
