Merge lp:~mmcm/akiban-server/pipeline-select-bloom-filter into lp:~akiban-technologies/akiban-server/trunk

Proposed by Mike McMahon
Status: Merged
Approved by: Nathan Williams
Approved revision: 2717
Merged at revision: 2707
Proposed branch: lp:~mmcm/akiban-server/pipeline-select-bloom-filter
Merge into: lp:~akiban-technologies/akiban-server/trunk
Prerequisite: lp:~mmcm/akiban-server/pipeline-union-all
Diff against target: 505 lines (+221/-29)
11 files modified
src/main/java/com/akiban/qp/operator/API.java (+19/-7)
src/main/java/com/akiban/qp/operator/Map_NestedLoops.java (+11/-7)
src/main/java/com/akiban/qp/operator/Select_BloomFilter.java (+132/-6)
src/main/java/com/akiban/qp/operator/Using_BloomFilter.java (+3/-1)
src/main/java/com/akiban/sql/optimizer/rule/OperatorAssembler.java (+5/-1)
src/main/java/com/akiban/sql/optimizer/rule/PipelineConfiguration.java (+7/-0)
src/test/java/com/akiban/server/test/costmodel/Select_BloomFilterCT.java (+4/-4)
src/test/java/com/akiban/server/test/it/qp/Map_NestedLoopsIT.java (+1/-1)
src/test/java/com/akiban/server/test/it/qp/Select_BloomFilterIT.java (+12/-1)
src/test/java/com/akiban/server/test/it/qp/Select_BloomFilterPipelineIT.java (+26/-0)
src/test/java/com/akiban/server/test/it/qp/Select_BloomFilter_CaseInsensitive_IT.java (+1/-1)
To merge this branch: bzr merge lp:~mmcm/akiban-server/pipeline-select-bloom-filter
Reviewer Review Type Date Requested Status
Nathan Williams Approve
Review via email: mp+175701@code.launchpad.net

Description of the change

Pipeline support for Select_BloomFilter.

The pipeline looks like Map_NestedLoops. In fact, the implementation inherits from those cursor classes.

Input rows are hashed against the Bloom filter taken from the given binding position. If this indicates that they might match, a query bindings is output with the row in that position (a different bindings object, so using the same position is now somewhat safer). This stream of bindings feeds the check plan. At the other end, if any bindings at the given depth has a row (from the check plan), the original input row from those bindings is output.

To post a comment you must log in.
2717. By Mike McMahon

Accidental.

Revision history for this message
Nathan Williams (nwilliams) wrote :

