Merge lp:~jaypipes/drizzle/replication-ddl into lp:~drizzle-trunk/drizzle/development

Proposed by Jay Pipes
Status: Merged
Merged at revision: not available
Proposed branch: lp:~jaypipes/drizzle/replication-ddl
Merge into: lp:~drizzle-trunk/drizzle/development
Prerequisite: lp:~jaypipes/drizzle/replication-write-buffers
Diff against target: 2587 lines (+894/-424)
49 files modified
drizzled/atomic/gcc_traits.h (+11/-10)
drizzled/atomic/pthread_traits.h (+20/-8)
drizzled/atomic/sun_studio.h (+152/-29)
drizzled/atomics.h (+16/-11)
drizzled/cursor.cc (+1/-4)
drizzled/include.am (+1/-0)
drizzled/plugin/replication.h (+46/-0)
drizzled/plugin/transaction_applier.h (+7/-2)
drizzled/plugin/transaction_replicator.h (+8/-4)
drizzled/replication_services.cc (+20/-13)
drizzled/replication_services.h (+4/-1)
drizzled/session.cc (+5/-5)
drizzled/set_var.cc (+1/-1)
drizzled/sql_delete.cc (+1/-1)
drizzled/sql_insert.cc (+2/-2)
drizzled/sql_parse.cc (+1/-1)
drizzled/sql_table.cc (+3/-3)
drizzled/sql_update.cc (+1/-1)
drizzled/statement/alter_table.cc (+3/-3)
drizzled/statement/release_savepoint.cc (+1/-1)
drizzled/statement/rollback_to_savepoint.cc (+2/-2)
drizzled/statement/savepoint.cc (+2/-2)
drizzled/transaction_services.cc (+47/-145)
drizzled/transaction_services.h (+40/-10)
plugin/default_replicator/default_replicator.cc (+5/-2)
plugin/default_replicator/default_replicator.h (+6/-1)
plugin/filtered_replicator/filtered_replicator.cc (+6/-3)
plugin/filtered_replicator/filtered_replicator.h (+6/-2)
plugin/innobase/handler/ha_innodb.cc (+0/-30)
plugin/transaction_log/module.cc (+26/-4)
plugin/transaction_log/plugin.ini (+2/-2)
plugin/transaction_log/tests/r/ddl_transaction_id.result (+36/-0)
plugin/transaction_log/tests/r/filtered_replicator.result (+4/-0)
plugin/transaction_log/tests/r/information_schema.result (+8/-5)
plugin/transaction_log/tests/r/insert.result (+4/-0)
plugin/transaction_log/tests/r/truncate_log.result (+2/-1)
plugin/transaction_log/tests/r/udf_print_transaction_message.result (+1/-0)
plugin/transaction_log/tests/r/variables.result (+2/-1)
plugin/transaction_log/tests/t/ddl_transaction_id-master.opt (+1/-0)
plugin/transaction_log/tests/t/ddl_transaction_id.inc (+29/-0)
plugin/transaction_log/tests/t/ddl_transaction_id.test (+14/-0)
plugin/transaction_log/tests/t/information_schema.test (+2/-2)
plugin/transaction_log/tests/t/insert.inc (+2/-0)
plugin/transaction_log/transaction_log.cc (+83/-32)
plugin/transaction_log/transaction_log.h (+32/-5)
plugin/transaction_log/transaction_log_applier.cc (+47/-72)
plugin/transaction_log/transaction_log_applier.h (+20/-3)
plugin/transaction_log/write_buffer.cc (+71/-0)
plugin/transaction_log/write_buffer.h (+90/-0)
To merge this branch: bzr merge lp:~jaypipes/drizzle/replication-ddl
Reviewer Review Type Date Requested Status
Brian Aker Needs Fixing
Jay Pipes (community) Needs Resubmitting
Monty Taylor Needs Fixing
Review via email: mp+22558@code.launchpad.net

Description of the change

    * Fixes drizzled's atomics:

    - fetch_and_add() was actually add_and_fetch() - fixed to have both methods correct
    - compare_and_swap() was incorrect for all traits classes. Fixed to return a bool
    true only when the supplied value is actually swapped
    - fixes increment() and decrement() methods and operator+=() in outer atomics class
    template to call proper add_and_fetch() methods on traits classes
    - Now that above are fixed, removed the hacks in Query_id and TransactionLog to
    have query ID and the new transactoin ID start properly at 1.

    * Transaction messages sent over replication stream now use
    a real transaction ID, managed by drizzled::TransactionServices. Previously,
    the Query_id was being used, resulting in SELECT statements incrementing the
    transaction ID.

    * Added a test case to ensure that DDL ops are given a transaction ID and SELECT
    ops do not increment the transaction ID.

    The transaction ID will be paired with a channel ID to become the global
    transaction identifier. ReplicationServices will manage the pairing of
    channel and transaction ID and understand how far a particular subscriber
    node has applied.

To post a comment you must log in.
Revision history for this message
Monty Taylor (mordred) wrote :

It's missing an update to the sun studio atomic trait... but other than that looks good.

review: Needs Fixing
Revision history for this message
Jay Pipes (jaypipes) wrote :

Pushed Sun atomics fixes. Please review r1419. If we could run it through a hudson builder for Sun-only, that would be cool...

Revision history for this message
Jay Pipes (jaypipes) wrote :

OK, all green in build. Fixed Sun Studio atomics.

review: Needs Resubmitting
lp:~jaypipes/drizzle/replication-ddl updated
1423. By Jay Pipes <jpipes@serialcoder>

Fix cpplint header guard.

Revision history for this message
Brian Aker (brianaker) wrote :

This needs to be remerged with Trunk.

Thanks,
   -Brian

review: Needs Fixing
Revision history for this message
Jay Pipes (jaypipes) wrote :

This has already been merged with trunk.

Revision history for this message
Brian Aker (brianaker) wrote :

Sorry, I must have clicked on the wrong link. Though I am not seeing
this in the merge history.

On Apr 1, 2010, at 12:55 PM, Jay Pipes wrote:

> This has already been merged with trunk.
> --
> https://code.launchpad.net/~jaypipes/drizzle/replication-ddl/+merge/22558
> You are reviewing the proposed merge of lp:~jaypipes/drizzle/
> replication-ddl into lp:drizzle.

Revision history for this message
Jay Pipes (jaypipes) wrote :

No worries. I'm pretty sure I'm all merged up, though, as I see no revisions to pull if I bzr merge trunk locally...

Revision history for this message
Jay Pipes (jaypipes) wrote :

fyi, r1422 on this branch contains the most recent merge with trunk

lp:~jaypipes/drizzle/replication-ddl updated
1424. By Jay Pipes <jpipes@serialcoder>

merge trunk

1425. By Jay Pipes <jpipes@serialcoder>

merge replication-api

1426. By Jay Pipes <jpipes@serialcoder>

Merge trunk

1427. By Jay Pipes <jpipes@serialcoder>

