Merge lp:~ajayaa/drizzle/event-notify-interface into lp:drizzle

Proposed by Ajaya Agrawal
Status: Needs review
Proposed branch: lp:~ajayaa/drizzle/event-notify-interface
Merge into: lp:drizzle
Diff against target: 819 lines (+672/-1)
18 files modified
drizzled/plugin/event_observer.cc (+16/-0)
drizzled/plugin/event_observer.h (+17/-0)
plugin/event_notify/event_notify.cc (+346/-0)
plugin/event_notify/event_notify.h (+81/-0)
plugin/event_notify/plugin.ini (+3/-0)
plugin/event_notify/tests/r/delete.result (+13/-0)
plugin/event_notify/tests/r/insert.result (+18/-0)
plugin/event_notify/tests/r/sys_replication_log.result (+12/-0)
plugin/event_notify/tests/r/time_out.result (+3/-0)
plugin/event_notify/tests/r/update.result (+13/-0)
plugin/event_notify/tests/t/delete.test (+21/-0)
plugin/event_notify/tests/t/insert.test (+28/-0)
plugin/event_notify/tests/t/master.opt (+1/-0)
plugin/event_notify/tests/t/sys_replication_log.test (+20/-0)
plugin/event_notify/tests/t/time_out.test (+2/-0)
plugin/event_notify/tests/t/update.test (+21/-0)
plugin/slave/queue_producer.cc (+56/-0)
plugin/slave/queue_thread.h (+1/-1)
To merge this branch: bzr merge lp:~ajayaa/drizzle/event-notify-interface
Reviewer Review Type Date Requested Status
Stewart Smith (community) Approve
Review via email: mp+185343@code.launchpad.net

Description of the change

Added event_notify interface. Currently three types of event i.e. INSERT, UPDATE, DELETE in any type of table is observed. Client can issue wait_for() call to wait for certain number of events and then return.

To post a comment you must log in.
2649. By Ajaya Agrawal

changed slave code

Revision history for this message
Stewart Smith (stewart) :
review: Approve

Unmerged revisions

2649. By Ajaya Agrawal

changed slave code

2648. By Ajaya Agrawal

added test cases for all type of events including insert in sys_replication_log and timeout

2647. By Ajaya Agrawal

added test cases for all type of events including insert in sys_replication_log and timeout

2646. By Ajaya Agrawal

fixed memory leak.working correctly. writing more test cases

2645. By Ajaya Agrawal

memory leak need to be fixed

2644. By Ajaya Agrawal

added a new event for replicationlog

2643. By Ajaya Agrawal

Added time_out support and test cases

2642. By Ajaya Agrawal

Added time_out support and test cases

2641. By Ajaya Agrawal

removed all hard coding and print statements and added destructor for event_t class

2640. By Ajaya Agrawal