Looks good.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/main/java/com/akiban/qp/operator/API.java'
2--- src/main/java/com/akiban/qp/operator/API.java 2013-07-18 22:48:23 +0000
3+++ src/main/java/com/akiban/qp/operator/API.java 2013-07-18 22:48:23 +0000
4@@ -710,18 +710,22 @@
5 public static Operator select_BloomFilter(Operator input,
6 Operator onPositive,
7 List<? extends ExpressionGenerator> filterFields,
8- int bindingPosition)
9+ int bindingPosition,
10+ boolean pipeline,
11+ int depth)
12 {
13- return select_BloomFilter(input, onPositive, generateOld(filterFields), generateNew(filterFields), null, bindingPosition);
14+ return select_BloomFilter(input, onPositive, generateOld(filterFields), generateNew(filterFields), null, bindingPosition, pipeline, depth);
15 }
16
17 public static Operator select_BloomFilter(Operator input,
18 Operator onPositive,
19 List<? extends Expression> filterFields,
20 List<? extends TPreparedExpression> tFilterFields,
21- int bindingPosition)
22+ int bindingPosition,
23+ boolean pipeline,
24+ int depth)
25 {
26- return select_BloomFilter(input, onPositive, filterFields, tFilterFields, null, bindingPosition);
27+ return select_BloomFilter(input, onPositive, filterFields, tFilterFields, null, bindingPosition, pipeline, depth);
28 }
29
30 public static Operator select_BloomFilter(Operator input,
31@@ -729,14 +733,18 @@
32 List<? extends Expression> filterFields,
33 List<? extends TPreparedExpression> tFilterFields,
34 List<AkCollator> collators,
35- int bindingPosition)
36+ int bindingPosition,
37+ boolean pipeline,
38+ int depth)
39 {
40 return new Select_BloomFilter(input,
41 onPositive,
42 filterFields,
43 tFilterFields,
44 collators,
45- bindingPosition);
46+ bindingPosition,
47+ pipeline,
48+ depth);
49 }
50
51 public static Operator select_BloomFilter(Operator input,
52@@ -744,6 +752,8 @@
53 List<? extends ExpressionGenerator> filterFields,
54 List<AkCollator> collators,
55 int bindingPosition,
56+ boolean pipeline,
57+ int depth,
58 ExpressionGenerator.ErasureMaker marker)
59 {
60 return new Select_BloomFilter(input,
61@@ -751,7 +761,9 @@
62 generateOld(filterFields),
63 generateNew(filterFields),
64 collators,
65- bindingPosition);
66+ bindingPosition,
67+ pipeline,
68+ depth);
69 }
70
71 // EmitBoundRow_Nested
72
73=== modified file 'src/main/java/com/akiban/qp/operator/Map_NestedLoops.java'
74--- src/main/java/com/akiban/qp/operator/Map_NestedLoops.java 2013-07-18 14:20:06 +0000
75+++ src/main/java/com/akiban/qp/operator/Map_NestedLoops.java 2013-07-18 22:48:23 +0000
76@@ -174,9 +174,9 @@
77 // Pipeline execution: turn outer loop row stream into binding stream for inner loop.
78 protected static class RowToBindingsCursor implements QueryBindingsCursor
79 {
80- private final Cursor input;
81- private final int depth, bindingPosition;
82- private QueryBindings baseBindings;
83+ protected final Cursor input;
84+ protected final int depth, bindingPosition;
85+ protected QueryBindings baseBindings;
86
87 public RowToBindingsCursor(Cursor input, int bindingPosition, int depth) {
88 this.input = input;
89@@ -193,7 +193,7 @@
90 @Override
91 public QueryBindings nextBindings() {
92 if (baseBindings != null) {
93- Row row = input.next();
94+ Row row = nextInputRow();
95 if (row != null) {
96 QueryBindings bindings = baseBindings.createBindings();
97 assert (bindings.getDepth() == depth);
98@@ -231,6 +231,10 @@
99 input.cancelBindings(bindings);
100 }
101 }
102+
103+ protected Row nextInputRow() {
104+ return input.next();
105+ }
106 }
107
108 // Other end of pipeline: remove the extra binding levels that we
109@@ -238,9 +242,9 @@
110 // entire outer rowset.
111 protected static class CollapseBindingsCursor extends OperatorCursor
112 {
113- private final Cursor input;
114- private final int depth;
115- private QueryBindings currentBindings, pendingBindings, openBindings, inputOpenBindings;
116+ protected final Cursor input;
117+ protected final int depth;
118+ protected QueryBindings currentBindings, pendingBindings, openBindings, inputOpenBindings;
119
120 public CollapseBindingsCursor(QueryContext context, Cursor input, int depth) {
121 super(context);
122
123=== modified file 'src/main/java/com/akiban/qp/operator/Select_BloomFilter.java'
124--- src/main/java/com/akiban/qp/operator/Select_BloomFilter.java 2013-07-15 22:01:49 +0000
125+++ src/main/java/com/akiban/qp/operator/Select_BloomFilter.java 2013-07-18 22:48:23 +0000
126@@ -53,6 +53,10 @@
127 * loaded by the Using_BloomFilter operator. This bindingPosition is also used to hold a row from
128 * the input stream that passes the filter and needs to be tested for existence using the onPositive
129 * operator.
130+ <li><b>boolean pipeline:</b> Whether to use bracketing cursors instead of rebinding.
131+
132+ <li><b>int depth:</b> Number of nested Maps, including this one.
133+
134 * <p/>
135 * <h1>Behavior</h1>
136 * <p/>
137@@ -103,10 +107,19 @@
138 @Override
139 protected Cursor cursor(QueryContext context, QueryBindingsCursor bindingsCursor)
140 {
141- if (tFields == null)
142- return new Execution<>(context, bindingsCursor, fields, oldExpressionsAdapater);
143- else
144- return new Execution<>(context, bindingsCursor, tFields, newExpressionsAdapter);
145+ if (!pipeline) {
146+ if (tFields == null)
147+ return new Execution<>(context, bindingsCursor, fields, oldExpressionsAdapater);
148+ else
149+ return new Execution<>(context, bindingsCursor, tFields, newExpressionsAdapter);
150+ }
151+ else {
152+ assert (tFields != null);
153+ Cursor inputCursor = input.cursor(context, bindingsCursor);
154+ QueryBindingsCursor toBindings = new FilterBindingsCursor(context, inputCursor, bindingPosition, depth, tFields, newExpressionsAdapter);
155+ Cursor checkCursor = onPositive.cursor(context, toBindings);
156+ return new RecoverRowsCursor(context, checkCursor, bindingPosition, depth);
157+ }
158 }
159
160 @Override
161@@ -128,7 +141,9 @@
162 List<? extends Expression> fields,
163 List<? extends TPreparedExpression> tFields,
164 List<AkCollator> collators,
165- int bindingPosition)
166+ int bindingPosition,
167+ boolean pipeline,
168+ int depth)
169 {
170 ArgumentValidation.notNull("input", input);
171 ArgumentValidation.notNull("onPositive", onPositive);
172@@ -142,9 +157,12 @@
173 }
174 ArgumentValidation.isGT("fields.size()", size, 0);
175 ArgumentValidation.isGTE("bindingPosition", bindingPosition, 0);
176+ ArgumentValidation.isGT("depth", depth, 0);
177 this.input = input;
178 this.onPositive = onPositive;
179 this.bindingPosition = bindingPosition;
180+ this.pipeline = pipeline;
181+ this.depth = depth;
182 this.fields = fields;
183 this.tFields = tFields;
184 this.collators = collators;
185@@ -168,7 +186,8 @@
186
187 private final Operator input;
188 private final Operator onPositive;
189- private final int bindingPosition;
190+ private final int bindingPosition, depth;
191+ private final boolean pipeline;
192 private final List<? extends Expression> fields;
193 private final List<? extends TPreparedExpression> tFields;
194 private final List<AkCollator> collators;
195@@ -380,6 +399,9 @@
196 // It is safe to reuse the binding position in this way because the filter is extracted and stored
197 // in a field during open(), while the use of the binding position for use in the onPositive lookup
198 // occurs during next().
199+ if (LOG_EXECUTION) {
200+ LOG.debug("Select_BloomFilter: candidate {}", row);
201+ }
202 TAP_CHECK.in();
203 try {
204 bindings.setRow(bindingPosition, row);
205@@ -408,4 +430,108 @@
206 private boolean idle = true;
207 private boolean destroyed = false;
208 }
209+
210+ // Turn input rows that match the filter into bindings for the onPositive plan.
211+ private class FilterBindingsCursor extends Map_NestedLoops.RowToBindingsCursor
212+ {
213+ private final StoreAdapter storeAdapter;
214+ private final List<TEvaluatableExpression> fieldEvals = new ArrayList<>();
215+ private final ExpressionAdapter<TPreparedExpression, TEvaluatableExpression> expressionAdapter;
216+
217+ public FilterBindingsCursor(QueryContext context, Cursor input,
218+ int bindingPosition, int depth,
219+ List<? extends TPreparedExpression> expressions, ExpressionAdapter<TPreparedExpression, TEvaluatableExpression> expressionAdapter) {
220+ super(input, bindingPosition, depth);
221+ this.storeAdapter = context.getStore();
222+ this.expressionAdapter = expressionAdapter;
223+ for (TPreparedExpression field : expressions) {
224+ TEvaluatableExpression eval = expressionAdapter.evaluate(field, context);
225+ fieldEvals.add(eval);
226+ }
227+ }
228+
229+ @Override
230+ protected Row nextInputRow() {
231+ BloomFilter filter = baseBindings.getBloomFilter(bindingPosition);
232+ while (true) {
233+ Row row = input.next();
234+ if (row == null) {
235+ return row;
236+ }
237+ if (filter.maybePresent(hashProjectedRow(row))) {
238+ if (ExecutionBase.LOG_EXECUTION) {
239+ LOG.debug("Select_BloomFilter: candidate {}", row);
240+ }
241+ return row;
242+ }
243+ }
244+ }
245+
246+ private int hashProjectedRow(Row row)
247+ {
248+ int hash = 0;
249+ for (int f = 0; f < fieldEvals.size(); f++) {
250+ TEvaluatableExpression fieldEval = fieldEvals.get(f);
251+ hash = hash ^ expressionAdapter.hash(storeAdapter, fieldEval, row, collator(f));
252+ }
253+ return hash;
254+ }
255+ }
256+
257+ // If any context at our depth has a non-empty rowset from
258+ // onPositive, it passed, so let it through.
259+ private static class RecoverRowsCursor extends Map_NestedLoops.CollapseBindingsCursor
260+ {
261+ private final int bindingPosition;
262+
263+ public RecoverRowsCursor(QueryContext context, Cursor input, int bindingPosition, int depth) {
264+ super(context, input, depth);
265+ this.bindingPosition = bindingPosition;
266+ }
267+
268+ @Override
269+ public Row next() {
270+ if (TAP_NEXT_ENABLED) {
271+ TAP_NEXT.in();
272+ }
273+ try {
274+ if (CURSOR_LIFECYCLE_ENABLED) {
275+ CursorLifecycle.checkIdleOrActive(this);
276+ }
277+ checkQueryCancelation();
278+ Row row = null;
279+ while (true) {
280+ QueryBindings bindings = input.nextBindings();
281+ if (bindings == null) {
282+ openBindings = null;
283+ break;
284+ }
285+ if (bindings.getDepth() < depth) {
286+ pendingBindings = bindings;
287+ openBindings = null;
288+ break;
289+ }
290+ assert (bindings.getDepth() == depth);
291+ input.open();
292+ inputOpenBindings = bindings;
293+ row = input.next();
294+ input.close();
295+ inputOpenBindings = null;
296+ if (row != null) {
297+ row = bindings.getRow(bindingPosition);
298+ break;
299+ }
300+ }
301+ if (LOG_EXECUTION) {
302+ LOG.debug("Select_BloomFilter: yield {}", row);
303+ }
304+ return row;
305+ }
306+ finally {
307+ if (TAP_NEXT_ENABLED) {
308+ TAP_NEXT.out();
309+ }
310+ }
311+ }
312+ }
313 }
314
315=== modified file 'src/main/java/com/akiban/qp/operator/Using_BloomFilter.java'
316--- src/main/java/com/akiban/qp/operator/Using_BloomFilter.java 2013-07-09 17:26:33 +0000
317+++ src/main/java/com/akiban/qp/operator/Using_BloomFilter.java 2013-07-18 22:48:23 +0000
318@@ -215,7 +215,9 @@
319 {
320 close();
321 input.destroy();
322- bindings.setBloomFilter(filterBindingPosition, null);
323+ if (bindings != null) {
324+ bindings.setBloomFilter(filterBindingPosition, null);
325+ }
326 }
327
328 // Execution interface
329
330=== modified file 'src/main/java/com/akiban/sql/optimizer/rule/OperatorAssembler.java'
331--- src/main/java/com/akiban/sql/optimizer/rule/OperatorAssembler.java 2013-07-18 22:48:23 +0000
332+++ src/main/java/com/akiban/sql/optimizer/rule/OperatorAssembler.java 2013-07-18 22:48:23 +0000
333@@ -1827,6 +1827,7 @@
334 int pos = getHashTablePosition(bloomFilter);
335 RowStream stream = assembleStream(bloomFilterFilter.getInput());
336 boundRows.set(pos, stream.fieldOffsets);
337+ nestedBindingsDepth++;
338 RowStream cstream = assembleStream(bloomFilterFilter.getCheck());
339 boundRows.set(pos, null);
340 List<Expression> fields = oldPartialAssembler.assembleExpressions(bloomFilterFilter.getLookupExpressions(),
341@@ -1842,7 +1843,10 @@
342 fields,
343 tFields,
344 collators,
345- pos + loopBindingsOffset);
346+ pos + loopBindingsOffset,
347+ rulesContext.getPipelineConfiguration().isSelectBloomFilterEnabled(),
348+ nestedBindingsDepth);
349+ nestedBindingsDepth--;
350 return stream;
351 }
352
353
354=== modified file 'src/main/java/com/akiban/sql/optimizer/rule/PipelineConfiguration.java'
355--- src/main/java/com/akiban/sql/optimizer/rule/PipelineConfiguration.java 2013-07-13 20:04:24 +0000
356+++ src/main/java/com/akiban/sql/optimizer/rule/PipelineConfiguration.java 2013-07-18 22:48:23 +0000
357@@ -25,6 +25,7 @@
358 private int indexScanLookaheadQuantum = 1;
359 private int groupLookupLookaheadQuantum = 1;
360 private boolean unionAllOpenBoth = false;
361+ private boolean selectBloomFilterEnabled = false;
362
363 public PipelineConfiguration() {
364 }
365@@ -49,6 +50,10 @@
366 return unionAllOpenBoth;
367 }
368
369+ public boolean isSelectBloomFilterEnabled() {
370+ return selectBloomFilterEnabled;
371+ }
372+
373 public void load(Properties properties) {
374 for (String prop : properties.stringPropertyNames()) {
375 String val = properties.getProperty(prop);
376@@ -60,6 +65,8 @@
377 groupLookupLookaheadQuantum = Integer.parseInt(val);
378 else if ("unionAll.openBoth".equals(prop))
379 unionAllOpenBoth = Boolean.parseBoolean(val);
380+ else if ("selectBloomFilter.enabled".equals(prop))
381+ selectBloomFilterEnabled = Boolean.parseBoolean(val);
382 else
383 throw new IllegalArgumentException("Unknown property " + prop);
384 }
385
386=== modified file 'src/test/java/com/akiban/server/test/costmodel/Select_BloomFilterCT.java'
387--- src/test/java/com/akiban/server/test/costmodel/Select_BloomFilterCT.java 2013-07-09 21:10:22 +0000
388+++ src/test/java/com/akiban/server/test/costmodel/Select_BloomFilterCT.java 2013-07-18 22:48:23 +0000
389@@ -160,8 +160,8 @@
390 new Ordering()),
391 // filterFields
392 Arrays.asList(ExpressionGenerators.field(dIndexRowType, 0)),
393- // filterBindingPosition
394- 0));
395+ // filterBindingPosition, pipeline, depth
396+ 0, false, 1));
397 return plan;
398 }
399
400@@ -213,8 +213,8 @@
401 new Ordering()),
402 // filterFields
403 Arrays.asList(ExpressionGenerators.field(dIndexRowType, 0)),
404- // filterBindingPosition
405- 0));
406+ // filterBindingPosition, pipeline, depth
407+ 0, false, 1));
408 return plan;
409 }
410
411
412=== modified file 'src/test/java/com/akiban/server/test/it/qp/Map_NestedLoopsIT.java'
413--- src/test/java/com/akiban/server/test/it/qp/Map_NestedLoopsIT.java 2013-07-18 02:32:32 +0000
414+++ src/test/java/com/akiban/server/test/it/qp/Map_NestedLoopsIT.java 2013-07-18 22:48:23 +0000
415@@ -262,7 +262,7 @@
416 public boolean reopenTopLevel() {
417 // You cannot just re-open() a pipelined Map_NestedLoops, but you can
418 // openTopLevel() it again.
419- return true;
420+ return pipelineMap();
421 }
422 };
423 testCursorLifecycle(plan, testCase);
424
425=== modified file 'src/test/java/com/akiban/server/test/it/qp/Select_BloomFilterIT.java'
426--- src/test/java/com/akiban/server/test/it/qp/Select_BloomFilterIT.java 2013-07-08 20:48:05 +0000
427+++ src/test/java/com/akiban/server/test/it/qp/Select_BloomFilterIT.java 2013-07-18 22:48:23 +0000
428@@ -107,6 +107,10 @@
429 use(db);
430 }
431
432+ protected boolean pipelineSelectBloomFilter() {
433+ return false;
434+ }
435+
436 // Test argument validation
437
438 @Test
439@@ -239,6 +243,11 @@
440 row(outputRowType, 6L, 62L, 602L),
441 };
442 }
443+
444+ @Override
445+ public boolean reopenTopLevel() {
446+ return pipelineSelectBloomFilter();
447+ }
448 };
449 testCursorLifecycle(plan, testCase);
450 }
451@@ -301,7 +310,9 @@
452 ExpressionGenerators.field(dIndexRowType, 1),
453 ExpressionGenerators.field(dIndexRowType, 2)),
454 // filterBindingPosition
455- 0)),
456+ 0,
457+ pipelineSelectBloomFilter(),
458+ 1)),
459 dIndexRowType,
460 Arrays.asList(
461 ExpressionGenerators.field(dIndexRowType, 0), // test_id
462
463=== added file 'src/test/java/com/akiban/server/test/it/qp/Select_BloomFilterPipelineIT.java'
464--- src/test/java/com/akiban/server/test/it/qp/Select_BloomFilterPipelineIT.java 1970-01-01 00:00:00 +0000
465+++ src/test/java/com/akiban/server/test/it/qp/Select_BloomFilterPipelineIT.java 2013-07-18 22:48:23 +0000
466@@ -0,0 +1,26 @@
467+/**
468+ * Copyright (C) 2009-2013 Akiban Technologies, Inc.
469+ *
470+ * This program is free software: you can redistribute it and/or modify
471+ * it under the terms of the GNU Affero General Public License as published by
472+ * the Free Software Foundation, either version 3 of the License, or
473+ * (at your option) any later version.
474+ *
475+ * This program is distributed in the hope that it will be useful,
476+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
477+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
478+ * GNU Affero General Public License for more details.
479+ *
480+ * You should have received a copy of the GNU Affero General Public License
481+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
482+ */
483+
484+package com.akiban.server.test.it.qp;
485+
486+public class Select_BloomFilterPipelineIT extends Select_BloomFilterIT
487+{
488+ @Override
489+ protected boolean pipelineSelectBloomFilter() {
490+ return true;
491+ }
492+}
493
494=== modified file 'src/test/java/com/akiban/server/test/it/qp/Select_BloomFilter_CaseInsensitive_IT.java'
495--- src/test/java/com/akiban/server/test/it/qp/Select_BloomFilter_CaseInsensitive_IT.java 2013-07-08 20:48:05 +0000
496+++ src/test/java/com/akiban/server/test/it/qp/Select_BloomFilter_CaseInsensitive_IT.java 2013-07-18 22:48:23 +0000
497@@ -317,7 +317,7 @@
498 // collators
499 collators,
500 // filterBindingPosition
501- 0,
502+ 0, false, 1,
503 ExpressionGenerator.ErasureMaker.MARK),
504 // collators
505 collators

Subscribers

People subscribed via source and target branches