Merge lp:~pbeaman/akiban-persistit/warn-missing-volume into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Approved by: Nathan Williams
Approved revision: 345
Merged at revision: 342
Proposed branch: lp:~pbeaman/akiban-persistit/warn-missing-volume
Merge into: lp:akiban-persistit
Diff against target: 970 lines (+400/-151)
10 files modified
src/main/java/com/persistit/Configuration.java (+43/-6)
src/main/java/com/persistit/JournalManager.java (+48/-19)
src/main/java/com/persistit/Persistit.java (+1/-0)
src/main/java/com/persistit/RecoveryManager.java (+7/-3)
src/main/java/com/persistit/TransactionPlayer.java (+146/-108)
src/main/java/com/persistit/exception/MissingVolumeException.java (+45/-0)
src/main/java/com/persistit/logging/LogBase.java (+6/-0)
src/main/java/com/persistit/mxbeans/AlertMonitorMXBean.java (+1/-0)
src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java (+6/-0)
src/test/java/com/persistit/JournalManagerTest.java (+97/-15)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/warn-missing-volume
Reviewer Review Type Date Requested Status
Nathan Williams Approve
Review via email: mp+116735@code.launchpad.net

Description of the change

Modification inspired by https://bugs.launchpad.net/akiban-persistit/+bug/1028016 to issue warnings when a previously existing volume is missing, and an option to allow a system to continue by ignoring pages and transactions in the journal from missing volumes.

This branch adds code in several places to deal with the possibility that a formerly open volume is no longer present during recovery:

JournalManager#readForCopy
JournalManager#writeForCopy
TransactionPlayer#applyTransactionUpdates

In each case new logic posts warning messages via the AlertMonitor and then checks a new JournalManagerMXBean property isIgnoreMissingVolumes() to determine whether the problem is fatal or should be ignored.

To allow setting the property before recovery begins there is also a new Configuration setting "ignoremissingvolumes" (suitably long and obscure).

New tests in JournalManagerTest verify intended behavior.

To post a comment you must log in.
Revision history for this message
Nathan Williams (nwilliams) wrote :

One minor thing:
javadoc on setIgnoreMissingVolumes() references appendOnly parameter

Otherwise this makes sense I think.

review: Needs Fixing
345. By Peter Beaman

Small Javadoc issue

Revision history for this message
Peter Beaman (pbeaman) wrote :

Fixed. Thanks.

Revision history for this message
Nathan Williams (nwilliams) wrote :

