Merge lp:~pbeaman/akiban-persistit/tree_builder2 into lp:akiban-persistit

Proposed by Peter Beaman
Status: Merged
Approved by: Nathan Williams
Approved revision: no longer in the source branch.
Merged at revision: 408
Proposed branch: lp:~pbeaman/akiban-persistit/tree_builder2
Merge into: lp:akiban-persistit
Diff against target: 342 lines (+148/-28)
3 files modified
src/main/java/com/persistit/BufferPool.java (+13/-14)
src/main/java/com/persistit/TreeBuilder.java (+130/-14)
src/main/java/com/persistit/Volume.java (+5/-0)
To merge this branch: bzr merge lp:~pbeaman/akiban-persistit/tree_builder2
Reviewer Review Type Date Requested Status
Nathan Williams Approve
Review via email: mp+141794@code.launchpad.net

Description of the change

Fairly minor tweaks discovered and tested while running a prototype bulk load on a CAOI schema.

- added missing JavaDoc
- made a TreeBuilder reusable after a merge (for the tbPK case)
- separated _sortKeyCount and _mergedKeyCount and added accessors
- added missing condition on mustWrite in BufferPool#invalidateSmallVolume and added the write logic to BufferPool#invalidateLargeVolume so that the methods match
- added guard code in Volume#close to avoid executing the close logic twice

To post a comment you must log in.
Revision history for this message
Nathan Williams (nwilliams) wrote :

Looks good.

review: Approve
408. By Akiban Build User

merge pbeaman: Fairly minor tweaks discovered and tested while running a prototype bulk load on a CAOI schema.

