Merge lp:~tjoneslo/akiban-server/file-merge-sort into lp:~akiban-technologies/akiban-server/trunk

Proposed by Thomas Jones-Low
Status: Merged
Approved by: Nathan Williams
Approved revision: 2708
Merged at revision: 2722
Proposed branch: lp:~tjoneslo/akiban-server/file-merge-sort
Merge into: lp:~akiban-technologies/akiban-server/trunk
Diff against target: 1212 lines (+1075/-1)
13 files modified
pom.xml (+5/-0)
src/main/java/com/akiban/qp/operator/SimpleQueryContext.java (+4/-0)
src/main/java/com/akiban/qp/persistitadapter/PersistitAdapter.java (+1/-0)
src/main/java/com/akiban/qp/persistitadapter/indexcursor/MergeJoinSorter.java (+621/-0)
src/main/java/com/akiban/server/error/ErrorCode.java (+1/-0)
src/main/java/com/akiban/server/error/MergeSortIOException.java (+27/-0)
src/main/java/com/akiban/server/service/tree/TreeServiceImpl.java (+10/-0)
src/main/resources/com/akiban/server/error/error_code.properties (+2/-0)
src/main/resources/com/akiban/server/service/config/configuration-defaults.properties (+3/-0)
src/test/java/com/akiban/qp/persistitadapter/indexcursor/KeyFinalCursorIT.java (+170/-0)
src/test/java/com/akiban/qp/persistitadapter/indexcursor/KeyReaderWriterTest.java (+182/-0)
src/test/java/com/akiban/server/test/it/sort/MemorySorterIT.java (+0/-1)
src/test/java/com/akiban/server/test/it/sort/MergeJoinSorterIT.java (+49/-0)
To merge this branch: bzr merge lp:~tjoneslo/akiban-server/file-merge-sort
Reviewer: Status
Nathan Williams: Approve
Thomas Jones-Low: Needs Resubmitting
Review via email: mp+176781@code.launchpad.net

Description of the change

Add the FileMergeSort to the akiban trunk.

This relies upon a library called java-merge-sort, written by the same people who wrote the Jackson JSON library already in use.

All of the code is in the new MergeJoinSorter and consists of classes that translate between the Akiban Server way of doing things and what java-merge-sort expects.

SortKey -> the single class that holds the KeyState segments used for comparison (one per run of ASC or DESC columns) and the single Key holding the whole row.
KeyReadCursor -> reads rows from the input cursor, converts them to SortKeys, and passes them to java-merge-sort for processing. This is the external input.
KeyFinalCursor -> reads from the final sorted output and returns rows through the cursor interface to the next operator in line. This is the external output.

KeyReader/KeyWriter -> used by the java merge sort library to read/write the intermediate blocks of SortKeys.
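A minimal sketch of the kind of length-prefixed framing KeyWriter and KeyReader use for the intermediate blocks: a segment count, then a length and bytes for each KeyState segment, then a length and bytes for the row Key. Plain `byte[]` values stand in for Persistit's `KeyState` and `Key`, and the class and method names are hypothetical. `DataOutputStream.writeInt` is big-endian, which matches the default byte order of the `ByteBuffer` used in the diff.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical length-prefixed record format; byte[] stands in for KeyState and Key. */
class FramedRecords {

    /** One sortable record: the comparison segments plus the encoded row. */
    static final class Rec {
        final List<byte[]> segments;
        final byte[] row;
        Rec(List<byte[]> segments, byte[] row) { this.segments = segments; this.row = row; }
    }

    /** Writes the segment count, then (length, bytes) per segment, then (length, bytes) for the row. */
    static void write(DataOutputStream out, Rec rec) throws IOException {
        out.writeInt(rec.segments.size());
        for (byte[] seg : rec.segments) {
            out.writeInt(seg.length);
            out.write(seg);
        }
        out.writeInt(rec.row.length);
        out.write(rec.row);
    }

    /** Returns the next record, or null if the stream ends before the next record's header. */
    static Rec read(DataInputStream in) throws IOException {
        int count;
        try {
            count = in.readInt();
        } catch (EOFException e) {
            return null;
        }
        List<byte[]> segments = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            segments.add(readBlock(in));
        }
        return new Rec(segments, readBlock(in));
    }

    /** readFully keeps reading until the block is complete, or throws EOFException if it is truncated. */
    private static byte[] readBlock(DataInputStream in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return bytes;
    }
}
```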

KeySortCompare -> used by the java merge sort library to compare the SortKeys to put them into the correct order.
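KeySortCompare walks the segments in order and stops at the first one that differs, reversing the result for descending segments. A hedged sketch of that idea, using unsigned lexicographic byte comparison as an assumed stand-in for `KeyState.compareTo`; the class name is hypothetical.

```java
import java.util.Comparator;
import java.util.List;

/** Hypothetical multi-segment comparator: one ASC/DESC flag per segment. */
class SegmentComparator implements Comparator<List<byte[]>> {
    private final boolean[] ascending;

    SegmentComparator(boolean... ascending) {
        this.ascending = ascending.clone();
    }

    @Override
    public int compare(List<byte[]> a, List<byte[]> b) {
        for (int i = 0; i < ascending.length; i++) {
            int c = compareUnsigned(a.get(i), b.get(i));
            if (c != 0) {
                // c is -1 or 1 here, so negating it is safe
                return ascending[i] ? c : -c;
            }
        }
        return 0; // all segments equal
    }

    /** Unsigned lexicographic order; a shorter array that is a prefix sorts first. */
    static int compareUnsigned(byte[] x, byte[] y) {
        int n = Math.min(x.length, y.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(x[i] & 0xFF, y[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(x.length, y.length);
    }
}
```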

Internally, java-merge-sort reads the input (KeyReadCursor) and sorts it (KeySortCompare) into a buffer (40MB). If the size of the input exceeds the buffer size, the full buffer is written to disk (KeyWriter) and the process is repeated with a new buffer. When all of the input has been read, the library does a 16-way merge of the on-disk blocks (KeyReader) to create the final output. This final output is then returned to the row stream by KeyFinalCursor.
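The pipeline described above is a classic external merge sort. The toy version below sorts `int`s instead of SortKeys, counts elements instead of bytes for the buffer limit, and uses a `PriorityQueue` for the k-way merge. It illustrates the technique under those assumptions and is not how java-merge-sort is implemented internally; all names are hypothetical.

```java
import java.io.*;
import java.nio.file.Files;
import java.util.*;

/** Toy external merge sort: bufferLimit stands in for the memory budget, fanIn for the 16-way merge. */
class ExternalSortSketch {

    static List<Integer> sort(Iterator<Integer> input, int bufferLimit, int fanIn) throws IOException {
        if (bufferLimit < 1 || fanIn < 2) throw new IllegalArgumentException("bad limits");
        List<Integer> buffer = new ArrayList<>();
        List<File> runs = new ArrayList<>();
        while (input.hasNext()) {
            buffer.add(input.next());
            if (buffer.size() == bufferLimit) {   // buffer full: sort it and spill a run to disk
                runs.add(spill(buffer));
                buffer.clear();
            }
        }
        if (runs.isEmpty()) {                      // everything fit in memory: no temp files at all
            Collections.sort(buffer);
            return buffer;
        }
        if (!buffer.isEmpty()) runs.add(spill(buffer));
        while (runs.size() > 1) {                  // merge passes, at most fanIn runs per merge
            List<File> next = new ArrayList<>();
            for (int i = 0; i < runs.size(); i += fanIn) {
                next.add(merge(runs.subList(i, Math.min(i + fanIn, runs.size()))));
            }
            runs = next;
        }
        List<Integer> out = new ArrayList<>();
        try (DataInputStream in = open(runs.get(0))) {
            for (Integer v = readOrNull(in); v != null; v = readOrNull(in)) out.add(v);
        }
        runs.get(0).delete();
        return out;
    }

    private static File spill(List<Integer> buffer) throws IOException {
        Collections.sort(buffer);
        File f = Files.createTempFile("run-", ".tmp").toFile();
        try (DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(f)))) {
            for (int v : buffer) out.writeInt(v);
        }
        return f;
    }

    /** k-way merge: a min-heap holds the current head {value, run index} of each input run. */
    private static File merge(List<File> inputs) throws IOException {
        File f = Files.createTempFile("merge-", ".tmp").toFile();
        List<DataInputStream> ins = new ArrayList<>();
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        try (DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(f)))) {
            for (int i = 0; i < inputs.size(); i++) {
                ins.add(open(inputs.get(i)));
                Integer v = readOrNull(ins.get(i));
                if (v != null) heap.add(new int[] {v, i});
            }
            while (!heap.isEmpty()) {
                int[] top = heap.poll();
                out.writeInt(top[0]);
                Integer v = readOrNull(ins.get(top[1]));
                if (v != null) heap.add(new int[] {v, top[1]});
            }
        } finally {
            for (DataInputStream in : ins) in.close();
            for (File run : inputs) run.delete();  // merged runs are no longer needed
        }
        return f;
    }

    private static DataInputStream open(File f) throws IOException {
        return new DataInputStream(new BufferedInputStream(new FileInputStream(f)));
    }

    private static Integer readOrNull(DataInputStream in) throws IOException {
        try {
            return in.readInt();
        } catch (EOFException e) {
            return null;                           // end of this run
        }
    }
}
```

With a fan-in of 16, each additional merge pass multiplies the number of runs that can be combined by 16, which matches the "one more write/read cycle for each 16x" estimate in the class Javadoc.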

There are also two completely new tests, one for the KeyReader/KeyWriter pair and the other for KeyReadCursor and KeyFinalCursor, to make sure the process doesn't mangle rows on the way through. The SortIT test is also extended to verify that the sorting works as expected.

Thomas Jones-Low (tjoneslo) wrote :

This isn't currently hooked up to any of the Sort operators.

Nathan Williams (nwilliams) wrote :

Nice! This looks pretty good overall. A handful of small issues below.

The exceptions on lines 166 and 192 probably need to be bubbled up.

Lines 291, 306 and 319 look suspicious. InputStream returns -1 at EOF, whereas this is checking read < size. Maybe an assert instead?
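For context on this point: `InputStream.read(byte[], int, int)` returns -1 only at end of stream, but it may legally return fewer bytes than requested, so an assertion on a single read can in principle fire on a healthy stream (or pass silently when assertions are disabled). A sketch of a loop that tells a clean EOF apart from a truncated record; the helper name is hypothetical.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

class ReadHelpers {
    /**
     * Fills buf completely. Returns false if the stream was already at EOF before any
     * byte was read, and throws EOFException if it ends part-way through buf.
     */
    static boolean readFullyOrEof(InputStream in, byte[] buf) throws IOException {
        int off = 0;
        while (off < buf.length) {
            int n = in.read(buf, off, buf.length - off);
            if (n < 0) {
                if (off == 0) {
                    return false; // clean end of stream at a record boundary
                }
                throw new EOFException("Truncated: got " + off + " of " + buf.length + " bytes");
            }
            off += n; // a short read is legal, so keep going until buf is full
        }
        return true;
    }
}
```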

Line 491, a new config instead of persistit.tmpvoldir might be nice. Or perhaps an akserver.tmp_dir, which TreeService can then assign to tmpvoldir if otherwise unset.

Did you investigate whether the fasterxml.Sorter closes all of the temp files from the provider on finish? On error? If not, I wonder if we need to hang onto them all and ensure they are closed in close().
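A hedged sketch of the approach suggested here: a provider that records every file it creates so that `close()` can delete them whether the sort finished or failed. The class is hypothetical and does not implement java-merge-sort's `TempFileProvider` interface, although `provide()` has the same shape as `MergeTempFileProvider.provide()` in the diff.

```java
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical provider that remembers every temp file it hands out. */
class TrackingTempFileProvider implements Closeable {
    private final File directory;
    private final String prefix;
    private final List<File> created = new ArrayList<>();

    TrackingTempFileProvider(File directory, String prefix) {
        this.directory = directory;
        this.prefix = prefix; // File.createTempFile requires at least 3 characters
    }

    synchronized File provide() throws IOException {
        File f = File.createTempFile(prefix, ".tmp", directory);
        created.add(f);
        return f;
    }

    /** Deletes everything handed out, whether or not the sorter cleaned up after itself. */
    @Override
    public synchronized void close() {
        for (File f : created) {
            if (f.exists() && !f.delete()) {
                f.deleteOnExit(); // fall back if the file cannot be deleted right now
            }
        }
        created.clear();
    }
}
```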

Line 496, could SimpleQueryContext look at adapter.getSession().sessionId() instead of throwing? Or is there no Session either?

Line 550, is that TODO relevant or just stale?

Line 576, I think we just have to eat that one. Maybe log for... good measure?

TestKeyReaderWriter.random being static and unseeded makes me a little nervous. A non-deterministic IT is asking for headaches.
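One way to address this, sketched with a hypothetical generator class: give each test its own `Random` with an explicit seed, so the sequence no longer depends on which test touched a shared static instance first, and a failing seed can be reported and replayed.

```java
import java.util.Random;

/** Hypothetical per-test data generator with an explicit, reportable seed. */
class KeyDataGenerator {
    private final long seed;
    private final Random random;

    KeyDataGenerator(long seed) {
        this.seed = seed;
        this.random = new Random(seed);
    }

    /** Include this in failure messages so a failing run can be replayed exactly. */
    long seed() {
        return seed;
    }

    /** Random key bytes of length 1..maxLen (maxLen must be positive). */
    byte[] nextKeyBytes(int maxLen) {
        byte[] bytes = new byte[1 + random.nextInt(maxLen)];
        random.nextBytes(bytes);
        return bytes;
    }
}
```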

Does surefire actually pick up TestKeyReaderWriter? I think our pattern only looks for *Test.java.

Maybe a new configuration that gets shimmed into SortConfig? I can imagine wanting to set that pretty quickly.

review: Needs Fixing
2707. By tjoneslo

Address merge comments.
Add new error code for returning problems on IO errors
Clean up other error handling.

Thomas Jones-Low (tjoneslo) wrote :

> Nice! This looks pretty good overall. A handful of small issues below.
>
> The exceptions on lines 166 and 192 probably need to be bubbled up.
>
Done

> Lines 291, 306 and 319 look suspicious. InputStream returns -1 at EOF, whereas
> this is checking read < size. Maybe an assert instead?
>
Converted to an assert.

> Line 491, a new config instead of persistit.tmpvoldir might be nice. Or
> perhaps an akserver.tmp_dir, which TreeService can then assign to tmpvoldir if
> otherwise unset.
>
tmpvoldir is a Persistit-specific configuration item; the only way to alter the manner in which it gets set is to make changes to Persistit.

Added the akserver.tmp_dir variable and use that instead.

> Did you investigate whether the fasterxml.Sorter closes all of the temp files from
> the provider on finish? On error? If not, I wonder if we need to hang onto
> them all and ensure they are closed in close().
>
The initial pass, which reads the input into the temp files, closes them correctly. The merge stage does not, and I've filed a bug on the java-merge-sort GitHub project: https://github.com/cowtowncoder/java-merge-sort/issues/8

> Line 496, could SimpleQueryContext look at adapter.getSession().sessionId()
> instead of throwing? Or is there no Session either?
>

Fixed, and removed the try/catch in MergeJoinSorter too. The adapter's session IDs and the context's session IDs are different creatures (the former come from SessionService, the latter from MonitorService), but for these purposes they can be treated as the same.

> Line 550, is that TODO relevant or just stale?
>
It's relevant. I still don't have an answer for that.

> Line 576, I think we just have to eat that one. Maybe log for... good measure?
>

> TestKeyReaderWriter.random being static and unseeded makes me a little
> nervous. A non-deterministic IT is asking for headaches.
>

Fixed.

> Does surefire actually pick up TestKeyReaderWriter? I think our pattern only
> looks for *Test.java.
>

Renamed to KeyReaderWriterTest

> Maybe a new configuration that gets shimmed into SortConfig? I can imagine
> wanting to set that pretty quickly.

The only other configuration item is the size of the sort buffer. Added an akserver.sort.memory configuration item and set the default to 64M.
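The diff reads `akserver.sort.memory` with `Long.parseLong` and passes the result to `SortConfig.withMaxMemoryUsage`, so the property must hold a plain byte count. If a suffixed form such as `64M` were wanted, a small parser could be shimmed in first. This helper and its suffix rules (K/M/G as powers of 1024) are hypothetical.

```java
import java.util.Locale;
import java.util.Properties;

/** Hypothetical parser for memory sizes such as "67108864", "64M", "512k" or "1G". */
class MemorySetting {
    static final String KEY = "akserver.sort.memory";

    static long parseBytes(String text) {
        String s = text.trim().toUpperCase(Locale.ROOT);
        if (s.isEmpty()) {
            throw new NumberFormatException("empty memory size");
        }
        long multiplier;
        switch (s.charAt(s.length() - 1)) {
            case 'K': multiplier = 1L << 10; break;
            case 'M': multiplier = 1L << 20; break;
            case 'G': multiplier = 1L << 30; break;
            default:  multiplier = 1L;       break; // plain byte count
        }
        if (multiplier != 1L) {
            s = s.substring(0, s.length() - 1).trim();
        }
        return Math.multiplyExact(Long.parseLong(s), multiplier); // fails loudly on overflow
    }

    /** Reads the setting, falling back to a default when it is absent. */
    static long sortMemory(Properties config, long fallback) {
        String v = config.getProperty(KEY);
        return v == null ? fallback : parseBytes(v);
    }
}
```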

review: Needs Resubmitting
Nathan Williams (nwilliams) wrote :

> tmpvoldir is a persistit specific configuration item,
> the only way to alter the manner in which it gets set
> is to make changes to Persistit.

Sorry, I wasn't clear. TreeServiceImpl#setupPersistitProperties() does some remapping already. It can take the new tmp_dir config and map it to tmpvoldir. That way we only need one in the config file.
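A sketch of that remapping, using `java.util.Properties` in place of the real configuration service. The property name `tmpvoldir` is assumed from the review discussion (the diff itself uses Persistit's `Configuration.TEMPORARY_VOLUME_DIR_PROPERTY_NAME` constant), and this version only fills it in when it is not already set, as suggested earlier in the review; the committed code sets it unconditionally. The class name is hypothetical.

```java
import java.util.Properties;

/** Hypothetical remapping of the server-wide temp dir onto Persistit's tmpvoldir. */
class TmpDirMapping {
    static final String AKSERVER_TMP_DIR = "akserver.tmp_dir";
    // Assumed to match the Persistit temporary-volume directory property name.
    static final String PERSISTIT_TMPVOLDIR = "tmpvoldir";

    /** Copies akserver.tmp_dir into the Persistit properties unless tmpvoldir is set explicitly. */
    static void mapTmpDir(Properties serverConfig, Properties persistitProps) {
        String tmp = serverConfig.getProperty(AKSERVER_TMP_DIR);
        if (tmp != null && persistitProps.getProperty(PERSISTIT_TMPVOLDIR) == null) {
            persistitProps.setProperty(PERSISTIT_TMPVOLDIR, tmp);
        }
    }
}
```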

>> Line 550, is that TODO relevant or just stale?
> It's relevant. I still don't have an answer for that.

Now 576. I think it needs to throw an exception instead of returning null. Null indicates end of stream and query would succeed, whereas an IOException may mean one of the sort files couldn't be read (for example).
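The contract described here, sketched generically: `null` from `next()` means only end of stream, and an I/O failure is rethrown as an unchecked exception so the query fails instead of quietly returning a short result. `UncheckedIOException` stands in for `MergeSortIOException`, and the class is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical cursor wrapper: null strictly means "no more rows". */
class FailLoudCursor<T> {
    /** Source of records; returns null only at a genuine end of stream. */
    interface Source<T> {
        T readNext() throws IOException;
    }

    private final Source<T> source;

    FailLoudCursor(Source<T> source) {
        this.source = source;
    }

    T next() {
        try {
            return source.readNext();
        } catch (IOException e) {
            // Propagate instead of returning null, which would look like a successful end of results
            throw new UncheckedIOException("Sort file could not be read", e);
        }
    }
}
```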

The Random in the test is still static, which makes it sensitive to run order. Could we make it not static for the unlikely-but-frustrating scenario if something ever goes wrong?

review: Needs Fixing
2708. By tjoneslo

Update TreeServiceImpl to use new tmp_dir for persistit tmpvoldir to have only one config item
Fix missing exception in MergeJoinSorter
Fix the reused static random in the Key tests.

Thomas Jones-Low (tjoneslo) wrote :

address review comments

- Update TreeServiceImpl processing to map akserver.tmp_dir into persistit.tmpvoldir
- Throw exception from MergeJoinSorter
- Remove/fix the static random() used in two places that may cause problems.

review: Needs Resubmitting
Nathan Williams (nwilliams) wrote :

Excellent. Thanks for the tweaks.

review: Approve

Preview Diff

1=== modified file 'pom.xml'
2--- pom.xml 2013-06-14 11:23:11 +0000
3+++ pom.xml 2013-07-29 16:50:42 +0000
4@@ -206,6 +206,11 @@
5 <version>4.0.0</version>
6 </dependency>
7 <dependency>
8+ <groupId>com.fasterxml.util</groupId>
9+ <artifactId>java-merge-sort</artifactId>
10+ <version>0.7.1</version>
11+ </dependency>
12+ <dependency>
13 <groupId>org.apache.httpcomponents</groupId>
14 <artifactId>httpclient</artifactId>
15 <version>4.2.3</version>
16
17=== modified file 'src/main/java/com/akiban/qp/operator/SimpleQueryContext.java'
18--- src/main/java/com/akiban/qp/operator/SimpleQueryContext.java 2013-03-22 20:05:57 +0000
19+++ src/main/java/com/akiban/qp/operator/SimpleQueryContext.java 2013-07-29 16:50:42 +0000
20@@ -77,6 +77,10 @@
21
22 @Override
23 public int getSessionId() {
24+ if (adapter != null) {
25+ return (int)adapter.getSession().sessionId();
26+ }
27+
28 throw new UnsupportedOperationException();
29 }
30
31
32=== modified file 'src/main/java/com/akiban/qp/persistitadapter/PersistitAdapter.java'
33--- src/main/java/com/akiban/qp/persistitadapter/PersistitAdapter.java 2013-07-18 18:28:04 +0000
34+++ src/main/java/com/akiban/qp/persistitadapter/PersistitAdapter.java 2013-07-29 16:50:42 +0000
35@@ -93,6 +93,7 @@
36 InOutTap loadTap)
37 {
38 return new PersistitSorter(context, bindings, input, rowType, ordering, sortOption, loadTap);
39+ //return new MergeJoinSorter(context, bindings, input, rowType, ordering, sortOption, loadTap);
40 }
41
42 @Override
43
44=== added file 'src/main/java/com/akiban/qp/persistitadapter/indexcursor/MergeJoinSorter.java'
45--- src/main/java/com/akiban/qp/persistitadapter/indexcursor/MergeJoinSorter.java 1970-01-01 00:00:00 +0000
46+++ src/main/java/com/akiban/qp/persistitadapter/indexcursor/MergeJoinSorter.java 2013-07-29 16:50:42 +0000
47@@ -0,0 +1,621 @@
48+/**
49+ * Copyright (C) 2009-2013 Akiban Technologies, Inc.
50+ *
51+ * This program is free software: you can redistribute it and/or modify
52+ * it under the terms of the GNU Affero General Public License as published by
53+ * the Free Software Foundation, either version 3 of the License, or
54+ * (at your option) any later version.
55+ *
56+ * This program is distributed in the hope that it will be useful,
57+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
58+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
59+ * GNU Affero General Public License for more details.
60+ *
61+ * You should have received a copy of the GNU Affero General Public License
62+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
63+ */
64+package com.akiban.qp.persistitadapter.indexcursor;
65+
66+import java.io.File;
67+import java.io.FileInputStream;
68+import java.io.FileNotFoundException;
69+import java.io.FileOutputStream;
70+import java.io.IOException;
71+import java.io.InputStream;
72+import java.io.OutputStream;
73+import java.nio.ByteBuffer;
74+import java.util.ArrayList;
75+import java.util.Arrays;
76+import java.util.Comparator;
77+import java.util.List;
78+
79+import com.akiban.qp.operator.API;
80+import com.akiban.qp.operator.API.Ordering;
81+import com.akiban.qp.operator.CursorLifecycle;
82+import com.akiban.qp.operator.QueryBindings;
83+import com.akiban.qp.operator.QueryContext;
84+import com.akiban.qp.operator.RowCursor;
85+import com.akiban.qp.persistitadapter.Sorter;
86+import com.akiban.qp.row.Row;
87+import com.akiban.qp.row.ValuesHolderRow;
88+import com.akiban.qp.rowtype.RowType;
89+import com.akiban.server.PersistitKeyPValueSource;
90+import com.akiban.server.PersistitKeyPValueTarget;
91+import com.akiban.server.api.dml.ColumnSelector;
92+import com.akiban.server.error.MergeSortIOException;
93+import com.akiban.server.types3.TInstance;
94+import com.akiban.server.types3.pvalue.PValueSource;
95+import com.akiban.server.types3.pvalue.PValueTargets;
96+import com.akiban.util.tap.InOutTap;
97+import com.persistit.Key;
98+import com.persistit.KeyState;
99+import com.persistit.Persistit;
100+import com.persistit.exception.KeyTooLongException;
101+
102+import com.fasterxml.sort.DataReader;
103+import com.fasterxml.sort.DataReaderFactory;
104+import com.fasterxml.sort.DataWriter;
105+import com.fasterxml.sort.DataWriterFactory;
106+import com.fasterxml.sort.SortConfig;
107+import com.fasterxml.sort.TempFileProvider;
108+
109+/**
110+ * <h1>Overview</h1>
111+ *
112+ * Sort rows by inserting them in sorted order into a memory buffer, then into on-disk files, and performing
113+ * a multiway merge sort on the resulting files.
114+ *
115+ * <h1>Behavior</h1>
116+ *
117+ * The rows of the input stream are written into a memory pool (40MB as a default). When (if) the memory pool is filled,
118+ * the pool is written to disk, then emptied and rows are written again to the memory pool. When the final row is written
119+ * to the disk files are merged in an n-way merge sort. The default is 16 way sort. When all of the temp files are merged
120+ * into one file, the rows are read from the file in sorted order.
121+ *
122+ * If the initial input stream does not produce enough data to overflow the memory pool, no disk files will be produced.
123+ *
124+ * <h1>Performance</h1>
125+ *
126+ * The MergeJoinSorter generates IO dependent upon the size of the input stream.
127+ * If the input stream generates less than the memory pool size (40MB default), there is no IO generated.
128+ * If the input stream generates more than the memory pool size, but less than 16x the pool size, it should
129+ * generate two read and two writes for each row. One write to, one read from into the initial temporary file,
130+ * one write to, one read from the final sorted temporary file. For each 16x larger the input set gets it adds
131+ * one more write to/read from temporary file cycle.
132+ *
133+ * <h1>Memory Requirements</h1>
134+ *
135+ * The MergeJoinSorter allocates a single memory buffer for each instance to perform an initial sort,
136+ * defaulting to 40MB in size.
137+*/
138+
139+public class MergeJoinSorter implements Sorter {
140+
141+ private QueryContext context;
142+ private QueryBindings bindings;
143+ private RowCursor input;
144+ private RowType rowType;
145+ private Ordering ordering;
146+ private InOutTap loadTap;
147+
148+ private File finalFile;
149+
150+ private final SorterAdapter<?, ?, ?> sorterAdapter;
151+ private final List<Integer> orderChanges;
152+ private Key sortKey;
153+ private Comparator<SortKey> compare;
154+
155+ public MergeJoinSorter (QueryContext context,
156+ QueryBindings bindings,
157+ RowCursor input,
158+ RowType rowType,
159+ API.Ordering ordering,
160+ API.SortOption sortOption,
161+ InOutTap loadTap)
162+ {
163+ this.context = context;
164+ this.bindings = bindings;
165+ this.input = input;
166+ this.rowType = rowType;
167+ this.ordering = ordering.copy();
168+ this.loadTap = loadTap;
169+
170+ this.sortKey = new Key ((Persistit)null);
171+ this.sorterAdapter = new PValueSorterAdapter();
172+ // Note: init may change this.ordering
173+ sorterAdapter.init(rowType, this.ordering, this.sortKey, null, this.context, this.bindings, sortOption);
174+ // Explicitly use input ordering to avoid appended field
175+ this.orderChanges = new ArrayList<>();
176+ List<Comparator<KeyState>> comparators = new ArrayList<>();
177+ for(int i = 0; i < ordering.sortColumns(); ++i) {
178+ Comparator<KeyState> c = ordering.ascending(i) ? ASC_COMPARATOR : DESC_COMPARATOR;
179+ if(i == 0 || ordering.ascending(i-1) != ordering.ascending(i)) {
180+ orderChanges.add(i);
181+ comparators.add(c);
182+ }
183+ }
184+ this.orderChanges.add(ordering.sortColumns());
185+ this.compare = new KeySortCompare(comparators);
186+
187+ }
188+
189+ @Override
190+ public RowCursor sort() {
191+ try {
192+ loadTree();
193+ } catch (IOException e) {
194+ throw new MergeSortIOException(e);
195+ }
196+ return cursor();
197+ }
198+
199+ @Override
200+ public void close() {
201+ }
202+
203+ private void loadTree() throws FileNotFoundException, IOException {
204+ MergeTempFileProvider tmpFileProvider = new MergeTempFileProvider(context);
205+ finalFile = tmpFileProvider.provide();
206+
207+ com.fasterxml.sort.Sorter<SortKey> s = new com.fasterxml.sort.Sorter<SortKey> (
208+ getSortConfig(tmpFileProvider),
209+ new KeyReaderFactory(),
210+ new KeyWriterFactory(),
211+ compare);
212+ s.sort(new KeyReadCursor(), new KeyWriter(new FileOutputStream(finalFile)));
213+ }
214+
215+ private RowCursor cursor() {
216+ KeyFinalCursor cursor = null;
217+ try {
218+ cursor = new KeyFinalCursor (finalFile, rowType);
219+ } catch (FileNotFoundException e) {
220+ throw new MergeSortIOException(e);
221+ }
222+ return cursor;
223+ }
224+
225+ public KeyReadCursor readCursor() {
226+ return new KeyReadCursor();
227+ }
228+
229+ private SortConfig getSortConfig (MergeTempFileProvider tmpFileProvider) {
230+ long maxMemory = Long.parseLong(context.getServiceManager().getConfigurationService().getProperty("akserver.sort.memory"));
231+ return new SortConfig().withTempFileProvider(tmpFileProvider).withMaxMemoryUsage(maxMemory);
232+ }
233+ /*
234+ * Base class for reading/writing bytes -
235+ * KeyState[] is list of key segments broken by ASC/DESC ordering
236+ * rowKey is the whole, unaltered row of data.
237+ */
238+ public static class SortKey {
239+ public List<KeyState> sortKeys;
240+ public Key rowKey;
241+
242+ public SortKey () {
243+ this.sortKeys = new ArrayList<>();
244+ this.rowKey = new Key ((Persistit)null);
245+ rowKey.clear();
246+ }
247+
248+ public SortKey (List<KeyState> sortKeys, Key rowKey) {
249+ this.sortKeys = sortKeys;
250+ this.rowKey = rowKey;
251+ }
252+
253+ // Sorter uses size of elements to determine when the
254+ // presort buffer is full.
255+ public int getSize() {
256+ int size = 0;
257+ for (KeyState state : sortKeys) {
258+ size += state.getBytes().length + 4;
259+ size += 4;
260+ }
261+ size += rowKey.getEncodedSize() + 4;
262+ return size;
263+ }
264+ }
265+
266+ private class KeyReaderFactory extends DataReaderFactory<SortKey> {
267+
268+ @Override
269+ public DataReader<SortKey> constructReader(InputStream arg0)
270+ throws IOException {
271+ return new KeyReader(arg0);
272+ }
273+
274+ }
275+
276+ public static class KeyReader extends DataReader<SortKey> {
277+
278+ private InputStream is;
279+ private ByteBuffer length;
280+ public KeyReader (InputStream is) {
281+ this.is = is;
282+ length = ByteBuffer.allocate(4);
283+ }
284+
285+ @Override
286+ public void close() throws IOException {
287+ is.close();
288+ }
289+
290+ @Override
291+ public int estimateSizeInBytes(SortKey arg0) {
292+ return arg0.getSize();
293+ }
294+
295+ @Override
296+ public SortKey readNext() throws IOException {
297+
298+ SortKey key = new SortKey();
299+ int states = readLength();
300+ if (states < 0) {
301+ return null;
302+ }
303+ for (int i = 0; i < states; i++) {
304+ KeyState state = readKeyState();
305+ if (state == null) {
306+ return null;
307+ }
308+ key.sortKeys.add(state);
309+ }
310+
311+ key.rowKey = readKey();
312+
313+ return key;
314+ }
315+
316+ private KeyState readKeyState() throws IOException {
317+ int size = readLength();
318+ if (size < 1) {
319+ return null;
320+ }
321+ byte[] bytes = new byte[size];
322+ int bytesRead = is.read(bytes);
323+
324+ assert bytesRead == size: "Invalid byte count on key state read";
325+
326+ return new KeyState(bytes);
327+ }
328+
329+ private Key readKey() throws IOException{
330+ int size = readLength();
331+ if (size < 1) {
332+ return null;
333+ }
334+ Key key = new Key ((Persistit)null);
335+ key.setMaximumSize(size);
336+ int bytesRead = is.read(key.getEncodedBytes(), 0, size);
337+
338+ assert bytesRead == size : "Invalid byte count on key read";
339+
340+ key.setEncodedSize(size);
341+ return key;
342+ }
343+
344+ private int readLength() throws IOException {
345+ length.clear();
346+ int bytesRead = is.read(length.array());
347+ if (bytesRead == -1) { // EOF marker
348+ return -1;
349+ }
350+ assert bytesRead == 4 : "Invalid byte count on length read";
351+ return length.getInt();
352+ }
353+ }
354+
355+ /*
356+ * Class to read rows from the input cursor to the Sort,
357+ * converting them to SortKey elements for the Sorter.
358+ */
359+ public class KeyReadCursor extends DataReader<SortKey> {
360+
361+ private int rowCount = 0;
362+ private Key convertKey;
363+ private int rowFields;
364+ private TInstance tFieldTypes[];
365+ private PersistitKeyPValueTarget valueTarget;
366+
367+ public KeyReadCursor () {
368+ this.rowFields = rowType.nFields();
369+ this.convertKey = new Key ((Persistit)null);
370+ this.tFieldTypes = new TInstance[rowFields];
371+ for (int i = 0; i < rowFields; i++) {
372+ tFieldTypes[i] = rowType.typeInstanceAt(i);
373+ }
374+ valueTarget = new PersistitKeyPValueTarget();
375+ valueTarget.attach(convertKey);
376+ }
377+
378+ @Override
379+ public void close() throws IOException {
380+ // Do Nothing;
381+ }
382+
383+ @Override
384+ public int estimateSizeInBytes(SortKey arg0) {
385+ return arg0.getSize();
386+ }
387+
388+ @Override
389+ public SortKey readNext() throws IOException {
390+ SortKey sortKey = null;
391+ loadTap.in();
392+ try {
393+ Row row = input.next();
394+ context.checkQueryCancelation();
395+
396+ if (row != null) {
397+ ++rowCount;
398+ sortKey = new SortKey (createKey(row, rowCount), createValue(row));
399+ }
400+ } finally {
401+ loadTap.out();
402+ }
403+ return sortKey;
404+ }
405+
406+ private List<KeyState> createKey(Row row, int rowCount) {
407+ KeyState[] states = new KeyState[orderChanges.size() - 1];
408+ for(int i = 0; i < states.length; ++i) {
409+ int startOffset = orderChanges.get(i);
410+ int endOffset = orderChanges.get(i + 1);
411+ boolean isLast = i == states.length - 1;
412+ // Loop for key growth
413+ while(true) {
414+ try {
415+ sortKey.clear();
416+ for(int j = startOffset; j < endOffset; ++j) {
417+ sorterAdapter.evaluateToKey(row, j);
418+ }
419+ if(isLast && sorterAdapter.preserveDuplicates()) {
420+ sortKey.append(rowCount);
421+ }
422+ break;
423+ } catch (KeyTooLongException e) {
424+ enlargeKey(sortKey);
425+ }
426+ }
427+ states[i] = new KeyState(sortKey);
428+ }
429+ return Arrays.asList(states);
430+ }
431+
432+ private Key createValue(Row row)
433+ {
434+ while(true) {
435+ try {
436+ convertKey.clear();
437+ for (int i = 0; i < rowFields; i++) {
438+ //sorterAdapter.evaluateToTarget(row, i);
439+ PValueSource field = row.pvalue(i);
440+ //putFieldToTarget(field, i, oFieldTypes, tFieldTypes);
441+ tFieldTypes[i].writeCanonical(field, valueTarget);
442+ }
443+ break;
444+ } catch (KeyTooLongException e) {
445+ enlargeKey(convertKey);
446+ }
447+ }
448+ return new Key(convertKey);
449+ }
450+
451+ private void enlargeKey (Key key) {
452+ if (key.getMaximumSize() == Key.MAX_KEY_LENGTH_UPPER_BOUND) {
453+ throw new KeyTooLongException("Maximum size exceeded=" + Key.MAX_KEY_LENGTH_UPPER_BOUND);
454+ }
455+ key.setMaximumSize(Math.min((key.getMaximumSize() * 2), Key.MAX_KEY_LENGTH_UPPER_BOUND));
456+ }
457+
458+ public int rowCount() {
459+ return rowCount;
460+ }
461+ }
462+
463+ private class KeyWriterFactory extends DataWriterFactory<SortKey> {
464+
465+ @Override
466+ public DataWriter<SortKey> constructWriter(OutputStream arg0)
467+ throws IOException {
468+ return new KeyWriter(arg0);
469+ }
470+ }
471+
472+ public static class KeyWriter extends DataWriter<SortKey> {
473+ private OutputStream os;
474+ private ByteBuffer length;
475+
476+ public KeyWriter(OutputStream os) {
477+ this.os = os;
478+ length = ByteBuffer.allocate(4);
479+ }
480+ @Override
481+ public void close() throws IOException {
482+ os.close();
483+
484+ }
485+
486+ @Override
487+ public void writeEntry(SortKey arg0) throws IOException {
488+ writeInt(arg0.sortKeys.size());
489+ for (KeyState state : arg0.sortKeys) {
490+ writeKeyState (state);
491+ }
492+ writeKey (arg0.rowKey);
493+ }
494+
495+ private void writeKeyState (KeyState state) throws IOException {
496+ writeInt (state.getBytes().length);
497+ os.write(state.getBytes());
498+ }
499+
500+ private void writeKey (Key key) throws IOException {
501+ writeInt(key.getEncodedSize());
502+ os.write(key.getEncodedBytes(), 0, key.getEncodedSize());
503+ }
504+
505+ private void writeInt (int size) throws IOException {
506+ length.clear();
507+ length.putInt(size);
508+ os.write(length.array());
509+ }
510+ }
511+
512+ /*
513+ * Class to provide temporary file names for inserting the
514+ * overflow buffers to disk. Implemented to the MergeJoin sort interface
515+ */
516+ public class MergeTempFileProvider implements TempFileProvider {
517+
518+ private final File directory;
519+ private final String prefix;
520+ private final String suffix;
521+ public MergeTempFileProvider (QueryContext context) {
522+ directory = new File (context.getServiceManager().getConfigurationService().getProperty("akserver.tmp_dir"));
523+ suffix = ".tmp";
524+ String tmpPrefix;
525+ tmpPrefix = "sort-" + context.getSessionId() + "-";
526+ prefix = tmpPrefix;
527+ }
528+
529+ @Override
530+ public File provide() throws IOException {
531+ File f = File.createTempFile(prefix, suffix, directory);
532+ f.deleteOnExit();
533+ return f;
534+ }
535+ }
536+
537+ /*
538+ * Class to create a cursor which reads the final sorted output
539+ * from the file, returning each sorted item as a Row.
540+ */
541+ public static class KeyFinalCursor implements RowCursor {
542+ private boolean isIdle = true;
543+ private boolean isDestroyed = false;
544+
545+ private final KeyReader read;
546+ private final RowType rowType;
547+ private PersistitKeyPValueSource valueSources[];
548+
549+ public KeyFinalCursor (File inputFile, RowType rowType) throws FileNotFoundException {
550+ this (new FileInputStream(inputFile), rowType);
551+ }
552+
553+ public KeyFinalCursor(InputStream stream, RowType rowType) {
554+ read = new KeyReader(stream);
555+ this.rowType = rowType;
556+ valueSources = new PersistitKeyPValueSource[rowType.nFields()];
557+ for (int i = 0; i < rowType.nFields(); i++) {
558+ valueSources[i] = new PersistitKeyPValueSource (rowType.typeInstanceAt(i));
559+ }
560+ }
561+
562+ @Override
563+ public void open() {
564+ CursorLifecycle.checkIdle(this);
565+ isIdle = false;
566+ }
567+
568+ @Override
569+ public Row next() {
570+ CursorLifecycle.checkIdleOrActive(this);
571+ SortKey key;
572+ Row row = null;
573+ try {
574+ key = read.readNext();
575+ } catch (IOException e) {
576+ throw new MergeSortIOException (e);
577+ }
578+ if (key != null) {
579+ row = createRow (key);
580+ return row;
581+ }
582+ return null;
583+ }
584+
585+ private Row createRow (SortKey key) {
586+ ValuesHolderRow rowCopy = new ValuesHolderRow(rowType, true);
587+ for(int i = 0 ; i < rowType.nFields(); ++i) {
588+ valueSources[i].attach(key.rowKey, i, valueSources[i].tInstance());
589+ PValueTargets.copyFrom(valueSources[i], rowCopy.pvalueAt(i));
590+ }
591+ return rowCopy;
592+ }
593+
594+ @Override
595+ public void close() {
596+ CursorLifecycle.checkIdleOrActive(this);
597+ if(!isIdle) {
598+ try {
599+ read.close();
600+ } catch (IOException e) {
601+ // TODO: manage this exception?
602+ }
603+ isIdle = true;
604+ }
605+ }
606+
607+ @Override
608+ public void jump(Row row, ColumnSelector columnSelector) {
609+ throw new UnsupportedOperationException();
610+ }
611+
612+ @Override
613+ public void destroy() {
614+ isDestroyed = true;
615+ }
616+
617+ @Override
618+ public boolean isIdle() {
619+ return !isDestroyed && isIdle;
620+ }
621+
622+ @Override
623+ public boolean isActive() {
624+ return !isDestroyed && !isIdle;
625+ }
626+
627+ @Override
628+ public boolean isDestroyed() {
629+ return isDestroyed;
630+ }
631+ }
632+
633+ /*
634+ * Comparison function, implemented for MergeSort to compare
635+ * the KeyState lists generated by the KeyReadCursor
636+ */
637+ public static class KeySortCompare implements Comparator<SortKey> {
638+ private final Comparator<KeyState>[] comparators;
639+
640+ @SuppressWarnings("unchecked")
641+ private KeySortCompare (List<Comparator<KeyState>> comparators) {
642+ this.comparators = comparators.toArray(new Comparator[comparators.size()]);
643+ }
644+
645+ @Override
646+ public int compare(SortKey o1, SortKey o2) {
647+ int val = 0;
648+ for (int i = 0; (i < comparators.length) && (val == 0); ++i) {
649+ val = comparators[i].compare(o1.sortKeys.get(i), o2.sortKeys.get(i));
650+ }
651+ return val;
652+ }
653+ }
654+
655+ private static final Comparator<KeyState> ASC_COMPARATOR = new Comparator<KeyState>() {
656+ @Override
657+ public int compare(KeyState k1, KeyState k2) {
658+ return k1.compareTo(k2);
659+ }
660+ };
661+
662+ private static final Comparator<KeyState> DESC_COMPARATOR = new Comparator<KeyState>() {
663+ @Override
664+ public int compare(KeyState k1, KeyState k2) {
665+ return k2.compareTo(k1);
666+ }
667+ };
668+}
669\ No newline at end of file
670
671=== modified file 'src/main/java/com/akiban/server/error/ErrorCode.java'
672--- src/main/java/com/akiban/server/error/ErrorCode.java 2013-07-19 19:40:21 +0000
673+++ src/main/java/com/akiban/server/error/ErrorCode.java 2013-07-29 16:50:42 +0000
674@@ -422,6 +422,7 @@
675 PROTOBUF_READ ("53", "00A", Importance.ERROR, ProtobufReadException.class),
676 PROTOBUF_WRITE ("53", "00B", Importance.ERROR, ProtobufWriteException.class),
677 INVALID_ALTER ("53", "00C", Importance.ERROR, InvalidAlterException.class),
678+ MERGE_SORT_IO ("53", "00D", Importance.ERROR, MergeSortIOException.class),
679
680 // Class 55 - Type conversion errors
681 UNKNOWN_TYPE ("55", "001", Importance.DEBUG, UnknownDataTypeException.class),
682
683=== added file 'src/main/java/com/akiban/server/error/MergeSortIOException.java'
684--- src/main/java/com/akiban/server/error/MergeSortIOException.java 1970-01-01 00:00:00 +0000
685+++ src/main/java/com/akiban/server/error/MergeSortIOException.java 2013-07-29 16:50:42 +0000
686@@ -0,0 +1,27 @@
687+/**
688+ * Copyright (C) 2009-2013 Akiban Technologies, Inc.
689+ *
690+ * This program is free software: you can redistribute it and/or modify
691+ * it under the terms of the GNU Affero General Public License as published by
692+ * the Free Software Foundation, either version 3 of the License, or
693+ * (at your option) any later version.
694+ *
695+ * This program is distributed in the hope that it will be useful,
696+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
697+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
698+ * GNU Affero General Public License for more details.
699+ *
700+ * You should have received a copy of the GNU Affero General Public License
701+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
702+ */
703+package com.akiban.server.error;
704+
705+import java.io.IOException;
706+
707+public class MergeSortIOException extends InvalidOperationException {
708+
709+ public MergeSortIOException(IOException ex) {
710+ super(ErrorCode.MERGE_SORT_IO, ex.getMessage());
711+ }
712+
713+}
714
715=== modified file 'src/main/java/com/akiban/server/service/tree/TreeServiceImpl.java'
716--- src/main/java/com/akiban/server/service/tree/TreeServiceImpl.java 2013-05-03 00:59:49 +0000
717+++ src/main/java/com/akiban/server/service/tree/TreeServiceImpl.java 2013-07-29 16:50:42 +0000
718@@ -42,6 +42,7 @@
719 import com.akiban.server.service.session.Session;
720 import com.akiban.server.service.session.SessionService;
721 import com.google.inject.Inject;
722+import com.persistit.Configuration;
723 import com.persistit.Exchange;
724 import com.persistit.Key;
725 import com.persistit.Persistit;
726@@ -63,6 +64,8 @@
727 private static final String PERSISTIT_MODULE_NAME = "persistit.";
728
729 private static final String DATAPATH_PROP_NAME = "datapath";
730+
731+ private static final String TEMPDIR_NAME = "akserver.tmp_dir";
732
733 private static final String BUFFER_SIZE_PROP_NAME = "buffersize";
734
735@@ -189,6 +192,13 @@
736 properties.setProperty(DATAPATH_PROP_NAME, datapath);
737 ensureDirectoryExists(datapath, false);
738
739+ //
740+ // Copy the akserver.tmp_dir property into Persistit's tmpvoldir property;
741+ // Persistit creates its temporary volumes (used for sorting) in that directory.
742+ final String tmpPath = configService.getProperty(TEMPDIR_NAME);
743+ properties.setProperty(Configuration.TEMPORARY_VOLUME_DIR_PROPERTY_NAME, tmpPath);
744+ ensureDirectoryExists(tmpPath, false);
745+
746 // Get the configured buffer size:
747 // Default is 16K. Can be overridden with
748 //
749
750=== modified file 'src/main/resources/com/akiban/server/error/error_code.properties'
751--- src/main/resources/com/akiban/server/error/error_code.properties 2013-07-19 19:40:21 +0000
752+++ src/main/resources/com/akiban/server/error/error_code.properties 2013-07-29 16:50:42 +0000
753@@ -302,6 +302,8 @@
754 PROTOBUF_READ = Error while deserializing protobuf message type {0}: {1}
755 PROTOBUF_WRITE = Error while serializing protobuf message type {0}: {1}
756 INVALID_ALTER = Invalid alter request on `{0}`.`{1}`: {2}
757+MERGE_SORT_IO = Merge Sort had an unexpected IOException: {0}
758+
759 #
760 # Class 54 - Program limit exceeded
761 #
762
763=== modified file 'src/main/resources/com/akiban/server/service/config/configuration-defaults.properties'
764--- src/main/resources/com/akiban/server/service/config/configuration-defaults.properties 2013-07-26 23:42:34 +0000
765+++ src/main/resources/com/akiban/server/service/config/configuration-defaults.properties 2013-07-29 16:50:42 +0000
766@@ -33,6 +33,7 @@
767 akserver.routines.script_class_path=
768 akserver.index_statistics.bucket_count=32
769 akserver.restrict_user_schema=false
770+akserver.tmp_dir=/tmp
771 akserver.text.indexpath=/tmp/aktext
772 akserver.text.backgroundInterval=3000
773 # EXPERIMENTAL
774@@ -44,6 +45,8 @@
775 akserver.write_lock_enabled=true
776 akserver.lookaheadQuantum.indexScan=1
777 akserver.lookaheadQuantum.groupLookup=1
778+## 64 MiB (67108864 bytes) of memory per sort instance
779+akserver.sort.memory=67108864
780
781 persistit.buffersize=16384
782
783
784=== added directory 'src/test/java/com/akiban/qp/persistitadapter'
785=== added directory 'src/test/java/com/akiban/qp/persistitadapter/indexcursor'
786=== added file 'src/test/java/com/akiban/qp/persistitadapter/indexcursor/KeyFinalCursorIT.java'
787--- src/test/java/com/akiban/qp/persistitadapter/indexcursor/KeyFinalCursorIT.java 1970-01-01 00:00:00 +0000
788+++ src/test/java/com/akiban/qp/persistitadapter/indexcursor/KeyFinalCursorIT.java 2013-07-29 16:50:42 +0000
789@@ -0,0 +1,170 @@
790+/**
791+ * Copyright (C) 2009-2013 Akiban Technologies, Inc.
792+ *
793+ * This program is free software: you can redistribute it and/or modify
794+ * it under the terms of the GNU Affero General Public License as published by
795+ * the Free Software Foundation, either version 3 of the License, or
796+ * (at your option) any later version.
797+ *
798+ * This program is distributed in the hope that it will be useful,
799+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
800+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
801+ * GNU Affero General Public License for more details.
802+ *
803+ * You should have received a copy of the GNU Affero General Public License
804+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
805+ */
806+
807+package com.akiban.qp.persistitadapter.indexcursor;
808+
809+import static com.akiban.qp.operator.API.cursor;
810+import static com.akiban.qp.operator.API.valuesScan_Default;
811+import static org.junit.Assert.*;
812+
813+import java.io.ByteArrayInputStream;
814+import java.io.ByteArrayOutputStream;
815+import java.io.IOException;
816+import java.util.ArrayList;
817+import java.util.List;
818+import java.util.Random;
819+
820+import org.junit.Before;
821+import org.junit.Test;
822+
823+import com.akiban.qp.operator.API;
824+import com.akiban.qp.operator.Cursor;
825+import com.akiban.qp.operator.Operator;
826+import com.akiban.qp.operator.RowCursor;
827+import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter.KeyReadCursor;
828+import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter.KeyReader;
829+import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter.KeyWriter;
830+import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter.KeyFinalCursor;
831+import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter.SortKey;
832+import com.akiban.qp.row.BindableRow;
833+import com.akiban.qp.row.RowBase;
834+import com.akiban.qp.rowtype.RowType;
835+import com.akiban.qp.rowtype.Schema;
836+import com.akiban.server.test.it.qp.OperatorITBase;
837+import com.akiban.server.test.it.qp.TestRow;
838+import com.akiban.server.types3.mcompat.mtypes.MNumeric;
839+import com.akiban.server.types3.mcompat.mtypes.MString;
840+import com.akiban.server.types3.texpressions.TPreparedField;
841+import com.akiban.util.tap.Tap;
842+import com.persistit.Key;
843+
844+public class KeyFinalCursorIT extends OperatorITBase {
845+
846+
847+ private Schema schema;
848+ private ByteArrayOutputStream os;
849+ private ByteArrayInputStream is;
850+ private SortKey startKey;
851+ private KeyWriter writer;
852+ private List<BindableRow> bindRows;
853+ private final Random random = new Random (100);
854+
855+ @Before
856+ public void createFileBuffers() {
857+ schema = new Schema(ais());
858+ os = new ByteArrayOutputStream();
859+ startKey = new SortKey();
860+ writer = new KeyWriter(os);
861+ bindRows = new ArrayList<>();
862+
863+ }
864+
865+ @Test
866+ public void cycleComplete() throws IOException {
867+ RowType rowType = schema.newValuesType(MNumeric.INT.instance(true));
868+
869+ TestRow[] rows = new TestRow[] {
870+ row(rowType, 1L),
871+ };
872+
873+ bindRows.add(BindableRow.of(rows[0], true));
874+
875+ RowCursor cursor = cycleRows(rowType);
876+ compareRows(rows, cursor);
877+ }
878+
879+ @Test
880+ public void cycleNValues() throws IOException {
881+ RowType rowType = schema.newValuesType(MNumeric.INT.instance(false),MNumeric.INT.instance(true), MString.varchar());
882+ TestRow[] rows = new TestRow[] {
883+ row(rowType, 1L, 100L, "A"),
884+ };
885+
886+ bindRows.add(BindableRow.of(rows[0], true));
887+ RowCursor cursor = cycleRows (rowType);
888+ compareRows(rows, cursor);
889+ }
890+
891+ @Test
892+ public void cycleNRows() throws IOException {
893+ RowType rowType = schema.newValuesType(MNumeric.INT.instance(true));
894+
895+ List<TestRow> rows = new ArrayList<>();
896+ for (long i = 0; i < 100; i++) {
897+ TestRow row = row (rowType, i);
898+ rows.add(row);
899+ bindRows.add(BindableRow.of(row, true));
900+ }
901+ RowCursor cursor = cycleRows (rowType);
902+
903+ TestRow[] rowArray = new TestRow[rows.size()];
904+ compareRows (rows.toArray(rowArray), cursor);
905+ }
906+
907+ @Test
908+ public void cycleManyRows() throws IOException {
909+ RowType rowType = schema.newValuesType(MNumeric.INT.instance(false), MNumeric.INT.instance(true), MString.varchar());
910+ List<TestRow> rows = new ArrayList<>();
911+ for (long i = 0; i < 100; i++) {
912+ TestRow row = row (rowType, random.nextInt(), i,
913+ characters(5+random.nextInt(1000)));
914+ rows.add(row);
915+ bindRows.add(BindableRow.of(row, true));
916+ }
917+ RowCursor cursor = cycleRows(rowType);
918+ TestRow[] rowArray = new TestRow[rows.size()];
919+ compareRows (rows.toArray(rowArray), cursor);
920+
921+ }
922+
923+ private RowCursor cycleRows(RowType rowType) throws IOException {
924+ KeyReadCursor keyCursor = getKeyCursor(rowType , bindRows);
925+
926+ startKey = keyCursor.readNext();
927+ while (startKey != null) {
928+ writer.writeEntry(startKey);
929+ startKey = keyCursor.readNext();
930+ }
931+
932+ is = new ByteArrayInputStream (os.toByteArray());
933+ RowCursor cursor = new KeyFinalCursor(is, rowType);
934+ return cursor;
935+
936+ }
937+
938+ private KeyReadCursor getKeyCursor (RowType rowType, List<BindableRow> rows) {
939+
940+ Operator op = valuesScan_Default(rows, rowType);
941+ Cursor cursor = cursor(op, queryContext, queryBindings);
942+ API.Ordering ordering = API.ordering();
943+ ordering.append(new TPreparedField (rowType.typeInstanceAt(0), 0), true);
944+
945+ MergeJoinSorter mergeSorter = new MergeJoinSorter(queryContext, queryBindings, cursor,
946+ rowType, ordering, API.SortOption.PRESERVE_DUPLICATES, Tap.createTimer("Test Tap"));
947+
948+ cursor.open();
949+ return mergeSorter.readCursor();
950+ }
951+ static final String ALPHA = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
952+ public String characters(final int length) {
953+ StringBuilder sb = new StringBuilder(length);
954+ for( int i = 0; i < length; i++ )
955+ sb.append(ALPHA.charAt(random.nextInt(ALPHA.length())));
956+ return sb.toString();
957+ }
958+
959+}
960
961=== added file 'src/test/java/com/akiban/qp/persistitadapter/indexcursor/KeyReaderWriterTest.java'
962--- src/test/java/com/akiban/qp/persistitadapter/indexcursor/KeyReaderWriterTest.java 1970-01-01 00:00:00 +0000
963+++ src/test/java/com/akiban/qp/persistitadapter/indexcursor/KeyReaderWriterTest.java 2013-07-29 16:50:42 +0000
964@@ -0,0 +1,182 @@
965+/**
966+ * Copyright (C) 2009-2013 Akiban Technologies, Inc.
967+ *
968+ * This program is free software: you can redistribute it and/or modify
969+ * it under the terms of the GNU Affero General Public License as published by
970+ * the Free Software Foundation, either version 3 of the License, or
971+ * (at your option) any later version.
972+ *
973+ * This program is distributed in the hope that it will be useful,
974+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
975+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
976+ * GNU Affero General Public License for more details.
977+ *
978+ * You should have received a copy of the GNU Affero General Public License
979+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
980+ */
981+package com.akiban.qp.persistitadapter.indexcursor;
982+
983+import static org.junit.Assert.*;
984+
985+import java.io.ByteArrayInputStream;
986+import java.io.ByteArrayOutputStream;
987+import java.io.IOException;
988+import java.util.ArrayList;
989+import java.util.List;
990+import java.util.Random;
991+
992+import org.junit.Before;
993+import org.junit.Test;
994+
995+import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter.KeyReader;
996+import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter.KeyWriter;
997+import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter.SortKey;
998+import com.persistit.Key;
999+import com.persistit.KeyState;
1000+import com.persistit.Persistit;
1001+
1002+public class KeyReaderWriterTest {
1003+
1004+ private ByteArrayOutputStream os;
1005+ private ByteArrayInputStream is;
1006+ private SortKey startKey;
1007+ private Key testKey;
1008+ private KeyWriter writer;
1009+
1010+ @Before
1011+ public void createFileBuffers() {
1012+ os = new ByteArrayOutputStream();
1013+ startKey = new SortKey();
1014+ testKey = new Key ((Persistit)null);
1015+ writer = new KeyWriter(os);
1016+ }
1017+ @Test
1018+ public void cycleSimple() throws IOException {
1019+ testKey.append(1);
1020+ startKey.sortKeys.add(new KeyState(testKey));
1021+ startKey.rowKey.append(1);
1022+ writer.writeEntry(startKey);
1023+ verifyInput();
1024+ }
1025+
1026+ @Test
1027+ public void cycleString() throws IOException {
1028+ testKey.append("abcd");
1029+ startKey.sortKeys.add(new KeyState(testKey));
1030+ startKey.rowKey.append("abcd");
1031+ writer.writeEntry(startKey);
1032+ verifyInput();
1033+ }
1034+
1035+ @Test
1036+ public void cycleNIntegers() throws IOException {
1037+ for (int i = 0; i < 400; i++) {
1038+ testKey.append(i);
1039+ startKey.rowKey.append(i);
1040+ }
1041+ startKey.sortKeys.add(new KeyState(testKey));
1042+ writer.writeEntry(startKey);
1043+ verifyInput();
1044+ }
1045+
1046+ @Test
1047+ public void cycle2Keys() throws IOException {
1048+
1049+ testKey.append(1);
1050+ startKey.sortKeys.add(new KeyState(testKey));
1051+ startKey.rowKey.append(1);
1052+ writer.writeEntry(startKey);
1053+ writer.writeEntry(startKey);
1054+
1055+ is = new ByteArrayInputStream (os.toByteArray());
1056+ KeyReader reader = new KeyReader (is);
1057+
1058+ SortKey endKey = reader.readNext();
1059+ assertTrue (startKey.rowKey.compareTo(endKey.rowKey) == 0);
1060+ assertTrue (startKey.sortKeys.get(0).compareTo(endKey.sortKeys.get(0)) == 0);
1061+ endKey = reader.readNext();
1062+ assertTrue (startKey.rowKey.compareTo(endKey.rowKey) == 0);
1063+ assertTrue (startKey.sortKeys.get(0).compareTo(endKey.sortKeys.get(0)) == 0);
1064+ endKey = reader.readNext();
1065+ assertNull (endKey);
1066+ }
1067+
1068+ @Test
1069+ public void cycleNKeys() throws IOException{
1070+ List<SortKey> keys = new ArrayList<>(100);
1071+ for (int i = 0; i < 100; i++) {
1072+ SortKey newKey = new SortKey();
1073+ newKey.rowKey.append(i);
1074+ testKey.clear();
1075+ testKey.append(i);
1076+ newKey.sortKeys.add(new KeyState (testKey));
1077+ keys.add(newKey);
1078+ }
1079+ verifyNKeys (keys);
1080+ }
1081+
1082+ @Test
1083+ public void cycleNStrings() throws IOException {
1084+ List<SortKey> keys = new ArrayList<>(100);
1085+ for (int i = 0; i < 100; i++) {
1086+ SortKey newKey = new SortKey();
1087+ String value = characters(5+random.nextInt(1000));
1088+ newKey.rowKey.append(value);
1089+ testKey.clear();
1090+ testKey.append(value);
1091+ newKey.sortKeys.add(new KeyState (testKey));
1092+ keys.add(newKey);
1093+ }
1094+ verifyNKeys (keys);
1095+ }
1096+
1097+ @Test
1098+ public void cycleNMultiKeys () throws IOException {
1099+ List<SortKey> keys = new ArrayList<>(100);
1100+ for (int i = 0; i < 100; i++) {
1101+ SortKey newKey = new SortKey();
1102+ newKey.rowKey.append(random.nextInt());
1103+ newKey.rowKey.append(null);
1104+ newKey.rowKey.append(characters(3+random.nextInt(25)));
1105+ newKey.rowKey.append(characters(3+random.nextInt(25)));
1106+ testKey.clear();
1107+ testKey.append(i);
1108+ newKey.sortKeys.add(new KeyState (testKey));
1109+ keys.add(newKey);
1110+ }
1111+ verifyNKeys(keys);
1112+ }
1113+
1114+ private void verifyInput() throws IOException {
1115+ is = new ByteArrayInputStream (os.toByteArray());
1116+ KeyReader reader = new KeyReader (is);
1117+ SortKey endKey = reader.readNext();
1118+ assertTrue (startKey.rowKey.compareTo(endKey.rowKey) == 0);
1119+ assertTrue (startKey.sortKeys.get(0).compareTo(endKey.sortKeys.get(0)) == 0);
1120+
1121+ }
1122+
1123+ private void verifyNKeys(List<SortKey> keys) throws IOException {
1124+ for (SortKey key : keys) {
1125+ writer.writeEntry(key);
1126+ }
1127+ is = new ByteArrayInputStream (os.toByteArray());
1128+ KeyReader reader = new KeyReader (is);
1129+
1130+ SortKey endKey;
1131+ for (SortKey expected : keys) {
1132+ endKey = reader.readNext();
1133+ assertTrue (expected.rowKey.compareTo(endKey.rowKey) == 0);
1134+ assertTrue (expected.sortKeys.get(0).compareTo(endKey.sortKeys.get(0)) == 0);
1135+ }
1136+ }
1137+
1138+ static final String ALPHA = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
1139+ final Random random = new Random(100);
1140+ public String characters(final int length) {
1141+ StringBuilder sb = new StringBuilder(length);
1142+ for( int i = 0; i < length; i++ )
1143+ sb.append(ALPHA.charAt(random.nextInt(ALPHA.length())));
1144+ return sb.toString();
1145+ }
1146+}
1147
1148=== modified file 'src/test/java/com/akiban/server/test/it/sort/MemorySorterIT.java'
1149--- src/test/java/com/akiban/server/test/it/sort/MemorySorterIT.java 2013-07-08 18:16:50 +0000
1150+++ src/test/java/com/akiban/server/test/it/sort/MemorySorterIT.java 2013-07-29 16:50:42 +0000
1151@@ -23,7 +23,6 @@
1152 import com.akiban.qp.operator.QueryContext;
1153 import com.akiban.qp.persistitadapter.Sorter;
1154 import com.akiban.qp.persistitadapter.indexcursor.MemorySorter;
1155-import com.akiban.qp.persistitadapter.indexcursor.PersistitSorter;
1156 import com.akiban.qp.rowtype.RowType;
1157 import com.akiban.util.tap.InOutTap;
1158
1159
1160=== added file 'src/test/java/com/akiban/server/test/it/sort/MergeJoinSorterIT.java'
1161--- src/test/java/com/akiban/server/test/it/sort/MergeJoinSorterIT.java 1970-01-01 00:00:00 +0000
1162+++ src/test/java/com/akiban/server/test/it/sort/MergeJoinSorterIT.java 2013-07-29 16:50:42 +0000
1163@@ -0,0 +1,49 @@
1164+/**
1165+ * Copyright (C) 2009-2013 Akiban Technologies, Inc.
1166+ *
1167+ * This program is free software: you can redistribute it and/or modify
1168+ * it under the terms of the GNU Affero General Public License as published by
1169+ * the Free Software Foundation, either version 3 of the License, or
1170+ * (at your option) any later version.
1171+ *
1172+ * This program is distributed in the hope that it will be useful,
1173+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
1174+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1175+ * GNU Affero General Public License for more details.
1176+ *
1177+ * You should have received a copy of the GNU Affero General Public License
1178+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
1179+ */
1180+package com.akiban.server.test.it.sort;
1181+
1182+import java.util.HashMap;
1183+import java.util.Map;
1184+
1185+import com.akiban.qp.operator.API.Ordering;
1186+import com.akiban.qp.operator.API.SortOption;
1187+import com.akiban.qp.operator.Cursor;
1188+import com.akiban.qp.operator.QueryBindings;
1189+import com.akiban.qp.operator.QueryContext;
1190+import com.akiban.qp.persistitadapter.Sorter;
1191+import com.akiban.qp.persistitadapter.indexcursor.MergeJoinSorter;
1192+import com.akiban.qp.rowtype.RowType;
1193+import com.akiban.util.tap.InOutTap;
1194+
1195+public class MergeJoinSorterIT extends SorterITBase {
1196+
1197+ @Override
1198+ public Map<String,String> startupConfigProperties() {
1199+ Map<String,String> props = new HashMap<>();
1200+ props.putAll(super.startupConfigProperties());
1201+ props.put("akserver.tmp_dir","/tmp/akserver-junit/");
1202+ return props;
1203+ }
1204+
1205+ @Override
1206+ public Sorter createSorter(QueryContext context, QueryBindings bindings,
1207+ Cursor input, RowType rowType, Ordering ordering,
1208+ SortOption sortOption, InOutTap loadTap) {
1209+ return new MergeJoinSorter(context, bindings, input, rowType, ordering, sortOption, loadTap);
1210+ }
1211+
1212+}
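The KeyReaderWriterTest and KeyFinalCursorIT cases above exercise a write-then-read round trip: KeyWriter.writeEntry appends SortKey entries to an output stream, and KeyReader.readNext returns them in the same order, then null once the stream is exhausted. The byte layout MergeJoinSorter actually uses is defined in MergeJoinSorter.java and is not reproduced here. The following is only a minimal sketch of that contract, assuming a simple length-prefixed encoding and using raw byte arrays in place of SortKey; the class names KeyRoundTripSketch, LengthPrefixedWriter and LengthPrefixedReader are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;

// Hypothetical sketch of the KeyWriter/KeyReader round-trip contract;
// raw byte arrays stand in for encoded SortKey entries.
public class KeyRoundTripSketch {

    /** Hypothetical writer: each entry is a 4-byte length followed by the entry bytes. */
    static final class LengthPrefixedWriter {
        private final DataOutputStream out;

        LengthPrefixedWriter(OutputStream os) {
            this.out = new DataOutputStream(os);
        }

        void writeEntry(byte[] encodedKey) throws IOException {
            out.writeInt(encodedKey.length);
            out.write(encodedKey);
            out.flush();
        }
    }

    /** Hypothetical reader: returns the next entry, or null when no length prefix remains. */
    static final class LengthPrefixedReader {
        private final DataInputStream in;

        LengthPrefixedReader(InputStream is) {
            this.in = new DataInputStream(is);
        }

        byte[] readNext() throws IOException {
            int length;
            try {
                length = in.readInt();
            } catch (EOFException eof) {
                return null; // stream ended before another length prefix: no more entries
            }
            byte[] bytes = new byte[length];
            in.readFully(bytes); // an EOFException here means a truncated entry
            return bytes;
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        LengthPrefixedWriter writer = new LengthPrefixedWriter(os);
        byte[] entry = {1, 2, 3};
        writer.writeEntry(entry);
        writer.writeEntry(entry);

        LengthPrefixedReader reader =
                new LengthPrefixedReader(new ByteArrayInputStream(os.toByteArray()));
        System.out.println(Arrays.equals(entry, reader.readNext())); // true
        System.out.println(Arrays.equals(entry, reader.readNext())); // true
        System.out.println(reader.readNext() == null);               // true
    }
}
```

Returning null at end of stream, rather than throwing, mirrors the assertNull(endKey) check at the end of cycle2Keys.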

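MergeSortIOException turns a checked IOException into an Akiban error with code class 53, subcode 00D, whose text comes from the MERGE_SORT_IO template in error_code.properties. As written, the constructor forwards only ex.getMessage(); whether the original exception survives as a cause depends on InvalidOperationException, which is not part of this diff. The sketch below shows the same pattern with the cause kept, assuming the {0} placeholders are expanded with java.text.MessageFormat; SketchErrorCode and SketchMergeSortIOException are hypothetical stand-ins, not the Akiban classes.

```java
import java.io.IOException;
import java.text.MessageFormat;

// Hypothetical sketch: SQLSTATE-style error code plus a MessageFormat template,
// and an unchecked wrapper that keeps the original IOException as its cause.
public class ErrorCodeSketch {

    /** Hypothetical stand-in for one ErrorCode entry and its error_code.properties text. */
    enum SketchErrorCode {
        MERGE_SORT_IO("53", "00D", "Merge Sort had an unexpected IOException: {0}");

        final String codeClass;
        final String subCode;
        final String template;

        SketchErrorCode(String codeClass, String subCode, String template) {
            this.codeClass = codeClass;
            this.subCode = subCode;
            this.template = template;
        }

        String format(Object... args) {
            return MessageFormat.format(template, args);
        }
    }

    /** Hypothetical unchecked wrapper; passing ex as the cause preserves its stack trace. */
    static final class SketchMergeSortIOException extends RuntimeException {
        SketchMergeSortIOException(IOException ex) {
            super(SketchErrorCode.MERGE_SORT_IO.format(ex.getMessage()), ex);
        }

        SketchErrorCode getCode() {
            return SketchErrorCode.MERGE_SORT_IO;
        }
    }

    public static void main(String[] args) {
        try {
            throw new SketchMergeSortIOException(new IOException("No space left on device"));
        } catch (SketchMergeSortIOException e) {
            // Merge Sort had an unexpected IOException: No space left on device
            System.out.println(e.getMessage());
            System.out.println(e.getCause() instanceof IOException); // true
            System.out.println(e.getCode().codeClass + e.getCode().subCode); // 5300D
        }
    }
}
```

Keeping the cause means the underlying I/O failure is still visible in a stack trace, not just its message text.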