Merge lp:~tjoneslo/akiban-server/file-merge-sort into lp:~akiban-technologies/akiban-server/trunk
Status: | Merged |
---|---|
Approved by: | Nathan Williams |
Approved revision: | 2708 |
Merged at revision: | 2722 |
Proposed branch: | lp:~tjoneslo/akiban-server/file-merge-sort |
Merge into: | lp:~akiban-technologies/akiban-server/trunk |
Diff against target: | 1212 lines (+1075/-1), 13 files modified: pom.xml (+5/-0); src/main/java/com/akiban/qp/operator/SimpleQueryContext.java (+4/-0); src/main/java/com/akiban/qp/persistitadapter/PersistitAdapter.java (+1/-0); src/main/java/com/akiban/qp/persistitadapter/indexcursor/MergeJoinSorter.java (+621/-0); src/main/java/com/akiban/server/error/ErrorCode.java (+1/-0); src/main/java/com/akiban/server/error/MergeSortIOException.java (+27/-0); src/main/java/com/akiban/server/service/tree/TreeServiceImpl.java (+10/-0); src/main/resources/com/akiban/server/error/error_code.properties (+2/-0); src/main/resources/com/akiban/server/service/config/configuration-defaults.properties (+3/-0); src/test/java/com/akiban/qp/persistitadapter/indexcursor/KeyFinalCursorIT.java (+170/-0); src/test/java/com/akiban/qp/persistitadapter/indexcursor/KeyReaderWriterTest.java (+182/-0); src/test/java/com/akiban/server/test/it/sort/MemorySorterIT.java (+0/-1); src/test/java/com/akiban/server/test/it/sort/MergeJoinSorterIT.java (+49/-0) |
To merge this branch: | bzr merge lp:~tjoneslo/akiban-server/file-merge-sort |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Nathan Williams | Approve | ||
Thomas Jones-Low | Needs Resubmitting | ||
Review via email: mp+176781@code.launchpad.net |
Commit message
Description of the change
Add the FileMergeSort to the akiban trunk.
This relies on a library called java-merge-sort, written by the same people who wrote the Jackson JSON library already in use.
All of the code is in the new MergeJoinSorter class, which consists of nested classes that translate between the Akiban Server way of doing things and what java-merge-sort expects:
SortKey -> the single class that holds the KeyState segments used for comparison (one per run of ASC or DESC columns) and the single Key for the whole row.
KeyReadCursor -> reads rows from the input cursor, converts them to SortKeys, and passes them to the java merge sort for processing. This is the external input.
KeyFinalCursor -> reads from the final sorted output and returns rows through the cursor interface to the next operator in line. This is the external output.
KeyReader/KeyWriter -> used by the java merge sort library to read/write the intermediate blocks of SortKeys.
KeySortCompare -> used by the java merge sort library to compare the SortKeys to put them into the correct order.
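The constructor in the diff groups consecutive sort columns that share a direction into one KeyState segment (recording the segment boundaries in orderChanges), and KeySortCompare compares the segments in order, swapping the arguments for descending ones. A minimal sketch of that logic, assuming plain byte arrays in place of Persistit KeyState and modeling KeyState.compareTo as an unsigned lexicographic byte comparison; SegmentedKeyCompare is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class SegmentedKeyCompare {

    // Unsigned lexicographic comparison of encoded bytes (stand-in for KeyState.compareTo).
    static final Comparator<byte[]> ASC = new Comparator<byte[]>() {
        @Override
        public int compare(byte[] a, byte[] b) {
            int n = Math.min(a.length, b.length);
            for (int i = 0; i < n; i++) {
                int d = (a[i] & 0xFF) - (b[i] & 0xFF);
                if (d != 0) {
                    return d;
                }
            }
            return a.length - b.length;
        }
    };

    // A descending segment just swaps the arguments, like DESC_COMPARATOR in the diff.
    static final Comparator<byte[]> DESC = new Comparator<byte[]>() {
        @Override
        public int compare(byte[] a, byte[] b) {
            return ASC.compare(b, a);
        }
    };

    // Segment start offsets plus a trailing end marker, e.g. ASC, ASC, DESC -> [0, 2, 3].
    static List<Integer> orderChanges(boolean[] ascending) {
        List<Integer> changes = new ArrayList<>();
        for (int i = 0; i < ascending.length; i++) {
            if (i == 0 || ascending[i - 1] != ascending[i]) {
                changes.add(i);
            }
        }
        changes.add(ascending.length);
        return changes;
    }

    // One comparator per segment, picked by the direction of the segment's first column.
    static List<Comparator<byte[]>> comparators(boolean[] ascending, List<Integer> changes) {
        List<Comparator<byte[]>> result = new ArrayList<>();
        for (int s = 0; s < changes.size() - 1; s++) {
            result.add(ascending[changes.get(s)] ? ASC : DESC);
        }
        return result;
    }

    // Compare segment by segment; the first non-zero result decides the order.
    static int compare(List<byte[]> k1, List<byte[]> k2, List<Comparator<byte[]>> cmps) {
        int val = 0;
        for (int i = 0; i < cmps.size() && val == 0; i++) {
            val = cmps.get(i).compare(k1.get(i), k2.get(i));
        }
        return val;
    }

    public static void main(String[] args) {
        boolean[] asc = {true, true, false};
        List<Integer> changes = orderChanges(asc);
        System.out.println(changes);                        // [0, 2, 3]
        List<Comparator<byte[]>> cmps = comparators(asc, changes);
        List<byte[]> a = Arrays.asList(new byte[] {1, 2}, new byte[] {5});
        List<byte[]> b = Arrays.asList(new byte[] {1, 2}, new byte[] {9});
        // The first segment ties; the second is descending, so 5 sorts after 9.
        System.out.println(compare(a, b, cmps) > 0);        // true
    }
}
```

Grouping same-direction columns keeps the number of comparator calls per row down to the number of direction changes rather than the number of sort columns.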
Internally, the java merge sort library reads the input (KeyReadCursor) into a buffer (40MB) and sorts it (KeySortCompare). If the input exceeds the buffer size, the full buffer is written to disk (KeyWriter) and the process is repeated with a new buffer. When all of the input has been read, the library does a 16-way merge of the temporary files (KeyReader) to create the final output, which is then returned (KeyFinalCursor) as the row stream.
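The buffer, spill, and merge flow above is a standard external merge sort. A sketch of the same flow in plain Java, independent of java-merge-sort and not its actual implementation: it sorts ints written to temp files with DataOutputStream, measures the buffer in elements instead of bytes, and does the k-way merge with a PriorityQueue. bufferSize and fanIn stand in for the 40MB buffer and the 16-way merge; ExternalMergeSortSketch is a hypothetical name.

```java
import java.io.*;
import java.nio.file.Files;
import java.util.*;

final class ExternalMergeSortSketch {

    // Sort `input` holding at most `bufferSize` values in memory, merging `fanIn` runs per group.
    static List<Integer> sort(Iterator<Integer> input, int bufferSize, int fanIn) throws IOException {
        if (bufferSize < 1 || fanIn < 2) {
            throw new IllegalArgumentException("bufferSize >= 1 and fanIn >= 2 required");
        }
        List<File> runs = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        while (input.hasNext()) {
            buffer.add(input.next());
            if (buffer.size() == bufferSize) {       // buffer full: sort it and spill one run
                runs.add(spill(buffer));
                buffer.clear();
            }
        }
        if (runs.isEmpty()) {                        // everything fit in memory: no disk IO
            Collections.sort(buffer);
            return buffer;
        }
        if (!buffer.isEmpty()) {
            runs.add(spill(buffer));
        }
        while (runs.size() > 1) {                    // each pass divides the run count by ~fanIn
            List<File> merged = new ArrayList<>();
            for (int i = 0; i < runs.size(); i += fanIn) {
                merged.add(merge(runs.subList(i, Math.min(i + fanIn, runs.size()))));
            }
            runs = merged;
        }
        List<Integer> out = new ArrayList<>();
        try (DataInputStream in = open(runs.get(0))) {
            Integer v;
            while ((v = readOrNull(in)) != null) {
                out.add(v);
            }
        }
        Files.delete(runs.get(0).toPath());
        return out;
    }

    private static File spill(List<Integer> buffer) throws IOException {
        Collections.sort(buffer);
        File f = Files.createTempFile("sort-", ".tmp").toFile();
        try (DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(f)))) {
            for (int v : buffer) {
                out.writeInt(v);
            }
        }
        return f;
    }

    // k-way merge: the heap holds the current head of every run as {value, runIndex}.
    private static File merge(List<File> group) throws IOException {
        File f = Files.createTempFile("sort-", ".tmp").toFile();
        List<DataInputStream> readers = new ArrayList<>();
        PriorityQueue<int[]> heap = new PriorityQueue<>(group.size(), (a, b) -> Integer.compare(a[0], b[0]));
        try (DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(f)))) {
            for (int i = 0; i < group.size(); i++) {
                readers.add(open(group.get(i)));
                push(readers.get(i), i, heap);
            }
            while (!heap.isEmpty()) {
                int[] head = heap.poll();
                out.writeInt(head[0]);
                push(readers.get(head[1]), head[1], heap);   // refill from the same run
            }
        } finally {
            for (DataInputStream r : readers) {
                r.close();
            }
        }
        for (File run : group) {
            Files.delete(run.toPath());              // merged inputs are no longer needed
        }
        return f;
    }

    private static void push(DataInputStream in, int run, PriorityQueue<int[]> heap) throws IOException {
        Integer v = readOrNull(in);
        if (v != null) {
            heap.add(new int[] {v, run});
        }
    }

    private static Integer readOrNull(DataInputStream in) throws IOException {
        try {
            return in.readInt();
        } catch (EOFException endOfRun) {
            return null;
        }
    }

    private static DataInputStream open(File f) throws IOException {
        return new DataInputStream(new BufferedInputStream(new FileInputStream(f)));
    }

    public static void main(String[] args) throws IOException {
        // bufferSize 3 spills four runs; fanIn 2 then needs two merge passes.
        System.out.println(sort(Arrays.asList(9, 3, 7, 1, 8, 2, 6, 4, 5, 0).iterator(), 3, 2));
    }
}
```

With N spilled runs and a fan-in of k, the loop makes about ceil(log_k N) merge passes, which matches the MergeJoinSorter Javadoc's estimate that each further 16x of input adds one more write/read cycle.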
Also adds two completely new tests: one for the KeyReader/KeyWriter pair, and one for KeyReadCursor and KeyFinalCursor, to make sure the process doesn't mangle rows on the way through. It also extends the SortIT test to verify that sorting works as expected.
Thomas Jones-Low (tjoneslo) wrote:
Nathan Williams (nwilliams) wrote:
Nice! This looks pretty good overall. A handful of small issues below.
The exceptions on lines 166 and 192 probably need to be bubbled up.
Lines 291, 306 and 319 look suspicious. InputStream returns -1 at EOF, whereas this is checking read < size. Maybe an assert instead?
Line 491, a new config instead of persistit.tmpvoldir might be nice. Or perhaps an akserver.tmp_dir, which TreeService can then assign to tmpvoldir if otherwise unset.
Did you investigate if the fasterxml.Sorter closes all of the temp files from the provider on finish? On error? If not, I wonder if we need to hang onto them all and ensure they are in close().
Line 496, could SimpleQueryContext look at adapter.
Line 550, is that TODO relevant or just stale?
Line 576, I think we just have to eat that one. Maybe log for... good measure?
TestKeyReaderWr
Does surefire actually pick up TestKeyReaderWr
Maybe a new configuration that gets shimmed into SortConfig? I can imagine wanting to set that pretty quickly.
- 2707. By tjoneslo
Address merge comments.
Add new error code for returning problems on IO errors
Clean up other error handling.
Thomas Jones-Low (tjoneslo) wrote:
> Nice! This looks pretty good overall. A handful of small issues below.
>
> The exceptions on lines 166 and 192 probably need to be bubbled up.
>
Done
> Lines 291, 306 and 319 look suspicious. InputStream returns -1 at EOF, whereas
> this is checking read < size. Maybe an assert instead?
>
Converted to an assert.
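Converting the check to an assert documents the intent, but assertions are disabled unless the JVM runs with -ea, and InputStream.read(byte[]) may legally return fewer bytes than requested even when more data is coming. A minimal sketch of the same length-prefixed framing that KeyWriter and KeyReader use, built on DataInputStream.readFully, which either fills the array or throws EOFException. Records are plain byte arrays rather than KeyState and Key; LengthPrefixedRecords and TrickleInputStream are hypothetical names, and TrickleInputStream exists only to simulate short reads.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LengthPrefixedRecords {

    // Same framing as KeyWriter: a 4-byte big-endian length, then the payload bytes.
    static void writeRecord(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
    }

    // Returns null at a clean end of stream; throws EOFException if a record is cut short.
    static byte[] readRecord(DataInputStream in) throws IOException {
        int first = in.read();
        if (first == -1) {
            return null;
        }
        // Rebuild the big-endian int; readUnsignedByte throws EOFException on a truncated length.
        int length = (first << 24) | (in.readUnsignedByte() << 16)
                | (in.readUnsignedByte() << 8) | in.readUnsignedByte();
        if (length < 0) {
            throw new IOException("Corrupt record length: " + length);
        }
        byte[] payload = new byte[length];
        in.readFully(payload);   // keeps reading until the array is full, or throws EOFException
        return payload;
    }

    // Simulated slow stream: returns at most one byte per bulk read.
    static final class TrickleInputStream extends FilterInputStream {
        TrickleInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return super.read(b, off, Math.min(len, 1));
        }
    }

    // Write the values as records, then read them back through the trickling stream.
    static List<String> roundTrip(String... values) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            for (String v : values) {
                writeRecord(out, v.getBytes(StandardCharsets.UTF_8));
            }
        }
        List<String> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(
                new TrickleInputStream(new ByteArrayInputStream(buffer.toByteArray())))) {
            for (byte[] r = readRecord(in); r != null; r = readRecord(in)) {
                result.add(new String(r, StandardCharsets.UTF_8));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("alpha", "beta", ""));   // [alpha, beta, ]
    }
}
```

ByteBuffer uses big-endian byte order by default, so this framing should be byte-compatible with what KeyWriter's ByteBuffer.putInt produces.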
> Line 491, a new config instead of persistit.tmpvoldir might be nice. Or
> perhaps an akserver.tmp_dir, which TreeService can then assign to tmpvoldir if
> otherwise unset.
>
tmpvoldir is a Persistit-specific configuration item; the only way to change how it gets set is to make changes to Persistit.
Added the akserver.tmp_dir variable and used that instead.
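Nathan's suggestion (let TreeService copy akserver.tmp_dir into persistit.tmpvoldir when the latter is otherwise unset) can be expressed with java.util.Properties. The TreeServiceImpl change itself is not shown in this excerpt, so this only illustrates one possible fallback rule; TmpDirDefaults and withTmpVolDir are hypothetical names.

```java
import java.util.Properties;

final class TmpDirDefaults {
    static final String AKSERVER_TMP_DIR = "akserver.tmp_dir";
    static final String PERSISTIT_TMPVOLDIR = "persistit.tmpvoldir";

    // Returns a copy of `config` where persistit.tmpvoldir falls back to akserver.tmp_dir.
    // An explicitly configured tmpvoldir is left untouched.
    static Properties withTmpVolDir(Properties config) {
        Properties result = new Properties();
        for (String name : config.stringPropertyNames()) {   // includes inherited defaults
            result.setProperty(name, config.getProperty(name));
        }
        String tmpDir = config.getProperty(AKSERVER_TMP_DIR);
        if (tmpDir != null && result.getProperty(PERSISTIT_TMPVOLDIR) == null) {
            result.setProperty(PERSISTIT_TMPVOLDIR, tmpDir);
        }
        return result;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(AKSERVER_TMP_DIR, "/tmp/akserver");
        System.out.println(withTmpVolDir(config).getProperty(PERSISTIT_TMPVOLDIR));   // /tmp/akserver
    }
}
```

Keeping a single user-facing setting avoids the sort files and the Persistit temporary volumes silently landing in different directories.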
> Did you investigate if the fasterxml.Sorter closes all of the temp files from
> the provider on finish? On error? If not, I wonder if we need to hang onto
> them all and ensure they are in close().
>
The initial pass that writes the temp files closes them correctly. The merge stage does not, and I've filed a bug on the java-merge-sort GitHub project. https:/
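Since the merge stage does not clean up after itself, one option is for the provider to remember every file it hands out and delete them all in close(). A sketch under that assumption: it mirrors MergeTempFileProvider from the diff but deliberately does not implement com.fasterxml.sort.TempFileProvider, so it compiles without the library. TrackingTempFileProvider and trackedFileCount are hypothetical, and calling close() from MergeJoinSorter.close() (empty in the diff) is an assumption. The provider cannot close streams it did not open; it can only remove the files.

```java
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical provider: same provide() shape as MergeTempFileProvider, plus explicit cleanup.
final class TrackingTempFileProvider implements Closeable {
    private final File directory;   // null means the JVM's default temp directory
    private final String prefix;
    private final List<File> created = new ArrayList<>();

    TrackingTempFileProvider(File directory, String prefix) {
        this.directory = directory;
        this.prefix = prefix;
    }

    // Hand out a fresh temp file and remember it for close().
    synchronized File provide() throws IOException {
        File f = File.createTempFile(prefix, ".tmp", directory);
        f.deleteOnExit();   // last resort only: runs at JVM exit, too late for a long-lived server
        created.add(f);
        return f;
    }

    synchronized int trackedFileCount() {
        return created.size();
    }

    // Delete every file handed out, whether the sort finished or failed. Streams should be
    // closed first; on some platforms an open file cannot be deleted.
    @Override
    public synchronized void close() {
        for (File f : created) {
            f.delete();   // a false result leaves the file to deleteOnExit()
        }
        created.clear();
    }

    public static void main(String[] args) throws IOException {
        File run;
        try (TrackingTempFileProvider provider = new TrackingTempFileProvider(null, "sort-demo-")) {
            run = provider.provide();
            System.out.println(run.exists());   // true
        }
        System.out.println(run.exists());       // false
    }
}
```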
> Line 496, could SimpleQueryContext look at adapter.
> instead of throwing? Or is there no Session either?
>
Fixed, and removed the try/catch in MergeJoinSorter too. The adapter session IDs and the context session IDs are different creatures (the former come from SessionService, the latter from MonitorService), but for these purposes they can be treated as the same.
> Line 550, is that TODO relevant or just stale?
>
It's relevant. I still don't have an answer for that.
> Line 576, I think we just have to eat that one. Maybe log for... good measure?
>
> TestKeyReaderWr
> nervous. A non-deterministic IT is asking for headaches.
>
Fixed.
> Does surefire actually pick up TestKeyReaderWr
> looks for *Test.java.
>
Renamed to KeyReaderWriterTest
> Maybe a new configuration that gets shimmed into SortConfig? I can imagine
> wanting to set that pretty quickly.
The only other configuration item is the size of the sort buffer. Added an akserver.
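The diff reads akserver.sort.memory with Long.parseLong and passes the result to SortConfig.withMaxMemoryUsage. A slightly more defensive parse might look like the sketch below; the fallback value (the 40MB buffer from the description, read as 40 MiB) and the validation are assumptions, since the defaults file is not shown here, and SortMemoryConfig is a hypothetical name.

```java
import java.util.Properties;

final class SortMemoryConfig {
    static final String SORT_MEMORY = "akserver.sort.memory";
    // Assumed fallback: the 40MB buffer from the description, interpreted as 40 MiB.
    static final long DEFAULT_SORT_MEMORY = 40L * 1024 * 1024;

    // Sort buffer size in bytes: fall back when unset, reject non-positive values.
    static long sortMemory(Properties config) {
        String raw = config.getProperty(SORT_MEMORY);
        if (raw == null || raw.trim().isEmpty()) {
            return DEFAULT_SORT_MEMORY;
        }
        long bytes = Long.parseLong(raw.trim());   // NumberFormatException for non-numeric input
        if (bytes <= 0) {
            throw new IllegalArgumentException(SORT_MEMORY + " must be positive, got " + raw);
        }
        return bytes;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        System.out.println(sortMemory(config));   // 41943040
        config.setProperty(SORT_MEMORY, "1048576");
        System.out.println(sortMemory(config));   // 1048576
    }
}
```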
Nathan Williams (nwilliams) wrote:
> tmpvoldir is a Persistit-specific configuration item;
> the only way to alter the manner in which it gets set
> is to make changes to Persistit.
Sorry, I wasn't clear. TreeServiceImpl
>> Line 550, is that TODO relevant or just stale?
> It's relevant. I still don't have an answer for that.
Now line 576. I think it needs to throw an exception instead of returning null. Null indicates end of stream, so the query would succeed, whereas an IOException may mean, for example, that one of the sort files couldn't be read.
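The distinction can be shown in a few lines: reserve null for a genuine end of stream and turn an IOException into an unchecked exception that fails the query. The branch uses its own MergeSortIOException for this; the sketch substitutes java.io.UncheckedIOException (Java 8+) so it stands alone, and CursorErrors and RecordSource are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

final class CursorErrors {
    // Hypothetical stand-in for KeyReader.readNext(): null only at a real end of input.
    interface RecordSource {
        String readNext() throws IOException;
    }

    // null keeps meaning "no more rows"; an I/O failure becomes an unchecked exception,
    // so a query cannot silently succeed with rows missing.
    static String next(RecordSource source) {
        try {
            return source.readNext();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed reading sorted output", e);
        }
    }

    public static void main(String[] args) {
        RecordSource broken = () -> {
            throw new IOException("sort file unreadable");
        };
        try {
            next(broken);
        } catch (UncheckedIOException e) {
            System.out.println("query fails: " + e.getCause().getMessage());
        }
    }
}
```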
The Random in the test is still static, which makes it sensitive to run order. Could we make it not static for the unlikely-
- 2708. By tjoneslo
Update TreeServiceImpl to use new tmp_dir for persistit tmpvoldir to have only one config item
Fix missing exception in MergeJoinSorter
Fix the reused static random in the Key tests.
Thomas Jones-Low (tjoneslo) wrote:
Address review comments:
- Update TreeServiceImpl to use akserver.tmp_dir for persistit.tmpvoldir.
- Throw an exception from MergeJoinSorter.
- Remove/fix the static random() used in two places that may cause problems.
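A static Random shared across tests makes each test's data depend on how many values earlier tests drew, so changing the run order changes the data. A sketch of the usual alternative, assuming a JUnit-style test that creates the helper per test: a per-instance Random built from a recorded seed, with a hypothetical test.seed system property for replaying a failure. SeededTestData is a hypothetical name, not the helper actually used in KeyReaderWriterTest or KeyFinalCursorIT.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical test helper: one Random per test instance instead of a shared static one.
final class SeededTestData {
    private final long seed;
    private final Random random;

    // Pass -Dtest.seed=... to replay a failing run; otherwise pick a fresh seed.
    SeededTestData() {
        this(Long.getLong("test.seed", System.nanoTime()));
    }

    SeededTestData(long seed) {
        this.seed = seed;
        this.random = new Random(seed);
    }

    // Report this on failure so the exact data can be regenerated.
    long seed() {
        return seed;
    }

    // Random key bytes of length 1..maxLength, fully determined by the seed.
    byte[] randomKeyBytes(int maxLength) {
        byte[] bytes = new byte[1 + random.nextInt(maxLength)];
        random.nextBytes(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        SeededTestData first = new SeededTestData(42L);
        SeededTestData second = new SeededTestData(42L);
        // Same seed, same sequence, regardless of which other tests ran before.
        System.out.println(Arrays.equals(first.randomKeyBytes(16), second.randomKeyBytes(16)));   // true
    }
}
```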
Nathan Williams (nwilliams) wrote:
Excellent. Thanks for the tweaks.
Preview Diff
1 | === modified file 'pom.xml' |
2 | --- pom.xml 2013-06-14 11:23:11 +0000 |
3 | +++ pom.xml 2013-07-29 16:50:42 +0000 |
4 | @@ -206,6 +206,11 @@ |
5 | <version>4.0.0</version> |
6 | </dependency> |
7 | <dependency> |
8 | + <groupId>com.fasterxml.util</groupId> |
9 | + <artifactId>java-merge-sort</artifactId> |
10 | + <version>0.7.1</version> |
11 | + </dependency> |
12 | + <dependency> |
13 | <groupId>org.apache.httpcomponents</groupId> |
14 | <artifactId>httpclient</artifactId> |
15 | <version>4.2.3</version> |
16 | |
17 | === modified file 'src/main/java/com/akiban/qp/operator/SimpleQueryContext.java' |
18 | --- src/main/java/com/akiban/qp/operator/SimpleQueryContext.java 2013-03-22 20:05:57 +0000 |
19 | +++ src/main/java/com/akiban/qp/operator/SimpleQueryContext.java 2013-07-29 16:50:42 +0000 |
20 | @@ -77,6 +77,10 @@ |
21 | |
22 | @Override |
23 | public int getSessionId() { |
24 | + if (adapter != null) { |
25 | + return (int)adapter.getSession().sessionId(); |
26 | + } |
27 | + |
28 | throw new UnsupportedOperationException(); |
29 | } |
30 | |
31 | |
32 | === modified file 'src/main/java/com/akiban/qp/persistitadapter/PersistitAdapter.java' |
33 | --- src/main/java/com/akiban/qp/persistitadapter/PersistitAdapter.java 2013-07-18 18:28:04 +0000 |
34 | +++ src/main/java/com/akiban/qp/persistitadapter/PersistitAdapter.java 2013-07-29 16:50:42 +0000 |
35 | @@ -93,6 +93,7 @@ |
36 | InOutTap loadTap) |
37 | { |
38 | return new PersistitSorter(context, bindings, input, rowType, ordering, sortOption, loadTap); |
39 | + //return new MergeJoinSorter(context, bindings, input, rowType, ordering, sortOption, loadTap); |
40 | } |
41 | |
42 | @Override |
43 | |
44 | === added file 'src/main/java/com/akiban/qp/persistitadapter/indexcursor/MergeJoinSorter.java' |
45 | --- src/main/java/com/akiban/qp/persistitadapter/indexcursor/MergeJoinSorter.java 1970-01-01 00:00:00 +0000 |
46 | +++ src/main/java/com/akiban/qp/persistitadapter/indexcursor/MergeJoinSorter.java 2013-07-29 16:50:42 +0000 |
47 | @@ -0,0 +1,621 @@ |
48 | +/** |
49 | + * Copyright (C) 2009-2013 Akiban Technologies, Inc. |
50 | + * |
51 | + * This program is free software: you can redistribute it and/or modify |
52 | + * it under the terms of the GNU Affero General Public License as published by |
53 | + * the Free Software Foundation, either version 3 of the License, or |
54 | + * (at your option) any later version. |
55 | + * |
56 | + * This program is distributed in the hope that it will be useful, |
57 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
58 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
59 | + * GNU Affero General Public License for more details. |
60 | + * |
61 | + * You should have received a copy of the GNU Affero General Public License |
62 | + * along with this program. If not, see <http://www.gnu.org/licenses/>. |
63 | + */ |
64 | +package com.akiban.qp.persistitadapter.indexcursor; |
65 | + |
66 | +import java.io.File; |
67 | +import java.io.FileInputStream; |
68 | +import java.io.FileNotFoundException; |
69 | +import java.io.FileOutputStream; |
70 | +import java.io.IOException; |
71 | +import java.io.InputStream; |
72 | +import java.io.OutputStream; |
73 | +import java.nio.ByteBuffer; |
74 | +import java.util.ArrayList; |
75 | +import java.util.Arrays; |
76 | +import java.util.Comparator; |
77 | +import java.util.List; |
78 | + |
79 | +import com.akiban.qp.operator.API; |
80 | +import com.akiban.qp.operator.API.Ordering; |
81 | +import com.akiban.qp.operator.CursorLifecycle; |
82 | +import com.akiban.qp.operator.QueryBindings; |
83 | +import com.akiban.qp.operator.QueryContext; |
84 | +import com.akiban.qp.operator.RowCursor; |
85 | +import com.akiban.qp.persistitadapter.Sorter; |
86 | +import com.akiban.qp.row.Row; |
87 | +import com.akiban.qp.row.ValuesHolderRow; |
88 | +import com.akiban.qp.rowtype.RowType; |
89 | +import com.akiban.server.PersistitKeyPValueSource; |
90 | +import com.akiban.server.PersistitKeyPValueTarget; |
91 | +import com.akiban.server.api.dml.ColumnSelector; |
92 | +import com.akiban.server.error.MergeSortIOException; |
93 | +import com.akiban.server.types3.TInstance; |
94 | +import com.akiban.server.types3.pvalue.PValueSource; |
95 | +import com.akiban.server.types3.pvalue.PValueTargets; |
96 | +import com.akiban.util.tap.InOutTap; |
97 | +import com.persistit.Key; |
98 | +import com.persistit.KeyState; |
99 | +import com.persistit.Persistit; |
100 | +import com.persistit.exception.KeyTooLongException; |
101 | + |
102 | +import com.fasterxml.sort.DataReader; |
103 | +import com.fasterxml.sort.DataReaderFactory; |
104 | +import com.fasterxml.sort.DataWriter; |
105 | +import com.fasterxml.sort.DataWriterFactory; |
106 | +import com.fasterxml.sort.SortConfig; |
107 | +import com.fasterxml.sort.TempFileProvider; |
108 | + |
109 | +/** |
110 | + * <h1>Overview</h1> |
111 | + * |
112 | + * Sort rows by inserting them in sorted order into a memory buffer, then into on-disk files, and performing |
113 | + * a multiway merge sort on the resulting files. |
114 | + * |
115 | + * <h1>Behavior</h1> |
116 | + * |
117 | + * The rows of the input stream are written into a memory pool (40MB as a default). When (if) the memory pool is filled, |
118 | + * the pool is written to disk, then emptied, and rows are written to the memory pool again. After the final row is written, |
119 | + * the disk files are merged with an n-way merge sort (16-way by default). When all of the temp files are merged |
120 | + * into one file, the rows are read from the file in sorted order. |
121 | + * |
122 | + * If the initial input stream does not produce enough data to overflow the memory pool, no disk files will be produced. |
123 | + * |
124 | + * <h1>Performance</h1> |
125 | + * |
126 | + * The MergeJoinSorter generates IO dependent upon the size of the input stream. |
127 | + * If the input stream generates less than the memory pool size (40MB default), there is no IO generated. |
128 | + * If the input stream generates more than the memory pool size, but less than 16x the pool size, it should |
129 | + * generate two reads and two writes for each row: one write to and one read from the initial temporary file, |
130 | + * and one write to and one read from the final sorted temporary file. Each further 16x growth of the input adds |
131 | + * one more write/read cycle through a temporary file. |
132 | + * |
133 | + * <h1>Memory Requirements</h1> |
134 | + * |
135 | + * The MergeJoinSorter allocates a single memory buffer for each instance to perform an initial sort, |
136 | + * defaulting to 40MB in size. |
137 | +*/ |
138 | + |
139 | +public class MergeJoinSorter implements Sorter { |
140 | + |
141 | + private QueryContext context; |
142 | + private QueryBindings bindings; |
143 | + private RowCursor input; |
144 | + private RowType rowType; |
145 | + private Ordering ordering; |
146 | + private InOutTap loadTap; |
147 | + |
148 | + private File finalFile; |
149 | + |
150 | + private final SorterAdapter<?, ?, ?> sorterAdapter; |
151 | + private final List<Integer> orderChanges; |
152 | + private Key sortKey; |
153 | + private Comparator<SortKey> compare; |
154 | + |
155 | + public MergeJoinSorter (QueryContext context, |
156 | + QueryBindings bindings, |
157 | + RowCursor input, |
158 | + RowType rowType, |
159 | + API.Ordering ordering, |
160 | + API.SortOption sortOption, |
161 | + InOutTap loadTap) |
162 | + { |
163 | + this.context = context; |
164 | + this.bindings = bindings; |
165 | + this.input = input; |
166 | + this.rowType = rowType; |
167 | + this.ordering = ordering.copy(); |
168 | + this.loadTap = loadTap; |
169 | + |
170 | + this.sortKey = new Key ((Persistit)null); |
171 | + this.sorterAdapter = new PValueSorterAdapter(); |
172 | + // Note: init may change this.ordering |
173 | + sorterAdapter.init(rowType, this.ordering, this.sortKey, null, this.context, this.bindings, sortOption); |
174 | + // Explicitly use input ordering to avoid appended field |
175 | + this.orderChanges = new ArrayList<>(); |
176 | + List<Comparator<KeyState>> comparators = new ArrayList<>(); |
177 | + for(int i = 0; i < ordering.sortColumns(); ++i) { |
178 | + Comparator<KeyState> c = ordering.ascending(i) ? ASC_COMPARATOR : DESC_COMPARATOR; |
179 | + if(i == 0 || ordering.ascending(i-1) != ordering.ascending(i)) { |
180 | + orderChanges.add(i); |
181 | + comparators.add(c); |
182 | + } |
183 | + } |
184 | + this.orderChanges.add(ordering.sortColumns()); |
185 | + this.compare = new KeySortCompare(comparators); |
186 | + |
187 | + } |
188 | + |
189 | + @Override |
190 | + public RowCursor sort() { |
191 | + try { |
192 | + loadTree(); |
193 | + } catch (IOException e) { |
194 | + throw new MergeSortIOException(e); |
195 | + } |
196 | + return cursor(); |
197 | + } |
198 | + |
199 | + @Override |
200 | + public void close() { |
201 | + } |
202 | + |
203 | + private void loadTree() throws FileNotFoundException, IOException { |
204 | + MergeTempFileProvider tmpFileProvider = new MergeTempFileProvider(context); |
205 | + finalFile = tmpFileProvider.provide(); |
206 | + |
207 | + com.fasterxml.sort.Sorter<SortKey> s = new com.fasterxml.sort.Sorter<SortKey> ( |
208 | + getSortConfig(tmpFileProvider), |
209 | + new KeyReaderFactory(), |
210 | + new KeyWriterFactory(), |
211 | + compare); |
212 | + s.sort(new KeyReadCursor(), new KeyWriter(new FileOutputStream(finalFile))); |
213 | + } |
214 | + |
215 | + private RowCursor cursor() { |
216 | + KeyFinalCursor cursor = null; |
217 | + try { |
218 | + cursor = new KeyFinalCursor (finalFile, rowType); |
219 | + } catch (FileNotFoundException e) { |
220 | + throw new MergeSortIOException(e); |
221 | + } |
222 | + return cursor; |
223 | + } |
224 | + |
225 | + public KeyReadCursor readCursor() { |
226 | + return new KeyReadCursor(); |
227 | + } |
228 | + |
229 | + private SortConfig getSortConfig (MergeTempFileProvider tmpFileProvider) { |
230 | + long maxMemory = Long.parseLong(context.getServiceManager().getConfigurationService().getProperty("akserver.sort.memory")); |
231 | + return new SortConfig().withTempFileProvider(tmpFileProvider).withMaxMemoryUsage(maxMemory); |
232 | + } |
233 | + /* |
234 | + * Base class for reading/writing bytes - |
235 | + * KeyState[] is list of key segments broken by ASC/DESC ordering |
236 | + * rowKey is the whole, unaltered row of data. |
237 | + */ |
238 | + public static class SortKey { |
239 | + public List<KeyState> sortKeys; |
240 | + public Key rowKey; |
241 | + |
242 | + public SortKey () { |
243 | + this.sortKeys = new ArrayList<>(); |
244 | + this.rowKey = new Key ((Persistit)null); |
245 | + rowKey.clear(); |
246 | + } |
247 | + |
248 | + public SortKey (List<KeyState> sortKeys, Key rowKey) { |
249 | + this.sortKeys = sortKeys; |
250 | + this.rowKey = rowKey; |
251 | + } |
252 | + |
253 | + // Sorter uses size of elements to determine when the |
254 | + // presort buffer is full. |
255 | + public int getSize() { |
256 | + int size = 0; |
257 | + for (KeyState state : sortKeys) { |
258 | + size += state.getBytes().length + 4; |
259 | + size += 4; |
260 | + } |
261 | + size += rowKey.getEncodedSize() + 4; |
262 | + return size; |
263 | + } |
264 | + } |
265 | + |
266 | + private class KeyReaderFactory extends DataReaderFactory<SortKey> { |
267 | + |
268 | + @Override |
269 | + public DataReader<SortKey> constructReader(InputStream arg0) |
270 | + throws IOException { |
271 | + return new KeyReader(arg0); |
272 | + } |
273 | + |
274 | + } |
275 | + |
276 | + public static class KeyReader extends DataReader<SortKey> { |
277 | + |
278 | + private InputStream is; |
279 | + private ByteBuffer length; |
280 | + public KeyReader (InputStream is) { |
281 | + this.is = is; |
282 | + length = ByteBuffer.allocate(4); |
283 | + } |
284 | + |
285 | + @Override |
286 | + public void close() throws IOException { |
287 | + is.close(); |
288 | + } |
289 | + |
290 | + @Override |
291 | + public int estimateSizeInBytes(SortKey arg0) { |
292 | + return arg0.getSize(); |
293 | + } |
294 | + |
295 | + @Override |
296 | + public SortKey readNext() throws IOException { |
297 | + |
298 | + SortKey key = new SortKey(); |
299 | + int states = readLength(); |
300 | + if (states < 0) { |
301 | + return null; |
302 | + } |
303 | + for (int i = 0; i < states; i++) { |
304 | + KeyState state = readKeyState(); |
305 | + if (state == null) { |
306 | + return null; |
307 | + } |
308 | + key.sortKeys.add(state); |
309 | + } |
310 | + |
311 | + key.rowKey = readKey(); |
312 | + |
313 | + return key; |
314 | + } |
315 | + |
316 | + private KeyState readKeyState() throws IOException { |
317 | + int size = readLength(); |
318 | + if (size < 1) { |
319 | + return null; |
320 | + } |
321 | + byte[] bytes = new byte[size]; |
322 | + int bytesRead = is.read(bytes); |
323 | + |
324 | + assert bytesRead == size: "Invalid byte count on key state read"; |
325 | + |
326 | + return new KeyState(bytes); |
327 | + } |
328 | + |
329 | + private Key readKey() throws IOException{ |
330 | + int size = readLength(); |
331 | + if (size < 1) { |
332 | + return null; |
333 | + } |
334 | + Key key = new Key ((Persistit)null); |
335 | + key.setMaximumSize(size); |
336 | + int bytesRead = is.read(key.getEncodedBytes(), 0, size); |
337 | + |
338 | + assert bytesRead == size : "Invalid byte count on key read"; |
339 | + |
340 | + key.setEncodedSize(size); |
341 | + return key; |
342 | + } |
343 | + |
344 | + private int readLength() throws IOException { |
345 | + length.clear(); |
346 | + int bytesRead = is.read(length.array()); |
347 | + if (bytesRead == -1) { // EOF marker |
348 | + return -1; |
349 | + } |
350 | + assert bytesRead == 4 : "Invalid byte count on length read"; |
351 | + return length.getInt(); |
352 | + } |
353 | + } |
354 | + |
355 | + /* |
356 | + * Class to read rows from the input cursor to the Sort, |
357 | + * converting them to SortKey elements for the Sorter. |
358 | + */ |
359 | + public class KeyReadCursor extends DataReader<SortKey> { |
360 | + |
361 | + private int rowCount = 0; |
362 | + private Key convertKey; |
363 | + private int rowFields; |
364 | + private TInstance tFieldTypes[]; |
365 | + private PersistitKeyPValueTarget valueTarget; |
366 | + |
367 | + public KeyReadCursor () { |
368 | + this.rowFields = rowType.nFields(); |
369 | + this.convertKey = new Key ((Persistit)null); |
370 | + this.tFieldTypes = new TInstance[rowFields]; |
371 | + for (int i = 0; i < rowFields; i++) { |
372 | + tFieldTypes[i] = rowType.typeInstanceAt(i); |
373 | + } |
374 | + valueTarget = new PersistitKeyPValueTarget(); |
375 | + valueTarget.attach(convertKey); |
376 | + } |
377 | + |
378 | + @Override |
379 | + public void close() throws IOException { |
380 | + // Do Nothing; |
381 | + } |
382 | + |
383 | + @Override |
384 | + public int estimateSizeInBytes(SortKey arg0) { |
385 | + return arg0.getSize(); |
386 | + } |
387 | + |
388 | + @Override |
389 | + public SortKey readNext() throws IOException { |
390 | + SortKey sortKey = null; |
391 | + loadTap.in(); |
392 | + try { |
393 | + Row row = input.next(); |
394 | + context.checkQueryCancelation(); |
395 | + |
396 | + if (row != null) { |
397 | + ++rowCount; |
398 | + sortKey = new SortKey (createKey(row, rowCount), createValue(row)); |
399 | + } |
400 | + } finally { |
401 | + loadTap.out(); |
402 | + } |
403 | + return sortKey; |
404 | + } |
405 | + |
406 | + private List<KeyState> createKey(Row row, int rowCount) { |
407 | + KeyState[] states = new KeyState[orderChanges.size() - 1]; |
408 | + for(int i = 0; i < states.length; ++i) { |
409 | + int startOffset = orderChanges.get(i); |
410 | + int endOffset = orderChanges.get(i + 1); |
411 | + boolean isLast = i == states.length - 1; |
412 | + // Loop for key growth |
413 | + while(true) { |
414 | + try { |
415 | + sortKey.clear(); |
416 | + for(int j = startOffset; j < endOffset; ++j) { |
417 | + sorterAdapter.evaluateToKey(row, j); |
418 | + } |
419 | + if(isLast && sorterAdapter.preserveDuplicates()) { |
420 | + sortKey.append(rowCount); |
421 | + } |
422 | + break; |
423 | + } catch (KeyTooLongException e) { |
424 | + enlargeKey(sortKey); |
425 | + } |
426 | + } |
427 | + states[i] = new KeyState(sortKey); |
428 | + } |
429 | + return Arrays.asList(states); |
430 | + } |
431 | + |
432 | + private Key createValue(Row row) |
433 | + { |
434 | + while(true) { |
435 | + try { |
436 | + convertKey.clear(); |
437 | + for (int i = 0; i < rowFields; i++) { |
438 | + //sorterAdapter.evaluateToTarget(row, i); |
439 | + PValueSource field = row.pvalue(i); |
440 | + //putFieldToTarget(field, i, oFieldTypes, tFieldTypes); |
441 | + tFieldTypes[i].writeCanonical(field, valueTarget); |
442 | + } |
443 | + break; |
444 | + } catch (KeyTooLongException e) { |
445 | + enlargeKey(convertKey); |
446 | + } |
447 | + } |
448 | + return new Key(convertKey); |
449 | + } |
450 | + |
451 | + private void enlargeKey (Key key) { |
452 | + if (key.getMaximumSize() == Key.MAX_KEY_LENGTH_UPPER_BOUND) { |
453 | + throw new KeyTooLongException("Maximum size exceeded=" + Key.MAX_KEY_LENGTH_UPPER_BOUND); |
454 | + } |
455 | + key.setMaximumSize(Math.min((key.getMaximumSize() * 2), Key.MAX_KEY_LENGTH_UPPER_BOUND)); |
456 | + } |
457 | + |
458 | + public int rowCount() { |
459 | + return rowCount; |
460 | + } |
461 | + } |
462 | + |
463 | + private class KeyWriterFactory extends DataWriterFactory<SortKey> { |
464 | + |
465 | + @Override |
466 | + public DataWriter<SortKey> constructWriter(OutputStream arg0) |
467 | + throws IOException { |
468 | + return new KeyWriter(arg0); |
469 | + } |
470 | + } |
471 | + |
472 | + public static class KeyWriter extends DataWriter<SortKey> { |
473 | + private OutputStream os; |
474 | + private ByteBuffer length; |
475 | + |
476 | + public KeyWriter(OutputStream os) { |
477 | + this.os = os; |
478 | + length = ByteBuffer.allocate(4); |
479 | + } |
480 | + @Override |
481 | + public void close() throws IOException { |
482 | + os.close(); |
483 | + |
484 | + } |
485 | + |
486 | + @Override |
487 | + public void writeEntry(SortKey arg0) throws IOException { |
488 | + writeInt(arg0.sortKeys.size()); |
489 | + for (KeyState state : arg0.sortKeys) { |
490 | + writeKeyState (state); |
491 | + } |
492 | + writeKey (arg0.rowKey); |
493 | + } |
494 | + |
495 | + private void writeKeyState (KeyState state) throws IOException { |
496 | + writeInt (state.getBytes().length); |
497 | + os.write(state.getBytes()); |
498 | + } |
499 | + |
500 | + private void writeKey (Key key) throws IOException { |
501 | + writeInt(key.getEncodedSize()); |
502 | + os.write(key.getEncodedBytes(), 0, key.getEncodedSize()); |
503 | + } |
504 | + |
505 | + private void writeInt (int size) throws IOException { |
506 | + length.clear(); |
507 | + length.putInt(size); |
508 | + os.write(length.array()); |
509 | + } |
510 | + } |
511 | + |
512 | + /* |
513 | + * Class to provide temporary file names for inserting the |
514 | + * overflow buffers to disk. Implemented to the MergeJoin sort interface |
515 | + */ |
516 | + public class MergeTempFileProvider implements TempFileProvider { |
517 | + |
518 | + private final File directory; |
519 | + private final String prefix; |
520 | + private final String suffix; |
521 | + public MergeTempFileProvider (QueryContext context) { |
522 | + directory = new File (context.getServiceManager().getConfigurationService().getProperty("akserver.tmp_dir")); |
523 | + suffix = ".tmp"; |
524 | + String tmpPrefix; |
525 | + tmpPrefix = "sort-" + context.getSessionId() + "-"; |
526 | + prefix = tmpPrefix; |
527 | + } |
528 | + |
529 | + @Override |
530 | + public File provide() throws IOException { |
531 | + File f = File.createTempFile(prefix, suffix, directory); |
532 | + f.deleteOnExit(); |
533 | + return f; |
534 | + } |
535 | + } |
536 | + |
537 | + /* |
538 | + * Class to create a cursor which reads the final sorted output |
539 | + * from the file, returning each sorted item as a Row. |
540 | + */ |
541 | + public static class KeyFinalCursor implements RowCursor { |
542 | + private boolean isIdle = true; |
543 | + private boolean isDestroyed = false; |
544 | + |
545 | + private final KeyReader read; |
546 | + private final RowType rowType; |
547 | + private PersistitKeyPValueSource valueSources[]; |
548 | + |
549 | + public KeyFinalCursor (File inputFile, RowType rowType) throws FileNotFoundException { |
550 | + this (new FileInputStream(inputFile), rowType); |
551 | + } |
552 | + |
553 | + public KeyFinalCursor(InputStream stream, RowType rowType) { |
554 | + read = new KeyReader(stream); |
555 | + this.rowType = rowType; |
556 | + valueSources = new PersistitKeyPValueSource[rowType.nFields()]; |
557 | + for (int i = 0; i < rowType.nFields(); i++) { |
558 | + valueSources[i] = new PersistitKeyPValueSource (rowType.typeInstanceAt(i)); |
559 | + } |
560 | + } |
561 | + |
562 | + @Override |
563 | + public void open() { |
564 | + CursorLifecycle.checkIdle(this); |
565 | + isIdle = false; |
566 | + } |
567 | + |
568 | + @Override |
569 | + public Row next() { |
570 | + CursorLifecycle.checkIdleOrActive(this); |
571 | + SortKey key; |
572 | + Row row = null; |
573 | + try { |
574 | + key = read.readNext(); |
575 | + } catch (IOException e) { |
576 | + throw new MergeSortIOException (e); |
577 | + } |
578 | + if (key != null) { |
579 | + row = createRow (key); |
580 | + return row; |
581 | + } |
582 | + return null; |
583 | + } |
584 | + |
585 | + private Row createRow (SortKey key) { |
586 | + ValuesHolderRow rowCopy = new ValuesHolderRow(rowType, true); |
587 | + for(int i = 0 ; i < rowType.nFields(); ++i) { |
588 | + valueSources[i].attach(key.rowKey, i, valueSources[i].tInstance()); |
589 | + PValueTargets.copyFrom(valueSources[i], rowCopy.pvalueAt(i)); |
590 | + } |
591 | + return rowCopy; |
592 | + } |
593 | + |
594 | + @Override |
595 | + public void close() { |
596 | + CursorLifecycle.checkIdleOrActive(this); |
597 | + if(!isIdle) { |
598 | + try { |
599 | + read.close(); |
600 | + } catch (IOException e) { |
601 | + // TODO: manage this exception? |
602 | + } |
603 | + isIdle = true; |
604 | + } |
605 | + } |
606 | + |
607 | + @Override |
608 | + public void jump(Row row, ColumnSelector columnSelector) { |
609 | + throw new UnsupportedOperationException(); |
610 | + } |
611 | + |
612 | + @Override |
613 | + public void destroy() { |
614 | + isDestroyed = true; |
615 | + } |
616 | + |
617 | + @Override |
618 | + public boolean isIdle() { |
619 | + return !isDestroyed && isIdle; |
620 | + } |
621 | + |
622 | + @Override |
623 | + public boolean isActive() { |
624 | + return !isDestroyed && !isIdle; |
625 | + } |
626 | + |
627 | + @Override |
628 | + public boolean isDestroyed() { |
629 | + return isDestroyed; |
630 | + } |
631 | + } |
632 | + |
633 | + /* |
634 | + * Comparison function, implemented for MergeSort to compare |
635 | + * the KeyState lists generated by the KeyReadCursor |
636 | + */ |
637 | + public static class KeySortCompare implements Comparator<SortKey> { |
638 | + private final Comparator<KeyState>[] comparators; |
639 | + |
640 | + @SuppressWarnings("unchecked") |
641 | + private KeySortCompare (List<Comparator<KeyState>> comparators) { |
642 | + this.comparators = comparators.toArray(new Comparator[comparators.size()]); |
643 | + } |
644 | + |
645 | + @Override |
646 | + public int compare(SortKey o1, SortKey o2) { |
647 | + int val = 0; |
648 | + for (int i = 0; (i < comparators.length) && (val == 0); ++i) { |
649 | + val = comparators[i].compare(o1.sortKeys.get(i), o2.sortKeys.get(i)); |
650 | + } |
651 | + return val; |
652 | + } |
653 | + } |
654 | + |
655 | + private static final Comparator<KeyState> ASC_COMPARATOR = new Comparator<KeyState>() { |
656 | + @Override |
657 | + public int compare(KeyState k1, KeyState k2) { |
658 | + return k1.compareTo(k2); |
659 | + } |
660 | + }; |
661 | + |
662 | + private static final Comparator<KeyState> DESC_COMPARATOR = new Comparator<KeyState>() { |
663 | + @Override |
664 | + public int compare(KeyState k1, KeyState k2) { |
665 | + return k2.compareTo(k1); |
666 | + } |
667 | + }; |
668 | +} |
669 | \ No newline at end of file |
670 | |
671 | === modified file 'src/main/java/com/akiban/server/error/ErrorCode.java' |
672 | --- src/main/java/com/akiban/server/error/ErrorCode.java 2013-07-19 19:40:21 +0000 |
673 | +++ src/main/java/com/akiban/server/error/ErrorCode.java 2013-07-29 16:50:42 +0000 |
674 | @@ -422,6 +422,7 @@ |
675 | PROTOBUF_READ ("53", "00A", Importance.ERROR, ProtobufReadException.class), |
676 | PROTOBUF_WRITE ("53", "00B", Importance.ERROR, ProtobufWriteException.class), |
677 | INVALID_ALTER ("53", "00C", Importance.ERROR, InvalidAlterException.class), |
678 | + MERGE_SORT_IO ("53", "00D", Importance.ERROR, MergeSortIOException.class), |
679 | |
680 | // Class 55 - Type conversion errors |
681 | UNKNOWN_TYPE ("55", "001", Importance.DEBUG, UnknownDataTypeException.class), |
682 | |
683 | === added file 'src/main/java/com/akiban/server/error/MergeSortIOException.java' |
684 | --- src/main/java/com/akiban/server/error/MergeSortIOException.java 1970-01-01 00:00:00 +0000 |
685 | +++ src/main/java/com/akiban/server/error/MergeSortIOException.java 2013-07-29 16:50:42 +0000 |
686 | @@ -0,0 +1,27 @@ |
687 | +/** |
688 | + * Copyright (C) 2009-2013 Akiban Technologies, Inc. |
689 | + * |
690 | + * This program is free software: you can redistribute it and/or modify |
691 | + * it under the terms of the GNU Affero General Public License as published by |
692 | + * the Free Software Foundation, either version 3 of the License, or |
693 | + * (at your option) any later version. |
694 | + * |
695 | + * This program is distributed in the hope that it will be useful, |
696 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
697 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
698 | + * GNU Affero General Public License for more details. |
699 | + * |
700 | + * You should have received a copy of the GNU Affero General Public License |
701 | + * along with this program. If not, see <http://www.gnu.org/licenses/>. |
702 | + */ |
703 | +package com.akiban.server.error; |
704 | + |
705 | +import java.io.IOException; |
706 | + |
707 | +public class MergeSortIOException extends InvalidOperationException { |
708 | + |
709 | + public MergeSortIOException(IOException ex) { |
710 | + super(ErrorCode.MERGE_SORT_IO, ex.getMessage()); |
711 | + } |
712 | + |
713 | +} |
714 | |
715 | === modified file 'src/main/java/com/akiban/server/service/tree/TreeServiceImpl.java' |
716 | --- src/main/java/com/akiban/server/service/tree/TreeServiceImpl.java 2013-05-03 00:59:49 +0000 |
717 | +++ src/main/java/com/akiban/server/service/tree/TreeServiceImpl.java 2013-07-29 16:50:42 +0000 |
718 | @@ -42,6 +42,7 @@ |
719 | import com.akiban.server.service.session.Session; |
720 | import com.akiban.server.service.session.SessionService; |
721 | import com.google.inject.Inject; |
722 | +import com.persistit.Configuration; |
723 | import com.persistit.Exchange; |
724 | import com.persistit.Key; |
725 | import com.persistit.Persistit; |
726 | @@ -63,6 +64,8 @@ |
727 | private static final String PERSISTIT_MODULE_NAME = "persistit."; |
728 | |
729 | private static final String DATAPATH_PROP_NAME = "datapath"; |
730 | + |
731 | + private static final String TEMPDIR_NAME = "akserver.tmp_dir"; |
732 | |
733 | private static final String BUFFER_SIZE_PROP_NAME = "buffersize"; |
734 | |
735 | @@ -189,6 +192,13 @@ |
736 | properties.setProperty(DATAPATH_PROP_NAME, datapath); |
737 | ensureDirectoryExists(datapath, false); |
738 | |
739 | + // |
740 | + // Copy the akserver.tmp_dir property to the Persistit tmpvoldir property. |
741 | + // The latter sets where Persistit creates the temporary volumes used for sorting. |
742 | + final String tmpPath = configService.getProperty(TEMPDIR_NAME); |
743 | + properties.setProperty(Configuration.TEMPORARY_VOLUME_DIR_PROPERTY_NAME, tmpPath); |
744 | + ensureDirectoryExists(tmpPath, false); |
745 | + |
746 | // Get the configured buffer size: |
747 | // Default is 16K. Can be overridden with |
748 | // |
749 | |
750 | === modified file 'src/main/resources/com/akiban/server/error/error_code.properties' |
751 | --- src/main/resources/com/akiban/server/error/error_code.properties 2013-07-19 19:40:21 +0000 |
752 | +++ src/main/resources/com/akiban/server/error/error_code.properties 2013-07-29 16:50:42 +0000 |
753 | @@ -302,6 +302,8 @@ |
754 | PROTOBUF_READ = Error while deserializing protobuf message type {0}: {1} |
755 | PROTOBUF_WRITE = Error while serializing protobuf message type {0}: {1} |
756 | INVALID_ALTER = Invalid alter request on `{0}`.`{1}`: {2} |
757 | +MERGE_SORT_IO = Merge Sort had an unexpected IOException: {0} |
758 | + |
759 | # |
760 | # Class 54 - Program limit exceeded |
761 | # |
762 | |
763 | === modified file 'src/main/resources/com/akiban/server/service/config/configuration-defaults.properties' |
764 | --- src/main/resources/com/akiban/server/service/config/configuration-defaults.properties 2013-07-26 23:42:34 +0000 |
765 | +++ src/main/resources/com/akiban/server/service/config/configuration-defaults.properties 2013-07-29 16:50:42 +0000 |
766 | @@ -33,6 +33,7 @@ |
767 | akserver.routines.script_class_path= |
768 | akserver.index_statistics.bucket_count=32 |
769 | akserver.restrict_user_schema=false |
770 | +akserver.tmp_dir=/tmp |
771 | akserver.text.indexpath=/tmp/aktext |
772 | akserver.text.backgroundInterval=3000 |
773 | # EXPERIMENTAL |
774 | @@ -44,6 +45,8 @@ |
775 | akserver.write_lock_enabled=true |
776 | akserver.lookaheadQuantum.indexScan=1 |
777 | akserver.lookaheadQuantum.groupLookup=1 |
778 | +## 64M per sort instance |
779 | +akserver.sort.memory=67108864 |
780 | |
781 | persistit.buffersize=16384 |
782 | |
783 | |
784 | === added directory 'src/test/java/com/akiban/qp/persistitadapter' |
785 | === added directory 'src/test/java/com/akiban/qp/persistitadapter/indexcursor' |
786 | === added file 'src/test/java/com/akiban/qp/persistitadapter/indexcursor/KeyFinalCursorIT.java' |
787 | --- src/test/java/com/akiban/qp/persistitadapter/indexcursor/KeyFinalCursorIT.java 1970-01-01 00:00:00 +0000 |
788 | +++ src/test/java/com/akiban/qp/persistitadapter/indexcursor/KeyFinalCursorIT.java 2013-07-29 16:50:42 +0000 |
789 | @@ -0,0 +1,170 @@ |
790 | +/** |
791 | + * Copyright (C) 2009-2013 Akiban Technologies, Inc. |
792 | + * |
793 | + * This program is free software: you can redistribute it and/or modify |
794 | + * it under the terms of the GNU Affero General Public License as published by |
795 | + * the Free Software Foundation, either version 3 of the License, or |
796 | + * (at your option) any later version. |
797 | + * |
798 | + * This program is distributed in the hope that it will be useful, |
799 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
800 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
801 | + * GNU Affero General Public License for more details. |
802 | + * |
803 | + * You should have received a copy of the GNU Affero General Public License |
804 | + * along with this program. If not, see <http://www.gnu.org/licenses/>. |
805 | + */ |
806 | + |
807 | +package com.akiban.qp.persistitadapter.indexcursor; |
808 | + |
809 | +import static com.akiban.qp.operator.API.cursor; |
810 | +import static com.akiban.qp.operator.API.valuesScan_Default; |
811 | +import static org.junit.Assert.*; |
812 | + |
813 | +import java.io.ByteArrayInputStream; |
814 | +import java.io.ByteArrayOutputStream; |
815 | +import java.io.IOException; |
816 | +import java.util.ArrayList; |
817 | +import java.util.List; |
818 | +import java.util.Random; |
819 | + |
820 | +import org.junit.Before; |
821 | +import org.junit.Test; |
822 | + |
823 | +import com.akiban.qp.operator.API; |
824 | +import com.akiban.qp.operator.Cursor; |
825 | +import com.akiban.qp.operator.Operator; |
826 | +import com.akiban.qp.operator.RowCursor; |
827 | +import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter.KeyReadCursor; |
828 | +import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter.KeyReader; |
829 | +import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter.KeyWriter; |
830 | +import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter.KeyFinalCursor; |
831 | +import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter.SortKey; |
832 | +import com.akiban.qp.row.BindableRow; |
833 | +import com.akiban.qp.row.RowBase; |
834 | +import com.akiban.qp.rowtype.RowType; |
835 | +import com.akiban.qp.rowtype.Schema; |
836 | +import com.akiban.server.test.it.qp.OperatorITBase; |
837 | +import com.akiban.server.test.it.qp.TestRow; |
838 | +import com.akiban.server.types3.mcompat.mtypes.MNumeric; |
839 | +import com.akiban.server.types3.mcompat.mtypes.MString; |
840 | +import com.akiban.server.types3.texpressions.TPreparedField; |
841 | +import com.akiban.util.tap.Tap; |
842 | +import com.persistit.Key; |
843 | + |
844 | +public class KeyFinalCursorIT extends OperatorITBase { |
845 | + |
846 | + |
847 | + private Schema schema; |
848 | + private ByteArrayOutputStream os; |
849 | + private ByteArrayInputStream is; |
850 | + private SortKey startKey; |
851 | + private KeyWriter writer; |
852 | + private List<BindableRow> bindRows; |
853 | + private final Random random = new Random (100); |
854 | + |
855 | + @Before |
856 | + public void createFileBuffers() { |
857 | + schema = new Schema(ais()); |
858 | + os = new ByteArrayOutputStream(); |
859 | + startKey = new SortKey(); |
860 | + writer = new KeyWriter(os); |
861 | + bindRows = new ArrayList<>(); |
862 | + |
863 | + } |
864 | + |
865 | + @Test |
866 | + public void cycleComplete() throws IOException { |
867 | + RowType rowType = schema.newValuesType(MNumeric.INT.instance(true)); |
868 | + |
869 | + TestRow[] rows = new TestRow[] { |
870 | + row(rowType, 1L), |
871 | + }; |
872 | + |
873 | + bindRows.add(BindableRow.of(rows[0], true)); |
874 | + |
875 | + RowCursor cursor = cycleRows(rowType); |
876 | + compareRows(rows, cursor); |
877 | + } |
878 | + |
879 | + @Test |
880 | + public void cycleNValues() throws IOException { |
881 | + RowType rowType = schema.newValuesType(MNumeric.INT.instance(false),MNumeric.INT.instance(true), MString.varchar()); |
882 | + TestRow[] rows = new TestRow[] { |
883 | + row(rowType, 1L, 100L, "A"), |
884 | + }; |
885 | + |
886 | + bindRows.add(BindableRow.of(rows[0], true)); |
887 | + RowCursor cursor = cycleRows (rowType); |
888 | + compareRows(rows, cursor); |
889 | + } |
890 | + |
891 | + @Test |
892 | + public void cycleNRows() throws IOException { |
893 | + RowType rowType = schema.newValuesType(MNumeric.INT.instance(true)); |
894 | + |
895 | + List<TestRow> rows = new ArrayList<>(); |
896 | + for (long i = 0; i < 100; i++) { |
897 | + TestRow row = row (rowType, i); |
898 | + rows.add(row); |
899 | + bindRows.add(BindableRow.of(row, true)); |
900 | + } |
901 | + RowCursor cursor = cycleRows (rowType); |
902 | + |
903 | + TestRow[] rowArray = new TestRow[rows.size()]; |
904 | + compareRows (rows.toArray(rowArray), cursor); |
905 | + } |
906 | + |
907 | + @Test |
908 | + public void cycleManyRows() throws IOException { |
909 | + RowType rowType = schema.newValuesType(MNumeric.INT.instance(false), MNumeric.INT.instance(true), MString.varchar()); |
910 | + List<TestRow> rows = new ArrayList<>(); |
911 | + for (long i = 0; i < 100; i++) { |
912 | + TestRow row = row (rowType, random.nextInt(), i, |
913 | + characters(5+random.nextInt(1000))); |
914 | + rows.add(row); |
915 | + bindRows.add(BindableRow.of(row, true)); |
916 | + } |
917 | + RowCursor cursor = cycleRows(rowType); |
918 | + TestRow[] rowArray = new TestRow[rows.size()]; |
919 | + compareRows (rows.toArray(rowArray), cursor); |
920 | + |
921 | + } |
922 | + |
923 | + private RowCursor cycleRows(RowType rowType) throws IOException { |
924 | + KeyReadCursor keyCursor = getKeyCursor(rowType , bindRows); |
925 | + |
926 | + startKey = keyCursor.readNext(); |
927 | + while (startKey != null) { |
928 | + writer.writeEntry(startKey); |
929 | + startKey = keyCursor.readNext(); |
930 | + } |
931 | + |
932 | + is = new ByteArrayInputStream (os.toByteArray()); |
933 | + RowCursor cursor = new KeyFinalCursor(is, rowType); |
934 | + return cursor; |
935 | + |
936 | + } |
937 | + |
938 | + private KeyReadCursor getKeyCursor (RowType rowType, List<BindableRow> rows) { |
939 | + |
940 | + Operator op = valuesScan_Default(rows, rowType); |
941 | + Cursor cursor = cursor(op, queryContext, queryBindings); |
942 | + API.Ordering ordering = API.ordering(); |
943 | + ordering.append(new TPreparedField (rowType.typeInstanceAt(0), 0), true); |
944 | + |
945 | + MergeJoinSorter mergeSorter = new MergeJoinSorter(queryContext, queryBindings, cursor, |
946 | + rowType, ordering, API.SortOption.PRESERVE_DUPLICATES, Tap.createTimer("Test Tap")); |
947 | + |
948 | + cursor.open(); |
949 | + return mergeSorter.readCursor(); |
950 | + } |
951 | + static final String ALPHA = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; |
952 | + public String characters(final int length) { |
953 | + StringBuilder sb = new StringBuilder(length); |
954 | + for( int i = 0; i < length; i++ ) |
955 | + sb.append(ALPHA.charAt(random.nextInt(ALPHA.length()))); |
956 | + return sb.toString(); |
957 | + } |
958 | + |
959 | +} |
960 | |
961 | === added file 'src/test/java/com/akiban/qp/persistitadapter/indexcursor/KeyReaderWriterTest.java' |
962 | --- src/test/java/com/akiban/qp/persistitadapter/indexcursor/KeyReaderWriterTest.java 1970-01-01 00:00:00 +0000 |
963 | +++ src/test/java/com/akiban/qp/persistitadapter/indexcursor/KeyReaderWriterTest.java 2013-07-29 16:50:42 +0000 |
964 | @@ -0,0 +1,182 @@ |
965 | +/** |
966 | + * Copyright (C) 2009-2013 Akiban Technologies, Inc. |
967 | + * |
968 | + * This program is free software: you can redistribute it and/or modify |
969 | + * it under the terms of the GNU Affero General Public License as published by |
970 | + * the Free Software Foundation, either version 3 of the License, or |
971 | + * (at your option) any later version. |
972 | + * |
973 | + * This program is distributed in the hope that it will be useful, |
974 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
975 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
976 | + * GNU Affero General Public License for more details. |
977 | + * |
978 | + * You should have received a copy of the GNU Affero General Public License |
979 | + * along with this program. If not, see <http://www.gnu.org/licenses/>. |
980 | + */ |
981 | +package com.akiban.qp.persistitadapter.indexcursor; |
982 | + |
983 | +import static org.junit.Assert.*; |
984 | + |
985 | +import java.io.ByteArrayInputStream; |
986 | +import java.io.ByteArrayOutputStream; |
987 | +import java.io.IOException; |
988 | +import java.util.ArrayList; |
989 | +import java.util.List; |
990 | +import java.util.Random; |
991 | + |
992 | +import org.junit.Before; |
993 | +import org.junit.Test; |
994 | + |
995 | +import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter.KeyReader; |
996 | +import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter.KeyWriter; |
997 | +import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter.SortKey; |
998 | +import com.persistit.Key; |
999 | +import com.persistit.KeyState; |
1000 | +import com.persistit.Persistit; |
1001 | + |
1002 | +public class KeyReaderWriterTest { |
1003 | + |
1004 | + private ByteArrayOutputStream os; |
1005 | + private ByteArrayInputStream is; |
1006 | + private SortKey startKey; |
1007 | + private Key testKey; |
1008 | + private KeyWriter writer; |
1009 | + |
1010 | + @Before |
1011 | + public void createFileBuffers() { |
1012 | + os = new ByteArrayOutputStream(); |
1013 | + startKey = new SortKey(); |
1014 | + testKey = new Key ((Persistit)null); |
1015 | + writer = new KeyWriter(os); |
1016 | + } |
1017 | + @Test |
1018 | + public void cycleSimple() throws IOException { |
1019 | + testKey.append(1); |
1020 | + startKey.sortKeys.add(new KeyState(testKey)); |
1021 | + startKey.rowKey.append(1); |
1022 | + writer.writeEntry(startKey); |
1023 | + verifyInput(); |
1024 | + } |
1025 | + |
1026 | + @Test |
1027 | + public void cycleString() throws IOException { |
1028 | + testKey.append("abcd"); |
1029 | + startKey.sortKeys.add(new KeyState(testKey)); |
1030 | + startKey.rowKey.append("abcd"); |
1031 | + writer.writeEntry(startKey); |
1032 | + verifyInput(); |
1033 | + } |
1034 | + |
1035 | + @Test |
1036 | + public void cycleNIntegers() throws IOException { |
1037 | + for (int i = 0; i < 400; i++) { |
1038 | + testKey.append(i); |
1039 | + startKey.rowKey.append(i); |
1040 | + } |
1041 | + startKey.sortKeys.add(new KeyState(testKey)); |
1042 | + writer.writeEntry(startKey); |
1043 | + verifyInput(); |
1044 | + } |
1045 | + |
1046 | + @Test |
1047 | + public void cycle2Keys() throws IOException { |
1048 | + |
1049 | + testKey.append(1); |
1050 | + startKey.sortKeys.add(new KeyState(testKey)); |
1051 | + startKey.rowKey.append(1); |
1052 | + writer.writeEntry(startKey); |
1053 | + writer.writeEntry(startKey); |
1054 | + |
1055 | + is = new ByteArrayInputStream (os.toByteArray()); |
1056 | + KeyReader reader = new KeyReader (is); |
1057 | + |
1058 | + SortKey endKey = reader.readNext(); |
1059 | + assertTrue (startKey.rowKey.compareTo(endKey.rowKey) == 0); |
1060 | + assertTrue (startKey.sortKeys.get(0).compareTo(endKey.sortKeys.get(0)) == 0); |
1061 | + endKey = reader.readNext(); |
1062 | + assertTrue (startKey.rowKey.compareTo(endKey.rowKey) == 0); |
1063 | + assertTrue (startKey.sortKeys.get(0).compareTo(endKey.sortKeys.get(0)) == 0); |
1064 | + endKey = reader.readNext(); |
1065 | + assertNull (endKey); |
1066 | + } |
1067 | + |
1068 | + @Test |
1069 | + public void cycleNKeys() throws IOException{ |
1070 | + List<SortKey> keys = new ArrayList<>(100); |
1071 | + for (int i = 0; i < 100; i++) { |
1072 | + SortKey newKey = new SortKey(); |
1073 | + newKey.rowKey.append(i); |
1074 | + testKey.clear(); |
1075 | + testKey.append(i); |
1076 | + newKey.sortKeys.add(new KeyState (testKey)); |
1077 | + keys.add(newKey); |
1078 | + } |
1079 | + verifyNKeys (keys); |
1080 | + } |
1081 | + |
1082 | + @Test |
1083 | + public void cycleNStrings() throws IOException { |
1084 | + List<SortKey> keys = new ArrayList<>(100); |
1085 | + for (int i = 0; i < 100; i++) { |
1086 | + SortKey newKey = new SortKey(); |
1087 | + String value = characters(5+random.nextInt(1000)); |
1088 | + newKey.rowKey.append(value); |
1089 | + testKey.clear(); |
1090 | + testKey.append(value); |
1091 | + newKey.sortKeys.add(new KeyState (testKey)); |
1092 | + keys.add(newKey); |
1093 | + } |
1094 | + verifyNKeys (keys); |
1095 | + } |
1096 | + |
1097 | + @Test |
1098 | + public void cycleNMultiKeys () throws IOException { |
1099 | + List<SortKey> keys = new ArrayList<>(100); |
1100 | + for (int i = 0; i < 100; i++) { |
1101 | + SortKey newKey = new SortKey(); |
1102 | + newKey.rowKey.append(random.nextInt()); |
1103 | + newKey.rowKey.append(null); |
1104 | + newKey.rowKey.append(characters(3+random.nextInt(25))); |
1105 | + newKey.rowKey.append(characters(3+random.nextInt(25))); |
1106 | + testKey.clear(); |
1107 | + testKey.append(i); |
1108 | + newKey.sortKeys.add(new KeyState (testKey)); |
1109 | + keys.add(newKey); |
1110 | + } |
1111 | + verifyNKeys(keys); |
1112 | + } |
1113 | + |
1114 | + private void verifyInput() throws IOException { |
1115 | + is = new ByteArrayInputStream (os.toByteArray()); |
1116 | + KeyReader reader = new KeyReader (is); |
1117 | + SortKey endKey = reader.readNext(); |
1118 | + assertTrue (startKey.rowKey.compareTo(endKey.rowKey) == 0); |
1119 | + assertTrue (startKey.sortKeys.get(0).compareTo(endKey.sortKeys.get(0)) == 0); |
1120 | + |
1121 | + } |
1122 | + |
1123 | + private void verifyNKeys(List<SortKey> keys) throws IOException { |
1124 | + for (SortKey key : keys) { |
1125 | + writer.writeEntry(key); |
1126 | + } |
1127 | + is = new ByteArrayInputStream (os.toByteArray()); |
1128 | + KeyReader reader = new KeyReader (is); |
1129 | + |
1130 | + SortKey endKey; |
1131 | + for (SortKey startKey : keys) { |
1132 | + endKey = reader.readNext(); |
1133 | + assertTrue (startKey.rowKey.compareTo(endKey.rowKey) == 0); |
1134 | + assertTrue (startKey.sortKeys.get(0).compareTo(endKey.sortKeys.get(0)) == 0); |
1135 | + } |
1136 | + } |
1137 | + |
1138 | + static final String ALPHA = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; |
1139 | + final Random random = new Random(100); |
1140 | + public String characters(final int length) { |
1141 | + StringBuilder sb = new StringBuilder(length); |
1142 | + for( int i = 0; i < length; i++ ) |
1143 | + sb.append(ALPHA.charAt(random.nextInt(ALPHA.length()))); |
1144 | + return sb.toString(); |
1145 | + } |
1146 | +} |
1147 | |
1148 | === modified file 'src/test/java/com/akiban/server/test/it/sort/MemorySorterIT.java' |
1149 | --- src/test/java/com/akiban/server/test/it/sort/MemorySorterIT.java 2013-07-08 18:16:50 +0000 |
1150 | +++ src/test/java/com/akiban/server/test/it/sort/MemorySorterIT.java 2013-07-29 16:50:42 +0000 |
1151 | @@ -23,7 +23,6 @@ |
1152 | import com.akiban.qp.operator.QueryContext; |
1153 | import com.akiban.qp.persistitadapter.Sorter; |
1154 | import com.akiban.qp.persistitadapter.indexcursor.MemorySorter; |
1155 | -import com.akiban.qp.persistitadapter.indexcursor.PersistitSorter; |
1156 | import com.akiban.qp.rowtype.RowType; |
1157 | import com.akiban.util.tap.InOutTap; |
1158 | |
1159 | |
1160 | === added file 'src/test/java/com/akiban/server/test/it/sort/MergeJoinSorterIT.java' |
1161 | --- src/test/java/com/akiban/server/test/it/sort/MergeJoinSorterIT.java 1970-01-01 00:00:00 +0000 |
1162 | +++ src/test/java/com/akiban/server/test/it/sort/MergeJoinSorterIT.java 2013-07-29 16:50:42 +0000 |
1163 | @@ -0,0 +1,49 @@ |
1164 | +/** |
1165 | + * Copyright (C) 2009-2013 Akiban Technologies, Inc. |
1166 | + * |
1167 | + * This program is free software: you can redistribute it and/or modify |
1168 | + * it under the terms of the GNU Affero General Public License as published by |
1169 | + * the Free Software Foundation, either version 3 of the License, or |
1170 | + * (at your option) any later version. |
1171 | + * |
1172 | + * This program is distributed in the hope that it will be useful, |
1173 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
1174 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
1175 | + * GNU Affero General Public License for more details. |
1176 | + * |
1177 | + * You should have received a copy of the GNU Affero General Public License |
1178 | + * along with this program. If not, see <http://www.gnu.org/licenses/>. |
1179 | + */ |
1180 | +package com.akiban.server.test.it.sort; |
1181 | + |
1182 | +import java.util.HashMap; |
1183 | +import java.util.Map; |
1184 | + |
1185 | +import com.akiban.qp.operator.API.Ordering; |
1186 | +import com.akiban.qp.operator.API.SortOption; |
1187 | +import com.akiban.qp.operator.Cursor; |
1188 | +import com.akiban.qp.operator.QueryBindings; |
1189 | +import com.akiban.qp.operator.QueryContext; |
1190 | +import com.akiban.qp.persistitadapter.Sorter; |
1191 | +import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter; |
1192 | +import com.akiban.qp.rowtype.RowType; |
1193 | +import com.akiban.util.tap.InOutTap; |
1194 | + |
1195 | +public class MergeJoinSorterIT extends SorterITBase { |
1196 | + |
1197 | + @Override |
1198 | + public Map<String,String> startupConfigProperties() { |
1199 | + Map<String,String> props = new HashMap<>(); |
1200 | + props.putAll(super.startupConfigProperties()); |
1201 | + props.put("akserver.tmp_dir","/tmp/akserver-junit/"); |
1202 | + return props; |
1203 | + } |
1204 | + |
1205 | + @Override |
1206 | + public Sorter createSorter(QueryContext context, QueryBindings bindings, |
1207 | + Cursor input, RowType rowType, Ordering ordering, |
1208 | + SortOption sortOption, InOutTap loadTap) { |
1209 | + return new MergeJoinSorter(context, bindings, input, rowType, ordering, sortOption, loadTap); |
1210 | + } |
1211 | + |
1212 | +} |
This isn't currently hooked up to any of the Sort operators.
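`KeySortCompare` in the diff orders two `SortKey`s column by column. It applies the per-column comparator (`ASC_COMPARATOR` or `DESC_COMPARATOR`) and stops at the first column that differs. The sketch below reproduces that loop in a self-contained form, assuming plain `Integer` columns in place of Persistit `KeyState` values. The names `ChainedKeyCompareSketch`, `Row` and `ChainedCompare` are hypothetical illustrations, not classes from `MergeJoinSorter`. It sticks to Java 7 constructs (anonymous classes, `Collections.sort`) to match the branch's code style.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class ChainedKeyCompareSketch {

    // Hypothetical stand-in for SortKey: one Integer per sort column
    // instead of one Persistit KeyState per column.
    static final class Row {
        final List<Integer> columns;

        Row(Integer... values) {
            this.columns = Arrays.asList(values);
        }

        @Override
        public String toString() {
            return columns.toString();
        }
    }

    // Per-column comparators, analogous to ASC_COMPARATOR and DESC_COMPARATOR.
    static final Comparator<Integer> ASC = new Comparator<Integer>() {
        @Override
        public int compare(Integer a, Integer b) {
            return a.compareTo(b);
        }
    };

    static final Comparator<Integer> DESC = new Comparator<Integer>() {
        @Override
        public int compare(Integer a, Integer b) {
            return b.compareTo(a);
        }
    };

    // Walks the columns in order and stops at the first one that differs,
    // the same loop shape as KeySortCompare.compare().
    static final class ChainedCompare implements Comparator<Row> {
        private final List<Comparator<Integer>> comparators;

        ChainedCompare(List<Comparator<Integer>> comparators) {
            this.comparators = comparators;
        }

        @Override
        public int compare(Row r1, Row r2) {
            int val = 0;
            for (int i = 0; i < comparators.size() && val == 0; ++i) {
                val = comparators.get(i).compare(r1.columns.get(i), r2.columns.get(i));
            }
            return val;
        }
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>(Arrays.asList(
                new Row(1, 5), new Row(2, 3), new Row(1, 9), new Row(2, 7)));
        // Equivalent of ORDER BY col0 ASC, col1 DESC.
        Collections.sort(rows, new ChainedCompare(Arrays.asList(ASC, DESC)));
        System.out.println(rows); // prints [[1, 9], [1, 5], [2, 7], [2, 3]]
    }
}
```

The `val == 0` test in the loop condition is what makes the ordering lexicographic. A later column is consulted only when every earlier column compared equal, so a descending second column only breaks ties within the first column.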