https://code.launchpad.net/~pbeaman/akiban-persistit/tree_builder2/+merge/141794

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/main/java/com/persistit/BufferPool.java'
2--- src/main/java/com/persistit/BufferPool.java 2012-12-31 21:03:59 +0000
3+++ src/main/java/com/persistit/BufferPool.java 2013-01-03 17:32:52 +0000
4@@ -546,7 +546,7 @@
5 if (ratio < SMALL_VOLUME_RATIO) {
6 return invalidateSmallVolume(volume, false);
7 } else {
8- return invalidateLargeVolume(volume);
9+ return invalidateLargeVolume(volume, false);
10 }
11 }
12
13@@ -569,7 +569,7 @@
14 try {
15 if ((buffer.getVolume() == volume || volume == null) && !buffer.isFixed()
16 && buffer.isValid()) {
17- if (buffer.isDirty()) {
18+ if (mustWrite && buffer.isDirty()) {
19 buffer.writePage();
20 }
21 invalidate(buffer);
22@@ -602,7 +602,7 @@
23
24 }
25
26- boolean invalidateLargeVolume(final Volume volume) throws PersistitInterruptedException {
27+ boolean invalidateLargeVolume(final Volume volume, final boolean mustWrite) throws PersistitException {
28 boolean result = true;
29 int markedAvailable = 0;
30 for (int index = 0; index < _bufferCount; index++) {
31@@ -613,6 +613,9 @@
32 boolean invalidated = false;
33 try {
34 if ((buffer.getVolume() == volume || volume == null) && !buffer.isFixed() && buffer.isValid()) {
35+ if (mustWrite && buffer.isDirty()) {
36+ buffer.writePage();
37+ }
38 invalidate(buffer);
39 invalidated = true;
40 }
41@@ -642,17 +645,13 @@
42 Debug.$assert0.t(buffer.isValid() && buffer.isOwnedAsWriterByMe());
43
44 while (!detach(buffer)) {
45- //
46- // Spin until detach succeeds. Note: this method must not throw an
47- // Exception
48- // because it is called in at at critical time when cleanup must be
49- // done.
50- // It is not possible to lock the hash bucket here due to possible
51- // deadlock.
52- // However, the likelihood of a lengthy live-lock is infinitesimal
53- // so polling
54- // is acceptable.
55- //
56+ /*
57+ * Spin until detach succeeds. Note: this method must not throw an
58+ * Exception because it is called in at at critical time when
59+ * cleanup must be done. It is not possible to lock the hash bucket
60+ * here due to possible deadlock. However, the likelihood of a
61+ * lengthy live-lock is infinitesimal so polling is acceptable.
62+ */
63 try {
64 Thread.sleep(1);
65 } catch (final InterruptedException ie) {
66
67=== modified file 'src/main/java/com/persistit/TreeBuilder.java'
68--- src/main/java/com/persistit/TreeBuilder.java 2012-12-31 21:03:59 +0000
69+++ src/main/java/com/persistit/TreeBuilder.java 2013-01-03 17:32:52 +0000
70@@ -16,6 +16,7 @@
71 package com.persistit;
72
73 import java.io.File;
74+import java.io.IOException;
75 import java.util.ArrayList;
76 import java.util.Collections;
77 import java.util.Comparator;
78@@ -145,8 +146,11 @@
79 private final List<Volume> _sortVolumes = new ArrayList<Volume>();
80 private final int _pageSize;
81 private final int _pageLimit;
82- private final AtomicLong _keyCount = new AtomicLong();
83+ private final AtomicLong _sortedKeyCount = new AtomicLong();
84+ private final AtomicLong _mergedKeyCount = new AtomicLong();
85 private volatile long _reportKeyCountMultiple = REPORT_REPORT_MULTIPLE;
86+ private Volume _currentSortVolume;
87+ private int _nextDirectoryIndex;
88
89 private final Set<Tree> _allTrees = new HashSet<Tree>();
90 private final List<Tree> _sortedTrees = new ArrayList<Tree>();
91@@ -158,8 +162,6 @@
92 }
93 };
94
95- private Volume _currentSortVolume;
96- private int _nextDirectoryIndex;
97
98 private final Comparator<Tree> _defaultTreeComparator = new Comparator<Tree>() {
99 @Override
100@@ -273,22 +275,58 @@
101 return pageSize;
102 }
103
104+ /**
105+ * @return Name provide when TreeBuilder was constructed. Default name is
106+ * "TreeBuilder".
107+ */
108 public final String getName() {
109 return _name;
110 }
111
112+ /**
113+ * Set the count of keys inserted or merged per call to
114+ * {@link #reportSorted(long)} or {@link #reportMerged(long)}.
115+ *
116+ * @param multiple
117+ */
118 public final void setReportKeyCountMultiple(final long multiple) {
119 _reportKeyCountMultiple = Util.rangeCheck(multiple, 1, Long.MAX_VALUE);
120 }
121
122+ /**
123+ *
124+ * @return Count of keys inserted or merged per call to
125+ * {@link #reportSorted(long)} or {@link #reportMerged(long)}
126+ */
127 public final long getReportKeyCountMultiple() {
128 return _reportKeyCountMultiple;
129 }
130
131+ /**
132+ * @return Count of sort volumes that have been created while sorting keys
133+ */
134 public final synchronized int getSortVolumeCount() {
135 return _sortVolumes.size();
136 }
137
138+ /**
139+ * @return Number of keys stored in sort trees
140+ */
141+ public long getSortedKeyCount() {
142+ return _sortedKeyCount.get();
143+ }
144+
145+ /**
146+ * @return Number of keys merged into destination trees
147+ */
148+ public long getMergedKeyCount() {
149+ return _mergedKeyCount.get();
150+ }
151+
152+ /**
153+ * @return List of destination
154+ * <code>Tree<code> instances. This list is built as keys are stored.
155+ */
156 public final List<Tree> getTrees() {
157 final List<Tree> list = new ArrayList<Tree>();
158 list.addAll(_allTrees);
159@@ -296,7 +334,36 @@
160 return list;
161 }
162
163- public final void setSortTreeDirectories(final List<File> directories) throws Exception {
164+ /**
165+ * <p>
166+ * Define a list of directories in which sort volumes will be created. This
167+ * method can be used to override the default value provided by
168+ * {@link Configuration#getTmpVolDir()} to control more closely where sort
169+ * trees will be stored. If the list is empty then the directory defined by
170+ * the <code>Configuration</code> will be used. If multiple directories are
171+ * declared then volumes will be allocated to them in round-robin fashion.
172+ * This technique can distribute large load sets over multiple volumes and
173+ * can allow for interleaved disk reads during the merge process.
174+ * </p>
175+ * <p>
176+ * If a <code>File</code> supplied to this method does not exist, an attempt
177+ * is made to create it as a directory. This method also attempts to create
178+ * and delete a file in each supplied directory to ensure that if there is a
179+ * file permission or other problem, it is detected immediately, rather than
180+ * much later during the sort process.
181+ * </p>
182+ *
183+ * @param directories
184+ * List of <code>File</code> instances, each of which must be a
185+ * directory
186+ * @throws IllegalArgumentException
187+ * if a supplied file exists and is not a directory or cannot be
188+ * created as a new directory
189+ * @throws IOException
190+ * if an attempt to create a file in one of the supplied
191+ * directories fails
192+ */
193+ public final void setSortTreeDirectories(final List<File> directories) throws IOException {
194 if (directories == null || directories.isEmpty()) {
195 synchronized (this) {
196 _directories.clear();
197@@ -333,14 +400,39 @@
198 }
199 }
200
201+ /**
202+ *
203+ * @return List of directories set via the
204+ * {@link #setSortTreeDirectories(List)} method.
205+ */
206 public final List<File> getSortTreeDirectories() {
207 return Collections.unmodifiableList(_directories);
208 }
209
210+ /**
211+ * Store a key-value pair into a sort tree. The {@link Tree}, {@link Key}
212+ * and {@link Value} are specified by the supplied {@link Exchange}.
213+ *
214+ * @param exchange
215+ * The Exchange
216+ * @throws Exception
217+ */
218 public final void store(final Exchange exchange) throws Exception {
219 store(exchange.getTree(), exchange.getKey(), exchange.getValue());
220 }
221
222+ /**
223+ * Store a key-value pair for a specified <code>Tree</code> into a sort
224+ * tree.
225+ *
226+ * @param tree
227+ * the Tree
228+ * @param key
229+ * the Key
230+ * @param value
231+ * the Value
232+ * @throws Exception
233+ */
234 public final void store(final Tree tree, final Key key, final Value value) throws Exception {
235 final Map<Tree, Exchange> map = _sortExchangeMapThreadLocal.get();
236 Exchange ex = map.get(tree);
237@@ -365,7 +457,7 @@
238 }
239 }
240 if (stored) {
241- final long count = _keyCount.incrementAndGet();
242+ final long count = _sortedKeyCount.incrementAndGet();
243 if ((count % _reportKeyCountMultiple) == 0) {
244 reportSorted(count);
245 }
246@@ -394,10 +486,9 @@
247 * @throws Exception
248 */
249 public synchronized void merge() throws Exception {
250- if ((_keyCount.get() % _reportKeyCountMultiple) != 0) {
251- reportSorted(_keyCount.get());
252+ if ((_mergedKeyCount.get() % _reportKeyCountMultiple) != 0) {
253+ reportSorted(_mergedKeyCount.get());
254 }
255- _keyCount.set(0);
256 _sortedTrees.clear();
257 _sortedTrees.addAll(_allTrees);
258 Tree currentTree = null;
259@@ -435,8 +526,8 @@
260 }
261 if (stored) {
262 afterMergeKey(ex);
263- if ((_keyCount.incrementAndGet() % _reportKeyCountMultiple) == 0) {
264- reportMerged(_keyCount.get());
265+ if ((_mergedKeyCount.incrementAndGet() % _reportKeyCountMultiple) == 0) {
266+ reportMerged(_mergedKeyCount.get());
267 }
268 }
269 }
270@@ -449,13 +540,38 @@
271 node = next;
272 }
273 }
274- if ((_keyCount.get() % _reportKeyCountMultiple) != 0) {
275- reportMerged(_keyCount.get());
276- }
277- _keyCount.set(0);
278+ if ((_mergedKeyCount.get() % _reportKeyCountMultiple) != 0) {
279+ reportMerged(_mergedKeyCount.get());
280+ }
281+ reset();
282+ }
283+
284+ private synchronized void reset() throws Exception {
285+ Exception exception = null;
286+ for (final Volume volume : _sortVolumes) {
287+ try {
288+ volume.close();
289+ } catch (PersistitException e) {
290+ if (exception == null) {
291+ exception = e;
292+ }
293+ }
294+ }
295+ _sortVolumes.clear();
296+ _currentSortVolume = null;
297+ _nextDirectoryIndex = 0;
298 _sortExchangeMapThreadLocal.get().clear();
299 _allTrees.clear();
300 _sortedTrees.clear();
301+ if (exception != null) {
302+ throw exception;
303+ }
304+ }
305+
306+ public void clear() throws Exception {
307+ _sortedKeyCount.set(0);
308+ _mergedKeyCount.set(0);
309+ reset();
310 }
311
312 private synchronized Volume getSortVolume() throws Exception {
313
314=== modified file 'src/main/java/com/persistit/Volume.java'
315--- src/main/java/com/persistit/Volume.java 2012-12-31 21:03:59 +0000
316+++ src/main/java/com/persistit/Volume.java 2013-01-03 17:32:52 +0000
317@@ -51,6 +51,7 @@
318 private final String _name;
319 private long _id;
320 private final AtomicBoolean _closing = new AtomicBoolean();
321+ private final AtomicBoolean _closed = new AtomicBoolean();
322 private final AtomicInteger _handle = new AtomicInteger();
323 private final AtomicReference<Object> _appCache = new AtomicReference<Object>();
324
325@@ -198,6 +199,9 @@
326 if (!storage.claim(true, timeout)) {
327 throw new InUseException("Unable to acquire claim on " + this);
328 }
329+ if (_closed.get()) {
330+ break;
331+ }
332 try {
333 //
334 // BufferPool#invalidate may fail and return false if other
335@@ -208,6 +212,7 @@
336 getStructure().close();
337 getStorage().close();
338 getStatistics().reset();
339+ _closed.set(true);
340 break;
341 }
342 } finally {

Subscribers

People subscribed via source and target branches