Try another const_cast<> technique on pthread_traits.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'drizzled/atomic/gcc_traits.h'
--- drizzled/atomic/gcc_traits.h 2009-07-17 20:59:45 +0000
+++ drizzled/atomic/gcc_traits.h 2010-04-06 19:16:34 +0000
@@ -32,36 +32,37 @@
3232
33 gcc_traits() {}33 gcc_traits() {}
3434
35 /* YES. I know these are semantically backwards...35 inline value_type add_and_fetch(volatile value_type *value, D addend )
36 * so... TODO: Ensure we're doing the "right" thing here36 {
37 */37 return __sync_add_and_fetch(value, addend);
38 }
39
38 inline value_type fetch_and_add(volatile value_type *value, D addend )40 inline value_type fetch_and_add(volatile value_type *value, D addend )
39 {41 {
40 return __sync_add_and_fetch(value, addend);42 return __sync_fetch_and_add(value, addend);
41 }43 }
4244
43 inline value_type fetch_and_increment(volatile value_type *value)45 inline value_type fetch_and_increment(volatile value_type *value)
44 {46 {
45 return __sync_add_and_fetch(value, 1);47 return __sync_fetch_and_add(value, 1);
46 }48 }
4749
48 inline value_type fetch_and_decrement(volatile value_type *value)50 inline value_type fetch_and_decrement(volatile value_type *value)
49 {51 {
50 return __sync_sub_and_fetch(value, 1);52 return __sync_fetch_and_sub(value, 1);
51 }53 }
5254
53 inline value_type fetch_and_store(volatile value_type *value,55 inline value_type fetch_and_store(volatile value_type *value,
54 value_type new_value)56 value_type new_value)
55 {57 {
56 /* TODO: Is this the right one? */
57 return __sync_lock_test_and_set(value, new_value);58 return __sync_lock_test_and_set(value, new_value);
58 }59 }
5960
60 inline value_type compare_and_swap(volatile value_type *value,61 inline bool compare_and_swap(volatile value_type *value,
61 value_type new_value,62 value_type new_value,
62 value_type comparand )63 value_type comparand )
63 {64 {
64 return __sync_val_compare_and_swap(value, comparand, new_value);65 return __sync_bool_compare_and_swap(value, comparand, new_value);
65 }66 }
6667
67 inline value_type fetch(const volatile value_type *value) const volatile68 inline value_type fetch(const volatile value_type *value) const volatile
@@ -76,7 +77,7 @@
76 * Look at how to rewrite the below to something that ICC feels is77 * Look at how to rewrite the below to something that ICC feels is
77 * OK and yet respects memory barriers.78 * OK and yet respects memory barriers.
78 */79 */
79 return __sync_add_and_fetch(const_cast<value_type *>(value), 0);80 return __sync_fetch_and_add(const_cast<value_type *>(value), 0);
80 }81 }
8182
82 inline value_type store_with_release(volatile value_type *value,83 inline value_type store_with_release(volatile value_type *value,
8384
=== modified file 'drizzled/atomic/pthread_traits.h'
--- drizzled/atomic/pthread_traits.h 2009-12-14 19:51:51 +0000
+++ drizzled/atomic/pthread_traits.h 2010-04-06 19:16:34 +0000
@@ -67,11 +67,20 @@
6767
68 pthread_traits() {}68 pthread_traits() {}
6969
70 inline value_type add_and_fetch(volatile value_type *value, D addend )
71 {
72 my_lock.lock();
73 *value += addend;
74 value_type ret= *value;
75 my_lock.unlock();
76 return ret;
77 }
78
70 inline value_type fetch_and_add(volatile value_type *value, D addend )79 inline value_type fetch_and_add(volatile value_type *value, D addend )
71 {80 {
72 my_lock.lock();81 my_lock.lock();
82 value_type ret= *value;
73 *value += addend;83 *value += addend;
74 value_type ret= *value;
75 my_lock.unlock();84 my_lock.unlock();
76 return ret;85 return ret;
77 }86 }
@@ -79,8 +88,8 @@
79 inline value_type fetch_and_increment(volatile value_type *value)88 inline value_type fetch_and_increment(volatile value_type *value)
80 {89 {
81 my_lock.lock();90 my_lock.lock();
91 value_type ret= *value;
82 *value++;92 *value++;
83 value_type ret= *value;
84 my_lock.unlock();93 my_lock.unlock();
85 return ret;94 return ret;
86 }95 }
@@ -88,8 +97,8 @@
88 inline value_type fetch_and_decrement(volatile value_type *value)97 inline value_type fetch_and_decrement(volatile value_type *value)
89 {98 {
90 my_lock.lock();99 my_lock.lock();
100 value_type ret= *value;
91 *value--;101 *value--;
92 value_type ret= *value;
93 my_lock.unlock();102 my_lock.unlock();
94 return ret;103 return ret;
95 }104 }
@@ -98,27 +107,30 @@
98 value_type new_value )107 value_type new_value )
99 {108 {
100 my_lock.lock();109 my_lock.lock();
110 value_type ret= *value;
101 *value= new_value;111 *value= new_value;
102 value_type ret= *value;
103 my_lock.unlock();112 my_lock.unlock();
104 return ret;113 return ret;
105 }114 }
106115
107 inline value_type compare_and_swap(volatile value_type *value,116 inline bool compare_and_swap(volatile value_type *value,
108 value_type new_value,117 value_type new_value,
109 value_type comparand )118 value_type comparand )
110 {119 {
111 my_lock.lock();120 my_lock.lock();
112 if (*value == comparand)121 bool ret= (*value == comparand);
122 if (ret)
113 *value= new_value;123 *value= new_value;
114 value_type ret= *value;
115 my_lock.unlock();124 my_lock.unlock();
116 return ret;125 return ret;
117 }126 }
118127
119 inline value_type fetch(const volatile value_type *value) const volatile128 inline value_type fetch(const volatile value_type *value) const volatile
120 {129 {
121 return *value;130 const_cast<pthread_traits *>(this)->my_lock.lock();
131 value_type ret= *value;
132 const_cast<pthread_traits *>(this)->my_lock.unlock();
133 return ret;
122 }134 }
123135
124 inline value_type store_with_release(volatile value_type *value,136 inline value_type store_with_release(volatile value_type *value,
125137
=== modified file 'drizzled/atomic/sun_studio.h'
--- drizzled/atomic/sun_studio.h 2009-08-30 00:26:17 +0000
+++ drizzled/atomic/sun_studio.h 2010-04-06 19:16:34 +0000
@@ -24,91 +24,177 @@
24#include <atomic.h>24#include <atomic.h>
25#undef _KERNEL25#undef _KERNEL
2626
27inline bool __sync_fetch_and_add(volatile bool* ptr, bool val)
28{
29 bool ret= *ptr;
30 (val == true) ? atomic_inc_8((volatile uint8_t *)ptr) : atomic_add_8((volatile uint8_t *)ptr, (int8_t)val);
31 return ret;
32}
33
34inline int8_t __sync_fetch_and_add(volatile int8_t* ptr, int8_t val)
35{
36 int8_t ret= *ptr;
37 (val == 1) ? atomic_inc_8((volatile uint8_t*)ptr) : atomic_add_8((volatile uint8_t*)ptr, val);
38 return ret;
39}
40
41inline int16_t __sync_fetch_and_add(volatile int16_t* ptr, int16_t val)
42{
43 int16_t ret= *ptr;
44 (val == 1) ? atomic_inc_16((volatile uint16_t*)ptr) : atomic_add_16((volatile uint16_t*)ptr, val);
45 return ret;
46}
47
48inline int32_t __sync_fetch_and_add(volatile int32_t* ptr, int32_t val)
49{
50 int32_t ret= *ptr;
51 (val == 1) ? atomic_inc_32((volatile uint32_t*)ptr) : atomic_add_32((volatile uint32_t*)ptr, val);
52 return ret;
53}
54
55inline uint8_t __sync_fetch_and_add(volatile uint8_t* ptr, uint8_t val)
56{
57 uint8_t ret= *ptr;
58 (val == 1) ? atomic_inc_8(ptr) : atomic_add_8(ptr, (int8_t)val);
59 return ret;
60}
61
62inline uint16_t __sync_fetch_and_add(volatile uint16_t* ptr, uint16_t val)
63{
64 uint16_t ret= *ptr;
65 (val == 1) ? atomic_inc_16(ptr) : atomic_add_16(ptr, (int16_t)val);
66 return ret;
67}
68
69inline uint32_t __sync_fetch_and_add(volatile uint32_t* ptr, uint32_t val)
70{
71 uint32_t ret= *ptr;
72 (val == 1) ? atomic_inc_32(ptr) : atomic_add_32(ptr, (int32_t)val);
73 return ret;
74}
75
76# if defined(_KERNEL) || defined(_INT64_TYPE)
77inline uint64_t __sync_fetch_and_add(volatile uint64_t* ptr, uint64_t val)
78{
79 uint64_t ret= *ptr;
80 (val == 1) ? atomic_inc_64(ptr) : atomic_add_64(ptr, (int64_t)val);
81 return ret;
82}
83
84inline int64_t __sync_fetch_and_add(volatile int64_t* ptr, int64_t val)
85{
86 int64_t ret= *ptr;
87 (val == 1) ? atomic_inc_64((volatile uint64_t*)ptr) : atomic_add_64((volatile uint64_t*)ptr, val);
88 return ret;
89}
90# endif /* defined(_KERNEL) || defined(_INT64_TYPE) */
91
92inline uint8_t __sync_fetch_and_sub(volatile uint8_t* ptr, uint8_t val)
93{
94 uint8_t ret= *ptr;
95 (val == 1) ? atomic_dec_8(ptr) : atomic_add_8(ptr, 0-(int8_t)val);
96 return ret;
97}
98
99inline uint16_t __sync_fetch_and_sub(volatile uint16_t* ptr, uint16_t val)
100{
101 uint16_t ret= *ptr;
102 (val == 1) ? atomic_dec_16(ptr) : atomic_add_16(ptr, 0-(int16_t)val);
103 return ret;
104}
105
106inline uint32_t __sync_fetch_and_sub(volatile uint32_t* ptr, uint32_t val)
107{
108 uint32_t ret= *ptr;
109 (val == 1) ? atomic_dec_32(ptr) : atomic_add_32(ptr, 0-(int32_t)val);
110 return ret;
111}
112
113# if defined(_KERNEL) || defined(_INT64_TYPE)
114inline uint64_t __sync_fetch_and_sub(volatile uint64_t* ptr, uint64_t val)
115{
116 uint64_t ret= *ptr;
117 (val == 1) ? atomic_dec_64(ptr) : atomic_add_64(ptr, 0-(int64_t)val);
118 return ret;
119}
120inline int64_t __sync_fetch_and_sub(volatile int64_t* ptr, uint64_t val)
121{
122 int64_t ret= *ptr;
123 (val == 1) ? atomic_dec_64((volatile uint64_t *) ptr) : atomic_add_64((volatile uint64_t *) ptr, 0-(int64_t)val);
124 return ret;
125}
126# endif /* defined(_KERNEL) || defined(_INT64_TYPE) */
127
27inline bool __sync_add_and_fetch(volatile bool* ptr, bool val)128inline bool __sync_add_and_fetch(volatile bool* ptr, bool val)
28{129{
29 (val == true) ? atomic_inc_8((volatile uint8_t *)ptr) : atomic_add_8((volatile uint8_t *)ptr, (int8_t)val);130 return (val == true) ? atomic_inc_8_nv((volatile uint8_t *)ptr) : atomic_add_8_nv((volatile uint8_t *)ptr, (int8_t)val);
30 return *ptr;
31}131}
32 132
33inline int8_t __sync_add_and_fetch(volatile int8_t* ptr, int8_t val)133inline int8_t __sync_add_and_fetch(volatile int8_t* ptr, int8_t val)
34{134{
35 (val == 1) ? atomic_inc_8((volatile uint8_t*)ptr) : atomic_add_8((volatile uint8_t*)ptr, val);135 return (val == 1) ? atomic_inc_8_nv((volatile uint8_t*)ptr) : atomic_add_8_nv((volatile uint8_t*)ptr, val);
36 return *ptr;
37}136}
38137
39inline int16_t __sync_add_and_fetch(volatile int16_t* ptr, int16_t val)138inline int16_t __sync_add_and_fetch(volatile int16_t* ptr, int16_t val)
40{139{
41 (val == 1) ? atomic_inc_16((volatile uint16_t*)ptr) : atomic_add_16((volatile uint16_t*)ptr, val);140 return (val == 1) ? atomic_inc_16_nv((volatile uint16_t*)ptr) : atomic_add_16_nv((volatile uint16_t*)ptr, val);
42 return *ptr;
43}141}
44142
45inline int32_t __sync_add_and_fetch(volatile int32_t* ptr, int32_t val)143inline int32_t __sync_add_and_fetch(volatile int32_t* ptr, int32_t val)
46{144{
47 (val == 1) ? atomic_inc_32((volatile uint32_t*)ptr) : atomic_add_32((volatile uint32_t*)ptr, val);145 return (val == 1) ? atomic_inc_32_nv((volatile uint32_t*)ptr) : atomic_add_32_nv((volatile uint32_t*)ptr, val);
48 return *ptr;
49}146}
50147
51inline uint8_t __sync_add_and_fetch(volatile uint8_t* ptr, uint8_t val)148inline uint8_t __sync_add_and_fetch(volatile uint8_t* ptr, uint8_t val)
52{149{
53 (val == 1) ? atomic_inc_8(ptr) : atomic_add_8(ptr, (int8_t)val);150 return (val == 1) ? atomic_inc_8_nv(ptr) : atomic_add_8_nv(ptr, (int8_t)val);
54 return *ptr;
55}151}
56152
57inline uint16_t __sync_add_and_fetch(volatile uint16_t* ptr, uint16_t val)153inline uint16_t __sync_add_and_fetch(volatile uint16_t* ptr, uint16_t val)
58{154{
59 (val == 1) ? atomic_inc_16(ptr) : atomic_add_16(ptr, (int16_t)val);155 return (val == 1) ? atomic_inc_16_nv(ptr) : atomic_add_16_nv(ptr, (int16_t)val);
60 return *ptr;
61}156}
62157
63inline uint32_t __sync_add_and_fetch(volatile uint32_t* ptr, uint32_t val)158inline uint32_t __sync_add_and_fetch(volatile uint32_t* ptr, uint32_t val)
64{159{
65 (val == 1) ? atomic_inc_32(ptr) : atomic_add_32(ptr, (int32_t)val);160 return (val == 1) ? atomic_inc_32_nv(ptr) : atomic_add_32_nv(ptr, (int32_t)val);
66 return *ptr;
67}161}
68162
69# if defined(_KERNEL) || defined(_INT64_TYPE)163# if defined(_KERNEL) || defined(_INT64_TYPE)
70inline uint64_t __sync_add_and_fetch(volatile uint64_t* ptr, uint64_t val)164inline uint64_t __sync_add_and_fetch(volatile uint64_t* ptr, uint64_t val)
71{165{
72 (val == 1) ? atomic_inc_64(ptr) : atomic_add_64(ptr, (int64_t)val);166 return (val == 1) ? atomic_inc_64_nv(ptr) : atomic_add_64_nv(ptr, (int64_t)val);
73 return *ptr;
74}167}
75168
76inline int64_t __sync_add_and_fetch(volatile int64_t* ptr, int64_t val)169inline int64_t __sync_add_and_fetch(volatile int64_t* ptr, int64_t val)
77{170{
78 (val == 1) ? atomic_inc_64((volatile uint64_t*)ptr) : atomic_add_64((volatile uint64_t*)ptr, val);171 return (val == 1) ? atomic_inc_64_nv((volatile uint64_t*)ptr) : atomic_add_64_nv((volatile uint64_t*)ptr, val);
79 return *ptr;
80}172}
81# endif /* defined(_KERNEL) || defined(_INT64_TYPE) */173# endif /* defined(_KERNEL) || defined(_INT64_TYPE) */
82174
83
84inline uint8_t __sync_sub_and_fetch(volatile uint8_t* ptr, uint8_t val)175inline uint8_t __sync_sub_and_fetch(volatile uint8_t* ptr, uint8_t val)
85{176{
86 (val == 1) ? atomic_dec_8(ptr) : atomic_add_8(ptr, 0-(int8_t)val);177 return (val == 1) ? atomic_dec_8_nv(ptr) : atomic_add_8_nv(ptr, 0-(int8_t)val);
87 return *ptr;
88}178}
89179
90inline uint16_t __sync_sub_and_fetch(volatile uint16_t* ptr, uint16_t val)180inline uint16_t __sync_sub_and_fetch(volatile uint16_t* ptr, uint16_t val)
91{181{
92 (val == 1) ? atomic_dec_16(ptr) : atomic_add_16(ptr, 0-(int16_t)val);182 return (val == 1) ? atomic_dec_16_nv(ptr) : atomic_add_16_nv(ptr, 0-(int16_t)val);
93 return *ptr;
94}183}
95184
96inline uint32_t __sync_sub_and_fetch(volatile uint32_t* ptr, uint32_t val)185inline uint32_t __sync_sub_and_fetch(volatile uint32_t* ptr, uint32_t val)
97{186{
98 (val == 1) ? atomic_dec_32(ptr) : atomic_add_32(ptr, 0-(int32_t)val);187 return (val == 1) ? atomic_dec_32_nv(ptr) : atomic_add_32_nv(ptr, 0-(int32_t)val);
99 return *ptr;
100}188}
101189
102# if defined(_KERNEL) || defined(_INT64_TYPE)190# if defined(_KERNEL) || defined(_INT64_TYPE)
103inline uint64_t __sync_sub_and_fetch(volatile uint64_t* ptr, uint64_t val)191inline uint64_t __sync_sub_and_fetch(volatile uint64_t* ptr, uint64_t val)
104{192{
105 (val == 1) ? atomic_dec_64(ptr) : atomic_add_64(ptr, 0-(int64_t)val);193 return (val == 1) ? atomic_dec_64_nv(ptr) : atomic_add_64_nv(ptr, 0-(int64_t)val);
106 return *ptr;
107}194}
108inline int64_t __sync_sub_and_fetch(volatile int64_t* ptr, uint64_t val)195inline int64_t __sync_sub_and_fetch(volatile int64_t* ptr, uint64_t val)
109{196{
110 (val == 1) ? atomic_dec_64((volatile uint64_t *) ptr) : atomic_add_64((volatile uint64_t *) ptr, 0-(int64_t)val);197 return (val == 1) ? atomic_dec_64_nv((volatile uint64_t *) ptr) : atomic_add_64_nv((volatile uint64_t *) ptr, 0-(int64_t)val);
111 return *ptr;
112}198}
113# endif /* defined(_KERNEL) || defined(_INT64_TYPE) */199# endif /* defined(_KERNEL) || defined(_INT64_TYPE) */
114200
@@ -175,4 +261,41 @@
175}261}
176#endif /* defined(_KERNEL) || defined(_INT64_TYPE) */262#endif /* defined(_KERNEL) || defined(_INT64_TYPE) */
177263
264inline int8_t __sync_bool_compare_and_swap(volatile int8_t* ptr,
265 int8_t old_val, int8_t val)
266{
267 int8_t orig= *ptr;
268 return orig == atomic_cas_8((volatile uint8_t *)ptr, old_val, val);
269}
270
271inline uint8_t __sync_bool_compare_and_swap(volatile uint8_t* ptr,
272 uint8_t old_val, uint8_t val)
273{
274 uint8_t orig= *ptr;
275 return orig == atomic_cas_8(ptr, old_val, val);
276}
277
278inline uint16_t __sync_bool_compare_and_swap(volatile uint16_t* ptr,
279 uint16_t old_val, uint16_t val)
280{
281 uint16_t orig= *ptr;
282 return orig == atomic_cas_16(ptr, old_val, val);
283}
284
285inline uint32_t __sync_bool_compare_and_swap(volatile uint32_t* ptr,
286 uint32_t old_val, uint32_t val)
287{
288 uint32_t orig= *ptr;
289 return orig == atomic_cas_32(ptr, old_val, val);
290}
291
292# if defined(_KERNEL) || defined(_INT64_TYPE)
293inline uint64_t __sync_bool_compare_and_swap(volatile uint64_t* ptr,
294 uint64_t old_val, uint64_t val)
295{
296 uint64_t orig= *ptr;
297 return orig == atomic_cas_64(ptr, old_val, val);
298}
299#endif /* defined(_KERNEL) || defined(_INT64_TYPE) */
300
178#endif /* DRIZZLED_ATOMIC_SUN_STUDIO_H */301#endif /* DRIZZLED_ATOMIC_SUN_STUDIO_H */
179302
=== modified file 'drizzled/atomics.h'
--- drizzled/atomics.h 2010-01-04 22:52:52 +0000
+++ drizzled/atomics.h 2010-04-06 19:16:34 +0000
@@ -58,6 +58,11 @@
5858
59 atomic_impl() : atomic_base<I>(), traits() {}59 atomic_impl() : atomic_base<I>(), traits() {}
6060
61 value_type add_and_fetch( D addend )
62 {
63 return traits.add_and_fetch(&this->my_value, addend);
64 }
65
61 value_type fetch_and_add( D addend )66 value_type fetch_and_add( D addend )
62 {67 {
63 return traits.fetch_and_add(&this->my_value, addend);68 return traits.fetch_and_add(&this->my_value, addend);
@@ -78,7 +83,7 @@
78 return traits.fetch_and_store(&this->my_value, value);83 return traits.fetch_and_store(&this->my_value, value);
79 }84 }
8085
81 value_type compare_and_swap( value_type value, value_type comparand )86 bool compare_and_swap( value_type value, value_type comparand )
82 {87 {
83 return traits.compare_and_swap(&this->my_value, value, comparand);88 return traits.compare_and_swap(&this->my_value, value, comparand);
84 }89 }
@@ -102,7 +107,7 @@
102public:107public:
103 atomic_impl<I,D,T>& operator+=( D addend )108 atomic_impl<I,D,T>& operator+=( D addend )
104 {109 {
105 fetch_and_add(addend)+addend;110 increment(addend);
106 return *this;111 return *this;
107 }112 }
108113
@@ -113,15 +118,15 @@
113 return operator+=(D(0)-addend);118 return operator+=(D(0)-addend);
114 }119 }
115120
116 value_type increment() {121 value_type increment()
117 return fetch_and_add(1)+1;122 {
118 }123 return add_and_fetch(1);
119124 }
120 value_type decrement() {125
121 return fetch_and_add(D(-1))-1;126 value_type decrement()
122 }127 {
123128 return add_and_fetch(D(-1));
124129 }
125};130};
126131
127} /* namespace internal */132} /* namespace internal */
128133
=== modified file 'drizzled/cursor.cc'
--- drizzled/cursor.cc 2010-04-01 06:30:46 +0000
+++ drizzled/cursor.cc 2010-04-06 19:16:34 +0000
@@ -36,7 +36,6 @@
36#include "drizzled/session.h"36#include "drizzled/session.h"
37#include "drizzled/sql_base.h"37#include "drizzled/sql_base.h"
38#include "drizzled/transaction_services.h"38#include "drizzled/transaction_services.h"
39#include "drizzled/replication_services.h"
40#include "drizzled/lock.h"39#include "drizzled/lock.h"
41#include "drizzled/item/int.h"40#include "drizzled/item/int.h"
42#include "drizzled/item/empty_string.h"41#include "drizzled/item/empty_string.h"
@@ -44,7 +43,6 @@
44#include "drizzled/message/table.pb.h"43#include "drizzled/message/table.pb.h"
45#include "drizzled/plugin/client.h"44#include "drizzled/plugin/client.h"
46#include "drizzled/internal/my_sys.h"45#include "drizzled/internal/my_sys.h"
47#include "drizzled/transaction_services.h"
4846
49using namespace std;47using namespace std;
5048
@@ -1297,10 +1295,9 @@
1297 const unsigned char *after_record)1295 const unsigned char *after_record)
1298{1296{
1299 TransactionServices &transaction_services= TransactionServices::singleton();1297 TransactionServices &transaction_services= TransactionServices::singleton();
1300 ReplicationServices &replication_services= ReplicationServices::singleton();
1301 Session *const session= table->in_use;1298 Session *const session= table->in_use;
13021299
1303 if (table->s->tmp_table || not replication_services.isActive())1300 if (table->s->tmp_table || not transaction_services.shouldConstructMessages())
1304 return false;1301 return false;
13051302
1306 bool result= false;1303 bool result= false;
13071304
=== modified file 'drizzled/include.am'
--- drizzled/include.am 2010-04-02 07:45:12 +0000
+++ drizzled/include.am 2010-04-06 19:16:34 +0000
@@ -307,6 +307,7 @@
307 drizzled/plugin/query_cache.h \307 drizzled/plugin/query_cache.h \
308 drizzled/plugin/query_rewrite.h \308 drizzled/plugin/query_rewrite.h \
309 drizzled/plugin/registry.h \309 drizzled/plugin/registry.h \
310 drizzled/plugin/replication.h \
310 drizzled/plugin/scheduler.h \311 drizzled/plugin/scheduler.h \
311 drizzled/plugin/storage_engine.h \312 drizzled/plugin/storage_engine.h \
312 drizzled/plugin/table_function.h \313 drizzled/plugin/table_function.h \
313314
=== added file 'drizzled/plugin/replication.h'
--- drizzled/plugin/replication.h 1970-01-01 00:00:00 +0000
+++ drizzled/plugin/replication.h 2010-04-06 19:16:34 +0000
@@ -0,0 +1,46 @@
1/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3 *
4 * Copyright (c) 2010 Jay Pipes
5 *
6 * Authors:
7 *
8 * Jay Pipes <jaypipes@gmail.com>
9 *
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; version 2 of the License.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
18 *
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 */
23
24#ifndef DRIZZLED_PLUGIN_REPLICATION_H
25#define DRIZZLED_PLUGIN_REPLICATION_H
26
27/**
28 * @file Common structs and enums for the replication API
29 */
30
31namespace drizzled
32{
33
34namespace plugin
35{
36
37enum ReplicationReturnCode
38{
39 SUCCESS= 0, /* no error */
40 UNKNOWN_ERROR= 1
41};
42
43} /* namespace plugin */
44} /* namespace drizzled */
45
46#endif /* DRIZZLED_PLUGIN_REPLICATION_H */
047
=== modified file 'drizzled/plugin/transaction_applier.h'
--- drizzled/plugin/transaction_applier.h 2009-10-28 16:38:30 +0000
+++ drizzled/plugin/transaction_applier.h 2010-04-06 19:16:34 +0000
@@ -2,10 +2,11 @@
2 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:2 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3 *3 *
4 * Copyright (C) 2008-2009 Sun Microsystems4 * Copyright (C) 2008-2009 Sun Microsystems
5 * Copyright (c) 2010 Jay Pipes
5 *6 *
6 * Authors:7 * Authors:
7 *8 *
8 * Jay Pipes <joinfu@sun.com>9 * Jay Pipes <jaypipes@gmail.com>
9 *10 *
10 * This program is free software; you can redistribute it and/or modify11 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by12 * it under the terms of the GNU General Public License as published by
@@ -33,11 +34,14 @@
33 */34 */
3435
35#include "drizzled/plugin/plugin.h"36#include "drizzled/plugin/plugin.h"
37#include "drizzled/plugin/replication.h"
36#include "drizzled/atomics.h"38#include "drizzled/atomics.h"
3739
38namespace drizzled40namespace drizzled
39{41{
4042
43class Session;
44
41namespace message { class Transaction; }45namespace message { class Transaction; }
4246
43namespace plugin47namespace plugin
@@ -74,7 +78,8 @@
74 *78 *
75 * @param Transaction message to be replicated79 * @param Transaction message to be replicated
76 */80 */
77 virtual void apply(const message::Transaction &to_apply)= 0;81 virtual ReplicationReturnCode apply(Session &in_session,
82 const message::Transaction &to_apply)= 0;
7883
79 static bool addPlugin(TransactionApplier *applier);84 static bool addPlugin(TransactionApplier *applier);
80 static void removePlugin(TransactionApplier *applier);85 static void removePlugin(TransactionApplier *applier);
8186
=== modified file 'drizzled/plugin/transaction_replicator.h'
--- drizzled/plugin/transaction_replicator.h 2009-12-04 23:47:14 +0000
+++ drizzled/plugin/transaction_replicator.h 2010-04-06 19:16:34 +0000
@@ -2,10 +2,11 @@
2 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:2 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3 *3 *
4 * Copyright (C) 2008-2009 Sun Microsystems4 * Copyright (C) 2008-2009 Sun Microsystems
5 * Copyright (c) 2010 Jay Pipes
5 *6 *
6 * Authors:7 * Authors:
7 *8 *
8 * Jay Pipes <joinfu@sun.com>9 * Jay Pipes <jaypipes@gmail.com>
9 *10 *
10 * This program is free software; you can redistribute it and/or modify11 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by12 * it under the terms of the GNU General Public License as published by
@@ -25,6 +26,7 @@
25#define DRIZZLED_PLUGIN_TRANSACTION_REPLICATOR_H26#define DRIZZLED_PLUGIN_TRANSACTION_REPLICATOR_H
2627
27#include "drizzled/atomics.h"28#include "drizzled/atomics.h"
29#include "drizzled/plugin/replication.h"
28#include "drizzled/plugin/plugin.h"30#include "drizzled/plugin/plugin.h"
2931
30/**32/**
@@ -36,7 +38,6 @@
36 * An applier is responsible for applying events, not a replicator...38 * An applier is responsible for applying events, not a replicator...
37 */39 */
3840
39
40namespace drizzled41namespace drizzled
41{42{
42namespace message43namespace message
@@ -45,6 +46,8 @@
45 class Statement;46 class Statement;
46}47}
4748
49class Session;
50
48namespace plugin51namespace plugin
49{52{
5053
@@ -83,8 +86,9 @@
83 * @param Pointer to the applier of the command message86 * @param Pointer to the applier of the command message
84 * @param Transaction message to be replicated87 * @param Transaction message to be replicated
85 */88 */
86 virtual void replicate(TransactionApplier *in_applier, 89 virtual ReplicationReturnCode replicate(TransactionApplier *in_applier,
87 message::Transaction &to_replicate)= 0;90 Session &session,
91 message::Transaction &to_replicate)= 0;
88 static bool addPlugin(TransactionReplicator *replicator);92 static bool addPlugin(TransactionReplicator *replicator);
89 static void removePlugin(TransactionReplicator *replicator);93 static void removePlugin(TransactionReplicator *replicator);
9094
9195
=== modified file 'drizzled/replication_services.cc'
--- drizzled/replication_services.cc 2010-03-17 03:11:34 +0000
+++ drizzled/replication_services.cc 2010-04-06 19:16:34 +0000
@@ -39,8 +39,6 @@
39#include "drizzled/plugin/transaction_replicator.h"39#include "drizzled/plugin/transaction_replicator.h"
40#include "drizzled/plugin/transaction_applier.h"40#include "drizzled/plugin/transaction_applier.h"
41#include "drizzled/message/transaction.pb.h"41#include "drizzled/message/transaction.pb.h"
42#include "drizzled/message/table.pb.h"
43#include "drizzled/message/statement_transform.h"
44#include "drizzled/gettext.h"42#include "drizzled/gettext.h"
45#include "drizzled/session.h"43#include "drizzled/session.h"
46#include "drizzled/error.h"44#include "drizzled/error.h"
@@ -145,7 +143,8 @@
145 return is_active;143 return is_active;
146}144}
147145
148void ReplicationServices::pushTransactionMessage(message::Transaction &to_push)146plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
147 message::Transaction &to_push)
149{148{
150 vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();149 vector<plugin::TransactionReplicator *>::iterator repl_iter= replicators.begin();
151 vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;150 vector<plugin::TransactionApplier *>::iterator appl_start_iter, appl_iter;
@@ -154,6 +153,8 @@
154 plugin::TransactionReplicator *cur_repl;153 plugin::TransactionReplicator *cur_repl;
155 plugin::TransactionApplier *cur_appl;154 plugin::TransactionApplier *cur_appl;
156155
156 plugin::ReplicationReturnCode result= plugin::SUCCESS;
157
157 while (repl_iter != replicators.end())158 while (repl_iter != replicators.end())
158 {159 {
159 cur_repl= *repl_iter;160 cur_repl= *repl_iter;
@@ -174,19 +175,25 @@
174 continue;175 continue;
175 }176 }
176177
177 cur_repl->replicate(cur_appl, to_push);178 result= cur_repl->replicate(cur_appl, in_session, to_push);
178 179
179 /* 180 if (result == plugin::SUCCESS)
180 * We update the timestamp for the last applied Transaction so that181 {
181 * publisher plugins can ask the replication services when the182 /*
182 * last known applied Transaction was using the getLastAppliedTimestamp()183 * We update the timestamp for the last applied Transaction so that
183 * method.184 * publisher plugins can ask the replication services when the
184 */185 * last known applied Transaction was using the getLastAppliedTimestamp()
185 last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());186 * method.
186 ++appl_iter;187 */
188 last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
189 ++appl_iter;
190 }
191 else
192 return result;
187 }193 }
188 ++repl_iter;194 ++repl_iter;
189 }195 }
196 return result;
190}197}
191198
192} /* namespace drizzled */199} /* namespace drizzled */
193200
=== modified file 'drizzled/replication_services.h'
--- drizzled/replication_services.h 2010-03-16 21:30:44 +0000
+++ drizzled/replication_services.h 2010-04-06 19:16:34 +0000
@@ -26,6 +26,7 @@
26#define DRIZZLED_REPLICATION_SERVICES_H26#define DRIZZLED_REPLICATION_SERVICES_H
2727
28#include "drizzled/atomics.h"28#include "drizzled/atomics.h"
29#include "drizzled/plugin/replication.h"
2930
30#include <vector>31#include <vector>
3132
@@ -96,9 +97,11 @@
96 * Helper method which pushes a constructed message out to the registered97 * Helper method which pushes a constructed message out to the registered
97 * replicator and applier plugins.98 * replicator and applier plugins.
98 *99 *
100 * @param Session descriptor
99 * @param Message to push out101 * @param Message to push out
100 */102 */
101 void pushTransactionMessage(message::Transaction &to_push);103 plugin::ReplicationReturnCode pushTransactionMessage(Session &in_session,
104 message::Transaction &to_push);
102 /**105 /**
103 * Constructor106 * Constructor
104 */107 */
105108
=== modified file 'drizzled/session.cc'
--- drizzled/session.cc 2010-03-31 22:22:16 +0000
+++ drizzled/session.cc 2010-04-06 19:16:34 +0000
@@ -369,7 +369,7 @@
369#endif369#endif
370 {370 {
371 TransactionServices &transaction_services= TransactionServices::singleton();371 TransactionServices &transaction_services= TransactionServices::singleton();
372 transaction_services.ha_rollback_trans(this, true);372 transaction_services.rollbackTransaction(this, true);
373 xid_cache_delete(&transaction.xid_state);373 xid_cache_delete(&transaction.xid_state);
374 }374 }
375 hash_free(&user_vars);375 hash_free(&user_vars);
@@ -772,7 +772,7 @@
772 * (Which of course should never happen...)772 * (Which of course should never happen...)
773 */773 */
774 server_status&= ~SERVER_STATUS_IN_TRANS;774 server_status&= ~SERVER_STATUS_IN_TRANS;
775 if (transaction_services.ha_commit_trans(this, true))775 if (transaction_services.commitTransaction(this, true))
776 result= false;776 result= false;
777 options&= ~(OPTION_BEGIN);777 options&= ~(OPTION_BEGIN);
778 break;778 break;
@@ -789,7 +789,7 @@
789 case ROLLBACK_AND_CHAIN:789 case ROLLBACK_AND_CHAIN:
790 {790 {
791 server_status&= ~SERVER_STATUS_IN_TRANS;791 server_status&= ~SERVER_STATUS_IN_TRANS;
792 if (transaction_services.ha_rollback_trans(this, true))792 if (transaction_services.rollbackTransaction(this, true))
793 result= false;793 result= false;
794 options&= ~(OPTION_BEGIN);794 options&= ~(OPTION_BEGIN);
795 if (result == true && (completion == ROLLBACK_AND_CHAIN))795 if (result == true && (completion == ROLLBACK_AND_CHAIN))
@@ -822,7 +822,7 @@
822 if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))822 if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
823 {823 {
824 server_status&= ~SERVER_STATUS_IN_TRANS;824 server_status&= ~SERVER_STATUS_IN_TRANS;
825 if (transaction_services.ha_commit_trans(this, true))825 if (transaction_services.commitTransaction(this, true))
826 result= false;826 result= false;
827 }827 }
828 options&= ~(OPTION_BEGIN);828 options&= ~(OPTION_BEGIN);
@@ -1947,7 +1947,7 @@
1947 {1947 {
1948 TransactionServices &transaction_services= TransactionServices::singleton();1948 TransactionServices &transaction_services= TransactionServices::singleton();
1949 main_da.can_overwrite_status= true;1949 main_da.can_overwrite_status= true;
1950 transaction_services.ha_autocommit_or_rollback(this, is_error());1950 transaction_services.autocommitOrRollback(this, is_error());
1951 main_da.can_overwrite_status= false;1951 main_da.can_overwrite_status= false;
1952 transaction.stmt.reset();1952 transaction.stmt.reset();
1953 }1953 }
19541954
=== modified file 'drizzled/set_var.cc'
--- drizzled/set_var.cc 2010-03-30 09:29:02 +0000
+++ drizzled/set_var.cc 2010-04-06 19:16:34 +0000
@@ -1381,7 +1381,7 @@
1381 session->options&= ~(uint64_t) (OPTION_BEGIN);1381 session->options&= ~(uint64_t) (OPTION_BEGIN);
1382 session->server_status|= SERVER_STATUS_AUTOCOMMIT;1382 session->server_status|= SERVER_STATUS_AUTOCOMMIT;
1383 TransactionServices &transaction_services= TransactionServices::singleton();1383 TransactionServices &transaction_services= TransactionServices::singleton();
1384 if (transaction_services.ha_commit_trans(session, true))1384 if (transaction_services.commitTransaction(session, true))
1385 return 1;1385 return 1;
1386 }1386 }
1387 else1387 else
13881388
=== modified file 'drizzled/sql_delete.cc'
--- drizzled/sql_delete.cc 2010-02-06 03:22:59 +0000
+++ drizzled/sql_delete.cc 2010-04-06 19:16:34 +0000
@@ -396,7 +396,7 @@
396 Safety, in case the engine ignored ha_enable_transaction(false)396 Safety, in case the engine ignored ha_enable_transaction(false)
397 above. Also clears session->transaction.*.397 above. Also clears session->transaction.*.
398 */398 */
399 error= transaction_services.ha_autocommit_or_rollback(&session, error);399 error= transaction_services.autocommitOrRollback(&session, error);
400 session.options= save_options;400 session.options= save_options;
401401
402 return error;402 return error;
403403
=== modified file 'drizzled/sql_insert.cc'
--- drizzled/sql_insert.cc 2010-03-31 21:15:40 +0000
+++ drizzled/sql_insert.cc 2010-04-06 19:16:34 +0000
@@ -1325,7 +1325,7 @@
1325 {1325 {
1326 /*1326 /*
1327 We must invalidate the table in the query cache before binlog writing1327 We must invalidate the table in the query cache before binlog writing
1328 and ha_autocommit_or_rollback.1328 and autocommitOrRollback.
1329 */1329 */
1330 if (session->transaction.stmt.hasModifiedNonTransData())1330 if (session->transaction.stmt.hasModifiedNonTransData())
1331 session->transaction.all.markModifiedNonTransData();1331 session->transaction.all.markModifiedNonTransData();
@@ -1711,7 +1711,7 @@
1711 if (!table->s->tmp_table)1711 if (!table->s->tmp_table)
1712 {1712 {
1713 TransactionServices &transaction_services= TransactionServices::singleton();1713 TransactionServices &transaction_services= TransactionServices::singleton();
1714 transaction_services.ha_autocommit_or_rollback(session, 0);1714 transaction_services.autocommitOrRollback(session, 0);
1715 (void) session->endActiveTransaction();1715 (void) session->endActiveTransaction();
1716 }1716 }
17171717
17181718
=== modified file 'drizzled/sql_parse.cc'
--- drizzled/sql_parse.cc 2010-03-31 21:15:40 +0000
+++ drizzled/sql_parse.cc 2010-04-06 19:16:34 +0000
@@ -254,7 +254,7 @@
254 /* If commit fails, we should be able to reset the OK status. */254 /* If commit fails, we should be able to reset the OK status. */
255 session->main_da.can_overwrite_status= true;255 session->main_da.can_overwrite_status= true;
256 TransactionServices &transaction_services= TransactionServices::singleton();256 TransactionServices &transaction_services= TransactionServices::singleton();
257 transaction_services.ha_autocommit_or_rollback(session, session->is_error());257 transaction_services.autocommitOrRollback(session, session->is_error());
258 session->main_da.can_overwrite_status= false;258 session->main_da.can_overwrite_status= false;
259259
260 session->transaction.stmt.reset();260 session->transaction.stmt.reset();
261261
=== modified file 'drizzled/sql_table.cc'
--- drizzled/sql_table.cc 2010-03-31 21:15:40 +0000
+++ drizzled/sql_table.cc 2010-04-06 19:16:34 +0000
@@ -1829,7 +1829,7 @@
1829 length= snprintf(buff, sizeof(buff), ER(ER_OPEN_AS_READONLY),1829 length= snprintf(buff, sizeof(buff), ER(ER_OPEN_AS_READONLY),
1830 table_name);1830 table_name);
1831 session->client->store(buff, length);1831 session->client->store(buff, length);
1832 transaction_services.ha_autocommit_or_rollback(session, false);1832 transaction_services.autocommitOrRollback(session, false);
1833 session->endTransaction(COMMIT);1833 session->endTransaction(COMMIT);
1834 session->close_thread_tables();1834 session->close_thread_tables();
1835 lex->reset_query_tables_list(false);1835 lex->reset_query_tables_list(false);
@@ -1952,7 +1952,7 @@
1952 }1952 }
1953 }1953 }
1954 }1954 }
1955 transaction_services.ha_autocommit_or_rollback(session, false);1955 transaction_services.autocommitOrRollback(session, false);
1956 session->endTransaction(COMMIT);1956 session->endTransaction(COMMIT);
1957 session->close_thread_tables();1957 session->close_thread_tables();
1958 table->table=0; // For query cache1958 table->table=0; // For query cache
@@ -1964,7 +1964,7 @@
1964 return(false);1964 return(false);
19651965
1966err:1966err:
1967 transaction_services.ha_autocommit_or_rollback(session, true);1967 transaction_services.autocommitOrRollback(session, true);
1968 session->endTransaction(ROLLBACK);1968 session->endTransaction(ROLLBACK);
1969 session->close_thread_tables(); // Shouldn't be needed1969 session->close_thread_tables(); // Shouldn't be needed
1970 if (table)1970 if (table)
19711971
=== modified file 'drizzled/sql_update.cc'
--- drizzled/sql_update.cc 2010-03-26 21:33:42 +0000
+++ drizzled/sql_update.cc 2010-04-06 19:16:34 +0000
@@ -523,7 +523,7 @@
523 last one without error. error > 0 means an error (e.g. unique key523 last one without error. error > 0 means an error (e.g. unique key
524 violation and no IGNORE or REPLACE). error == 0 is also an error (if524 violation and no IGNORE or REPLACE). error == 0 is also an error (if
525 preparing the record or invoking before triggers fails). See525 preparing the record or invoking before triggers fails). See
526 ha_autocommit_or_rollback(error>=0) and return(error>=0) below.526 autocommitOrRollback(error>=0) and return(error>=0) below.
527 Sometimes we want to binlog even if we updated no rows, in case user used527 Sometimes we want to binlog even if we updated no rows, in case user used
528 it to be sure master and slave are in same state.528 it to be sure master and slave are in same state.
529 */529 */
530530
=== modified file 'drizzled/statement/alter_table.cc'
--- drizzled/statement/alter_table.cc 2010-03-31 22:22:16 +0000
+++ drizzled/statement/alter_table.cc 2010-04-06 19:16:34 +0000
@@ -619,7 +619,7 @@
619 goto err;619 goto err;
620620
621 /* The ALTER Table is always in its own transaction */621 /* The ALTER Table is always in its own transaction */
622 error= transaction_services.ha_autocommit_or_rollback(session, false);622 error= transaction_services.autocommitOrRollback(session, false);
623 if (! session->endActiveTransaction())623 if (! session->endActiveTransaction())
624 error=1;624 error=1;
625 if (error)625 if (error)
@@ -627,7 +627,7 @@
627 write_bin_log(session, session->query.c_str());627 write_bin_log(session, session->query.c_str());
628628
629err:629err:
630 (void) transaction_services.ha_autocommit_or_rollback(session, error);630 (void) transaction_services.autocommitOrRollback(session, error);
631 session->tablespace_op=false;631 session->tablespace_op=false;
632632
633 if (error == 0)633 if (error == 0)
@@ -1480,7 +1480,7 @@
1480 Ensure that the new table is saved properly to disk so that we1480 Ensure that the new table is saved properly to disk so that we
1481 can do a rename1481 can do a rename
1482 */1482 */
1483 if (transaction_services.ha_autocommit_or_rollback(session, false))1483 if (transaction_services.autocommitOrRollback(session, false))
1484 error=1;1484 error=1;
1485 if (! session->endActiveTransaction())1485 if (! session->endActiveTransaction())
1486 error=1;1486 error=1;
14871487
=== modified file 'drizzled/statement/release_savepoint.cc'
--- drizzled/statement/release_savepoint.cc 2010-01-28 20:48:31 +0000
+++ drizzled/statement/release_savepoint.cc 2010-04-06 19:16:34 +0000
@@ -59,7 +59,7 @@
59 if (iter != savepoints.end())59 if (iter != savepoints.end())
60 {60 {
61 NamedSavepoint &sv= *iter;61 NamedSavepoint &sv= *iter;
62 (void) transaction_services.ha_release_savepoint(session, sv);62 (void) transaction_services.releaseSavepoint(session, sv);
63 savepoints.erase(iter);63 savepoints.erase(iter);
64 session->my_ok();64 session->my_ok();
65 }65 }
6666
=== modified file 'drizzled/statement/rollback_to_savepoint.cc'
--- drizzled/statement/rollback_to_savepoint.cc 2010-02-06 03:22:59 +0000
+++ drizzled/statement/rollback_to_savepoint.cc 2010-04-06 19:16:34 +0000
@@ -73,7 +73,7 @@
73 first_savepoint_name.size()) == 0)73 first_savepoint_name.size()) == 0)
74 {74 {
75 /* Found the named savepoint we want to rollback to */75 /* Found the named savepoint we want to rollback to */
76 (void) transaction_services.ha_rollback_to_savepoint(session, first_savepoint);76 (void) transaction_services.rollbackToSavepoint(session, first_savepoint);
7777
78 if (session->transaction.all.hasModifiedNonTransData())78 if (session->transaction.all.hasModifiedNonTransData())
79 {79 {
@@ -111,7 +111,7 @@
111 /* Found the named savepoint we want to rollback to */111 /* Found the named savepoint we want to rollback to */
112 found= true;112 found= true;
113113
114 (void) transaction_services.ha_rollback_to_savepoint(session, sv);114 (void) transaction_services.rollbackToSavepoint(session, sv);
115 }115 }
116 if (found)116 if (found)
117 {117 {
118118
=== modified file 'drizzled/statement/savepoint.cc'
--- drizzled/statement/savepoint.cc 2010-01-28 20:48:31 +0000
+++ drizzled/statement/savepoint.cc 2010-04-06 19:16:34 +0000
@@ -65,13 +65,13 @@
65 if (iter != savepoints.end())65 if (iter != savepoints.end())
66 {66 {
67 NamedSavepoint &sv= *iter;67 NamedSavepoint &sv= *iter;
68 (void) transaction_services.ha_release_savepoint(session, sv);68 (void) transaction_services.releaseSavepoint(session, sv);
69 savepoints.erase(iter);69 savepoints.erase(iter);
70 }70 }
71 71
72 NamedSavepoint newsv(session->lex->ident.str, session->lex->ident.length);72 NamedSavepoint newsv(session->lex->ident.str, session->lex->ident.length);
7373
74 if (transaction_services.ha_savepoint(session, newsv))74 if (transaction_services.setSavepoint(session, newsv))
75 {75 {
76 return true;76 return true;
77 }77 }
7878
=== modified file 'drizzled/transaction_services.cc'
--- drizzled/transaction_services.cc 2010-03-17 03:11:34 +0000
+++ drizzled/transaction_services.cc 2010-04-06 19:16:34 +0000
@@ -427,65 +427,6 @@
427}427}
428428
429/**429/**
430 Check if we can skip the two-phase commit.
431
432 A helper function to evaluate if two-phase commit is mandatory.
433 As a side effect, propagates the read-only/read-write flags
434 of the statement transaction to its enclosing normal transaction.
435
436 @retval true we must run a two-phase commit. Returned
437 if we have at least two engines with read-write changes.
438 @retval false Don't need two-phase commit. Even if we have two
439 transactional engines, we can run two independent
440 commits if changes in one of the engines are read-only.
441*/
442static
443bool
444ha_check_and_coalesce_trx_read_only(Session *session,
445 TransactionContext::ResourceContexts &resource_contexts,
446 bool normal_transaction)
447{
448 /* The number of storage engines that have actual changes. */
449 unsigned num_resources_modified_data= 0;
450 ResourceContext *resource_context;
451
452 for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
453 it != resource_contexts.end();
454 ++it)
455 {
456 resource_context= *it;
457 if (resource_context->hasModifiedData())
458 ++num_resources_modified_data;
459
460 if (! normal_transaction)
461 {
462 ResourceContext *resource_context_normal= session->getResourceContext(resource_context->getMonitored(), true);
463 assert(resource_context != resource_context_normal);
464 /*
465 Merge read-only/read-write information about statement
466 transaction to its enclosing normal transaction. Do this
467 only if in a real transaction -- that is, if we know
468 that resource_context_all is registered in session->transaction.all.
469 Since otherwise we only clutter the normal transaction flags.
470 */
471 if (resource_context_normal->isStarted()) /* false if autocommit. */
472 resource_context_normal->coalesceWith(resource_context);
473 }
474 else if (num_resources_modified_data > 1)
475 {
476 /*
477 It is a normal transaction, so we don't need to merge read/write
478 information up, and the need for two-phase commit has been
479 already established. Break the loop prematurely.
480 */
481 break;
482 }
483 }
484 return num_resources_modified_data > 1;
485}
486
487
488/**
489 @retval430 @retval
490 0 ok431 0 ok
491 @retval432 @retval
@@ -499,7 +440,7 @@
499 stored functions or triggers. So we simply do nothing now.440 stored functions or triggers. So we simply do nothing now.
500 TODO: This should be fixed in later ( >= 5.1) releases.441 TODO: This should be fixed in later ( >= 5.1) releases.
501*/442*/
502int TransactionServices::ha_commit_trans(Session *session, bool normal_transaction)443int TransactionServices::commitTransaction(Session *session, bool normal_transaction)
503{444{
504 int error= 0, cookie= 0;445 int error= 0, cookie= 0;
505 /*446 /*
@@ -522,17 +463,18 @@
522463
523 if (resource_contexts.empty() == false)464 if (resource_contexts.empty() == false)
524 {465 {
525 bool must_2pc;
526
527 if (is_real_trans && wait_if_global_read_lock(session, 0, 0))466 if (is_real_trans && wait_if_global_read_lock(session, 0, 0))
528 {467 {
529 ha_rollback_trans(session, normal_transaction);468 rollbackTransaction(session, normal_transaction);
530 return 1;469 return 1;
531 }470 }
532471
533 must_2pc= ha_check_and_coalesce_trx_read_only(session, resource_contexts, normal_transaction);472 /*
534473 * If replication is on, we do a PREPARE on the resource managers, push the
535 if (! trans->no_2pc && must_2pc)474 * Transaction message across the replication stream, and then COMMIT if the
475 * replication stream returned successfully.
476 */
477 if (shouldConstructMessages())
536 {478 {
537 for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();479 for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
538 it != resource_contexts.end() && ! error;480 it != resource_contexts.end() && ! error;
@@ -563,14 +505,22 @@
563 }505 }
564 }506 }
565 }507 }
508 if (error == 0 && is_real_trans)
509 {
510 /*
511 * Push the constructed Transaction messages across to
512 * replicators and appliers.
513 */
514 error= commitTransactionMessage(session);
515 }
566 if (error)516 if (error)
567 {517 {
568 ha_rollback_trans(session, normal_transaction);518 rollbackTransaction(session, normal_transaction);
569 error= 1;519 error= 1;
570 goto end;520 goto end;
571 }521 }
572 }522 }
573 error= ha_commit_one_phase(session, normal_transaction) ? (cookie ? 2 : 1) : 0;523 error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
574end:524end:
575 if (is_real_trans)525 if (is_real_trans)
576 start_waiting_global_read_lock(session);526 start_waiting_global_read_lock(session);
@@ -582,7 +532,7 @@
582 @note532 @note
583 This function does not care about global read lock. A caller should.533 This function does not care about global read lock. A caller should.
584*/534*/
585int TransactionServices::ha_commit_one_phase(Session *session, bool normal_transaction)535int TransactionServices::commitPhaseOne(Session *session, bool normal_transaction)
586{536{
587 int error=0;537 int error=0;
588 TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;538 TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
@@ -638,21 +588,10 @@
638 }588 }
639 }589 }
640 trans->reset();590 trans->reset();
641 if (error == 0)
642 {
643 if (is_real_trans)
644 {
645 /*
646 * We commit the normal transaction by finalizing the transaction message
647 * and propogating the message to all registered replicators.
648 */
649 commitTransactionMessage(session);
650 }
651 }
652 return error;591 return error;
653}592}
654593
655int TransactionServices::ha_rollback_trans(Session *session, bool normal_transaction)594int TransactionServices::rollbackTransaction(Session *session, bool normal_transaction)
656{595{
657 int error= 0;596 int error= 0;
658 TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;597 TransactionContext *trans= normal_transaction ? &session->transaction.all : &session->transaction.stmt;
@@ -751,20 +690,20 @@
751 the user has used LOCK TABLES then that mechanism does not know to do the690 the user has used LOCK TABLES then that mechanism does not know to do the
752 commit.691 commit.
753*/692*/
754int TransactionServices::ha_autocommit_or_rollback(Session *session, int error)693int TransactionServices::autocommitOrRollback(Session *session, int error)
755{694{
756 if (session->transaction.stmt.getResourceContexts().empty() == false)695 if (session->transaction.stmt.getResourceContexts().empty() == false)
757 {696 {
758 if (! error)697 if (! error)
759 {698 {
760 if (ha_commit_trans(session, false))699 if (commitTransaction(session, false))
761 error= 1;700 error= 1;
762 }701 }
763 else702 else
764 {703 {
765 (void) ha_rollback_trans(session, false);704 (void) rollbackTransaction(session, false);
766 if (session->transaction_rollback_request)705 if (session->transaction_rollback_request)
767 (void) ha_rollback_trans(session, true);706 (void) rollbackTransaction(session, true);
768 }707 }
769708
770 session->variables.tx_isolation= session->session_tx_isolation;709 session->variables.tx_isolation= session->session_tx_isolation;
@@ -772,51 +711,6 @@
772 return error;711 return error;
773}712}
774713
775/**
776 return the list of XID's to a client, the same way SHOW commands do.
777
778 @note
779 I didn't find in XA specs that an RM cannot return the same XID twice,
780 so mysql_xa_recover does not filter XID's to ensure uniqueness.
781 It can be easily fixed later, if necessary.
782*/
783bool TransactionServices::mysql_xa_recover(Session *session)
784{
785 List<Item> field_list;
786 int i= 0;
787 XID_STATE *xs;
788
789 field_list.push_back(new Item_int("formatID", 0, MY_INT32_NUM_DECIMAL_DIGITS));
790 field_list.push_back(new Item_int("gtrid_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
791 field_list.push_back(new Item_int("bqual_length", 0, MY_INT32_NUM_DECIMAL_DIGITS));
792 field_list.push_back(new Item_empty_string("data", DRIZZLE_XIDDATASIZE));
793
794 if (session->client->sendFields(&field_list))
795 return 1;
796
797 pthread_mutex_lock(&LOCK_xid_cache);
798 while ((xs= (XID_STATE*)hash_element(&xid_cache, i++)))
799 {
800 if (xs->xa_state==XA_PREPARED)
801 {
802 session->client->store((int64_t)xs->xid.formatID);
803 session->client->store((int64_t)xs->xid.gtrid_length);
804 session->client->store((int64_t)xs->xid.bqual_length);
805 session->client->store(xs->xid.data,
806 xs->xid.gtrid_length+xs->xid.bqual_length);
807 if (session->client->flush())
808 {
809 pthread_mutex_unlock(&LOCK_xid_cache);
810 return 1;
811 }
812 }
813 }
814
815 pthread_mutex_unlock(&LOCK_xid_cache);
816 session->my_eof();
817 return 0;
818}
819
820struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>714struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
821{715{
822 result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const716 result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
@@ -827,7 +721,7 @@
827 }721 }
828};722};
829723
830int TransactionServices::ha_rollback_to_savepoint(Session *session, NamedSavepoint &sv)724int TransactionServices::rollbackToSavepoint(Session *session, NamedSavepoint &sv)
831{725{
832 int error= 0;726 int error= 0;
833 TransactionContext *trans= &session->transaction.all;727 TransactionContext *trans= &session->transaction.all;
@@ -923,7 +817,7 @@
923 section "4.33.4 SQL-statements and transaction states",817 section "4.33.4 SQL-statements and transaction states",
924 NamedSavepoint is *not* transaction-initiating SQL-statement818 NamedSavepoint is *not* transaction-initiating SQL-statement
925*/819*/
926int TransactionServices::ha_savepoint(Session *session, NamedSavepoint &sv)820int TransactionServices::setSavepoint(Session *session, NamedSavepoint &sv)
927{821{
928 int error= 0;822 int error= 0;
929 TransactionContext *trans= &session->transaction.all;823 TransactionContext *trans= &session->transaction.all;
@@ -961,7 +855,7 @@
961 return error;855 return error;
962}856}
963857
964int TransactionServices::ha_release_savepoint(Session *session, NamedSavepoint &sv)858int TransactionServices::releaseSavepoint(Session *session, NamedSavepoint &sv)
965{859{
966 int error= 0;860 int error= 0;
967861
@@ -988,6 +882,12 @@
988 return error;882 return error;
989}883}
990884
885bool TransactionServices::shouldConstructMessages()
886{
887 ReplicationServices &replication_services= ReplicationServices::singleton();
888 return replication_services.isActive();
889}
890
991message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)891message::Transaction *TransactionServices::getActiveTransactionMessage(Session *in_session)
992{892{
993 message::Transaction *transaction= in_session->getTransactionMessage();893 message::Transaction *transaction= in_session->getTransactionMessage();
@@ -1013,7 +913,7 @@
1013{913{
1014 message::TransactionContext *trx= in_transaction.mutable_transaction_context();914 message::TransactionContext *trx= in_transaction.mutable_transaction_context();
1015 trx->set_server_id(in_session->getServerId());915 trx->set_server_id(in_session->getServerId());
1016 trx->set_transaction_id(in_session->getQueryId());916 trx->set_transaction_id(getNextTransactionId());
1017 trx->set_start_timestamp(in_session->getCurrentTimestamp());917 trx->set_start_timestamp(in_session->getCurrentTimestamp());
1018}918}
1019919
@@ -1032,11 +932,11 @@
1032 in_session->setTransactionMessage(NULL);932 in_session->setTransactionMessage(NULL);
1033}933}
1034934
1035void TransactionServices::commitTransactionMessage(Session *in_session)935int TransactionServices::commitTransactionMessage(Session *in_session)
1036{936{
1037 ReplicationServices &replication_services= ReplicationServices::singleton();937 ReplicationServices &replication_services= ReplicationServices::singleton();
1038 if (! replication_services.isActive())938 if (! replication_services.isActive())
1039 return;939 return 0;
1040940
1041 /* If there is an active statement message, finalize it */941 /* If there is an active statement message, finalize it */
1042 message::Statement *statement= in_session->getStatementMessage();942 message::Statement *statement= in_session->getStatementMessage();
@@ -1046,15 +946,17 @@
1046 finalizeStatementMessage(*statement, in_session);946 finalizeStatementMessage(*statement, in_session);
1047 }947 }
1048 else948 else
1049 return; /* No data modification occurred inside the transaction */949 return 0; /* No data modification occurred inside the transaction */
1050 950
1051 message::Transaction* transaction= getActiveTransactionMessage(in_session);951 message::Transaction* transaction= getActiveTransactionMessage(in_session);
1052952
1053 finalizeTransactionMessage(*transaction, in_session);953 finalizeTransactionMessage(*transaction, in_session);
1054 954
1055 replication_services.pushTransactionMessage(*transaction);955 plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(*in_session, *transaction);
1056956
1057 cleanupTransactionMessage(transaction, in_session);957 cleanupTransactionMessage(transaction, in_session);
958
959 return static_cast<int>(result);
1058}960}
1059961
1060void TransactionServices::initStatementMessage(message::Statement &statement,962void TransactionServices::initStatementMessage(message::Statement &statement,
@@ -1114,7 +1016,7 @@
11141016
1115 finalizeTransactionMessage(*transaction, in_session);1017 finalizeTransactionMessage(*transaction, in_session);
1116 1018
1117 replication_services.pushTransactionMessage(*transaction);1019 (void) replication_services.pushTransactionMessage(*in_session, *transaction);
1118 }1020 }
1119 cleanupTransactionMessage(transaction, in_session);1021 cleanupTransactionMessage(transaction, in_session);
1120}1022}
@@ -1552,7 +1454,7 @@
15521454
1553 finalizeTransactionMessage(*transaction, in_session);1455 finalizeTransactionMessage(*transaction, in_session);
1554 1456
1555 replication_services.pushTransactionMessage(*transaction);1457 (void) replication_services.pushTransactionMessage(*in_session, *transaction);
15561458
1557 cleanupTransactionMessage(transaction, in_session);1459 cleanupTransactionMessage(transaction, in_session);
15581460
@@ -1582,7 +1484,7 @@
15821484
1583 finalizeTransactionMessage(*transaction, in_session);1485 finalizeTransactionMessage(*transaction, in_session);
1584 1486
1585 replication_services.pushTransactionMessage(*transaction);1487 (void) replication_services.pushTransactionMessage(*in_session, *transaction);
15861488
1587 cleanupTransactionMessage(transaction, in_session);1489 cleanupTransactionMessage(transaction, in_session);
15881490
@@ -1611,7 +1513,7 @@
16111513
1612 finalizeTransactionMessage(*transaction, in_session);1514 finalizeTransactionMessage(*transaction, in_session);
1613 1515
1614 replication_services.pushTransactionMessage(*transaction);1516 (void) replication_services.pushTransactionMessage(*in_session, *transaction);
16151517
1616 cleanupTransactionMessage(transaction, in_session);1518 cleanupTransactionMessage(transaction, in_session);
1617}1519}
@@ -1647,7 +1549,7 @@
16471549
1648 finalizeTransactionMessage(*transaction, in_session);1550 finalizeTransactionMessage(*transaction, in_session);
1649 1551
1650 replication_services.pushTransactionMessage(*transaction);1552 (void) replication_services.pushTransactionMessage(*in_session, *transaction);
16511553
1652 cleanupTransactionMessage(transaction, in_session);1554 cleanupTransactionMessage(transaction, in_session);
1653}1555}
@@ -1682,7 +1584,7 @@
16821584
1683 finalizeTransactionMessage(*transaction, in_session);1585 finalizeTransactionMessage(*transaction, in_session);
1684 1586
1685 replication_services.pushTransactionMessage(*transaction);1587 (void) replication_services.pushTransactionMessage(*in_session, *transaction);
16861588
1687 cleanupTransactionMessage(transaction, in_session);1589 cleanupTransactionMessage(transaction, in_session);
1688}1590}
@@ -1702,7 +1604,7 @@
17021604
1703 finalizeTransactionMessage(*transaction, in_session);1605 finalizeTransactionMessage(*transaction, in_session);
1704 1606
1705 replication_services.pushTransactionMessage(*transaction);1607 (void) replication_services.pushTransactionMessage(*in_session, *transaction);
17061608
1707 cleanupTransactionMessage(transaction, in_session);1609 cleanupTransactionMessage(transaction, in_session);
1708}1610}
17091611
=== modified file 'drizzled/transaction_services.h'
--- drizzled/transaction_services.h 2010-03-16 21:30:44 +0000
+++ drizzled/transaction_services.h 2010-04-06 19:16:34 +0000
@@ -25,6 +25,7 @@
25#ifndef DRIZZLED_TRANSACTION_SERVICES_H25#ifndef DRIZZLED_TRANSACTION_SERVICES_H
26#define DRIZZLED_TRANSACTION_SERVICES_H26#define DRIZZLED_TRANSACTION_SERVICES_H
2727
28#include "drizzled/atomics.h"
28#include "drizzled/message/transaction.pb.h"29#include "drizzled/message/transaction.pb.h"
2930
30namespace drizzled31namespace drizzled
@@ -49,10 +50,17 @@
49{50{
50public:51public:
51 static const size_t DEFAULT_RECORD_SIZE= 100;52 static const size_t DEFAULT_RECORD_SIZE= 100;
53 typedef uint64_t TransactionId;
52 /**54 /**
53 * Constructor55 * Constructor
54 */56 */
55 TransactionServices() {}57 TransactionServices()
58 {
59 /**
60 * @todo set transaction ID to the last one from an applier...
61 */
62 current_transaction_id= 0;
63 }
5664
57 /**65 /**
58 * Singleton method66 * Singleton method
@@ -63,6 +71,12 @@
63 static TransactionServices transaction_services;71 static TransactionServices transaction_services;
64 return transaction_services;72 return transaction_services;
65 }73 }
74
75 /**
76 * Returns true if the transaction manager should construct
77 * Transaction and Statement messages, false otherwise.
78 */
79 bool shouldConstructMessages();
66 /**80 /**
67 * Method which returns the active Transaction message81 * Method which returns the active Transaction message
68 * for the supplied Session. If one is not found, a new Transaction82 * for the supplied Session. If one is not found, a new Transaction
@@ -189,7 +203,7 @@
189 *203 *
190 * @param Pointer to the Session committing the transaction204 * @param Pointer to the Session committing the transaction
191 */205 */
192 void commitTransactionMessage(Session *in_session);206 int commitTransactionMessage(Session *in_session);
193 /** 207 /**
194 * Marks the current active transaction message as being rolled back and208 * Marks the current active transaction message as being rolled back and
195 * pushes the transaction message out to replicators.209 * pushes the transaction message out to replicators.
@@ -293,18 +307,17 @@
293 */307 */
294 void rawStatement(Session *in_session, const std::string &query);308 void rawStatement(Session *in_session, const std::string &query);
295 /* transactions: interface to plugin::StorageEngine functions */309 /* transactions: interface to plugin::StorageEngine functions */
296 int ha_commit_one_phase(Session *session, bool all);310 int commitPhaseOne(Session *session, bool all);
297 int ha_rollback_trans(Session *session, bool all);311 int rollbackTransaction(Session *session, bool all);
298312
299 /* transactions: these functions never call plugin::StorageEngine functions directly */313 /* transactions: these functions never call plugin::StorageEngine functions directly */
300 int ha_commit_trans(Session *session, bool all);314 int commitTransaction(Session *session, bool all);
301 int ha_autocommit_or_rollback(Session *session, int error);315 int autocommitOrRollback(Session *session, int error);
302316
303 /* savepoints */317 /* savepoints */
304 int ha_rollback_to_savepoint(Session *session, NamedSavepoint &sv);318 int rollbackToSavepoint(Session *session, NamedSavepoint &sv);
305 int ha_savepoint(Session *session, NamedSavepoint &sv);319 int setSavepoint(Session *session, NamedSavepoint &sv);
306 int ha_release_savepoint(Session *session, NamedSavepoint &sv);320 int releaseSavepoint(Session *session, NamedSavepoint &sv);
307 bool mysql_xa_recover(Session *session);
308321
309 /**322 /**
310 * Marks a storage engine as participating in a statement323 * Marks a storage engine as participating in a statement
@@ -383,6 +396,23 @@
383 plugin::MonitoredInTransaction *monitored,396 plugin::MonitoredInTransaction *monitored,
384 plugin::TransactionalStorageEngine *engine,397 plugin::TransactionalStorageEngine *engine,
385 plugin::XaResourceManager *resource_manager);398 plugin::XaResourceManager *resource_manager);
399 TransactionId getNextTransactionId()
400 {
401 return current_transaction_id.increment();
402 }
403 TransactionId getCurrentTransactionId()
404 {
405 return current_transaction_id;
406 }
407 /**
408 * DEBUG ONLY. See plugin::TransactionLog::truncate()
409 */
410 void resetTransactionId()
411 {
412 current_transaction_id= 0;
413 }
414private:
415 atomic<TransactionId> current_transaction_id;
386};416};
387417
388} /* namespace drizzled */418} /* namespace drizzled */
389419
=== modified file 'plugin/default_replicator/default_replicator.cc'
--- plugin/default_replicator/default_replicator.cc 2010-03-06 02:08:13 +0000
+++ plugin/default_replicator/default_replicator.cc 2010-04-06 19:16:34 +0000
@@ -62,13 +62,16 @@
62 sysvar_default_replicator_enable= false;62 sysvar_default_replicator_enable= false;
63}63}
6464
65void DefaultReplicator::replicate(plugin::TransactionApplier *in_applier, message::Transaction &to_replicate)65plugin::ReplicationReturnCode
66DefaultReplicator::replicate(plugin::TransactionApplier *in_applier,
67 Session &in_session,
68 message::Transaction &to_replicate)
66{69{
67 /* 70 /*
68 * We do absolutely nothing but call the applier's apply() method, passing71 * We do absolutely nothing but call the applier's apply() method, passing
69 * along the supplied Transaction. Yep, told you it was simple...72 * along the supplied Transaction. Yep, told you it was simple...
70 */73 */
71 in_applier->apply(to_replicate);74 return in_applier->apply(in_session, to_replicate);
72}75}
7376
74static DefaultReplicator *default_replicator= NULL; /* The singleton replicator */77static DefaultReplicator *default_replicator= NULL; /* The singleton replicator */
7578
=== modified file 'plugin/default_replicator/default_replicator.h'
--- plugin/default_replicator/default_replicator.h 2009-10-19 19:19:38 +0000
+++ plugin/default_replicator/default_replicator.h 2010-04-06 19:16:34 +0000
@@ -66,9 +66,14 @@
66 * the supplied message to their own controlled memory storage66 * the supplied message to their own controlled memory storage
67 * area.67 * area.
68 *68 *
69 * @param Applier to replicate to
70 * @param Session descriptor
69 * @param Transaction message to be replicated71 * @param Transaction message to be replicated
70 */72 */
71 void replicate(drizzled::plugin::TransactionApplier *in_applier, drizzled::message::Transaction &to_replicate);73 drizzled::plugin::ReplicationReturnCode
74 replicate(drizzled::plugin::TransactionApplier *in_applier,
75 drizzled::Session &in_session,
76 drizzled::message::Transaction &to_replicate);
72 77
73};78};
7479
7580
=== modified file 'plugin/filtered_replicator/filtered_replicator.cc'
--- plugin/filtered_replicator/filtered_replicator.cc 2010-03-06 02:08:13 +0000
+++ plugin/filtered_replicator/filtered_replicator.cc 2010-04-06 19:16:34 +0000
@@ -214,8 +214,10 @@
214 sysvar_filtered_replicator_enabled= false;214 sysvar_filtered_replicator_enabled= false;
215}215}
216216
217void FilteredReplicator::replicate(plugin::TransactionApplier *in_applier, 217plugin::ReplicationReturnCode
218 message::Transaction &to_replicate)218FilteredReplicator::replicate(plugin::TransactionApplier *in_applier,
219 Session &in_session,
220 message::Transaction &to_replicate)
219{221{
220 string schema_name;222 string schema_name;
221 string table_name;223 string table_name;
@@ -287,8 +289,9 @@
287 */289 */
288 message::TransactionContext *tc= filtered_transaction.mutable_transaction_context();290 message::TransactionContext *tc= filtered_transaction.mutable_transaction_context();
289 *tc= to_replicate.transaction_context(); /* copy construct */291 *tc= to_replicate.transaction_context(); /* copy construct */
290 in_applier->apply(filtered_transaction);292 return in_applier->apply(in_session, filtered_transaction);
291 }293 }
294 return plugin::SUCCESS;
292}295}
293296
294void FilteredReplicator::populateFilter(std::string input,297void FilteredReplicator::populateFilter(std::string input,
295298
=== modified file 'plugin/filtered_replicator/filtered_replicator.h'
--- plugin/filtered_replicator/filtered_replicator.h 2009-10-19 19:19:38 +0000
+++ plugin/filtered_replicator/filtered_replicator.h 2010-04-06 19:16:34 +0000
@@ -84,10 +84,14 @@
84 * the supplied message to their own controlled memory storage84 * the supplied message to their own controlled memory storage
85 * area.85 * area.
86 *86 *
87 * @param Applier to replicate to
88 * @param Session descriptor
87 * @param Transaction message to be replicated89 * @param Transaction message to be replicated
88 */90 */
89 void replicate(drizzled::plugin::TransactionApplier *in_applier, 91 drizzled::plugin::ReplicationReturnCode
90 drizzled::message::Transaction &to_replicate);92 replicate(drizzled::plugin::TransactionApplier *in_applier,
93 drizzled::Session &in_session,
94 drizzled::message::Transaction &to_replicate);
91 95
92 /**96 /**
93 * Populate the vector of schemas to filter from the97 * Populate the vector of schemas to filter from the
9498
=== modified file 'plugin/innobase/handler/ha_innodb.cc'
--- plugin/innobase/handler/ha_innodb.cc 2010-04-01 15:54:57 +0000
+++ plugin/innobase/handler/ha_innodb.cc 2010-04-06 19:16:34 +0000
@@ -2259,11 +2259,6 @@
2259 pthread_mutex_unlock(&commit_cond_m);2259 pthread_mutex_unlock(&commit_cond_m);
2260 }2260 }
22612261
2262 if (trx->conc_state == TRX_PREPARED) {
2263
2264 pthread_mutex_unlock(&prepare_commit_mutex);
2265 }
2266
2267 /* Now do a write + flush of logs. */2262 /* Now do a write + flush of logs. */
2268 trx_commit_complete_for_mysql(trx);2263 trx_commit_complete_for_mysql(trx);
22692264
@@ -8150,31 +8145,6 @@
81508145
8151 srv_active_wake_master_thread();8146 srv_active_wake_master_thread();
81528147
8153 if (all || !session_test_options(session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
8154 {
8155
8156 /* For ibbackup to work the order of transactions in binlog
8157 and InnoDB must be the same. Consider the situation
8158
8159 thread1> prepare; write to binlog; ...
8160 <context switch>
8161 thread2> prepare; write to binlog; commit
8162 thread1> ... commit
8163
8164 To ensure this will not happen we're taking the mutex on
8165 prepare, and releasing it on commit.
8166
8167 Note: only do it for normal commits, done via ha_commit_trans.
8168 If 2pc protocol is executed by external transaction
8169 coordinator, it will be just a regular MySQL client
8170 executing XA PREPARE and XA COMMIT commands.
8171 In this case we cannot know how many minutes or hours
8172 will be between XA PREPARE and XA COMMIT, and we don't want
8173 to block for undefined period of time.
8174 */
8175 pthread_mutex_lock(&prepare_commit_mutex);
8176 trx->conc_state = TRX_PREPARED;
8177 }
8178 return(error);8148 return(error);
8179}8149}
81808150
81818151
=== modified file 'plugin/transaction_log/module.cc'
--- plugin/transaction_log/module.cc 2010-03-15 22:27:00 +0000
+++ plugin/transaction_log/module.cc 2010-04-06 19:16:34 +0000
@@ -61,6 +61,10 @@
61 * in truncating the log file. 61 * in truncating the log file.
62 */62 */
63static bool sysvar_transaction_log_truncate_debug= false;63static bool sysvar_transaction_log_truncate_debug= false;
64/**
65 * The name of the main transaction log file on disk. With no prefix,
66 * this goes into Drizzle's $datadir.
67 */
64static const char DEFAULT_LOG_FILE_PATH[]= "transaction.log"; /* In datadir... */68static const char DEFAULT_LOG_FILE_PATH[]= "transaction.log"; /* In datadir... */
65/** 69/**
66 * Transaction Log plugin system variable - Should we write a CRC32 checksum for 70 * Transaction Log plugin system variable - Should we write a CRC32 checksum for
@@ -76,6 +80,11 @@
76 * TransactionLog::SYNC_METHOD_EVERY_SECOND == 2 ... sync at most once a second80 * TransactionLog::SYNC_METHOD_EVERY_SECOND == 2 ... sync at most once a second
77 */81 */
78static uint32_t sysvar_transaction_log_sync_method= 0;82static uint32_t sysvar_transaction_log_sync_method= 0;
83/**
84 * Transaction Log plugin system variable - Number of slots to create
85 * for managing write buffers
86 */
87static uint32_t sysvar_transaction_log_num_write_buffers= 8;
7988
80/** DATA_DICTIONARY views */89/** DATA_DICTIONARY views */
81static TransactionLogTool *transaction_log_tool;90static TransactionLogTool *transaction_log_tool;
@@ -99,7 +108,8 @@
99 if (sysvar_transaction_log_enabled)108 if (sysvar_transaction_log_enabled)
100 {109 {
101 transaction_log= new (nothrow) TransactionLog(string(sysvar_transaction_log_file),110 transaction_log= new (nothrow) TransactionLog(string(sysvar_transaction_log_file),
102 sysvar_transaction_log_sync_method);111 sysvar_transaction_log_sync_method,
112 sysvar_transaction_log_checksum_enabled);
103113
104 if (transaction_log == NULL)114 if (transaction_log == NULL)
105 {115 {
@@ -120,7 +130,7 @@
120 /* Create the applier plugin and register it */130 /* Create the applier plugin and register it */
121 transaction_log_applier= new (nothrow) TransactionLogApplier("transaction_log_applier",131 transaction_log_applier= new (nothrow) TransactionLogApplier("transaction_log_applier",
122 *transaction_log, 132 *transaction_log,
123 sysvar_transaction_log_checksum_enabled);133 sysvar_transaction_log_num_write_buffers);
124 if (transaction_log_applier == NULL)134 if (transaction_log_applier == NULL)
125 {135 {
126 errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the TransactionLogApplier instance. Got error: %s\n"), 136 errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to allocate the TransactionLogApplier instance. Got error: %s\n"),
@@ -212,7 +222,7 @@
212 set_truncate_debug, /* update func */222 set_truncate_debug, /* update func */
213 false /* default */);223 false /* default */);
214224
215static DRIZZLE_SYSVAR_STR(log_file,225static DRIZZLE_SYSVAR_STR(file,
216 sysvar_transaction_log_file,226 sysvar_transaction_log_file,
217 PLUGIN_VAR_READONLY,227 PLUGIN_VAR_READONLY,
218 N_("Path to the file to use for transaction log"),228 N_("Path to the file to use for transaction log"),
@@ -241,12 +251,24 @@
241 2,251 2,
242 0);252 0);
243253
254static DRIZZLE_SYSVAR_UINT(num_write_buffers,
255 sysvar_transaction_log_num_write_buffers,
256 PLUGIN_VAR_OPCMDARG,
257 N_("Number of slots for in-memory write buffers (default=8)."),
258 NULL, /* check func */
259 NULL, /* update func */
260 8, /* default */
261 4,
262 8192,
263 0);
264
244static drizzle_sys_var* sys_variables[]= {265static drizzle_sys_var* sys_variables[]= {
245 DRIZZLE_SYSVAR(enable),266 DRIZZLE_SYSVAR(enable),
246 DRIZZLE_SYSVAR(truncate_debug),267 DRIZZLE_SYSVAR(truncate_debug),
247 DRIZZLE_SYSVAR(log_file),268 DRIZZLE_SYSVAR(file),
248 DRIZZLE_SYSVAR(enable_checksum),269 DRIZZLE_SYSVAR(enable_checksum),
249 DRIZZLE_SYSVAR(sync_method),270 DRIZZLE_SYSVAR(sync_method),
271 DRIZZLE_SYSVAR(num_write_buffers),
250 NULL272 NULL
251};273};
252274
253275
=== modified file 'plugin/transaction_log/plugin.ini'
--- plugin/transaction_log/plugin.ini 2010-03-05 18:08:49 +0000
+++ plugin/transaction_log/plugin.ini 2010-04-06 19:16:34 +0000
@@ -6,8 +6,8 @@
6title=Transaction Log6title=Transaction Log
7description=Log of Transaction Messages7description=Log of Transaction Messages
8load_by_default=yes8load_by_default=yes
9sources= background_worker.cc hexdump_transaction_message.cc module.cc print_transaction_message.cc transaction_log.cc transaction_log_applier.cc transaction_log_entry.cc transaction_log_index.cc transaction_log_reader.cc data_dictionary_schema.cc9sources= background_worker.cc hexdump_transaction_message.cc module.cc print_transaction_message.cc transaction_log.cc transaction_log_applier.cc transaction_log_entry.cc transaction_log_index.cc transaction_log_reader.cc data_dictionary_schema.cc write_buffer.cc
10headers= background_worker.h hexdump_transaction_message.h print_transaction_message.h transaction_log.h transaction_log_applier.h transaction_log_entry.h transaction_log_index.h transaction_log_reader.h data_dictionary_schema.h10headers= background_worker.h hexdump_transaction_message.h print_transaction_message.h transaction_log.h transaction_log_applier.h transaction_log_entry.h transaction_log_index.h transaction_log_reader.h data_dictionary_schema.h write_buffer.h
11libs=${top_builddir}/drizzled/algorithm/libhash.la 11libs=${top_builddir}/drizzled/algorithm/libhash.la
12libadd=$(LIBZ)12libadd=$(LIBZ)
13cxxflags=${PROTOSKIP_WARNINGS}13cxxflags=${PROTOSKIP_WARNINGS}
1414
=== added file 'plugin/transaction_log/tests/r/ddl_transaction_id.result'
--- plugin/transaction_log/tests/r/ddl_transaction_id.result 1970-01-01 00:00:00 +0000
+++ plugin/transaction_log/tests/r/ddl_transaction_id.result 2010-04-06 19:16:34 +0000
@@ -0,0 +1,36 @@
1SET GLOBAL transaction_log_truncate_debug= true;
2DROP TABLE IF EXISTS t1;
3CREATE TABLE t1 (
4id INT NOT NULL PRIMARY KEY
5, padding VARCHAR(200) NOT NULL
6);
7INSERT INTO t1 VALUES (1, "I love testing.");
8INSERT INTO t1 VALUES (2, "I hate testing.");
9SELECT * FROM t1;
10id padding
111 I love testing.
122 I hate testing.
13ALTER TABLE t1 CHANGE COLUMN padding less_padding VARCHAR(180) NOT NULL;
14SELECT * FROM t1 WHERE id = 3;
15id less_padding
16DROP TABLE t1;
17SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG;
18FILE_NAME FILE_LENGTH NUM_LOG_ENTRIES NUM_TRANSACTIONS MIN_TRANSACTION_ID MAX_TRANSACTION_ID MIN_END_TIMESTAMP MAX_END_TIMESTAMP INDEX_SIZE_IN_BYTES
19transaction.log X 6 6 1 6 X X X
20SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_ENTRIES;
21ENTRY_OFFSET ENTRY_TYPE ENTRY_LENGTH
22X TRANSACTION X
23X TRANSACTION X
24X TRANSACTION X
25X TRANSACTION X
26X TRANSACTION X
27X TRANSACTION X
28SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_TRANSACTIONS;
29ENTRY_OFFSET TRANSACTION_ID SERVER_ID START_TIMESTAMP END_TIMESTAMP NUM_STATEMENTS CHECKSUM
30X 1 1 X X 1 0
31X 2 1 X X 1 0
32X 3 1 X X 1 0
33X 4 1 X X 1 0
34X 5 1 X X 1 0
35X 6 1 X X 1 0
36SET GLOBAL transaction_log_truncate_debug= true;
037
=== modified file 'plugin/transaction_log/tests/r/filtered_replicator.result'
--- plugin/transaction_log/tests/r/filtered_replicator.result 2010-02-27 23:47:33 +0000
+++ plugin/transaction_log/tests/r/filtered_replicator.result 2010-04-06 19:16:34 +0000
@@ -6,6 +6,7 @@
6);6);
7INSERT INTO t1 VALUES (1, "I love testing.");7INSERT INTO t1 VALUES (1, "I love testing.");
8INSERT INTO t1 VALUES (2, "I hate testing.");8INSERT INTO t1 VALUES (2, "I hate testing.");
9DROP TABLE t1;
9DROP TABLE IF EXISTS t1;10DROP TABLE IF EXISTS t1;
10CREATE TABLE t1 (11CREATE TABLE t1 (
11id INT NOT NULL PRIMARY KEY12id INT NOT NULL PRIMARY KEY
@@ -134,6 +135,9 @@
134INSERT INTO `test`.`t1` (`id`,`padding`) VALUES (2,'I hate testing.');135INSERT INTO `test`.`t1` (`id`,`padding`) VALUES (2,'I hate testing.');
135COMMIT;136COMMIT;
136START TRANSACTION;137START TRANSACTION;
138DROP TABLE `test`.`t1`;
139COMMIT;
140START TRANSACTION;
137DROP TABLE IF EXISTS `test`.`t1`;141DROP TABLE IF EXISTS `test`.`t1`;
138COMMIT;142COMMIT;
139START TRANSACTION;143START TRANSACTION;
140144
=== modified file 'plugin/transaction_log/tests/r/information_schema.result'
--- plugin/transaction_log/tests/r/information_schema.result 2010-03-05 04:44:58 +0000
+++ plugin/transaction_log/tests/r/information_schema.result 2010-04-06 19:16:34 +0000
@@ -6,19 +6,22 @@
6);6);
7INSERT INTO t1 VALUES (1, "I love testing.");7INSERT INTO t1 VALUES (1, "I love testing.");
8INSERT INTO t1 VALUES (2, "I hate testing.");8INSERT INTO t1 VALUES (2, "I hate testing.");
9DROP TABLE t1;
9SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG;10SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG;
10FILE_NAME FILE_LENGTH NUM_LOG_ENTRIES NUM_TRANSACTIONS MIN_TRANSACTION_ID MAX_TRANSACTION_ID MIN_END_TIMESTAMP MAX_END_TIMESTAMP INDEX_SIZE_IN_BYTES11FILE_NAME FILE_LENGTH NUM_LOG_ENTRIES NUM_TRANSACTIONS MIN_TRANSACTION_ID MAX_TRANSACTION_ID MIN_END_TIMESTAMP MAX_END_TIMESTAMP INDEX_SIZE_IN_BYTES
11transaction.log X 4 4 X X X X X12transaction.log X 5 5 1 5 X X X
12SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_ENTRIES;13SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_ENTRIES;
13ENTRY_OFFSET ENTRY_TYPE ENTRY_LENGTH14ENTRY_OFFSET ENTRY_TYPE ENTRY_LENGTH
14X TRANSACTION X15X TRANSACTION X
15X TRANSACTION X16X TRANSACTION X
16X TRANSACTION X17X TRANSACTION X
17X TRANSACTION X18X TRANSACTION X
19X TRANSACTION X
18SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_TRANSACTIONS;20SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_TRANSACTIONS;
19ENTRY_OFFSET TRANSACTION_ID SERVER_ID START_TIMESTAMP END_TIMESTAMP NUM_STATEMENTS CHECKSUM21ENTRY_OFFSET TRANSACTION_ID SERVER_ID START_TIMESTAMP END_TIMESTAMP NUM_STATEMENTS CHECKSUM
20X X 1 X X 1 022X 1 1 X X 1 0
21X X 1 X X 1 023X 2 1 X X 1 0
22X X 1 X X 1 024X 3 1 X X 1 0
23X X 1 X X 1 025X 4 1 X X 1 0
26X 5 1 X X 1 0
24SET GLOBAL transaction_log_truncate_debug= true;27SET GLOBAL transaction_log_truncate_debug= true;
2528
=== modified file 'plugin/transaction_log/tests/r/insert.result'
--- plugin/transaction_log/tests/r/insert.result 2010-02-27 23:47:33 +0000
+++ plugin/transaction_log/tests/r/insert.result 2010-04-06 19:16:34 +0000
@@ -5,6 +5,7 @@
5);5);
6INSERT INTO t1 VALUES (1, "I love testing.");6INSERT INTO t1 VALUES (1, "I love testing.");
7INSERT INTO t1 VALUES (2, "I hate testing.");7INSERT INTO t1 VALUES (2, "I hate testing.");
8DROP TABLE t1;
8START TRANSACTION;9START TRANSACTION;
9DROP TABLE IF EXISTS `test`.`t1`;10DROP TABLE IF EXISTS `test`.`t1`;
10COMMIT;11COMMIT;
@@ -17,4 +18,7 @@
17START TRANSACTION;18START TRANSACTION;
18INSERT INTO `test`.`t1` (`id`,`padding`) VALUES (2,'I hate testing.');19INSERT INTO `test`.`t1` (`id`,`padding`) VALUES (2,'I hate testing.');
19COMMIT;20COMMIT;
21START TRANSACTION;
22DROP TABLE `test`.`t1`;
23COMMIT;
20SET GLOBAL transaction_log_truncate_debug= true;24SET GLOBAL transaction_log_truncate_debug= true;
2125
=== modified file 'plugin/transaction_log/tests/r/truncate_log.result'
--- plugin/transaction_log/tests/r/truncate_log.result 2010-03-05 04:44:58 +0000
+++ plugin/transaction_log/tests/r/truncate_log.result 2010-04-06 19:16:34 +0000
@@ -6,6 +6,7 @@
6);6);
7INSERT INTO t1 VALUES (1, "I love testing.");7INSERT INTO t1 VALUES (1, "I love testing.");
8INSERT INTO t1 VALUES (2, "I hate testing.");8INSERT INTO t1 VALUES (2, "I hate testing.");
924var/master-data/transaction.log9DROP TABLE t1;
1029var/master-data/transaction.log
10SET GLOBAL transaction_log_truncate_debug= true;11SET GLOBAL transaction_log_truncate_debug= true;
110var/master-data/transaction.log120var/master-data/transaction.log
1213
=== modified file 'plugin/transaction_log/tests/r/udf_print_transaction_message.result'
--- plugin/transaction_log/tests/r/udf_print_transaction_message.result 2010-02-11 04:18:03 +0000
+++ plugin/transaction_log/tests/r/udf_print_transaction_message.result 2010-04-06 19:16:34 +0000
@@ -5,6 +5,7 @@
5);5);
6INSERT INTO t1 VALUES (1, "I love testing.");6INSERT INTO t1 VALUES (1, "I love testing.");
7INSERT INTO t1 VALUES (2, "I hate testing.");7INSERT INTO t1 VALUES (2, "I hate testing.");
8DROP TABLE t1;
8SELECT LENGTH(PRINT_TRANSACTION_MESSAGE('transaction.log', ENTRY_OFFSET)) > 0 as checked9SELECT LENGTH(PRINT_TRANSACTION_MESSAGE('transaction.log', ENTRY_OFFSET)) > 0 as checked
9FROM DATA_DICTIONARY.TRANSACTION_LOG_ENTRIES10FROM DATA_DICTIONARY.TRANSACTION_LOG_ENTRIES
10LIMIT 1;11LIMIT 1;
1112
=== renamed file 'plugin/transaction_log/tests/r/sync_method_every_write.result' => 'plugin/transaction_log/tests/r/variables.result'
--- plugin/transaction_log/tests/r/sync_method_every_write.result 2010-02-05 08:11:15 +0000
+++ plugin/transaction_log/tests/r/variables.result 2010-04-06 19:16:34 +0000
@@ -3,6 +3,7 @@
3VARIABLE_NAME VARIABLE_VALUE3VARIABLE_NAME VARIABLE_VALUE
4transaction_log_enable ON4transaction_log_enable ON
5transaction_log_enable_checksum OFF5transaction_log_enable_checksum OFF
6transaction_log_log_file transaction.log6transaction_log_file transaction.log
7transaction_log_num_write_buffers 8
7transaction_log_sync_method 18transaction_log_sync_method 1
8transaction_log_truncate_debug OFF9transaction_log_truncate_debug OFF
910
=== added file 'plugin/transaction_log/tests/t/ddl_transaction_id-master.opt'
--- plugin/transaction_log/tests/t/ddl_transaction_id-master.opt 1970-01-01 00:00:00 +0000
+++ plugin/transaction_log/tests/t/ddl_transaction_id-master.opt 2010-04-06 19:16:34 +0000
@@ -0,0 +1,1 @@
1--default-replicator-enable --transaction-log-enable --scheduler=multi_thread
02
=== added file 'plugin/transaction_log/tests/t/ddl_transaction_id.inc'
--- plugin/transaction_log/tests/t/ddl_transaction_id.inc 1970-01-01 00:00:00 +0000
+++ plugin/transaction_log/tests/t/ddl_transaction_id.inc 2010-04-06 19:16:34 +0000
@@ -0,0 +1,29 @@
1#
2# Tests ordering of transaction ID, that SELECT queries
3# do not increment the transaction ID, and that the proper
4# generation of transaction IDs is done for DDL operations.
5#
6
7--disable_warnings
8DROP TABLE IF EXISTS t1;
9--enable_warnings
10
11CREATE TABLE t1 (
12 id INT NOT NULL PRIMARY KEY
13, padding VARCHAR(200) NOT NULL
14);
15
16INSERT INTO t1 VALUES (1, "I love testing.");
17INSERT INTO t1 VALUES (2, "I hate testing.");
18
19SELECT * FROM t1;
20
21ALTER TABLE t1 CHANGE COLUMN padding less_padding VARCHAR(180) NOT NULL;
22
23#
24# Should be no result here...
25#
26
27SELECT * FROM t1 WHERE id = 3;
28
29DROP TABLE t1;
030
=== added file 'plugin/transaction_log/tests/t/ddl_transaction_id.test'
--- plugin/transaction_log/tests/t/ddl_transaction_id.test 1970-01-01 00:00:00 +0000
+++ plugin/transaction_log/tests/t/ddl_transaction_id.test 2010-04-06 19:16:34 +0000
@@ -0,0 +1,14 @@
1# Truncate the log file to reset for the next test
2--source ../plugin/transaction_log/tests/t/truncate_log.inc
3
4--source ../plugin/transaction_log/tests/t/ddl_transaction_id.inc
5
6--replace_column 2 X 7 X 8 X 9 X
7SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG;
8--replace_column 1 X 3 X
9SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_ENTRIES;
10--replace_column 1 X 4 X 5 X
11SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_TRANSACTIONS;
12
13# Truncate the log file to reset for the next test
14--source ../plugin/transaction_log/tests/t/truncate_log.inc
015
=== modified file 'plugin/transaction_log/tests/t/information_schema.test'
--- plugin/transaction_log/tests/t/information_schema.test 2010-03-05 04:44:58 +0000
+++ plugin/transaction_log/tests/t/information_schema.test 2010-04-06 19:16:34 +0000
@@ -13,11 +13,11 @@
13# that the information contained in them matches13# that the information contained in them matches
14# the transaction log.14# the transaction log.
1515
16--replace_column 2 X 5 X 6 X 7 X 8 X 9 X16--replace_column 2 X 7 X 8 X 9 X
17SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG;17SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG;
18--replace_column 1 X 3 X18--replace_column 1 X 3 X
19SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_ENTRIES;19SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_ENTRIES;
20--replace_column 1 X 2 X 4 X 5 X20--replace_column 1 X 4 X 5 X
21SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_TRANSACTIONS;21SELECT * FROM DATA_DICTIONARY.TRANSACTION_LOG_TRANSACTIONS;
2222
23# Truncate the log file to reset for the next test23# Truncate the log file to reset for the next test
2424
=== modified file 'plugin/transaction_log/tests/t/insert.inc'
--- plugin/transaction_log/tests/t/insert.inc 2009-12-11 19:08:29 +0000
+++ plugin/transaction_log/tests/t/insert.inc 2010-04-06 19:16:34 +0000
@@ -18,3 +18,5 @@
1818
19INSERT INTO t1 VALUES (1, "I love testing.");19INSERT INTO t1 VALUES (1, "I love testing.");
20INSERT INTO t1 VALUES (2, "I hate testing.");20INSERT INTO t1 VALUES (2, "I hate testing.");
21
22DROP TABLE t1;
2123
=== renamed file 'plugin/transaction_log/tests/t/sync_method_every_write-master.opt' => 'plugin/transaction_log/tests/t/variables-master.opt'
=== renamed file 'plugin/transaction_log/tests/t/sync_method_every_write.test' => 'plugin/transaction_log/tests/t/variables.test'
=== modified file 'plugin/transaction_log/transaction_log.cc'
--- plugin/transaction_log/transaction_log.cc 2010-03-10 18:22:56 +0000
+++ plugin/transaction_log/transaction_log.cc 2010-04-06 19:16:34 +0000
@@ -6,7 +6,7 @@
6 *6 *
7 * Authors:7 * Authors:
8 *8 *
9 * Jay Pipes <jaypipes@gmail.com.com>9 * Jay Pipes <jaypipes@gmail.com.com>
10 *10 *
11 * This program is free software; you can redistribute it and/or modify11 * This program is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU General Public License as published by12 * it under the terms of the GNU General Public License as published by
@@ -30,31 +30,37 @@
30 *30 *
31 * @details31 * @details
32 *32 *
33 * Currently, the log file uses this implementation:33 * Currently, the transaction log file uses a simple, single-file, append-only
34 * format.
34 *35 *
35 * We have an atomic off_t called log_offset which keeps track of the 36 * We have an atomic off_t called log_offset which keeps track of the
36 * offset into the log file for writing the next Transaction.37 * offset into the log file for writing the next log entry. The log
37 *38 * entries are written, one after the other, in the following way:
38 * We write Transaction message encapsulated in an 8-byte length/type header and a39 *
39 * 4-byte checksum trailer.40 * <pre>
40 *41 * --------------------------------------
41 * When writing a Transaction to the log, we calculate the length of the 42 * |<- 4 bytes ->|<- # Bytes of Entry ->|
42 * Transaction to be written. We then increment log_offset by the length43 * --------------------------------------
43 * of the Transaction plus 2 * sizeof(uint32_t) plus sizeof(uint32_t) and store 44 * | Entry Type | Serialized Entry |
44 * this new offset in a local off_t called cur_offset (see TransactionLog::apply(). 45 * --------------------------------------
45 * This compare and set is done in an atomic instruction.46 * </pre>
46 *47 *
47 * We then adjust the local off_t (cur_offset) back to the original48 * The Entry Type is an integer defined as an enumeration in the
48 * offset by subtracting the length and sizeof(uint32_t) and sizeof(uint32_t).49 * /drizzled/message/transaction.proto file called TransactionLogEntry::Type.
49 *50 *
50 * We then first write a 64-bit length and then the serialized transaction/transaction51 * Each transaction log entry type is written to the log differently. Here,
51 * and optional checksum to our log file at our local cur_offset.52 * we cover the format of each log entry type.
52 *53 *
53 * --------------------------------------------------------------------------------54 * Committed and Prepared Transaction Log Entries
54 * |<- 4 bytes ->|<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|55 * -----------------------------------------------
55 * --------------------------------------------------------------------------------56 *
56 * | Msg Type | Length | Serialized Transaction Message | Checksum |57 * <pre>
57 * --------------------------------------------------------------------------------58 * ------------------------------------------------------------------
59 * |<- 4 bytes ->|<- # Bytes of Transaction Message ->|<- 4 bytes ->|
60 * ------------------------------------------------------------------
61 * | Length | Serialized Transaction Message | Checksum |
62 * ------------------------------------------------------------------
63 * </pre>
58 *64 *
59 * @todo65 * @todo
60 *66 *
@@ -77,19 +83,27 @@
77#include <drizzled/internal/my_sys.h> /* for internal::my_sync */83#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
78#include <drizzled/errmsg_print.h>84#include <drizzled/errmsg_print.h>
79#include <drizzled/gettext.h>85#include <drizzled/gettext.h>
86#include <drizzled/message/transaction.pb.h>
87#include <drizzled/transaction_services.h>
88#include <drizzled/algorithm/crc32.h>
89
90#include <google/protobuf/io/coded_stream.h>
8091
81using namespace std;92using namespace std;
82using namespace drizzled;93using namespace drizzled;
94using namespace google;
8395
84TransactionLog *transaction_log= NULL; /* The singleton transaction log */96TransactionLog *transaction_log= NULL; /* The singleton transaction log */
8597
86TransactionLog::TransactionLog(const string in_log_file_path,98TransactionLog::TransactionLog(const string in_log_file_path,
87 uint32_t in_sync_method) : 99 uint32_t in_sync_method,
100 bool in_do_checksum) :
88 state(OFFLINE),101 state(OFFLINE),
89 log_file_path(in_log_file_path),102 log_file_path(in_log_file_path),
90 has_error(false),103 has_error(false),
91 error_message(),104 error_message(),
92 sync_method(in_sync_method)105 sync_method(in_sync_method),
106 do_checksum(in_do_checksum)
93{107{
94 /* Setup our log file and determine the next write offset... */108 /* Setup our log file and determine the next write offset... */
95 log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);109 log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
@@ -133,6 +147,43 @@
133 }147 }
134}148}
135149
150uint8_t *TransactionLog::packTransactionIntoLogEntry(const message::Transaction &trx,
151 uint8_t *buffer,
152 uint32_t *checksum_out)
153{
154 uint8_t *orig_buffer= buffer;
155 size_t message_byte_length= trx.ByteSize();
156
157 /*
158 * Write the header information, which is the message type and
159 * the length of the transaction message into the buffer
160 */
161 buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
162 static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
163 buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
164 static_cast<uint32_t>(message_byte_length), buffer);
165
166 /*
167 * Now write the serialized transaction message, followed
168 * by the optional checksum into the buffer.
169 */
170 buffer= trx.SerializeWithCachedSizesToArray(buffer);
171
172 if (do_checksum)
173 {
174 *checksum_out= drizzled::algorithm::crc32(
175 reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
176 }
177 else
178 *checksum_out= 0;
179
180 /* We always write in network byte order */
181 buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(*checksum_out, buffer);
182 /* Reset the pointer back to its original location... */
183 buffer= orig_buffer;
184 return orig_buffer;
185}
186
136off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)187off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
137{188{
138 ssize_t written= 0;189 ssize_t written= 0;
@@ -142,12 +193,6 @@
142 */193 */
143 off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));194 off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
144195
145 /*
146 * We adjust cur_offset back to the original log_offset before
147 * the increment above...
148 */
149 cur_offset-= static_cast<off_t>(data_length);
150
151 /* 196 /*
152 * Quick safety...if an error occurs above in another writer, the log 197 * Quick safety...if an error occurs above in another writer, the log
153 * file will be in a crashed state.198 * file will be in a crashed state.
@@ -243,6 +288,7 @@
243 result= ftruncate(log_file, log_offset);288 result= ftruncate(log_file, log_offset);
244 }289 }
245 while (result == -1 && errno == EINTR);290 while (result == -1 && errno == EINTR);
291 drizzled::TransactionServices::singleton().resetTransactionId();
246}292}
247293
248bool TransactionLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,294bool TransactionLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
@@ -272,3 +318,8 @@
272{318{
273 return error_message;319 return error_message;
274}320}
321
322size_t TransactionLog::getLogEntrySize(const message::Transaction &trx)
323{
324 return trx.ByteSize() + HEADER_TRAILER_BYTES;
325}
275326
=== modified file 'plugin/transaction_log/transaction_log.h'
--- plugin/transaction_log/transaction_log.h 2010-03-05 18:08:49 +0000
+++ plugin/transaction_log/transaction_log.h 2010-04-06 19:16:34 +0000
@@ -49,10 +49,6 @@
49class TransactionLog49class TransactionLog
50{50{
51public:51public:
52 static const uint32_t HEADER_TRAILER_BYTES= sizeof(uint32_t) + /* 4-byte msg type header */
53 sizeof(uint32_t) + /* 4-byte length header */
54 sizeof(uint32_t); /* 4 byte checksum trailer */
55
56 typedef std::vector<TransactionLogEntry> Entries;52 typedef std::vector<TransactionLogEntry> Entries;
57 typedef std::vector<TransactionLogTransactionEntry> TransactionEntries;53 typedef std::vector<TransactionLogTransactionEntry> TransactionEntries;
58 /**54 /**
@@ -70,7 +66,8 @@
70 static const uint32_t SYNC_METHOD_EVERY_SECOND= 2; ///< Sync no more than once a second66 static const uint32_t SYNC_METHOD_EVERY_SECOND= 2; ///< Sync no more than once a second
71public:67public:
72 TransactionLog(const std::string in_log_file_path,68 TransactionLog(const std::string in_log_file_path,
73 uint32_t in_sync_method);69 uint32_t in_sync_method,
70 bool in_do_checksum);
7471
75 /** Destructor */72 /** Destructor */
76 ~TransactionLog();73 ~TransactionLog();
@@ -102,6 +99,31 @@
102 }99 }
103100
104 /**101 /**
102 * Static helper method which returns the transaction
103 * log entry size in bytes of a given transaction
104 * message.
105 *
106 * @param[in] Transaction message
107 */
108 static size_t getLogEntrySize(const drizzled::message::Transaction &trx);
109
110 /**
111 * Method which packs into a raw byte buffer
112 * a transaction log entry. Supplied buffer should
113 * be of adequate size.
114 *
115 * Returns a pointer to the start of the original
116 * buffer.
117 *
118 * @param[in] Transaction message to pack
119 * @param[in] Raw byte buffer
120 * @param[out] Pointer to storage for checksum of message
121 */
122 uint8_t *packTransactionIntoLogEntry(const drizzled::message::Transaction &trx,
123 uint8_t *buffer,
124 uint32_t *checksum_out);
125
126 /**
105 * Writes a chunk of data to the log file of a specified127 * Writes a chunk of data to the log file of a specified
106 * length and returns the offset at which the chunk of128 * length and returns the offset at which the chunk of
107 * data was written.129 * data was written.
@@ -153,6 +175,10 @@
153 */175 */
154 const std::string &getErrorMessage() const;176 const std::string &getErrorMessage() const;
155private:177private:
178 static const uint32_t HEADER_TRAILER_BYTES= sizeof(uint32_t) + /* 4-byte msg type header */
179 sizeof(uint32_t) + /* 4-byte length header */
180 sizeof(uint32_t); /* 4 byte checksum trailer */
181
156 /* Don't allows these */182 /* Don't allows these */
157 TransactionLog();183 TransactionLog();
158 TransactionLog(const TransactionLog &other);184 TransactionLog(const TransactionLog &other);
@@ -181,6 +207,7 @@
181 std::string error_message; ///< Current error message207 std::string error_message; ///< Current error message
182 uint32_t sync_method; ///< Determines behaviour of syncing log file208 uint32_t sync_method; ///< Determines behaviour of syncing log file
183 time_t last_sync_time; ///< Last time the log file was synced (only set in SYNC_METHOD_EVERY_SECOND)209 time_t last_sync_time; ///< Last time the log file was synced (only set in SYNC_METHOD_EVERY_SECOND)
210 bool do_checksum; ///< Do a CRC32 checksum when writing Transaction message to log?
184};211};
185212
186#endif /* PLUGIN_TRANSACTION_LOG_TRANSACTION_LOG_H */213#endif /* PLUGIN_TRANSACTION_LOG_TRANSACTION_LOG_H */
187214
=== modified file 'plugin/transaction_log/transaction_log_applier.cc'
--- plugin/transaction_log/transaction_log_applier.cc 2010-03-29 22:01:35 +0000
+++ plugin/transaction_log/transaction_log_applier.cc 2010-04-06 19:16:34 +0000
@@ -6,7 +6,7 @@
6 *6 *
7 * Authors:7 * Authors:
8 *8 *
9 * Jay Pipes <jaypipes@gmail.com.com>9 * Jay Pipes <jaypipes@gmail.com>
10 *10 *
11 * This program is free software; you can redistribute it and/or modify11 * This program is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU General Public License as published by12 * it under the terms of the GNU General Public License as published by
@@ -43,24 +43,19 @@
43 */43 */
4444
45#include "config.h"45#include "config.h"
46#include "write_buffer.h"
46#include "transaction_log.h"47#include "transaction_log.h"
47#include "transaction_log_applier.h"48#include "transaction_log_applier.h"
48#include "transaction_log_index.h"49#include "transaction_log_index.h"
4950
50#include <sys/stat.h>51#include <vector>
51#include <fcntl.h>
52#include <unistd.h>
53#include <errno.h>
5452
55#include <drizzled/errmsg_print.h>
56#include <drizzled/gettext.h>
57#include <drizzled/algorithm/crc32.h>
58#include <drizzled/message/transaction.pb.h>53#include <drizzled/message/transaction.pb.h>
59#include <google/protobuf/io/coded_stream.h>54#include <drizzled/util/functors.h>
55#include <drizzled/session.h>
6056
61using namespace std;57using namespace std;
62using namespace drizzled;58using namespace drizzled;
63using namespace google;
6459
65TransactionLogApplier *transaction_log_applier= NULL; /* The singleton transaction log applier */60TransactionLogApplier *transaction_log_applier= NULL; /* The singleton transaction log applier */
6661
@@ -68,79 +63,59 @@
6863
69TransactionLogApplier::TransactionLogApplier(const string name_arg,64TransactionLogApplier::TransactionLogApplier(const string name_arg,
70 TransactionLog &in_transaction_log,65 TransactionLog &in_transaction_log,
71 bool in_do_checksum) :66 uint32_t in_num_write_buffers) :
72 plugin::TransactionApplier(name_arg),67 plugin::TransactionApplier(name_arg),
73 transaction_log(in_transaction_log), 68 transaction_log(in_transaction_log),
74 do_checksum(in_do_checksum)69 num_write_buffers(in_num_write_buffers),
70 write_buffers()
75{71{
72 /*
73 * Create each of the buffers we need for undo log entries
74 */
75 write_buffers.reserve(num_write_buffers);
76 for (size_t x= 0; x < num_write_buffers; ++x)
77 {
78 write_buffers.push_back(new WriteBuffer());
79 }
76}80}
7781
78TransactionLogApplier::~TransactionLogApplier()82TransactionLogApplier::~TransactionLogApplier()
79{83{
80}84 for_each(write_buffers.begin(),
8185 write_buffers.end(),
82void TransactionLogApplier::apply(const message::Transaction &to_apply)86 DeletePtr());
83{87 write_buffers.clear();
84 uint8_t *buffer; /* Buffer we will write serialized header, 88}
85 message and trailing checksum to */89
86 uint8_t *orig_buffer;90WriteBuffer *TransactionLogApplier::getWriteBuffer(const Session &session)
8791{
88 size_t message_byte_length= to_apply.ByteSize();92 return write_buffers[session.getSessionId() % num_write_buffers];
89 size_t total_envelope_length= TransactionLog::HEADER_TRAILER_BYTES + message_byte_length;93}
9094
91 /* 95plugin::ReplicationReturnCode
92 * Attempt allocation of raw memory buffer for the header, 96TransactionLogApplier::apply(Session &in_session,
93 * message and trailing checksum bytes.97 const message::Transaction &to_apply)
94 */98{
95 buffer= static_cast<uint8_t *>(malloc(total_envelope_length));99 size_t entry_size= TransactionLog::getLogEntrySize(to_apply);
96 if (buffer == NULL)100 WriteBuffer *write_buffer= getWriteBuffer(in_session);
97 {101
98 errmsg_printf(ERRMSG_LVL_ERROR, 102 uint32_t checksum;
99 _("Failed to allocate enough memory to buffer header, "103
100 "transaction message, and trailing checksum bytes. Tried to allocate %" PRId64104 write_buffer->lock();
101 " bytes. Error: %s\n"), 105 write_buffer->resize(entry_size);
102 static_cast<int64_t>(total_envelope_length),106 uint8_t *bytes= write_buffer->getRawBytes();
103 strerror(errno));107 bytes= transaction_log.packTransactionIntoLogEntry(to_apply,
104 return;108 bytes,
105 }109 &checksum);
106 else110
107 orig_buffer= buffer; /* We will free() orig_buffer, as buffer is moved during write */111 off_t written_to= transaction_log.writeEntry(bytes, entry_size);
108112 write_buffer->unlock();
109 /*
110 * Write the header information, which is the message type and
111 * the length of the transaction message into the buffer
112 */
113 buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
114 static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
115 buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
116 static_cast<uint32_t>(message_byte_length), buffer);
117
118 /*
119 * Now write the serialized transaction message, followed
120 * by the optional checksum into the buffer.
121 */
122 buffer= to_apply.SerializeWithCachedSizesToArray(buffer);
123
124 uint32_t checksum= 0;
125 if (do_checksum)
126 {
127 checksum= drizzled::algorithm::crc32(
128 buffer - message_byte_length, message_byte_length);
129 }
130
131 /* We always write in network byte order */
132 buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(checksum, buffer);
133
134 /* Ask the transaction log to write the entry and return where it wrote it */
135 off_t written_to= transaction_log.writeEntry(orig_buffer, total_envelope_length);
136
137 free(orig_buffer);
138113
139 /* Add an entry to the index describing what was just applied */114 /* Add an entry to the index describing what was just applied */
140 transaction_log_index->addEntry(TransactionLogEntry(ReplicationServices::TRANSACTION,115 transaction_log_index->addEntry(TransactionLogEntry(ReplicationServices::TRANSACTION,
141 written_to,116 written_to,
142 total_envelope_length),117 entry_size),
143 to_apply,118 to_apply,
144 checksum);119 checksum);
145120 return plugin::SUCCESS;
146}121}
147122
=== modified file 'plugin/transaction_log/transaction_log_applier.h'
--- plugin/transaction_log/transaction_log_applier.h 2010-03-05 18:08:49 +0000
+++ plugin/transaction_log/transaction_log_applier.h 2010-04-06 19:16:34 +0000
@@ -43,14 +43,20 @@
43#include <vector>43#include <vector>
44#include <string>44#include <string>
4545
46namespace drizzled
47{
48 class Session;
49}
50
46class TransactionLog;51class TransactionLog;
52class WriteBuffer;
4753
48class TransactionLogApplier: public drizzled::plugin::TransactionApplier 54class TransactionLogApplier: public drizzled::plugin::TransactionApplier
49{55{
50public:56public:
51 TransactionLogApplier(const std::string name_arg,57 TransactionLogApplier(const std::string name_arg,
52 TransactionLog &in_transaction_log,58 TransactionLog &in_transaction_log,
53 bool in_do_checksum);59 uint32_t in_num_write_buffers);
5460
55 /** Destructor */61 /** Destructor */
56 ~TransactionLogApplier();62 ~TransactionLogApplier();
@@ -68,16 +74,27 @@
68 * the supplied message to their own controlled memory storage74 * the supplied message to their own controlled memory storage
69 * area.75 * area.
70 *76 *
77 * @param Session descriptor
71 * @param Transaction message to be replicated78 * @param Transaction message to be replicated
72 */79 */
73 void apply(const drizzled::message::Transaction &to_apply);80 drizzled::plugin::ReplicationReturnCode
81 apply(drizzled::Session &in_session,
82 const drizzled::message::Transaction &to_apply);
74private:83private:
75 /* Don't allows these */84 /* Don't allows these */
76 TransactionLogApplier();85 TransactionLogApplier();
77 TransactionLogApplier(const TransactionLogApplier &other);86 TransactionLogApplier(const TransactionLogApplier &other);
78 TransactionLogApplier &operator=(const TransactionLogApplier &other);87 TransactionLogApplier &operator=(const TransactionLogApplier &other);
79 TransactionLog &transaction_log;88 TransactionLog &transaction_log;
80 bool do_checksum; ///< Do a CRC32 checksum when writing Transaction message to log?89 uint32_t num_write_buffers; ///< Number of write buffers used
90 std::vector<WriteBuffer *> write_buffers; ///< array of write buffers
91
92 /**
93 * Returns the write buffer for the supplied session
94 *
95 * @param Session descriptor
96 */
97 WriteBuffer *getWriteBuffer(const drizzled::Session &session);
81};98};
8299
83#endif /* PLUGIN_TRANSACTION_LOG_TRANSACTION_LOG_APPLIER_H */100#endif /* PLUGIN_TRANSACTION_LOG_TRANSACTION_LOG_APPLIER_H */
84101
=== added file 'plugin/transaction_log/write_buffer.cc'
--- plugin/transaction_log/write_buffer.cc 1970-01-01 00:00:00 +0000
+++ plugin/transaction_log/write_buffer.cc 2010-04-06 19:16:34 +0000
@@ -0,0 +1,71 @@
1/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3 *
4 * Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
5 *
6 * Authors:
7 *
8 * Jay Pipes <jaypipes@gmail.com>
9 *
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public License
21 * along with this program; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25/**
26 * @file
27 *
28 * Defines the implementation of a simple locked write buffer
29 *
30 * @details
31 *
32 * The write buffer keeps a block of allocated raw bytes available for
33 * callers.
34 */
35
36#include "config.h"
37#include "write_buffer.h"
38
39#include <drizzled/errmsg_print.h>
40#include <drizzled/gettext.h>
41
42#include <vector>
43
44using namespace std;
45using namespace drizzled;
46
47WriteBuffer::WriteBuffer() :
48 buffer()
49{
50 buffer.reserve(DEFAULT_WRITE_BUFFER_SIZE);
51 pthread_mutex_init(&latch, NULL);
52}
53
54WriteBuffer::~WriteBuffer()
55{
56 buffer.clear();
57 pthread_mutex_destroy(&latch);
58}
59
60void WriteBuffer::resize(size_t new_size)
61{
62 /*
63 * Attempt allocation of raw memory buffer for the
64 * requested size. Does nothing if already allocated size
65 * if greater...
66 */
67 if (buffer.capacity() >= new_size)
68 return;
69
70 buffer.reserve(new_size);
71}
072
=== added file 'plugin/transaction_log/write_buffer.h'
--- plugin/transaction_log/write_buffer.h 1970-01-01 00:00:00 +0000
+++ plugin/transaction_log/write_buffer.h 2010-04-06 19:16:34 +0000
@@ -0,0 +1,90 @@
1/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3 *
4 * Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
5 *
6 * Authors:
7 *
8 * Jay Pipes <jaypipes@gmail.com>
9 *
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public License
21 * along with this program; if not, write to the Free Software
22 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25/**
26 * @file
27 *
28 * Defines a simple structure for maintaining a write buffer.
29 */
30
31#ifndef PLUGIN_TRANSACTION_LOG_WRITE_BUFFER_H
32#define PLUGIN_TRANSACTION_LOG_WRITE_BUFFER_H
33
34#include <stdint.h>
35#include <vector>
36#include <pthread.h>
37
38class WriteBuffer
39{
40public:
41 static const size_t DEFAULT_WRITE_BUFFER_SIZE= 1024; /* Many GPB messages are < 1 KB... */
42 /**
43 * Constructor.
44 */
45 WriteBuffer();
46 ~WriteBuffer();
47 /**
48 * Locks the log write buffer
49 */
50 void lock()
51 {
52 pthread_mutex_lock(&latch);
53 }
54 /**
55 * Unlocks the log's write buffer
56 */
57 void unlock()
58 {
59 pthread_mutex_unlock(&latch);
60 }
61 /**
62 * Resizes the internal raw byte buffer
63 *
64 * @param[in] New size to allocate
65 */
66 void resize(size_t new_size);
67 /**
68 * Returns the pointer to the raw bytes.
69 */
70 uint8_t *getRawBytes()
71 {
72 return &buffer[0];
73 }
74 /**
75 * Returns the size of the write buffer
76 */
77 size_t getCapacity()
78 {
79 return buffer.size();
80 }
81private:
82 /* Prohibit these */
83 WriteBuffer(const WriteBuffer&);
84 WriteBuffer &operator=(const WriteBuffer&);
85
86 std::vector<uint8_t> buffer; ///< Raw memory buffer managed by the log
87 pthread_mutex_t latch; ///< Lock around the synchronized parts of the log (the write buffer)
88};
89
90#endif /* PLUGIN_TRANSACTION_LOG_WRITE_BUFFER_H */