Merge lp:~nwilliams/akiban-server/full-text-background-rewrite into lp:~akiban-technologies/akiban-server/trunk

Proposed by Nathan Williams
Status: Merged
Approved by: Thomas Jones-Low
Approved revision: 2724
Merged at revision: 2720
Proposed branch: lp:~nwilliams/akiban-server/full-text-background-rewrite
Merge into: lp:~akiban-technologies/akiban-server/trunk
Diff against target: 1809 lines (+400/-777)
19 files modified
src/main/java/com/akiban/server/service/BackgroundObserver.java (+0/-24)
src/main/java/com/akiban/server/service/BackgroundObserverImpl.java (+0/-44)
src/main/java/com/akiban/server/service/BackgroundWork.java (+0/-46)
src/main/java/com/akiban/server/service/BackgroundWorkBase.java (+0/-51)
src/main/java/com/akiban/server/service/text/FullTextIndexService.java (+3/-8)
src/main/java/com/akiban/server/service/text/FullTextIndexServiceImpl.java (+261/-274)
src/main/java/com/akiban/server/types3/mcompat/mfuncs/FulltextMaintenanceWait.java (+0/-86)
src/main/java/com/akiban/server/types3/mcompat/mfuncs/WaitFunctionHelpers.java (+0/-79)
src/main/resources/com/akiban/server/service/config/configuration-defaults.properties (+1/-2)
src/test/java/com/akiban/rest/RestServiceFilesIT.java (+3/-23)
src/test/java/com/akiban/server/service/config/TestConfigService.java (+1/-2)
src/test/java/com/akiban/server/service/text/FullTextIndexServiceBug1172013IT.java (+22/-60)
src/test/java/com/akiban/server/service/text/FullTextIndexServiceIT.java (+22/-52)
src/test/java/com/akiban/server/service/text/FullTextIndexServiceITBase.java (+73/-0)
src/test/resources/com/akiban/rest/text/full_text_background_wait (+0/-1)
src/test/resources/com/akiban/sql/pg/yaml/functional/test-drop-fulltext.yaml (+4/-4)
src/test/resources/com/akiban/sql/pg/yaml/functional/test-ft-with-inherited-key.yaml (+2/-4)
src/test/resources/com/akiban/sql/pg/yaml/functional/test-ft-with-pk.yaml (+2/-7)
src/test/resources/com/akiban/sql/pg/yaml/functional/test-fulltext-maintenance.yaml (+6/-10)
To merge this branch: bzr merge lp:~nwilliams/akiban-server/full-text-background-rewrite
Reviewer Review Type Date Requested Status
Thomas Jones-Low Approve
Review via email: mp+177383@code.launchpad.net

Description of the change

Rewrite, and simplify, full text background worker processes.

As shown by our long string of intermittent failures, the background Timer tasks in FullTextIndexServiceImpl weren't stable. In particular, it was possible for multiple instances to run at a single time (e.g. see associated bug), for a test wait to not actually wait for work to complete, and lack of control on when task start and stop. Part of the trouble was using Timer/Task and a larger part was just incorrect code.

Instead, replace with a simple BackgroundRunner helper that has well defined, and checked, state transitions with explicit wait/notify support. Important changes including ensuring that (at most) one runner of each type will be active, a specific sleep object for trivial 'kicking', and a proper waiter method to ensure a complete cycle is observed.

While in the area, remove the more complicated than necessary BackgroundWork/Observer/WaitFunction classes and replace the SQL wait function with a system routine.

To help convince myself of the correctness, this branch 'mvn verify' in a loop on sleepy for most of the weekend. Zero errors were observed in 320 individual runs.

To post a comment you must log in.
Revision history for this message
Thomas Jones-Low (tjoneslo) wrote :