removed all hard coding\n added destructor for event_t class

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'drizzled/plugin/event_observer.cc'
2--- drizzled/plugin/event_observer.cc 2011-03-22 12:01:19 +0000
3+++ drizzled/plugin/event_observer.cc 2013-09-18 20:41:29 +0000
4@@ -421,6 +421,13 @@
5 return (in_table.getMutableShare()->getTableObservers() != NULL);
6 }
7
8+ // bool InsertToSysReplicationLogEventData::callEventObservers()
9+ // {
10+ // EventObserverList *observers = new EventObserverList();
11+ // observers->addObserver(this, AFTER_INSERT_TO_SYS_REPLICATION_LOG);
12+
13+ // }
14+
15 /*==========================================================*/
16 /* Static meathods called by drizzle to notify interested plugins
17 * of a schema event.
18@@ -599,6 +606,15 @@
19 return eventData.callEventObservers();
20 }
21
22+ bool EventObserver::afterInsertToSysReplicationLog(Session &session)
23+ {
24+ if (all_event_plugins.empty())
25+ return false;
26+ InsertToSysReplicationLogEventData eventData(session);
27+ return eventData.callEventObservers();
28+ // std::cout << "event happening on sys_replication_log\n";
29+ //return true;
30+ }
31
32 } /* namespace plugin */
33 } /* namespace drizzled */
34
35=== modified file 'drizzled/plugin/event_observer.h'
36--- drizzled/plugin/event_observer.h 2011-08-14 17:04:01 +0000
37+++ drizzled/plugin/event_observer.h 2013-09-18 20:41:29 +0000
38@@ -79,6 +79,8 @@
39 BEFORE_UPDATE_RECORD, AFTER_UPDATE_RECORD,
40 BEFORE_DELETE_RECORD, AFTER_DELETE_RECORD,
41
42+ AFTER_INSERT_TO_SYS_REPLICATION_LOG,
43+
44 /* The max event ID marker. */
45 MAX_EVENT_COUNT
46 };
47@@ -87,6 +89,10 @@
48 {
49 switch(event)
50 {
51+
52+ case AFTER_INSERT_TO_SYS_REPLICATION_LOG:
53+ return "AFTER_INSERT_TO_SYS_REPLICATION_LOG";
54+
55 case BEFORE_DROP_TABLE:
56 return "BEFORE_DROP_TABLE";
57
58@@ -236,6 +242,8 @@
59 static bool beforeDropDatabase(Session &session, const std::string &db);
60 static bool afterDropDatabase(Session &session, const std::string &db, int err);
61
62+ static bool afterInsertToSysReplicationLog(Session &session);
63+
64 static const EventObserverVector &getEventObservers(void);
65
66 };
67@@ -553,6 +561,15 @@
68 {}
69 };
70
71+class InsertToSysReplicationLogEventData: public SessionEventData
72+{
73+public:
74+ InsertToSysReplicationLogEventData(Session &session_arg):
75+ SessionEventData(EventObserver::AFTER_INSERT_TO_SYS_REPLICATION_LOG, session_arg)
76+ {}
77+ // virtual bool callEventObservers();
78+};
79+
80 //=======================================================
81
82 } /* namespace plugin */
83
84=== added directory 'plugin/event_notify'
85=== added file 'plugin/event_notify/event_notify.cc'
86--- plugin/event_notify/event_notify.cc 1970-01-01 00:00:00 +0000
87+++ plugin/event_notify/event_notify.cc 2013-09-18 20:41:29 +0000
88@@ -0,0 +1,346 @@
89+ /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
90+ * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
91+ *
92+ * Copyright (C) Ajaya Agrawal
93+ *
94+ * This program is free software; you can redistribute it and/or modify
95+ * it under the terms of the GNU General Public License as published by
96+ * the Free Software Foundation; version 2 of the License.
97+ *
98+ * This program is distributed in the hope that it will be useful,
99+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
100+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
101+ * GNU General Public License for more details.
102+ *
103+ * You should have received a copy of the GNU General Public License
104+ * along with this program; if not, write to the Free Software
105+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
106+ */
107+
108+ #include <config.h>
109+ #include <fcntl.h>
110+ #include <drizzled/function/str/strfunc.h>
111+ #include <drizzled/item/func.h>
112+ #include <drizzled/plugin.h>
113+ #include <drizzled/plugin/function.h>
114+ #include <drizzled/item.h>
115+ #include <drizzled/module/option_map.h>
116+ #include <drizzled/session.h>
117+ #include <drizzled/session/times.h>
118+ #include <drizzled/table/instance/base.h>
119+ #include <drizzled/table.h>
120+
121+ #include "event_notify.h"
122+
123+ namespace po= boost::program_options;
124+ using namespace std;
125+ using namespace drizzled;
126+ using namespace plugin;
127+
128+ static EventNotify *event_notify = NULL;
129+
130+ struct AddressIs {
131+ event_t *ptr;
132+ AddressIs(event_t *ptr) : ptr(ptr) {}
133+ bool operator()(const event_t *object) const {
134+ return ptr == object;
135+ }
136+ };
137+
138+ // void show()
139+ // {
140+ // for (std::map<std::string, list<event_t*> >::iterator map_it=event_notify->all_events.begin(); map_it!=event_notify->all_events.end(); ++map_it){
141+ // std::cout << map_it->first << " ";
142+ // for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it)
143+ // std::cout << (*list_it)->event_type << " " << (*list_it)->time_out << " " << (*list_it)->num_of_events_threshold<< " " << (*list_it)->num_of_events_so_far << '\n';
144+ // }
145+ // }
146+
147+ event_t::event_t(String *_table_name, String *_event_type, String *_time_out, String *_num_of_events_threshold)
148+ {
149+ this->table_name = _table_name->c_str();
150+ this->event_type = _event_type->c_str();
151+ time_out = atof(_time_out->c_str());
152+ num_of_events_threshold = atoi(_num_of_events_threshold->c_str());
153+ pthread_mutex_init(&this->lock, NULL);
154+ pthread_cond_init(&this->cond, NULL);
155+ num_of_events_so_far = 0;
156+ }
157+
158+ event_t::~event_t()
159+ {
160+ pthread_mutex_destroy(&lock);
161+ pthread_cond_destroy(&cond);
162+ }
163+
164+ EventNotify::EventNotify(bool enabled):
165+ drizzled::plugin::EventObserver("event_notify_interface"),
166+ sysvar_enabled(enabled)
167+ {
168+ }
169+
170+ EventNotify::~EventNotify()
171+ {
172+ }
173+
174+ void EventNotify::registerTableEventsDo(TableShare &table_share, EventObserverList &observers)
175+ {
176+ boost::mutex::scoped_lock scoped(all_events_mutex);
177+
178+ string str(table_share.getTableName());
179+ //cout << "in registertableeventsdo table name is " << str <<endl;
180+ //show();
181+ if (sysvar_enabled == false || (all_events.find(str) == all_events.end())){
182+ return;
183+ }
184+ //cout << "yes event registered on this table\n";
185+ registerEvent(observers, AFTER_INSERT_RECORD);
186+ registerEvent(observers, AFTER_UPDATE_RECORD);
187+ registerEvent(observers, AFTER_DELETE_RECORD);
188+ }
189+
190+ void EventNotify::registerSessionEventsDo(Session &, EventObserverList &observers)
191+ {
192+ //here need to check whether drizzled was started with option --innodb.replication-log.
193+ //cout << "registered for sys_replication_log\n";
194+ registerEvent(observers, AFTER_INSERT_TO_SYS_REPLICATION_LOG);
195+ }
196+
197+ bool EventNotify::observeEventDo(EventData &data)
198+ {
199+ if (not sysvar_enabled)
200+ return false;
201+
202+ switch (data.event) {
203+
204+ case AFTER_INSERT_RECORD:
205+ afterInsertRecord((AfterInsertRecordEventData &)data);
206+ break;
207+
208+ case AFTER_UPDATE_RECORD:
209+ afterUpdateRecord((AfterUpdateRecordEventData &)data);
210+ break;
211+
212+ case AFTER_DELETE_RECORD:
213+ afterDeleteRecord((AfterDeleteRecordEventData &)data);
214+ break;
215+
216+ case AFTER_INSERT_TO_SYS_REPLICATION_LOG:
217+ afterInsertToSysReplicationLog((InsertToSysReplicationLogEventData&) data);
218+ break;
219+
220+ default:
221+ fprintf(stderr, "event_notify: Unexpected event '%s'\n",
222+ EventObserver::eventName(data.event));
223+ }
224+
225+ return false;
226+ }
227+
228+ bool EventNotify::afterInsertRecord(AfterInsertRecordEventData &data)
229+ {
230+ std::string str(data.table.getShare()->getTableName());
231+ std::map<std::string, list<event_t*> >::iterator map_it;
232+ boost::mutex::scoped_lock scoped(all_events_mutex);
233+ map_it = all_events.find(str);
234+
235+ if(map_it != all_events.end()){
236+ for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){
237+ if((*list_it)->event_type.compare("INSERT") == 0){
238+ pthread_mutex_lock(&(*list_it)->lock);
239+ (*list_it)->num_of_events_so_far++;
240+ if((*list_it)->num_of_events_so_far >= (*list_it)->num_of_events_threshold){
241+ pthread_cond_broadcast(&(*list_it)->cond);
242+ fprintf(stderr, "broadcast %p\n", (void*)&(*list_it)->cond);
243+ }
244+ pthread_mutex_unlock(&(*list_it)->lock);
245+ }
246+ }
247+ }
248+ return false;
249+ }
250+
251+ bool EventNotify::afterUpdateRecord(AfterUpdateRecordEventData &data)
252+ {
253+ std::string str(data.table.getShare()->getTableName());
254+ std::map<std::string, list<event_t*> >::iterator map_it;
255+ boost::mutex::scoped_lock scoped(all_events_mutex);
256+ map_it = all_events.find(str);
257+
258+ if(map_it != all_events.end()){
259+ for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){
260+ if((*list_it)->event_type.compare("UPDATE") == 0){
261+ pthread_mutex_lock(&(*list_it)->lock);
262+ (*list_it)->num_of_events_so_far++;
263+ if((*list_it)->num_of_events_so_far >= (*list_it)->num_of_events_threshold){
264+ pthread_cond_broadcast(&(*list_it)->cond);
265+ fprintf(stderr, "broadcast %p\n", (void*)&(*list_it)->cond);
266+ }
267+ pthread_mutex_unlock(&(*list_it)->lock);
268+ }
269+ }
270+ }
271+ return false;
272+ }
273+
274+ bool EventNotify::afterDeleteRecord(AfterDeleteRecordEventData &data)
275+ {
276+ std::string str(data.table.getShare()->getTableName());
277+ std::map<std::string, list<event_t*> >::iterator map_it;
278+ boost::mutex::scoped_lock scoped(all_events_mutex);
279+ map_it = all_events.find(str);
280+
281+ if(map_it != all_events.end()){
282+ for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){
283+ if((*list_it)->event_type.compare("DELETE") == 0){
284+ pthread_mutex_lock(&(*list_it)->lock);
285+ (*list_it)->num_of_events_so_far++;
286+ if((*list_it)->num_of_events_so_far >= (*list_it)->num_of_events_threshold){
287+ pthread_cond_broadcast(&(*list_it)->cond);
288+ fprintf(stderr, "broadcast %p\n", (void*)&(*list_it)->cond);
289+ }
290+ pthread_mutex_unlock(&(*list_it)->lock);
291+ }
292+ }
293+ }
294+ return false;
295+ }
296+
297+
298+ bool EventNotify::afterInsertToSysReplicationLog(InsertToSysReplicationLogEventData &data)
299+ {
300+ std::string str = "sys_replication_log";
301+ std::map<std::string, list<event_t*> >::iterator map_it;
302+ boost::mutex::scoped_lock scoped(all_events_mutex);
303+ map_it = all_events.find(str);
304+
305+ if(map_it != all_events.end()){
306+ for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){
307+ if((*list_it)->event_type.compare("INSERT") == 0){
308+ pthread_mutex_lock(&(*list_it)->lock);
309+ (*list_it)->num_of_events_so_far++;
310+ if((*list_it)->num_of_events_so_far >= (*list_it)->num_of_events_threshold){
311+ pthread_cond_broadcast(&(*list_it)->cond);
312+ fprintf(stderr, "broadcast %p\n", (void*)&(*list_it)->cond);
313+ }
314+ pthread_mutex_unlock(&(*list_it)->lock);
315+ }
316+ }
317+ }
318+ return false;
319+ }
320+
321+
322+ class Item_func_event_notify : public Item_str_func
323+ {
324+ public:
325+ Item_func_event_notify() : Item_str_func() {}
326+ const char *func_name() const { return "wait_for"; }
327+
328+ String *val_str(String* s)
329+ {
330+ String *table_name = args[0]->val_str(s);
331+ event_t *new_event = new event_t(args[0]->val_str(s),(args[1]->val_str(s)), (args[2]->val_str(s)), (args[3]->val_str(s)));
332+
333+ map<std::string, list<event_t*> >::iterator map_it;
334+ list<event_t*>::iterator list_it;
335+
336+ {
337+ boost::mutex::scoped_lock scoped(event_notify->all_events_mutex);
338+ map_it = event_notify->all_events.find(new_event->table_name);
339+
340+ if(map_it == event_notify->all_events.end()){
341+ list<event_t*> my_list;
342+ my_list.push_back(new_event);
343+ event_notify->all_events[new_event->table_name] = my_list;
344+ }
345+ else{
346+ for (list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){
347+ if((*list_it)->event_type.compare(new_event->event_type) == 0 ) {
348+ if( new_event->num_of_events_threshold <= (*list_it)->num_of_events_so_far){
349+ //int retval = new_event->num_of_events_threshold;
350+ delete new_event;
351+ return table_name;
352+ }
353+ new_event->num_of_events_so_far = (*list_it)->num_of_events_so_far;
354+ break;
355+ }
356+ }
357+ map_it->second.push_back(new_event);
358+ }
359+ }
360+ /*
361+ * waiting for number of events reach its threshold.
362+ */
363+ struct timespec timeToWait;
364+ struct timeval now;
365+ gettimeofday(&now,NULL);
366+ timeToWait.tv_sec = now.tv_sec+new_event->time_out;
367+ pthread_mutex_lock(&new_event->lock);
368+ //while(new_event->num_of_events_so_far < new_event->num_of_events_threshold )
369+ fprintf(stderr, "wait %p\n", (void*)&new_event->cond);
370+ pthread_cond_timedwait(&new_event->cond, &new_event->lock, &timeToWait);
371+ fprintf(stderr, "SIGNALLED %p\n", (void*)&new_event->cond);
372+
373+ pthread_mutex_unlock(&new_event->lock);
374+ int count_events = 0;
375+ {
376+ boost::mutex::scoped_lock scoped(event_notify->all_events_mutex);
377+ map<std::string, list<event_t*> >::iterator map_iter;
378+ list<event_t*>::iterator list_iter;
379+ map_iter = event_notify->all_events.find(new_event->table_name);
380+ for (list_iter=map_iter->second.begin(); list_iter!=map_iter->second.end(); ++list_iter){
381+ if((*list_iter)->event_type.compare(new_event->event_type) == 0 ){
382+ count_events++;
383+ }
384+ }
385+ if(count_events > 1){
386+ map_iter->second.remove_if(AddressIs(new_event));
387+ delete new_event;
388+ }
389+ }
390+ return table_name;
391+ }
392+
393+ void fix_length_and_dec() {
394+ max_length=strlen("wait_for");
395+ }
396+
397+ bool check_argument_count(int n)
398+ {
399+ return (n == 4);
400+ }
401+ };
402+
403+ plugin::Create_function<Item_func_event_notify> *event_notify_udf= NULL;
404+
405+ static void init_options(drizzled::module::option_context &)
406+ {
407+ event_notify = new EventNotify(true);
408+ }
409+
410+ static int event_notifier_plugin_init(drizzled::module::Context &context)
411+ {
412+
413+ event_notify_udf=
414+ new plugin::Create_function<Item_func_event_notify>("wait_for");
415+ context.add(event_notify_udf);
416+ context.add(event_notify);
417+ context.registerVariable(new sys_var_bool_ptr("enabled", &event_notify->sysvar_enabled));
418+ return 0;
419+ }
420+
421+
422+ DRIZZLE_DECLARE_PLUGIN
423+ {
424+ DRIZZLE_VERSION_ID, /* DRIZZLE_VERSION_ID */
425+ "Event_Notifier", /* module name */
426+ "1.0", /* module version */
427+ "Ajaya Agrawal", /* author(s) */
428+ N_("indicates an event happening on certain table/database/session"), /* description */
429+ PLUGIN_LICENSE_BSD, /* license */
430+ event_notifier_plugin_init, /* init module function */
431+ NULL, /* module dependencies */
432+ init_options /* init options function */
433+ }
434+ DRIZZLE_DECLARE_PLUGIN_END;
435\ No newline at end of file
436
437=== added file 'plugin/event_notify/event_notify.h'
438--- plugin/event_notify/event_notify.h 1970-01-01 00:00:00 +0000
439+++ plugin/event_notify/event_notify.h 2013-09-18 20:41:29 +0000
440@@ -0,0 +1,81 @@
441+/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
442+ * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
443+ *
444+ * Copyright 2013 Ajaya Agrawal
445+ *
446+ * This program is free software: you can redistribute it and/or modify
447+ * it under the terms of the GNU General Public License as published by
448+ * the Free Software Foundation, either version 3 of the License, or
449+ * (at your option) any later version.
450+ *
451+ * This program is distributed in the hope that it will be useful,
452+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
453+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
454+ * GNU General Public License for more details.
455+ *
456+ * You should have received a copy of the GNU General Public License
457+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
458+ */
459+
460+#pragma once
461+
462+#include <drizzled/plugin/event_observer.h>
463+#include <boost/thread/mutex.hpp>
464+
465+#include <map>
466+#include <list>
467+#include <pthread.h>
468+#include <string>
469+#include <cstring>
470+
471+namespace drizzled {
472+namespace plugin {
473+
474+struct cmp_str
475+{
476+ bool operator()(std::string a, std::string b)
477+ {
478+ return (a.compare(b) < 0);
479+ }
480+};
481+
482+class event_t {
483+
484+public:
485+ event_t(String *_session_time, String *_event_type, String *_table_name, String *_num_of_events);
486+ ~event_t();
487+ std::string table_name;
488+ std::string event_type;
489+ pthread_mutex_t lock;
490+ pthread_cond_t cond;
491+ int num_of_events_threshold;
492+ double time_out;
493+ int num_of_events_so_far;
494+};
495+
496+class EventNotify : public drizzled::plugin::EventObserver
497+{
498+public:
499+ EventNotify(bool enabled);
500+ ~EventNotify();
501+
502+ void registerTableEventsDo(TableShare &table_share, EventObserverList &observers);
503+ void registerSessionEventsDo(Session &session, EventObserverList &observers);
504+ bool observeEventDo(EventData &);
505+ bool afterStatement(AfterStatementEventData &data);
506+ bool afterInsertRecord(AfterInsertRecordEventData &data);
507+ bool afterUpdateRecord(AfterUpdateRecordEventData &data);
508+ bool afterDeleteRecord(AfterDeleteRecordEventData &data);
509+ bool afterInsertToSysReplicationLog(InsertToSysReplicationLogEventData &data);
510+ /**
511+ * These are the event_notify system variables. So sysvar_enabled is
512+ * event_notify_enabled in SHOW VARIABLES, etc. They are all global and dynamic.
513+ */
514+ bool sysvar_enabled;
515+ boost::mutex all_events_mutex;
516+ boost::mutex counter_mutex;
517+ std::map <std::string, std::list<event_t*>, cmp_str > all_events;
518+};
519+
520+} /* namespace plugin */
521+} /* namespace drizzled */
522
523=== added file 'plugin/event_notify/plugin.ini'
524--- plugin/event_notify/plugin.ini 1970-01-01 00:00:00 +0000
525+++ plugin/event_notify/plugin.ini 2013-09-18 20:41:29 +0000
526@@ -0,0 +1,3 @@
527+[plugin]
528+sources=event_notify.cc
529+headers=event_notify.h
530
531=== added directory 'plugin/event_notify/tests'
532=== added directory 'plugin/event_notify/tests/r'
533=== added file 'plugin/event_notify/tests/r/delete.result'
534--- plugin/event_notify/tests/r/delete.result 1970-01-01 00:00:00 +0000
535+++ plugin/event_notify/tests/r/delete.result 2013-09-18 20:41:29 +0000
536@@ -0,0 +1,13 @@
537+drop table if exists t1;
538+create table t1 (a int primary key);
539+select wait_for("t1", "DELETE", "200.9", "2");
540+insert into t1 values(1),(2),(3);
541+delete from t1 where a=1;
542+delete from t1 where a=2;
543+delete from t1 where a=3;
544+wait_for("t1", "DELETE", "200.9", "2")
545+t1
546+select wait_for("t1", "DELETE", "1000", "3");
547+wait_for("t1", "DELETE", "1000", "3")
548+t1
549+drop table t1;
550
551=== added file 'plugin/event_notify/tests/r/insert.result'
552--- plugin/event_notify/tests/r/insert.result 1970-01-01 00:00:00 +0000
553+++ plugin/event_notify/tests/r/insert.result 2013-09-18 20:41:29 +0000
554@@ -0,0 +1,18 @@
555+drop table if exists t1;
556+create table t1 (a int primary key);
557+select wait_for("t1", "INSERT", "100", "4");
558+insert into t1 values(1);
559+insert into t1 values(2);
560+insert into t1 values(3);
561+insert into t1 values(4);
562+wait_for("t1", "INSERT", "100", "4")
563+t1
564+select wait_for("t1", "INSERT", "100", "5");
565+insert into t1 values(5);
566+wait_for("t1", "INSERT", "100", "5")
567+t1
568+insert into t1 values(60);
569+select wait_for("t1", "INSERT", "100", "6");
570+wait_for("t1", "INSERT", "100", "6")
571+t1
572+drop table t1;
573
574=== added file 'plugin/event_notify/tests/r/sys_replication_log.result'
575--- plugin/event_notify/tests/r/sys_replication_log.result 1970-01-01 00:00:00 +0000
576+++ plugin/event_notify/tests/r/sys_replication_log.result 2013-09-18 20:41:29 +0000
577@@ -0,0 +1,12 @@
578+drop table if exists t1;
579+create table t1 (a int primary key);
580+select wait_for("sys_replication_log", "INSERT", "200.9", "1");
581+insert into t1 values(1);
582+insert into t1 values(2);
583+insert into t1 values(3);
584+wait_for("sys_replication_log", "INSERT", "200.9", "1")
585+sys_replication_log
586+select wait_for("sys_replication_log", "INSERT", "100", "3");
587+wait_for("sys_replication_log", "INSERT", "100", "3")
588+sys_replication_log
589+drop table t1;
590
591=== added file 'plugin/event_notify/tests/r/time_out.result'
592--- plugin/event_notify/tests/r/time_out.result 1970-01-01 00:00:00 +0000
593+++ plugin/event_notify/tests/r/time_out.result 2013-09-18 20:41:29 +0000
594@@ -0,0 +1,3 @@
595+select wait_for("t1", "INSERT", "5", "20");
596+wait_for("t1", "INSERT", "5", "20")
597+t1
598
599=== added file 'plugin/event_notify/tests/r/update.result'
600--- plugin/event_notify/tests/r/update.result 1970-01-01 00:00:00 +0000
601+++ plugin/event_notify/tests/r/update.result 2013-09-18 20:41:29 +0000
602@@ -0,0 +1,13 @@
603+drop table if exists t1;
604+create table t1 (a int primary key);
605+select wait_for("t1", "UPDATE", "200.9", "1");
606+insert into t1 values(1),(2),(3);
607+update t1 set a=10 where a=1;
608+update t1 set a=20 where a=2;
609+update t1 set a=30 where a=3;
610+wait_for("t1", "UPDATE", "200.9", "1")
611+t1
612+select wait_for("t1", "UPDATE", "1000", "2");
613+wait_for("t1", "UPDATE", "1000", "2")
614+t1
615+drop table t1;
616
617=== added directory 'plugin/event_notify/tests/t'
618=== added file 'plugin/event_notify/tests/t/delete.test'
619--- plugin/event_notify/tests/t/delete.test 1970-01-01 00:00:00 +0000
620+++ plugin/event_notify/tests/t/delete.test 2013-09-18 20:41:29 +0000
621@@ -0,0 +1,21 @@
622+connect (con1,localhost,root,,);
623+--disable_warnings
624+drop table if exists t1;
625+--enable_warnings
626+create table t1 (a int primary key);
627+connection con1;
628+send select wait_for("t1", "DELETE", "200.9", "2");
629+connection default;
630+insert into t1 values(1),(2),(3);
631+delete from t1 where a=1;
632+delete from t1 where a=2;
633+delete from t1 where a=3;
634+connection con1;
635+reap;
636+send select wait_for("t1", "DELETE", "1000", "3");
637+connection default;
638+connection con1;
639+reap;
640+disconnect con1;
641+connection default;
642+drop table t1;
643
644=== added file 'plugin/event_notify/tests/t/insert.test'
645--- plugin/event_notify/tests/t/insert.test 1970-01-01 00:00:00 +0000
646+++ plugin/event_notify/tests/t/insert.test 2013-09-18 20:41:29 +0000
647@@ -0,0 +1,28 @@
648+connect (con1,localhost,root,,);
649+connect (con2,localhost,root,,);
650+--disable_warnings
651+drop table if exists t1;
652+--enable_warnings
653+create table t1 (a int primary key);
654+connection con1;
655+send select wait_for("t1", "INSERT", "100", "4");
656+connection default;
657+insert into t1 values(1);
658+insert into t1 values(2);
659+insert into t1 values(3);
660+insert into t1 values(4);
661+connection con1;
662+reap;
663+connection con2;
664+send select wait_for("t1", "INSERT", "100", "5");
665+connection default;
666+insert into t1 values(5);
667+connection con2;
668+reap;
669+insert into t1 values(60);
670+send select wait_for("t1", "INSERT", "100", "6");
671+reap;
672+disconnect con1;
673+disconnect con2;
674+connection default;
675+drop table t1;
676
677=== added file 'plugin/event_notify/tests/t/master.opt'
678--- plugin/event_notify/tests/t/master.opt 1970-01-01 00:00:00 +0000
679+++ plugin/event_notify/tests/t/master.opt 2013-09-18 20:41:29 +0000
680@@ -0,0 +1,1 @@
681+--plugin-add=event_notify --innodb.replication-log
682
683=== added file 'plugin/event_notify/tests/t/sys_replication_log.test'
684--- plugin/event_notify/tests/t/sys_replication_log.test 1970-01-01 00:00:00 +0000
685+++ plugin/event_notify/tests/t/sys_replication_log.test 2013-09-18 20:41:29 +0000
686@@ -0,0 +1,20 @@
687+connect (con1,localhost,root,,);
688+--disable_warnings
689+drop table if exists t1;
690+--enable_warnings
691+create table t1 (a int primary key);
692+connection con1;
693+send select wait_for("sys_replication_log", "INSERT", "200.9", "1");
694+connection default;
695+insert into t1 values(1);
696+insert into t1 values(2);
697+insert into t1 values(3);
698+connection con1;
699+reap;
700+send select wait_for("sys_replication_log", "INSERT", "100", "3");
701+connection default;
702+connection con1;
703+reap;
704+disconnect con1;
705+connection default;
706+drop table t1;
707
708=== added file 'plugin/event_notify/tests/t/time_out.test'
709--- plugin/event_notify/tests/t/time_out.test 1970-01-01 00:00:00 +0000
710+++ plugin/event_notify/tests/t/time_out.test 2013-09-18 20:41:29 +0000
711@@ -0,0 +1,2 @@
712+send select wait_for("t1", "INSERT", "5", "20");
713+reap;
714
715=== added file 'plugin/event_notify/tests/t/update.test'
716--- plugin/event_notify/tests/t/update.test 1970-01-01 00:00:00 +0000
717+++ plugin/event_notify/tests/t/update.test 2013-09-18 20:41:29 +0000
718@@ -0,0 +1,21 @@
719+connect (con1,localhost,root,,);
720+--disable_warnings
721+drop table if exists t1;
722+--enable_warnings
723+create table t1 (a int primary key);
724+connection con1;
725+send select wait_for("t1", "UPDATE", "200.9", "1");
726+connection default;
727+insert into t1 values(1),(2),(3);
728+update t1 set a=10 where a=1;
729+update t1 set a=20 where a=2;
730+update t1 set a=30 where a=3;
731+connection con1;
732+reap;
733+send select wait_for("t1", "UPDATE", "1000", "2");
734+connection default;
735+connection con1;
736+reap;
737+disconnect con1;
738+connection default;
739+drop table t1;
740
741=== modified file 'plugin/slave/queue_producer.cc'
742--- plugin/slave/queue_producer.cc 2012-04-14 20:43:20 +0000
743+++ plugin/slave/queue_producer.cc 2013-09-18 20:41:29 +0000
744@@ -49,6 +49,62 @@
745 return reconnect(true);
746 }
747
748+void QueueProducer::run()
749+{
750+ boost::posix_time::seconds duration(getSleepInterval());
751+
752+ /* thread setup needed to do things like create a Session */
753+ internal::my_thread_init();
754+
755+ if (not init())
756+ return;
757+
758+ while (1)
759+ {
760+ {
761+ /* This uninterruptable block processes the message queue */
762+ boost::this_thread::disable_interruption di;
763+
764+ if (not process())
765+ {
766+ shutdown();
767+ return;
768+ }
769+ }
770+
771+ /* Interruptable only when not doing work (aka, sleeping) */
772+ try
773+ {
774+ /* waiting for num_of_events through a wait_for() call*/
775+ int num_of_events = 2;
776+ std::string event_sql("select wait_for(\"sys_replication_log\", \"INSERT\", \"2000\", \"");
777+ event_sql.append(boost::lexical_cast<string>(num_of_events));
778+ event_sql.append("\")");
779+ drizzle_return_t event_ret;
780+ drizzle_result_st event_result;
781+ drizzle_query_str(_connection, &event_result, event_sql.c_str(), &event_ret);
782+
783+ if (event_ret != DRIZZLE_RETURN_OK){
784+ drizzle_result_free(&event_result);
785+ boost::this_thread::sleep(duration);
786+ string error = "";
787+ error.append(drizzle_error(_drizzle));
788+ cout << error <<": producer thread sleeping\n";
789+ }
790+ else{
791+ num_of_events += 1;
792+ cout << "wait_for returned\n";
793+ drizzle_result_free(&event_result);
794+ }
795+ }
796+ catch (boost::thread_interrupted &)
797+ {
798+ return;
799+ }
800+ }
801+}
802+
803+
804 bool QueueProducer::process()
805 {
806 if (_saved_max_commit_id == 0)
807
808=== modified file 'plugin/slave/queue_thread.h'
809--- plugin/slave/queue_thread.h 2011-03-14 05:40:28 +0000
810+++ plugin/slave/queue_thread.h 2013-09-18 20:41:29 +0000
811@@ -40,7 +40,7 @@
812 virtual ~QueueThread()
813 {}
814
815- void run(void);
816+ virtual void run(void);
817
818 /**
819 * Do any initialization work.

Subscribers

People subscribed via source and target branches