=== modified file 'src/main/java/com/persistit/BufferPool.java'
--- src/main/java/com/persistit/BufferPool.java 2012-08-04 19:19:44 +0000
+++ src/main/java/com/persistit/BufferPool.java 2012-08-07 21:14:18 +0000
@@ -16,7 +16,6 @@
package com.persistit;
import java.io.DataOutputStream;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
@@ -37,6 +36,7 @@
import com.persistit.exception.VolumeClosedException;
import com.persistit.util.Debug;
import com.persistit.util.Util;
+import java.io.*;
/**
* A pool of {@link Buffer} objects, maintained on various lists that permit
@@ -52,7 +52,7 @@
private final static long DEFAULT_WRITER_POLL_INTERVAL = 5000;
private final static int PAGE_WRITER_TRANCHE_SIZE = 5000;
-
+
/**
* Sleep time when buffers are exhausted
*/
@@ -196,11 +196,24 @@
private volatile long _writerPollInterval = DEFAULT_WRITER_POLL_INTERVAL;
private volatile int _pageWriterTrancheSize = PAGE_WRITER_TRANCHE_SIZE;
+
+ /**
+ * Polling interval for PageCacher
+ */
+ private volatile long _cacherPollInterval;
+
/**
* The PAGE_WRITER IOTaskRunnable
*/
private PageWriter _writer;
-
+
+ /**
+ * The PAGE_CACHER IOTaskRunnable
+ */
+ private PageCacher _cacher;
+
+ private String _defaultLogPath;
+
/**
* Construct a BufferPool with the specified count of Buffer
s
* of the specified size.
@@ -237,7 +250,7 @@
_hashTable = new Buffer[_bufferCount * HASH_MULTIPLE];
_hashLocks = new ReentrantLock[HASH_LOCKS];
_maxKeys = (_bufferSize - Buffer.HEADER_SIZE) / Buffer.MAX_KEY_RATIO;
-
+
for (int index = 0; index < HASH_LOCKS; index++) {
_hashLocks[index] = new ReentrantLock();
}
@@ -271,25 +284,59 @@
throw e;
}
_writer = new PageWriter();
-
- }
-
- void startThreads() {
+ _cacher = new PageCacher();
+ }
+
+ void warmupBufferPool(String pathName, String fname) throws PersistitException {
+ File file = new File(pathName, fname + ".log");
+ _defaultLogPath = file.getAbsolutePath();
+
+ try {
+ if (!file.exists()) {
+ file.createNewFile();
+ }
+
+ BufferedReader reader = new BufferedReader(new FileReader(file));
+ String currLine;
+ while ((currLine = reader.readLine()) != null) {
+ String[] info = currLine.split(" ");
+ if (info.length == 2) {
+ Volume vol = _persistit.getVolume(info[1]);
+ if (vol != null) {
+ long page = Long.parseLong(info[0]);
+ Buffer buff = get(vol, page, false, true);
+ buff.release();
+ }
+ }
+ }
+ reader.close();
+ _cacherPollInterval = _persistit.getConfiguration().getBufferInventoryPollingInterval();
+ _cacher.start();
+ }
+ catch (IOException e) {
+ throw new PersistitException(e);
+ }
+ }
+
+ void startThreads() throws PersistitException {
_writer.start();
}
void close() {
_closed.set(true);
_persistit.waitForIOTaskStop(_writer);
+ _persistit.waitForIOTaskStop(_cacher);
_writer = null;
+ _cacher = null;
}
/**
- * Abruptly stop (using {@link Thread#stop()}) the writer and collector
+ * Abruptly stop (using {@link Thread#stop()}) the writer, cacher, and collector
* threads. This method should be used only by tests.
*/
void crash() {
IOTaskRunnable.crash(_writer);
+ IOTaskRunnable.crash(_cacher);
}
void flush(final long timestamp) throws PersistitInterruptedException {
@@ -380,6 +427,35 @@
buffer.populateInfo(array[index]);
}
}
+
+ private void populateWarmupFile() throws PersistitException {
+ File file = new File(_defaultLogPath);
+
+ try {
+ BufferedWriter writer = new BufferedWriter(new FileWriter(file));
+ for (int i = 0; i < _buffers.length; ++i) {
+ Buffer b = _buffers[i];
+ if (b != null && b.isValid() && !b.isDirty()) {
+ long page = b.getPageAddress();
+ Volume volume = b.getVolume();
+ long page2 = b.getPageAddress();
+ Volume volume2 = b.getVolume();
+
+ // Check if buffer has changed while reading
+ if (page == page2 && volume == volume2 && volume != null) {
+ String addr = Long.toString(page);
+ String vol = volume.getName();
+ writer.append(addr + " " + vol);
+ writer.newLine();
+ writer.flush();
+ }
+ }
+ }
+ writer.close();
+ } catch (IOException e) {
+ throw new PersistitException(e);
+ }
+ }
private boolean selected(Buffer buffer, int includeMask, int excludeMask) {
return ((includeMask == 0) || (buffer.getStatus() & includeMask) != 0)
@@ -1315,6 +1391,35 @@
return isFlushing() ? 0 : _writerPollInterval;
}
}
+
+ /**
+ * Implementation of PAGE_CACHER thread
+ */
+ class PageCacher extends IOTaskRunnable {
+
+ PageCacher() {
+ super(BufferPool.this._persistit);
+ }
+
+ void start() {
+ start("PAGE_CACHER:" + _bufferSize, _cacherPollInterval);
+ }
+
+ @Override
+ public void runTask() throws Exception {
+ populateWarmupFile();
+ }
+
+ @Override
+ protected boolean shouldStop() {
+ return _closed.get() && !isFlushing();
+ }
+
+ @Override
+ protected long pollInterval() {
+ return isFlushing() ? 0 : _cacherPollInterval;
+ }
+ }
@Override
public String toString() {
=== modified file 'src/main/java/com/persistit/Configuration.java'
--- src/main/java/com/persistit/Configuration.java 2012-08-02 14:19:26 +0000
+++ src/main/java/com/persistit/Configuration.java 2012-08-07 21:14:18 +0000
@@ -264,15 +264,23 @@
* Property name for the "append only" property.
*/
public final static String APPEND_ONLY_PROPERTY = "appendonly";
-
+
/**
* Property name for the "ignore missing volumes" property.
*/
public final static String IGNORE_MISSING_VOLUMES_PROPERTY = "ignoremissingvolumes";
+
/**
* Property name to specify the default {@link SplitPolicy}.
*/
public final static String SPLIT_POLICY_PROPERTY_NAME = "splitpolicy";
+
+ /**
+ * Property name to specify the"buffer inventory" property name.
+ */
+ public final static String BUFFER_INVENTORY_PROPERTY_NAME = "bufferinventory";
+
+ public final static String BUFFER_POLLING_INTERVAL_PROPERTY = "bufferpollinginterval";
/**
* Property name to specify the default {@link JoinPolicy}.
@@ -624,6 +632,8 @@
private int rmiServerPort;
private boolean jmx = true;
private boolean appendOnly;
+ private String bufferInventoryPathName;
+ private long bufferInventoryPollInterval = 3000000; // default five minute polling
private boolean ignoreMissingVolumes;
private String tmpVolDir;
private int tmpVolPageSize;
@@ -702,6 +712,8 @@
}
void loadProperties() throws InvalidVolumeSpecificationException {
+ setBufferInventoryPathName(getProperty(BUFFER_INVENTORY_PROPERTY_NAME));
+ setBufferInventoryPollingInterval(getLongProperty(BUFFER_POLLING_INTERVAL_PROPERTY, bufferInventoryPollInterval));
setAppendOnly(getBooleanProperty(APPEND_ONLY_PROPERTY, false));
setCommitPolicy(getProperty(COMMIT_POLICY_PROPERTY_NAME));
setConstructorOverride(getBooleanProperty(CONSTRUCTOR_OVERRIDE_PROPERTY_NAME, false));
@@ -1802,6 +1814,59 @@
}
/**
+ * Return the path name defined by {@link #getBufferInventoryPathName}
+ * @return the path where file to warm-up Persistit with sample buffer data is stored
+ */
+ public String getBufferInventoryPathName() {
+ return bufferInventoryPathName;
+ }
+
+ /**
+ *
+ * Control where Persistit stores its buffer inventory. In this mode + * Persistit restarts with information from the last run. This method initializes + * the warm-up file at the specified location, if none is specified the buffer + * pool is not warmed up on start-up. + *
+ *
+ * Default value is null
+ * Property name is {@value #BUFFER_INVENTORY_PROPERTY_NAME}
+ *
+ * Control the number of seconds between each poll for the + * cache warm-up option in Persistit. + *
+ *
+ * Default value is 3000
+ * Property name is {@value #BUFFER_POLLING_INTERVAL_PROPERTY}
+ *