Looks much better. Any time we end up with less code for a bug fix than when we started is a bonus.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== removed file 'src/main/java/com/akiban/server/service/BackgroundObserver.java'
2--- src/main/java/com/akiban/server/service/BackgroundObserver.java 2013-07-02 19:16:02 +0000
3+++ src/main/java/com/akiban/server/service/BackgroundObserver.java 1970-01-01 00:00:00 +0000
4@@ -1,24 +0,0 @@
5-/**
6- * Copyright (C) 2009-2013 Akiban Technologies, Inc.
7- *
8- * This program is free software: you can redistribute it and/or modify
9- * it under the terms of the GNU Affero General Public License as published by
10- * the Free Software Foundation, either version 3 of the License, or
11- * (at your option) any later version.
12- *
13- * This program is distributed in the hope that it will be useful,
14- * but WITHOUT ANY WARRANTY; without even the implied warranty of
15- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16- * GNU Affero General Public License for more details.
17- *
18- * You should have received a copy of the GNU Affero General Public License
19- * along with this program. If not, see <http://www.gnu.org/licenses/>.
20- */
21-
22-package com.akiban.server.service;
23-
24-public interface BackgroundObserver
25-{
26- public void update(BackgroundWork event);
27- public boolean backgroundFinished();
28-}
29
30=== removed file 'src/main/java/com/akiban/server/service/BackgroundObserverImpl.java'
31--- src/main/java/com/akiban/server/service/BackgroundObserverImpl.java 2013-04-17 16:23:40 +0000
32+++ src/main/java/com/akiban/server/service/BackgroundObserverImpl.java 1970-01-01 00:00:00 +0000
33@@ -1,44 +0,0 @@
34-/**
35- * Copyright (C) 2009-2013 Akiban Technologies, Inc.
36- *
37- * This program is free software: you can redistribute it and/or modify
38- * it under the terms of the GNU Affero General Public License as published by
39- * the Free Software Foundation, either version 3 of the License, or
40- * (at your option) any later version.
41- *
42- * This program is distributed in the hope that it will be useful,
43- * but WITHOUT ANY WARRANTY; without even the implied warranty of
44- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
45- * GNU Affero General Public License for more details.
46- *
47- * You should have received a copy of the GNU Affero General Public License
48- * along with this program. If not, see <http://www.gnu.org/licenses/>.
49- */
50-
51-package com.akiban.server.service;
52-
53-
54-/**
55- *
56- * Set the hasFinished flag to true when the background has done
57- */
58-public class BackgroundObserverImpl implements BackgroundObserver
59-{
60- private volatile boolean hasFinished = false;
61- public BackgroundObserverImpl ()
62- {
63- }
64-
65- @Override
66- public boolean backgroundFinished()
67- {
68- return hasFinished;
69- }
70-
71- @Override
72- public void update(BackgroundWork event)
73- {
74- hasFinished = true;
75- }
76-
77-}
78
79=== removed file 'src/main/java/com/akiban/server/service/BackgroundWork.java'
80--- src/main/java/com/akiban/server/service/BackgroundWork.java 2013-07-02 19:16:02 +0000
81+++ src/main/java/com/akiban/server/service/BackgroundWork.java 1970-01-01 00:00:00 +0000
82@@ -1,46 +0,0 @@
83-/**
84- * Copyright (C) 2009-2013 Akiban Technologies, Inc.
85- *
86- * This program is free software: you can redistribute it and/or modify
87- * it under the terms of the GNU Affero General Public License as published by
88- * the Free Software Foundation, either version 3 of the License, or
89- * (at your option) any later version.
90- *
91- * This program is distributed in the hope that it will be useful,
92- * but WITHOUT ANY WARRANTY; without even the implied warranty of
93- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
94- * GNU Affero General Public License for more details.
95- *
96- * You should have received a copy of the GNU Affero General Public License
97- * along with this program. If not, see <http://www.gnu.org/licenses/>.
98- */
99-
100-package com.akiban.server.service;
101-
102-import java.util.Collection;
103-
104-
105-public interface BackgroundWork
106-{
107- public void addObserver(BackgroundObserver observer);
108-
109- public void removeObservers(Collection<BackgroundObserver> os);
110-
111- public void notifyObservers();
112-
113- /**
114- * @return the minimum time interval that one has to wait to be guaranteed that
115- * the work has been executed at least once.
116- */
117- public abstract long getMinimumWaitTime();
118-
119- /**
120- * Force the work to be executed immediately if it is not already running.
121- *
122- * (This is only a request, not a command. The handler might choose not
123- * to carry it out, if that would cause conflicts)
124- *
125- * @return true if the request is executed, false otherwise
126- */
127- public boolean forceExecution();
128-}
129
130=== removed file 'src/main/java/com/akiban/server/service/BackgroundWorkBase.java'
131--- src/main/java/com/akiban/server/service/BackgroundWorkBase.java 2013-07-02 19:16:02 +0000
132+++ src/main/java/com/akiban/server/service/BackgroundWorkBase.java 1970-01-01 00:00:00 +0000
133@@ -1,51 +0,0 @@
134-/**
135- * Copyright (C) 2009-2013 Akiban Technologies, Inc.
136- *
137- * This program is free software: you can redistribute it and/or modify
138- * it under the terms of the GNU Affero General Public License as published by
139- * the Free Software Foundation, either version 3 of the License, or
140- * (at your option) any later version.
141- *
142- * This program is distributed in the hope that it will be useful,
143- * but WITHOUT ANY WARRANTY; without even the implied warranty of
144- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
145- * GNU Affero General Public License for more details.
146- *
147- * You should have received a copy of the GNU Affero General Public License
148- * along with this program. If not, see <http://www.gnu.org/licenses/>.
149- */
150-
151-package com.akiban.server.service;
152-
153-import java.util.Collection;
154-import java.util.concurrent.ConcurrentHashMap;
155-
156-public abstract class BackgroundWorkBase implements BackgroundWork
157-{
158- private final ConcurrentHashMap<BackgroundObserver,Boolean> observers = new ConcurrentHashMap<>();
159-
160- public BackgroundWorkBase()
161- {
162- }
163-
164- @Override
165- public void addObserver(BackgroundObserver observer)
166- {
167- // two observers are equal IFF they are the same object.
168- observers.put(observer, true);
169- }
170-
171- public void removeObservers(Collection<BackgroundObserver> os)
172- {
173- for (BackgroundObserver key : os) {
174- observers.remove(key);
175- }
176- }
177-
178- @Override
179- public void notifyObservers()
180- {
181- for (BackgroundObserver o : observers.keySet())
182- o.update(this);
183- }
184-}
185
186=== modified file 'src/main/java/com/akiban/server/service/text/FullTextIndexService.java'
187--- src/main/java/com/akiban/server/service/text/FullTextIndexService.java 2013-07-10 21:42:55 +0000
188+++ src/main/java/com/akiban/server/service/text/FullTextIndexService.java 2013-07-29 13:55:36 +0000
189@@ -20,18 +20,13 @@
190 import com.akiban.ais.model.IndexName;
191 import com.akiban.qp.operator.RowCursor;
192 import com.akiban.qp.operator.QueryContext;
193-import com.akiban.server.service.BackgroundWork;
194
195-import java.util.List;
196 import org.apache.lucene.search.Query;
197
198 /** Full service that does index maintenance and querying. */
199 public interface FullTextIndexService extends FullTextIndexInfos {
200- /**
201- * @return An array of available background works
202- */
203- public List<? extends BackgroundWork> getBackgroundWorks();
204+ public RowCursor searchIndex(QueryContext context, IndexName name, Query query, int limit);
205
206- public RowCursor searchIndex(QueryContext context, IndexName name,
207- Query query, int limit);
208+ /** Wait for a complete run of background workers */
209+ public void backgroundWait();
210 }
211
212=== modified file 'src/main/java/com/akiban/server/service/text/FullTextIndexServiceImpl.java'
213--- src/main/java/com/akiban/server/service/text/FullTextIndexServiceImpl.java 2013-07-25 00:11:16 +0000
214+++ src/main/java/com/akiban/server/service/text/FullTextIndexServiceImpl.java 2013-07-29 13:55:36 +0000
215@@ -22,6 +22,7 @@
216 import com.akiban.ais.model.Index;
217 import com.akiban.ais.model.Index.IndexType;
218 import com.akiban.ais.model.IndexName;
219+import com.akiban.ais.model.Routine;
220 import com.akiban.ais.model.TableName;
221 import com.akiban.ais.model.UserTable;
222 import com.akiban.ais.model.aisb2.AISBBasedBuilder;
223@@ -47,8 +48,6 @@
224 import com.akiban.server.error.NoSuchRowException;
225 import com.akiban.server.error.QueryCanceledException;
226 import com.akiban.server.rowdata.RowData;
227-import com.akiban.server.service.BackgroundWork;
228-import com.akiban.server.service.BackgroundWorkBase;
229 import com.akiban.server.service.Service;
230 import com.akiban.server.service.config.ConfigurationService;
231 import com.akiban.server.service.listener.ListenerService;
232@@ -59,8 +58,9 @@
233 import com.akiban.server.service.transaction.TransactionService;
234 import com.akiban.server.store.SchemaManager;
235 import com.akiban.server.store.Store;
236-import com.akiban.server.types3.mcompat.mfuncs.WaitFunctionHelpers;
237
238+import com.akiban.sql.server.ServerCallContextStack;
239+import com.akiban.sql.server.ServerQueryContext;
240 import org.apache.lucene.index.IndexWriter;
241 import org.apache.lucene.search.Query;
242
243@@ -73,23 +73,21 @@
244 import java.io.*;
245 import java.util.Arrays;
246 import java.util.Collection;
247-import java.util.Collections;
248 import java.util.Iterator;
249-import java.util.List;
250 import java.util.NoSuchElementException;
251-import java.util.Timer;
252-import java.util.TimerTask;
253 import java.util.concurrent.ConcurrentHashMap;
254
255
256 public class FullTextIndexServiceImpl extends FullTextIndexInfosImpl implements FullTextIndexService, Service, TableListener, RowListener
257 {
258+ private static final Logger logger = LoggerFactory.getLogger(FullTextIndexServiceImpl.class);
259+
260 public static final String INDEX_PATH_PROPERTY = "akserver.text.indexpath";
261- public static final String UPDATE_INTERVAL = "akserver.text.maintenanceInterval";
262- public static final String POPULATE_DELAY_INTERVAL = "akserver.text.populateDelayInterval";
263+ public static final String BACKGROUND_INTERVAL_PROPERTY = "akserver.text.backgroundInterval";
264
265 private static final TableName POPULATE_TABLE = new TableName(TableName.INFORMATION_SCHEMA, "full_text_populate");
266 private static final TableName CHANGES_TABLE = new TableName(TableName.INFORMATION_SCHEMA, "full_text_changes");
267+ private static final TableName BACKGROUND_WAIT_PROC_NAME = new TableName(TableName.SYS_SCHEMA, "full_text_background_wait");
268
269
270 private final ConfigurationService configService;
271@@ -98,18 +96,15 @@
272 private final SchemaManager schemaManager;
273 private final Store store;
274 private final TransactionService transactionService;
275+ private final ConcurrentHashMap<IndexName,Session> populating;
276+ private final Object BACKGROUND_CHANGE_LOCK = new Object();
277
278+ private volatile IndexName updatingIndex;
279+ private BackgroundRunner backgroundPopulate;
280+ private BackgroundRunner backgroundUpdate;
281+ private long backgroundInterval;
282 private File indexPath;
283
284- private Timer maintenanceTimer;
285- private long maintenanceInterval;
286- private volatile TimerTask updateWorker;
287-
288- private Timer populateTimer;
289- private long populateDelayInterval;
290- private ConcurrentHashMap<IndexName,Session> populating;
291-
292- private static final Logger logger = LoggerFactory.getLogger(FullTextIndexServiceImpl.class);
293
294 @Inject
295 public FullTextIndexServiceImpl(ConfigurationService configService,
296@@ -141,12 +136,7 @@
297 if (populatingSession != null) {
298 // if the population process is running, cancel it
299 populatingSession.cancelCurrentQuery(true);
300- // wait for the thread to complete
301- try {
302- WaitFunctionHelpers.waitOn(Collections.singleton(POPULATE_BACKGROUND));
303- } catch (InterruptedException e) {
304- logger.error("waitOn populate during dropIndex failed", e);
305- }
306+ waitPopulateCycle();
307 } else {
308 // delete 'promise' for population, if any
309 try {
310@@ -156,18 +146,12 @@
311 }
312 }
313
314- // This deals with the update thread. If the update
315- // is running, wait for it to complete. Since the update
316- // process mixes different indexes in the same update, there's
317- // no good way to tell if our index is being updated.
318- if (updateRunning) {
319- try {
320- WaitFunctionHelpers.waitOn(Collections.singleton(UPDATE_BACKGROUND));
321- } catch (InterruptedException e) {
322- logger.error("waitOn update during dropIndex failed", e);
323- }
324+ // If updatingIndex is currently equal, wait for it to change.
325+ // If it isn't, or once it does, it can never become equal as it now exists in the populating map.
326+ if(idx.getIndexName().equals(updatingIndex)) {
327+ waitUpdateCycle();
328 }
329-
330+
331 // delete documents
332 FullTextIndexInfo idxInfo = getIndex(session, idx.getIndexName(), idx.getIndexedTable().getAIS());
333 idxInfo.deletePath();
334@@ -190,36 +174,54 @@
335 }
336
337 @Override
338- public List<? extends BackgroundWork> getBackgroundWorks()
339- {
340- return Arrays.asList(POPULATE_BACKGROUND, UPDATE_BACKGROUND);
341+ public void backgroundWait() {
342+ waitPopulateCycle();
343+ waitUpdateCycle();
344 }
345-
346- /* Service */
347+
348+
349+ //
350+ // Service
351+ //
352
353 @Override
354 public void start() {
355+ indexPath = new File(configService.getProperty(INDEX_PATH_PROPERTY));
356+ boolean success = indexPath.mkdirs();
357+ if(!success && !indexPath.exists()) {
358+ throw new AkibanInternalException("Could not create indexPath directories: " + indexPath);
359+ }
360+
361 registerSystemTables();
362+ listenerService.registerTableListener(this);
363+ listenerService.registerRowListener(this);
364+
365+ backgroundInterval = Long.parseLong(configService.getProperty(BACKGROUND_INTERVAL_PROPERTY));
366 enableUpdateWorker();
367 enablePopulateWorker();
368- listenerService.registerTableListener(this);
369- listenerService.registerRowListener(this);
370 }
371
372 @Override
373 public void stop() {
374+ disableUpdateWorker();
375+ disablePopulateWorker();
376+
377 listenerService.deregisterTableListener(this);
378 listenerService.deregisterRowListener(this);
379+
380 try {
381 for (FullTextIndexShared index : indexes.values()) {
382 index.close();
383 }
384- disableUpdateWorker();
385- disablePopulateWorker();
386 }
387 catch (IOException ex) {
388 throw new AkibanInternalException("Error closing index", ex);
389 }
390+
391+ populating.clear();
392+
393+ backgroundInterval = 0;
394+ indexPath = null;
395 }
396
397 @Override
398@@ -230,14 +232,7 @@
399 /* FullTextIndexInfosImpl */
400
401 @Override
402- protected synchronized File getIndexPath() {
403- if (indexPath == null) {
404- indexPath = new File(configService.getProperty(INDEX_PATH_PROPERTY));
405- boolean success = indexPath.mkdirs();
406- if (!success && !indexPath.exists()) {
407- throw new AkibanInternalException("Could not create indexPath directories: " + indexPath);
408- }
409- }
410+ protected File getIndexPath() {
411 return indexPath;
412 }
413
414@@ -389,116 +384,64 @@
415 }
416
417
418- private volatile boolean updateRunning = false;
419- private class DefaultUpdateWorker extends TimerTask
420- {
421- @Override
422- public void run()
423- {
424- runUpdate();
425- }
426- }
427-
428- final BackgroundWork POPULATE_BACKGROUND = new BackgroundWorkBase()
429- {
430- @Override
431- public boolean forceExecution()
432- {
433- return forcePopulate();
434- }
435-
436- @Override
437- public long getMinimumWaitTime()
438- {
439- return populateEnabled && hasScheduled
440- ? populateDelayInterval
441- : 0;
442- }
443-
444- @Override
445- public String toString()
446- {
447- return "POPULATE";
448- }
449- };
450-
451- final BackgroundWork UPDATE_BACKGROUND = new BackgroundWorkBase()
452- {
453- @Override
454- public boolean forceExecution()
455- {
456- return forceUpdate();
457- }
458-
459- @Override
460- public long getMinimumWaitTime()
461- {
462- return updateWorker == null
463- ? 0 // worker is disabled.
464- : maintenanceInterval;
465- }
466-
467- @Override
468- public String toString()
469- {
470- return "UPDATE";
471- }
472- };
473-
474- private volatile boolean populateRunning = false;
475- private class DefaultPopulateWorker extends TimerTask
476- {
477- @Override
478- public void run()
479- {
480- runPopulate();
481- }
482- }
483-
484 // ---------- mostly for testing ---------------
485- void disableUpdateWorker()
486- {
487- if (maintenanceTimer == null)
488- {
489- logger.debug("maintenance worker ALREADY disabled");
490- return;
491- }
492- updateWorker.cancel();
493- maintenanceTimer.cancel();
494- maintenanceTimer.purge();
495- maintenanceTimer = null;
496- updateWorker = null;
497- }
498-
499- protected void enableUpdateWorker()
500- {
501- maintenanceInterval = Long.parseLong(configService.getProperty(UPDATE_INTERVAL));
502- maintenanceTimer = new Timer();
503- updateWorker = new DefaultUpdateWorker();
504- maintenanceTimer.scheduleAtFixedRate(updateWorker, maintenanceInterval, maintenanceInterval);
505- }
506-
507- protected void enablePopulateWorker()
508- {
509- populateDelayInterval = Long.parseLong(configService.getProperty(POPULATE_DELAY_INTERVAL));
510- populateTimer = new Timer();
511- populateTimer.schedule(populateWorker(), populateDelayInterval);
512- populateEnabled = true;
513- hasScheduled = true;
514- }
515-
516- void disablePopulateWorker()
517- {
518- if (populateTimer == null)
519- {
520- logger.debug("populate worker already disabled");
521- return;
522- }
523- populateTimer.cancel();
524- populateTimer.purge();
525- hasScheduled = false;
526- populateEnabled = false;
527- populateTimer = null;
528+
529+ public void disableUpdateWorker() {
530+ synchronized(BACKGROUND_CHANGE_LOCK) {
531+ assert backgroundUpdate != null;
532+ backgroundUpdate.toFinished();
533+ backgroundUpdate = null;
534+ }
535+ }
536+
537+ public void enableUpdateWorker() {
538+ synchronized(BACKGROUND_CHANGE_LOCK) {
539+ assert backgroundUpdate == null;
540+ backgroundUpdate = new BackgroundRunner("FullText_Update", backgroundInterval, new Runnable() {
541+ @Override
542+ public void run() {
543+ runUpdate();
544+ }
545+ });
546+ backgroundUpdate.start();
547+ }
548+ }
549+
550+ public void disablePopulateWorker() {
551+ synchronized(BACKGROUND_CHANGE_LOCK) {
552+ assert backgroundPopulate != null;
553+ backgroundPopulate.toFinished();
554+ backgroundPopulate = null;
555+ }
556+ }
557+
558+ public void enablePopulateWorker() {
559+ synchronized(BACKGROUND_CHANGE_LOCK) {
560+ assert backgroundPopulate == null;
561+ backgroundPopulate = new BackgroundRunner("FullText_Populate", backgroundInterval, new Runnable() {
562+ @Override
563+ public void run() {
564+ runPopulate();
565+ }
566+ });
567+ backgroundPopulate.start();
568+ }
569+ }
570+
571+ public void waitPopulateCycle() {
572+ synchronized(BACKGROUND_CHANGE_LOCK) {
573+ if(backgroundPopulate != null) {
574+ backgroundPopulate.waitForCycle();
575+ }
576+ }
577+ }
578+
579+ public void waitUpdateCycle() {
580+ synchronized(BACKGROUND_CHANGE_LOCK) {
581+ if(backgroundUpdate != null) {
582+ backgroundUpdate.waitForCycle();
583+ }
584+ }
585 }
586
587 Cursor populateTableCursor(Session session) {
588@@ -510,12 +453,16 @@
589 return API.cursor(plan, context, context.createBindings());
590 }
591
592- protected boolean populateNextIndex(Session session) throws PersistitException
593+ protected boolean populateNextIndex(Session session)
594 {
595 transactionService.beginTransaction(session);
596 Cursor cursor = null;
597 IndexName toPopulate = null;
598 try {
599+ // Quick exit if we wont' see any this transaction
600+ if(populateRowCount(session) == 0) {
601+ return false;
602+ }
603 cursor = populateTableCursor(session);
604 cursor.open();
605 toPopulate = nextInQueue(session, cursor, false);
606@@ -523,56 +470,34 @@
607 FullTextIndexInfo index = getIndex(session, toPopulate, null);
608 populateIndex(session, index);
609 store.deleteRow(session, createPopulateRow(session, toPopulate), true, false);
610- transactionService.commitTransaction(session);
611 populating.remove(toPopulate);
612+ transactionService.commitTransaction(session);
613 return true;
614 }
615 } catch (QueryCanceledException e2) {
616 // The query could be canceled if the user drops the index
617- // while this thread is populating the index
618- // The Lock Obtained failed exception occurs for the same reason.
619- // we're trying to delete the index at the same time as the
620- // populate is running, but with slightly different timing.
621- // Clean up after ourselves.
622+ // while this thread is populating the index.
623+ // Clean up after ourselves.
624 if(cursor != null) {
625 cursor.close();
626 }
627 populating.remove(toPopulate);
628- transactionService.commitTransaction(session);
629- // start another thread to make sure we're not missing anything
630- populateTimer.schedule(populateWorker(), populateDelayInterval);
631- hasScheduled = true;
632 logger.warn("populateNextIndex aborted : {}", e2.getMessage());
633- } catch (IOException ioex) {
634- throw new AkibanInternalException ("Failed to populate index ", ioex);
635+ } catch(IOException e) {
636+ throw new AkibanInternalException("Failed to populate index ", e);
637 } finally {
638 transactionService.rollbackTransactionIfOpen(session);
639 }
640 return false;
641 }
642
643- protected synchronized void runPopulate()
644- {
645- populateRunning = true;
646- Session session = sessionService.createSession();
647- try
648- {
649+ private void runPopulate() {
650+ try(Session session = sessionService.createSession()) {
651 boolean more = true;
652 while(more) {
653 more = populateNextIndex(session);
654 }
655 }
656- catch (PersistitException ex1)
657- {
658- throw PersistitAdapter.wrapPersistitException(session, ex1);
659- }
660- finally
661- {
662- hasScheduled = false;
663- POPULATE_BACKGROUND.notifyObservers();
664- populateRunning = false;
665- session.close();
666- }
667 }
668
669 private boolean stillExists(Session session, IndexName indexName)
670@@ -582,98 +507,44 @@
671 return !(table == null || table.getFullTextIndex(indexName.getName()) == null);
672 }
673
674- private synchronized void runUpdate()
675+ private void runUpdate()
676 {
677- updateRunning = true;
678 Session session = sessionService.createSession();
679 HKeyBytesStream rows = null;
680- try
681- {
682+ try {
683 // Consume and commit updates to each index in distinct blocks to keep r/w window small-ish
684 boolean done = false;
685 while(!done) {
686 transactionService.beginTransaction(session);
687- rows = getChangedRows(session);
688- if(rows.hasStream()) {
689- if(populating.get(rows.getIndexName()) == null) {
690- updateIndex(session, rows.getIndexName(), rows);
691+ // Quick exit if we won't see any
692+ if(changesRowCount(session) == 0) {
693+ done = true;
694+ } else {
695+ rows = getChangedRows(session);
696+ if(rows.hasStream()) {
697+ IndexName name = rows.getIndexName();
698+ if(populating.get(name) == null) {
699+ updatingIndex = name;
700+ updateIndex(session, name, rows);
701+ }
702+ rows.cursor.close();
703+ rows = null;
704+ } else {
705+ done = true;
706 }
707- } else {
708- done = true;
709 }
710 transactionService.commitTransaction(session);
711 }
712-
713- }
714- catch(Exception e)
715- {
716- logger.error("Error while maintaining full_text indices", e);
717- }
718- finally
719- {
720+ } finally {
721+ updatingIndex = null;
722 if(rows != null && rows.cursor != null) {
723 rows.cursor.close();
724 }
725 transactionService.rollbackTransactionIfOpen(session);
726 session.close();
727- UPDATE_BACKGROUND.notifyObservers();
728- updateRunning = false;
729 }
730 }
731
732- private boolean forceUpdate()
733- {
734- // worker is disabled or is already running
735- if (updateWorker == null || updateRunning)
736- return false;
737-
738- // execute the thread in different
739- // thread (so it has a new session)
740- new Thread(updateWorker).start();
741- return true;
742- }
743-
744-
745- /**
746- * If the populate job is not already running, force the worker to wake
747- * up and do its job immediately.
748- *
749- * If the worker is disabled, return false and do nothing.
750- */
751- private boolean forcePopulate()
752- {
753- // no work to do
754- // or it is already being done
755- if (!populateEnabled || !hasScheduled || populateRunning)
756- return false;
757-
758- // block the timer (so other threads would have to wait)
759- // Unlike the update case, this is needed because population does not
760- // have to be done periodically
761- // So unless there are new index created, we don't need to
762- // execute the task again after this execution
763-
764- // cancel scheduled task
765- populateTimer.cancel();
766-
767- // execute the task
768- // in a different thread
769- // because we'd otherwise get "transaction already began" exception
770- // as each thread only has one session)
771- new Thread(populateWorker()).start();
772- hasScheduled = true;
773- // get a new timer
774- // (So schedulePopulate can schedule new task if new index is created)
775- populateTimer = new Timer();
776-
777- return true;
778- }
779-
780- private synchronized TimerTask populateWorker ()
781- {
782- return new DefaultPopulateWorker();
783- }
784-
785 IndexName nextInQueue(Session session, Cursor cursor, boolean traversing) {
786 Row row;
787 while((row = cursor.next()) != null) {
788@@ -694,9 +565,6 @@
789 return null;
790 }
791
792- private volatile boolean hasScheduled = false;
793- private volatile boolean populateEnabled = false;
794-
795 private HKeyRow toHKeyRow(byte rowBytes[], HKeyRowType hKeyRowType,
796 StoreAdapter store)
797 {
798@@ -834,13 +702,6 @@
799
800 private void trackPopulate(Session session, IndexName indexName) {
801 store.writeRow(session, createPopulateRow(session, indexName), null);
802-
803- // TODO: What if this fires before this row is committed?
804- // if there are no scheduled populate workers running, add one to run shortly.
805- if(populateEnabled && !hasScheduled) {
806- populateTimer.schedule(populateWorker(), populateDelayInterval);
807- hasScheduled = true;
808- }
809 }
810
811 private void trackChange(Session session, UserTable table, Key hKey) {
812@@ -860,6 +721,14 @@
813 }
814 }
815
816+ private long populateRowCount(Session session) {
817+ return store.getAIS(session).getUserTable(POPULATE_TABLE).rowDef().getTableStatus().getRowCount(session);
818+ }
819+
820+ private long changesRowCount(Session session) {
821+ return store.getAIS(session).getUserTable(CHANGES_TABLE).rowDef().getTableStatus().getRowCount(session);
822+ }
823+
824 private void registerSystemTables() {
825 final int identMax = 128;
826 final int tableVersion = 1;
827@@ -877,8 +746,126 @@
828 .colString("index_name", identMax, false)
829 .colLong("index_id", false)
830 .colVarBinary("hkey", 4096, false);
831+ builder.procedure(BACKGROUND_WAIT_PROC_NAME)
832+ .language("java", Routine.CallingConvention.JAVA)
833+ .externalName(Routines.class.getName(), "backgroundWait");
834 AkibanInformationSchema ais = builder.ais();
835 schemaManager.registerStoredInformationSchemaTable(ais.getUserTable(POPULATE_TABLE), tableVersion);
836 schemaManager.registerStoredInformationSchemaTable(ais.getUserTable(CHANGES_TABLE), tableVersion);
837+ schemaManager.registerSystemRoutine(ais.getRoutine(BACKGROUND_WAIT_PROC_NAME));
838+ }
839+
840+ @SuppressWarnings("unused") // Called reflectively
841+ public static class Routines {
842+ public static void backgroundWait() {
843+ ServerQueryContext context = ServerCallContextStack.current().getContext();
844+ FullTextIndexService ft = context.getServer().getServiceManager().getServiceByClass(FullTextIndexService.class);
845+ ft.backgroundWait();
846+ }
847+ }
848+
849+
850+ public enum STATE {
851+ NOT_STARTED,
852+ RUNNING,
853+ STOPPING,
854+ FINISHED
855+ }
856+
857+ public class BackgroundRunner extends Thread {
858+ private final Object SLEEP_MONITOR = new Object();
859+ private final Runnable runnable;
860+ private final long sleepMillis;
861+ private volatile STATE state;
862+ private long runCount;
863+
864+ public BackgroundRunner(String name, long sleepMillis, Runnable runnable) {
865+ super(name);
866+ this.runnable = runnable;
867+ this.sleepMillis = sleepMillis;
868+ this.state = STATE.NOT_STARTED;
869+ }
870+
871+ @Override
872+ public void run() {
873+ state = STATE.RUNNING;
874+ while(state != STATE.STOPPING) {
875+ try {
876+ runInternal();
877+ } catch(Exception e) {
878+ logger.error("{} run failed with exception", getName(), e);
879+ }
880+ sleep();
881+ }
882+ }
883+
884+ public synchronized void toFinished() {
885+ checkRunning();
886+ state = STATE.STOPPING;
887+ sleepNotify();
888+ waitWhile(STATE.STOPPING, STATE.FINISHED);
889+ notifyAll();
890+ }
891+
892+ public synchronized void waitForCycle() {
893+ checkRunning();
894+ // +2 ensures that we've observed one full run no matter where we initially started
895+ long target = runCount + 2;
896+ while(runCount < target) {
897+ sleepNotify();
898+ waitThenSet(null);
899+ }
900+ }
901+
902+ public void sleepNotify() {
903+ synchronized(SLEEP_MONITOR) {
904+ SLEEP_MONITOR.notify();
905+ }
906+ }
907+
908+ //
909+ // Helpers
910+ //
911+
912+ private void runInternal() {
913+ runnable.run();
914+ synchronized(this) {
915+ ++runCount;
916+ notifyAll();
917+ }
918+ }
919+
920+ private void checkRunning() {
921+ if(state == STATE.STOPPING || state == STATE.FINISHED) {
922+ throw new IllegalStateException("Not RUNNING: " + state);
923+ }
924+ }
925+
926+ private void waitWhile(STATE whileState, STATE newState) {
927+ while(state == whileState) {
928+ waitThenSet(newState);
929+ }
930+ }
931+
932+ private void waitThenSet(STATE newState) {
933+ try {
934+ wait();
935+ if(newState != null) {
936+ state = newState;
937+ }
938+ } catch(InterruptedException e) {
939+ // None
940+ }
941+ }
942+
943+ private void sleep() {
944+ try {
945+ synchronized(SLEEP_MONITOR) {
946+ SLEEP_MONITOR.wait(sleepMillis);
947+ }
948+ } catch(InterruptedException e) {
949+ // None
950+ }
951+ }
952 }
953 }
954
955=== removed file 'src/main/java/com/akiban/server/types3/mcompat/mfuncs/FulltextMaintenanceWait.java'
956--- src/main/java/com/akiban/server/types3/mcompat/mfuncs/FulltextMaintenanceWait.java 2013-04-08 19:26:06 +0000
957+++ src/main/java/com/akiban/server/types3/mcompat/mfuncs/FulltextMaintenanceWait.java 1970-01-01 00:00:00 +0000
958@@ -1,86 +0,0 @@
959-/**
960- * Copyright (C) 2009-2013 Akiban Technologies, Inc.
961- *
962- * This program is free software: you can redistribute it and/or modify
963- * it under the terms of the GNU Affero General Public License as published by
964- * the Free Software Foundation, either version 3 of the License, or
965- * (at your option) any later version.
966- *
967- * This program is distributed in the hope that it will be useful,
968- * but WITHOUT ANY WARRANTY; without even the implied warranty of
969- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
970- * GNU Affero General Public License for more details.
971- *
972- * You should have received a copy of the GNU Affero General Public License
973- * along with this program. If not, see <http://www.gnu.org/licenses/>.
974- */
975-
976-package com.akiban.server.types3.mcompat.mfuncs;
977-
978-import com.akiban.qp.operator.QueryContext;
979-import com.akiban.server.error.AkibanInternalException;
980-import com.akiban.server.service.text.FullTextIndexService;
981-import com.akiban.server.types3.LazyList;
982-import com.akiban.server.types3.TExecutionContext;
983-import com.akiban.server.types3.TOverloadResult;
984-import com.akiban.server.types3.TScalar;
985-import com.akiban.server.types3.mcompat.mtypes.MNumeric;
986-import com.akiban.server.types3.pvalue.PValueSource;
987-import com.akiban.server.types3.pvalue.PValueTarget;
988-import com.akiban.server.types3.texpressions.TInputSetBuilder;
989-import com.akiban.server.types3.texpressions.TScalarBase;
990-
991-
992-public class FulltextMaintenanceWait extends TScalarBase
993-{
994-
995- public static final TScalar INSTANCE = new FulltextMaintenanceWait();
996-
997- private FulltextMaintenanceWait() {}
998-
999- @Override
1000- protected void buildInputSets(TInputSetBuilder builder)
1001- {
1002- // no argument
1003- }
1004-
1005- @Override
1006- protected void doEvaluate(TExecutionContext context, LazyList<? extends PValueSource> inputs, PValueTarget output)
1007- {
1008- QueryContext qc = context.getQueryContext();
1009- if (qc == null)
1010- {
1011- context.logError ("No querycontext for retreiving full-text service");
1012- output.putInt32(-1);
1013- }
1014- else
1015- {
1016- FullTextIndexService service = qc.getServiceManager().getServiceByClass(FullTextIndexService.class);
1017- try
1018- {
1019- WaitFunctionHelpers.waitOn(service.getBackgroundWorks());
1020- output.putInt32(0);
1021- }
1022- catch (InterruptedException ex)
1023- {
1024- context.logError(ex.getMessage());
1025- output.putInt32(-2);
1026- }
1027- }
1028- }
1029-
1030- @Override
1031- public String displayName()
1032- {
1033- return "fulltext_maintenance_wait";
1034- }
1035-
1036- @Override
1037- public TOverloadResult resultType()
1038- {
1039- // TODO: return value probably doesn't mean anything! to anyone not having
1040- // access to the code.
1041- // Maybe return a STRING/msg instead?
1042- return TOverloadResult.fixed(MNumeric.INT);
1043- }
1044-}
1045
1046=== removed file 'src/main/java/com/akiban/server/types3/mcompat/mfuncs/WaitFunctionHelpers.java'
1047--- src/main/java/com/akiban/server/types3/mcompat/mfuncs/WaitFunctionHelpers.java 2013-07-02 21:57:02 +0000
1048+++ src/main/java/com/akiban/server/types3/mcompat/mfuncs/WaitFunctionHelpers.java 1970-01-01 00:00:00 +0000
1049@@ -1,79 +0,0 @@
1050-/**
1051- * Copyright (C) 2009-2013 Akiban Technologies, Inc.
1052- *
1053- * This program is free software: you can redistribute it and/or modify
1054- * it under the terms of the GNU Affero General Public License as published by
1055- * the Free Software Foundation, either version 3 of the License, or
1056- * (at your option) any later version.
1057- *
1058- * This program is distributed in the hope that it will be useful,
1059- * but WITHOUT ANY WARRANTY; without even the implied warranty of
1060- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1061- * GNU Affero General Public License for more details.
1062- *
1063- * You should have received a copy of the GNU Affero General Public License
1064- * along with this program. If not, see <http://www.gnu.org/licenses/>.
1065- */
1066-
1067-package com.akiban.server.types3.mcompat.mfuncs;
1068-
1069-import com.akiban.server.service.BackgroundObserver;
1070-import com.akiban.server.service.BackgroundObserverImpl;
1071-import com.akiban.server.service.BackgroundWork;
1072-
1073-import java.util.Collection;
1074-import java.util.LinkedList;
1075-import java.util.List;
1076-
1077-
1078-public class WaitFunctionHelpers
1079-{
1080- /**
1081- * Blocks until all `works` have finished
1082- * @param works
1083- * @throws InterruptedException
1084- */
1085- public static void waitOn(Collection<? extends BackgroundWork> works) throws InterruptedException
1086- {
1087- if (works != null)
1088- {
1089- List<BackgroundObserver> waiters = new LinkedList<>();
1090- for (BackgroundWork wk : works)
1091- {
1092- // This work doesn't require wait time
1093- if (wk.getMinimumWaitTime() <= 0)
1094- continue;
1095-
1096- // add observer
1097- BackgroundObserverImpl w = new BackgroundObserverImpl();
1098- wk.addObserver(w);
1099-
1100- // request the task to be executed
1101- wk.forceExecution();
1102-
1103- waiters.add(w);
1104- }
1105-
1106- // busy-loop waiting for all the works to be done
1107- boolean allAwaken;
1108- while (true)
1109- {
1110- allAwaken = true;
1111- for (BackgroundObserver w : waiters)
1112- {
1113- allAwaken &= w.backgroundFinished();
1114- }
1115-
1116- if (allAwaken)
1117- break;
1118- }
1119-
1120- // clean up
1121- for (BackgroundWork wk : works)
1122- {
1123- wk.removeObservers(waiters);
1124- }
1125- }
1126- }
1127-
1128-}
1129
1130=== modified file 'src/main/resources/com/akiban/server/service/config/configuration-defaults.properties'
1131--- src/main/resources/com/akiban/server/service/config/configuration-defaults.properties 2013-07-12 01:35:28 +0000
1132+++ src/main/resources/com/akiban/server/service/config/configuration-defaults.properties 2013-07-29 13:55:36 +0000
1133@@ -34,8 +34,7 @@
1134 akserver.index_statistics.bucket_count=32
1135 akserver.restrict_user_schema=false
1136 akserver.text.indexpath=/tmp/aktext
1137-akserver.text.maintenanceInterval=3000
1138-akserver.text.populateDelayInterval=1000
1139+akserver.text.backgroundInterval=3000
1140 # EXPERIMENTAL
1141 akserver.indexRowPooling = false
1142 akserver.pt.osc.hook=disabled
1143
1144=== modified file 'src/test/java/com/akiban/rest/RestServiceFilesIT.java'
1145--- src/test/java/com/akiban/rest/RestServiceFilesIT.java 2013-04-21 02:07:31 +0000
1146+++ src/test/java/com/akiban/rest/RestServiceFilesIT.java 2013-07-29 13:55:36 +0000
1147@@ -25,7 +25,6 @@
1148 import com.akiban.server.service.is.BasicInfoSchemaTablesServiceImpl;
1149 import com.akiban.server.service.servicemanager.GuicedServiceManager;
1150 import com.akiban.server.test.it.ITBase;
1151-import com.akiban.server.types3.mcompat.mfuncs.WaitFunctionHelpers;
1152 import com.akiban.sql.RegexFilenameFilter;
1153 import com.akiban.util.Strings;
1154 import com.fasterxml.jackson.core.JsonParseException;
1155@@ -204,28 +203,9 @@
1156 loadDataFile(SCHEMA_NAME, data);
1157 }
1158
1159- String postURI = dumpFileIfExists(new File(subDir, "schema.prepost"));
1160- if (postURI != null) {
1161- HttpExchange httpConn = openConnection(getRestURL(postURI.trim()), "POST");
1162- postContents(httpConn, "[]".getBytes());
1163- httpClient.send(httpConn);
1164- fullyDisconnect(httpConn);
1165- }
1166-
1167- // The file should contain only the name of the wait function
1168- // (Don't need to make a SELECT node here)
1169- String waitFunc = dumpFileIfExists(new File(subDir, "background.wait"));
1170- if (waitFunc != null)
1171- {
1172- switch(waitFunc.trim().toLowerCase())
1173- {
1174- case "fulltext_maintenance_wait":
1175- WaitFunctionHelpers.waitOn(ftService.getBackgroundWorks());
1176- break;
1177-
1178- default:
1179- throw new UnsupportedOperationException("Unknown Wait Function: " + waitFunc);
1180- }
1181+ String waitNeeded = dumpFileIfExists(new File(subDir, "full_text_background_wait"));
1182+ if(waitNeeded != null) {
1183+ ftService.backgroundWait();
1184 }
1185 }
1186
1187
1188=== modified file 'src/test/java/com/akiban/server/service/config/TestConfigService.java'
1189--- src/test/java/com/akiban/server/service/config/TestConfigService.java 2013-06-14 11:23:11 +0000
1190+++ src/test/java/com/akiban/server/service/config/TestConfigService.java 2013-07-29 13:55:36 +0000
1191@@ -59,8 +59,7 @@
1192 protected Map<String, String> loadProperties() {
1193 Map<String, String> ret = new HashMap<>(super.loadProperties());
1194 makeDataDirectory();
1195- ret.put(FullTextIndexServiceImpl.UPDATE_INTERVAL, Long.toString(1000));
1196- ret.put(FullTextIndexServiceImpl.POPULATE_DELAY_INTERVAL, Long.toString(1000));
1197+ ret.put(FullTextIndexServiceImpl.BACKGROUND_INTERVAL_PROPERTY, "1000");
1198 ret.put(DATA_PATH_KEY, dataDirectory.getAbsolutePath());
1199 ret.put(TEXT_INDEX_PATH_KEY, dataDirectory.getAbsolutePath());
1200 final int bufferSize = Integer.parseInt(ret.get(BUFFER_SIZE_KEY));
1201
1202=== modified file 'src/test/java/com/akiban/server/service/text/FullTextIndexServiceBug1172013IT.java'
1203--- src/test/java/com/akiban/server/service/text/FullTextIndexServiceBug1172013IT.java 2013-07-13 17:24:25 +0000
1204+++ src/test/java/com/akiban/server/service/text/FullTextIndexServiceBug1172013IT.java 2013-07-29 13:55:36 +0000
1205@@ -21,47 +21,21 @@
1206 import java.util.Collections;
1207
1208 import com.akiban.qp.operator.Cursor;
1209-import com.akiban.qp.operator.StoreAdapter;
1210 import com.akiban.server.service.transaction.TransactionService.CloseableTransaction;
1211 import org.junit.Before;
1212 import org.junit.Test;
1213
1214-import org.slf4j.Logger;
1215-import org.slf4j.LoggerFactory;
1216-
1217 import com.akiban.ais.model.IndexName;
1218 import com.akiban.ais.model.TableName;
1219-import com.akiban.qp.operator.QueryContext;
1220-import com.akiban.qp.persistitadapter.PersistitAdapter;
1221-import com.akiban.qp.rowtype.Schema;
1222 import com.akiban.qp.util.SchemaCache;
1223 import com.akiban.server.error.DuplicateIndexException;
1224-import com.akiban.server.service.servicemanager.GuicedServiceManager;
1225 import com.akiban.server.service.session.Session;
1226-import com.akiban.server.test.it.ITBase;
1227-import com.akiban.server.types3.mcompat.mfuncs.WaitFunctionHelpers;
1228-import com.akiban.sql.embedded.EmbeddedJDBCService;
1229-import com.akiban.sql.embedded.EmbeddedJDBCServiceImpl;
1230+import org.slf4j.Logger;
1231+import org.slf4j.LoggerFactory;
1232
1233-public class FullTextIndexServiceBug1172013IT extends ITBase {
1234- public static final String SCHEMA = "test";
1235- protected FullTextIndexService fullText;
1236- protected Schema schema;
1237- protected StoreAdapter adapter;
1238- protected QueryContext queryContext;
1239- private int c;
1240- private int o;
1241- private int i;
1242- private int a;
1243+public class FullTextIndexServiceBug1172013IT extends FullTextIndexServiceITBase {
1244 private static final Logger logger = LoggerFactory.getLogger(FullTextIndexServiceBug1172013IT.class);
1245 private final Object lock = new Object();
1246- private FullTextIndexServiceImpl fullTextImpl = null;
1247- @Override
1248- protected GuicedServiceManager.BindingsConfigurationProvider serviceBindingsProvider() {
1249- return super.serviceBindingsProvider()
1250- .bindAndRequire(FullTextIndexService.class, FullTextIndexServiceImpl.class)
1251- .bindAndRequire(EmbeddedJDBCService.class, EmbeddedJDBCServiceImpl.class);
1252- }
1253
1254 @Before
1255 public void createData() {
1256@@ -100,16 +74,9 @@
1257 writeRow(a, 301, 3, "MA");
1258 writeRow(a, 302, 3, "ME");
1259
1260- fullText = serviceManager().getServiceByClass(FullTextIndexService.class);
1261-
1262 schema = SchemaCache.globalSchema(ais());
1263 adapter = newStoreAdapter(schema);
1264 queryContext = queryContext(adapter);
1265-
1266- // This test is specifically for FullTextIndexServiceImpl.java
1267- assertEquals(FullTextIndexServiceImpl.class, fullText.getClass());
1268- fullTextImpl = (FullTextIndexServiceImpl)fullText;
1269-
1270 }
1271
1272 /** Race concurrent drop vs create. */
1273@@ -138,7 +105,7 @@
1274 new DropIndex().run();
1275 }
1276
1277- verifyClean(fullTextImpl);
1278+ verifyClean();
1279 }
1280
1281 /** Race concurrent drop vs create, with lined up start. */
1282@@ -148,9 +115,8 @@
1283 createFullTextIndex(
1284 SCHEMA, "o", "idx3_o",
1285 "oid", "c1", "c2", "c3", "c4");
1286- fullTextImpl.POPULATE_BACKGROUND.forceExecution();
1287- WaitFunctionHelpers.waitOn(fullText.getBackgroundWorks());
1288-
1289+ waitPopulate();
1290+
1291 Thread t = new Thread(new DropIndex());
1292 t.start();
1293
1294@@ -173,7 +139,7 @@
1295 new DropIndex().run();
1296 }
1297
1298- verifyClean(fullTextImpl);
1299+ verifyClean();
1300 }
1301
1302 /** Serial create and drop. */
1303@@ -185,10 +151,10 @@
1304 "oid", "c1", "c2", "c3", "c4");
1305
1306 // Prevents potential rollback for deterministic test
1307- WaitFunctionHelpers.waitOn(Collections.singleton(fullTextImpl.POPULATE_BACKGROUND));
1308+ waitPopulate();
1309
1310 new DropIndex().run();
1311- verifyClean(fullTextImpl);
1312+ verifyClean();
1313 }
1314
1315 @Test
1316@@ -199,20 +165,17 @@
1317 createFullTextIndex(
1318 SCHEMA, "o", "idx3_o",
1319 "oid", "c1", "c2", "c3", "c4");
1320- fullTextImpl.POPULATE_BACKGROUND.forceExecution();
1321- WaitFunctionHelpers.waitOn(fullText.getBackgroundWorks());
1322-
1323+ waitPopulate();
1324+
1325 writeRow(o, 103, 1, "c1", "c2", "c3", "c4", "2012-12-12");
1326 writeRow(o, 104, 1, "c1", "c2", "c3", "c4", "2012-12-12");
1327 writeRow(o, 105, 1, "c1", "c2", "c3", "c4", "2012-12-12");
1328
1329- new Thread(new DropIndex()).start();
1330- synchronized (lock) {
1331- lock.wait();
1332- }
1333-
1334- // kick start the background updater
1335- fullTextImpl.UPDATE_BACKGROUND.forceExecution();
1336+ // Race update vs drop
1337+ Thread t = new Thread(new DropIndex());
1338+ t.start();
1339+ // But don't let it fall off the end
1340+ t.join();
1341 }
1342
1343 private static interface Visitor
1344@@ -235,10 +198,9 @@
1345 }
1346 }
1347
1348- private void verifyClean(FullTextIndexServiceImpl fullTextImpl) throws InterruptedException {
1349- WaitFunctionHelpers.waitOn(fullText.getBackgroundWorks());
1350- traverse(fullTextImpl,
1351- new Visitor()
1352+ private void verifyClean() throws InterruptedException {
1353+ waitPopulateAndUpdate();
1354+ traverse(new Visitor()
1355 {
1356 int n = 0;
1357
1358@@ -257,16 +219,16 @@
1359
1360 }
1361
1362- private void traverse(FullTextIndexServiceImpl serv, Visitor visitor)
1363+ private void traverse(Visitor visitor)
1364 {
1365 Cursor cursor = null;
1366 try(Session session = createNewSession();
1367 CloseableTransaction txn = txnService().beginCloseableTransaction(session))
1368 {
1369- cursor = serv.populateTableCursor(session);
1370+ cursor = fullTextImpl.populateTableCursor(session);
1371 cursor.open();
1372 IndexName toPopulate;
1373- while((toPopulate = serv.nextInQueue(session, cursor, true)) != null) {
1374+ while((toPopulate = fullTextImpl.nextInQueue(session, cursor, true)) != null) {
1375 visitor.visit(toPopulate);
1376 }
1377 visitor.endOfTree();
1378
1379=== modified file 'src/test/java/com/akiban/server/service/text/FullTextIndexServiceIT.java'
1380--- src/test/java/com/akiban/server/service/text/FullTextIndexServiceIT.java 2013-07-10 19:34:13 +0000
1381+++ src/test/java/com/akiban/server/service/text/FullTextIndexServiceIT.java 2013-07-29 13:55:36 +0000
1382@@ -21,45 +21,25 @@
1383 import com.akiban.ais.model.IndexName;
1384 import com.akiban.qp.operator.Cursor;
1385 import com.akiban.qp.operator.Operator;
1386-import com.akiban.qp.operator.QueryBindings;
1387-import com.akiban.qp.operator.QueryContext;
1388 import static com.akiban.qp.operator.API.cursor;
1389
1390-import com.akiban.qp.operator.StoreAdapter;
1391 import com.akiban.qp.row.RowBase;
1392 import com.akiban.qp.rowtype.RowType;
1393-import com.akiban.qp.rowtype.Schema;
1394 import com.akiban.qp.util.SchemaCache;
1395 import com.akiban.server.service.servicemanager.GuicedServiceManager;
1396 import com.akiban.server.service.session.Session;
1397 import com.akiban.server.service.session.SessionServiceImpl;
1398 import com.akiban.server.service.transaction.TransactionService.CloseableTransaction;
1399-import com.akiban.server.test.it.ITBase;
1400 import com.akiban.server.test.it.qp.TestRow;
1401
1402-import com.akiban.server.types3.mcompat.mfuncs.WaitFunctionHelpers;
1403 import org.junit.Before;
1404 import org.junit.Test;
1405-import org.slf4j.Logger;
1406-import org.slf4j.LoggerFactory;
1407
1408 import static org.junit.Assert.*;
1409
1410-public class FullTextIndexServiceIT extends ITBase
1411+public class FullTextIndexServiceIT extends FullTextIndexServiceITBase
1412 {
1413 public static final String SCHEMA = "test";
1414- protected FullTextIndexService fullText;
1415- protected Schema schema;
1416- protected StoreAdapter adapter;
1417- protected QueryContext queryContext;
1418- protected QueryBindings queryBindings;
1419- private static final Logger logger = LoggerFactory.getLogger(FullTextIndexServiceIT.class);
1420-
1421-
1422- private int c;
1423- private int o;
1424- private int i;
1425- private int a;
1426
1427 @Override
1428 protected GuicedServiceManager.BindingsConfigurationProvider serviceBindingsProvider() {
1429@@ -100,21 +80,16 @@
1430 writeRow(a, 301, 3, "MA");
1431 writeRow(a, 302, 3, "ME");
1432
1433- fullText = serviceManager().getServiceByClass(FullTextIndexService.class);
1434-
1435 schema = SchemaCache.globalSchema(ais());
1436 adapter = newStoreAdapter(schema);
1437 queryContext = queryContext(adapter);
1438 queryBindings = queryContext.createBindings();
1439 }
1440
1441+
1442 @Test
1443 public void testPopulateScheduling() throws InterruptedException
1444 {
1445- // This test is specifically for FullTextIndexServiceImpl.java
1446- assertEquals(FullTextIndexServiceImpl.class, fullText.getClass());
1447- FullTextIndexServiceImpl fullTextImpl = (FullTextIndexServiceImpl)fullText;
1448-
1449 // disable the populate worker (so it doesn't read all the entries
1450 // out before we get a chance to look at the tree.
1451 fullTextImpl.disablePopulateWorker();
1452@@ -155,7 +130,7 @@
1453 // let the worker do its job.
1454 // (After it is done, the tree had better be empty)
1455 fullTextImpl.enablePopulateWorker();
1456- WaitFunctionHelpers.waitOn(fullText.getBackgroundWorks());
1457+ waitPopulate();
1458
1459 traverse(fullTextImpl,
1460 new Visitor()
1461@@ -180,11 +155,6 @@
1462 @Test
1463 public void testDeleteIndex() throws InterruptedException
1464 {
1465-
1466- // This test is specifically for FullTextIndexServiceImpl.java
1467- assertEquals(FullTextIndexServiceImpl.class, fullText.getClass());
1468- FullTextIndexServiceImpl fullTextImpl = (FullTextIndexServiceImpl)fullText;
1469-
1470 // <1> disable worker
1471 fullTextImpl.disablePopulateWorker();
1472
1473@@ -209,7 +179,7 @@
1474 deleteFullTextIndex(expecteds[0].getIndexName());
1475 deleteFullTextIndex(expecteds[1].getIndexName());
1476
1477- // <4> check that the tree only has one entry now (ie., epxecteds2[2]
1478+ // <4> check that the tree only has one entry now (ie. expected[2])
1479 traverse(fullTextImpl,
1480 new Visitor()
1481 {
1482@@ -231,8 +201,8 @@
1483
1484 // wake the worker up to do its job
1485 fullTextImpl.enablePopulateWorker();
1486- WaitFunctionHelpers.waitOn(fullText.getBackgroundWorks());
1487-
1488+ waitPopulate();
1489+
1490 session.close();
1491 }
1492
1493@@ -287,7 +257,7 @@
1494 SCHEMA, "c", "idx_c",
1495 "name", "i.sku", "a.state");
1496
1497- WaitFunctionHelpers.waitOn(fullText.getBackgroundWorks());
1498+ waitPopulate();
1499 RowType rowType = rowType("c");
1500 RowBase[] expected1 = new RowBase[]
1501 {
1502@@ -304,8 +274,8 @@
1503 writeRow(c, 5, "Sherlock Flintstone");
1504 writeRow(c, 6, "Mycroft Holmes");
1505 writeRow(c, 7, "Flintstone Lestrade");
1506-
1507- WaitFunctionHelpers.waitOn(fullText.getBackgroundWorks());
1508+
1509+ waitUpdate();
1510 RowBase expected2[] = new RowBase[]
1511 {
1512 row(rowType, 1L),
1513@@ -318,20 +288,20 @@
1514 ftScanAndCompare(builder, "flintstone", 15, expected2);
1515
1516 // part 3
1517- ((FullTextIndexServiceImpl)fullText).disableUpdateWorker();
1518+ fullTextImpl.disableUpdateWorker();
1519
1520 writeRow(c, 8, "Flintstone Hudson");
1521 writeRow(c, 9, "Jim Flintstone");
1522
1523 // The worker has been disabled, waitOn should return immediately
1524- WaitFunctionHelpers.waitOn(fullText.getBackgroundWorks());
1525-
1526+ waitUpdate();
1527+
1528 // confirm that new rows are not found (ie., expected2 still works)
1529 ftScanAndCompare(builder, "flintstone", 15, expected2);
1530
1531- ((FullTextIndexServiceImpl)fullText).enableUpdateWorker();
1532- WaitFunctionHelpers.waitOn(fullText.getBackgroundWorks());
1533-
1534+ fullTextImpl.enableUpdateWorker();
1535+ waitUpdate();
1536+
1537 // now the rows should be seen.
1538 // (Because disabling the worker does not stop the changes fron being recorded)
1539 RowBase expected3[] = new RowBase[]
1540@@ -352,7 +322,7 @@
1541 FullTextIndex index = createFullTextIndex(
1542 SCHEMA, "c", "idx_c",
1543 "name", "i.sku", "a.state");
1544- WaitFunctionHelpers.waitOn(fullText.getBackgroundWorks());
1545+ waitPopulate();
1546 RowType rowType = rowType("c");
1547 RowBase[] expected = new RowBase[] {
1548 row(rowType, 1L),
1549@@ -368,7 +338,7 @@
1550 FullTextIndex index = createFullTextIndex(
1551 SCHEMA, "o", "idx_o",
1552 "c.name", "i.sku");
1553- WaitFunctionHelpers.waitOn(fullText.getBackgroundWorks());
1554+ waitPopulate();
1555 RowType rowType = rowType("o");
1556 RowBase[] expected = new RowBase[] {
1557 row(rowType, 1L, 101L)
1558@@ -380,7 +350,7 @@
1559 @Test
1560 public void testTruncate() throws InterruptedException {
1561 FullTextIndex index = createFullTextIndex(SCHEMA, "c", "idx_c", "name", "i.sku", "a.state");
1562- WaitFunctionHelpers.waitOn(fullText.getBackgroundWorks());
1563+ waitPopulate();
1564
1565 final int limit = 15;
1566 RowType rowType = rowType("c");
1567@@ -398,25 +368,25 @@
1568 ftScanAndCompare(builder, skuQuery, limit, skuExpected);
1569
1570 dml().truncateTable(session(), a);
1571- WaitFunctionHelpers.waitOn(fullText.getBackgroundWorks());
1572+ waitUpdate();
1573 ftScanAndCompare(builder, nameQuery, limit, nameExpected);
1574 ftScanAndCompare(builder, stateQuery, limit, emptyExpected);
1575 ftScanAndCompare(builder, skuQuery, limit, skuExpected);
1576
1577 dml().truncateTable(session(), o);
1578- WaitFunctionHelpers.waitOn(fullText.getBackgroundWorks());
1579+ waitUpdate();
1580 ftScanAndCompare(builder, nameQuery, limit, nameExpected);
1581 ftScanAndCompare(builder, stateQuery, limit, emptyExpected);
1582 ftScanAndCompare(builder, skuQuery, limit, emptyExpected); // Non-cascading key, connection to c1 is unknown
1583
1584 dml().truncateTable(session(), i);
1585- WaitFunctionHelpers.waitOn(fullText.getBackgroundWorks());
1586+ waitUpdate();
1587 ftScanAndCompare(builder, nameQuery, limit, nameExpected);
1588 ftScanAndCompare(builder, stateQuery, limit, emptyExpected);
1589 ftScanAndCompare(builder, skuQuery, limit, emptyExpected);
1590
1591 dml().truncateTable(session(), c);
1592- WaitFunctionHelpers.waitOn(fullText.getBackgroundWorks());
1593+ waitUpdate();
1594 ftScanAndCompare(builder, nameQuery, limit, emptyExpected);
1595 ftScanAndCompare(builder, stateQuery, limit, emptyExpected);
1596 ftScanAndCompare(builder, skuQuery, limit, emptyExpected);
1597
1598=== added file 'src/test/java/com/akiban/server/service/text/FullTextIndexServiceITBase.java'
1599--- src/test/java/com/akiban/server/service/text/FullTextIndexServiceITBase.java 1970-01-01 00:00:00 +0000
1600+++ src/test/java/com/akiban/server/service/text/FullTextIndexServiceITBase.java 2013-07-29 13:55:36 +0000
1601@@ -0,0 +1,73 @@
1602+/**
1603+ * Copyright (C) 2009-2013 Akiban Technologies, Inc.
1604+ *
1605+ * This program is free software: you can redistribute it and/or modify
1606+ * it under the terms of the GNU Affero General Public License as published by
1607+ * the Free Software Foundation, either version 3 of the License, or
1608+ * (at your option) any later version.
1609+ *
1610+ * This program is distributed in the hope that it will be useful,
1611+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
1612+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1613+ * GNU Affero General Public License for more details.
1614+ *
1615+ * You should have received a copy of the GNU Affero General Public License
1616+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
1617+ */
1618+
1619+package com.akiban.server.service.text;
1620+
1621+import com.akiban.qp.operator.QueryBindings;
1622+import com.akiban.qp.operator.QueryContext;
1623+import com.akiban.qp.operator.StoreAdapter;
1624+import com.akiban.qp.rowtype.Schema;
1625+import com.akiban.server.service.servicemanager.GuicedServiceManager;
1626+import com.akiban.server.test.it.ITBase;
1627+import org.junit.Before;
1628+
1629+import java.util.Map;
1630+
1631+public class FullTextIndexServiceITBase extends ITBase
1632+{
1633+ public static final String SCHEMA = "test";
1634+ protected FullTextIndexServiceImpl fullTextImpl;
1635+ protected Schema schema;
1636+ protected StoreAdapter adapter;
1637+ protected QueryContext queryContext;
1638+ protected QueryBindings queryBindings;
1639+ protected int c;
1640+ protected int o;
1641+ protected int i;
1642+ protected int a;
1643+
1644+
1645+ @Override
1646+ protected GuicedServiceManager.BindingsConfigurationProvider serviceBindingsProvider() {
1647+ return super.serviceBindingsProvider()
1648+ .bindAndRequire(FullTextIndexService.class, FullTextIndexServiceImpl.class);
1649+ }
1650+
1651+ @Override
1652+ public Map<String, String> startupConfigProperties() {
1653+ return uniqueStartupConfigProperties(getClass());
1654+ }
1655+
1656+
1657+ @Before
1658+ public final void castService() {
1659+ fullTextImpl = (FullTextIndexServiceImpl)serviceManager().getServiceByClass(FullTextIndexService.class);
1660+ }
1661+
1662+ protected void waitPopulate() {
1663+ fullTextImpl.waitPopulateCycle();
1664+ }
1665+
1666+ protected void waitUpdate() {
1667+ fullTextImpl.waitUpdateCycle();
1668+ }
1669+
1670+ protected void waitPopulateAndUpdate() {
1671+ waitPopulate();
1672+ waitUpdate();
1673+ }
1674+}
1675
1676=== renamed file 'src/test/resources/com/akiban/rest/text/background.wait' => 'src/test/resources/com/akiban/rest/text/full_text_background_wait'
1677--- src/test/resources/com/akiban/rest/text/background.wait 2013-04-17 16:23:40 +0000
1678+++ src/test/resources/com/akiban/rest/text/full_text_background_wait 2013-07-29 13:55:36 +0000
1679@@ -1,1 +0,0 @@
1680-fulltext_maintenance_wait
1681\ No newline at end of file
1682
1683=== modified file 'src/test/resources/com/akiban/sql/pg/yaml/functional/test-drop-fulltext.yaml'
1684--- src/test/resources/com/akiban/sql/pg/yaml/functional/test-drop-fulltext.yaml 2013-07-04 06:00:37 +0000
1685+++ src/test/resources/com/akiban/sql/pg/yaml/functional/test-drop-fulltext.yaml 2013-07-29 13:55:36 +0000
1686@@ -4,11 +4,11 @@
1687 ---
1688 - Statement: CREATE INDEX idx on t(full_text(name));
1689 ---
1690-- Statement: SELECT fulltext_maintenance_wait();
1691+- Statement: CALL sys.full_text_background_wait()
1692 ---
1693 - Statement: INSERT INTO t values (1, 'bar1'), (2, 'bar2'), (3, 'bar3'), (4, 'bar1');
1694 ---
1695-- Statement: SELECT fulltext_maintenance_wait();
1696+- Statement: CALL sys.full_text_background_wait()
1697 ---
1698 - Statement: SELECT id from t where full_text_search(name='bar1');
1699 - output: [[1], [4]]
1700@@ -23,7 +23,7 @@
1701 ---
1702 - Statement: CREATE INDEX t2_idx on t2(full_text(name));
1703 ---
1704-- Statement: SELECT fulltext_maintenance_wait();
1705+- Statement: CALL sys.full_text_background_wait()
1706 ---
1707 - Statement: SELECT * FROM t2 WHERE FULL_TEXT_SEARCH(name, 'Jo*');
1708 - row_count: 0
1709@@ -36,7 +36,7 @@
1710 ---
1711 - Statement: CREATE INDEX t2_idx on t2(full_text(name));
1712 ---
1713-- Statement: SELECT fulltext_maintenance_wait()
1714+- Statement: CALL sys.full_text_background_wait()
1715 ---
1716 - Statement: SELECT * FROM t2 WHERE FULL_TEXT_SEARCH(name, 'Jo*');
1717 - output: [[1, 'John']]
1718
1719=== modified file 'src/test/resources/com/akiban/sql/pg/yaml/functional/test-ft-with-inherited-key.yaml'
1720--- src/test/resources/com/akiban/sql/pg/yaml/functional/test-ft-with-inherited-key.yaml 2013-07-02 19:16:02 +0000
1721+++ src/test/resources/com/akiban/sql/pg/yaml/functional/test-ft-with-inherited-key.yaml 2013-07-29 13:55:36 +0000
1722@@ -14,16 +14,14 @@
1723 ---
1724 - Statement: CREATE INDEX t3_ft ON t3(FULL_TEXT(name));
1725 ---
1726-- Statement: SELECT fulltext_maintenance_wait();
1727-- output: [[0]]
1728+- Statement: CALL sys.full_text_background_wait()
1729 ---
1730 - Statement: SELECT * FROM t3 WHERE FULL_TEXT_SEARCH(name, 'fred');
1731 - output: [[111, 11, 'Fred']]
1732 ---
1733 - Statement: UPDATE t2 SET pid = 2 WHERE pid = 1;
1734 ---
1735-- Statement: SELECT fulltext_maintenance_wait();
1736-- output: [[0]]
1737+- Statement: CALL sys.full_text_background_wait()
1738 ---
1739 - Statement: SELECT * FROM t3 WHERE FULL_TEXT_SEARCH(name, 'fred');
1740 - output: [[111, 11, 'Fred']]
1741
1742=== modified file 'src/test/resources/com/akiban/sql/pg/yaml/functional/test-ft-with-pk.yaml'
1743--- src/test/resources/com/akiban/sql/pg/yaml/functional/test-ft-with-pk.yaml 2013-07-02 19:16:02 +0000
1744+++ src/test/resources/com/akiban/sql/pg/yaml/functional/test-ft-with-pk.yaml 2013-07-29 13:55:36 +0000
1745@@ -8,19 +8,14 @@
1746 ---
1747 - Statement: create index idx1 on t (full_text(name));
1748 ---
1749-- Statement: SELECT fulltext_maintenance_wait();
1750-- output: [[0]]
1751+- Statement: CALL sys.full_text_background_wait()
1752 ---
1753 - Statement: SELECT id FROM t where full_text_search(name = 'foo1');
1754 - output: [[1]]
1755 ---
1756 - Statement: UPDATE t SET id = 3 where id = 1;
1757 ---
1758-- Statement: SELECT fulltext_maintenance_wait();
1759-- output: [[0]]
1760----
1761-- Statement: SELECT fulltext_maintenance_wait();
1762-- output: [[0]]
1763+- Statement: CALL sys.full_text_background_wait()
1764 ---
1765 - Statement: SELECT id FROM t where full_text_search(name = 'foo1');
1766 - output: [[3]]
1767
1768=== modified file 'src/test/resources/com/akiban/sql/pg/yaml/functional/test-fulltext-maintenance.yaml'
1769--- src/test/resources/com/akiban/sql/pg/yaml/functional/test-fulltext-maintenance.yaml 2013-07-04 06:00:37 +0000
1770+++ src/test/resources/com/akiban/sql/pg/yaml/functional/test-fulltext-maintenance.yaml 2013-07-29 13:55:36 +0000
1771@@ -17,14 +17,12 @@
1772 - Statement: INSERT INTO t VALUES (4, 'bar4');
1773 ---
1774 - Statement: CREATE INDEX idx ON t (full_text(name));
1775---- # blocks until the maintenance is done. Return value of 0 means nothing wrong happened!
1776-- Statement: SELECT fulltext_maintenance_wait();
1777-- output: [[0]]
1778---- # create a yet another index (to ensure the scheduler still works after forced execution)
1779+---
1780+- Statement: CALL sys.full_text_background_wait()
1781+---
1782 - Statement: CREATE INDEX idx2 on t2(full_text(name2));
1783 ---
1784-- Statement: SELECT fulltext_maintenance_wait();
1785-- output: [[0]]
1786+- Statement: CALL sys.full_text_background_wait()
1787 ---
1788 - Statement: SELECT id from t2 where full_text_search(name2 = 'foo1');
1789 - output: [[1]]
1790@@ -34,8 +32,7 @@
1791 ---
1792 - Statement: UPDATE t set name='foo' WHERE id = 1;
1793 ---
1794-- Statement: SELECT fulltext_maintenance_wait();
1795-- output: [[0]]
1796+- Statement: CALL sys.full_text_background_wait()
1797 ---
1798 - Statement: select id from t where full_text_search(name = 'bar1');
1799 - output: [[2]]
1800@@ -45,8 +42,7 @@
1801 ---
1802 - Statement: DELETE FROM t where id = 4;
1803 ---
1804-- Statement: SELECT fulltext_maintenance_wait();
1805-- output: [[0]]
1806+- Statement: CALL sys.full_text_background_wait()
1807 ---
1808 - Statement: SELECT id from t where full_text_search(name = 'bar4');
1809 - row_count: 0

Subscribers

People subscribed via source and target branches