Looks good.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/main/java/com/persistit/Configuration.java'
2--- src/main/java/com/persistit/Configuration.java 2012-06-08 19:14:52 +0000
3+++ src/main/java/com/persistit/Configuration.java 2012-07-26 18:54:32 +0000
4@@ -38,12 +38,12 @@
5 import java.util.TreeMap;
6
7 import com.persistit.Transaction.CommitPolicy;
8+import com.persistit.exception.CorruptJournalException;
9 import com.persistit.exception.InvalidVolumeSpecificationException;
10 import com.persistit.exception.PersistitException;
11 import com.persistit.exception.PersistitIOException;
12 import com.persistit.exception.PropertiesNotFoundException;
13 import com.persistit.logging.DefaultPersistitLogger;
14-import com.persistit.logging.PersistitLevel;
15 import com.persistit.policy.JoinPolicy;
16 import com.persistit.policy.SplitPolicy;
17 import com.persistit.util.Util;
18@@ -271,6 +271,10 @@
19 public final static String APPEND_ONLY_PROPERTY = "appendonly";
20
21 /**
22+ * Property name for the "ignore missing volumes" property.
23+ */
24+ public final static String IGNORE_MISSING_VOLUMES_PROPERTY = "ignoremissingvolumes";
25+ /**
26 * Property name to specify the default {@link SplitPolicy}.
27 */
28 public final static String SPLIT_POLICY_PROPERTY_NAME = "splitpolicy";
29@@ -403,9 +407,10 @@
30 public void setMaximumCount(int maximumCount) {
31 this.maximumCount = maximumCount;
32 }
33-
34+
35 /**
36 * Set the minimum and maximum buffer count.
37+ *
38 * @param count
39 */
40 public void setCount(int count) {
41@@ -493,8 +498,8 @@
42 int bufferSizeWithOverhead = Buffer.bufferSizeWithOverhead(bufferSize);
43 int buffers = Math.max(minimumCount, Math.min(maximumCount, (int) (allocation / bufferSizeWithOverhead)));
44 if (buffers < BufferPool.MINIMUM_POOL_COUNT || buffers > BufferPool.MAXIMUM_POOL_COUNT
45- || (long)buffers * (long)bufferSizeWithOverhead > maximumAvailable
46- || (long)(buffers + 1) * (long)bufferSizeWithOverhead < minimumMemory) {
47+ || (long) buffers * (long) bufferSizeWithOverhead > maximumAvailable
48+ || (long) (buffers + 1) * (long) bufferSizeWithOverhead < minimumMemory) {
49 throw new IllegalArgumentException(String.format(
50 "Invalid buffer pool configuration: %,d buffers in %sb of maximum available memory", buffers,
51 displayableLongValue(maximumAvailable)));
52@@ -624,6 +629,7 @@
53 private int rmiServerPort;
54 private boolean jmx = true;
55 private boolean appendOnly;
56+ private boolean ignoreMissingVolumes;
57 private String tmpVolDir;
58 private int tmpVolPageSize;
59 private long tmpVolMaxSize;
60@@ -704,6 +710,7 @@
61 setAppendOnly(getBooleanProperty(APPEND_ONLY_PROPERTY, false));
62 setCommitPolicy(getProperty(COMMIT_POLICY_PROPERTY_NAME));
63 setConstructorOverride(getBooleanProperty(CONSTRUCTOR_OVERRIDE_PROPERTY_NAME, false));
64+ setIgnoreMissingVolumes(getBooleanProperty(IGNORE_MISSING_VOLUMES_PROPERTY, false));
65 setJmxEnabled(getBooleanProperty(ENABLE_JMX_PROPERTY_NAME, true));
66 setJoinPolicy(getProperty(JOIN_POLICY_PROPERTY_NAME));
67 setJournalPath(getProperty(JOURNAL_PATH_PROPERTY_NAME, DEFAULT_JOURNAL_PATH));
68@@ -1772,6 +1779,7 @@
69
70 /**
71 * Return the value defined by {@link #setAppendOnly}
72+ *
73 * @return the whether to start Persistit in append-only mode
74 */
75 public boolean isAppendOnly() {
76@@ -1780,7 +1788,7 @@
77
78 /**
79 * <p>
80- * Control whether Persistit starts in <i>append-only</i>. In this mode
81+ * Control whether Persistit starts in <i>append-only</i> mode. In this mode
82 * Persistit accumulates database updates in the journal without copying
83 * changes back into the volume files. This method changes only the initial
84 * state; use {@link Management#setAppendOnly(boolean)} method to change
85@@ -1792,10 +1800,39 @@
86 * </p>
87 *
88 * @param appendOnly
89- * the appendOnly to set
90+ * <code>true</code> to start Persistit in append-only only
91 */
92 public void setAppendOnly(boolean appendOnly) {
93 this.appendOnly = appendOnly;
94 }
95
96+ /**
97+ * Return the value defined by {@link #setIgnoreMissingVolumes(boolean)}
98+ *
99+ * @return the whether to start Persistit in ignore-missing-volumes mode
100+ */
101+ public boolean isIgnoreMissingVolumes() {
102+ return ignoreMissingVolumes;
103+ }
104+
105+ /**
106+ * <p>
107+ * Control whether Persistit starts in <i>ignore-missing-volumes</i> mode.
108+ * In this mode references in the journal to unknown volumes are ignored
109+ * rather than noted as {@link CorruptJournalException}s. Almost always this
110+ * mode should be disabled; the setting is available to enable recovery of a
111+ * journal into a subset of formerly existing volumes and should be used
112+ * only with care.
113+ * </p>
114+ * <p>
115+ * Default value is <code>false</code><br />
116+ * Property name is {@value #IGNORE_MISSING_VOLUMES_PROPERTY}
117+ * </p>
118+ *
119+ * @param ignoreMissingVolumes
120+ * <code>true</code> to ignore missing volumes
121+ */
122+ public void setIgnoreMissingVolumes(boolean ignoreMissingVolumes) {
123+ this.ignoreMissingVolumes = ignoreMissingVolumes;
124+ }
125 }
126
127=== modified file 'src/main/java/com/persistit/JournalManager.java'
128--- src/main/java/com/persistit/JournalManager.java 2012-06-29 21:43:10 +0000
129+++ src/main/java/com/persistit/JournalManager.java 2012-07-26 18:54:32 +0000
130@@ -135,6 +135,8 @@
131
132 private AtomicBoolean _appendOnly = new AtomicBoolean();
133
134+ private AtomicBoolean _ignoreMissingVolume = new AtomicBoolean();
135+
136 private String _journalFilePath;
137
138 /**
139@@ -375,6 +377,11 @@
140 }
141
142 @Override
143+ public boolean isIgnoreMissingVolumes() {
144+ return _ignoreMissingVolume.get();
145+ }
146+
147+ @Override
148 public boolean isCopyingFast() {
149 return _copyFast.get();
150 }
151@@ -385,6 +392,11 @@
152 }
153
154 @Override
155+ public void setIgnoreMissingVolumes(boolean ignore) {
156+ _ignoreMissingVolume.set(ignore);
157+ }
158+
159+ @Override
160 public void setCopyingFast(boolean fast) {
161 _copyFast.set(fast);
162 }
163@@ -2362,6 +2374,8 @@
164 int handle = -1;
165
166 for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) {
167+ Volume volumeRef = null;
168+
169 if (_closed.get() && !_copyFast.get() || _appendOnly.get()) {
170 list.clear();
171 break;
172@@ -2373,23 +2387,20 @@
173 }
174 if (pageNode.getVolumeHandle() != handle) {
175 handle = -1;
176- volume = _handleToVolumeMap.get(pageNode.getVolumeHandle());
177- if (volume == null) {
178- // TODO
179- } else {
180- volume = _persistit.getVolume(volume.getName());
181+ volume = null;
182+ // Possibly hollow volume
183+ volumeRef = _handleToVolumeMap.get(pageNode.getVolumeHandle());
184+ if (volumeRef != null) {
185+ // Opened volume, if present
186+ volume = _persistit.getVolume(volumeRef.getName());
187 handle = pageNode.getVolumeHandle();
188 }
189 }
190- if (volume == null || volume.isClosed()) {
191- // Remove from the List so that below we won't remove it from
192- // from the pageMap.
193- iterator.remove();
194+ if (volume == null) {
195+ // Deal with this in writeForCopy
196 continue;
197 }
198
199- volume.verifyId(volume.getId());
200-
201 final int at = bb.position();
202 final long pageAddress;
203 try {
204@@ -2400,8 +2411,9 @@
205 }
206 pageAddress = readPageBufferFromJournal(stablePageNode, bb);
207 } catch (PersistitException ioe) {
208- _persistit.getLogBase().copyException.log(ioe, volume, pageNode.getPageAddress(), pageNode
209- .getJournalAddress());
210+ _persistit.getAlertMonitor().post(
211+ new Event(AlertLevel.ERROR, _persistit.getLogBase().copyException, ioe, volume, pageNode
212+ .getPageAddress(), pageNode.getJournalAddress()), AlertMonitor.JOURNAL_CATEGORY);
213 throw ioe;
214 }
215
216@@ -2420,10 +2432,12 @@
217
218 void writeForCopy(final List<PageNode> list, final ByteBuffer bb) throws PersistitException {
219 Collections.sort(list, PageNode.WRITE_COMPARATOR);
220+ Volume volumeRef = null;
221 Volume volume = null;
222 int handle = -1;
223
224 final HashSet<Volume> volumes = new HashSet<Volume>();
225+
226 for (final Iterator<PageNode> iterator = list.iterator(); iterator.hasNext();) {
227 if (_closed.get() && !_copyFast.get() || _appendOnly.get()) {
228 list.clear();
229@@ -2434,15 +2448,28 @@
230
231 if (pageNode.getVolumeHandle() != handle) {
232 handle = -1;
233- volume = _handleToVolumeMap.get(pageNode.getVolumeHandle());
234- if (volume == null) {
235- // TODO
236- } else {
237- volume = _persistit.getVolume(volume.getName());
238+ volume = null;
239+ // Possibly hollow volume
240+ volumeRef = _handleToVolumeMap.get(pageNode.getVolumeHandle());
241+ if (volumeRef != null) {
242+ // Opened volume, if present
243+ volume = _persistit.getVolume(volumeRef.getName());
244 handle = pageNode.getVolumeHandle();
245 }
246 }
247-
248+ if (volume == null) {
249+ _persistit.getAlertMonitor().post(
250+ new Event(AlertLevel.WARN, _persistit.getLogBase().missingVolume, volumeRef, pageNode
251+ .getJournalAddress()), AlertMonitor.MISSING_VOLUME_CATEGORY);
252+ if (_ignoreMissingVolume.get()) {
253+ _persistit.getLogBase().lostPageFromMissingVolume.log(pageNode.getPageAddress(), volumeRef,
254+ pageNode.getJournalAddress());
255+ // Not removing the page from the List here will cause
256+ // cleanupForCopy to remove it from
257+ // the page map.
258+ continue;
259+ }
260+ }
261 if (volume == null || volume.isClosed()) {
262 // Remove from the List so that below we won't remove it from
263 // from the pageMap.
264@@ -2450,6 +2477,8 @@
265 continue;
266 }
267
268+ volumeRef.verifyId(volume.getId());
269+
270 final long pageAddress = pageNode.getPageAddress();
271 volume.getStorage().extend(pageAddress);
272 final int pageSize = volume.getPageSize();
273
274=== modified file 'src/main/java/com/persistit/Persistit.java'
275--- src/main/java/com/persistit/Persistit.java 2012-07-25 23:58:46 +0000
276+++ src/main/java/com/persistit/Persistit.java 2012-07-26 18:54:32 +0000
277@@ -647,6 +647,7 @@
278
279 _journalManager.init(_recoveryManager, journalPath, journalSize);
280 _journalManager.setAppendOnly(_configuration.isAppendOnly());
281+ _journalManager.setIgnoreMissingVolumes(_configuration.isIgnoreMissingVolumes());
282 }
283
284 void initializeBufferPools() {
285
286=== modified file 'src/main/java/com/persistit/RecoveryManager.java'
287--- src/main/java/com/persistit/RecoveryManager.java 2012-06-28 14:48:47 +0000
288+++ src/main/java/com/persistit/RecoveryManager.java 2012-07-26 18:54:32 +0000
289@@ -699,12 +699,16 @@
290 return sb.toString();
291 }
292
293- public String addressToString(final long address) {
294- return _player.addressToString(address);
295+ String addressToString(final long address) {
296+ return TransactionPlayer.addressToString(address);
297 }
298
299 private String addressToString(final long address, final long timestamp) {
300- return _player.addressToString(address, timestamp);
301+ return TransactionPlayer.addressToString(address, timestamp);
302+ }
303+
304+ TransactionPlayer getPlayer() {
305+ return _player;
306 }
307
308 // ----------------------------Phase 1----------------------------
309
310=== modified file 'src/main/java/com/persistit/TransactionPlayer.java'
311--- src/main/java/com/persistit/TransactionPlayer.java 2012-05-25 18:50:59 +0000
312+++ src/main/java/com/persistit/TransactionPlayer.java 2012-07-26 18:54:32 +0000
313@@ -28,7 +28,10 @@
314 import java.nio.ByteBuffer;
315 import java.util.ArrayList;
316 import java.util.List;
317+import java.util.concurrent.atomic.AtomicLong;
318
319+import com.persistit.AlertMonitor.AlertLevel;
320+import com.persistit.AlertMonitor.Event;
321 import com.persistit.JournalManager.TransactionMapItem;
322 import com.persistit.JournalManager.TreeDescriptor;
323 import com.persistit.JournalRecord.D0;
324@@ -38,9 +41,14 @@
325 import com.persistit.JournalRecord.SR;
326 import com.persistit.JournalRecord.TX;
327 import com.persistit.exception.CorruptJournalException;
328+import com.persistit.exception.MissingVolumeException;
329 import com.persistit.exception.PersistitException;
330
331 class TransactionPlayer {
332+
333+ private final AtomicLong appliedUpdates = new AtomicLong();
334+ private final AtomicLong ignoredUpdates = new AtomicLong();
335+ private final AtomicLong failedUpdates = new AtomicLong();
336
337 interface TransactionPlayerListener {
338
339@@ -61,7 +69,7 @@
340 void endTransaction(long address, long timestamp) throws PersistitException;
341
342 void endRecovery(long address, long timestamp) throws PersistitException;
343-
344+
345 boolean requiresLongRecordConversion();
346
347 }
348@@ -72,7 +80,7 @@
349 _support = support;
350 }
351
352- public void applyTransaction(final TransactionMapItem item, final TransactionPlayerListener listener)
353+ void applyTransaction(final TransactionMapItem item, final TransactionPlayerListener listener)
354 throws PersistitException {
355
356 final List<Long> chainedAddress = new ArrayList<Long>();
357@@ -83,7 +91,6 @@
358 long startTimestamp;
359 long commitTimestamp;
360 long backchainAddress;
361- int appliedUpdates = 0;
362
363 for (;;) {
364 _support.read(address, TX.OVERHEAD);
365@@ -114,8 +121,8 @@
366 }
367
368 listener.startTransaction(address, startTimestamp, commitTimestamp);
369- appliedUpdates += applyTransactionUpdates(_support.getReadBuffer(), address, recordSize, startTimestamp, commitTimestamp,
370- listener);
371+ applyTransactionUpdates(_support.getReadBuffer(), address, recordSize, startTimestamp,
372+ commitTimestamp, listener);
373
374 for (Long continuation : chainedAddress) {
375 address = continuation.longValue();
376@@ -127,124 +134,140 @@
377 + " has invalid length " + recordSize + " or type " + type);
378 }
379 _support.read(address, recordSize);
380- appliedUpdates += applyTransactionUpdates(_support.getReadBuffer(), address, recordSize, startTimestamp, commitTimestamp,
381- listener);
382+ applyTransactionUpdates(_support.getReadBuffer(), address, recordSize, startTimestamp,
383+ commitTimestamp, listener);
384 }
385 listener.endTransaction(address, startTimestamp);
386
387 }
388
389- int applyTransactionUpdates(final ByteBuffer byteBuffer, final long address, final int recordSize,
390+ void applyTransactionUpdates(final ByteBuffer byteBuffer, final long address, final int recordSize,
391 final long startTimestamp, final long commitTimestamp, final TransactionPlayerListener listener)
392 throws PersistitException {
393 ByteBuffer bb = byteBuffer;
394 final int start = bb.position();
395 int end = start + recordSize;
396 int position = start + TX.OVERHEAD;
397- int appliedUpdates = 0;
398
399 while (position < end) {
400 bb.position(position);
401 final int innerSize = JournalRecord.getLength(bb);
402 final int type = JournalRecord.getType(bb);
403- switch (type) {
404- case SR.TYPE: {
405- final int keySize = SR.getKeySize(bb);
406- final int treeHandle = SR.getTreeHandle(bb);
407- final Exchange exchange = getExchange(treeHandle, address, startTimestamp);
408- exchange.ignoreTransactions();
409- final Key key = exchange.getKey();
410- final Value value = exchange.getValue();
411- System.arraycopy(bb.array(), bb.position() + SR.OVERHEAD, key.getEncodedBytes(), 0, keySize);
412- key.setEncodedSize(keySize);
413- final int valueSize = innerSize - SR.OVERHEAD - keySize;
414- value.ensureFit(valueSize);
415- System.arraycopy(bb.array(), bb.position() + SR.OVERHEAD + keySize, value.getEncodedBytes(), 0,
416- valueSize);
417- value.setEncodedSize(valueSize);
418-
419- if (value.getEncodedSize() >= Buffer.LONGREC_SIZE
420- && (value.getEncodedBytes()[0] & 0xFF) == Buffer.LONGREC_TYPE) {
421+ try {
422+ switch (type) {
423+ case SR.TYPE: {
424+ final int keySize = SR.getKeySize(bb);
425+ final int treeHandle = SR.getTreeHandle(bb);
426+ final Exchange exchange = getExchange(treeHandle, address, startTimestamp);
427+ exchange.ignoreTransactions();
428+ final Key key = exchange.getKey();
429+ final Value value = exchange.getValue();
430+ System.arraycopy(bb.array(), bb.position() + SR.OVERHEAD, key.getEncodedBytes(), 0, keySize);
431+ key.setEncodedSize(keySize);
432+ final int valueSize = innerSize - SR.OVERHEAD - keySize;
433+ value.ensureFit(valueSize);
434+ System.arraycopy(bb.array(), bb.position() + SR.OVERHEAD + keySize, value.getEncodedBytes(), 0,
435+ valueSize);
436+ value.setEncodedSize(valueSize);
437+
438+ if (value.getEncodedSize() >= Buffer.LONGREC_SIZE
439+ && (value.getEncodedBytes()[0] & 0xFF) == Buffer.LONGREC_TYPE) {
440+ /*
441+ * convertToLongRecord will pollute the getReadBuffer().
442+ * Therefore before calling it we need to copy the TX
443+ * record to a fresh ByteBuffer.
444+ */
445+ if (bb == _support.getReadBuffer()) {
446+ end = recordSize - (position - start);
447+ bb = ByteBuffer.allocate(end);
448+ bb.put(_support.getReadBuffer().array(), position, end);
449+ bb.flip();
450+ position = 0;
451+ }
452+ if (listener.requiresLongRecordConversion()) {
453+ _support.convertToLongRecord(value, treeHandle, address, commitTimestamp);
454+ }
455+ }
456+
457+ listener.store(address, startTimestamp, exchange);
458+ appliedUpdates.incrementAndGet();
459+ // Don't keep exchanges with enlarged value - let them be
460+ // GC'd
461+ if (exchange.getValue().getMaximumSize() < Value.DEFAULT_MAXIMUM_SIZE) {
462+ releaseExchange(exchange);
463+ }
464+ break;
465+ }
466+
467+ case DR.TYPE: {
468+ final int key1Size = DR.getKey1Size(bb);
469+ final int elisionCount = DR.getKey2Elision(bb);
470+ final Exchange exchange = getExchange(DR.getTreeHandle(bb), address, startTimestamp);
471+ exchange.ignoreTransactions();
472+ final Key key1 = exchange.getAuxiliaryKey1();
473+ final Key key2 = exchange.getAuxiliaryKey2();
474+ System.arraycopy(bb.array(), bb.position() + DR.OVERHEAD, key1.getEncodedBytes(), 0, key1Size);
475+ key1.setEncodedSize(key1Size);
476+ final int key2Size = innerSize - DR.OVERHEAD - key1Size;
477+ System.arraycopy(key1.getEncodedBytes(), 0, key2.getEncodedBytes(), 0, elisionCount);
478+ System.arraycopy(bb.array(), bb.position() + DR.OVERHEAD + key1Size, key2.getEncodedBytes(),
479+ elisionCount, key2Size);
480+ key2.setEncodedSize(key2Size + elisionCount);
481+ listener.removeKeyRange(address, startTimestamp, exchange, exchange.getAuxiliaryKey1(), exchange
482+ .getAuxiliaryKey2());
483+ appliedUpdates.incrementAndGet();
484+ releaseExchange(exchange);
485+ break;
486+ }
487+
488+ case DT.TYPE: {
489+ final Exchange exchange = getExchange(DT.getTreeHandle(bb), address, startTimestamp);
490+ listener.removeTree(address, startTimestamp, exchange);
491+ appliedUpdates.incrementAndGet();
492+ releaseExchange(exchange);
493+ break;
494+ }
495+
496+ case D0.TYPE: {
497+ final Exchange exchange = getExchange(D0.getTreeHandle(bb), address, startTimestamp);
498+ listener.delta(address, startTimestamp, exchange.getTree(), D0.getIndex(bb), D0
499+ .getAccumulatorTypeOrdinal(bb), 1);
500+ appliedUpdates.incrementAndGet();
501+ break;
502+ }
503+
504+ case D1.TYPE: {
505+ final Exchange exchange = getExchange(D1.getTreeHandle(bb), address, startTimestamp);
506+ listener.delta(address, startTimestamp, exchange.getTree(), D1.getIndex(bb), D1
507+ .getAccumulatorTypeOrdinal(bb), D1.getValue(bb));
508+ appliedUpdates.incrementAndGet();
509+ break;
510+ }
511+
512+ default: {
513+ throw new CorruptJournalException("Invalid record type " + type + " at journal address "
514+ + addressToString(address + position - start) + " index of transaction record at "
515+ + addressToString(address));
516+ }
517+ }
518+ } catch (MissingVolumeException mve) {
519+ final Persistit db = _support.getPersistit();
520+ if (db.getJournalManager().isIgnoreMissingVolumes()) {
521 /*
522- * convertToLongRecord will pollute the getReadBuffer().
523- * Therefore before calling it we need to copy the TX record
524- * to a fresh ByteBuffer.
525+ * If ignoreMissingVolumes is enabled, then issue a warning
526+ * Alert, but allow recovery or rollback to continue.
527 */
528- if (bb == _support.getReadBuffer()) {
529- end = recordSize - (position - start);
530- bb = ByteBuffer.allocate(end);
531- bb.put(_support.getReadBuffer().array(), position, end);
532- bb.flip();
533- position = 0;
534- }
535- if (listener.requiresLongRecordConversion()) {
536- _support.convertToLongRecord(value, treeHandle, address, commitTimestamp);
537- }
538- }
539-
540- listener.store(address, startTimestamp, exchange);
541- appliedUpdates++;
542- // Don't keep exchanges with enlarged value - let them be GC'd
543- if (exchange.getValue().getMaximumSize() < Value.DEFAULT_MAXIMUM_SIZE) {
544- releaseExchange(exchange);
545- }
546- break;
547- }
548-
549- case DR.TYPE: {
550- final int key1Size = DR.getKey1Size(bb);
551- final int elisionCount = DR.getKey2Elision(bb);
552- final Exchange exchange = getExchange(DR.getTreeHandle(bb), address, startTimestamp);
553- exchange.ignoreTransactions();
554- final Key key1 = exchange.getAuxiliaryKey1();
555- final Key key2 = exchange.getAuxiliaryKey2();
556- System.arraycopy(bb.array(), bb.position() + DR.OVERHEAD, key1.getEncodedBytes(), 0, key1Size);
557- key1.setEncodedSize(key1Size);
558- final int key2Size = innerSize - DR.OVERHEAD - key1Size;
559- System.arraycopy(key1.getEncodedBytes(), 0, key2.getEncodedBytes(), 0, elisionCount);
560- System.arraycopy(bb.array(), bb.position() + DR.OVERHEAD + key1Size, key2.getEncodedBytes(),
561- elisionCount, key2Size);
562- key2.setEncodedSize(key2Size + elisionCount);
563- listener.removeKeyRange(address, startTimestamp, exchange, exchange.getAuxiliaryKey1(), exchange
564- .getAuxiliaryKey2());
565- appliedUpdates++;
566- releaseExchange(exchange);
567- break;
568- }
569-
570- case DT.TYPE: {
571- final Exchange exchange = getExchange(DT.getTreeHandle(bb), address, startTimestamp);
572- listener.removeTree(address, startTimestamp, exchange);
573- appliedUpdates++;
574- releaseExchange(exchange);
575- break;
576- }
577-
578- case D0.TYPE: {
579- final Exchange exchange = getExchange(D0.getTreeHandle(bb), address, startTimestamp);
580- listener.delta(address, startTimestamp, exchange.getTree(), D0.getIndex(bb), D0
581- .getAccumulatorTypeOrdinal(bb), 1);
582- appliedUpdates++;
583- break;
584- }
585-
586- case D1.TYPE: {
587- final Exchange exchange = getExchange(D1.getTreeHandle(bb), address, startTimestamp);
588- listener.delta(address, startTimestamp, exchange.getTree(), D1.getIndex(bb), D1
589- .getAccumulatorTypeOrdinal(bb), D1.getValue(bb));
590- appliedUpdates++;
591- break;
592- }
593-
594- default: {
595- throw new CorruptJournalException("Invalid record type " + type + " at journal address "
596- + addressToString(address + position - start) + " index of transaction record at "
597- + addressToString(address));
598- }
599+ db.getAlertMonitor().post(
600+ new Event(AlertLevel.WARN, db.getLogBase().missingVolume, mve.getVolumeName(), address
601+ + position - start), AlertMonitor.MISSING_VOLUME_CATEGORY);
602+ ignoredUpdates.incrementAndGet();
603+ } else {
604+ failedUpdates.incrementAndGet();
605+ throw mve;
606+ }
607 }
608 position += innerSize;
609 }
610- return appliedUpdates;
611 }
612
613 public static String addressToString(final long address) {
614@@ -261,17 +284,20 @@
615 throw new CorruptJournalException("Tree handle " + treeHandle + " is undefined at "
616 + addressToString(from, timestamp));
617 }
618- Volume volume = _support.handleToVolume(td.getVolumeHandle());
619- if (volume == null) {
620+ Volume volumeRef = _support.handleToVolume(td.getVolumeHandle());
621+ Volume volume;
622+ if (volumeRef == null) {
623 throw new CorruptJournalException("Volume handle " + td.getVolumeHandle() + " is undefined at "
624 + addressToString(from, timestamp));
625 }
626
627- if (!volume.isOpened()) {
628- volume = _support.getPersistit().getVolume(volume.getName());
629+ if (volumeRef.isOpened()) {
630+ volume = volumeRef;
631+ } else {
632+ volume = _support.getPersistit().getVolume(volumeRef.getName());
633 if (volume == null) {
634- throw new CorruptJournalException("No matching Volume found for journal reference " + volume + " at "
635- + addressToString(from, timestamp));
636+ throw new MissingVolumeException("No matching Volume found for journal reference " + volumeRef + " at "
637+ + addressToString(from, timestamp), volumeRef.getName());
638 }
639 }
640 volume.verifyId(volume.getId());
641@@ -286,4 +312,16 @@
642 private void releaseExchange(final Exchange exchange) {
643 _support.getPersistit().releaseExchange(exchange);
644 }
645+
646+ long getAppliedUpdates() {
647+ return appliedUpdates.get();
648+ }
649+
650+ long getIgnoredUpdates() {
651+ return ignoredUpdates.get();
652+ }
653+
654+ long getFailedUpdates() {
655+ return failedUpdates.get();
656+ }
657 }
658
659=== added file 'src/main/java/com/persistit/exception/MissingVolumeException.java'
660--- src/main/java/com/persistit/exception/MissingVolumeException.java 1970-01-01 00:00:00 +0000
661+++ src/main/java/com/persistit/exception/MissingVolumeException.java 2012-07-26 18:54:32 +0000
662@@ -0,0 +1,45 @@
663+/**
664+ * Copyright © 2011-2012 Akiban Technologies, Inc. All rights reserved.
665+ *
666+ * This program is free software: you can redistribute it and/or modify
667+ * it under the terms of the GNU Affero General Public License as
668+ * published by the Free Software Foundation, version 3 (only) of the
669+ * License.
670+ *
671+ * This program is distributed in the hope that it will be useful,
672+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
673+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
674+ * GNU Affero General Public License for more details.
675+ *
676+ * You should have received a copy of the GNU Affero General Public License
677+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
678+ *
679+ * This program may also be available under different license terms. For more
680+ * information, see www.akiban.com or contact licensing@akiban.com.
681+ */
682+
683+package com.persistit.exception;
684+
685+/**
686+ * Thrown if the journal files refer to a volume that is no longer present in
687+ * the system. Generally this condition is irrecoverable because without the
688+ * missing volume a consistent database state cannot be restored. However, in
689+ * the event the removal of the volume is intentional, it is possible to specify
690+ * a mode in which pages and transactions destined for missing volumes are
691+ * ignored.
692+ *
693+ * @version 1.0
694+ */
695+public class MissingVolumeException extends CorruptJournalException {
696+ private static final long serialVersionUID = -9014051945087375523L;
697+ private final String _volumeName;
698+
699+ public MissingVolumeException(final String msg, final String volumeName) {
700+ super(msg);
701+ _volumeName = volumeName;
702+ }
703+
704+ public String getVolumeName() {
705+ return _volumeName;
706+ }
707+}
708
709=== modified file 'src/main/java/com/persistit/logging/LogBase.java'
710--- src/main/java/com/persistit/logging/LogBase.java 2012-07-25 23:58:46 +0000
711+++ src/main/java/com/persistit/logging/LogBase.java 2012-07-26 18:54:32 +0000
712@@ -174,6 +174,12 @@
713 @Message("ERROR|Exception %s while writing volume %s page %,d")
714 public final LogItem writeException = PersistitLogMessage.empty();
715
716+ @Message("WARNING|Missing volume %s referenced at journal address %,d")
717+ public final LogItem missingVolume = PersistitLogMessage.empty();
718+
719+ @Message("WARNING|Lost page %,d from missing volume %s referenced at journal address %,d")
720+ public final LogItem lostPageFromMissingVolume = PersistitLogMessage.empty();
721+
722 @Message("WARNING|Exception %s while copying volume %s page %,d from journal address %,d")
723 public final LogItem copyException = PersistitLogMessage.empty();
724
725
726=== modified file 'src/main/java/com/persistit/mxbeans/AlertMonitorMXBean.java'
727--- src/main/java/com/persistit/mxbeans/AlertMonitorMXBean.java 2012-05-25 18:50:59 +0000
728+++ src/main/java/com/persistit/mxbeans/AlertMonitorMXBean.java 2012-07-26 18:54:32 +0000
729@@ -41,6 +41,7 @@
730 final static String EXTEND_VOLUME_CATEGORY = "ExtendVolume";
731 final static String FLUSH_STATISTICS_CATEGORY = "FlushStatistics";
732 final static String CLEANUP_CATEGORY = "Cleanup";
733+ final static String MISSING_VOLUME_CATEGORY = "MissingVolume";
734
735 /**
736 * Current maximum AlertLevel in this monitor as a String: one of NORMAL,
737
738=== modified file 'src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java'
739--- src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java 2012-05-25 18:50:59 +0000
740+++ src/main/java/com/persistit/mxbeans/JournalManagerMXBean.java 2012-07-26 18:54:32 +0000
741@@ -148,12 +148,18 @@
742
743 @Description("True if copying of pages from the journal to their destination volumes is disabled")
744 boolean isAppendOnly();
745+
746+ @Description("True to allow journal to lose pages from missing volumes")
747+ boolean isIgnoreMissingVolumes();
748
749 @Description("True if copy-fast mode has been enabled")
750 boolean isCopyingFast();
751
752 @Description("True if copying of pages from the journal to their destination volumes is disabled")
753 void setAppendOnly(boolean appendOnly);
754+
755+ @Description("True to allow journal to lose pages from missing volumes")
756+ void setIgnoreMissingVolumes(boolean ignore);
757
758 @Description("True if copy-fast mode has been enabled")
759 void setCopyingFast(boolean fast);
760
761=== modified file 'src/test/java/com/persistit/JournalManagerTest.java'
762--- src/test/java/com/persistit/JournalManagerTest.java 2012-05-25 18:50:59 +0000
763+++ src/test/java/com/persistit/JournalManagerTest.java 2012-07-26 18:54:32 +0000
764@@ -21,19 +21,19 @@
765 package com.persistit;
766
767 import static com.persistit.unit.ConcurrentUtil.createThread;
768-import static com.persistit.unit.ConcurrentUtil.ThrowingRunnable;
769 import static com.persistit.unit.ConcurrentUtil.startAndJoinAssertSuccess;
770 import static com.persistit.util.SequencerConstants.PAGE_MAP_READ_INVALIDATE_B;
771 import static com.persistit.util.SequencerConstants.PAGE_MAP_READ_INVALIDATE_C;
772+import static com.persistit.util.SequencerConstants.PAGE_MAP_READ_INVALIDATE_SCHEDULE;
773 import static com.persistit.util.ThreadSequencer.addSchedules;
774 import static com.persistit.util.ThreadSequencer.disableSequencer;
775 import static com.persistit.util.ThreadSequencer.enableSequencer;
776-import static com.persistit.util.ThreadSequencer.PAGE_MAP_READ_INVALIDATE_SCHEDULE;
777 import static com.persistit.util.ThreadSequencer.sequence;
778 import static org.junit.Assert.assertEquals;
779 import static org.junit.Assert.assertTrue;
780 import static org.junit.Assert.fail;
781
782+import java.io.File;
783 import java.util.ArrayList;
784 import java.util.HashMap;
785 import java.util.HashSet;
786@@ -50,6 +50,7 @@
787 import com.persistit.JournalManager.PageNode;
788 import com.persistit.TransactionPlayer.TransactionPlayerListener;
789 import com.persistit.exception.PersistitException;
790+import com.persistit.unit.ConcurrentUtil.ThrowingRunnable;
791 import com.persistit.unit.PersistitUnitTestCase;
792 import com.persistit.unit.UnitTestProperties;
793 import com.persistit.util.Util;
794@@ -405,7 +406,7 @@
795 * Remove from the left end
796 */
797 {
798- final List<PageNode> source =testCleanupPageListSource(10);
799+ final List<PageNode> source = testCleanupPageListSource(10);
800 for (int i = 0; i < 4; i++) {
801 source.get(i).invalidate();
802 }
803@@ -416,7 +417,7 @@
804 * Remove from the right end
805 */
806 {
807- final List<PageNode> source =testCleanupPageListSource(10);
808+ final List<PageNode> source = testCleanupPageListSource(10);
809 for (int i = 10; --i >= 7;) {
810 source.get(i).invalidate();
811 }
812@@ -427,7 +428,7 @@
813 * Remove from the middle
814 */
815 {
816- final List<PageNode> source =testCleanupPageListSource(10);
817+ final List<PageNode> source = testCleanupPageListSource(10);
818 for (int i = 2; i < 8; i++) {
819 source.get(i).invalidate();
820 }
821@@ -454,6 +455,88 @@
822 }
823 }
824
825+ @Test
826+ public void missingVolumePageHandling() throws Exception {
827+ final Configuration config = _persistit.getConfiguration();
828+ Volume volume1 = new Volume(config.volumeSpecification("${datapath}/missing1,create,"
829+ + "pageSize:16384,initialPages:1,extensionPages:1,maximumPages:25000"));
830+ volume1.open(_persistit);
831+ Volume volume2 = new Volume(config.volumeSpecification("${datapath}/missing2,create,"
832+ + "pageSize:16384,initialPages:1,extensionPages:1,maximumPages:25000"));
833+ volume2.open(_persistit);
834+ _persistit.getExchange(volume1, "test1", true);
835+ _persistit.getExchange(volume2, "test2", true);
836+ _persistit.close();
837+ new File(volume1.getPath()).delete();
838+ new File(volume2.getPath()).delete();
839+ volume1 = null;
840+ volume2 = null;
841+ _persistit = new Persistit();
842+ _persistit.initialize(config);
843+ AlertMonitor am = _persistit.getAlertMonitor();
844+ assertTrue("Startup with missing volumes should have generated alerts", am
845+ .getHistory(AlertMonitor.MISSING_VOLUME_CATEGORY) != null);
846+
847+ _persistit.getJournalManager().setIgnoreMissingVolumes(true);
848+ // Should add more alerts
849+ _persistit.copyBackPages();
850+ int alertCount1 = am.getHistory(AlertMonitor.MISSING_VOLUME_CATEGORY).getCount();
851+ _persistit.copyBackPages();
852+ int alertCount2 = am.getHistory(AlertMonitor.MISSING_VOLUME_CATEGORY).getCount();
853+ assertEquals("No more alerts after setting ignoreMissingVolumes", alertCount1, alertCount2);
854+ }
855+
856+ @Test
857+ public void missingVolumeTransactionHandlingNotIgnored() throws Exception {
858+ final Configuration config = _persistit.getConfiguration();
859+ Volume volume = new Volume(config.volumeSpecification("${datapath}/missing1,create,"
860+ + "pageSize:16384,initialPages:1,extensionPages:1,maximumPages:25000"));
861+ volume.open(_persistit);
862+ Exchange ex = _persistit.getExchange(volume, "test1", true);
863+ final Transaction txn = ex.getTransaction();
864+ txn.begin();
865+ ex.getValue().put(RED_FOX);
866+ ex.to(0).store();
867+ txn.commit();
868+ txn.end();
869+ _persistit.getJournalManager().flush();
870+ _persistit.crash();
871+ new File(volume.getPath()).delete();
872+ volume = null;
873+ _persistit = new Persistit();
874+ _persistit.initialize(config);
875+ assertTrue("Should have failed updates during recovery", _persistit.getRecoveryManager().getPlayer()
876+ .getFailedUpdates() > 0);
877+ }
878+
879+ @Test
880+ public void missingVolumeTransactionHandlingIgnored() throws Exception {
881+ final Configuration config = _persistit.getConfiguration();
882+ Volume volume = new Volume(config.volumeSpecification("${datapath}/missing1,create,"
883+ + "pageSize:16384,initialPages:1,extensionPages:1,maximumPages:25000"));
884+ volume.open(_persistit);
885+ Exchange ex = _persistit.getExchange(volume, "test1", true);
886+ final Transaction txn = ex.getTransaction();
887+ txn.begin();
888+ ex.getValue().put(RED_FOX);
889+ ex.to(0).store();
890+ txn.commit();
891+ txn.end();
892+ _persistit.getJournalManager().flush();
893+ _persistit.crash();
894+ new File(volume.getPath()).delete();
895+ volume = null;
896+ config.setIgnoreMissingVolumes(true);
897+ _persistit = new Persistit();
898+ _persistit.getJournalManager().setIgnoreMissingVolumes(true);
899+ _persistit.initialize(config);
900+ AlertMonitor am = _persistit.getAlertMonitor();
901+ assertTrue("Startup with missing volumes should have generated alerts", am
902+ .getHistory(AlertMonitor.MISSING_VOLUME_CATEGORY) != null);
903+ assertTrue("Should have failed updates during recovery", _persistit.getRecoveryManager().getPlayer()
904+ .getIgnoredUpdates() > 0);
905+ }
906+
907 private List<PageNode> testCleanupPageListSource(final int size) {
908 final List<PageNode> source = new ArrayList<PageNode>(size);
909 for (int index = 0; index < 1000000; index++) {
910@@ -461,8 +544,7 @@
911 }
912 return source;
913 }
914-
915-
916+
917 private void testCleanupPageListHelper(final List<PageNode> source) throws Exception {
918 final List<PageNode> cleaned = new ArrayList<PageNode>(source);
919 for (Iterator<PageNode> iterator = cleaned.iterator(); iterator.hasNext();) {
920@@ -512,8 +594,8 @@
921 final int COUNT = 5000;
922 final String TREE_NAME = "JournalManagerTest1";
923 /*
924- * Test sequence points in the JOURNAL_COPIER path, don't
925- * want to hit unintentionally hit them.
926+ * Test sequence points in the JOURNAL_COPIER path, don't want to hit
927+ * unintentionally hit them.
928 */
929 _persistit.getJournalManager().setCopierInterval(50000);
930 /*
931@@ -522,7 +604,7 @@
932 Transaction txn = _persistit.getTransaction();
933 txn.begin();
934 Exchange ex = _persistit.getExchange(_volumeName, TREE_NAME, true);
935- for(int i = 0; i < COUNT; ++i) {
936+ for (int i = 0; i < COUNT; ++i) {
937 ex.to(i);
938 ex.getValue().put(RED_FOX);
939 ex.store();
940@@ -530,8 +612,8 @@
941 txn.commit();
942 txn.end();
943 /*
944- * Thread will read over everything that is inserted, hopefully going
945- * to the journal for each required page.
946+ * Thread will read over everything that is inserted, hopefully going to
947+ * the journal for each required page.
948 */
949 Thread thread1 = createThread("READ_THREAD", new ThrowingRunnable() {
950 @Override
951@@ -541,7 +623,7 @@
952 Exchange ex = _persistit.getExchange(_volumeName, TREE_NAME, false);
953 ex.to(Key.BEFORE);
954 int count = 0;
955- while(ex.next(true)) {
956+ while (ex.next(true)) {
957 ++count;
958 }
959 assertEquals("Traversed count", COUNT, count);
960@@ -549,8 +631,8 @@
961 }
962 });
963 /*
964- * Thread will copy pages out of the journal and into the volume, hopefully
965- * invalidating pageMap entries during the cleanupForCopy.
966+ * Thread will copy pages out of the journal and into the volume,
967+ * hopefully invalidating pageMap entries during the cleanupForCopy.
968 */
969 Thread thread2 = createThread("COPY_BACK_THREAD", new ThrowingRunnable() {
970 @Override

Subscribers

People subscribed via source and target branches