Merge lp:~ajayaa/drizzle/event-notify-interface into lp:drizzle
- event-notify-interface
- Merge into 7.2
Status: | Needs review |
---|---|
Proposed branch: | lp:~ajayaa/drizzle/event-notify-interface |
Merge into: | lp:drizzle |
Diff against target: |
819 lines (+672/-1) 18 files modified
drizzled/plugin/event_observer.cc (+16/-0) drizzled/plugin/event_observer.h (+17/-0) plugin/event_notify/event_notify.cc (+346/-0) plugin/event_notify/event_notify.h (+81/-0) plugin/event_notify/plugin.ini (+3/-0) plugin/event_notify/tests/r/delete.result (+13/-0) plugin/event_notify/tests/r/insert.result (+18/-0) plugin/event_notify/tests/r/sys_replication_log.result (+12/-0) plugin/event_notify/tests/r/time_out.result (+3/-0) plugin/event_notify/tests/r/update.result (+13/-0) plugin/event_notify/tests/t/delete.test (+21/-0) plugin/event_notify/tests/t/insert.test (+28/-0) plugin/event_notify/tests/t/master.opt (+1/-0) plugin/event_notify/tests/t/sys_replication_log.test (+20/-0) plugin/event_notify/tests/t/time_out.test (+2/-0) plugin/event_notify/tests/t/update.test (+21/-0) plugin/slave/queue_producer.cc (+56/-0) plugin/slave/queue_thread.h (+1/-1) |
To merge this branch: | bzr merge lp:~ajayaa/drizzle/event-notify-interface |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Stewart Smith (community) | Approve | ||
Review via email: mp+185343@code.launchpad.net |
Commit message
Description of the change
Added event_notify interface. Currently three types of event i.e. INSERT, UPDATE, DELETE in any type of table is observed. Client can issue wait_for() call to wait for certain number of events and then return.
- 2649. By Ajaya Agrawal
-
changed slave code
Unmerged revisions
- 2649. By Ajaya Agrawal
-
changed slave code
- 2648. By Ajaya Agrawal
-
added test cases for all type of events including insert in sys_replication_log and timeout
- 2647. By Ajaya Agrawal
-
added test cases for all type of events including insert in sys_replication_log and timeout
- 2646. By Ajaya Agrawal
-
fixed memory leak.working correctly. writing more test cases
- 2645. By Ajaya Agrawal
-
memory leak need to be fixed
- 2644. By Ajaya Agrawal
-
added a new event for replicationlog
- 2643. By Ajaya Agrawal
-
Added time_out support and test cases
- 2642. By Ajaya Agrawal
-
Added time_out support and test cases
- 2641. By Ajaya Agrawal
-
removed all hard coding and print statements and added destructor for event_t class
- 2640. By Ajaya Agrawal
-
removed all hard coding\n added destructor for event_t class
Preview Diff
1 | === modified file 'drizzled/plugin/event_observer.cc' |
2 | --- drizzled/plugin/event_observer.cc 2011-03-22 12:01:19 +0000 |
3 | +++ drizzled/plugin/event_observer.cc 2013-09-18 20:41:29 +0000 |
4 | @@ -421,6 +421,13 @@ |
5 | return (in_table.getMutableShare()->getTableObservers() != NULL); |
6 | } |
7 | |
8 | + // bool InsertToSysReplicationLogEventData::callEventObservers() |
9 | + // { |
10 | + // EventObserverList *observers = new EventObserverList(); |
11 | + // observers->addObserver(this, AFTER_INSERT_TO_SYS_REPLICATION_LOG); |
12 | + |
13 | + // } |
14 | + |
15 | /*==========================================================*/ |
16 | /* Static meathods called by drizzle to notify interested plugins |
17 | * of a schema event. |
18 | @@ -599,6 +606,15 @@ |
19 | return eventData.callEventObservers(); |
20 | } |
21 | |
22 | + bool EventObserver::afterInsertToSysReplicationLog(Session &session) |
23 | + { |
24 | + if (all_event_plugins.empty()) |
25 | + return false; |
26 | + InsertToSysReplicationLogEventData eventData(session); |
27 | + return eventData.callEventObservers(); |
28 | + // std::cout << "event happening on sys_replication_log\n"; |
29 | + //return true; |
30 | + } |
31 | |
32 | } /* namespace plugin */ |
33 | } /* namespace drizzled */ |
34 | |
35 | === modified file 'drizzled/plugin/event_observer.h' |
36 | --- drizzled/plugin/event_observer.h 2011-08-14 17:04:01 +0000 |
37 | +++ drizzled/plugin/event_observer.h 2013-09-18 20:41:29 +0000 |
38 | @@ -79,6 +79,8 @@ |
39 | BEFORE_UPDATE_RECORD, AFTER_UPDATE_RECORD, |
40 | BEFORE_DELETE_RECORD, AFTER_DELETE_RECORD, |
41 | |
42 | + AFTER_INSERT_TO_SYS_REPLICATION_LOG, |
43 | + |
44 | /* The max event ID marker. */ |
45 | MAX_EVENT_COUNT |
46 | }; |
47 | @@ -87,6 +89,10 @@ |
48 | { |
49 | switch(event) |
50 | { |
51 | + |
52 | + case AFTER_INSERT_TO_SYS_REPLICATION_LOG: |
53 | + return "AFTER_INSERT_TO_SYS_REPLICATION_LOG"; |
54 | + |
55 | case BEFORE_DROP_TABLE: |
56 | return "BEFORE_DROP_TABLE"; |
57 | |
58 | @@ -236,6 +242,8 @@ |
59 | static bool beforeDropDatabase(Session &session, const std::string &db); |
60 | static bool afterDropDatabase(Session &session, const std::string &db, int err); |
61 | |
62 | + static bool afterInsertToSysReplicationLog(Session &session); |
63 | + |
64 | static const EventObserverVector &getEventObservers(void); |
65 | |
66 | }; |
67 | @@ -553,6 +561,15 @@ |
68 | {} |
69 | }; |
70 | |
71 | +class InsertToSysReplicationLogEventData: public SessionEventData |
72 | +{ |
73 | +public: |
74 | + InsertToSysReplicationLogEventData(Session &session_arg): |
75 | + SessionEventData(EventObserver::AFTER_INSERT_TO_SYS_REPLICATION_LOG, session_arg) |
76 | + {} |
77 | + // virtual bool callEventObservers(); |
78 | +}; |
79 | + |
80 | //======================================================= |
81 | |
82 | } /* namespace plugin */ |
83 | |
84 | === added directory 'plugin/event_notify' |
85 | === added file 'plugin/event_notify/event_notify.cc' |
86 | --- plugin/event_notify/event_notify.cc 1970-01-01 00:00:00 +0000 |
87 | +++ plugin/event_notify/event_notify.cc 2013-09-18 20:41:29 +0000 |
88 | @@ -0,0 +1,346 @@ |
89 | + /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*- |
90 | + * vim:expandtab:shiftwidth=2:tabstop=2:smarttab: |
91 | + * |
92 | + * Copyright (C) Ajaya Agrawal |
93 | + * |
94 | + * This program is free software; you can redistribute it and/or modify |
95 | + * it under the terms of the GNU General Public License as published by |
96 | + * the Free Software Foundation; version 2 of the License. |
97 | + * |
98 | + * This program is distributed in the hope that it will be useful, |
99 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
100 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
101 | + * GNU General Public License for more details. |
102 | + * |
103 | + * You should have received a copy of the GNU General Public License |
104 | + * along with this program; if not, write to the Free Software |
105 | + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA |
106 | + */ |
107 | + |
108 | + #include <config.h> |
109 | + #include <fcntl.h> |
110 | + #include <drizzled/function/str/strfunc.h> |
111 | + #include <drizzled/item/func.h> |
112 | + #include <drizzled/plugin.h> |
113 | + #include <drizzled/plugin/function.h> |
114 | + #include <drizzled/item.h> |
115 | + #include <drizzled/module/option_map.h> |
116 | + #include <drizzled/session.h> |
117 | + #include <drizzled/session/times.h> |
118 | + #include <drizzled/table/instance/base.h> |
119 | + #include <drizzled/table.h> |
120 | + |
121 | + #include "event_notify.h" |
122 | + |
123 | + namespace po= boost::program_options; |
124 | + using namespace std; |
125 | + using namespace drizzled; |
126 | + using namespace plugin; |
127 | + |
128 | + static EventNotify *event_notify = NULL; |
129 | + |
130 | + struct AddressIs { |
131 | + event_t *ptr; |
132 | + AddressIs(event_t *ptr) : ptr(ptr) {} |
133 | + bool operator()(const event_t *object) const { |
134 | + return ptr == object; |
135 | + } |
136 | + }; |
137 | + |
138 | + // void show() |
139 | + // { |
140 | + // for (std::map<std::string, list<event_t*> >::iterator map_it=event_notify->all_events.begin(); map_it!=event_notify->all_events.end(); ++map_it){ |
141 | + // std::cout << map_it->first << " "; |
142 | + // for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it) |
143 | + // std::cout << (*list_it)->event_type << " " << (*list_it)->time_out << " " << (*list_it)->num_of_events_threshold<< " " << (*list_it)->num_of_events_so_far << '\n'; |
144 | + // } |
145 | + // } |
146 | + |
147 | + event_t::event_t(String *_table_name, String *_event_type, String *_time_out, String *_num_of_events_threshold) |
148 | + { |
149 | + this->table_name = _table_name->c_str(); |
150 | + this->event_type = _event_type->c_str(); |
151 | + time_out = atof(_time_out->c_str()); |
152 | + num_of_events_threshold = atoi(_num_of_events_threshold->c_str()); |
153 | + pthread_mutex_init(&this->lock, NULL); |
154 | + pthread_cond_init(&this->cond, NULL); |
155 | + num_of_events_so_far = 0; |
156 | + } |
157 | + |
158 | + event_t::~event_t() |
159 | + { |
160 | + pthread_mutex_destroy(&lock); |
161 | + pthread_cond_destroy(&cond); |
162 | + } |
163 | + |
164 | + EventNotify::EventNotify(bool enabled): |
165 | + drizzled::plugin::EventObserver("event_notify_interface"), |
166 | + sysvar_enabled(enabled) |
167 | + { |
168 | + } |
169 | + |
170 | + EventNotify::~EventNotify() |
171 | + { |
172 | + } |
173 | + |
174 | + void EventNotify::registerTableEventsDo(TableShare &table_share, EventObserverList &observers) |
175 | + { |
176 | + boost::mutex::scoped_lock scoped(all_events_mutex); |
177 | + |
178 | + string str(table_share.getTableName()); |
179 | + //cout << "in registertableeventsdo table name is " << str <<endl; |
180 | + //show(); |
181 | + if (sysvar_enabled == false || (all_events.find(str) == all_events.end())){ |
182 | + return; |
183 | + } |
184 | + //cout << "yes event registered on this table\n"; |
185 | + registerEvent(observers, AFTER_INSERT_RECORD); |
186 | + registerEvent(observers, AFTER_UPDATE_RECORD); |
187 | + registerEvent(observers, AFTER_DELETE_RECORD); |
188 | + } |
189 | + |
190 | + void EventNotify::registerSessionEventsDo(Session &, EventObserverList &observers) |
191 | + { |
192 | + //here need to check whether drizzled was started with option --innodb.replication-log. |
193 | + //cout << "registered for sys_replication_log\n"; |
194 | + registerEvent(observers, AFTER_INSERT_TO_SYS_REPLICATION_LOG); |
195 | + } |
196 | + |
197 | + bool EventNotify::observeEventDo(EventData &data) |
198 | + { |
199 | + if (not sysvar_enabled) |
200 | + return false; |
201 | + |
202 | + switch (data.event) { |
203 | + |
204 | + case AFTER_INSERT_RECORD: |
205 | + afterInsertRecord((AfterInsertRecordEventData &)data); |
206 | + break; |
207 | + |
208 | + case AFTER_UPDATE_RECORD: |
209 | + afterUpdateRecord((AfterUpdateRecordEventData &)data); |
210 | + break; |
211 | + |
212 | + case AFTER_DELETE_RECORD: |
213 | + afterDeleteRecord((AfterDeleteRecordEventData &)data); |
214 | + break; |
215 | + |
216 | + case AFTER_INSERT_TO_SYS_REPLICATION_LOG: |
217 | + afterInsertToSysReplicationLog((InsertToSysReplicationLogEventData&) data); |
218 | + break; |
219 | + |
220 | + default: |
221 | + fprintf(stderr, "event_notify: Unexpected event '%s'\n", |
222 | + EventObserver::eventName(data.event)); |
223 | + } |
224 | + |
225 | + return false; |
226 | + } |
227 | + |
228 | + bool EventNotify::afterInsertRecord(AfterInsertRecordEventData &data) |
229 | + { |
230 | + std::string str(data.table.getShare()->getTableName()); |
231 | + std::map<std::string, list<event_t*> >::iterator map_it; |
232 | + boost::mutex::scoped_lock scoped(all_events_mutex); |
233 | + map_it = all_events.find(str); |
234 | + |
235 | + if(map_it != all_events.end()){ |
236 | + for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){ |
237 | + if((*list_it)->event_type.compare("INSERT") == 0){ |
238 | + pthread_mutex_lock(&(*list_it)->lock); |
239 | + (*list_it)->num_of_events_so_far++; |
240 | + if((*list_it)->num_of_events_so_far >= (*list_it)->num_of_events_threshold){ |
241 | + pthread_cond_broadcast(&(*list_it)->cond); |
242 | + fprintf(stderr, "broadcast %p\n", (void*)&(*list_it)->cond); |
243 | + } |
244 | + pthread_mutex_unlock(&(*list_it)->lock); |
245 | + } |
246 | + } |
247 | + } |
248 | + return false; |
249 | + } |
250 | + |
251 | + bool EventNotify::afterUpdateRecord(AfterUpdateRecordEventData &data) |
252 | + { |
253 | + std::string str(data.table.getShare()->getTableName()); |
254 | + std::map<std::string, list<event_t*> >::iterator map_it; |
255 | + boost::mutex::scoped_lock scoped(all_events_mutex); |
256 | + map_it = all_events.find(str); |
257 | + |
258 | + if(map_it != all_events.end()){ |
259 | + for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){ |
260 | + if((*list_it)->event_type.compare("UPDATE") == 0){ |
261 | + pthread_mutex_lock(&(*list_it)->lock); |
262 | + (*list_it)->num_of_events_so_far++; |
263 | + if((*list_it)->num_of_events_so_far >= (*list_it)->num_of_events_threshold){ |
264 | + pthread_cond_broadcast(&(*list_it)->cond); |
265 | + fprintf(stderr, "broadcast %p\n", (void*)&(*list_it)->cond); |
266 | + } |
267 | + pthread_mutex_unlock(&(*list_it)->lock); |
268 | + } |
269 | + } |
270 | + } |
271 | + return false; |
272 | + } |
273 | + |
274 | + bool EventNotify::afterDeleteRecord(AfterDeleteRecordEventData &data) |
275 | + { |
276 | + std::string str(data.table.getShare()->getTableName()); |
277 | + std::map<std::string, list<event_t*> >::iterator map_it; |
278 | + boost::mutex::scoped_lock scoped(all_events_mutex); |
279 | + map_it = all_events.find(str); |
280 | + |
281 | + if(map_it != all_events.end()){ |
282 | + for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){ |
283 | + if((*list_it)->event_type.compare("DELETE") == 0){ |
284 | + pthread_mutex_lock(&(*list_it)->lock); |
285 | + (*list_it)->num_of_events_so_far++; |
286 | + if((*list_it)->num_of_events_so_far >= (*list_it)->num_of_events_threshold){ |
287 | + pthread_cond_broadcast(&(*list_it)->cond); |
288 | + fprintf(stderr, "broadcast %p\n", (void*)&(*list_it)->cond); |
289 | + } |
290 | + pthread_mutex_unlock(&(*list_it)->lock); |
291 | + } |
292 | + } |
293 | + } |
294 | + return false; |
295 | + } |
296 | + |
297 | + |
298 | + bool EventNotify::afterInsertToSysReplicationLog(InsertToSysReplicationLogEventData &data) |
299 | + { |
300 | + std::string str = "sys_replication_log"; |
301 | + std::map<std::string, list<event_t*> >::iterator map_it; |
302 | + boost::mutex::scoped_lock scoped(all_events_mutex); |
303 | + map_it = all_events.find(str); |
304 | + |
305 | + if(map_it != all_events.end()){ |
306 | + for (list<event_t*>::iterator list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){ |
307 | + if((*list_it)->event_type.compare("INSERT") == 0){ |
308 | + pthread_mutex_lock(&(*list_it)->lock); |
309 | + (*list_it)->num_of_events_so_far++; |
310 | + if((*list_it)->num_of_events_so_far >= (*list_it)->num_of_events_threshold){ |
311 | + pthread_cond_broadcast(&(*list_it)->cond); |
312 | + fprintf(stderr, "broadcast %p\n", (void*)&(*list_it)->cond); |
313 | + } |
314 | + pthread_mutex_unlock(&(*list_it)->lock); |
315 | + } |
316 | + } |
317 | + } |
318 | + return false; |
319 | + } |
320 | + |
321 | + |
322 | + class Item_func_event_notify : public Item_str_func |
323 | + { |
324 | + public: |
325 | + Item_func_event_notify() : Item_str_func() {} |
326 | + const char *func_name() const { return "wait_for"; } |
327 | + |
328 | + String *val_str(String* s) |
329 | + { |
330 | + String *table_name = args[0]->val_str(s); |
331 | + event_t *new_event = new event_t(args[0]->val_str(s),(args[1]->val_str(s)), (args[2]->val_str(s)), (args[3]->val_str(s))); |
332 | + |
333 | + map<std::string, list<event_t*> >::iterator map_it; |
334 | + list<event_t*>::iterator list_it; |
335 | + |
336 | + { |
337 | + boost::mutex::scoped_lock scoped(event_notify->all_events_mutex); |
338 | + map_it = event_notify->all_events.find(new_event->table_name); |
339 | + |
340 | + if(map_it == event_notify->all_events.end()){ |
341 | + list<event_t*> my_list; |
342 | + my_list.push_back(new_event); |
343 | + event_notify->all_events[new_event->table_name] = my_list; |
344 | + } |
345 | + else{ |
346 | + for (list_it=map_it->second.begin(); list_it!=map_it->second.end(); ++list_it){ |
347 | + if((*list_it)->event_type.compare(new_event->event_type) == 0 ) { |
348 | + if( new_event->num_of_events_threshold <= (*list_it)->num_of_events_so_far){ |
349 | + //int retval = new_event->num_of_events_threshold; |
350 | + delete new_event; |
351 | + return table_name; |
352 | + } |
353 | + new_event->num_of_events_so_far = (*list_it)->num_of_events_so_far; |
354 | + break; |
355 | + } |
356 | + } |
357 | + map_it->second.push_back(new_event); |
358 | + } |
359 | + } |
360 | + /* |
361 | + * waiting for number of events reach its threshold. |
362 | + */ |
363 | + struct timespec timeToWait; |
364 | + struct timeval now; |
365 | + gettimeofday(&now,NULL); |
366 | + timeToWait.tv_sec = now.tv_sec+new_event->time_out; |
367 | + pthread_mutex_lock(&new_event->lock); |
368 | + //while(new_event->num_of_events_so_far < new_event->num_of_events_threshold ) |
369 | + fprintf(stderr, "wait %p\n", (void*)&new_event->cond); |
370 | + pthread_cond_timedwait(&new_event->cond, &new_event->lock, &timeToWait); |
371 | + fprintf(stderr, "SIGNALLED %p\n", (void*)&new_event->cond); |
372 | + |
373 | + pthread_mutex_unlock(&new_event->lock); |
374 | + int count_events = 0; |
375 | + { |
376 | + boost::mutex::scoped_lock scoped(event_notify->all_events_mutex); |
377 | + map<std::string, list<event_t*> >::iterator map_iter; |
378 | + list<event_t*>::iterator list_iter; |
379 | + map_iter = event_notify->all_events.find(new_event->table_name); |
380 | + for (list_iter=map_iter->second.begin(); list_iter!=map_iter->second.end(); ++list_iter){ |
381 | + if((*list_iter)->event_type.compare(new_event->event_type) == 0 ){ |
382 | + count_events++; |
383 | + } |
384 | + } |
385 | + if(count_events > 1){ |
386 | + map_iter->second.remove_if(AddressIs(new_event)); |
387 | + delete new_event; |
388 | + } |
389 | + } |
390 | + return table_name; |
391 | + } |
392 | + |
393 | + void fix_length_and_dec() { |
394 | + max_length=strlen("wait_for"); |
395 | + } |
396 | + |
397 | + bool check_argument_count(int n) |
398 | + { |
399 | + return (n == 4); |
400 | + } |
401 | + }; |
402 | + |
403 | + plugin::Create_function<Item_func_event_notify> *event_notify_udf= NULL; |
404 | + |
405 | + static void init_options(drizzled::module::option_context &) |
406 | + { |
407 | + event_notify = new EventNotify(true); |
408 | + } |
409 | + |
410 | + static int event_notifier_plugin_init(drizzled::module::Context &context) |
411 | + { |
412 | + |
413 | + event_notify_udf= |
414 | + new plugin::Create_function<Item_func_event_notify>("wait_for"); |
415 | + context.add(event_notify_udf); |
416 | + context.add(event_notify); |
417 | + context.registerVariable(new sys_var_bool_ptr("enabled", &event_notify->sysvar_enabled)); |
418 | + return 0; |
419 | + } |
420 | + |
421 | + |
422 | + DRIZZLE_DECLARE_PLUGIN |
423 | + { |
424 | + DRIZZLE_VERSION_ID, /* DRIZZLE_VERSION_ID */ |
425 | + "Event_Notifier", /* module name */ |
426 | + "1.0", /* module version */ |
427 | + "Ajaya Agrawal", /* author(s) */ |
428 | + N_("indicates an event happening on certain table/database/session"), /* description */ |
429 | + PLUGIN_LICENSE_BSD, /* license */ |
430 | + event_notifier_plugin_init, /* init module function */ |
431 | + NULL, /* module dependencies */ |
432 | + init_options /* init options function */ |
433 | + } |
434 | + DRIZZLE_DECLARE_PLUGIN_END; |
435 | \ No newline at end of file |
436 | |
437 | === added file 'plugin/event_notify/event_notify.h' |
438 | --- plugin/event_notify/event_notify.h 1970-01-01 00:00:00 +0000 |
439 | +++ plugin/event_notify/event_notify.h 2013-09-18 20:41:29 +0000 |
440 | @@ -0,0 +1,81 @@ |
441 | +/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*- |
442 | + * vim:expandtab:shiftwidth=2:tabstop=2:smarttab: |
443 | + * |
444 | + * Copyright 2013 Ajaya Agrawal |
445 | + * |
446 | + * This program is free software: you can redistribute it and/or modify |
447 | + * it under the terms of the GNU General Public License as published by |
448 | + * the Free Software Foundation, either version 3 of the License, or |
449 | + * (at your option) any later version. |
450 | + * |
451 | + * This program is distributed in the hope that it will be useful, |
452 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
453 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
454 | + * GNU General Public License for more details. |
455 | + * |
456 | + * You should have received a copy of the GNU General Public License |
457 | + * along with this program. If not, see <http://www.gnu.org/licenses/>. |
458 | + */ |
459 | + |
460 | +#pragma once |
461 | + |
462 | +#include <drizzled/plugin/event_observer.h> |
463 | +#include <boost/thread/mutex.hpp> |
464 | + |
465 | +#include <map> |
466 | +#include <list> |
467 | +#include <pthread.h> |
468 | +#include <string> |
469 | +#include <cstring> |
470 | + |
471 | +namespace drizzled { |
472 | +namespace plugin { |
473 | + |
474 | +struct cmp_str |
475 | +{ |
476 | + bool operator()(std::string a, std::string b) |
477 | + { |
478 | + return (a.compare(b) < 0); |
479 | + } |
480 | +}; |
481 | + |
482 | +class event_t { |
483 | + |
484 | +public: |
485 | + event_t(String *_session_time, String *_event_type, String *_table_name, String *_num_of_events); |
486 | + ~event_t(); |
487 | + std::string table_name; |
488 | + std::string event_type; |
489 | + pthread_mutex_t lock; |
490 | + pthread_cond_t cond; |
491 | + int num_of_events_threshold; |
492 | + double time_out; |
493 | + int num_of_events_so_far; |
494 | +}; |
495 | + |
496 | +class EventNotify : public drizzled::plugin::EventObserver |
497 | +{ |
498 | +public: |
499 | + EventNotify(bool enabled); |
500 | + ~EventNotify(); |
501 | + |
502 | + void registerTableEventsDo(TableShare &table_share, EventObserverList &observers); |
503 | + void registerSessionEventsDo(Session &session, EventObserverList &observers); |
504 | + bool observeEventDo(EventData &); |
505 | + bool afterStatement(AfterStatementEventData &data); |
506 | + bool afterInsertRecord(AfterInsertRecordEventData &data); |
507 | + bool afterUpdateRecord(AfterUpdateRecordEventData &data); |
508 | + bool afterDeleteRecord(AfterDeleteRecordEventData &data); |
509 | + bool afterInsertToSysReplicationLog(InsertToSysReplicationLogEventData &data); |
510 | + /** |
511 | + * These are the event_notify system variables. So sysvar_enabled is |
512 | + * event_notify_enabled in SHOW VARIABLES, etc. They are all global and dynamic. |
513 | + */ |
514 | + bool sysvar_enabled; |
515 | + boost::mutex all_events_mutex; |
516 | + boost::mutex counter_mutex; |
517 | + std::map <std::string, std::list<event_t*>, cmp_str > all_events; |
518 | +}; |
519 | + |
520 | +} /* namespace plugin */ |
521 | +} /* namespace drizzled */ |
522 | |
523 | === added file 'plugin/event_notify/plugin.ini' |
524 | --- plugin/event_notify/plugin.ini 1970-01-01 00:00:00 +0000 |
525 | +++ plugin/event_notify/plugin.ini 2013-09-18 20:41:29 +0000 |
526 | @@ -0,0 +1,3 @@ |
527 | +[plugin] |
528 | +sources=event_notify.cc |
529 | +headers=event_notify.h |
530 | |
531 | === added directory 'plugin/event_notify/tests' |
532 | === added directory 'plugin/event_notify/tests/r' |
533 | === added file 'plugin/event_notify/tests/r/delete.result' |
534 | --- plugin/event_notify/tests/r/delete.result 1970-01-01 00:00:00 +0000 |
535 | +++ plugin/event_notify/tests/r/delete.result 2013-09-18 20:41:29 +0000 |
536 | @@ -0,0 +1,13 @@ |
537 | +drop table if exists t1; |
538 | +create table t1 (a int primary key); |
539 | +select wait_for("t1", "DELETE", "200.9", "2"); |
540 | +insert into t1 values(1),(2),(3); |
541 | +delete from t1 where a=1; |
542 | +delete from t1 where a=2; |
543 | +delete from t1 where a=3; |
544 | +wait_for("t1", "DELETE", "200.9", "2") |
545 | +t1 |
546 | +select wait_for("t1", "DELETE", "1000", "3"); |
547 | +wait_for("t1", "DELETE", "1000", "3") |
548 | +t1 |
549 | +drop table t1; |
550 | |
551 | === added file 'plugin/event_notify/tests/r/insert.result' |
552 | --- plugin/event_notify/tests/r/insert.result 1970-01-01 00:00:00 +0000 |
553 | +++ plugin/event_notify/tests/r/insert.result 2013-09-18 20:41:29 +0000 |
554 | @@ -0,0 +1,18 @@ |
555 | +drop table if exists t1; |
556 | +create table t1 (a int primary key); |
557 | +select wait_for("t1", "INSERT", "100", "4"); |
558 | +insert into t1 values(1); |
559 | +insert into t1 values(2); |
560 | +insert into t1 values(3); |
561 | +insert into t1 values(4); |
562 | +wait_for("t1", "INSERT", "100", "4") |
563 | +t1 |
564 | +select wait_for("t1", "INSERT", "100", "5"); |
565 | +insert into t1 values(5); |
566 | +wait_for("t1", "INSERT", "100", "5") |
567 | +t1 |
568 | +insert into t1 values(60); |
569 | +select wait_for("t1", "INSERT", "100", "6"); |
570 | +wait_for("t1", "INSERT", "100", "6") |
571 | +t1 |
572 | +drop table t1; |
573 | |
574 | === added file 'plugin/event_notify/tests/r/sys_replication_log.result' |
575 | --- plugin/event_notify/tests/r/sys_replication_log.result 1970-01-01 00:00:00 +0000 |
576 | +++ plugin/event_notify/tests/r/sys_replication_log.result 2013-09-18 20:41:29 +0000 |
577 | @@ -0,0 +1,12 @@ |
578 | +drop table if exists t1; |
579 | +create table t1 (a int primary key); |
580 | +select wait_for("sys_replication_log", "INSERT", "200.9", "1"); |
581 | +insert into t1 values(1); |
582 | +insert into t1 values(2); |
583 | +insert into t1 values(3); |
584 | +wait_for("sys_replication_log", "INSERT", "200.9", "1") |
585 | +sys_replication_log |
586 | +select wait_for("sys_replication_log", "INSERT", "100", "3"); |
587 | +wait_for("sys_replication_log", "INSERT", "100", "3") |
588 | +sys_replication_log |
589 | +drop table t1; |
590 | |
591 | === added file 'plugin/event_notify/tests/r/time_out.result' |
592 | --- plugin/event_notify/tests/r/time_out.result 1970-01-01 00:00:00 +0000 |
593 | +++ plugin/event_notify/tests/r/time_out.result 2013-09-18 20:41:29 +0000 |
594 | @@ -0,0 +1,3 @@ |
595 | +select wait_for("t1", "INSERT", "5", "20"); |
596 | +wait_for("t1", "INSERT", "5", "20") |
597 | +t1 |
598 | |
599 | === added file 'plugin/event_notify/tests/r/update.result' |
600 | --- plugin/event_notify/tests/r/update.result 1970-01-01 00:00:00 +0000 |
601 | +++ plugin/event_notify/tests/r/update.result 2013-09-18 20:41:29 +0000 |
602 | @@ -0,0 +1,13 @@ |
603 | +drop table if exists t1; |
604 | +create table t1 (a int primary key); |
605 | +select wait_for("t1", "UPDATE", "200.9", "1"); |
606 | +insert into t1 values(1),(2),(3); |
607 | +update t1 set a=10 where a=1; |
608 | +update t1 set a=20 where a=2; |
609 | +update t1 set a=30 where a=3; |
610 | +wait_for("t1", "UPDATE", "200.9", "1") |
611 | +t1 |
612 | +select wait_for("t1", "UPDATE", "1000", "2"); |
613 | +wait_for("t1", "UPDATE", "1000", "2") |
614 | +t1 |
615 | +drop table t1; |
616 | |
617 | === added directory 'plugin/event_notify/tests/t' |
618 | === added file 'plugin/event_notify/tests/t/delete.test' |
619 | --- plugin/event_notify/tests/t/delete.test 1970-01-01 00:00:00 +0000 |
620 | +++ plugin/event_notify/tests/t/delete.test 2013-09-18 20:41:29 +0000 |
621 | @@ -0,0 +1,21 @@ |
622 | +connect (con1,localhost,root,,); |
623 | +--disable_warnings |
624 | +drop table if exists t1; |
625 | +--enable_warnings |
626 | +create table t1 (a int primary key); |
627 | +connection con1; |
628 | +send select wait_for("t1", "DELETE", "200.9", "2"); |
629 | +connection default; |
630 | +insert into t1 values(1),(2),(3); |
631 | +delete from t1 where a=1; |
632 | +delete from t1 where a=2; |
633 | +delete from t1 where a=3; |
634 | +connection con1; |
635 | +reap; |
636 | +send select wait_for("t1", "DELETE", "1000", "3"); |
637 | +connection default; |
638 | +connection con1; |
639 | +reap; |
640 | +disconnect con1; |
641 | +connection default; |
642 | +drop table t1; |
643 | |
644 | === added file 'plugin/event_notify/tests/t/insert.test' |
645 | --- plugin/event_notify/tests/t/insert.test 1970-01-01 00:00:00 +0000 |
646 | +++ plugin/event_notify/tests/t/insert.test 2013-09-18 20:41:29 +0000 |
647 | @@ -0,0 +1,28 @@ |
648 | +connect (con1,localhost,root,,); |
649 | +connect (con2,localhost,root,,); |
650 | +--disable_warnings |
651 | +drop table if exists t1; |
652 | +--enable_warnings |
653 | +create table t1 (a int primary key); |
654 | +connection con1; |
655 | +send select wait_for("t1", "INSERT", "100", "4"); |
656 | +connection default; |
657 | +insert into t1 values(1); |
658 | +insert into t1 values(2); |
659 | +insert into t1 values(3); |
660 | +insert into t1 values(4); |
661 | +connection con1; |
662 | +reap; |
663 | +connection con2; |
664 | +send select wait_for("t1", "INSERT", "100", "5"); |
665 | +connection default; |
666 | +insert into t1 values(5); |
667 | +connection con2; |
668 | +reap; |
669 | +insert into t1 values(60); |
670 | +send select wait_for("t1", "INSERT", "100", "6"); |
671 | +reap; |
672 | +disconnect con1; |
673 | +disconnect con2; |
674 | +connection default; |
675 | +drop table t1; |
676 | |
677 | === added file 'plugin/event_notify/tests/t/master.opt' |
678 | --- plugin/event_notify/tests/t/master.opt 1970-01-01 00:00:00 +0000 |
679 | +++ plugin/event_notify/tests/t/master.opt 2013-09-18 20:41:29 +0000 |
680 | @@ -0,0 +1,1 @@ |
681 | +--plugin-add=event_notify --innodb.replication-log |
682 | |
683 | === added file 'plugin/event_notify/tests/t/sys_replication_log.test' |
684 | --- plugin/event_notify/tests/t/sys_replication_log.test 1970-01-01 00:00:00 +0000 |
685 | +++ plugin/event_notify/tests/t/sys_replication_log.test 2013-09-18 20:41:29 +0000 |
686 | @@ -0,0 +1,20 @@ |
687 | +connect (con1,localhost,root,,); |
688 | +--disable_warnings |
689 | +drop table if exists t1; |
690 | +--enable_warnings |
691 | +create table t1 (a int primary key); |
692 | +connection con1; |
693 | +send select wait_for("sys_replication_log", "INSERT", "200.9", "1"); |
694 | +connection default; |
695 | +insert into t1 values(1); |
696 | +insert into t1 values(2); |
697 | +insert into t1 values(3); |
698 | +connection con1; |
699 | +reap; |
700 | +send select wait_for("sys_replication_log", "INSERT", "100", "3"); |
701 | +connection default; |
702 | +connection con1; |
703 | +reap; |
704 | +disconnect con1; |
705 | +connection default; |
706 | +drop table t1; |
707 | |
708 | === added file 'plugin/event_notify/tests/t/time_out.test' |
709 | --- plugin/event_notify/tests/t/time_out.test 1970-01-01 00:00:00 +0000 |
710 | +++ plugin/event_notify/tests/t/time_out.test 2013-09-18 20:41:29 +0000 |
711 | @@ -0,0 +1,2 @@ |
712 | +send select wait_for("t1", "INSERT", "5", "20"); |
713 | +reap; |
714 | |
715 | === added file 'plugin/event_notify/tests/t/update.test' |
716 | --- plugin/event_notify/tests/t/update.test 1970-01-01 00:00:00 +0000 |
717 | +++ plugin/event_notify/tests/t/update.test 2013-09-18 20:41:29 +0000 |
718 | @@ -0,0 +1,21 @@ |
719 | +connect (con1,localhost,root,,); |
720 | +--disable_warnings |
721 | +drop table if exists t1; |
722 | +--enable_warnings |
723 | +create table t1 (a int primary key); |
724 | +connection con1; |
725 | +send select wait_for("t1", "UPDATE", "200.9", "1"); |
726 | +connection default; |
727 | +insert into t1 values(1),(2),(3); |
728 | +update t1 set a=10 where a=1; |
729 | +update t1 set a=20 where a=2; |
730 | +update t1 set a=30 where a=3; |
731 | +connection con1; |
732 | +reap; |
733 | +send select wait_for("t1", "UPDATE", "1000", "2"); |
734 | +connection default; |
735 | +connection con1; |
736 | +reap; |
737 | +disconnect con1; |
738 | +connection default; |
739 | +drop table t1; |
740 | |
741 | === modified file 'plugin/slave/queue_producer.cc' |
742 | --- plugin/slave/queue_producer.cc 2012-04-14 20:43:20 +0000 |
743 | +++ plugin/slave/queue_producer.cc 2013-09-18 20:41:29 +0000 |
744 | @@ -49,6 +49,62 @@ |
745 | return reconnect(true); |
746 | } |
747 | |
748 | +void QueueProducer::run() |
749 | +{ |
750 | + boost::posix_time::seconds duration(getSleepInterval()); |
751 | + |
752 | + /* thread setup needed to do things like create a Session */ |
753 | + internal::my_thread_init(); |
754 | + |
755 | + if (not init()) |
756 | + return; |
757 | + |
758 | + while (1) |
759 | + { |
760 | + { |
761 | + /* This uninterruptable block processes the message queue */ |
762 | + boost::this_thread::disable_interruption di; |
763 | + |
764 | + if (not process()) |
765 | + { |
766 | + shutdown(); |
767 | + return; |
768 | + } |
769 | + } |
770 | + |
771 | + /* Interruptable only when not doing work (aka, sleeping) */ |
772 | + try |
773 | + { |
774 | + /* waiting for num_of_events through a wait_for() call*/ |
775 | + int num_of_events = 2; |
776 | + std::string event_sql("select wait_for(\"sys_replication_log\", \"INSERT\", \"2000\", \""); |
777 | + event_sql.append(boost::lexical_cast<string>(num_of_events)); |
778 | + event_sql.append("\")"); |
779 | + drizzle_return_t event_ret; |
780 | + drizzle_result_st event_result; |
781 | + drizzle_query_str(_connection, &event_result, event_sql.c_str(), &event_ret); |
782 | + |
783 | + if (event_ret != DRIZZLE_RETURN_OK){ |
784 | + drizzle_result_free(&event_result); |
785 | + boost::this_thread::sleep(duration); |
786 | + string error = ""; |
787 | + error.append(drizzle_error(_drizzle)); |
788 | + cout << error <<": producer thread sleeping\n"; |
789 | + } |
790 | + else{ |
791 | + num_of_events += 1; |
792 | + cout << "wait_for returned\n"; |
793 | + drizzle_result_free(&event_result); |
794 | + } |
795 | + } |
796 | + catch (boost::thread_interrupted &) |
797 | + { |
798 | + return; |
799 | + } |
800 | + } |
801 | +} |
802 | + |
803 | + |
804 | bool QueueProducer::process() |
805 | { |
806 | if (_saved_max_commit_id == 0) |
807 | |
808 | === modified file 'plugin/slave/queue_thread.h' |
809 | --- plugin/slave/queue_thread.h 2011-03-14 05:40:28 +0000 |
810 | +++ plugin/slave/queue_thread.h 2013-09-18 20:41:29 +0000 |
811 | @@ -40,7 +40,7 @@ |
812 | virtual ~QueueThread() |
813 | {} |
814 | |
815 | - void run(void); |
816 | + virtual void run(void); |
817 | |
818 | /** |
819 | * Do any initialization work. |