Merge lp:~jaypipes/drizzle/replication-ddl into lp:~drizzle-trunk/drizzle/development

Proposed by Jay Pipes
Status: Merged
Merged at revision: not available
Proposed branch: lp:~jaypipes/drizzle/replication-ddl
Merge into: lp:~drizzle-trunk/drizzle/development
Prerequisite: lp:~jaypipes/drizzle/replication-write-buffers
Diff against target: 2587 lines (+894/-424)
49 files modified
drizzled/atomic/gcc_traits.h (+11/-10)
drizzled/atomic/pthread_traits.h (+20/-8)
drizzled/atomic/sun_studio.h (+152/-29)
drizzled/atomics.h (+16/-11)
drizzled/cursor.cc (+1/-4)
drizzled/include.am (+1/-0)
drizzled/plugin/replication.h (+46/-0)
drizzled/plugin/transaction_applier.h (+7/-2)
drizzled/plugin/transaction_replicator.h (+8/-4)
drizzled/replication_services.cc (+20/-13)
drizzled/replication_services.h (+4/-1)
drizzled/session.cc (+5/-5)
drizzled/set_var.cc (+1/-1)
drizzled/sql_delete.cc (+1/-1)
drizzled/sql_insert.cc (+2/-2)
drizzled/sql_parse.cc (+1/-1)
drizzled/sql_table.cc (+3/-3)
drizzled/sql_update.cc (+1/-1)
drizzled/statement/alter_table.cc (+3/-3)
drizzled/statement/release_savepoint.cc (+1/-1)
drizzled/statement/rollback_to_savepoint.cc (+2/-2)
drizzled/statement/savepoint.cc (+2/-2)
drizzled/transaction_services.cc (+47/-145)
drizzled/transaction_services.h (+40/-10)
plugin/default_replicator/default_replicator.cc (+5/-2)
plugin/default_replicator/default_replicator.h (+6/-1)
plugin/filtered_replicator/filtered_replicator.cc (+6/-3)
plugin/filtered_replicator/filtered_replicator.h (+6/-2)
plugin/innobase/handler/ha_innodb.cc (+0/-30)
plugin/transaction_log/module.cc (+26/-4)
plugin/transaction_log/plugin.ini (+2/-2)
plugin/transaction_log/tests/r/ddl_transaction_id.result (+36/-0)
plugin/transaction_log/tests/r/filtered_replicator.result (+4/-0)
plugin/transaction_log/tests/r/information_schema.result (+8/-5)
plugin/transaction_log/tests/r/insert.result (+4/-0)
plugin/transaction_log/tests/r/truncate_log.result (+2/-1)
plugin/transaction_log/tests/r/udf_print_transaction_message.result (+1/-0)
plugin/transaction_log/tests/r/variables.result (+2/-1)
plugin/transaction_log/tests/t/ddl_transaction_id-master.opt (+1/-0)
plugin/transaction_log/tests/t/ddl_transaction_id.inc (+29/-0)
plugin/transaction_log/tests/t/ddl_transaction_id.test (+14/-0)
plugin/transaction_log/tests/t/information_schema.test (+2/-2)
plugin/transaction_log/tests/t/insert.inc (+2/-0)
plugin/transaction_log/transaction_log.cc (+83/-32)
plugin/transaction_log/transaction_log.h (+32/-5)
plugin/transaction_log/transaction_log_applier.cc (+47/-72)
plugin/transaction_log/transaction_log_applier.h (+20/-3)
plugin/transaction_log/write_buffer.cc (+71/-0)
plugin/transaction_log/write_buffer.h (+90/-0)
To merge this branch: bzr merge lp:~jaypipes/drizzle/replication-ddl
Reviewer                 Review Type   Date Requested   Status
Brian Aker                                              Needs Fixing
Jay Pipes                community                      Needs Resubmitting
Monty Taylor                                            Needs Fixing
Review via email: mp+22558@code.launchpad.net

Description of the change

    * Fixes drizzled's atomics:

    - fetch_and_add() actually behaved like add_and_fetch(), returning the new value
    instead of the old one. Both methods now exist with the correct semantics.
    - compare_and_swap() was incorrect for all traits classes. It now returns a bool
    that is true only when the supplied value is actually swapped in.
    - Fixes the increment() and decrement() methods and operator+=() in the outer
    atomics class template to call the proper add_and_fetch() methods on the traits classes.
    - Now that the above are fixed, removed the hacks in Query_id and TransactionLog,
    so the query ID and the new transaction ID start properly at 1.

    * Transaction messages sent over replication stream now use
    a real transaction ID, managed by drizzled::TransactionServices. Previously,
    the Query_id was being used, resulting in SELECT statements incrementing the
    transaction ID.

    * Added a test case to ensure that DDL ops are given a transaction ID and SELECT
    ops do not increment the transaction ID.
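
    To illustrate the difference between the two corrected methods, here is a minimal
    sketch using the GCC __sync builtins directly. IdCounter and next_id() are
    hypothetical names chosen for this example; they are not drizzled classes, and
    this is not the code from the branch.

```cpp
#include <stdint.h>

// Hypothetical stand-in for an ID counter; not a drizzled class.
struct IdCounter
{
  volatile uint64_t value;

  IdCounter() : value(0) {}

  // Returns the value held *before* the addition.
  uint64_t fetch_and_add(uint64_t addend)
  {
    return __sync_fetch_and_add(&value, addend);
  }

  // Returns the value held *after* the addition.
  uint64_t add_and_fetch(uint64_t addend)
  {
    return __sync_add_and_fetch(&value, addend);
  }

  // With a zero-initialized counter the first call returns 1,
  // so no special-case offset is needed to start IDs at 1.
  uint64_t next_id()
  {
    return add_and_fetch(1);
  }
};
```

    On a fresh counter, fetch_and_add(5) returns 0 and a following add_and_fetch(5)
    returns 10. A caller that only asks for an ID when it has something to log (DDL
    or DML, but not SELECT) would leave the counter untouched otherwise.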

    The transaction ID will be paired with a channel ID to become the global
    transaction identifier. ReplicationServices will manage the pairing of
    channel and transaction ID and understand how far a particular subscriber
    node has applied.
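
    The pairing described above is not implemented in this branch. As a rough
    sketch of one way it could look, assuming a 32-bit channel ID and a 64-bit
    transaction ID (both widths are assumptions), with GlobalTransactionId and
    AppliedPositions as purely hypothetical names:

```cpp
#include <stdint.h>
#include <map>
#include <string>

// Hypothetical type; the merge proposal describes the idea only.
struct GlobalTransactionId
{
  uint32_t channel_id;
  uint64_t transaction_id;

  GlobalTransactionId(uint32_t c, uint64_t t) : channel_id(c), transaction_id(t) {}
};

// Orders identifiers by channel first, then by transaction ID within a channel.
inline bool operator<(const GlobalTransactionId &a, const GlobalTransactionId &b)
{
  if (a.channel_id != b.channel_id)
    return a.channel_id < b.channel_id;
  return a.transaction_id < b.transaction_id;
}

// Hypothetical bookkeeping: the highest transaction ID each subscriber
// has applied, tracked separately per channel.
class AppliedPositions
{
public:
  void markApplied(const std::string &subscriber, const GlobalTransactionId &gtid)
  {
    // operator[] value-initializes a missing entry to 0.
    uint64_t &last= positions[subscriber][gtid.channel_id];
    if (gtid.transaction_id > last)
      last= gtid.transaction_id;
  }

  // Returns 0 when nothing is recorded; unambiguous because IDs start at 1.
  uint64_t lastApplied(const std::string &subscriber, uint32_t channel_id) const
  {
    std::map<std::string, std::map<uint32_t, uint64_t> >::const_iterator s=
      positions.find(subscriber);
    if (s == positions.end())
      return 0;
    std::map<uint32_t, uint64_t>::const_iterator c= s->second.find(channel_id);
    return c == s->second.end() ? 0 : c->second;
  }

private:
  std::map<std::string, std::map<uint32_t, uint64_t> > positions;
};
```

    Using 0 as the "nothing applied yet" value relies on transaction IDs starting
    at 1, which is what the atomics fixes in this branch make possible.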

Monty Taylor (mordred) wrote :

It's missing an update to the sun studio atomic trait... but other than that looks good.

review: Needs Fixing
Jay Pipes (jaypipes) wrote :

Pushed Sun atomics fixes. Please review r1419. If we could run it through a hudson builder for Sun-only, that would be cool...
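
For anyone reviewing the Sun Studio changes, the boolean compare-and-swap contract can be sketched on top of a value-returning CAS. The sketch below uses GCC's __sync_val_compare_and_swap as a stand-in for a primitive like Solaris atomic_cas_32(), which also returns the prior value. cas_value and cas_bool are hypothetical helper names, and this is not the code in sun_studio.h.

```cpp
#include <stdint.h>

// Stand-in for a value-returning CAS: stores desired only if
// *ptr == expected, and always returns the value *ptr held before the call.
inline uint32_t cas_value(volatile uint32_t *ptr, uint32_t expected, uint32_t desired)
{
  return __sync_val_compare_and_swap(ptr, expected, desired);
}

// Boolean form: true only when the swap actually took place.
// The returned prior value is compared against the comparand itself.
inline bool cas_bool(volatile uint32_t *ptr, uint32_t expected, uint32_t desired)
{
  return cas_value(ptr, expected, desired) == expected;
}
```

Comparing the CAS result against the comparand, rather than against a separate earlier read of *ptr, keeps the answer correct even if another thread modifies the value between the read and the CAS.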

Jay Pipes (jaypipes) wrote :

OK, all green in build. Fixed Sun Studio atomics.

review: Needs Resubmitting
lp:~jaypipes/drizzle/replication-ddl updated
1423. By Jay Pipes <jpipes@serialcoder>

Fix cpplint header guard.

Brian Aker (brianaker) wrote :

This needs to be remerged with Trunk.

Thanks,
   -Brian

review: Needs Fixing
Jay Pipes (jaypipes) wrote :

This has already been merged with trunk.

Brian Aker (brianaker) wrote :

Sorry, I must have clicked on the wrong link. Though I am not seeing
this in the merge history.

On Apr 1, 2010, at 12:55 PM, Jay Pipes wrote:

> This has already been merged with trunk.
> --
> https://code.launchpad.net/~jaypipes/drizzle/replication-ddl/+merge/22558
> You are reviewing the proposed merge of lp:~jaypipes/drizzle/replication-ddl into lp:drizzle.

Jay Pipes (jaypipes) wrote :

No worries. I'm pretty sure I'm all merged up, though, as I see no revisions to pull if I bzr merge trunk locally...

Jay Pipes (jaypipes) wrote :

fyi, r1422 on this branch contains the most recent merge with trunk

lp:~jaypipes/drizzle/replication-ddl updated
1424. By Jay Pipes <jpipes@serialcoder>

merge trunk

1425. By Jay Pipes <jpipes@serialcoder>

merge replication-api

1426. By Jay Pipes <jpipes@serialcoder>

Merge trunk

1427. By Jay Pipes <jpipes@serialcoder>

Try another const_cast<> technique on pthread_traits.
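
For context on why a cast comes up at all: a read method that must take a lock cannot do so from a const member function unless the lock is reachable as non-const. A common alternative to const_cast<> for the const-only case is a mutable member, sketched below. LockedInt is a hypothetical class, not the traits class from pthread_traits.h. Note that mutable does not remove volatile, so a method declared const volatile, as fetch() is in the diff, would still need a cast.

```cpp
#include <pthread.h>
#include <stddef.h>

// Hypothetical minimal lock-protected integer; not drizzled code.
class LockedInt
{
public:
  LockedInt() : value(0) { pthread_mutex_init(&lock, NULL); }
  ~LockedInt() { pthread_mutex_destroy(&lock); }

  void store(int v)
  {
    pthread_mutex_lock(&lock);
    value= v;
    pthread_mutex_unlock(&lock);
  }

  // Const read that still takes the lock; `mutable` on the mutex
  // allows this without casting away constness of `this`.
  int fetch() const
  {
    pthread_mutex_lock(&lock);
    int ret= value;
    pthread_mutex_unlock(&lock);
    return ret;
  }

private:
  LockedInt(const LockedInt &);            // non-copyable (C++98 style)
  LockedInt &operator=(const LockedInt &); // non-assignable

  int value;
  mutable pthread_mutex_t lock;
};
```

Calling fetch() through a const reference compiles and returns the stored value, with the mutex held for the duration of the read.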

Preview Diff

1=== modified file 'drizzled/atomic/gcc_traits.h'
2--- drizzled/atomic/gcc_traits.h 2009-07-17 20:59:45 +0000
3+++ drizzled/atomic/gcc_traits.h 2010-04-06 19:16:34 +0000
4@@ -32,36 +32,37 @@
5
6 gcc_traits() {}
7
8- /* YES. I know these are semantically backwards...
9- * so... TODO: Ensure we're doing the "right" thing here
10- */
11+ inline value_type add_and_fetch(volatile value_type *value, D addend )
12+ {
13+ return __sync_add_and_fetch(value, addend);
14+ }
15+
16 inline value_type fetch_and_add(volatile value_type *value, D addend )
17 {
18- return __sync_add_and_fetch(value, addend);
19+ return __sync_fetch_and_add(value, addend);
20 }
21
22 inline value_type fetch_and_increment(volatile value_type *value)
23 {
24- return __sync_add_and_fetch(value, 1);
25+ return __sync_fetch_and_add(value, 1);
26 }
27
28 inline value_type fetch_and_decrement(volatile value_type *value)
29 {
30- return __sync_sub_and_fetch(value, 1);
31+ return __sync_fetch_and_sub(value, 1);
32 }
33
34 inline value_type fetch_and_store(volatile value_type *value,
35 value_type new_value)
36 {
37- /* TODO: Is this the right one? */
38 return __sync_lock_test_and_set(value, new_value);
39 }
40
41- inline value_type compare_and_swap(volatile value_type *value,
42+ inline bool compare_and_swap(volatile value_type *value,
43 value_type new_value,
44 value_type comparand )
45 {
46- return __sync_val_compare_and_swap(value, comparand, new_value);
47+ return __sync_bool_compare_and_swap(value, comparand, new_value);
48 }
49
50 inline value_type fetch(const volatile value_type *value) const volatile
51@@ -76,7 +77,7 @@
52 * Look at how to rewrite the below to something that ICC feels is
53 * OK and yet respects memory barriers.
54 */
55- return __sync_add_and_fetch(const_cast<value_type *>(value), 0);
56+ return __sync_fetch_and_add(const_cast<value_type *>(value), 0);
57 }
58
59 inline value_type store_with_release(volatile value_type *value,
60
61=== modified file 'drizzled/atomic/pthread_traits.h'
62--- drizzled/atomic/pthread_traits.h 2009-12-14 19:51:51 +0000
63+++ drizzled/atomic/pthread_traits.h 2010-04-06 19:16:34 +0000
64@@ -67,11 +67,20 @@
65
66 pthread_traits() {}
67
68+ inline value_type add_and_fetch(volatile value_type *value, D addend )
69+ {
70+ my_lock.lock();
71+ *value += addend;
72+ value_type ret= *value;
73+ my_lock.unlock();
74+ return ret;
75+ }
76+
77 inline value_type fetch_and_add(volatile value_type *value, D addend )
78 {
79 my_lock.lock();
80+ value_type ret= *value;
81 *value += addend;
82- value_type ret= *value;
83 my_lock.unlock();
84 return ret;
85 }
86@@ -79,8 +88,8 @@
87 inline value_type fetch_and_increment(volatile value_type *value)
88 {
89 my_lock.lock();
90+ value_type ret= *value;
91 *value++;
92- value_type ret= *value;
93 my_lock.unlock();
94 return ret;
95 }
96@@ -88,8 +97,8 @@
97 inline value_type fetch_and_decrement(volatile value_type *value)
98 {
99 my_lock.lock();
100+ value_type ret= *value;
101 *value--;
102- value_type ret= *value;
103 my_lock.unlock();
104 return ret;
105 }
106@@ -98,27 +107,30 @@
107 value_type new_value )
108 {
109 my_lock.lock();
110+ value_type ret= *value;
111 *value= new_value;
112- value_type ret= *value;
113 my_lock.unlock();
114 return ret;
115 }
116
117- inline value_type compare_and_swap(volatile value_type *value,
118+ inline bool compare_and_swap(volatile value_type *value,
119 value_type new_value,
120 value_type comparand )
121 {
122 my_lock.lock();
123- if (*value == comparand)
124+ bool ret= (*value == comparand);
125+ if (ret)
126 *value= new_value;
127- value_type ret= *value;
128 my_lock.unlock();
129 return ret;
130 }
131
132 inline value_type fetch(const volatile value_type *value) const volatile
133 {
134- return *value;
135+ const_cast<pthread_traits *>(this)->my_lock.lock();
136+ value_type ret= *value;
137+ const_cast<pthread_traits *>(this)->my_lock.unlock();
138+ return ret;
139 }
140
141 inline value_type store_with_release(volatile value_type *value,
142
143=== modified file 'drizzled/atomic/sun_studio.h'
144--- drizzled/atomic/sun_studio.h 2009-08-30 00:26:17 +0000
145+++ drizzled/atomic/sun_studio.h 2010-04-06 19:16:34 +0000
146@@ -24,91 +24,177 @@
147 #include <atomic.h>
148 #undef _KERNEL
149
150+inline bool __sync_fetch_and_add(volatile bool* ptr, bool val)
151+{
152+ bool ret= *ptr;
153+ (val == true) ? atomic_inc_8((volatile uint8_t *)ptr) : atomic_add_8((volatile uint8_t *)ptr, (int8_t)val);
154+ return ret;
155+}
156+
157+inline int8_t __sync_fetch_and_add(volatile int8_t* ptr, int8_t val)
158+{
159+ int8_t ret= *ptr;
160+ (val == 1) ? atomic_inc_8((volatile uint8_t*)ptr) : atomic_add_8((volatile uint8_t*)ptr, val);
161+ return ret;
162+}
163+
164+inline int16_t __sync_fetch_and_add(volatile int16_t* ptr, int16_t val)
165+{
166+ int16_t ret= *ptr;
167+ (val == 1) ? atomic_inc_16((volatile uint16_t*)ptr) : atomic_add_16((volatile uint16_t*)ptr, val);
168+ return ret;
169+}
170+
171+inline int32_t __sync_fetch_and_add(volatile int32_t* ptr, int32_t val)
172+{
173+ int32_t ret= *ptr;
174+ (val == 1) ? atomic_inc_32((volatile uint32_t*)ptr) : atomic_add_32((volatile uint32_t*)ptr, val);
175+ return ret;
176+}
177+
178+inline uint8_t __sync_fetch_and_add(volatile uint8_t* ptr, uint8_t val)
179+{
180+ uint8_t ret= *ptr;
181+ (val == 1) ? atomic_inc_8(ptr) : atomic_add_8(ptr, (int8_t)val);
182+ return ret;
183+}
184+
185+inline uint16_t __sync_fetch_and_add(volatile uint16_t* ptr, uint16_t val)
186+{
187+ uint16_t ret= *ptr;
188+ (val == 1) ? atomic_inc_16(ptr) : atomic_add_16(ptr, (int16_t)val);
189+ return ret;
190+}
191+
192+inline uint32_t __sync_fetch_and_add(volatile uint32_t* ptr, uint32_t val)
193+{
194+ uint32_t ret= *ptr;
195+ (val == 1) ? atomic_inc_32(ptr) : atomic_add_32(ptr, (int32_t)val);
196+ return ret;
197+}
198+
199+# if defined(_KERNEL) || defined(_INT64_TYPE)
200+inline uint64_t __sync_fetch_and_add(volatile uint64_t* ptr, uint64_t val)
201+{
202+ uint64_t ret= *ptr;
203+ (val == 1) ? atomic_inc_64(ptr) : atomic_add_64(ptr, (int64_t)val);
204+ return ret;
205+}
206+
207+inline int64_t __sync_fetch_and_add(volatile int64_t* ptr, int64_t val)
208+{
209+ int64_t ret= *ptr;
210+ (val == 1) ? atomic_inc_64((volatile uint64_t*)ptr) : atomic_add_64((volatile uint64_t*)ptr, val);
211+ return ret;
212+}
213+# endif /* defined(_KERNEL) || defined(_INT64_TYPE) */
214+
215+inline uint8_t __sync_fetch_and_sub(volatile uint8_t* ptr, uint8_t val)
216+{
217+ uint8_t ret= *ptr;
218+ (val == 1) ? atomic_dec_8(ptr) : atomic_add_8(ptr, 0-(int8_t)val);
219+ return ret;
220+}
221+
222+inline uint16_t __sync_fetch_and_sub(volatile uint16_t* ptr, uint16_t val)
223+{
224+ uint16_t ret= *ptr;
225+ (val == 1) ? atomic_dec_16(ptr) : atomic_add_16(ptr, 0-(int16_t)val);
226+ return ret;
227+}
228+
229+inline uint32_t __sync_fetch_and_sub(volatile uint32_t* ptr, uint32_t val)
230+{
231+ uint32_t ret= *ptr;
232+ (val == 1) ? atomic_dec_32(ptr) : atomic_add_32(ptr, 0-(int32_t)val);
233+ return ret;
234+}
235+
236+# if defined(_KERNEL) || defined(_INT64_TYPE)
237+inline uint64_t __sync_fetch_and_sub(volatile uint64_t* ptr, uint64_t val)
238+{
239+ uint64_t ret= *ptr;
240+ (val == 1) ? atomic_dec_64(ptr) : atomic_add_64(ptr, 0-(int64_t)val);
241+ return ret;
242+}
243+inline int64_t __sync_fetch_and_sub(volatile int64_t* ptr, uint64_t val)
244+{
245+ int64_t ret= *ptr;
246+ (val == 1) ? atomic_dec_64((volatile uint64_t *) ptr) : atomic_add_64((volatile uint64_t *) ptr, 0-(int64_t)val);
247+ return ret;
248+}
249+# endif /* defined(_KERNEL) || defined(_INT64_TYPE) */
250+
251 inline bool __sync_add_and_fetch(volatile bool* ptr, bool val)
252 {
253- (val == true) ? atomic_inc_8((volatile uint8_t *)ptr) : atomic_add_8((volatile uint8_t *)ptr, (int8_t)val);
254- return *ptr;
255+ return (val == true) ? atomic_inc_8_nv((volatile uint8_t *)ptr) : atomic_add_8_nv((volatile uint8_t *)ptr, (int8_t)val);
256 }
257
258 inline int8_t __sync_add_and_fetch(volatile int8_t* ptr, int8_t val)
259 {
260- (val == 1) ? atomic_inc_8((volatile uint8_t*)ptr) : atomic_add_8((volatile uint8_t*)ptr, val);
261- return *ptr;
262+ return (val == 1) ? atomic_inc_8_nv((volatile uint8_t*)ptr) : atomic_add_8_nv((volatile uint8_t*)ptr, val);
263 }
264
265 inline int16_t __sync_add_and_fetch(volatile int16_t* ptr, int16_t val)
266 {
267- (val == 1) ? atomic_inc_16((volatile uint16_t*)ptr) : atomic_add_16((volatile uint16_t*)ptr, val);
268- return *ptr;
269+ return (val == 1) ? atomic_inc_16_nv((volatile uint16_t*)ptr) : atomic_add_16_nv((volatile uint16_t*)ptr, val);
270 }
271
272 inline int32_t __sync_add_and_fetch(volatile int32_t* ptr, int32_t val)
273 {
274- (val == 1) ? atomic_inc_32((volatile uint32_t*)ptr) : atomic_add_32((volatile uint32_t*)ptr, val);
275- return *ptr;
276+ return (val == 1) ? atomic_inc_32_nv((volatile uint32_t*)ptr) : atomic_add_32_nv((volatile uint32_t*)ptr, val);
277 }
278
279 inline uint8_t __sync_add_and_fetch(volatile uint8_t* ptr, uint8_t val)
280 {
281- (val == 1) ? atomic_inc_8(ptr) : atomic_add_8(ptr, (int8_t)val);
282- return *ptr;
283+ return (val == 1) ? atomic_inc_8_nv(ptr) : atomic_add_8_nv(ptr, (int8_t)val);
284 }
285
286 inline uint16_t __sync_add_and_fetch(volatile uint16_t* ptr, uint16_t val)
287 {
288- (val == 1) ? atomic_inc_16(ptr) : atomic_add_16(ptr, (int16_t)val);
289- return *ptr;
290+ return (val == 1) ? atomic_inc_16_nv(ptr) : atomic_add_16_nv(ptr, (int16_t)val);
291 }
292
293 inline uint32_t __sync_add_and_fetch(volatile uint32_t* ptr, uint32_t val)
294 {
295- (val == 1) ? atomic_inc_32(ptr) : atomic_add_32(ptr, (int32_t)val);
296- return *ptr;
297+ return (val == 1) ? atomic_inc_32_nv(ptr) : atomic_add_32_nv(ptr, (int32_t)val);
298 }
299
300 # if defined(_KERNEL) || defined(_INT64_TYPE)
301 inline uint64_t __sync_add_and_fetch(volatile uint64_t* ptr, uint64_t val)
302 {
303- (val == 1) ? atomic_inc_64(ptr) : atomic_add_64(ptr, (int64_t)val);
304- return *ptr;
305+ return (val == 1) ? atomic_inc_64_nv(ptr) : atomic_add_64_nv(ptr, (int64_t)val);
306 }
307
308 inline int64_t __sync_add_and_fetch(volatile int64_t* ptr, int64_t val)
309 {
310- (val == 1) ? atomic_inc_64((volatile uint64_t*)ptr) : atomic_add_64((volatile uint64_t*)ptr, val);
311- return *ptr;
312+ return (val == 1) ? atomic_inc_64_nv((volatile uint64_t*)ptr) : atomic_add_64_nv((volatile uint64_t*)ptr, val);
313 }
314 # endif /* defined(_KERNEL) || defined(_INT64_TYPE) */
315
316-
317 inline uint8_t __sync_sub_and_fetch(volatile uint8_t* ptr, uint8_t val)
318 {
319- (val == 1) ? atomic_dec_8(ptr) : atomic_add_8(ptr, 0-(int8_t)val);
320- return *ptr;
321+ return (val == 1) ? atomic_dec_8_nv(ptr) : atomic_add_8_nv(ptr, 0-(int8_t)val);
322 }
323
324 inline uint16_t __sync_sub_and_fetch(volatile uint16_t* ptr, uint16_t val)
325 {
326- (val == 1) ? atomic_dec_16(ptr) : atomic_add_16(ptr, 0-(int16_t)val);
327- return *ptr;
328+ return (val == 1) ? atomic_dec_16_nv(ptr) : atomic_add_16_nv(ptr, 0-(int16_t)val);
329 }
330
331 inline uint32_t __sync_sub_and_fetch(volatile uint32_t* ptr, uint32_t val)
332 {
333- (val == 1) ? atomic_dec_32(ptr) : atomic_add_32(ptr, 0-(int32_t)val);
334- return *ptr;
335+ return (val == 1) ? atomic_dec_32_nv(ptr) : atomic_add_32_nv(ptr, 0-(int32_t)val);
336 }
337
338 # if defined(_KERNEL) || defined(_INT64_TYPE)
339 inline uint64_t __sync_sub_and_fetch(volatile uint64_t* ptr, uint64_t val)
340 {
341- (val == 1) ? atomic_dec_64(ptr) : atomic_add_64(ptr, 0-(int64_t)val);
342- return *ptr;
343+ return (val == 1) ? atomic_dec_64_nv(ptr) : atomic_add_64_nv(ptr, 0-(int64_t)val);
344 }
345 inline int64_t __sync_sub_and_fetch(volatile int64_t* ptr, uint64_t val)
346 {
347- (val == 1) ? atomic_dec_64((volatile uint64_t *) ptr) : atomic_add_64((volatile uint64_t *) ptr, 0-(int64_t)val);
348- return *ptr;
349+ return (val == 1) ? atomic_dec_64_nv((volatile uint64_t *) ptr) : atomic_add_64_nv((volatile uint64_t *) ptr, 0-(int64_t)val);
350 }
351 # endif /* defined(_KERNEL) || defined(_INT64_TYPE) */
352
353@@ -175,4 +261,41 @@
354 }
355 #endif /* defined(_KERNEL) || defined(_INT64_TYPE) */
356
357+inline int8_t __sync_bool_compare_and_swap(volatile int8_t* ptr,
358+ int8_t old_val, int8_t val)
359+{
360+ int8_t orig= *ptr;
361+ return orig == atomic_cas_8((volatile uint8_t *)ptr, old_val, val);
362+}
363+
364+inline uint8_t __sync_bool_compare_and_swap(volatile uint8_t* ptr,
365+ uint8_t old_val, uint8_t val)
366+{
367+ uint8_t orig= *ptr;
368+ return orig == atomic_cas_8(ptr, old_val, val);
369+}
370+
371+inline uint16_t __sync_bool_compare_and_swap(volatile uint16_t* ptr,
372+ uint16_t old_val, uint16_t val)
373+{
374+ uint16_t orig= *ptr;
375+ return orig == atomic_cas_16(ptr, old_val, val);
376+}
377+
378+inline uint32_t __sync_bool_compare_and_swap(volatile uint32_t* ptr,
379+ uint32_t old_val, uint32_t val)
380+{
381+ uint32_t orig= *ptr;
382+ return orig == atomic_cas_32(ptr, old_val, val);
383+}
384+
385+# if defined(_KERNEL) || defined(_INT64_TYPE)
386+inline uint64_t __sync_bool_compare_and_swap(volatile uint64_t* ptr,
387+ uint64_t old_val, uint64_t val)
388+{
389+ uint64_t orig= *ptr;
390+ return orig == atomic_cas_64(ptr, old_val, val);
391+}
392+#endif /* defined(_KERNEL) || defined(_INT64_TYPE) */
393+
394 #endif /* DRIZZLED_ATOMIC_SUN_STUDIO_H */
395
396=== modified file 'drizzled/atomics.h'
397--- drizzled/atomics.h 2010-01-04 22:52:52 +0000
398+++ drizzled/atomics.h 2010-04-06 19:16:34 +0000
399@@ -58,6 +58,11 @@
400
401 atomic_impl() : atomic_base<I>(), traits() {}
402
403+ value_type add_and_fetch( D addend )
404+ {
405+ return traits.add_and_fetch(&this->my_value, addend);
406+ }
407+
408 value_type fetch_and_add( D addend )
409 {
410 return traits.fetch_and_add(&this->my_value, addend);
411@@ -78,7 +83,7 @@
412 return traits.fetch_and_store(&this->my_value, value);
413 }
414
415- value_type compare_and_swap( value_type value, value_type comparand )
416+ bool compare_and_swap( value_type value, value_type comparand )
417 {
418 return traits.compare_and_swap(&this->my_value, value, comparand);
419 }
420@@ -102,7 +107,7 @@
421 public:
422 atomic_impl<I,D,T>& operator+=( D addend )
423 {
424- fetch_and_add(addend)+addend;
425+ increment(addend);
426 return *this;
427 }
428
429@@ -113,15 +118,15 @@
430 return operator+=(D(0)-addend);
431 }
432
433- value_type increment() {
434- return fetch_and_add(1)+1;
435- }
436-
437- value_type decrement() {
438- return fetch_and_add(D(-1))-1;
439- }
440-
441-
442+ value_type increment()
443+ {
444+ return add_and_fetch(1);
445+ }
446+
447+ value_type decrement()
448+ {
449+ return add_and_fetch(D(-1));
450+ }
451 };
452
453 } /* namespace internal */
454
455=== modified file 'drizzled/cursor.cc'
456--- drizzled/cursor.cc 2010-04-01 06:30:46 +0000
457+++ drizzled/cursor.cc 2010-04-06 19:16:34 +0000
458@@ -36,7 +36,6 @@
459 #include "drizzled/session.h"
460 #include "drizzled/sql_base.h"
461 #include "drizzled/transaction_services.h"
462-#include "drizzled/replication_services.h"
463 #include "drizzled/lock.h"
464 #include "drizzled/item/int.h"
465 #include "drizzled/item/empty_string.h"
466@@ -44,7 +43,6 @@
467 #include "drizzled/message/table.pb.h"
468 #include "drizzled/plugin/client.h"
469 #include "drizzled/internal/my_sys.h"
470-#include "drizzled/transaction_services.h"
471
472 using namespace std;
473
474@@ -1297,10 +1295,9 @@
475 const unsigned char *after_record)
476 {
477 TransactionServices &transaction_services= TransactionServices::singleton();
478- ReplicationServices &replication_services= ReplicationServices::singleton();
479 Session *const session= table->in_use;
480
481- if (table->s->tmp_table || not replication_services.isActive())
482+ if (table->s->tmp_table || not transaction_services.shouldConstructMessages())
483 return false;
484
485 bool result= false;
486
487=== modified file 'drizzled/include.am'
488--- drizzled/include.am 2010-04-02 07:45:12 +0000
489+++ drizzled/include.am 2010-04-06 19:16:34 +0000
490@@ -307,6 +307,7 @@
491 drizzled/plugin/query_cache.h \
492 drizzled/plugin/query_rewrite.h \
493 drizzled/plugin/registry.h \
494+ drizzled/plugin/replication.h \
495 drizzled/plugin/scheduler.h \
496 drizzled/plugin/storage_engine.h \
497 drizzled/plugin/table_function.h \
498
499=== added file 'drizzled/plugin/replication.h'
500--- drizzled/plugin/replication.h 1970-01-01 00:00:00 +0000
501+++ drizzled/plugin/replication.h 2010-04-06 19:16:34 +0000
502@@ -0,0 +1,46 @@
503+/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
504+ * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
505+ *
506+ * Copyright (c) 2010 Jay Pipes
507+ *
508+ * Authors:
509+ *
510+ * Jay Pipes <jaypipes@gmail.com>
511+ *
512+ * This program is free software; you can redistribute it and/or modify
513+ * it under the terms of the GNU General Public License as published by
514+ * the Free Software Foundation; version 2 of the License.
515+ *
516+ * This program is distributed in the hope that it will be useful,
517+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
518+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
519+ * GNU General Public License for more details.
520+ *
521+ * You should have received a copy of the GNU General Public License
522+ * along with this program; if not, write to the Free Software
523+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
524+ */
525+
526+#ifndef DRIZZLED_PLUGIN_REPLICATION_H
527+#define DRIZZLED_PLUGIN_REPLICATION_H
528+
529+/**
530+ * @file Common structs and enums for the replication API
531+ */
532+
533+namespace drizzled
534+{
535+
536+namespace plugin
537+{
538+
539+enum ReplicationReturnCode
540+{
541+ SUCCESS= 0, /* no error */
542+ UNKNOWN_ERROR= 1
543+};
544+
545+} /* namespace plugin */
546+} /* namespace drizzled */
547+
548+#endif /* DRIZZLED_PLUGIN_REPLICATION_H */
549
550=== modified file 'drizzled/plugin/transaction_applier.h'
551--- drizzled/plugin/transaction_applier.h 2009-10-28 16:38:30 +0000
552+++ drizzled/plugin/transaction_applier.h 2010-04-06 19:16:34 +0000
553@@ -2,10 +2,11 @@
554 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
555 *
556 * Copyright (C) 2008-2009 Sun Microsystems
557+ * Copyright (c) 2010 Jay Pipes
558 *
559 * Authors:
560 *
561- * Jay Pipes <joinfu@sun.com>
562+ * Jay Pipes <jaypipes@gmail.com>
563 *
564 * This program is free software; you can redistribute it and/or modify
565 * it under the terms of the GNU General Public License as published by
566@@ -33,11 +34,14 @@
567 */
568
569 #include "drizzled/plugin/plugin.h"
570+#include "drizzled/plugin/replication.h"
571 #include "drizzled/atomics.h"
572
573 namespace drizzled
574 {
575
576+class Session;
577+
578 namespace message { class Transaction; }
579
580 namespace plugin
581@@ -74,7 +78,8 @@
582 *
583 * @param Transaction message to be replicated
584 */
585- virtual void apply(const message::Transaction &to_apply)= 0;
586+ virtual ReplicationReturnCode apply(Session &in_session,
587+ const message::Transaction &to_apply)= 0;
588
589 static bool addPlugin(TransactionApplier *applier);
590 static void removePlugin(TransactionApplier *applier);
591
592=== modified file 'drizzled/plugin/transaction_replicator.h'
593--- drizzled/plugin/transaction_replicator.h 2009-12-04 23:47:14 +0000
594+++ drizzled/plugin/transaction_replicator.h 2010-04-06 19:16:34 +0000
595@@ -2,10 +2,11 @@
596 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
597 *
598 * Copyright (C) 2008-2009 Sun Microsystems
599+ * Copyright (c) 2010 Jay Pipes
600 *
601 * Authors:
602 *
603- * Jay Pipes <joinfu@sun.com>
604+ * Jay Pipes <jaypipes@gmail.com>
605 *
606 * This program is free software; you can redistribute it and/or modify
607 * it under the terms of the GNU General Public License as published by
608@@ -25,6 +26,7 @@
609 #define DRIZZLED_PLUGIN_TRANSACTION_REPLICATOR_H
610
611 #include "drizzled/atomics.h"
612+#include "drizzled/plugin/replication.h"
613 #include "drizzled/plugin/plugin.h"
614
615 /**
616@@ -36,7 +38,6 @@
617 * An applier is responsible for applying events, not a replicator...
618 */
619
620-
621 namespace drizzled
622 {
623 namespace message
624@@ -45,6 +46,8 @@
625 class Statement;
626 }
627
628+class Session;
629+
630 namespace plugin
631 {
632
633@@ -83,8 +86,9 @@
634 * @param Pointer to the applier of the command message
635 * @param Transaction message to be replicated
636 */
637- virtual void replicate(TransactionApplier *in_applier,
638- message::Transaction &to_replicate)= 0;
639+ virtual ReplicationReturnCode replicate(TransactionApplier *in_applier,
640+ Session &session,
641+ message::Transaction &to_replicate)= 0;
642 static bool addPlugin(TransactionReplicator *replicator);
643 static void removePlugin(TransactionReplicator *replicator);
644
645
646=== modified file 'drizzled/replication_services.cc'
647--- drizzled/replication_services.cc 2010-03-17 03:11:34 +0000
648+++ drizzled/replication_services.cc 2010-04-06 19:16:34 +0000
649@@ -39,8 +39,6 @@
650 #include "drizzled/plugin/transaction_replicator.h"
651 #include "drizzled/plugin/transaction_applier.h"
652 #include "drizzled/message/transaction.pb.h"
653-#include "drizzled/message/table.pb.h"
654-#include "drizzled/message/statement_transform.h"
655 #include "drizzled/gettext.h"
656 #include "drizzled/session.h"
657 #include "drizzled/error.h"
658@@ -145,7 +143,8 @@
659 return is_active;
660 }
661
662-void ReplicationServices::pushTransactionMessage(message::Transaction &to_push)
663+plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
664+ message::Transaction &to_push)
665 {
666 vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
667 vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
668@@ -154,6 +153,8 @@
669 plugin::TransactionReplicator *cur_repl;
670 plugin::TransactionApplier *cur_appl;
671
672+ plugin::ReplicationReturnCode result= plugin::SUCCESS;
673+
674 while (repl_iter != replicators.end())
675 {
676 cur_repl= *repl_iter;
677@@ -174,19 +175,25 @@
678 continue;
679 }
680
681- cur_repl->replicate(cur_appl, to_push);
682-
683- /*
684- * We update the timestamp for the last applied Transaction so that
685- * publisher plugins can ask the replication services when the
686- * last known applied Transaction was using the getLastAppliedTimestamp()
687- * method.
688- */
689- last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
690- ++appl_iter;
691+ result= cur_repl->replicate(cur_appl, in_session, to_push);
692+
693+ if (result == plugin::SUCCESS)
694+ {
695+ /*
696+ * We update the timestamp for the last applied Transaction so that
697+ * publisher plugins can ask the replication services when the
698+ * last known applied Transaction was using the getLastAppliedTimestamp()
699+ * method.
700+ */
701+ last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
702+ ++appl_iter;
703+ }
704+ else
705+ return result;
706 }
707 ++repl_iter;
708 }
709+ return result;
710 }
711
712 } /* namespace drizzled */
713
714=== modified file 'drizzled/replication_services.h'
715--- drizzled/replication_services.h 2010-03-16 21:30:44 +0000
716+++ drizzled/replication_services.h 2010-04-06 19:16:34 +0000
717@@ -26,6 +26,7 @@
718 #define DRIZZLED_REPLICATION_SERVICES_H
719
720 #include "drizzled/atomics.h"
721+#include "drizzled/plugin/replication.h"
722
723 #include <vector>
724
725@@ -96,9 +97,11 @@
726 * Helper method which pushes a constructed message out to the registered
727 * replicator and applier plugins.
728 *
729+ * @param Session descriptor
730 * @param Message to push out
731 */
732- void pushTransactionMessage(message::Transaction &to_push);
733+ plugin::ReplicationReturnCode pushTransactionMessage(Session &in_session,
734+ message::Transaction &to_push);
735 /**
736 * Constructor
737 */
738
739=== modified file 'drizzled/session.cc'
740--- drizzled/session.cc 2010-03-31 22:22:16 +0000
741+++ drizzled/session.cc 2010-04-06 19:16:34 +0000
742@@ -369,7 +369,7 @@
743 #endif
744 {
745 TransactionServices &transaction_services= TransactionServices::singleton();
746- transaction_services.ha_rollback_trans(this, true);
747+ transaction_services.rollbackTransaction(this, true);
748 xid_cache_delete(&transaction.xid_state);
749 }
750 hash_free(&user_vars);
751@@ -772,7 +772,7 @@
752 * (Which of course should never happen...)
753 */
754 server_status&= ~SERVER_STATUS_IN_TRANS;
755- if (transaction_services.ha_commit_trans(this, true))
756+ if (transaction_services.commitTransaction(this, true))
757 result= false;
758 options&= ~(OPTION_BEGIN);
759 break;
760@@ -789,7 +789,7 @@
761 case ROLLBACK_AND_CHAIN:
762 {
763 server_status&= ~SERVER_STATUS_IN_TRANS;
764- if (transaction_services.ha_rollback_trans(this, true))
765+ if (transaction_services.rollbackTransaction(this, true))
766 result= false;
767 options&= ~(OPTION_BEGIN);
768 if (result == true && (completion == ROLLBACK_AND_CHAIN))
769@@ -822,7 +822,7 @@
770 if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
771 {
772 server_status&= ~SERVER_STATUS_IN_TRANS;
773- if (transaction_services.ha_commit_trans(this, true))
774+ if (transaction_services.commitTransaction(this, true))
775 result= false;
776 }
777 options&= ~(OPTION_BEGIN);
778@@ -1947,7 +1947,7 @@
779 {
780 TransactionServices &transaction_services= TransactionServices::singleton();
781 main_da.can_overwrite_status= true;
782- transaction_services.ha_autocommit_or_rollback(this, is_error());
783+ transaction_services.autocommitOrRollback(this, is_error());
784 main_da.can_overwrite_status= false;
785 transaction.stmt.reset();
786 }
787
788=== modified file 'drizzled/set_var.cc'
789--- drizzled/set_var.cc 2010-03-30 09:29:02 +0000
790+++ drizzled/set_var.cc 2010-04-06 19:16:34 +0000
791@@ -1381,7 +1381,7 @@
792 session->options&= ~(uint64_t) (OPTION_BEGIN);
793 session->server_status|= SERVER_STATUS_AUTOCOMMIT;
794 TransactionServices &transaction_services= TransactionServices::singleton();
795- if (transaction_services.ha_commit_trans(session, true))
796+ if (transaction_services.commitTransaction(session, true))
797 return 1;
798 }
799 else
800
801=== modified file 'drizzled/sql_delete.cc'
802--- drizzled/sql_delete.cc 2010-02-06 03:22:59 +0000
803+++ drizzled/sql_delete.cc 2010-04-06 19:16:34 +0000
804@@ -396,7 +396,7 @@
805 Safety, in case the engine ignored ha_enable_transaction(false)
806 above. Also clears session->transaction.*.
807 */
808- error= transaction_services.ha_autocommit_or_rollback(&session, error);
809+ error= transaction_services.autocommitOrRollback(&session, error);
810 session.options= save_options;
811
812 return error;
813
814=== modified file 'drizzled/sql_insert.cc'
815--- drizzled/sql_insert.cc 2010-03-31 21:15:40 +0000
816+++ drizzled/sql_insert.cc 2010-04-06 19:16:34 +0000
817@@ -1325,7 +1325,7 @@
818 {
819 /*
820 We must invalidate the table in the query cache before binlog writing
821- and ha_autocommit_or_rollback.
822+ and autocommitOrRollback.
823 */
824 if (session->transaction.stmt.hasModifiedNonTransData())
825 session->transaction.all.markModifiedNonTransData();
826@@ -1711,7 +1711,7 @@
827 if (!table->s->tmp_table)
828 {
829 TransactionServices &transaction_services= TransactionServices::singleton();
830- transaction_services.ha_autocommit_or_rollback(session, 0);
831+ transaction_services.autocommitOrRollback(session, 0);
832 (void) session->endActiveTransaction();
833 }
834
835
836=== modified file 'drizzled/sql_parse.cc'
837--- drizzled/sql_parse.cc 2010-03-31 21:15:40 +0000
838+++ drizzled/sql_parse.cc 2010-04-06 19:16:34 +0000
839@@ -254,7 +254,7 @@
840 /* If commit fails, we should be able to reset the OK status. */
841 session->main_da.can_overwrite_status= true;
842 TransactionServices &transaction_services= TransactionServices::singleton();
843- transaction_services.ha_autocommit_or_rollback(session, session->is_error());
844+ transaction_services.autocommitOrRollback(session, session->is_error());
845 session->main_da.can_overwrite_status= false;
846
847 session->transaction.stmt.reset();
848
849=== modified file 'drizzled/sql_table.cc'
850--- drizzled/sql_table.cc 2010-03-31 21:15:40 +0000
851+++ drizzled/sql_table.cc 2010-04-06 19:16:34 +0000
852@@ -1829,7 +1829,7 @@
853 length= snprintf(buff, sizeof(buff), ER(ER_OPEN_AS_READONLY),
854 table_name);
855 session->client->store(buff, length);
856- transaction_services.ha_autocommit_or_rollback(session, false);
857+ transaction_services.autocommitOrRollback(session, false);
858 session->endTransaction(COMMIT);
859 session->close_thread_tables();
860 lex->reset_query_tables_list(false);
861@@ -1952,7 +1952,7 @@
862 }
863 }
864 }
865- transaction_services.ha_autocommit_or_rollback(session, false);
866+ transaction_services.autocommitOrRollback(session, false);
867 session->endTransaction(COMMIT);
868 session->close_thread_tables();
869 table->table=0; // For query cache
870@@ -1964,7 +1964,7 @@
871 return(false);
872
873 err:
874- transaction_services.ha_autocommit_or_rollback(session, true);
875+ transaction_services.autocommitOrRollback(session, true);
876 session->endTransaction(ROLLBACK);
877 session->close_thread_tables(); // Shouldn't be needed
878 if (table)
879
880=== modified file 'drizzled/sql_update.cc'
881--- drizzled/sql_update.cc 2010-03-26 21:33:42 +0000
882+++ drizzled/sql_update.cc 2010-04-06 19:16:34 +0000
883@@ -523,7 +523,7 @@
884 last one without error. error > 0 means an error (e.g. unique key
885 violation and no IGNORE or REPLACE). error == 0 is also an error (if
886 preparing the record or invoking before triggers fails). See
887- ha_autocommit_or_rollback(error>=0) and return(error>=0) below.
888+ autocommitOrRollback(error>=0) and return(error>=0) below.
889 Sometimes we want to binlog even if we updated no rows, in case user used
890 it to be sure master and slave are in same state.
891 */
892
893=== modified file 'drizzled/statement/alter_table.cc'
894--- drizzled/statement/alter_table.cc 2010-03-31 22:22:16 +0000
895+++ drizzled/statement/alter_table.cc 2010-04-06 19:16:34 +0000
896@@ -619,7 +619,7 @@
897 goto err;
898
899 /* The ALTER Table is always in its own transaction */
900- error= transaction_services.ha_autocommit_or_rollback(session, false);
901+ error= transaction_services.autocommitOrRollback(session, false);
902 if (! session->endActiveTransaction())
903 error=1;
904 if (error)
905@@ -627,7 +627,7 @@
906 write_bin_log(session, session->query.c_str());
907
908 err:
909- (void) transaction_services.ha_autocommit_or_rollback(session, error);
910+ (void) transaction_services.autocommitOrRollback(session, error);
911 session->tablespace_op=false;
912
913 if (error == 0)
914@@ -1480,7 +1480,7 @@
915 Ensure that the new table is saved properly to disk so that we
916 can do a rename
917 */
918- if (transaction_services.ha_autocommit_or_rollback(session, false))
919+ if (transaction_services.autocommitOrRollback(session, false))
920 error=1;
921 if (! session->endActiveTransaction())
922 error=1;
923
924=== modified file 'drizzled/statement/release_savepoint.cc'
925--- drizzled/statement/release_savepoint.cc 2010-01-28 20:48:31 +0000
926+++ drizzled/statement/release_savepoint.cc 2010-04-06 19:16:34 +0000
927@@ -59,7 +59,7 @@
928 if (iter != savepoints.end())
929 {
930 NamedSavepoint &sv= *iter;
931- (void) transaction_services.ha_release_savepoint(session, sv);
932+ (void) transaction_services.releaseSavepoint(session, sv);
933 savepoints.erase(iter);
934 session->my_ok();
935 }
936
937=== modified file 'drizzled/statement/rollback_to_savepoint.cc'
938--- drizzled/statement/rollback_to_savepoint.cc 2010-02-06 03:22:59 +0000
939+++ drizzled/statement/rollback_to_savepoint.cc 2010-04-06 19:16:34 +0000
940@@ -73,7 +73,7 @@
941 first_savepoint_name.size()) == 0)
942 {
943 /* Found the named savepoint we want to rollback to */
944- (void) transaction_services.ha_rollback_to_savepoint(session, first_savepoint);
945+ (void) transaction_services.rollbackToSavepoint(session, first_savepoint);
946
947 if (session->transaction.all.hasModifiedNonTransData())
948 {
949@@ -111,7 +111,7 @@
950 /* Found the named savepoint we want to rollback to */
951 found= true;
952
953- (void) transaction_services.ha_rollback_to_savepoint(session, sv);
954+ (void) transaction_services.rollbackToSavepoint(session, sv);
955 }
956 if (found)
957 {
958
959=== modified file 'drizzled/statement/savepoint.cc'
960--- drizzled/statement/savepoint.cc 2010-01-28 20:48:31 +0000
961+++ drizzled/statement/savepoint.cc 2010-04-06 19:16:34 +0000
962@@ -65,13 +65,13 @@
963 if (iter != savepoints.end())
964 {
965 NamedSavepoint &sv= *iter;
966- (void) transaction_services.ha_release_savepoint(session, sv);
967+ (void) transaction_services.releaseSavepoint(session, sv);
968 savepoints.erase(iter);
969 }
970
971 NamedSavepoint newsv(session->lex->ident.str, session->lex->ident.length);
972
973- if (transaction_services.ha_savepoint(session, newsv))
974+ if (transaction_services.setSavepoint(session, newsv))
975 {
976 return true;
977 }
978
979=== modified file 'drizzled/transaction_services.cc'
980--- drizzled/transaction_services.cc 2010-03-17 03:11:34 +0000
981+++ drizzled/transaction_services.cc 2010-04-06 19:16:34 +0000
982@@ -427,65 +427,6 @@
983 }
984
985 /**
986- Check if we can skip the two-phase commit.
987-
988- A helper function to evaluate if two-phase commit is mandatory.
989- As a side effect, propagates the read-only/read-write flags
990- of the statement transaction to its enclosing normal transaction.
991-
992- @retval true we must run a two-phase commit. Returned
993- if we have at least two engines with read-write changes.
994- @retval false Don't need two-phase commit. Even if we have two
995- transactional engines, we can run two independent
996- commits if changes in one of the engines are read-only.
997-*/
998-static
999-bool
1000-ha_check_and_coalesce_trx_read_only(Session *session,
1001- TransactionContext::ResourceContexts &resource_contexts,
1002- bool normal_transaction)
1003-{
1004- /* The number of storage engines that have actual changes. */
1005- unsigned num_resources_modified_data= 0;
1006- ResourceContext *resource_context;
1007-
1008- for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
1009- it != resource_contexts.end();
1010- ++it)
1011- {
1012- resource_context= *it;
1013- if (resource_context->hasModifiedData())
1014- ++num_resources_modified_data;
1015-
1016- if (! normal_transaction)
1017- {
1018- ResourceContext *resource_context_normal= session->getResourceContext(resource_context->getMonitored(), true);
1019- assert(resource_context != resource_context_normal);
1020- /*
1021- Merge read-only/read-write information about statement
1022- transaction to its enclosing normal transaction. Do this
1023- only if in a real transaction -- that is, if we know
1024- that resource_context_all is registered in session->transaction.all.
1025- Since otherwise we only clutter the normal transaction flags.
1026- */
1027- if (resource_context_normal->isStarted()) /* false if autocommit. */
1028- resource_context_normal->coalesceWith(resource_context);
1029- }
1030- else if (num_resources_modified_data > 1)
1031- {
1032- /*
1033- It is a normal transaction, so we don't need to merge read/write
1034- information up, and the need for two-phase commit has been
1035- already established. Break the loop prematurely.
1036- */
1037- break;
1038- }
1039- }
1040- return num_resources_modified_data > 1;
1041-}
1042-
1043-
1044-/**
1045 @retval
1046 0 ok
1047 @retval
1048@@ -499,7 +440,7 @@
1049 stored functions or triggers. So we simply do nothing now.
1050 TODO: This should be fixed in later ( >= 5.1) releases.
1051 */
1052-int TransactionServices::ha_commit_trans(Session *session, bool normal_transaction)
1053+int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
1054 {
1055 int error= 0, cookie= 0;
1056 /*
1057@@ -522,17 +463,18 @@
1058
1059 if (resource_contexts.empty() == false)
1060 {
1061- bool must_2pc;
1062-
1063 if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
1064 {
1065- ha_rollback_trans(session, normal_transaction);
1066+ rollbackTransaction(session, normal_transaction);
1067 return 1;
1068 }
1069
1070- must_2pc= ha_check_and_coalesce_trx_read_only(session, resource_contexts, normal_transaction);
1071-
1072- if (! trans->no_2pc && must_2pc)
1073+ /*
1074+ * If replication is on, we do a PREPARE on the resource managers, push the
1075+ * Transaction message across the replication stream, and then COMMIT if the
1076+ * replication stream returned successfully.
1077+ */
1078+ if (shouldConstructMessages())
1079 {
1080 for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
1081 it != resource_contexts.end() && ! error;
1082@@ -563,14 +505,22 @@
1083 }
1084 }
1085 }
1086+ if (error == 0 && is_real_trans)
1087+ {
1088+ /*
1089+ * Push the constructed Transaction messages across to
1090+ * replicators and appliers.
1091+ */
1092+ error= commitTransactionMessage(session);
1093+ }
1094 if (error)
1095 {
1096- ha_rollback_trans(session, normal_transaction);
1097+ rollbackTransaction(session, normal_transaction);
1098 error= 1;
1099 goto end;
1100 }
1101 }
1102- error= ha_commit_one_phase(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
1103+ error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
1104 end:
1105 if (is_real_trans)
1106 start_waiting_global_read_lock(session);
1107@@ -582,7 +532,7 @@
1108 @note
1109 This function does not care about global read lock. A caller should.
1110 */
1111-int TransactionServices::ha_commit_one_phase(Session *session, bool normal_transaction)
1112+int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
1113 {
1114 int error=0;
1115 TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
1116@@ -638,21 +588,10 @@
1117 }
1118 }
1119 trans->reset();
1120- if (error == 0)
1121- {
1122- if (is_real_trans)
1123- {
1124- /*
1125- * We commit the normal transaction by finalizing the transaction message
1126- * and propogating the message to all registered replicators.
1127- */
1128- commitTransactionMessage(session);
1129- }
1130- }
1131 return error;
1132 }
1133
1134-int TransactionServices::ha_rollback_trans(Session *session, bool normal_transaction)
1135+int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
1136 {
1137 int error= 0;
1138 TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
1139@@ -751,20 +690,20 @@
1140 the user has used LOCK TABLES then that mechanism does not know to do the
1141 commit.
1142 */
1143-int TransactionServices::ha_autocommit_or_rollback(Session *session, int error)
1144+int TransactionServices::autocommitOrRollback(Session *session, int error)
1145 {
1146 if (session->transaction.stmt.getResourceContexts().empty() == false)
1147 {
1148 if (! error)
1149 {
1150- if (ha_commit_trans(session, false))
1151+ if (commitTransaction(session, false))
1152 error= 1;
1153 }
1154 else
1155 {
1156- (void) ha_rollback_trans(session, false);
1157+ (void) rollbackTransaction(session, false);
1158 if (session->transaction_rollback_request)
1159- (void) ha_rollback_trans(session, true);
1160+ (void) rollbackTransaction(session, true);
1161 }
1162
1163 session->variables.tx_isolation= session->session_tx_isolation;
1164@@ -772,51 +711,6 @@
1165 return error;
1166 }
1167
1168-/**
1169- return the list of XID's to a client, the same way SHOW commands do.
1170-
1171- @note
1172- I didn't find in XA specs that an RM cannot return the same XID twice,
1173- so mysql_xa_recover does not filter XID's to ensure uniqueness.
1174- It can be easily fixed later, if necessary.
1175-*/
1176-bool TransactionServices::mysql_xa_recover(Session *session)
1177-{
1178- List<Item> field_list;
1179- int i= 0;
1180- XID_STATE *xs;
1181-
1182- field_list.push_back(new Item_int("formatID", 0, MY_INT32_NUM_DECIMAL_DIGITS));
1183- field_list.push_back(new Item_int("gtrid_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
1184- field_list.push_back(new Item_int("bqual_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
1185- field_list.push_back(new Item_empty_string("data", DRIZZLE_XIDDATASIZE));
1186-
1187- if (session->client->sendFields(&field_list))
1188- return 1;
1189-
1190- pthread_mutex_lock(&LOCK_xid_cache);
1191- while ((xs= (XID_STATE*)hash_element(&xid_cache, i++)))
1192- {
1193- if (xs->xa_state==XA_PREPARED)
1194- {
1195- session->client->store((int64_t)xs->xid.formatID);
1196- session->client->store((int64_t)xs->xid.gtrid_length);
1197- session->client->store((int64_t)xs->xid.bqual_length);
1198- session->client->store(xs->xid.data,
1199- xs->xid.gtrid_length+xs->xid.bqual_length);
1200- if (session->client->flush())
1201- {
1202- pthread_mutex_unlock(&LOCK_xid_cache);
1203- return 1;
1204- }
1205- }
1206- }
1207-
1208- pthread_mutex_unlock(&LOCK_xid_cache);
1209- session->my_eof();
1210- return 0;
1211-}
1212-
1213 struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
1214 {
1215 result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
1216@@ -827,7 +721,7 @@
1217 }
1218 };
1219
1220-int TransactionServices::ha_rollback_to_savepoint(Session *session, NamedSavepoint &sv)
1221+int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
1222 {
1223 int error= 0;
1224 TransactionContext *trans= &session->transaction.all;
1225@@ -923,7 +817,7 @@
1226 section "4.33.4 SQL-statements and transaction states",
1227 NamedSavepoint is *not* transaction-initiating SQL-statement
1228 */
1229-int TransactionServices::ha_savepoint(Session *session, NamedSavepoint &sv)
1230+int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
1231 {
1232 int error= 0;
1233 TransactionContext *trans= &session->transaction.all;
1234@@ -961,7 +855,7 @@
1235 return error;
1236 }
1237
1238-int TransactionServices::ha_release_savepoint(Session *session, NamedSavepoint &sv)
1239+int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
1240 {
1241 int error= 0;
1242
1243@@ -988,6 +882,12 @@
1244 return error;
1245 }
1246
1247+bool TransactionServices::shouldConstructMessages()
1248+{
1249+ ReplicationServices &replication_services= ReplicationServices::singleton();
1250+ return replication_services.isActive();
1251+}
1252+
1253 message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
1254 {
1255 message::Transaction *transaction= in_session->getTransactionMessage();
1256@@ -1013,7 +913,7 @@
1257 {
1258 message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1259 trx->set_server_id(in_session->getServerId());
1260- trx->set_transaction_id(in_session->getQueryId());
1261+ trx->set_transaction_id(getNextTransactionId());
1262 trx->set_start_timestamp(in_session->getCurrentTimestamp());
1263 }
1264
1265@@ -1032,11 +932,11 @@
1266 in_session->setTransactionMessage(NULL);
1267 }
1268
1269-void TransactionServices::commitTransactionMessage(Session *in_session)
1270+int TransactionServices::commitTransactionMessage(Session *in_session)
1271 {
1272 ReplicationServices &replication_services= ReplicationServices::singleton();
1273 if (! replication_services.isActive())
1274- return;
1275+ return 0;
1276
1277 /* If there is an active statement message, finalize it */
1278 message::Statement *statement= in_session->getStatementMessage();
1279@@ -1046,15 +946,17 @@
1280 finalizeStatementMessage(*statement, in_session);
1281 }
1282 else
1283- return; /* No data modification occurred inside the transaction */
1284+ return 0; /* No data modification occurred inside the transaction */
1285
1286 message::Transaction* transaction= getActiveTransactionMessage(in_session);
1287
1288 finalizeTransactionMessage(*transaction, in_session);
1289
1290- replication_services.pushTransactionMessage(*transaction);
1291+ plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1292
1293 cleanupTransactionMessage(transaction, in_session);
1294+
1295+ return static_cast<int>(result);
1296 }
1297
1298 void TransactionServices::initStatementMessage(message::Statement &statement,
1299@@ -1114,7 +1016,7 @@
1300
1301 finalizeTransactionMessage(*transaction, in_session);
1302
1303- replication_services.pushTransactionMessage(*transaction);
1304+ (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1305 }
1306 cleanupTransactionMessage(transaction, in_session);
1307 }
1308@@ -1552,7 +1454,7 @@
1309
1310 finalizeTransactionMessage(*transaction, in_session);
1311
1312- replication_services.pushTransactionMessage(*transaction);
1313+ (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1314
1315 cleanupTransactionMessage(transaction, in_session);
1316
1317@@ -1582,7 +1484,7 @@
1318
1319 finalizeTransactionMessage(*transaction, in_session);
1320
1321- replication_services.pushTransactionMessage(*transaction);
1322+ (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1323
1324 cleanupTransactionMessage(transaction, in_session);
1325
1326@@ -1611,7 +1513,7 @@
1327
1328 finalizeTransactionMessage(*transaction, in_session);
1329
1330- replication_services.pushTransactionMessage(*transaction);
1331+ (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1332
1333 cleanupTransactionMessage(transaction, in_session);
1334 }
1335@@ -1647,7 +1549,7 @@
1336
1337 finalizeTransactionMessage(*transaction, in_session);
1338
1339- replication_services.pushTransactionMessage(*transaction);
1340+ (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1341
1342 cleanupTransactionMessage(transaction, in_session);
1343 }
1344@@ -1682,7 +1584,7 @@
1345
1346 finalizeTransactionMessage(*transaction, in_session);
1347
1348- replication_services.pushTransactionMessage(*transaction);
1349+ (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1350
1351 cleanupTransactionMessage(transaction, in_session);
1352 }
1353@@ -1702,7 +1604,7 @@
1354
1355 finalizeTransactionMessage(*transaction, in_session);
1356
1357- replication_services.pushTransactionMessage(*transaction);
1358+ (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1359
1360 cleanupTransactionMessage(transaction, in_session);
1361 }
1362
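Reviewer note on the commitTransaction() change above: when replication is active, the new flow appears to be PREPARE on each resource manager, then push the Transaction message, then COMMIT only if the push succeeded, with a rollback otherwise. A minimal, simplified sketch of that ordering follows. The names Resource, commitWithReplication and push_message are hypothetical stand-ins and are not part of the Drizzle sources; the real code also deals with global read locks, statement vs. normal transactions and XA cookies, all omitted here.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical stand-in for a resource manager (e.g. a storage engine).
struct Resource
{
  std::function<bool()> prepare;   // returns true on success
  std::function<void()> commit;
  std::function<void()> rollback;
};

// Sketch of the ordering only: returns 0 on commit, 1 if rolled back.
// push_message models pushing the Transaction message to the replication
// stream and returns 0 on success (an assumption made for this sketch).
int commitWithReplication(std::vector<Resource> &resources,
                          bool replication_active,
                          const std::function<int()> &push_message)
{
  assert(static_cast<bool>(push_message));
  int error= 0;

  if (replication_active)
  {
    /* Phase 1: PREPARE every resource, stopping at the first failure. */
    for (Resource &resource : resources)
    {
      if (! resource.prepare())
      {
        error= 1;
        break;
      }
    }
    /* Push the message only if every PREPARE succeeded. */
    if (error == 0)
      error= push_message();
  }

  if (error)
  {
    for (Resource &resource : resources)
      resource.rollback();
    return 1;
  }

  /* Phase 2: COMMIT once the replication stream has accepted the message. */
  for (Resource &resource : resources)
    resource.commit();
  return 0;
}
```

The point of the ordering is that a failure reported by a replicator or applier can still abort the transaction, which was not possible when the message was pushed after the engines had already committed.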
1363=== modified file 'drizzled/transaction_services.h'
1364--- drizzled/transaction_services.h 2010-03-16 21:30:44 +0000
1365+++ drizzled/transaction_services.h 2010-04-06 19:16:34 +0000
1366@@ -25,6 +25,7 @@
1367 #ifndef DRIZZLED_TRANSACTION_SERVICES_H
1368 #define DRIZZLED_TRANSACTION_SERVICES_H
1369
1370+#include "drizzled/atomics.h"
1371 #include "drizzled/message/transaction.pb.h"
1372
1373 namespace drizzled
1374@@ -49,10 +50,17 @@
1375 {
1376 public:
1377 static const size_t DEFAULT_RECORD_SIZE= 100;
1378+ typedef uint64_t TransactionId;
1379 /**
1380 * Constructor
1381 */
1382- TransactionServices() {}
1383+ TransactionServices()
1384+ {
1385+ /**
1386+ * @todo set transaction ID to the last one from an applier...
1387+ */
1388+ current_transaction_id= 0;
1389+ }
1390
1391 /**
1392 * Singleton method
1393@@ -63,6 +71,12 @@
1394 static TransactionServices transaction_services;
1395 return transaction_services;
1396 }
1397+
1398+ /**
1399+ * Returns true if the transaction manager should construct
1400+ * Transaction and Statement messages, false otherwise.
1401+ */
1402+ bool shouldConstructMessages();
1403 /**
1404 * Method which returns the active Transaction message
1405 * for the supplied Session. If one is not found, a new Transaction
1406@@ -189,7 +203,7 @@
1407 *
1408 * @param Pointer to the Session committing the transaction
1409 */
1410- void commitTransactionMessage(Session *in_session);
1411+ int commitTransactionMessage(Session *in_session);
1412 /**
1413 * Marks the current active transaction message as being rolled back and
1414 * pushes the transaction message out to replicators.
1415@@ -293,18 +307,17 @@
1416 */
1417 void rawStatement(Session *in_session, const std::string &query);
1418 /* transactions: interface to plugin::StorageEngine functions */
1419- int ha_commit_one_phase(Session *session, bool all);
1420- int ha_rollback_trans(Session *session, bool all);
1421+ int commitPhaseOne(Session *session, bool all);
1422+ int rollbackTransaction(Session *session, bool all);
1423
1424 /* transactions: these functions never call plugin::StorageEngine functions directly */
1425- int ha_commit_trans(Session *session, bool all);
1426- int ha_autocommit_or_rollback(Session *session, int error);
1427+ int commitTransaction(Session *session, bool all);
1428+ int autocommitOrRollback(Session *session, int error);
1429
1430 /* savepoints */
1431- int ha_rollback_to_savepoint(Session *session, NamedSavepoint &sv);
1432- int ha_savepoint(Session *session, NamedSavepoint &sv);
1433- int ha_release_savepoint(Session *session, NamedSavepoint &sv);
1434- bool mysql_xa_recover(Session *session);
1435+ int rollbackToSavepoint(Session *session, NamedSavepoint &sv);
1436+ int setSavepoint(Session *session, NamedSavepoint &sv);
1437+ int releaseSavepoint(Session *session, NamedSavepoint &sv);
1438
1439 /**
1440 * Marks a storage engine as participating in a statement
1441@@ -383,6 +396,23 @@
1442 plugin::MonitoredInTransaction *monitored,
1443 plugin::TransactionalStorageEngine *engine,
1444 plugin::XaResourceManager *resource_manager);
1445+ TransactionId getNextTransactionId()
1446+ {
1447+ return current_transaction_id.increment();
1448+ }
1449+ TransactionId getCurrentTransactionId()
1450+ {
1451+ return current_transaction_id;
1452+ }
1453+ /**
1454+ * DEBUG ONLY. See plugin::TransactionLog::truncate()
1455+ */
1456+ void resetTransactionId()
1457+ {
1458+ current_transaction_id= 0;
1459+ }
1460+private:
1461+ atomic<TransactionId> current_transaction_id;
1462 };
1463
1464 } /* namespace drizzled */
1465
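Reviewer note on the TransactionId counter added above: the transaction ID now comes from a process-wide atomic counter instead of the session's query ID. The sketch below illustrates the same idea using std::atomic rather than drizzled's own atomic<> template. It assumes increment() returns the incremented value, which this diff does not show, and the class name TransactionIdAllocator is hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <stdint.h>

// Hypothetical illustration; not the drizzled::atomic<> implementation.
class TransactionIdAllocator
{
public:
  typedef uint64_t TransactionId;

  TransactionIdAllocator() : current_transaction_id(0) {}

  /* Atomically bump the counter and return the new value. */
  TransactionId getNextTransactionId()
  {
    return current_transaction_id.fetch_add(1) + 1;
  }

  TransactionId getCurrentTransactionId() const
  {
    return current_transaction_id.load();
  }

  /* Mirrors the DEBUG ONLY reset in the diff. */
  void resetTransactionId()
  {
    current_transaction_id.store(0);
  }

private:
  std::atomic<TransactionId> current_transaction_id;
};

// Usage example: IDs are handed out in increasing order starting at 1.
inline void transactionIdAllocatorExample()
{
  TransactionIdAllocator allocator;
  assert(allocator.getNextTransactionId() == 1);
  assert(allocator.getNextTransactionId() == 2);
  assert(allocator.getCurrentTransactionId() == 2);
  allocator.resetTransactionId();
  assert(allocator.getCurrentTransactionId() == 0);
}
```

Because the counter starts at 0 on every server start, IDs would repeat across restarts until the @todo in the constructor (seeding from the last ID seen by an applier) is addressed.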
1466=== modified file 'plugin/default_replicator/default_replicator.cc'
1467--- plugin/default_replicator/default_replicator.cc 2010-03-06 02:08:13 +0000
1468+++ plugin/default_replicator/default_replicator.cc 2010-04-06 19:16:34 +0000
1469@@ -62,13 +62,16 @@
1470 sysvar_default_replicator_enable= false;
1471 }
1472
1473-void DefaultReplicator::replicate(plugin::TransactionApplier *in_applier, message::Transaction &to_replicate)
1474+plugin::ReplicationReturnCode
1475+DefaultReplicator::replicate(plugin::TransactionApplier *in_applier,
1476+ Session &in_session,
1477+ message::Transaction &to_replicate)
1478 {
1479 /*
1480 * We do absolutely nothing but call the applier's apply() method, passing
1481 * along the supplied Transaction. Yep, told you it was simple...
1482 */
1483- in_applier->apply(to_replicate);
1484+ return in_applier->apply(in_session, to_replicate);
1485 }
1486
1487 static DefaultReplicator *default_replicator= NULL; /* The singleton replicator */
1488
1489=== modified file 'plugin/default_replicator/default_replicator.h'
1490--- plugin/default_replicator/default_replicator.h 2009-10-19 19:19:38 +0000
1491+++ plugin/default_replicator/default_replicator.h 2010-04-06 19:16:34 +0000
1492@@ -66,9 +66,14 @@
1493 * the supplied message to their own controlled memory storage
1494 * area.
1495 *
1496+ * @param Applier to replicate to
1497+ * @param Session descriptor
1498 * @param Transaction message to be replicated
1499 */
1500- void replicate(drizzled::plugin::TransactionApplier *in_applier, drizzled::message::Transaction &to_replicate);
1501+ drizzled::plugin::ReplicationReturnCode
1502+ replicate(drizzled::plugin::TransactionApplier *in_applier,
1503+ drizzled::Session &in_session,
1504+ drizzled::message::Transaction &to_replicate);
1505
1506 };
1507
1508
1509=== modified file 'plugin/filtered_replicator/filtered_replicator.cc'
1510--- plugin/filtered_replicator/filtered_replicator.cc 2010-03-06 02:08:13 +0000
1511+++ plugin/filtered_replicator/filtered_replicator.cc 2010-04-06 19:16:34 +0000
1512@@ -214,8 +214,10 @@
1513 sysvar_filtered_replicator_enabled= false;
1514 }
1515
1516-void FilteredReplicator::replicate(plugin::TransactionApplier *in_applier,
1517- message::Transaction &to_replicate)
1518+plugin::ReplicationReturnCode
1519+FilteredReplicator::replicate(plugin::TransactionApplier *in_applier,
1520+ Session &in_session,
1521+ message::Transaction &to_replicate)
1522 {
1523 string schema_name;
1524 string table_name;
1525@@ -287,8 +289,9 @@
1526 */
1527 message::TransactionContext *tc= filtered_transaction.mutable_transaction_context();
1528 *tc= to_replicate.transaction_context(); /* copy construct */
1529- in_applier->apply(filtered_transaction);
1530+ return in_applier->apply(in_session, filtered_transaction);
1531 }
1532+ return plugin::SUCCESS;
1533 }
1534
1535 void FilteredReplicator::populateFilter(std::string input,
1536
1537=== modified file 'plugin/filtered_replicator/filtered_replicator.h'
1538--- plugin/filtered_replicator/filtered_replicator.h 2009-10-19 19:19:38 +0000
1539+++ plugin/filtered_replicator/filtered_replicator.h 2010-04-06 19:16:34 +0000
1540@@ -84,10 +84,14 @@
1541 * the supplied message to their own controlled memory storage
1542 * area.
1543 *
1544+ * @param Applier to replicate to
1545+ * @param Session descriptor
1546 * @param Transaction message to be replicated
1547 */
1548- void replicate(drizzled::plugin::TransactionApplier *in_applier,
1549- drizzled::message::Transaction &to_replicate);
1550+ drizzled::plugin::ReplicationReturnCode
1551+ replicate(drizzled::plugin::TransactionApplier *in_applier,
1552+ drizzled::Session &in_session,
1553+ drizzled::message::Transaction &to_replicate);
1554
1555 /**
1556 * Populate the vector of schemas to filter from the
1557
1558=== modified file 'plugin/innobase/handler/ha_innodb.cc'
1559--- plugin/innobase/handler/ha_innodb.cc 2010-04-01 15:54:57 +0000
1560+++ plugin/innobase/handler/ha_innodb.cc 2010-04-06 19:16:34 +0000
1561@@ -2259,11 +2259,6 @@
1562 pthread_mutex_unlock(&commit_cond_m);
1563 }
1564
1565- if (trx->conc_state == TRX_PREPARED) {
1566-
1567- pthread_mutex_unlock(&prepare_commit_mutex);
1568- }
1569-
1570 /* Now do a write + flush of logs. */
1571 trx_commit_complete_for_mysql(trx);
1572
1573@@ -8150,31 +8145,6 @@
1574
1575 srv_active_wake_master_thread();
1576
1577- if (all || !session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
1578- {
1579-
1580- /* For ibbackup to work the order of transactions in binlog
1581- and InnoDB must be the same. Consider the situation
1582-
1583- thread1> prepare; write to binlog; ...
1584- <context switch>
1585- thread2> prepare; write to binlog; commit
1586- thread1> ... commit
1587-
1588- To ensure this will not happen we're taking the mutex on
1589- prepare, and releasing it on commit.
1590-
1591- Note: only do it for normal commits, done via ha_commit_trans.
1592- If 2pc protocol is executed by external transaction
1593- coordinator, it will be just a regular MySQL client
1594- executing XA PREPARE and XA COMMIT commands.
1595- In this case we cannot know how many minutes or hours
1596- will be between XA PREPARE and XA COMMIT, and we don't want
1597- to block for undefined period of time.
1598- */
1599- pthread_mutex_lock(&prepare_commit_mutex);
1600- trx->conc_state = TRX_PREPARED;
1601- }
1602 return(error);
1603 }
1604
1605
1606=== modified file 'plugin/transaction_log/module.cc'
1607--- plugin/transaction_log/module.cc 2010-03-15 22:27:00 +0000
1608+++ plugin/transaction_log/module.cc 2010-04-06 19:16:34 +0000
1609@@ -61,6 +61,10 @@
1610 * in truncating the log file.
1611 */
1612 static bool sysvar_transaction_log_truncate_debug= false;
1613+/**
1614+ * The name of the main transaction log file on disk. With no prefix,
1615+ * this goes into Drizzle's $datadir.
1616+ */
1617 static const char DEFAULT_LOG_FILE_PATH[]= "transaction.log"; /* In datadir... */
1618 /**
1619 * Transaction Log plugin system variable - Should we write a CRC32 checksum for
1620@@ -76,6 +80,11 @@
1621 * TransactionLog::SYNC_METHOD_EVERY_SECOND == 2 ... sync at most once a second
1622 */
1623 static uint32_t sysvar_transaction_log_sync_method= 0;
1624+/**
1625+ * Transaction Log plugin system variable - Number of slots to create
1626+ * for managing write buffers
1627+ */
1628+static uint32_t sysvar_transaction_log_num_write_buffers= 8;
1629
1630 /** DATA_DICTIONARY views */
1631 static TransactionLogTool *transaction_log_tool;
1632@@ -99,7 +108,8 @@
1633 if (sysvar_transaction_log_enabled)
1634 {
1635 transaction_log= new (nothrow) TransactionLog(string(sysvar_transaction_log_file),
1636- sysvar_transaction_log_sync_method);
1637+ sysvar_transaction_log_sync_method,
1638+ sysvar_transaction_log_checksum_enabled);
1639
1640 if (transaction_log == NULL)
1641 {
1642@@ -120,7 +130,7 @@
1643 /* Create the applier plugin and register it */
1644 transaction_log_applier= new (nothrow) TransactionLogApplier("transaction_log_applier",
1645 *transaction_log,
1646- sysvar_transaction_log_checksum_enabled);
1647+ sysvar_transaction_log_num_write_buffers);
1648 if (transaction_log_applier == NULL)
1649 {
1650 errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the TransactionLogApplier instance. Got error: %s\n"),
1651@@ -212,7 +222,7 @@
1652 set_truncate_debug, /* update func */
1653 false /* default */);
1654
1655-static DRIZZLE_SYSVAR_STR(log_file,
1656+static DRIZZLE_SYSVAR_STR(file,
1657 sysvar_transaction_log_file,
1658 PLUGIN_VAR_READONLY,
1659 N_("Path to the file to use for transaction log"),
1660@@ -241,12 +251,24 @@
1661 2,
1662 0);
1663
1664+static DRIZZLE_SYSVAR_UINT(num_write_buffers,
1665+ sysvar_transaction_log_num_write_buffers,
1666+ PLUGIN_VAR_OPCMDARG,
1667+ N_("Number of slots for in-memory write buffers (default=8)."),
1668+ NULL, /* check func */
1669+ NULL, /* update func */
1670+ 8, /* default */
1671+ 4,
1672+ 8192,
1673+ 0);
1674+
1675 static drizzle_sys_var* sys_variables[]= {
1676 DRIZZLE_SYSVAR(enable),
1677 DRIZZLE_SYSVAR(truncate_debug),
1678- DRIZZLE_SYSVAR(log_file),
1679+ DRIZZLE_SYSVAR(file),
1680 DRIZZLE_SYSVAR(enable_checksum),
1681 DRIZZLE_SYSVAR(sync_method),
1682+ DRIZZLE_SYSVAR(num_write_buffers),
1683 NULL
1684 };
1685
1686
1687=== modified file 'plugin/transaction_log/plugin.ini'
1688--- plugin/transaction_log/plugin.ini 2010-03-05 18:08:49 +0000
1689+++ plugin/transaction_log/plugin.ini 2010-04-06 19:16:34 +0000
1690@@ -6,8 +6,8 @@
1691 title=Transaction Log
1692 description=Log of Transaction Messages
1693 load_by_default=yes
1694-sources= background_worker.cc hexdump_transaction_message.cc module.cc print_transaction_message.cc transaction_log.cc transaction_log_applier.cc transaction_log_entry.cc transaction_log_index.cc transaction_log_reader.cc data_dictionary_schema.cc
1695-headers= background_worker.h hexdump_transaction_message.h print_transaction_message.h transaction_log.h transaction_log_applier.h transaction_log_entry.h transaction_log_index.h transaction_log_reader.h data_dictionary_schema.h
1696+sources= background_worker.cc hexdump_transaction_message.cc module.cc print_transaction_message.cc transaction_log.cc transaction_log_applier.cc transaction_log_entry.cc transaction_log_index.cc transaction_log_reader.cc data_dictionary_schema.cc write_buffer.cc
1697+headers= background_worker.h hexdump_transaction_message.h print_transaction_message.h transaction_log.h transaction_log_applier.h transaction_log_entry.h transaction_log_index.h transaction_log_reader.h data_dictionary_schema.h write_buffer.h
1698 libs=${top_builddir}/drizzled/algorithm/libhash.la
1699 libadd=$(LIBZ)
1700 cxxflags=${PROTOSKIP_WARNINGS}
1701
1702=== added file 'plugin/transaction_log/tests/r/ddl_transaction_id.result'
1703--- plugin/transaction_log/tests/r/ddl_transaction_id.result 1970-01-01 00:00:00 +0000
1704+++ plugin/transaction_log/tests/r/ddl_transaction_id.result 2010-04-06 19:16:34 +0000
1705@@ -0,0 +1,36 @@
1706+SET GLOBAL transaction_log_truncate_debug= true;
1707+DROP TABLE IF EXISTS t1;
1708+CREATE TABLE t1 (
1709+id INT NOT NULL PRIMARY KEY
1710+, padding VARCHAR(200) NOT NULL
1711+);
1712+INSERT INTO t1 VALUES (1, "I love testing.");
1713+INSERT INTO t1 VALUES (2, "I hate testing.");
1714+SELECT * FROM t1;
1715+id padding
1716+1 I love testing.
1717+2 I hate testing.
1718+ALTER TABLE t1 CHANGE COLUMN padding less_padding VARCHAR(180) NOT NULL;
1719+SELECT * FROM t1 WHERE id = 3;
1720+id less_padding
1721+DROP TABLE t1;
1722+SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG;
1723+FILE_NAME FILE_LENGTH NUM_LOG_ENTRIES NUM_TRANSACTIONS MIN_TRANSACTION_ID MAX_TRANSACTION_ID MIN_END_TIMESTAMP MAX_END_TIMESTAMP INDEX_SIZE_IN_BYTES
1724+transaction.log X 6 6 1 6 X X X
1725+SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_ENTRIES;
1726+ENTRY_OFFSET ENTRY_TYPE ENTRY_LENGTH
1727+X TRANSACTION X
1728+X TRANSACTION X
1729+X TRANSACTION X
1730+X TRANSACTION X
1731+X TRANSACTION X
1732+X TRANSACTION X
1733+SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_TRANSACTIONS;
1734+ENTRY_OFFSET TRANSACTION_ID SERVER_ID START_TIMESTAMP END_TIMESTAMP NUM_STATEMENTS CHECKSUM
1735+X 1 1 X X 1 0
1736+X 2 1 X X 1 0
1737+X 3 1 X X 1 0
1738+X 4 1 X X 1 0
1739+X 5 1 X X 1 0
1740+X 6 1 X X 1 0
1741+SET GLOBAL transaction_log_truncate_debug= true;
1742
1743=== modified file 'plugin/transaction_log/tests/r/filtered_replicator.result'
1744--- plugin/transaction_log/tests/r/filtered_replicator.result 2010-02-27 23:47:33 +0000
1745+++ plugin/transaction_log/tests/r/filtered_replicator.result 2010-04-06 19:16:34 +0000
1746@@ -6,6 +6,7 @@
1747 );
1748 INSERT INTO t1 VALUES (1, "I love testing.");
1749 INSERT INTO t1 VALUES (2, "I hate testing.");
1750+DROP TABLE t1;
1751 DROP TABLE IF EXISTS t1;
1752 CREATE TABLE t1 (
1753 id INT NOT NULL PRIMARY KEY
1754@@ -134,6 +135,9 @@
1755 INSERT INTO `test`.`t1` (`id`,`padding`) VALUES (2,'I hate testing.');
1756 COMMIT;
1757 START TRANSACTION;
1758+DROP TABLE `test`.`t1`;
1759+COMMIT;
1760+START TRANSACTION;
1761 DROP TABLE IF EXISTS `test`.`t1`;
1762 COMMIT;
1763 START TRANSACTION;
1764
1765=== modified file 'plugin/transaction_log/tests/r/information_schema.result'
1766--- plugin/transaction_log/tests/r/information_schema.result 2010-03-05 04:44:58 +0000
1767+++ plugin/transaction_log/tests/r/information_schema.result 2010-04-06 19:16:34 +0000
1768@@ -6,19 +6,22 @@
1769 );
1770 INSERT INTO t1 VALUES (1, "I love testing.");
1771 INSERT INTO t1 VALUES (2, "I hate testing.");
1772+DROP TABLE t1;
1773 SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG;
1774 FILE_NAME FILE_LENGTH NUM_LOG_ENTRIES NUM_TRANSACTIONS MIN_TRANSACTION_ID MAX_TRANSACTION_ID MIN_END_TIMESTAMP MAX_END_TIMESTAMP INDEX_SIZE_IN_BYTES
1775-transaction.log X 4 4 X X X X X
1776+transaction.log X 5 5 1 5 X X X
1777 SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_ENTRIES;
1778 ENTRY_OFFSET ENTRY_TYPE ENTRY_LENGTH
1779 X TRANSACTION X
1780 X TRANSACTION X
1781 X TRANSACTION X
1782 X TRANSACTION X
1783+X TRANSACTION X
1784 SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_TRANSACTIONS;
1785 ENTRY_OFFSET TRANSACTION_ID SERVER_ID START_TIMESTAMP END_TIMESTAMP NUM_STATEMENTS CHECKSUM
1786-X X 1 X X 1 0
1787-X X 1 X X 1 0
1788-X X 1 X X 1 0
1789-X X 1 X X 1 0
1790+X 1 1 X X 1 0
1791+X 2 1 X X 1 0
1792+X 3 1 X X 1 0
1793+X 4 1 X X 1 0
1794+X 5 1 X X 1 0
1795 SET GLOBAL transaction_log_truncate_debug= true;
1796
1797=== modified file 'plugin/transaction_log/tests/r/insert.result'
1798--- plugin/transaction_log/tests/r/insert.result 2010-02-27 23:47:33 +0000
1799+++ plugin/transaction_log/tests/r/insert.result 2010-04-06 19:16:34 +0000
1800@@ -5,6 +5,7 @@
1801 );
1802 INSERT INTO t1 VALUES (1, "I love testing.");
1803 INSERT INTO t1 VALUES (2, "I hate testing.");
1804+DROP TABLE t1;
1805 START TRANSACTION;
1806 DROP TABLE IF EXISTS `test`.`t1`;
1807 COMMIT;
1808@@ -17,4 +18,7 @@
1809 START TRANSACTION;
1810 INSERT INTO `test`.`t1` (`id`,`padding`) VALUES (2,'I hate testing.');
1811 COMMIT;
1812+START TRANSACTION;
1813+DROP TABLE `test`.`t1`;
1814+COMMIT;
1815 SET GLOBAL transaction_log_truncate_debug= true;
1816
1817=== modified file 'plugin/transaction_log/tests/r/truncate_log.result'
1818--- plugin/transaction_log/tests/r/truncate_log.result 2010-03-05 04:44:58 +0000
1819+++ plugin/transaction_log/tests/r/truncate_log.result 2010-04-06 19:16:34 +0000
1820@@ -6,6 +6,7 @@
1821 );
1822 INSERT INTO t1 VALUES (1, "I love testing.");
1823 INSERT INTO t1 VALUES (2, "I hate testing.");
1824-24var/master-data/transaction.log
1825+DROP TABLE t1;
1826+29var/master-data/transaction.log
1827 SET GLOBAL transaction_log_truncate_debug= true;
1828 0var/master-data/transaction.log
1829
1830=== modified file 'plugin/transaction_log/tests/r/udf_print_transaction_message.result'
1831--- plugin/transaction_log/tests/r/udf_print_transaction_message.result 2010-02-11 04:18:03 +0000
1832+++ plugin/transaction_log/tests/r/udf_print_transaction_message.result 2010-04-06 19:16:34 +0000
1833@@ -5,6 +5,7 @@
1834 );
1835 INSERT INTO t1 VALUES (1, "I love testing.");
1836 INSERT INTO t1 VALUES (2, "I hate testing.");
1837+DROP TABLE t1;
1838 SELECT LENGTH(PRINT_TRANSACTION_MESSAGE('transaction.log', ENTRY_OFFSET)) > 0 as checked
1839 FROM DATA_DICTIONARY.TRANSACTION_LOG_ENTRIES
1840 LIMIT 1;
1841
1842=== renamed file 'plugin/transaction_log/tests/r/sync_method_every_write.result' => 'plugin/transaction_log/tests/r/variables.result'
1843--- plugin/transaction_log/tests/r/sync_method_every_write.result 2010-02-05 08:11:15 +0000
1844+++ plugin/transaction_log/tests/r/variables.result 2010-04-06 19:16:34 +0000
1845@@ -3,6 +3,7 @@
1846 VARIABLE_NAME VARIABLE_VALUE
1847 transaction_log_enable ON
1848 transaction_log_enable_checksum OFF
1849-transaction_log_log_file transaction.log
1850+transaction_log_file transaction.log
1851+transaction_log_num_write_buffers 8
1852 transaction_log_sync_method 1
1853 transaction_log_truncate_debug OFF
1854
1855=== added file 'plugin/transaction_log/tests/t/ddl_transaction_id-master.opt'
1856--- plugin/transaction_log/tests/t/ddl_transaction_id-master.opt 1970-01-01 00:00:00 +0000
1857+++ plugin/transaction_log/tests/t/ddl_transaction_id-master.opt 2010-04-06 19:16:34 +0000
1858@@ -0,0 +1,1 @@
1859+--default-replicator-enable --transaction-log-enable --scheduler=multi_thread
1860
1861=== added file 'plugin/transaction_log/tests/t/ddl_transaction_id.inc'
1862--- plugin/transaction_log/tests/t/ddl_transaction_id.inc 1970-01-01 00:00:00 +0000
1863+++ plugin/transaction_log/tests/t/ddl_transaction_id.inc 2010-04-06 19:16:34 +0000
1864@@ -0,0 +1,29 @@
1865+#
1866+# Tests ordering of transaction ID, that SELECT queries
1867+# do not increment the transaction ID, and that the proper
1868+# generation of transaction IDs is done for DDL operations.
1869+#
1870+
1871+--disable_warnings
1872+DROP TABLE IF EXISTS t1;
1873+--enable_warnings
1874+
1875+CREATE TABLE t1 (
1876+ id INT NOT NULL PRIMARY KEY
1877+, padding VARCHAR(200) NOT NULL
1878+);
1879+
1880+INSERT INTO t1 VALUES (1, "I love testing.");
1881+INSERT INTO t1 VALUES (2, "I hate testing.");
1882+
1883+SELECT * FROM t1;
1884+
1885+ALTER TABLE t1 CHANGE COLUMN padding less_padding VARCHAR(180) NOT NULL;
1886+
1887+#
1888+# Should be no result here...
1889+#
1890+
1891+SELECT * FROM t1 WHERE id = 3;
1892+
1893+DROP TABLE t1;
1894
1895=== added file 'plugin/transaction_log/tests/t/ddl_transaction_id.test'
1896--- plugin/transaction_log/tests/t/ddl_transaction_id.test 1970-01-01 00:00:00 +0000
1897+++ plugin/transaction_log/tests/t/ddl_transaction_id.test 2010-04-06 19:16:34 +0000
1898@@ -0,0 +1,14 @@
1899+# Truncate the log file to reset for the next test
1900+--source ../plugin/transaction_log/tests/t/truncate_log.inc
1901+
1902+--source ../plugin/transaction_log/tests/t/ddl_transaction_id.inc
1903+
1904+--replace_column 2 X 7 X 8 X 9 X
1905+SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG;
1906+--replace_column 1 X 3 X
1907+SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_ENTRIES;
1908+--replace_column 1 X 4 X 5 X
1909+SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_TRANSACTIONS;
1910+
1911+# Truncate the log file to reset for the next test
1912+--source ../plugin/transaction_log/tests/t/truncate_log.inc
1913
1914=== modified file 'plugin/transaction_log/tests/t/information_schema.test'
1915--- plugin/transaction_log/tests/t/information_schema.test 2010-03-05 04:44:58 +0000
1916+++ plugin/transaction_log/tests/t/information_schema.test 2010-04-06 19:16:34 +0000
1917@@ -13,11 +13,11 @@
1918 # that the information contained in them matches
1919 # the transaction log.
1920
1921---replace_column 2 X 5 X 6 X 7 X 8 X 9 X
1922+--replace_column 2 X 7 X 8 X 9 X
1923 SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG;
1924 --replace_column 1 X 3 X
1925 SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_ENTRIES;
1926---replace_column 1 X 2 X 4 X 5 X
1927+--replace_column 1 X 4 X 5 X
1928 SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_TRANSACTIONS;
1929
1930 # Truncate the log file to reset for the next test
1931
1932=== modified file 'plugin/transaction_log/tests/t/insert.inc'
1933--- plugin/transaction_log/tests/t/insert.inc 2009-12-11 19:08:29 +0000
1934+++ plugin/transaction_log/tests/t/insert.inc 2010-04-06 19:16:34 +0000
1935@@ -18,3 +18,5 @@
1936
1937 INSERT INTO t1 VALUES (1, "I love testing.");
1938 INSERT INTO t1 VALUES (2, "I hate testing.");
1939+
1940+DROP TABLE t1;
1941
1942=== renamed file 'plugin/transaction_log/tests/t/sync_method_every_write-master.opt' => 'plugin/transaction_log/tests/t/variables-master.opt'
1943=== renamed file 'plugin/transaction_log/tests/t/sync_method_every_write.test' => 'plugin/transaction_log/tests/t/variables.test'
1944=== modified file 'plugin/transaction_log/transaction_log.cc'
1945--- plugin/transaction_log/transaction_log.cc 2010-03-10 18:22:56 +0000
1946+++ plugin/transaction_log/transaction_log.cc 2010-04-06 19:16:34 +0000
1947@@ -6,7 +6,7 @@
1948 *
1949 * Authors:
1950 *
1951- * Jay Pipes <jaypipes@gmail.com.com>
1952+ * Jay Pipes <jaypipes@gmail.com>
1953 *
1954 * This program is free software; you can redistribute it and/or modify
1955 * it under the terms of the GNU General Public License as published by
1956@@ -30,31 +30,37 @@
1957 *
1958 * @details
1959 *
1960- * Currently, the log file uses this implementation:
1961+ * Currently, the transaction log file uses a simple, single-file, append-only
1962+ * format.
1963 *
1964 * We have an atomic off_t called log_offset which keeps track of the
1965- * offset into the log file for writing the next Transaction.
1966- *
1967- * We write Transaction message encapsulated in an 8-byte length/type header and a
1968- * 4-byte checksum trailer.
1969- *
1970- * When writing a Transaction to the log, we calculate the length of the
1971- * Transaction to be written. We then increment log_offset by the length
1972- * of the Transaction plus 2 * sizeof(uint32_t) plus sizeof(uint32_t) and store
1973- * this new offset in a local off_t called cur_offset (see TransactionLog::apply().
1974- * This compare and set is done in an atomic instruction.
1975- *
1976- * We then adjust the local off_t (cur_offset) back to the original
1977- * offset by subtracting the length and sizeof(uint32_t) and sizeof(uint32_t).
1978- *
1979- * We then first write a 64-bit length and then the serialized transaction/transaction
1980- * and optional checksum to our log file at our local cur_offset.
1981- *
1982- * --------------------------------------------------------------------------------
1983- * |<- 4 bytes ->|<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
1984- * --------------------------------------------------------------------------------
1985- * | Msg Type | Length | Serialized Transaction Message | Checksum |
1986- * --------------------------------------------------------------------------------
1987+ * offset into the log file for writing the next log entry. The log
1988+ * entries are written, one after the other, in the following way:
1989+ *
1990+ * <pre>
1991+ * --------------------------------------
1992+ * |<- 4 bytes ->|<- # Bytes of Entry ->|
1993+ * --------------------------------------
1994+ * | Entry Type | Serialized Entry |
1995+ * --------------------------------------
1996+ * </pre>
1997+ *
1998+ * The Entry Type is an integer defined as an enumeration in the
1999+ * /drizzled/message/transaction.proto file called TransactionLogEntry::Type.
2000+ *
2001+ * Each transaction log entry type is written to the log differently. Here,
2002+ * we cover the format of each log entry type.
2003+ *
2004+ * Committed and Prepared Transaction Log Entries
2005+ * -----------------------------------------------
2006+ *
2007+ * <pre>
2008+ * ------------------------------------------------------------------
2009+ * |<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
2010+ * ------------------------------------------------------------------
2011+ * | Length | Serialized Transaction Message | Checksum |
2012+ * ------------------------------------------------------------------
2013+ * </pre>
2014 *
2015 * @todo
2016 *
2017@@ -77,19 +83,27 @@
2018 #include <drizzled/internal/my_sys.h> /* for internal::my_sync */
2019 #include <drizzled/errmsg_print.h>
2020 #include <drizzled/gettext.h>
2021+#include <drizzled/message/transaction.pb.h>
2022+#include <drizzled/transaction_services.h>
2023+#include <drizzled/algorithm/crc32.h>
2024+
2025+#include <google/protobuf/io/coded_stream.h>
2026
2027 using namespace std;
2028 using namespace drizzled;
2029+using namespace google;
2030
2031 TransactionLog *transaction_log= NULL; /* The singleton transaction log */
2032
2033 TransactionLog::TransactionLog(const string in_log_file_path,
2034- uint32_t in_sync_method) :
2035+ uint32_t in_sync_method,
2036+ bool in_do_checksum) :
2037 state(OFFLINE),
2038 log_file_path(in_log_file_path),
2039 has_error(false),
2040 error_message(),
2041- sync_method(in_sync_method)
2042+ sync_method(in_sync_method),
2043+ do_checksum(in_do_checksum)
2044 {
2045 /* Setup our log file and determine the next write offset... */
2046 log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
2047@@ -133,6 +147,43 @@
2048 }
2049 }
2050
2051+uint8_t *TransactionLog::packTransactionIntoLogEntry(const message::Transaction &trx,
2052+ uint8_t *buffer,
2053+ uint32_t *checksum_out)
2054+{
2055+ uint8_t *orig_buffer= buffer;
2056+ size_t message_byte_length= trx.ByteSize();
2057+
2058+ /*
2059+ * Write the header information, which is the message type and
2060+ * the length of the transaction message into the buffer
2061+ */
2062+ buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
2063+ static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
2064+ buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
2065+ static_cast<uint32_t>(message_byte_length), buffer);
2066+
2067+ /*
2068+ * Now write the serialized transaction message, followed
2069+ * by the optional checksum into the buffer.
2070+ */
2071+ buffer= trx.SerializeWithCachedSizesToArray(buffer);
2072+
2073+ if (do_checksum)
2074+ {
2075+ *checksum_out= drizzled::algorithm::crc32(
2076+ reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
2077+ }
2078+ else
2079+ *checksum_out= 0;
2080+
2081+ /* We always write the checksum in little-endian byte order */
2082+ buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(*checksum_out, buffer);
2083+ /* Reset the pointer back to its original location... */
2084+ buffer= orig_buffer;
2085+ return orig_buffer;
2086+}
2087+
2088 off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
2089 {
2090 ssize_t written= 0;
2091@@ -142,12 +193,6 @@
2092 */
2093 off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
2094
2095- /*
2096- * We adjust cur_offset back to the original log_offset before
2097- * the increment above...
2098- */
2099- cur_offset-= static_cast<off_t>(data_length);
2100-
2101 /*
2102 * Quick safety...if an error occurs above in another writer, the log
2103 * file will be in a crashed state.
2104@@ -243,6 +288,7 @@
2105 result= ftruncate(log_file, log_offset);
2106 }
2107 while (result == -1 && errno == EINTR);
2108+ drizzled::TransactionServices::singleton().resetTransactionId();
2109 }
2110
2111 bool TransactionLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
2112@@ -272,3 +318,8 @@
2113 {
2114 return error_message;
2115 }
2116+
2117+size_t TransactionLog::getLogEntrySize(const message::Transaction &trx)
2118+{
2119+ return trx.ByteSize() + HEADER_TRAILER_BYTES;
2120+}
2121
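Reviewer note: the entry layout that packTransactionIntoLogEntry() and getLogEntrySize() agree on (4-byte type, 4-byte length, serialized message, 4-byte checksum, all integers little-endian) can be sketched without protobuf as below. This is only an illustration under stated assumptions: packEntry() and appendLittleEndian32() are hypothetical names that do not exist in the branch, the payload stands in for the serialized Transaction message, and the checksum is passed in rather than computed.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper (not in the branch): append a 32-bit value to the
// output, least significant byte first (little-endian).
static void appendLittleEndian32(std::vector<uint8_t> &out, uint32_t value)
{
  for (int shift= 0; shift < 32; shift+= 8)
    out.push_back(static_cast<uint8_t>((value >> shift) & 0xFF));
}

// Hypothetical stand-in for the packing step: builds
// [type][length][payload][checksum] into a fresh byte vector.
std::vector<uint8_t> packEntry(uint32_t entry_type,
                               const std::vector<uint8_t> &payload,
                               uint32_t checksum)
{
  std::vector<uint8_t> entry;
  /* Three 32-bit fields surround the payload: type, length, checksum */
  entry.reserve(payload.size() + 3 * sizeof(uint32_t));

  appendLittleEndian32(entry, entry_type);
  appendLittleEndian32(entry, static_cast<uint32_t>(payload.size()));
  entry.insert(entry.end(), payload.begin(), payload.end());
  appendLittleEndian32(entry, checksum);
  return entry;
}
```

With a 2-byte payload the result is 14 bytes long, which matches the payload size plus the 12 bytes counted by HEADER_TRAILER_BYTES.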
2122=== modified file 'plugin/transaction_log/transaction_log.h'
2123--- plugin/transaction_log/transaction_log.h 2010-03-05 18:08:49 +0000
2124+++ plugin/transaction_log/transaction_log.h 2010-04-06 19:16:34 +0000
2125@@ -49,10 +49,6 @@
2126 class TransactionLog
2127 {
2128 public:
2129- static const uint32_t HEADER_TRAILER_BYTES= sizeof(uint32_t) + /* 4-byte msg type header */
2130- sizeof(uint32_t) + /* 4-byte length header */
2131- sizeof(uint32_t); /* 4 byte checksum trailer */
2132-
2133 typedef std::vector<TransactionLogEntry> Entries;
2134 typedef std::vector<TransactionLogTransactionEntry> TransactionEntries;
2135 /**
2136@@ -70,7 +66,8 @@
2137 static const uint32_t SYNC_METHOD_EVERY_SECOND= 2; ///< Sync no more than once a second
2138 public:
2139 TransactionLog(const std::string in_log_file_path,
2140- uint32_t in_sync_method);
2141+ uint32_t in_sync_method,
2142+ bool in_do_checksum);
2143
2144 /** Destructor */
2145 ~TransactionLog();
2146@@ -102,6 +99,31 @@
2147 }
2148
2149 /**
2150+ * Static helper method which returns the transaction
2151+ * log entry size in bytes of a given transaction
2152+ * message.
2153+ *
2154+ * @param[in] Transaction message
2155+ */
2156+ static size_t getLogEntrySize(const drizzled::message::Transaction &trx);
2157+
2158+ /**
2159+ * Method which packs into a raw byte buffer
2160+ * a transaction log entry. Supplied buffer should
2161+ * be of adequate size.
2162+ *
2163+ * Returns a pointer to the start of the original
2164+ * buffer.
2165+ *
2166+ * @param[in] Transaction message to pack
2167+ * @param[in] Raw byte buffer
2168+ * @param[out] Pointer to storage for checksum of message
2169+ */
2170+ uint8_t *packTransactionIntoLogEntry(const drizzled::message::Transaction &trx,
2171+ uint8_t *buffer,
2172+ uint32_t *checksum_out);
2173+
2174+ /**
2175 * Writes a chunk of data to the log file of a specified
2176 * length and returns the offset at which the chunk of
2177 * data was written.
2178@@ -153,6 +175,10 @@
2179 */
2180 const std::string &getErrorMessage() const;
2181 private:
2182+ static const uint32_t HEADER_TRAILER_BYTES= sizeof(uint32_t) + /* 4-byte msg type header */
2183+ sizeof(uint32_t) + /* 4-byte length header */
2184+ sizeof(uint32_t); /* 4 byte checksum trailer */
2185+
2186 /* Don't allows these */
2187 TransactionLog();
2188 TransactionLog(const TransactionLog &other);
2189@@ -181,6 +207,7 @@
2190 std::string error_message; ///< Current error message
2191 uint32_t sync_method; ///< Determines behaviour of syncing log file
2192 time_t last_sync_time; ///< Last time the log file was synced (only set in SYNC_METHOD_EVERY_SECOND)
2193+ bool do_checksum; ///< Do a CRC32 checksum when writing Transaction message to log?
2194 };
2195
2196 #endif /* PLUGIN_TRANSACTION_LOG_TRANSACTION_LOG_H */
2197
2198=== modified file 'plugin/transaction_log/transaction_log_applier.cc'
2199--- plugin/transaction_log/transaction_log_applier.cc 2010-03-29 22:01:35 +0000
2200+++ plugin/transaction_log/transaction_log_applier.cc 2010-04-06 19:16:34 +0000
2201@@ -6,7 +6,7 @@
2202 *
2203 * Authors:
2204 *
2205- * Jay Pipes <jaypipes@gmail.com.com>
2206+ * Jay Pipes <jaypipes@gmail.com>
2207 *
2208 * This program is free software; you can redistribute it and/or modify
2209 * it under the terms of the GNU General Public License as published by
2210@@ -43,24 +43,19 @@
2211 */
2212
2213 #include "config.h"
2214+#include "write_buffer.h"
2215 #include "transaction_log.h"
2216 #include "transaction_log_applier.h"
2217 #include "transaction_log_index.h"
2218
2219-#include <sys/stat.h>
2220-#include <fcntl.h>
2221-#include <unistd.h>
2222-#include <errno.h>
2223+#include <vector>
2224
2225-#include <drizzled/errmsg_print.h>
2226-#include <drizzled/gettext.h>
2227-#include <drizzled/algorithm/crc32.h>
2228 #include <drizzled/message/transaction.pb.h>
2229-#include <google/protobuf/io/coded_stream.h>
2230+#include <drizzled/util/functors.h>
2231+#include <drizzled/session.h>
2232
2233 using namespace std;
2234 using namespace drizzled;
2235-using namespace google;
2236
2237 TransactionLogApplier *transaction_log_applier= NULL; /* The singleton transaction log applier */
2238
2239@@ -68,79 +63,59 @@
2240
2241 TransactionLogApplier::TransactionLogApplier(const string name_arg,
2242 TransactionLog &in_transaction_log,
2243- bool in_do_checksum) :
2244+ uint32_t in_num_write_buffers) :
2245 plugin::TransactionApplier(name_arg),
2246 transaction_log(in_transaction_log),
2247- do_checksum(in_do_checksum)
2248+ num_write_buffers(in_num_write_buffers),
2249+ write_buffers()
2250 {
2251+ /*
2252+ * Create each of the buffers we need for undo log entries
2253+ */
2254+ write_buffers.reserve(num_write_buffers);
2255+ for (size_t x= 0; x < num_write_buffers; ++x)
2256+ {
2257+ write_buffers.push_back(new WriteBuffer());
2258+ }
2259 }
2260
2261 TransactionLogApplier::~TransactionLogApplier()
2262 {
2263-}
2264-
2265-void TransactionLogApplier::apply(const message::Transaction &to_apply)
2266-{
2267- uint8_t *buffer; /* Buffer we will write serialized header,
2268- message and trailing checksum to */
2269- uint8_t *orig_buffer;
2270-
2271- size_t message_byte_length= to_apply.ByteSize();
2272- size_t total_envelope_length= TransactionLog::HEADER_TRAILER_BYTES + message_byte_length;
2273-
2274- /*
2275- * Attempt allocation of raw memory buffer for the header,
2276- * message and trailing checksum bytes.
2277- */
2278- buffer= static_cast<uint8_t *>(malloc(total_envelope_length));
2279- if (buffer == NULL)
2280- {
2281- errmsg_printf(ERRMSG_LVL_ERROR,
2282- _("Failed to allocate enough memory to buffer header, "
2283- "transaction message, and trailing checksum bytes. Tried to allocate %" PRId64
2284- " bytes. Error: %s\n"),
2285- static_cast<int64_t>(total_envelope_length),
2286- strerror(errno));
2287- return;
2288- }
2289- else
2290- orig_buffer= buffer; /* We will free() orig_buffer, as buffer is moved during write */
2291-
2292- /*
2293- * Write the header information, which is the message type and
2294- * the length of the transaction message into the buffer
2295- */
2296- buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
2297- static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
2298- buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
2299- static_cast<uint32_t>(message_byte_length), buffer);
2300-
2301- /*
2302- * Now write the serialized transaction message, followed
2303- * by the optional checksum into the buffer.
2304- */
2305- buffer= to_apply.SerializeWithCachedSizesToArray(buffer);
2306-
2307- uint32_t checksum= 0;
2308- if (do_checksum)
2309- {
2310- checksum= drizzled::algorithm::crc32(
2311- buffer - message_byte_length, message_byte_length);
2312- }
2313-
2314- /* We always write in network byte order */
2315- buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(checksum, buffer);
2316-
2317- /* Ask the transaction log to write the entry and return where it wrote it */
2318- off_t written_to= transaction_log.writeEntry(orig_buffer, total_envelope_length);
2319-
2320- free(orig_buffer);
2321+ for_each(write_buffers.begin(),
2322+ write_buffers.end(),
2323+ DeletePtr());
2324+ write_buffers.clear();
2325+}
2326+
2327+WriteBuffer *TransactionLogApplier::getWriteBuffer(const Session &session)
2328+{
2329+ return write_buffers[session.getSessionId() % num_write_buffers];
2330+}
2331+
2332+plugin::ReplicationReturnCode
2333+TransactionLogApplier::apply(Session &in_session,
2334+ const message::Transaction &to_apply)
2335+{
2336+ size_t entry_size= TransactionLog::getLogEntrySize(to_apply);
2337+ WriteBuffer *write_buffer= getWriteBuffer(in_session);
2338+
2339+ uint32_t checksum;
2340+
2341+ write_buffer->lock();
2342+ write_buffer->resize(entry_size);
2343+ uint8_t *bytes= write_buffer->getRawBytes();
2344+ bytes= transaction_log.packTransactionIntoLogEntry(to_apply,
2345+ bytes,
2346+ &checksum);
2347+
2348+ off_t written_to= transaction_log.writeEntry(bytes, entry_size);
2349+ write_buffer->unlock();
2350
2351 /* Add an entry to the index describing what was just applied */
2352 transaction_log_index->addEntry(TransactionLogEntry(ReplicationServices::TRANSACTION,
2353 written_to,
2354- total_envelope_length),
2355+ entry_size),
2356 to_apply,
2357 checksum);
2358-
2359+ return plugin::SUCCESS;
2360 }
2361
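Reviewer note: the buffer selection in getWriteBuffer() maps a session to a slot by taking the session ID modulo the number of write buffers, and apply() holds that slot's latch while the entry is packed and written. A minimal sketch of the same idea follows. It assumes C++11 (std::mutex, std::lock_guard) instead of the pthread calls used in the branch, and SlotBuffer, SlotPool, slotFor() and fill() are hypothetical names introduced only for this example.

```cpp
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <vector>

// Hypothetical simplified buffer: raw bytes guarded by their own latch.
struct SlotBuffer
{
  std::mutex latch;
  std::vector<uint8_t> bytes;
};

// Hypothetical pool of a fixed number of buffers. num_slots must be
// greater than zero (the sysvar in this branch has a minimum of 4).
class SlotPool
{
public:
  explicit SlotPool(size_t num_slots) : slots(num_slots) {}

  // Sessions whose IDs are congruent modulo the slot count share a slot.
  size_t slotFor(uint64_t session_id) const
  {
    return static_cast<size_t>(session_id % slots.size());
  }

  // Copies data into the session's slot while holding that slot's latch
  // and returns the number of bytes now held by the slot.
  size_t fill(uint64_t session_id, const uint8_t *data, size_t length)
  {
    SlotBuffer &slot= slots[slotFor(session_id)];
    std::lock_guard<std::mutex> guard(slot.latch);
    slot.bytes.assign(data, data + length);
    return slot.bytes.size();
  }

private:
  std::vector<SlotBuffer> slots;
};
```

Two sessions that land on the same slot serialize on its latch, so a larger slot count trades memory for less contention.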
2362=== modified file 'plugin/transaction_log/transaction_log_applier.h'
2363--- plugin/transaction_log/transaction_log_applier.h 2010-03-05 18:08:49 +0000
2364+++ plugin/transaction_log/transaction_log_applier.h 2010-04-06 19:16:34 +0000
2365@@ -43,14 +43,20 @@
2366 #include <vector>
2367 #include <string>
2368
2369+namespace drizzled
2370+{
2371+ class Session;
2372+}
2373+
2374 class TransactionLog;
2375+class WriteBuffer;
2376
2377 class TransactionLogApplier: public drizzled::plugin::TransactionApplier
2378 {
2379 public:
2380 TransactionLogApplier(const std::string name_arg,
2381 TransactionLog &in_transaction_log,
2382- bool in_do_checksum);
2383+ uint32_t in_num_write_buffers);
2384
2385 /** Destructor */
2386 ~TransactionLogApplier();
2387@@ -68,16 +74,27 @@
2388 * the supplied message to their own controlled memory storage
2389 * area.
2390 *
2391+ * @param Session descriptor
2392 * @param Transaction message to be replicated
2393 */
2394- void apply(const drizzled::message::Transaction &to_apply);
2395+ drizzled::plugin::ReplicationReturnCode
2396+ apply(drizzled::Session &in_session,
2397+ const drizzled::message::Transaction &to_apply);
2398 private:
2399 /* Don't allows these */
2400 TransactionLogApplier();
2401 TransactionLogApplier(const TransactionLogApplier &other);
2402 TransactionLogApplier &operator=(const TransactionLogApplier &other);
2403 TransactionLog &transaction_log;
2404- bool do_checksum; ///< Do a CRC32 checksum when writing Transaction message to log?
2405+ uint32_t num_write_buffers; ///< Number of write buffers used
2406+ std::vector<WriteBuffer *> write_buffers; ///< array of write buffers
2407+
2408+ /**
2409+ * Returns the write buffer for the supplied session
2410+ *
2411+ * @param Session descriptor
2412+ */
2413+ WriteBuffer *getWriteBuffer(const drizzled::Session &session);
2414 };
2415
2416 #endif /* PLUGIN_TRANSACTION_LOG_TRANSACTION_LOG_APPLIER_H */
2417
2418=== added file 'plugin/transaction_log/write_buffer.cc'
2419--- plugin/transaction_log/write_buffer.cc 1970-01-01 00:00:00 +0000
2420+++ plugin/transaction_log/write_buffer.cc 2010-04-06 19:16:34 +0000
2421@@ -0,0 +1,71 @@
2422+/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2423+ * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2424+ *
2425+ * Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
2426+ *
2427+ * Authors:
2428+ *
2429+ * Jay Pipes <jaypipes@gmail.com>
2430+ *
2431+ * This program is free software; you can redistribute it and/or modify
2432+ * it under the terms of the GNU General Public License as published by
2433+ * the Free Software Foundation; either version 2 of the License, or
2434+ * (at your option) any later version.
2435+ *
2436+ * This program is distributed in the hope that it will be useful,
2437+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
2438+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
2439+ * GNU General Public License for more details.
2440+ *
2441+ * You should have received a copy of the GNU General Public License
2442+ * along with this program; if not, write to the Free Software
2443+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
2444+ */
2445+
2446+/**
2447+ * @file
2448+ *
2449+ * Defines the implementation of a simple locked write buffer
2450+ *
2451+ * @details
2452+ *
2453+ * The write buffer keeps a block of allocated raw bytes available for
2454+ * callers.
2455+ */
2456+
2457+#include "config.h"
2458+#include "write_buffer.h"
2459+
2460+#include <drizzled/errmsg_print.h>
2461+#include <drizzled/gettext.h>
2462+
2463+#include <vector>
2464+
2465+using namespace std;
2466+using namespace drizzled;
2467+
2468+WriteBuffer::WriteBuffer() :
2469+ buffer()
2470+{
2471+ buffer.reserve(DEFAULT_WRITE_BUFFER_SIZE);
2472+ pthread_mutex_init(&latch, NULL);
2473+}
2474+
2475+WriteBuffer::~WriteBuffer()
2476+{
2477+ buffer.clear();
2478+ pthread_mutex_destroy(&latch);
2479+}
2480+
2481+void WriteBuffer::resize(size_t new_size)
2482+{
2483+ /*
2484+ * Attempt allocation of raw memory buffer for the
2485+ * requested size. Does nothing if already allocated size
2486+ * if greater...
2487+ */
2488+ if (buffer.size() >= new_size)
2489+ return;
2490+
2491+ buffer.resize(new_size);
2492+}
2493
2494=== added file 'plugin/transaction_log/write_buffer.h'
2495--- plugin/transaction_log/write_buffer.h 1970-01-01 00:00:00 +0000
2496+++ plugin/transaction_log/write_buffer.h 2010-04-06 19:16:34 +0000
2497@@ -0,0 +1,90 @@
2498+/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2499+ * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2500+ *
2501+ * Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
2502+ *
2503+ * Authors:
2504+ *
2505+ * Jay Pipes <jaypipes@gmail.com>
2506+ *
2507+ * This program is free software; you can redistribute it and/or modify
2508+ * it under the terms of the GNU General Public License as published by
2509+ * the Free Software Foundation; either version 2 of the License, or
2510+ * (at your option) any later version.
2511+ *
2512+ * This program is distributed in the hope that it will be useful,
2513+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
2514+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
2515+ * GNU General Public License for more details.
2516+ *
2517+ * You should have received a copy of the GNU General Public License
2518+ * along with this program; if not, write to the Free Software
2519+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
2520+ */
2521+
2522+/**
2523+ * @file
2524+ *
2525+ * Defines a simple structure for maintaining a write buffer.
2526+ */
2527+
2528+#ifndef PLUGIN_TRANSACTION_LOG_WRITE_BUFFER_H
2529+#define PLUGIN_TRANSACTION_LOG_WRITE_BUFFER_H
2530+
2531+#include <stdint.h>
2532+#include <vector>
2533+#include <pthread.h>
2534+
2535+class WriteBuffer
2536+{
2537+public:
2538+ static const size_t DEFAULT_WRITE_BUFFER_SIZE= 1024; /* Many GPB messages are < 1 KB... */
2539+ /**
2540+ * Constructor.
2541+ */
2542+ WriteBuffer();
2543+ ~WriteBuffer();
2544+ /**
2545+ * Locks the log write buffer
2546+ */
2547+ void lock()
2548+ {
2549+ pthread_mutex_lock(&latch);
2550+ }
2551+ /**
2552+ * Unlocks the log's write buffer
2553+ */
2554+ void unlock()
2555+ {
2556+ pthread_mutex_unlock(&latch);
2557+ }
2558+ /**
2559+ * Resizes the internal raw byte buffer
2560+ *
2561+ * @param[in] new_size New size to allocate
2562+ */
2563+ void resize(size_t new_size);
2564+ /**
2565+ * Returns the pointer to the raw bytes.
2566+ */
2567+ uint8_t *getRawBytes()
2568+ {
2569+ return &buffer[0];
2570+ }
2571+ /**
2572+ * Returns the size of the write buffer
2573+ */
2574+ size_t getCapacity()
2575+ {
2576+ return buffer.size();
2577+ }
2578+private:
2579+ /* Prohibit these */
2580+ WriteBuffer(const WriteBuffer&);
2581+ WriteBuffer &operator=(const WriteBuffer&);
2582+
2583+ std::vector<uint8_t> buffer; ///< Raw memory buffer managed by the log
2584+ pthread_mutex_t latch; ///< Lock around the synchronized parts of the log (the write buffer)
2585+};
2586+
2587+#endif /* PLUGIN_TRANSACTION_LOG_WRITE_BUFFER_H */
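
A note on the std::vector calls used by WriteBuffer: reserve() changes only capacity(); size() stays the same, and indexing at or past size() is undefined behaviour. The following is a minimal sketch, not the code in this branch, of how a buffer with the same interface might use resize() so that getRawBytes() points at usable bytes and getCapacity() reports them. SketchBuffer, ScopedBufferLock and copyIntoBuffer are hypothetical names introduced here for illustration only.

```cpp
#include <cstddef>
#include <cstring>
#include <stdint.h>
#include <vector>
#include <pthread.h>

/* Hypothetical stand-in for WriteBuffer; an assumption, not the merged code. */
class SketchBuffer
{
public:
  static const size_t DEFAULT_SIZE= 1024;

  /* Size the vector up front so that &buffer[0] is a valid pointer. */
  SketchBuffer() : buffer(DEFAULT_SIZE)
  {
    pthread_mutex_init(&latch, NULL);
  }
  ~SketchBuffer()
  {
    pthread_mutex_destroy(&latch);
  }
  void lock() { pthread_mutex_lock(&latch); }
  void unlock() { pthread_mutex_unlock(&latch); }

  /* Grows the buffer; never shrinks it. */
  void resize(size_t new_size)
  {
    if (buffer.size() >= new_size)
      return;
    buffer.resize(new_size); /* resize(), not reserve(): size() grows too */
  }
  uint8_t *getRawBytes() { return &buffer[0]; }
  size_t getCapacity() const { return buffer.size(); }

private:
  /* Prohibit copying, as in the original class */
  SketchBuffer(const SketchBuffer&);
  SketchBuffer &operator=(const SketchBuffer&);

  std::vector<uint8_t> buffer;
  pthread_mutex_t latch;
};

/* Hypothetical RAII guard so unlock() cannot be skipped on an early return. */
class ScopedBufferLock
{
public:
  explicit ScopedBufferLock(SketchBuffer &in_buffer) : target(in_buffer)
  {
    target.lock();
  }
  ~ScopedBufferLock()
  {
    target.unlock();
  }
private:
  ScopedBufferLock(const ScopedBufferLock&);
  ScopedBufferLock &operator=(const ScopedBufferLock&);

  SketchBuffer &target;
};

/* Copies len bytes into the buffer under the latch; returns bytes copied. */
size_t copyIntoBuffer(SketchBuffer &wb, const uint8_t *src, size_t len)
{
  ScopedBufferLock guard(wb);
  wb.resize(len);
  if (len > 0)
    std::memcpy(wb.getRawBytes(), src, len);
  return len;
}
```

A caller would lock, grow the buffer to the serialized message size, and write into getRawBytes() before unlocking; copyIntoBuffer() shows that sequence with the guard taking care of the unlock.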