Merge lp:~jaypipes/drizzle/publisher into lp:~drizzle-trunk/drizzle/development

Proposed by Jay Pipes
Status: Rejected
Rejected by: Monty Taylor
Proposed branch: lp:~jaypipes/drizzle/publisher
Merge into: lp:~drizzle-trunk/drizzle/development
Diff against target: 2657 lines
To merge this branch: bzr merge lp:~jaypipes/drizzle/publisher
Reviewer               Review Type   Date Requested   Status
Brian Aker                                            Disapprove
Eric Day (community)                                  Approve
Drizzle Developers                                    Pending
Review via email: mp+23785@code.launchpad.net

Description of the change

First patchset adding basic publisher support to the
replication services component.

A Publisher tracks what transactions have been applied to a channel on
a node and provides an interface for querying for this
information. Later, the publisher will act as a specialized
server which connects to a Subscriber plugin.

Adds a new replication_publishers data_dictionary view,
mostly used in testing to see if publisher registration
was correct.

Adds a new replication_publisher_channels data_dictionary view, again
mostly used in testing registration.
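
As a rough illustration of the interface described above, the following standalone sketch tracks the last applied transaction per channel in memory. It does not derive from the real drizzled::plugin::Publisher class; the InMemoryPublisher name, the recordApplied() helper, the use of uint64_t for transaction IDs, and the choice to return 0 for an unknown channel are all assumptions made for the example. Only the four query method names are taken from publisher.h in the diff.

```cpp
#include <cassert>
#include <map>
#include <stddef.h>
#include <stdint.h>
#include <vector>

// Hypothetical stand-in for TransactionServices::TransactionId.
typedef uint64_t TransactionId;

// Standalone sketch; a real plugin would derive from
// drizzled::plugin::Publisher instead.
class InMemoryPublisher
{
public:
  // Hypothetical helper: record that a transaction was applied on a channel.
  void recordApplied(uint32_t channel_id, TransactionId trx_id, uint64_t end_timestamp)
  {
    ChannelState &state= channels_[channel_id];
    state.last_trx_id= trx_id;
    state.last_end_timestamp= end_timestamp;
  }

  // Returns 0 for a channel that has seen no transactions (an assumption).
  TransactionId getLastAppliedTransactionId(uint32_t channel_id) const
  {
    std::map<uint32_t, ChannelState>::const_iterator it= channels_.find(channel_id);
    return it == channels_.end() ? 0 : it->second.last_trx_id;
  }

  // Returns 0 for a channel that has seen no transactions (an assumption).
  uint64_t getLastAppliedEndTimestamp(uint32_t channel_id) const
  {
    std::map<uint32_t, ChannelState>::const_iterator it= channels_.find(channel_id);
    return it == channels_.end() ? 0 : it->second.last_end_timestamp;
  }

  size_t getNumChannels() const
  {
    return channels_.size();
  }

  // Appends the known channel IDs to the supplied vector.
  void getChannels(std::vector<uint32_t> *out_channels) const
  {
    assert(out_channels != NULL);
    for (std::map<uint32_t, ChannelState>::const_iterator it= channels_.begin();
         it != channels_.end();
         ++it)
    {
      out_channels->push_back(it->first);
    }
  }

private:
  struct ChannelState
  {
    ChannelState() : last_trx_id(0), last_end_timestamp(0) {}
    TransactionId last_trx_id;
    uint64_t last_end_timestamp;
  };
  std::map<uint32_t, ChannelState> channels_;
};
```

A data_dictionary view such as replication_publisher_channels could then be filled by calling getChannels() once per publisher and querying the two "last applied" values for each channel ID returned.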

Jay Pipes (jaypipes) wrote :

Taking into Work In Progress. Eric and I discussed moving the replication.proto into the transaction log module and not requiring publishers and subscribers to keep a GPB manifest if they didn't want to. This makes sense. The GPB manifest is implementation, not interface.

Jay Pipes (jaypipes) wrote :

Moves the replication.proto file into /plugin/transaction_log/ since it is implementation-dependent stuff.

lp:~jaypipes/drizzle/publisher updated
1440. By Jay Pipes <jpipes@serialcoder>

Publisher manifest file now written to disk properly (was using incorrect ByteSize() instead of std::string::size()). Also, manifest is read on startup.

1441. By Jay Pipes <jpipes@serialcoder>

Adds a custom test suite and framework for testing the replication pieces. The first test case tests basic functionality of creating and writing a serialized publisher manifest file.

1442. By Jay Pipes <jpipes@serialcoder>

Connect replication test suite into main transaction_log test suite

1443. By Jay Pipes <jpipes@serialcoder>

Adds test cases to the replication test suite for checking
startup when a corrupted manifest file is present, as
well as when a good publisher manifest is present. Changes
the TransactionLogPublisher constructor to throw a proper
error and disable the publisher plugin properly.
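
The fix in revision 1440 concerns which length is passed when the manifest is written. A minimal sketch of the idea, without the protobuf dependency: once a message has been serialized into a std::string, the number of bytes to write is the string's own size(). The writeManifest() and roundTripExample() functions and the use of std::ostream are assumptions for illustration; the branch's actual file-handling code is not shown in this excerpt.

```cpp
#include <ostream>
#include <sstream>
#include <string>

// Writes an already-serialized manifest buffer to a stream.
// Returns true if the stream is still in a good state afterwards.
bool writeManifest(std::ostream &out, const std::string &serialized)
{
  // Use the buffer's own length. Serialized data may contain embedded
  // NUL bytes, so the length must come from the string, not from scanning it.
  out.write(serialized.data(), static_cast<std::streamsize>(serialized.size()));
  return out.good();
}

// Usage example: round-trips a buffer that contains an embedded NUL byte.
bool roundTripExample()
{
  const std::string buffer("ab\0cd", 5);
  std::ostringstream out;
  return writeManifest(out, buffer) && out.str() == buffer;
}
```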

Eric Day (eday) wrote :

Great work! The plugin point for Publisher looks sane. As for locking in the transaction log publisher plugin (your comment on using a scoreboard method), it depends on how expensive the calls made under the lock are. We might just want to break it down to one lock per channel.
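
The "one lock per channel" idea could look roughly like the sketch below. It is not code from the branch: the ChannelSlot and PerChannelTracker names are hypothetical, and it uses C++11 std::mutex for brevity rather than whatever locking primitive the transaction log plugin actually uses. A short-lived map lock is held only to find or create a channel's slot; updates and reads then contend only with other users of the same channel.

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <stdint.h>

// Hypothetical per-channel state guarded by its own mutex.
struct ChannelSlot
{
  ChannelSlot() : last_trx_id(0) {}
  std::mutex lock;
  uint64_t last_trx_id;
};

class PerChannelTracker
{
public:
  // Records the last applied transaction ID for one channel.
  void update(uint32_t channel_id, uint64_t trx_id)
  {
    ChannelSlot &slot= slotFor(channel_id);
    std::lock_guard<std::mutex> guard(slot.lock);
    slot.last_trx_id= trx_id;
  }

  // Returns the last applied transaction ID, or 0 for a new channel.
  uint64_t lastApplied(uint32_t channel_id)
  {
    ChannelSlot &slot= slotFor(channel_id);
    std::lock_guard<std::mutex> guard(slot.lock);
    return slot.last_trx_id;
  }

private:
  // The map lock is held only long enough to find or create a slot.
  // Slots are heap-allocated and never erased, so the returned
  // reference stays valid after the map lock is released.
  ChannelSlot &slotFor(uint32_t channel_id)
  {
    std::lock_guard<std::mutex> guard(map_lock_);
    std::unique_ptr<ChannelSlot> &entry= slots_[channel_id];
    if (!entry)
      entry.reset(new ChannelSlot());
    return *entry;
  }

  std::mutex map_lock_;
  std::map<uint32_t, std::unique_ptr<ChannelSlot> > slots_;
};
```

Whether this is worth the extra bookkeeping depends, as noted above, on how expensive the work done under the lock is; with cheap updates a single lock may be adequate.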

Eric Day (eday) :
review: Approve
lp:~jaypipes/drizzle/publisher updated
1444. By Jay Pipes <jpipes@serialcoder>

Fix valgrind issue due to not initializing the publisher's mutex before calling readManifestFile()
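
The ordering problem behind this fix can be sketched as follows, assuming a POSIX mutex that is locked inside the manifest-reading routine. The ManifestHolder class, its readManifest() method, and the entries_read_ counter are hypothetical stand-ins; only the rule "initialize the mutex before calling the method that locks it" is taken from the commit message.

```cpp
#include <pthread.h>
#include <stddef.h>

class ManifestHolder
{
public:
  ManifestHolder() : entries_read_(0)
  {
    // Initialize the mutex first: readManifest() locks it, and locking
    // an uninitialized pthread mutex is what tools like valgrind flag.
    pthread_mutex_init(&lock_, NULL);
    readManifest();
  }

  ~ManifestHolder()
  {
    pthread_mutex_destroy(&lock_);
  }

  int entriesRead()
  {
    pthread_mutex_lock(&lock_);
    int result= entries_read_;
    pthread_mutex_unlock(&lock_);
    return result;
  }

private:
  // Placeholder for parsing the manifest file under the lock.
  void readManifest()
  {
    pthread_mutex_lock(&lock_);
    entries_read_= 1;
    pthread_mutex_unlock(&lock_);
  }

  pthread_mutex_t lock_;
  int entries_read_;

  // Not copyable: a pthread mutex must not be copied.
  ManifestHolder(const ManifestHolder &);
  ManifestHolder &operator=(const ManifestHolder &);
};
```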

Jay Pipes (jaypipes) wrote :

FYI, saw the valgrind error and have pushed a fix.

lp:~jaypipes/drizzle/publisher updated
1445. By Jay Pipes <jpipes@serialcoder>

Merge trunk

Brian Aker (brianaker) :
review: Needs Information
Revision history for this message
Jay Pipes (jaypipes) wrote :

Heya, what information do you need on this one? Related to the
testing in Python or something else? Happy to answer, just let me
know...

On Thu, May 27, 2010 at 1:07 AM, Brian Aker <email address hidden> wrote:
> Review: Needs Information

Brian Aker (brianaker) wrote :

Hi!

I marked it because of the Python testing. PatrickC offered to look at extending test-run.pl to solve this issue, so I am currently waiting to hear from him about it.

Cheers,
 -Brian

On May 27, 2010, at 8:42 AM, Jay Pipes wrote:

> Heya, what information do you need on this one? Related to the
> testing in Python or something else? Happy to answer, just let me
> know...

Patrick Crews (patrick-crews) wrote :

Expect to have some word on this next week. I've been a bit occupied with
some other tasks this week. I'll likely be in touch with you then, Jay. My
apologies for the hold-up on the review.

On Fri, May 28, 2010 at 6:34 PM, Brian Aker <email address hidden> wrote:

> Hi!
>
> I marked it because of the Python testing. PatrickC offered to look at
> extending test-run.pl to solve this issue, so I am currently waiting to hear
> from him about it.
>
> Cheers,
> -Brian

Jay Pipes (jaypipes) wrote :

No worries at all, Patrick.

On Fri, May 28, 2010 at 6:39 PM, Patrick Crews <email address hidden> wrote:
> Expect to have some word on this next week.  I've been a bit occupied with
> some other tasks this week.  I'll likely be in touch with you then, Jay.  My
> apologies for the hold-up on the review.

Stewart Smith (stewart) wrote :

On Fri, 28 May 2010 22:39:30 -0000, Patrick Crews <email address hidden> wrote:
> Expect to have some word on this next week. I've been a bit occupied with
> some other tasks this week. I'll likely be in touch with you then, Jay. My
> apologies for the hold-up on the review.

We should really port mtr2 back in before doing lots of modifications.

--
Stewart Smith

Patrick Crews (patrick-crews) wrote :

After looking at the tests here, I have to say that I'm in favor of leaving the Python testing as-is unless there is some really urgent need to not have it.

My reasoning:
* protoc doesn't produce perl code. We use the python-generated code to interact with the manifest and get useful, test-related data.
* The current python code is awfully clean and assertion based, which can fill in gaps that test-run doesn't cover effectively. It's also well-developed / not-half-baked and we can expand the Python test without a lot of fuss.

If the use of python is a deal-breaker, we probably could do something to make it work, but I fear that the effort involved would not be worth the pay-off (at least at present).

I will be looking at https://bugs.launchpad.net/drizzle/+bug/570948 (bad command-line options hang test cases) so that we can improve test-run, but my feeling is that we can move things forward by keeping the current test(s).

Stewart Smith (stewart) wrote :

On Thu, 03 Jun 2010 16:03:26 -0000, Patrick Crews <email address hidden> wrote:
> If the use of python is a deal-breaker, we probably could do something to make it work, but I fear that the effort involved would not be worth the pay-off (at least at present).

We rely on Python for parts of the build system anyway, so it's not a
show stopper (if it runs on the prehistoric versions that Solaris ships with).

--
Stewart Smith

Jay Pipes (jaypipes) wrote :

Not sure why we even are discussing this. Brian's made a decision and that's apparently all that matters.

Brian Aker (brianaker) wrote :

Hi!

Until it fits under the test framework we have, it won't be mergeable. As pointed out in previous comments, "and we can expand the Python test without a lot of fuss" is exactly the issue. Once we have two systems we end up with a Tower of Babel. So until we can test with the current framework, this patch is a non-starter.

Cheers,
  -Brian

review: Disapprove

Unmerged revisions

1445. By Jay Pipes <jpipes@serialcoder>

Merge trunk

1444. By Jay Pipes <jpipes@serialcoder>

Fix valgrind issue due to not initializing the publisher's mutex before calling readManifestFile()

1443. By Jay Pipes <jpipes@serialcoder>

Adds test cases to the replication test suite for checking
startup when a corrupted manifest file is present, as
well as when a good publisher manifest is present. Changes
the TransactionLogPublisher constructor to throw a proper
error and disable the publisher plugin properly.

1442. By Jay Pipes <jpipes@serialcoder>

Connect replication test suite into main transaction_log test suite

1441. By Jay Pipes <jpipes@serialcoder>

Adds a custom test suite and framework for testing the replication pieces. The first test case tests basic functionality of creating and writing a serialized publisher manifest file.

1440. By Jay Pipes <jpipes@serialcoder>

Publisher manifest file now written to disk properly (was using incorrect ByteSize() instead of std::string::size()). Also, manifest is read on startup.

1439. By Jay Pipes <jpipes@serialcoder>

Merge Monty build fixes

1438. By Jay Pipes <jpipes@serialcoder>

remove references to /drizzled/message/replication.pb.h

1437. By Jay Pipes <jpipes@serialcoder>

Moves replication.proto out of /drizzled/message and into /plugin/transaction_log/ since it is implementation-specific.

1436. By Jay Pipes <jpipes@serialcoder>

Remove unused ZeroCopyStream buffer

Preview Diff

1=== modified file '.bzrignore'
2--- .bzrignore 2010-02-24 00:19:06 +0000
3+++ .bzrignore 2010-05-05 02:25:42 +0000
4@@ -304,3 +304,4 @@
5 *.reject
6 config/pandora_vc_revinfo
7 drizzled/message/schema_writer
8+replication_pb2.py
9
10=== modified file 'drizzled/include.am'
11--- drizzled/include.am 2010-04-27 21:03:13 +0000
12+++ drizzled/include.am 2010-05-05 02:25:42 +0000
13@@ -306,6 +306,7 @@
14 drizzled/plugin/monitored_in_transaction.h \
15 drizzled/plugin/null_client.h \
16 drizzled/plugin/plugin.h \
17+ drizzled/plugin/publisher.h \
18 drizzled/plugin/query_cache.h \
19 drizzled/plugin/query_rewrite.h \
20 drizzled/plugin/replication.h \
21@@ -677,6 +678,7 @@
22 drizzled/plugin/logging.cc \
23 drizzled/plugin/monitored_in_transaction.cc \
24 drizzled/plugin/plugin.cc \
25+ drizzled/plugin/publisher.cc \
26 drizzled/plugin/query_cache.cc \
27 drizzled/plugin/query_rewrite.cc \
28 drizzled/plugin/registry.cc \
29
30=== modified file 'drizzled/message/transaction.proto'
31--- drizzled/message/transaction.proto 2010-02-26 00:05:00 +0000
32+++ drizzled/message/transaction.proto 2010-05-05 02:25:42 +0000
33@@ -304,9 +304,10 @@
34 message TransactionContext
35 {
36 required uint32 server_id = 1; /* Unique identifier of a server */
37- required uint64 transaction_id = 2; /* Globally-unique transaction ID */
38+ required uint64 transaction_id = 2; /* Unique transaction ID within the channel */
39 required uint64 start_timestamp = 3; /* Timestamp of when the transaction started */
40 required uint64 end_timestamp = 4; /* Timestamp of when the transaction ended */
41+ optional uint32 channel_id = 5 [default = 1]; /* Scope in which transaction ID is unique */
42 }
43
44 /*
45
46=== added file 'drizzled/plugin/publisher.cc'
47--- drizzled/plugin/publisher.cc 1970-01-01 00:00:00 +0000
48+++ drizzled/plugin/publisher.cc 2010-05-05 02:25:42 +0000
49@@ -0,0 +1,47 @@
50+/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
51+ * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
52+ *
53+ * Copyright (c) 2010 Jay Pipes
54+ *
55+ * Authors:
56+ *
57+ * Jay Pipes <jaypipes@gmail.com>
58+ *
59+ * This program is free software; you can redistribute it and/or modify
60+ * it under the terms of the GNU General Public License as published by
61+ * the Free Software Foundation; version 2 of the License.
62+ *
63+ * This program is distributed in the hope that it will be useful,
64+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
65+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
66+ * GNU General Public License for more details.
67+ *
68+ * You should have received a copy of the GNU General Public License
69+ * along with this program; if not, write to the Free Software
70+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
71+ */
72+
73+#include "config.h"
74+#include "drizzled/plugin/publisher.h"
75+#include "drizzled/replication_services.h"
76+
77+namespace drizzled
78+{
79+namespace plugin
80+{
81+
82+bool plugin::Publisher::addPlugin(plugin::Publisher *publisher)
83+{
84+ ReplicationServices &replication_services= ReplicationServices::singleton();
85+ replication_services.attachPublisher(publisher);
86+ return false;
87+}
88+
89+void plugin::Publisher::removePlugin(plugin::Publisher *publisher)
90+{
91+ ReplicationServices &replication_services= ReplicationServices::singleton();
92+ replication_services.detachPublisher(publisher);
93+}
94+
95+} // namespace plugin
96+} // namespace drizzled
97
98=== added file 'drizzled/plugin/publisher.h'
99--- drizzled/plugin/publisher.h 1970-01-01 00:00:00 +0000
100+++ drizzled/plugin/publisher.h 2010-05-05 02:25:42 +0000
101@@ -0,0 +1,99 @@
102+/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
103+ * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
104+ *
105+ * Copyright (c) 2010 Jay Pipes
106+ *
107+ * Authors:
108+ *
109+ * Jay Pipes <jaypipes@gmail.com>
110+ *
111+ * This program is free software; you can redistribute it and/or modify
112+ * it under the terms of the GNU General Public License as published by
113+ * the Free Software Foundation; version 2 of the License.
114+ *
115+ * This program is distributed in the hope that it will be useful,
116+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
117+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
118+ * GNU General Public License for more details.
119+ *
120+ * You should have received a copy of the GNU General Public License
121+ * along with this program; if not, write to the Free Software
122+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
123+ */
124+
125+#ifndef DRIZZLED_PLUGIN_PUBLISHER_H
126+#define DRIZZLED_PLUGIN_PUBLISHER_H
127+
128+/**
129+ * @file Defines the API for a Publisher
130+ *
131+ * A Publisher keeps track of transactions that have been applied
132+ * to a data store and publishes information about the transactions to
133+ * a Subscriber.
134+ */
135+
136+#include "drizzled/plugin/plugin.h"
137+
138+#include "drizzled/transaction_services.h"
139+#include "drizzled/message/transaction.pb.h"
140+
141+#include <functional>
142+
143+namespace drizzled
144+{
145+
146+namespace plugin
147+{
148+
149+/**
150+ * A class responsible for publishing replication information
151+ * to subscribers.
152+ */
153+class Publisher :public Plugin
154+{
155+public:
156+ explicit Publisher(std::string name_arg) :
157+ Plugin(name_arg, "Publisher")
158+ {}
159+ virtual ~Publisher() {}
160+ /**
161+ * Returns the transaction ID for the last message that was
162+ * successfully applied on the publisher node for a given
163+ * channel.
164+ *
165+ * @param[in] Channel ID to check
166+ */
167+ virtual TransactionServices::TransactionId getLastAppliedTransactionId(uint32_t channel_id)= 0;
168+ /**
169+ * Returns the end timestamp for the last message that was
170+ * successfully applied on the publisher node for a given
171+ * channel.
172+ *
173+ * @param[in] Channel ID to check
174+ */
175+ virtual uint64_t getLastAppliedEndTimestamp(uint32_t channel_id)= 0;
176+ /**
177+ * Returns the number of channels this publisher
178+ * publishes to.
179+ */
180+ virtual size_t getNumChannels()= 0;
181+ /**
182+ * Adds to a supplied vector the set of channels this
183+ * publisher publishes to.
184+ *
185+ * @param[out] Pointer to a Vector of channel IDs to add to
186+ */
187+ virtual void getChannels(std::vector<uint32_t> *out_channels)= 0;
188+
189+ static bool addPlugin(Publisher *publisher);
190+ static void removePlugin(Publisher *publisher);
191+private:
192+ Publisher();
193+ Publisher(const Publisher &);
194+ Publisher& operator=(const Publisher &);
195+};
196+
197+} /* namespace plugin */
198+} /* namespace drizzled */
199+
200+#endif /* DRIZZLED_PLUGIN_PUBLISHER_H */
201
202=== modified file 'drizzled/plugin/replication.h'
203--- drizzled/plugin/replication.h 2010-03-27 18:03:59 +0000
204+++ drizzled/plugin/replication.h 2010-05-05 02:25:42 +0000
205@@ -37,7 +37,8 @@
206 enum ReplicationReturnCode
207 {
208 SUCCESS= 0, /* no error */
209- UNKNOWN_ERROR= 1
210+ PUBLISHER_ERROR= 1, /* publisher problems...*/
211+ UNKNOWN_ERROR= 99
212 };
213
214 } /* namespace plugin */
215
216=== modified file 'drizzled/plugin/transaction_reader.h'
217--- drizzled/plugin/transaction_reader.h 2010-03-05 18:08:49 +0000
218+++ drizzled/plugin/transaction_reader.h 2010-05-05 02:25:42 +0000
219@@ -25,7 +25,7 @@
220 #define DRIZZLED_PLUGIN_TRANSACTION_READER_H
221
222 #include "drizzled/plugin/plugin.h"
223-#include "drizzled/replication_services.h" /* For global transaction ID typedef */
224+#include "drizzled/transaction_services.h" /* For transaction ID typedef */
225
226 /**
227 * @file Defines the API for a TransactionReader
228@@ -66,7 +66,7 @@
229 * @retval
230 * false if not found or read successfully
231 */
232- virtual bool read(const ReplicationServices::GlobalTransactionId &to_read,
233+ virtual bool read(const TransactionServices::TransactionId &to_read,
234 message::Transaction *to_fill)= 0;
235 };
236
237
238=== modified file 'drizzled/replication_services.cc'
239--- drizzled/replication_services.cc 2010-04-05 16:06:01 +0000
240+++ drizzled/replication_services.cc 2010-05-05 02:25:42 +0000
241@@ -31,13 +31,14 @@
242 * - Publisher
243 * - Subscriber
244 *
245- * ReplicationServices is a bridge between replication modules and the kernel,
246- * and its primary function is to */
247+ * ReplicationServices is a bridge between replication modules and the kernel
248+ */
249
250 #include "config.h"
251 #include "drizzled/replication_services.h"
252 #include "drizzled/plugin/transaction_replicator.h"
253 #include "drizzled/plugin/transaction_applier.h"
254+#include "drizzled/plugin/publisher.h"
255 #include "drizzled/message/transaction.pb.h"
256 #include "drizzled/gettext.h"
257 #include "drizzled/session.h"
258@@ -45,6 +46,7 @@
259
260 #include <string>
261 #include <vector>
262+#include <functional>
263 #include <algorithm>
264
265 using namespace std;
266@@ -140,6 +142,16 @@
267 return true;
268 }
269
270+void ReplicationServices::attachPublisher(plugin::Publisher *in_publisher)
271+{
272+ publishers.push_back(in_publisher);
273+}
274+
275+void ReplicationServices::detachPublisher(plugin::Publisher *in_publisher)
276+{
277+ publishers.erase(std::find(publishers.begin(), publishers.end(), in_publisher));
278+}
279+
280 void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
281 {
282 replicators.push_back(in_replicator);
283@@ -178,22 +190,18 @@
284
285 result= cur_repl->replicate(cur_appl, in_session, to_push);
286
287- if (result == plugin::SUCCESS)
288- {
289- /*
290- * We update the timestamp for the last applied Transaction so that
291- * publisher plugins can ask the replication services when the
292- * last known applied Transaction was using the getLastAppliedTimestamp()
293- * method.
294- */
295- last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
296- }
297- else
298+ if (result != plugin::SUCCESS)
299 return result;
300 }
301+
302 return result;
303 }
304
305+ReplicationServices::Publishers &ReplicationServices::getPublishers()
306+{
307+ return publishers;
308+}
309+
310 ReplicationServices::ReplicationStreams &ReplicationServices::getReplicationStreams()
311 {
312 return replication_streams;
313
314=== modified file 'drizzled/replication_services.h'
315--- drizzled/replication_services.h 2010-04-05 16:06:01 +0000
316+++ drizzled/replication_services.h 2010-05-05 02:25:42 +0000
317@@ -43,6 +43,8 @@
318 {
319 class TransactionReplicator;
320 class TransactionApplier;
321+ class Subscriber;
322+ class Publisher;
323 }
324 namespace message
325 {
326@@ -50,14 +52,29 @@
327 }
328
329 /**
330- * This is a class which manages transforming internal
331- * transactional events into GPB messages and sending those
332- * events out through registered replicators and appliers.
333+ * This is a class which manages communication between publishers
334+ * and subscribers and sends transaction messages out to
335+ * replicator and applier plugins.
336+ *
337+ * The relationship betweem the above classes is as follows:
338+ *
339+ * TransactionReplicators are responsible for filtering or transforming
340+ * Transaction messages before passing them off to TransactionAppliers.
341+ *
342+ * TransactionAppliers "apply" the Transaction messages to something --
343+ * this "something" could be anything, for instance a log file or a message queue.
344+ *
345+ * Publishers are responsible for publishing information
346+ * about the Transaction messages which have been applied to
347+ * a particular channel.
348+ *
349+ * Subscribers are responsible for asking a Publisher for a data
350+ * stream of Transaction messages that it will apply to its own
351+ * data store.
352 */
353 class ReplicationServices
354 {
355 public:
356- typedef uint64_t GlobalTransactionId;
357 /**
358 * Types of messages that can go in the transaction
359 * log file. Every time something is written into the
360@@ -69,8 +86,18 @@
361 TRANSACTION= 1, /* A GPB Transaction Message */
362 BLOB= 2 /* A BLOB value */
363 };
364- typedef std::pair<plugin::TransactionReplicator *, plugin::TransactionApplier *> ReplicationPair;
365- typedef std::vector<ReplicationPair> ReplicationStreams;
366+ /**
367+ * A replication stream is the pairing of a replicator
368+ * plugin with an applied plugin.
369+ */
370+ typedef std::pair<plugin::TransactionReplicator *, plugin::TransactionApplier *> ReplicationStream;
371+ typedef std::vector<ReplicationStream> ReplicationStreams;
372+ /**
373+ * A publisher channel is the pairing of a publisher plugin
374+ * with a channel ID.
375+ */
376+ typedef std::vector<uint32_t> PublisherChannels;
377+ typedef std::vector<plugin::Publisher *> Publishers;
378 /**
379 * Method which is called after plugins have been loaded but
380 * before the first client connects. It determines if the registration
381@@ -84,6 +111,7 @@
382 * tracking...
383 */
384 bool evaluateRegisteredPlugins();
385+ typedef std::vector<plugin::Subscriber *> Subscribers;
386 /**
387 * Helper method which pushes a constructed message out to the registered
388 * replicator and applier plugins.
389@@ -116,11 +144,32 @@
390 bool isActive() const;
391
392 /**
393+ * Returns the list of publishers this node knows about
394+ */
395+ Publishers &getPublishers();
396+
397+ /**
398 * Returns the list of replication streams
399 */
400 ReplicationStreams &getReplicationStreams();
401
402 /**
403+ * Attaches a publisher to our internal collection of
404+ * publishers.
405+ *
406+ * @param Pointer to a publisher to attach/register
407+ */
408+ void attachPublisher(plugin::Publisher *in_publisher);
409+
410+ /**
411+ * Detaches/unregisters a publisher with our internal
412+ * collection of publishers.
413+ *
414+ * @param Pointer to the publisher to detach
415+ */
416+ void detachPublisher(plugin::Publisher *in_publisher);
417+
418+ /**
419 * Attaches a replicator to our internal collection of
420 * replicators.
421 *
422@@ -152,36 +201,36 @@
423 * @param Pointer to the applier to detach
424 */
425 void detachApplier(plugin::TransactionApplier *in_applier);
426-
427- /**
428- * Returns the timestamp of the last Transaction which was sent to an
429- * applier.
430- */
431- uint64_t getLastAppliedTimestamp() const;
432 private:
433 typedef std::vector<plugin::TransactionReplicator *> Replicators;
434 typedef std::vector<std::pair<std::string, plugin::TransactionApplier *> > Appliers;
435 /**
436- * Atomic boolean set to true if any *active* replicators
437- * or appliers are actually registered.
438+ * Set to true if there are replication streams
439 */
440 bool is_active;
441- /**
442- * The timestamp of the last time a Transaction message was successfully
443- * applied (sent to an Applier)
444- */
445- atomic<uint64_t> last_applied_timestamp;
446 /** Our collection of registered replicator plugins */
447 Replicators replicators;
448 /** Our collection of registered applier plugins and their requested replicator plugin names */
449 Appliers appliers;
450 /** Our replication streams */
451 ReplicationStreams replication_streams;
452+ /** The set of known publishers */
453+ Publishers publishers;
454+ /** The set of known subscribers */
455+ Subscribers subscribers;
456 /**
457 * Strips underscores and lowercases supplied replicator name
458 * or requested name, and appends the suffix "replicator" if missing...
459 */
460 void normalizeReplicatorName(std::string &name);
461+ /**
462+ * Helper method which is called after any change in the
463+ * registered appliers or replicators to evaluate whether
464+ * any remaining plugins are actually active.
465+ *
466+ * This method properly sets the is_active member variable.
467+ */
468+ void evaluateActivePlugins();
469 };
470
471 } /* namespace drizzled */
472
473=== modified file 'drizzled/transaction_services.h'
474--- drizzled/transaction_services.h 2010-03-31 19:04:12 +0000
475+++ drizzled/transaction_services.h 2010-05-05 02:25:42 +0000
476@@ -40,6 +40,7 @@
477 }
478
479 class Session;
480+class Table;
481 class NamedSavepoint;
482
483 /**
484
485=== modified file 'plugin/replication_dictionary/module.cc'
486--- plugin/replication_dictionary/module.cc 2010-04-05 16:06:01 +0000
487+++ plugin/replication_dictionary/module.cc 2010-05-05 02:25:42 +0000
488@@ -31,11 +31,15 @@
489
490 #include "config.h"
491 #include "streams.h"
492+#include "publishers.h"
493+#include "publisher_channels.h"
494
495 using namespace std;
496 using namespace drizzled;
497
498 static ReplicationStreamsTool *streams_tool;
499+static PublishersTool *publishers_tool;
500+static PublisherChannelsTool *publisher_channels_tool;
501
502 static int init(plugin::Context &context)
503 {
504@@ -44,13 +48,40 @@
505 if (streams_tool != NULL)
506 {
507 context.add(streams_tool);
508- return 0;
509- }
510- else
511- {
512- // error?
513- return 1;
514- }
515+ }
516+ else
517+ {
518+ // error?
519+ return 1;
520+ }
521+
522+ publishers_tool= new (nothrow) PublishersTool;
523+
524+ if (publishers_tool != NULL)
525+ {
526+ context.add(publishers_tool);
527+ }
528+ else
529+ {
530+ delete streams_tool;
531+ // error?
532+ return 1;
533+ }
534+
535+ publisher_channels_tool= new (nothrow) PublisherChannelsTool;
536+
537+ if (publisher_channels_tool != NULL)
538+ {
539+ context.add(publisher_channels_tool);
540+ }
541+ else
542+ {
543+ delete streams_tool;
544+ delete publishers_tool;
545+ // error?
546+ return 1;
547+ }
548+ return 0;
549 }
550
551 DRIZZLE_PLUGIN(init, NULL);
552
553=== modified file 'plugin/replication_dictionary/plugin.ini'
554--- plugin/replication_dictionary/plugin.ini 2010-04-11 15:50:01 +0000
555+++ plugin/replication_dictionary/plugin.ini 2010-05-05 02:25:42 +0000
556@@ -1,5 +1,5 @@
557 [plugin]
558-title=Registry Dictionary
559+title=Replication Dictionary
560 author=Jay Pipes <jaypipes@gmail.com>
561 license=PLUGIN_LICENSE_BSD
562 description=Provides dictionary tables for replication system
563@@ -7,6 +7,10 @@
564 sources=
565 module.cc
566 streams.cc
567+ publishers.cc
568+ publisher_channels.cc
569 headers=
570 streams.h
571+ publishers.h
572+ publisher_channels.h
573 static=yes
574
575=== added file 'plugin/replication_dictionary/publisher_channels.cc'
576--- plugin/replication_dictionary/publisher_channels.cc 1970-01-01 00:00:00 +0000
577+++ plugin/replication_dictionary/publisher_channels.cc 2010-05-05 02:25:42 +0000
578@@ -0,0 +1,105 @@
579+/* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
580+ * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
581+ *
582+ * Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
583+ * All rights reserved.
584+ *
585+ * Redistribution and use in source and binary forms, with or without
586+ * modification, are permitted provided that the following conditions are met:
587+ *
588+ * * Redistributions of source code must retain the above copyright notice,
589+ * this list of conditions and the following disclaimer.
590+ * * Redistributions in binary form must reproduce the above copyright notice,
591+ * this list of conditions and the following disclaimer in the documentation
592+ * and/or other materials provided with the distribution.
593+ * * Neither the name of Drizzle nor the names of its contributors
594+ * may be used to endorse or promote products derived from this software
595+ * without specific prior written permission.
596+ *
597+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
598+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
599+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
600+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
601+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
602+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
603+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
604+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
605+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
606+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
607+ * THE POSSIBILITY OF SUCH DAMAGE.
608+ */
609+
610+#include "config.h"
611+#include "publisher_channels.h"
612+
613+#include <drizzled/plugin/publisher.h>
614+
615+using namespace std;
616+using namespace drizzled;
617+
618+PublisherChannelsTool::PublisherChannelsTool() :
619+ plugin::TableFunction("DATA_DICTIONARY", "REPLICATION_PUBLISHER_CHANNELS")
620+{
621+ add_field("PUBLISHER");
622+ add_field("CHANNEL_ID", plugin::TableFunction::NUMBER);
623+ add_field("LAST_APPLIED_TRANSACTION_ID", plugin::TableFunction::NUMBER);
624+ add_field("LAST_APPLIED_END_TIMESTAMP", plugin::TableFunction::NUMBER);
625+}
626+
627+PublisherChannelsTool::Generator::Generator(Field **arg) :
628+ plugin::TableFunction::Generator(arg)
629+{
630+ ReplicationServices &replication_services= ReplicationServices::singleton();
631+ ReplicationServices::Publishers &publishers= replication_services.getPublishers();
632+
633+ /*
634+ * Fill our row vector with information from each
635+ * publisher and the publisher's collection of
636+ * apply information for each channel
637+ */
638+ vector<uint32_t> channels;
639+ for (ReplicationServices::Publishers::iterator iter= publishers.begin();
640+ iter != publishers.end();
641+ ++iter)
642+ {
643+ plugin::Publisher *publisher= *iter;
644+ channels.clear();
645+ channels.reserve(publisher->getNumChannels());
646+ publisher->getChannels(&channels);
647+ for (vector<uint32_t>::iterator channel_iter= channels.begin();
648+ channel_iter != channels.end();
649+ ++channel_iter)
650+ {
651+ rows.push_back(make_pair(publisher, *channel_iter));
652+ }
653+ }
654+
655+ it= rows.begin();
656+ end= rows.end();
657+}
658+
659+bool PublisherChannelsTool::Generator::populate()
660+{
661+ if (it == end)
662+ return false;
663+
664+ plugin::Publisher *publisher= (*it).first;
665+ uint32_t channel= (*it).second;
666+
667+ /* PUBLISHER */
668+ push(publisher->getName());
669+
670+ /* CHANNEL_ID */
671+ push(static_cast<uint64_t>(channel));
672+
673+ /* LAST_APPLIED_TRANSACTION_ID */
674+ push(static_cast<uint64_t>(publisher->getLastAppliedTransactionId(channel)));
675+
676+ /* LAST_APPLIED_END_TIMESTAMP */
677+ push(static_cast<uint64_t>(publisher->getLastAppliedEndTimestamp(channel)));
678+
679+ ++it;
680+
681+ return true;
682+}
683+
684
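Review note: the generator in publisher_channels.cc materializes every (publisher, channel) row in its constructor, then hands out one row per populate() call until it returns false. The following is a minimal standalone sketch of that pattern, not code from the branch. FakePublisher and ChannelRowGenerator are hypothetical stand-ins for plugin::Publisher and PublisherChannelsTool::Generator, and populate() copies the row into an out-parameter instead of calling push().

```cpp
#include <cassert>
#include <cstddef>
#include <stdint.h>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for drizzled::plugin::Publisher.
struct FakePublisher
{
  std::string name;
  std::vector<uint32_t> channels;
};

// Same shape as the generator in the diff: rows are built once in the
// constructor and then consumed one per populate() call.
class ChannelRowGenerator
{
public:
  typedef std::pair<const FakePublisher *, uint32_t> Row;

  // The publishers vector must outlive the generator, because each row
  // stores a pointer into it.
  explicit ChannelRowGenerator(const std::vector<FakePublisher> &publishers)
  {
    for (size_t i= 0; i < publishers.size(); ++i)
    {
      for (size_t j= 0; j < publishers[i].channels.size(); ++j)
        rows.push_back(Row(&publishers[i], publishers[i].channels[j]));
    }
    it= rows.begin();
  }

  // Copies the next row into *out and returns true, or returns false
  // once every row has been handed out.
  bool populate(Row *out)
  {
    assert(out != NULL);
    if (it == rows.end())
      return false;
    *out= *it;
    ++it;
    return true;
  }

private:
  std::vector<Row> rows;
  std::vector<Row>::const_iterator it;
};
```

Building the rows up front keeps populate() trivial, at the cost of a snapshot: channels registered after the generator is constructed are not seen by that scan.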
685=== added file 'plugin/replication_dictionary/publisher_channels.h'
686--- plugin/replication_dictionary/publisher_channels.h 1970-01-01 00:00:00 +0000
687+++ plugin/replication_dictionary/publisher_channels.h 2010-05-05 02:25:42 +0000
688@@ -0,0 +1,67 @@
689+/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
690+ * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
691+ *
692+ * Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
693+ * All rights reserved.
694+ *
695+ * Redistribution and use in source and binary forms, with or without
696+ * modification, are permitted provided that the following conditions are met:
697+ *
698+ * * Redistributions of source code must retain the above copyright notice,
699+ * this list of conditions and the following disclaimer.
700+ * * Redistributions in binary form must reproduce the above copyright notice,
701+ * this list of conditions and the following disclaimer in the documentation
702+ * and/or other materials provided with the distribution.
703+ * * Neither the name of Drizzle nor the names of its contributors
704+ * may be used to endorse or promote products derived from this software
705+ * without specific prior written permission.
706+ *
707+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
708+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
709+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
710+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
711+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
712+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
713+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
714+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
715+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
716+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
717+ * THE POSSIBILITY OF SUCH DAMAGE.
718+ */
719+
720+#ifndef PLUGIN_REPLICATION_DICTIONARY_PUBLISHER_CHANNELS_H
721+#define PLUGIN_REPLICATION_DICTIONARY_PUBLISHER_CHANNELS_H
722+
723+#include <drizzled/plugin/table_function.h>
724+#include <drizzled/replication_services.h>
725+
726+#include <utility>
727+#include <vector>
728+
729+class PublisherChannelsTool : public drizzled::plugin::TableFunction
730+{
731+public:
732+ PublisherChannelsTool();
733+
734+ class Generator : public drizzled::plugin::TableFunction::Generator
735+ {
736+ public:
737+ Generator(drizzled::Field **arg);
738+
739+ bool populate();
740+ private:
741+ typedef std::pair<drizzled::plugin::Publisher *,
742+ uint32_t /* channel ID taken from apply_info in publisher manifest */> Row;
743+ typedef std::vector<Row> Rows;
744+ Rows rows;
745+ Rows::iterator it;
746+ Rows::iterator end;
747+ };
748+
749+ Generator *generator(drizzled::Field **arg)
750+ {
751+ return new Generator(arg);
752+ }
753+};
754+
755+#endif /* PLUGIN_REPLICATION_DICTIONARY_PUBLISHER_CHANNELS_H */
756
757=== added file 'plugin/replication_dictionary/publishers.cc'
758--- plugin/replication_dictionary/publishers.cc 1970-01-01 00:00:00 +0000
759+++ plugin/replication_dictionary/publishers.cc 2010-05-05 02:25:42 +0000
760@@ -0,0 +1,71 @@
761+/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
762+ * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
763+ *
764+ * Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
765+ * All rights reserved.
766+ *
767+ * Redistribution and use in source and binary forms, with or without
768+ * modification, are permitted provided that the following conditions are met:
769+ *
770+ * * Redistributions of source code must retain the above copyright notice,
771+ * this list of conditions and the following disclaimer.
772+ * * Redistributions in binary form must reproduce the above copyright notice,
773+ * this list of conditions and the following disclaimer in the documentation
774+ * and/or other materials provided with the distribution.
775+ * * Neither the name of Drizzle nor the names of its contributors
776+ * may be used to endorse or promote products derived from this software
777+ * without specific prior written permission.
778+ *
779+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
780+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
781+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
782+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
783+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
784+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
785+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
786+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
787+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
788+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
789+ * THE POSSIBILITY OF SUCH DAMAGE.
790+ */
791+
792+#include "config.h"
793+#include "publishers.h"
794+
795+#include <drizzled/replication_services.h>
796+#include <drizzled/plugin/publisher.h>
797+
798+using namespace std;
799+using namespace drizzled;
800+
801+PublishersTool::PublishersTool() :
802+ plugin::TableFunction("DATA_DICTIONARY", "REPLICATION_PUBLISHERS")
803+{
804+ add_field("PUBLISHER_NAME");
805+}
806+
807+PublishersTool::Generator::Generator(Field **arg) :
808+ plugin::TableFunction::Generator(arg)
809+{
810+ ReplicationServices &replication_services= ReplicationServices::singleton();
811+ ReplicationServices::Publishers &publishers= replication_services.getPublishers();
812+
813+ it= publishers.begin();
814+ end= publishers.end();
815+}
816+
817+bool PublishersTool::Generator::populate()
818+{
819+ if (it == end)
820+ return false;
821+
822+ plugin::Publisher *publisher= *it;
823+
824+ /* PUBLISHER_NAME */
825+ push(publisher->getName());
826+
827+ ++it;
828+
829+ return true;
830+}
831+
832
833=== added file 'plugin/replication_dictionary/publishers.h'
834--- plugin/replication_dictionary/publishers.h 1970-01-01 00:00:00 +0000
835+++ plugin/replication_dictionary/publishers.h 2010-05-05 02:25:42 +0000
836@@ -0,0 +1,60 @@
837+/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
838+ * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
839+ *
840+ * Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
841+ * All rights reserved.
842+ *
843+ * Redistribution and use in source and binary forms, with or without
844+ * modification, are permitted provided that the following conditions are met:
845+ *
846+ * * Redistributions of source code must retain the above copyright notice,
847+ * this list of conditions and the following disclaimer.
848+ * * Redistributions in binary form must reproduce the above copyright notice,
849+ * this list of conditions and the following disclaimer in the documentation
850+ * and/or other materials provided with the distribution.
851+ * * Neither the name of Drizzle nor the names of its contributors
852+ * may be used to endorse or promote products derived from this software
853+ * without specific prior written permission.
854+ *
855+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
856+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
857+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
858+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
859+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
860+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
861+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
862+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
863+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
864+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
865+ * THE POSSIBILITY OF SUCH DAMAGE.
866+ */
867+
868+#ifndef PLUGIN_REPLICATION_DICTIONARY_PUBLISHERS_H
869+#define PLUGIN_REPLICATION_DICTIONARY_PUBLISHERS_H
870+
871+#include <drizzled/plugin/table_function.h>
872+#include <drizzled/replication_services.h>
873+
874+class PublishersTool : public drizzled::plugin::TableFunction
875+{
876+public:
877+ PublishersTool();
878+
879+ class Generator : public drizzled::plugin::TableFunction::Generator
880+ {
881+ drizzled::ReplicationServices::Publishers::iterator it;
882+ drizzled::ReplicationServices::Publishers::iterator end;
883+
884+ public:
885+ Generator(drizzled::Field **arg);
886+
887+ bool populate();
888+ };
889+
890+ Generator *generator(drizzled::Field **arg)
891+ {
892+ return new Generator(arg);
893+ }
894+};
895+
896+#endif /* PLUGIN_REPLICATION_DICTIONARY_PUBLISHERS_H */
897
898=== modified file 'plugin/replication_dictionary/tests/r/data_dictionary.result'
899--- plugin/replication_dictionary/tests/r/data_dictionary.result 2010-04-05 16:11:15 +0000
900+++ plugin/replication_dictionary/tests/r/data_dictionary.result 2010-05-05 02:25:42 +0000
901@@ -2,9 +2,28 @@
902 SELECT count(*) FROM replication_streams;
903 count(*)
904 #
905+SELECT count(*) FROM replication_publishers;
906+count(*)
907+#
908+SELECT count(*) FROM replication_publisher_channels;
909+count(*)
910+#
911 show create table replication_streams;
912 Table Create Table
913 replication_streams CREATE TABLE `replication_streams` (
914 `REPLICATOR` varchar(64) NOT NULL DEFAULT '',
915 `APPLIER` varchar(64) NOT NULL DEFAULT ''
916 ) ENGINE=FunctionEngine
917+show create table replication_publishers;
918+Table Create Table
919+replication_publishers CREATE TABLE `replication_publishers` (
920+ `PUBLISHER_NAME` varchar(64) NOT NULL DEFAULT ''
921+) ENGINE=FunctionEngine
922+show create table replication_publisher_channels;
923+Table Create Table
924+replication_publisher_channels CREATE TABLE `replication_publisher_channels` (
925+ `PUBLISHER` varchar(64) NOT NULL DEFAULT '',
926+ `CHANNEL_ID` bigint NOT NULL DEFAULT '0',
927+ `LAST_APPLIED_TRANSACTION_ID` bigint NOT NULL DEFAULT '0',
928+ `LAST_APPLIED_END_TIMESTAMP` bigint NOT NULL DEFAULT '0'
929+) ENGINE=FunctionEngine
930
931=== added file 'plugin/replication_dictionary/tests/r/replication_publisher_channels.result'
932--- plugin/replication_dictionary/tests/r/replication_publisher_channels.result 1970-01-01 00:00:00 +0000
933+++ plugin/replication_dictionary/tests/r/replication_publisher_channels.result 2010-05-05 02:25:42 +0000
934@@ -0,0 +1,20 @@
935+use data_dictionary;
936+SET @@global.transaction_log_truncate_debug= 1;
937+SELECT * FROM data_dictionary.replication_publisher_channels;
938+PUBLISHER CHANNEL_ID LAST_APPLIED_TRANSACTION_ID LAST_APPLIED_END_TIMESTAMP
939+transaction_log_publisher 1 0 0
940+use test;
941+DROP TABLE IF EXISTS t1;
942+SET @@global.transaction_log_truncate_debug= 1;
943+CREATE TABLE t1
944+(
945+id INT NOT NULL PRIMARY KEY
946+, padding VARCHAR(20) NOT NULL
947+);
948+INSERT INTO t1 VALUES (1, 'I love testing');
949+INSERT INTO t1 VALUES (2, 'I hate testing');
950+DROP TABLE t1;
951+use data_dictionary;
952+SELECT * FROM data_dictionary.replication_publisher_channels;
953+PUBLISHER CHANNEL_ID LAST_APPLIED_TRANSACTION_ID LAST_APPLIED_END_TIMESTAMP
954+transaction_log_publisher 1 4 X
955
956=== added file 'plugin/replication_dictionary/tests/r/replication_publishers.result'
957--- plugin/replication_dictionary/tests/r/replication_publishers.result 1970-01-01 00:00:00 +0000
958+++ plugin/replication_dictionary/tests/r/replication_publishers.result 2010-05-05 02:25:42 +0000
959@@ -0,0 +1,4 @@
960+use data_dictionary;
961+SELECT * FROM data_dictionary.replication_publishers;
962+PUBLISHER_NAME
963+transaction_log_publisher
964
965=== modified file 'plugin/replication_dictionary/tests/t/data_dictionary.test'
966--- plugin/replication_dictionary/tests/t/data_dictionary.test 2010-04-05 16:06:01 +0000
967+++ plugin/replication_dictionary/tests/t/data_dictionary.test 2010-05-05 02:25:42 +0000
968@@ -5,4 +5,12 @@
969 --replace_column 1 #
970 SELECT count(*) FROM replication_streams;
971
972+--replace_column 1 #
973+SELECT count(*) FROM replication_publishers;
974+
975+--replace_column 1 #
976+SELECT count(*) FROM replication_publisher_channels;
977+
978 show create table replication_streams;
979+show create table replication_publishers;
980+show create table replication_publisher_channels;
981
982=== added symlink 'plugin/replication_dictionary/tests/t/replication_publisher_channels-master.opt'
983=== target is u'startup-master.opt'
984=== added file 'plugin/replication_dictionary/tests/t/replication_publisher_channels.test'
985--- plugin/replication_dictionary/tests/t/replication_publisher_channels.test 1970-01-01 00:00:00 +0000
986+++ plugin/replication_dictionary/tests/t/replication_publisher_channels.test 2010-05-05 02:25:42 +0000
987@@ -0,0 +1,31 @@
988+# Drizzle's replication dictionary's replication_publisher_channels table
989+
990+use data_dictionary;
991+
992+SET @@global.transaction_log_truncate_debug= 1;
993+
994+SELECT * FROM data_dictionary.replication_publisher_channels;
995+
996+use test;
997+
998+--disable_warnings
999+DROP TABLE IF EXISTS t1;
1000+--enable_warnings
1001+
1002+SET @@global.transaction_log_truncate_debug= 1;
1003+
1004+CREATE TABLE t1
1005+(
1006+ id INT NOT NULL PRIMARY KEY
1007+, padding VARCHAR(20) NOT NULL
1008+);
1009+
1010+INSERT INTO t1 VALUES (1, 'I love testing');
1011+INSERT INTO t1 VALUES (2, 'I hate testing');
1012+
1013+DROP TABLE t1;
1014+
1015+use data_dictionary;
1016+
1017+--replace_column 4 X
1018+SELECT * FROM data_dictionary.replication_publisher_channels;
1019
1020=== added symlink 'plugin/replication_dictionary/tests/t/replication_publishers-master.opt'
1021=== target is u'startup-master.opt'
1022=== added file 'plugin/replication_dictionary/tests/t/replication_publishers.test'
1023--- plugin/replication_dictionary/tests/t/replication_publishers.test 1970-01-01 00:00:00 +0000
1024+++ plugin/replication_dictionary/tests/t/replication_publishers.test 2010-05-05 02:25:42 +0000
1025@@ -0,0 +1,5 @@
1026+# Drizzle's replication dictionary's replication_publishers table.
1027+
1028+use data_dictionary;
1029+
1030+SELECT * FROM data_dictionary.replication_publishers;
1031
1032=== added symlink 'plugin/replication_dictionary/tests/t/replication_streams-master.opt'
1033=== target is u'startup-master.opt'
1034=== removed file 'plugin/replication_dictionary/tests/t/replication_streams-master.opt'
1035--- plugin/replication_dictionary/tests/t/replication_streams-master.opt 2010-04-05 16:06:01 +0000
1036+++ plugin/replication_dictionary/tests/t/replication_streams-master.opt 1970-01-01 00:00:00 +0000
1037@@ -1,1 +0,0 @@
1038---transaction-log-enable --transaction-log-use-replicator=filtered
1039
1040=== added symlink 'plugin/replication_dictionary/tests/t/slap-master.opt'
1041=== target is u'startup-master.opt'
1042=== added file 'plugin/replication_dictionary/tests/t/startup-master.opt'
1043--- plugin/replication_dictionary/tests/t/startup-master.opt 1970-01-01 00:00:00 +0000
1044+++ plugin/replication_dictionary/tests/t/startup-master.opt 2010-05-05 02:25:42 +0000
1045@@ -0,0 +1,1 @@
1046+--transaction-log-enable --transaction-log-use-replicator=filtered
1047
1048=== modified file 'plugin/schema_dictionary/tests/r/data_dictionary.result'
1049--- plugin/schema_dictionary/tests/r/data_dictionary.result 2010-04-07 21:54:30 +0000
1050+++ plugin/schema_dictionary/tests/r/data_dictionary.result 2010-05-05 02:25:42 +0000
1051@@ -466,6 +466,11 @@
1052 #
1053 #
1054 #
1055+#
1056+#
1057+#
1058+#
1059+#
1060 SELECT count(*) FROM REFERENTIAL_CONSTRAINTS;
1061 count(*)
1062 #
1063
1064=== modified file 'plugin/transaction_log/background_worker.cc'
1065--- plugin/transaction_log/background_worker.cc 2010-02-04 08:14:46 +0000
1066+++ plugin/transaction_log/background_worker.cc 2010-05-05 02:25:42 +0000
1067@@ -60,7 +60,6 @@
1068 #include <drizzled/gettext.h>
1069 #include <drizzled/errmsg_print.h>
1070
1071-
1072 #include "transaction_log.h"
1073 #include "background_worker.h"
1074
1075
1076=== modified file 'plugin/transaction_log/module.cc'
1077--- plugin/transaction_log/module.cc 2010-04-12 19:15:00 +0000
1078+++ plugin/transaction_log/module.cc 2010-05-05 02:25:42 +0000
1079@@ -39,6 +39,7 @@
1080 #include "print_transaction_message.h"
1081 #include "hexdump_transaction_message.h"
1082 #include "background_worker.h"
1083+#include "publisher.h"
1084
1085 #include <drizzled/plugin/plugin.h>
1086 #include <drizzled/session.h>
1087@@ -101,6 +102,8 @@
1088 extern TransactionLogIndex *transaction_log_index;
1089 /** Transaction Log descriptor defined in transaction_log.cc */
1090 extern TransactionLog *transaction_log;
1091+/** Publisher plugin defined in publisher.cc */
1092+extern TransactionLogPublisher *transaction_log_publisher;
1093 /** Transaction Log descriptor defined in transaction_log.cc */
1094 extern TransactionLogApplier *transaction_log_applier;
1095
1096@@ -153,10 +156,23 @@
1097 }
1098 }
1099
1100+ /* Create and initialize the transaction log publisher */
1101+ try
1102+ {
1103+ transaction_log_publisher= new TransactionLogPublisher("transaction_log_publisher");
1104+ }
1105+ catch (exception &e)
1106+ {
1107+ errmsg_printf(ERRMSG_LVL_ERROR, _("Failed to create the TransactionLogPublisher. Got error: %s\n"),
1108+ e.what());
1109+ return 1;
1110+ }
1111+
1112 /* Create the applier plugin and register it */
1113 transaction_log_applier= new (nothrow) TransactionLogApplier("transaction_log_applier",
1114 transaction_log,
1115 transaction_log_index,
1116+ transaction_log_publisher,
1117 sysvar_transaction_log_num_write_buffers);
1118 if (transaction_log_applier == NULL)
1119 {
1120@@ -165,6 +181,15 @@
1121 return 1;
1122 }
1123 context.add(transaction_log_applier);
1124+ context.add(transaction_log_publisher);
1125+
1126+ /*
1127+ * Tell the replication services component about our applier
1128+ *
1129+ * @note This is a hack to allow us to pair a replicator with
1130+ * an applier. Should be refactored to not rely on a call to the
1131+ * replication services here...
1132+ */
1133 ReplicationServices &replication_services= ReplicationServices::singleton();
1134 string replicator_name(sysvar_transaction_log_use_replicator);
1135 replication_services.attachApplier(transaction_log_applier, replicator_name);
1136@@ -213,6 +238,7 @@
1137 {
1138 transaction_log->truncate();
1139 transaction_log_index->clear();
1140+ transaction_log_publisher->clear();
1141 }
1142 }
1143 }
1144
1145=== added file 'plugin/transaction_log/plugin.am'
1146--- plugin/transaction_log/plugin.am 1970-01-01 00:00:00 +0000
1147+++ plugin/transaction_log/plugin.am 2010-05-05 02:25:42 +0000
1148@@ -0,0 +1,27 @@
1149+# vim:ft=automake
1150+# Copyright (C) 2010 Jay Pipes
1151+#
1152+# This program is free software; you can redistribute it and/or modify
1153+# it under the terms of the GNU General Public License as published by
1154+# the Free Software Foundation; version 2 of the License.
1155+#
1156+# This program is distributed in the hope that it will be useful,
1157+# but WITHOUT ANY WARRANTY; without even the implied warranty of
1158+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1159+# GNU General Public License for more details.
1160+#
1161+# You should have received a copy of the GNU General Public License
1162+# along with this program; if not, write to the Free Software
1163+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
1164+
1165+EXTRA_DIST += plugin/transaction_log/replication.proto
1166+
1167+BUILT_SOURCES += plugin/transaction_log/replication.pb.h plugin/transaction_log/replication.pb.cc
1168+
1169+CLEANFILES += plugin/transaction_log/replication.pb.h plugin/transaction_log/replication.pb.cc
1170+
1171+TRX_LOG_PROTO_PATH=${top_builddir}/plugin/transaction_log/:${top_srcdir}/plugin/transaction_log/
1172+
1173+plugin/transaction_log/replication.pb.cc plugin/transaction_log/replication.pb.h: plugin/transaction_log/replication.proto
1174+ $(PROTOC) --proto_path=${TRX_LOG_PROTO_PATH} \
1175+ --cpp_out=${top_builddir}/plugin/transaction_log/ $<
1176
1177=== modified file 'plugin/transaction_log/plugin.ini'
1178--- plugin/transaction_log/plugin.ini 2010-03-27 18:51:59 +0000
1179+++ plugin/transaction_log/plugin.ini 2010-05-05 02:25:42 +0000
1180@@ -6,8 +6,8 @@
1181 title=Transaction Log
1182 description=Log of Transaction Messages
1183 load_by_default=yes
1184-sources= background_worker.cc hexdump_transaction_message.cc module.cc print_transaction_message.cc transaction_log.cc transaction_log_applier.cc transaction_log_entry.cc transaction_log_index.cc transaction_log_reader.cc data_dictionary_schema.cc write_buffer.cc
1185-headers= background_worker.h hexdump_transaction_message.h print_transaction_message.h transaction_log.h transaction_log_applier.h transaction_log_entry.h transaction_log_index.h transaction_log_reader.h data_dictionary_schema.h write_buffer.h
1186+sources= background_worker.cc hexdump_transaction_message.cc module.cc print_transaction_message.cc transaction_log.cc transaction_log_applier.cc transaction_log_entry.cc transaction_log_index.cc transaction_log_reader.cc publisher.cc data_dictionary_schema.cc write_buffer.cc replication.pb.cc
1187+headers= background_worker.h hexdump_transaction_message.h print_transaction_message.h transaction_log.h transaction_log_applier.h transaction_log_entry.h transaction_log_index.h transaction_log_reader.h publisher.h data_dictionary_schema.h write_buffer.h replication.pb.h
1188 libs=${top_builddir}/drizzled/algorithm/libhash.la
1189 libadd=$(LIBZ)
1190-cxxflags=${PROTOSKIP_WARNINGS}
1191+cxxflags=${PROTOSKIP_WARNINGS} ${NO_WERROR}
1192
1193=== added file 'plugin/transaction_log/publisher.cc'
1194--- plugin/transaction_log/publisher.cc 1970-01-01 00:00:00 +0000
1195+++ plugin/transaction_log/publisher.cc 2010-05-05 02:25:42 +0000
1196@@ -0,0 +1,213 @@
1197+/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
1198+ * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
1199+ *
1200+ * Copyright (C) 2008-2009 Sun Microsystems
1201+ * Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
1202+ *
1203+ * Authors:
1204+ *
1205+ * Jay Pipes <jaypipes@gmail.com>
1206+ *
1207+ * This program is free software; you can redistribute it and/or modify
1208+ * it under the terms of the GNU General Public License as published by
1209+ * the Free Software Foundation; either version 2 of the License, or
1210+ * (at your option) any later version.
1211+ *
1212+ * This program is distributed in the hope that it will be useful,
1213+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
1214+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1215+ * GNU General Public License for more details.
1216+ *
1217+ * You should have received a copy of the GNU General Public License
1218+ * along with this program; if not, write to the Free Software
1219+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
1220+ */
1221+
1222+/**
1223+ * @file
1224+ *
1225+ * Defines the implementation of the publisher plugin
1226+ * for the transaction log.
1227+ *
1228+ * @see drizzled/plugin/publisher.h
1229+ * @see drizzled/plugin/subscriber.h
1230+ */
1231+
1232+#include "config.h"
1233+#include "publisher.h"
1234+
1235+#include <fcntl.h>
1236+#include <unistd.h>
1237+#include <string>
1238+#include <utility>
1239+#include <stdexcept>
1240+
1241+#include <drizzled/internal/my_sys.h> /* for internal::my_sync */
1242+#include <drizzled/gettext.h>
1243+#include <drizzled/errmsg_print.h>
1244+
1245+using namespace std;
1246+using namespace drizzled;
1247+
1248+TransactionLogPublisher *transaction_log_publisher= NULL; /* The singleton transaction log publisher */
1249+
1250+const std::string TransactionLogPublisher::MANIFEST_FILE_NAME= ("publisher.manifest");
1251+
1252+TransactionLogPublisher::TransactionLogPublisher(string name_arg) :
1253+ plugin::Publisher(name_arg),
1254+ publisher_manifest(),
1255+ channel_apply_map(),
1256+ manifest_file(-1)
1257+{
1258+ pthread_mutex_init(&latch, NULL);
1259+
1260+ if (not readManifestFile())
1261+ {
1262+ throw runtime_error(_("Failed to read publisher's manifest file"));
1263+ }
1264+
1265+ manifest_file= open(MANIFEST_FILE_NAME.c_str(), O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
1266+}
1267+
1268+TransactionLogPublisher::~TransactionLogPublisher()
1269+{
1270+ try
1271+ {
1272+ (void) writeManifestFile();
1273+ (void) syncManifestFile();
1274+ close(manifest_file);
1275+ }
1276+ catch (...)
1277+ {} /* enforce nothrow guarantee on destructors... */
1278+ pthread_mutex_destroy(&latch);
1279+}
1280+
1281+bool TransactionLogPublisher::readManifestFile()
1282+{
1283+ PublisherManifest &pm= getPublisherManifest();
1284+ int read_file= open(MANIFEST_FILE_NAME.c_str(), O_RDONLY);
1285+ if (read_file == -1)
1286+ {
1287+ /*
1288+ * If file doesn't exist, just create a default manifest message
1289+ * in memory. This message will be written to disk into the
1290+ * manifest file later.
1291+ */
1292+ if (errno == ENOENT)
1293+ {
1294+ /* Create a default manifest with channel 1 and no applied transactions */
1295+ PublisherManifest::PublisherApplyInfo *appl_info= pm.add_apply_info();
1296+ appl_info->set_channel_id(1);
1297+ appl_info->set_last_applied_transaction_id(0);
1298+ appl_info->set_last_applied_end_timestamp(0);
1299+ }
1300+ else
1301+ {
1302+ errmsg_printf(ERRMSG_LVL_ERROR, _("Unable to read publisher's manifest file! Got error: %s\n"), strerror(errno));
1303+ close(read_file);
1304+ return false;
1305+ }
1306+ }
1307+ else
1308+ {
1309+ if (not pm.ParseFromFileDescriptor(read_file))
1310+ {
1311+ errmsg_printf(ERRMSG_LVL_ERROR,
1312+ _("Unable to parse publisher's manifest file!\n"
1313+ "PublisherManifest was initialized, but contains corrupted data:\n%s\n"),
1314+ pm.DebugString().c_str());
1315+ close(read_file);
1316+ return false;
1317+ }
1318+ }
1319+ uint32_t num_channels= pm.apply_info_size();
1320+ for (uint32_t x= 0; x < num_channels; ++x)
1321+ {
1322+ PublisherManifest::PublisherApplyInfo *apply_info= pm.mutable_apply_info(x);
1323+ channel_apply_map.insert(make_pair(apply_info->channel_id(), apply_info));
1324+ }
1325+ close(read_file);
1326+ return true;
1327+}
1328+
1329+void TransactionLogPublisher::clear()
1330+{
1331+ publisher_manifest.Clear();
1332+ /* Create a default manifest with channel 1 and no applied transactions */
1333+ PublisherManifest::PublisherApplyInfo *appl_info= publisher_manifest.add_apply_info();
1334+ appl_info->set_channel_id(1);
1335+ appl_info->set_last_applied_transaction_id(0);
1336+ appl_info->set_last_applied_end_timestamp(0);
1337+ (void) ftruncate(manifest_file, 0);
1338+ (void) internal::my_sync(manifest_file, 0);
1339+}
1340+
1341+void TransactionLogPublisher::setLastApplied(const message::Transaction &applied)
1342+{
1343+ uint32_t channel_applied;
1344+ if (applied.transaction_context().has_channel_id())
1345+ channel_applied= applied.transaction_context().channel_id();
1346+ else
1347+ channel_applied= 1;
1348+ lock();
1349+ channel_apply_map[channel_applied]->set_last_applied_transaction_id(applied.transaction_context().transaction_id());
1350+ channel_apply_map[channel_applied]->set_last_applied_end_timestamp(applied.transaction_context().end_timestamp());
1351+ unlock();
1352+ (void) writeManifestFile();
1353+}
1354+
1355+TransactionServices::TransactionId TransactionLogPublisher::getLastAppliedTransactionId(uint32_t channel_id)
1356+{
1357+ lock();
1358+ TransactionServices::TransactionId result= channel_apply_map[channel_id]->last_applied_transaction_id();
1359+ unlock();
1360+ return result;
1361+}
1362+
1363+uint64_t TransactionLogPublisher::getLastAppliedEndTimestamp(uint32_t channel_id)
1364+{
1365+ lock();
1366+ uint64_t result= channel_apply_map[channel_id]->last_applied_end_timestamp();
1367+ unlock();
1368+ return result;
1369+}
1370+
1371+bool TransactionLogPublisher::writeManifestFile()
1372+{
1373+ /* write the manifest message out to our file... */
1374+ PublisherManifest &pm= getPublisherManifest();
1375+ string write_buffer("");
1376+ lock();
1377+ bool result= pm.SerializeToString(&write_buffer);
1378+ if (unlikely(not result))
1379+ {
1380+ errmsg_printf(ERRMSG_LVL_ERROR,
1381+ _("Unable to write publisher's manifest file!\n"
1382+ "PublisherManifest contains data:\n%s\n"),
1383+ pm.DebugString().c_str());
1384+ unlock();
1385+ return false;
1386+ }
1387+ uint32_t num_bytes_to_write= write_buffer.size();
1388+ ssize_t written= pwrite(manifest_file, write_buffer.c_str(), num_bytes_to_write, 0);
1389+ result= (written == static_cast<ssize_t>(num_bytes_to_write));
1390+ unlock();
1391+ return result;
1392+}
1393+
1394+bool TransactionLogPublisher::syncManifestFile()
1395+{
1396+ return internal::my_sync(manifest_file, 0) == 0;
1397+}
1398+
1399+void TransactionLogPublisher::getChannels(vector<uint32_t> *out_channels)
1400+{
1401+ lock();
1402+ PublisherManifest &pm= getPublisherManifest();
1403+ size_t num_channels= pm.apply_info_size();
1404+ for (uint32_t x= 0; x < num_channels; ++x)
1405+ {
1406+ out_channels->push_back(pm.apply_info(x).channel_id());
1407+ }
1408+ unlock();
1409+}
1410
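Review note: writeManifestFile() rewrites the serialized manifest at offset 0 with a single pwrite() and reports the outcome as a bool. The following is a minimal sketch, under stated assumptions, of one way to make that write robust: retry on short writes, trim any stale tail left by a longer earlier manifest, and return true only on success. openManifest() and writeWholeBuffer() are hypothetical helpers, not part of the branch; the 0600 file mode is likewise an assumption (the diff opens the file with S_IRWXU).

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <string>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical helper: opens (or creates) the manifest for writing with
// owner read/write permission only. Returns -1 on failure, like open().
int openManifest(const char *path)
{
  assert(path != NULL);
  return open(path, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
}

// Hypothetical helper: writes the whole buffer starting at offset 0,
// retrying after short writes and EINTR, then truncates the file to the
// buffer's length. Returns true only if every byte reached the file.
bool writeWholeBuffer(int fd, const std::string &buffer)
{
  size_t done= 0;
  while (done < buffer.size())
  {
    ssize_t written= pwrite(fd, buffer.data() + done, buffer.size() - done,
                            static_cast<off_t>(done));
    if (written < 0 && errno == EINTR)
      continue; /* interrupted before writing anything: retry */
    if (written <= 0)
      return false; /* hard error, or no progress */
    done+= static_cast<size_t>(written);
  }
  /* Drop bytes left over from a previous, longer manifest. */
  return ftruncate(fd, static_cast<off_t>(buffer.size())) == 0;
}
```

The truncate step matters because a protobuf message carries no length prefix of its own: if a shorter manifest is written over a longer one, the leftover bytes would be parsed as part of the message on the next startup.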
1411=== added file 'plugin/transaction_log/publisher.h'
1412--- plugin/transaction_log/publisher.h 1970-01-01 00:00:00 +0000
1413+++ plugin/transaction_log/publisher.h 2010-05-05 02:25:42 +0000
1414@@ -0,0 +1,161 @@
1415+/* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
1416+ * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
1417+ *
1418+ * Copyright (c) 2010 Jay Pipes <jaypipes@gmail.com>
1419+ *
1420+ * Authors:
1421+ *
1422+ * Jay Pipes <jaypipes@gmail.com>
1423+ *
1424+ * This program is free software; you can redistribute it and/or modify
1425+ * it under the terms of the GNU General Public License as published by
1426+ * the Free Software Foundation; either version 2 of the License, or
1427+ * (at your option) any later version.
1428+ *
1429+ * This program is distributed in the hope that it will be useful,
1430+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
1431+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1432+ * GNU General Public License for more details.
1433+ *
1434+ * You should have received a copy of the GNU General Public License
1435+ * along with this program; if not, write to the Free Software
1436+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
1437+ */
1438+
1439+/**
1440+ * @file
1441+ *
1442+ * Defines the API of the transaction log's publisher plugin.
1443+ *
1444+ * @see drizzled/plugin/publisher.h
1445+ * @see drizzled/plugin/subscriber.h
1446+ */
1447+
1448+#ifndef PLUGIN_TRANSACTION_LOG_PUBLISHER_H
1449+#define PLUGIN_TRANSACTION_LOG_PUBLISHER_H
1450+
1451+#include <drizzled/plugin/publisher.h>
1452+
1453+#include "replication.pb.h"
1454+
1455+#include <drizzled/unordered_map.h>
1456+#include <string>
1457+#include <pthread.h>
1458+
1459+/**
1460+ * The publisher class for the transaction log. This class is
1461+ * responsible mostly for keeping track of the last applied
1462+ * transaction ID and timestamps on this node for each channel,
1463+ * and for communicating with subscribers of transaction log data.
1464+ */
1465+class TransactionLogPublisher: public drizzled::plugin::Publisher
1466+{
1467+public:
1468+ /** Constructor */
1469+ TransactionLogPublisher(std::string name_arg);
1470+ /** Destructor */
1471+ ~TransactionLogPublisher();
1472+ /**
1473+ * Returns the transaction ID for the last message that was
1474+ * successfully applied on the publisher node for a given
1475+ * channel.
1476+ *
1477+ * @param[in] channel_id Channel ID to check
1478+ */
1479+ drizzled::TransactionServices::TransactionId getLastAppliedTransactionId(uint32_t channel_id);
1480+ /**
1481+ * Returns the end timestamp for the last message that was
1482+ * successfully applied on the publisher node for a given
1483+ * channel.
1484+ *
1485+ * @param[in] channel_id Channel ID to check
1486+ */
1487+ uint64_t getLastAppliedEndTimestamp(uint32_t channel_id);
1488+ /**
1489+ * Sets the last transaction ID and timestamp based
1490+ * on supplied transaction message
1491+ *
1492+ * @param[in] applied Transaction message to check for ID and timestamp
1493+ */
1494+ void setLastApplied(const drizzled::message::Transaction &applied);
1495+ /**
1496+ * Simple debugging method which sets each channel's applied
1497+ * transaction ID and end timestamp to zero.
1498+ */
1499+ void clear();
1500+ /**
1501+ * Returns the number of channels this publisher
1502+ * publishes to.
1503+ */
1504+ size_t getNumChannels()
1505+ {
1506+ return getPublisherManifest().apply_info_size();
1507+ }
1508+ /**
1509+ * Adds to a supplied vector the set of channels this
1510+ * publisher publishes to.
1511+ *
1512+ * @param[out] out_channels Pointer to a vector of channel IDs to add to
1513+ */
1514+ void getChannels(std::vector<uint32_t> *out_channels);
1515+private:
1516+ typedef drizzled::unordered_map<uint32_t, PublisherManifest::PublisherApplyInfo *> ChannelApplyMap;
1517+ static const std::string MANIFEST_FILE_NAME;
1518+ /**
1519+ * Global publisher lock
1520+ * @todo Break into a scoreboard?
1521+ */
1522+ pthread_mutex_t latch;
1523+ /**
1524+ * The manifest message which contains information about the
1525+ * latest transaction messages that were applied on the
1526+ * publishing node. This is kept on disk and in-memory, but the
1527+ * information in it is only authoritative for the publisher's
1528+ * data state, not the subscribers.
1529+ */
1530+ PublisherManifest publisher_manifest;
1531+ /**
1532+ * Map of channel ID to the mutable publisher apply info
1533+ * for that channel.
1534+ */
1535+ ChannelApplyMap channel_apply_map;
1536+ /**
1537+ * The file for our manifest
1538+ */
1539+ int manifest_file;
1540+ /**
1541+ * Locks the publisher
1542+ */
1543+ void lock()
1544+ {
1545+ pthread_mutex_lock(&latch);
1546+ }
1547+ /**
1548+ * Unlocks the publisher
1549+ */
1550+ void unlock()
1551+ {
1552+ pthread_mutex_unlock(&latch);
1553+ }
1554+ /**
1555+ * Returns the publisher's manifest message
1556+ */
1557+ PublisherManifest &getPublisherManifest()
1558+ {
1559+ return publisher_manifest;
1560+ }
1561+ /**
1562+ * Reads the publisher's manifest file on startup
1563+ */
1564+ bool readManifestFile();
1565+ /**
1566+ * Writes the publisher's manifest file
1567+ */
1568+ bool writeManifestFile();
1569+ /**
1570+ * Syncs the publisher's manifest file
1571+ */
1572+ bool syncManifestFile();
1573+};
1574+
1575+#endif /* PLUGIN_TRANSACTION_LOG_PUBLISHER_H */
1576
1577=== added file 'plugin/transaction_log/replication.proto'
1578--- plugin/transaction_log/replication.proto 1970-01-01 00:00:00 +0000
1579+++ plugin/transaction_log/replication.proto 2010-05-05 02:25:42 +0000
1580@@ -0,0 +1,59 @@
1581+/**
1582+ * A message representing the information
1583+ * a subscriber keeps about the data it
1584+ * has pulled from publishers
1585+ */
1586+message SubscriberManifest
1587+{
1588+ /**
1589+ * Sub message storing information about the
1590+ * communication this subscriber has had with
1591+ * a publisher.
1592+ */
1593+ message PublisherCommunicationInfo
1594+ {
1595+ required string hostname = 1; /* Possibly make this into a PublisherInfo class */
1596+ required uint32 channel_id = 2; /* Replication channel */
1597+ required uint64 last_communication_try = 3; /* Timestamp of last attempt at communication */
1598+ required uint64 last_communication_success = 4; /* Timestamp of last successful communication */
1599+ required uint64 last_received_transaction_id = 5; /* Transaction ID of last received transaction */
1600+ required uint64 last_received_end_timestamp = 6; /* End timestamp of last received transaction */
1601+ required uint64 last_applied_transaction_id = 7; /* Transaction ID of last applied transaction */
1602+ required uint64 last_applied_end_timestamp = 8; /* End timestamp of last applied transaction */
1603+ }
1604+ /* The set of information about communication with publishers */
1605+ repeated PublisherCommunicationInfo communication_info = 1;
1606+}
1607+
1608+message PublisherManifest
1609+{
1610+ /**
1611+ * Sub message storing information about the
1612+ * last transaction this publisher has applied
1613+ * on a given channel.
1614+ */
1615+ message PublisherApplyInfo
1616+ {
1617+ required uint32 channel_id = 1; /* Replication channel */
1618+ required uint64 last_applied_transaction_id = 2; /* ID of last applied transaction */
1619+ required uint64 last_applied_end_timestamp = 3; /* End timestamp of last applied transaction */
1620+ }
1621+ /**
1622+ * Sub message storing information about the
1623+ * communication this publisher has had with
1624+ * a subscriber.
1625+ */
1626+ message SubscriberCommunicationInfo
1627+ {
1628+ required string hostname = 1; /* Possibly make this into a SubscriberInfo class */
1629+ required uint32 channel_id = 2; /* Replication channel */
1630+ required uint64 last_communication_try = 3; /* Timestamp of last attempt at communication */
1631+ required uint64 last_communication_success = 4; /* Timestamp of last successful communication */
1632+ required uint64 last_sent_transaction_id = 5; /* Transaction ID of last sent transaction */
1633+ required uint64 last_sent_end_timestamp = 6; /* End timestamp of last sent transaction */
1634+ }
1635+ /* The set of information about channel applied transactions */
1636+ repeated PublisherApplyInfo apply_info = 1;
1637+ /* The set of information about communication with subscribers */
1638+ repeated SubscriberCommunicationInfo subscriber_info = 2;
1639+}
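
The PublisherManifest message above ends up on disk in the standard protocol buffer wire format: each field is a tag (field number shifted left by three bits, combined with the wire type), followed by a varint for the integer fields or a length-prefixed body for the nested PublisherApplyInfo. The following Python 3 sketch hand-encodes that layout for illustration only. The helper names are hypothetical, it covers just the apply_info field, and real code would use the classes generated by protoc.

```python
def encode_varint(value):
    """Encode a non-negative integer as a protobuf base-128 varint."""
    out = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            out.append(low7 | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def encode_apply_info(channel_id, transaction_id, end_timestamp):
    """Encode one PublisherApplyInfo body (fields 1 to 3, all varints)."""
    body = b""
    fields = ((1, channel_id), (2, transaction_id), (3, end_timestamp))
    for field_number, value in fields:
        tag = (field_number << 3) | 0  # wire type 0 = varint
        body += encode_varint(tag) + encode_varint(value)
    return body


def encode_manifest(apply_infos):
    """Encode a PublisherManifest that carries only apply_info entries."""
    out = b""
    for info in apply_infos:
        body = encode_apply_info(*info)
        tag = (1 << 3) | 2  # field 1, wire type 2 = length-delimited
        out += encode_varint(tag) + encode_varint(len(body)) + body
    return out
```

Because the integer fields are varints, the serialized size depends on the values stored: a manifest holding a small transaction ID and timestamp is shorter than one holding large values, which matters for any code that rewrites the file in place.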
1640
1641=== added directory 'plugin/transaction_log/tests/inc'
1642=== added file 'plugin/transaction_log/tests/inc/publisher.manifest'
1643--- plugin/transaction_log/tests/inc/publisher.manifest 1970-01-01 00:00:00 +0000
1644+++ plugin/transaction_log/tests/inc/publisher.manifest 2010-05-05 02:25:42 +0000
1645@@ -0,0 +1,2 @@
1646+
1647+
1648¹è½¢Ã¥¡
1649\ No newline at end of file
1650
1651=== modified file 'plugin/transaction_log/tests/r/bad_replicator.result'
1652--- plugin/transaction_log/tests/r/bad_replicator.result 2010-04-05 16:06:01 +0000
1653+++ plugin/transaction_log/tests/r/bad_replicator.result 2010-05-05 02:25:42 +0000
1654@@ -3,4 +3,5 @@
1655 WHERE DATA_DICTIONARY.PLUGINS.PLUGIN_NAME LIKE 'Transaction%';
1656 PLUGIN_NAME PLUGIN_TYPE IS_ACTIVE MODULE_NAME
1657 transaction_log_applier TransactionApplier FALSE transaction_log
1658+transaction_log_publisher Publisher TRUE transaction_log
1659 SET GLOBAL transaction_log_truncate_debug= true;
1660
1661=== added file 'plugin/transaction_log/tests/r/publisher_manifest.result'
1662--- plugin/transaction_log/tests/r/publisher_manifest.result 1970-01-01 00:00:00 +0000
1663+++ plugin/transaction_log/tests/r/publisher_manifest.result 2010-05-05 02:25:42 +0000
1664@@ -0,0 +1,20 @@
1665+SET GLOBAL transaction_log_truncate_debug= true;
1666+DROP TABLE IF EXISTS t1;
1667+CREATE TABLE t1 (
1668+id INT NOT NULL PRIMARY KEY
1669+, padding VARCHAR(200) NOT NULL
1670+);
1671+INSERT INTO t1 VALUES (1, "I love testing.");
1672+INSERT INTO t1 VALUES (2, "I hate testing.");
1673+DROP TABLE t1;
1674+1var/master-data/publisher.manifest
1675+SET GLOBAL transaction_log_truncate_debug= true;
1676+testDefaultStartupVariables (__main__.TestPublisherManifest) ... ok
1677+testReadCorruptPublisherManifest (__main__.TestPublisherManifest) ... ok
1678+testReadPublisherManifest (__main__.TestPublisherManifest) ... ok
1679+testWritePublisherManifest (__main__.TestPublisherManifest) ... ok
1680+
1681+----------------------------------------------------------------------
1682+Ran 4 tests
1683+
1684+OK
1685
1686=== added directory 'plugin/transaction_log/tests/replication'
1687=== added file 'plugin/transaction_log/tests/replication/COPYING'
1688--- plugin/transaction_log/tests/replication/COPYING 1970-01-01 00:00:00 +0000
1689+++ plugin/transaction_log/tests/replication/COPYING 2010-05-05 02:25:42 +0000
1690@@ -0,0 +1,32 @@
1691+Drizzle Replication Test Suite
1692+
1693+Copyright (C) 2010 Jay Pipes <jaypipes@gmail.com>
1694+All rights reserved.
1695+
1696+Redistribution and use in source and binary forms, with or without
1697+modification, are permitted provided that the following conditions are
1698+met:
1699+
1700+ * Redistributions of source code must retain the above copyright
1701+notice, this list of conditions and the following disclaimer.
1702+
1703+ * Redistributions in binary form must reproduce the above
1704+copyright notice, this list of conditions and the following disclaimer
1705+in the documentation and/or other materials provided with the
1706+distribution.
1707+
1708+ * The names of its contributors may not be used to endorse or
1709+promote products derived from this software without specific prior
1710+written permission.
1711+
1712+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
1713+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
1714+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
1715+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
1716+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
1717+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
1718+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
1719+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
1720+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
1721+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
1722+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
1723
1724=== added directory 'plugin/transaction_log/tests/replication/bootstrap'
1725=== added file 'plugin/transaction_log/tests/replication/bootstrap/ib_logfile0'
1726Binary files plugin/transaction_log/tests/replication/bootstrap/ib_logfile0 1970-01-01 00:00:00 +0000 and plugin/transaction_log/tests/replication/bootstrap/ib_logfile0 2010-05-05 02:25:42 +0000 differ
1727=== added file 'plugin/transaction_log/tests/replication/bootstrap/ib_logfile1'
1728Binary files plugin/transaction_log/tests/replication/bootstrap/ib_logfile1 1970-01-01 00:00:00 +0000 and plugin/transaction_log/tests/replication/bootstrap/ib_logfile1 2010-05-05 02:25:42 +0000 differ
1729=== added file 'plugin/transaction_log/tests/replication/bootstrap/ibdata1'
1730Binary files plugin/transaction_log/tests/replication/bootstrap/ibdata1 1970-01-01 00:00:00 +0000 and plugin/transaction_log/tests/replication/bootstrap/ibdata1 2010-05-05 02:25:42 +0000 differ
1731=== added directory 'plugin/transaction_log/tests/replication/inc'
1732=== added file 'plugin/transaction_log/tests/replication/inc/corrupted_publisher.manifest'
1733--- plugin/transaction_log/tests/replication/inc/corrupted_publisher.manifest 1970-01-01 00:00:00 +0000
1734+++ plugin/transaction_log/tests/replication/inc/corrupted_publisher.manifest 2010-05-05 02:25:42 +0000
1735@@ -0,0 +1,2 @@
1736+
1737+¹è½¢Ã¥¡
1738
1739=== added file 'plugin/transaction_log/tests/replication/inc/publisher.manifest'
1740--- plugin/transaction_log/tests/replication/inc/publisher.manifest 1970-01-01 00:00:00 +0000
1741+++ plugin/transaction_log/tests/replication/inc/publisher.manifest 2010-05-05 02:25:42 +0000
1742@@ -0,0 +1,2 @@
1743+
1744+
17452äò€˜Í§¡
1746\ No newline at end of file
1747
1748=== added file 'plugin/transaction_log/tests/replication/test_publisher_manifest.py'
1749--- plugin/transaction_log/tests/replication/test_publisher_manifest.py 1970-01-01 00:00:00 +0000
1750+++ plugin/transaction_log/tests/replication/test_publisher_manifest.py 2010-05-05 02:25:42 +0000
1751@@ -0,0 +1,417 @@
1752+#! /usr/bin/python
1753+#
1754+# Drizzle Replication Test Suite
1755+#
1756+# Copyright (c) 2010 Jay Pipes
1757+#
1758+# All rights reserved.
1759+#
1760+# Use and distribution licensed under the BSD license. See the
1761+# COPYING file in the root project directory for full text.
1762+
1763+"""Drizzle Transaction Log Test Suite - Publisher Manifest
1764+
1765+This tests the startup process of the Drizzle server and
1766+the reading of the transaction log's publisher manifest
1767+"""
1768+
1769+__author__ = 'Jay Pipes <jaypipes@gmail.com>'
1770+
1771+#
1772+# This file contains a test for reading the transaction
1773+# log's publisher manifest file on startup.
1774+#
1775+# It is necessary to use this testing suite
1776+# because the test-run.pl script assumes too many things
1777+# about the Drizzle test environment, and we need to
1778+# initialize both proper and corrupted publisher manifests
1779+# in order to test the startup process correctly.
1780+#
1781+
1782+import os
1783+import sys
1784+import commands
1785+import unittest
1786+import shutil
1787+import re
1788+
1789+drizzled_root_dir= os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', '..'))
1790+trx_log_root_dir= os.path.join(drizzled_root_dir, 'plugin', 'transaction_log')
1791+replication_test_root_dir= os.path.join(trx_log_root_dir, 'tests', 'replication')
1792+
1793+def compile_python_proto(protodir, protofile):
1794+ """Compiles a proto into the proto_pb2.py file and puts it into the main test directory"""
1795+ protocommand= "protoc --python_out=. --proto_path=%s %s" % (protodir, os.path.join(protodir, protofile))
1796+ (retcode, output)= commands.getstatusoutput(protocommand)
1797+ if retcode != 0:
1798+ raise Exception("Failed to compile Python protobuffers for %s\nTried command: %s" % (protofile, protocommand))
1799+
1800+compile_python_proto(trx_log_root_dir, 'replication.proto') # proto is in /plugin/transaction_log/
1801+
1802+import replication_pb2 as rpb
1803+
1804+from testlib.server import DrizzledServer
1805+from testlib.client import DrizzleClient
1806+
1807+class TestPublisherManifest(unittest.TestCase):
1808+
1809+ DEFAULT_START_OPTIONS= {
1810+ 'mysql_protocol_port': DrizzledServer.DEFAULT_MYSQL_PROTOCOL_PORT,
1811+ 'drizzle_protocol_port': DrizzledServer.DEFAULT_DRIZZLE_PROTOCOL_PORT,
1812+ 'datadir': os.path.join(replication_test_root_dir, "var"),
1813+ 'basedir': drizzled_root_dir
1814+ }
1815+
1816+ def setUp(self):
1817+ pass
1818+
1819+ def tearDown(self):
1820+ if self.server and self.server.ping():
1821+ self.server.stop()
1822+ self.server.clear()
1823+ self.resetDataDir()
1824+
1825+ def resetDataDir(self):
1826+ """Removes everything under the test datadir so each test starts clean"""
1827+ (retcode, ignored)= commands.getstatusoutput("rm -rf %s" % os.path.join(replication_test_root_dir, "var", "*"))
1828+
1829+ def bootstrapDataFiles(self):
1830+ """Copies default InnoDB data files from bootstrap into datadir to speed up starts"""
1831+ (retcode, ignored)= commands.getstatusoutput("cp -r %s %s" % (os.path.join(replication_test_root_dir, "bootstrap", "*"),
1832+ os.path.join(replication_test_root_dir, "var")))
1833+
1834+ def testWritePublisherManifest(self):
1835+ #
1836+ # Here, we test the scenario of a newly
1837+ # started server with no data history. There
1838+ # should be no publisher manifest file at startup
1839+ # and after plugin processing, the publisher manifest
1840+ # file should exist but have no size. Once a few transactions
1841+ # are applied on the server, the manifest should show data
1842+ # fields with the last transaction ID and end timestamp
1843+ #
1844+ startup_options= {
1845+ 'transaction-log-enable': '1'
1846+ }
1847+ for k, v in TestPublisherManifest.DEFAULT_START_OPTIONS.iteritems():
1848+ startup_options[k]= v
1849+
1850+ self.server= DrizzledServer(startup_options)
1851+
1852+ if self.server and self.server.ping():
1853+ self.server.stop()
1854+ self.resetDataDir()
1855+
1856+ self.bootstrapDataFiles()
1857+
1858+ # Test that the publisher manifest does not exist
1859+ # before startup.
1860+
1861+ manifest_filepath= os.path.join(replication_test_root_dir, "var", "publisher.manifest")
1862+
1863+ self.assertFalse(os.path.exists(manifest_filepath))
1864+
1865+ self.assertTrue(self.server.start())
1866+ self.assertTrue(self.server.ping())
1867+
1868+ # Test that a publisher manifest exists after startup
1869+ # but has no size since it has not been written to yet
1870+
1871+ self.assertTrue(os.path.exists(manifest_filepath))
1872+ self.assertEqual(os.path.getsize(manifest_filepath), 0)
1873+
1874+ # Now add a single transaction and verify the publisher
1875+ # manifest contains proper information by reading the
1876+ # manifest into a protobuffer container class object
1877+
1878+ client_options= {
1879+ 'basedir': startup_options['basedir'],
1880+ 'port': startup_options['mysql_protocol_port']
1881+ }
1882+
1883+ self.client= DrizzleClient(client_options)
1884+ self.assertTrue(self.client.execute("CREATE SCHEMA test"))
1885+ self.assertTrue(self.client.execute("CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY)", "test"))
1886+ self.assertTrue(self.client.execute("INSERT INTO t1 VALUES(1)", "test"))
1887+
1888+ # Create the manifest GPB class object
1889+ # by reading in the publisher manifest, and then
1890+ # check to ensure manifest data members are appropriate
1891+
1892+ f= open(manifest_filepath)
1893+
1894+ try:
1895+ manifest= rpb.PublisherManifest()
1896+ manifest.ParseFromString(f.read())
1897+ except Exception as e:
1898+ self.fail("Unable to parse publisher manifest: %s" % e)
1899+
1900+ f.close()
1901+
1902+ self.assertEqual(len(manifest.apply_info), 1)
1903+ self.assertEqual(manifest.apply_info[0].channel_id, 1)
1904+ self.assertEqual(manifest.apply_info[0].last_applied_transaction_id, 3)
1905+ self.assertNotEqual(manifest.apply_info[0].last_applied_end_timestamp, 0)
1906+
1907+ def testReadCorruptPublisherManifest(self):
1908+ #
1909+ # Here, we test the scenario of a restarted server
1910+ # that has a corrupted publisher manifest. We start
1911+ # the server after copying in a deliberately corrupted
1912+ # manifest file and check to ensure that the server
1913+ # appropriately disables the publisher and prints an
1914+ # error on startup.
1915+ #
1916+ startup_options= {
1917+ 'transaction-log-enable': '1'
1918+ }
1919+ for k, v in TestPublisherManifest.DEFAULT_START_OPTIONS.iteritems():
1920+ startup_options[k]= v
1921+
1922+ self.server= DrizzledServer(startup_options)
1923+
1924+ if self.server and self.server.ping():
1925+ self.server.stop()
1926+ self.resetDataDir()
1927+
1928+ self.bootstrapDataFiles()
1929+
1930+ # Copy in the pre-corrupted manifest file into the datadir
1931+ manifest_filepath= os.path.join(replication_test_root_dir, "var", "publisher.manifest")
1932+ corrupted_manifest_filepath= os.path.join(replication_test_root_dir, "inc", "corrupted_publisher.manifest")
1933+ shutil.copy(corrupted_manifest_filepath, manifest_filepath)
1934+
1935+ # Test that the server still starts with a corrupt
1936+ # publisher manifest in place; only the publisher
1937+ # should be disabled, and an error should be logged
1938+ # at startup
1939+
1940+ self.assertTrue(self.server.start())
1941+ self.assertTrue(self.server.ping())
1942+
1943+ f= open(os.path.join(replication_test_root_dir, "var", "error.log"))
1944+ error_log_text= f.read()
1945+
1946+ # grep the error log and ensure that an error message was
1947+ # thrown by the publisher constructor
1948+
1949+ self.assertTrue(error_log_text.find("corrupted data") != -1)
1950+
1951+ f.close()
1952+
1953+ # Check that the publisher plugin is not available
1954+
1955+ client_options= {
1956+ 'basedir': startup_options['basedir'],
1957+ 'port': startup_options['mysql_protocol_port']
1958+ }
1959+ self.client= DrizzleClient(client_options)
1960+
1961+ expected_results_1= [
1962+ {
1963+ 'COUNT(*)': '0'
1964+ }
1965+ ]
1966+ results= self.client.fetchAllAssoc("SELECT COUNT(*) FROM DATA_DICTIONARY.REPLICATION_PUBLISHERS")
1967+
1968+ self.assertTrue(len(results) == len(expected_results_1))
1969+
1970+ rowno= 0
1971+ for row in results:
1972+ for k, v in row.iteritems():
1973+ self.assertEqual(str(expected_results_1[rowno][k]), str(v))
1974+ rowno+= 1
1975+
1976+ def testReadPublisherManifest(self):
1977+ #
1978+ # Here, we test the scenario of a restarted server
1979+ # that has a stored publisher manifest. We start
1980+ # the server after copying in a deliberately constructed
1981+ # manifest file and check to ensure that the server
1982+ # appropriately reads the manifest and updates its state
1983+ #
1984+ startup_options= {
1985+ 'transaction-log-enable': '1'
1986+ }
1987+ for k, v in TestPublisherManifest.DEFAULT_START_OPTIONS.iteritems():
1988+ startup_options[k]= v
1989+
1990+ self.server= DrizzledServer(startup_options)
1991+
1992+ if self.server and self.server.ping():
1993+ self.server.stop()
1994+ self.resetDataDir()
1995+
1996+ self.bootstrapDataFiles()
1997+
1998+ # Copy the pre-constructed (valid) manifest file into the datadir
1999+ manifest_filepath= os.path.join(replication_test_root_dir, "var", "publisher.manifest")
2000+ stored_manifest_filepath= os.path.join(replication_test_root_dir, "inc", "publisher.manifest")
2001+ shutil.copy(stored_manifest_filepath, manifest_filepath)
2002+
2003+ # Test that the server starts up normally with the
2004+ # stored publisher manifest in place and that the
2005+ # publisher reads the manifest without reporting
2006+ # corruption
2007+
2008+ self.assertTrue(self.server.start())
2009+ self.assertTrue(self.server.ping())
2010+
2011+ f= open(os.path.join(replication_test_root_dir, "var", "error.log"))
2012+ error_log_text= f.read()
2013+
2014+ # grep the error log and ensure that no corruption error
2015+ # was logged by the publisher constructor
2016+
2017+ self.assertEqual(error_log_text.find("corrupted data"), -1)
2018+
2019+ f.close()
2020+
2021+ # Check that the publisher plugin is available
2022+
2023+ client_options= {
2024+ 'basedir': startup_options['basedir'],
2025+ 'port': startup_options['mysql_protocol_port']
2026+ }
2027+ self.client= DrizzleClient(client_options)
2028+
2029+ expected_results_1= [
2030+ {
2031+ 'COUNT(*)': '1'
2032+ }
2033+ ]
2034+ results= self.client.fetchAllAssoc("SELECT COUNT(*) FROM DATA_DICTIONARY.REPLICATION_PUBLISHERS")
2035+
2036+ self.assertTrue(len(results) == len(expected_results_1))
2037+
2038+ rowno= 0
2039+ for row in results:
2040+ for k, v in row.iteritems():
2041+ self.assertEqual(str(expected_results_1[rowno][k]), str(v))
2042+ rowno+= 1
2043+
2044+ # Check that the publisher reports a transaction ID of 50
2045+ # and an end timestamp of 1272396191381860, which is what the pre-constructed
2046+ # manifest file contains
2047+
2048+ expected_results_2= [
2049+ {
2050+ 'PUBLISHER': 'transaction_log_publisher',
2051+ 'CHANNEL_ID': 1,
2052+ 'LAST_APPLIED_TRANSACTION_ID': 50,
2053+ 'LAST_APPLIED_END_TIMESTAMP': 1272396191381860
2054+ }
2055+ ]
2056+ results= self.client.fetchAllAssoc("SELECT * FROM DATA_DICTIONARY.REPLICATION_PUBLISHER_CHANNELS")
2057+
2058+ self.assertTrue(len(results) == len(expected_results_2))
2059+
2060+ rowno= 0
2061+ for row in results:
2062+ for k, v in row.iteritems():
2063+ self.assertEqual(str(expected_results_2[rowno][k]), str(v))
2064+ rowno+= 1
2065+
2066+ def testDefaultStartupVariables(self):
2067+ #
2068+ # Here, we test the "default" scenario of a newly
2069+ # started server with no data history. We check to see
2070+ # if the replication_publishers and replication_publisher_channels
2071+ # views in the data_dictionary contain proper values
2072+ #
2073+ startup_options= {
2074+ 'transaction-log-enable': '1'
2075+ }
2076+ for k, v in TestPublisherManifest.DEFAULT_START_OPTIONS.iteritems():
2077+ startup_options[k]= v
2078+
2079+ client_options= {
2080+ 'basedir': startup_options['basedir'],
2081+ 'port': startup_options['mysql_protocol_port']
2082+ }
2083+
2084+ self.server= DrizzledServer(startup_options)
2085+
2086+ if self.server and self.server.ping():
2087+ self.server.stop()
2088+ self.resetDataDir()
2089+
2090+ self.bootstrapDataFiles()
2091+ self.assertTrue(self.server.start())
2092+ self.assertTrue(self.server.ping())
2093+ self.client= DrizzleClient(client_options)
2094+
2095+ expected_results_1= [
2096+ {
2097+ 'PUBLISHER_NAME': 'transaction_log_publisher'
2098+ }
2099+ ]
2100+ results= self.client.fetchAllAssoc("SELECT * FROM DATA_DICTIONARY.REPLICATION_PUBLISHERS")
2101+
2102+ self.assertTrue(len(results) == len(expected_results_1))
2103+
2104+ rowno= 0
2105+ for row in results:
2106+ for k, v in row.iteritems():
2107+ self.assertEqual(str(expected_results_1[rowno][k]), str(v))
2108+ rowno+= 1
2109+
2110+ expected_results_2= [
2111+ {
2112+ 'PUBLISHER': 'transaction_log_publisher',
2113+ 'CHANNEL_ID': 1,
2114+ 'LAST_APPLIED_TRANSACTION_ID': 0,
2115+ 'LAST_APPLIED_END_TIMESTAMP': 0
2116+ }
2117+ ]
2118+ results= self.client.fetchAllAssoc("SELECT * FROM DATA_DICTIONARY.REPLICATION_PUBLISHER_CHANNELS")
2119+
2120+ self.assertTrue(len(results) == len(expected_results_2))
2121+
2122+ rowno= 0
2123+ for row in results:
2124+ for k, v in row.iteritems():
2125+ self.assertEqual(str(expected_results_2[rowno][k]), str(v))
2126+ rowno+= 1
2127+
2128+#
2129+# Code below taken verbatim from Eric Day's prototest
2130+# test suite in the mysql_protocol Drizzle plugin
2131+#
2132+import optparse
2133+
2134+parser = optparse.OptionParser()
2135+
2136+parser.add_option("-t", "--test", dest="testcase", default=None,
2137+ help="Test case to run", metavar="TESTCASE")
2138+
2139+(options, args) = parser.parse_args()
2140+
2141+class CustomTestRunner(unittest.TextTestRunner):
2142+ def run(self, test):
2143+ result = self._makeResult()
2144+ test(result)
2145+ result.printErrors()
2146+ self.stream.writeln(result.separator2)
2147+ run = result.testsRun
2148+ self.stream.writeln("Ran %d test%s" % (run, run != 1 and "s" or ""))
2149+ self.stream.writeln()
2150+ if not result.wasSuccessful():
2151+ self.stream.write("FAILED (")
2152+ failed, errored = map(len, (result.failures, result.errors))
2153+ if failed:
2154+ self.stream.write("failures=%d" % failed)
2155+ if errored:
2156+ if failed: self.stream.write(", ")
2157+ self.stream.write("errors=%d" % errored)
2158+ self.stream.writeln(")")
2159+ else:
2160+ self.stream.writeln("OK")
2161+ return result
2162+
2163+if __name__ == '__main__':
2164+ if options.testcase is None:
2165+ suite = unittest.TestLoader().loadTestsFromModule(__import__('__main__'))
2166+ else:
2167+ suite = unittest.TestLoader().loadTestsFromTestCase(eval(options.testcase))
2168+ CustomTestRunner(stream=sys.stdout, verbosity=2).run(suite)
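
Each test above repeats the same loop to compare the rows returned by fetchAllAssoc() with a list of expected dictionaries. That comparison can be sketched as a single helper. This is a hypothetical Python 3 illustration, not part of the proposed test suite, and `rows_match` is an assumed name. Unlike the loops in the diff, it iterates over the expected columns, so extra columns in the result are ignored rather than raising a KeyError.

```python
def rows_match(expected, actual):
    """Return True when both row lists have the same length and every
    expected column equals the actual column after str() conversion."""
    if len(expected) != len(actual):
        return False
    for expected_row, actual_row in zip(expected, actual):
        for column, value in expected_row.items():
            if column not in actual_row:
                return False
            # The client returns text, so compare string forms.
            if str(actual_row[column]) != str(value):
                return False
    return True
```

A test could then use one assertion per query, for example `self.assertTrue(rows_match(expected_results_2, results))`.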
2169
2170=== added directory 'plugin/transaction_log/tests/replication/testlib'
2171=== added file 'plugin/transaction_log/tests/replication/testlib/__init__.py'
2172--- plugin/transaction_log/tests/replication/testlib/__init__.py 1970-01-01 00:00:00 +0000
2173+++ plugin/transaction_log/tests/replication/testlib/__init__.py 2010-05-05 02:25:42 +0000
2174@@ -0,0 +1,10 @@
2175+#! /usr/bin/python
2176+#
2177+# Drizzle Replication Test Suite
2178+#
2179+# Copyright (c) 2010 Jay Pipes
2180+#
2181+# All rights reserved.
2182+#
2183+# Use and distribution licensed under the BSD license. See the
2184+# COPYING file in the root project directory for full text.
2185
2186=== added file 'plugin/transaction_log/tests/replication/testlib/client.py'
2187--- plugin/transaction_log/tests/replication/testlib/client.py 1970-01-01 00:00:00 +0000
2188+++ plugin/transaction_log/tests/replication/testlib/client.py 2010-05-05 02:25:42 +0000
2189@@ -0,0 +1,137 @@
2190+#! /usr/bin/python
2191+#
2192+# Drizzle Replication Test Suite
2193+#
2194+# Copyright (c) 2010 Jay Pipes
2195+#
2196+# All rights reserved.
2197+#
2198+# Use and distribution licensed under the BSD license. See the
2199+# COPYING file in the root project directory for full text.
2200+
2201+"""Defines the adapter for controlling a Drizzle database client"""
2202+
2203+import os
2204+import commands
2205+import sys
2206+import time
2207+import unittest
2208+
2209+class DrizzleClientException(Exception):
2210+ pass
2211+
2212+class DrizzleClient:
2213+ """A class wrapping a Drizzle client"""
2214+
2215+ DEFAULT_PORT= 9306
2216+
2217+ def __init__(self, options= {}):
2218+ self.start_options= options
2219+ self.cleanStartOptions()
2220+
2221+ def getPort(self):
2222+ """Returns port the client connects to"""
2223+ return self.start_options["port"]
2224+
2225+ def cleanStartOptions(self):
2226+ """Cleans up the startup options dictionary"""
2227+ clean_start_options= {}
2228+ for key in self.start_options.keys():
2229+ # Ensure option names are output as --option_name
2230+ # and clean the self.start_options array to remove any leading --
2231+ # and convert - to _
2232+ value= self.start_options[key]
2233+
2234+ if not key.startswith("--"):
2235+ clean_key= key.replace('-','_')
2236+ else:
2237+ clean_key= key[2:].replace("-","_")
2238+ clean_start_options[clean_key]= value
2239+
2240+ self.start_options= clean_start_options
2241+
2242+ def getStartOptionString(self):
2243+ """Builds the string of options to pass to the drizzle client"""
2244+ out_options= ["%s=%s" % ("--" + key, value) for key, value in self.start_options.iteritems()]
2245+
2246+ return " ".join(out_options)
2247+
2248+ def execute(self, statement, schema=None):
2249+ """Executes a supplied SQL statement"""
2250+ if schema is None:
2251+ schema_string= ""
2252+ else:
2253+ schema_string= "--database=%s" % schema
2254+ client_cmd= "%s --port=%d %s -e\"%s\"" % (os.path.join(self.start_options["basedir"], "client", "drizzle")
2255+ , self.start_options["port"]
2256+ , schema_string
2257+ , statement)
2258+ (retcode, output)= commands.getstatusoutput(client_cmd)
2259+ if retcode != 0:
2260+ print "Client on port %d failed to execute SQL:\n\"%s\"\nGot error: %s" % (self.start_options["port"], statement, output)
2261+ return False
2262+ return True
2263+
2264+ def executeFile(self, file):
2265+ """Executes SQL statements in a supplied file"""
2266+ client_cmd= "%s --port=%d < %s" % (os.path.join(self.start_options["basedir"], "client", "drizzle")
2267+ , self.start_options["port"]
2268+ , file)
2269+ (retcode, output)= commands.getstatusoutput(client_cmd)
2270+ if retcode != 0:
2271+ print "Client on port %d failed to execute SQL in file:\n\"%s\"\nGot error: %s" % (self.start_options["port"], file, output)
2272+ return False
2273+ return True
2274+
2275+ def fetchAsDict(self, statement):
2276+ """Executes a supplied SELECT or SHOW statement and returns a dictionary with first column as keys, second column as values."""
2277+ client_cmd= "%s --port=%d -e\"%s\"" % (os.path.join(self.start_options["basedir"], "client", "drizzle")
2278+ , self.start_options["port"]
2279+ , statement)
2280+ (retcode, output)= commands.getstatusoutput(client_cmd)
2281+ if retcode != 0:
2282+ print "Client on port %d failed to execute SQL:\n\"%s\"\nGot error: %s" % (self.start_options["port"], statement, output)
2283+ return {}
2284+ else:
2285+ # Now we build the results dictionary...
2286+ results= {}
2287+ # The first line holds the field names, separated by tabs.
2288+ # Every line after it holds tab-delimited field values.
2289+ lines= output.split("\n")
2290+ fields= lines[0].split("\t")
2291+ for line in lines[1:]:
2292+ data_fields= line.split("\t")
2293+ results[data_fields[0]]= data_fields[1]
2294+ return results
2295+
2296+ def fetchAllAssoc(self, statement):
2297+ """Executes a supplied SELECT or SHOW statement and returns a list of dictionaries, one per result row, keyed by field name."""
2298+ client_cmd= "%s --port=%d -e\"%s\"" % (os.path.join(self.start_options["basedir"], "client", "drizzle")
2299+ , self.start_options["port"]
2300+ , statement)
2301+ (retcode, output)= commands.getstatusoutput(client_cmd)
2302+ if retcode != 0:
2303+ print "Client on port %d failed to execute SQL:\n\"%s\"\nGot error: %s" % (self.start_options["port"], statement, output)
2304+ return []
2305+ else:
2306+ # Now we build the results list...
2307+ results= []
2308+ # The first line holds the field names, separated by tabs.
2309+ # Every line after it holds tab-delimited field values.
2310+ lines= output.split("\n")
2311+ fields= lines[0].split("\t")
2312+ for line in lines[1:]:
2313+ row= {}
2314+ data_fields= line.split("\t")
2315+ fieldno= 0
2316+ for field in fields:
2317+ row[field]= data_fields[fieldno]
2318+ fieldno= fieldno + 1
2319+ results.append(row)
2320+ return results
2321+
2322+class TestDrizzleClient(unittest.TestCase):
2323+ pass
2324+
2325+if __name__ == '__main__':
2326+ unittest.main()
2327
2328=== added file 'plugin/transaction_log/tests/replication/testlib/server.py'
2329--- plugin/transaction_log/tests/replication/testlib/server.py 1970-01-01 00:00:00 +0000
2330+++ plugin/transaction_log/tests/replication/testlib/server.py 2010-05-05 02:25:42 +0000
2331@@ -0,0 +1,152 @@
2332+#! /usr/bin/python
2333+#
2334+# Drizzle Replication Test Suite
2335+#
2336+# Copyright (c) 2010 Jay Pipes
2337+#
2338+# All rights reserved.
2339+#
2340+# Use and distribution licensed under the BSD license. See the
2341+# COPYING file in the root project directory for full text.
2342+
2343+"""Defines the adapter for controlling a Drizzle database server"""
2344+
2345+import os
2346+import commands
2347+import sys
2348+import time
2349+import unittest
2350+
2351+class DrizzledServerStartupException(Exception):
2352+ pass
2353+
2354+class DrizzledServerShutdownException(Exception):
2355+ pass
2356+
2357+class DrizzledServer:
2358+ """A class responsible for starting, stopping, and pinging a Drizzled server"""
2359+
2360+ DEFAULT_MYSQL_PROTOCOL_PORT= 9306
2361+ DEFAULT_DRIZZLE_PROTOCOL_PORT= 9307
2362+
2363+ def __init__(self, options= {}):
2364+ self.start_options= options
2365+ self.defaults_file= ""
2366+ self.cleanStartOptions()
2367+
2368+ def getPort(self, protocol="drizzle"):
2369+ """Returns the port on which the server listens for the supplied protocol"""
2370+ return self.start_options[protocol + "_protocol_port"]
2371+
2372+ def cleanStartOptions(self):
2373+ """Cleans up the startup options dictionary"""
2374+ clean_start_options= {}
2375+ for key in self.start_options.keys():
2376+ # Normalize option names: strip any leading -- and convert - to _
2377+ # so that keys in the self.start_options dictionary look like
2378+ # option_name (the leading -- is added back on output)
2379+ value= self.start_options[key]
2380+
2381+ if not key.startswith("--"):
2382+ clean_key= key.replace('-','_')
2383+ else:
2384+ clean_key= key[2:].replace("-","_")
2385+
2386+ # The defaults-file must ALWAYS be the first command-line
2387+ # option given to a drizzled server, so we strip it and
2388+ # store it locally here...
2389+ if clean_key == 'defaults_file':
2390+ self.defaults_file= "--defaults-file=%s" % value
2391+ else:
2392+ clean_start_options[clean_key]= value
2393+ self.start_options= clean_start_options
2394+
2395+ def getStartOptionString(self):
2396+ """Builds the string of options to pass to drizzled on startup"""
2397+ out_options= ["%s=%s" % ("--" + key, value) for key, value in self.start_options.iteritems()]
2398+
2399+ return " ".join(out_options)
2400+
2401+ def start(self):
2402+ """Starts a drizzled server"""
2403+
2404+ for x in ("mysql_protocol_port","basedir","datadir"):
2405+ if x not in self.start_options.keys():
2406+ raise DrizzledServerStartupException("Attempt to start server without required startup options")
2407+
2408+ start_option_string= self.getStartOptionString()
2409+ start_cmd= "%s %s %s > %s 2>&1 &" % (os.path.join(self.start_options['basedir'], "drizzled", "drizzled"),
2410+ self.defaults_file,
2411+ start_option_string,
2412+ os.path.join(self.start_options['datadir'], 'error.log'))
2413+ server_output= os.system(start_cmd)
2414+
2415+ # Here, we poll until the server is up and running or until the timeout (in seconds) expires...
2416+ timeout= 10
2417+ timer= 0
2418+ while not self.ping() and timer < timeout:
2419+ time.sleep(.25)
2420+ timer= timer + .25
2421+
2422+ return self.ping()
2423+
2424+ def ping(self, quiet= False):
2425+ """Pings the server. Returns True if server is up and running, False otherwise."""
2426+ ping_cmd= "%s --ping --port=%d" % (os.path.join(self.start_options['basedir'], "client", "drizzle"),
2427+ self.start_options['mysql_protocol_port'])
2428+
2429+ (retcode, output)= commands.getstatusoutput(ping_cmd)
2430+
2431+ return retcode == 0
2432+
2433+ def clear(self):
2434+ """Clears data files for the server."""
2435+ (retcode, ignored)= commands.getstatusoutput("rm -rf %s; mkdir %s" % (self.start_options['datadir'], self.start_options['datadir']))
2436+
2437+ def stop(self):
2438+ """Stops the server"""
2439+ shutdown_cmd= "%s --shutdown --port=%d" % (os.path.join(self.start_options['basedir'], "client", "drizzle"),
2440+ self.start_options['mysql_protocol_port'])
2441+
2442+ (retcode, output)= commands.getstatusoutput(shutdown_cmd)
2443+
2444+ return retcode == 0
2445+
2446+ def restart(self):
2447+ self.stop()
2448+ return self.start()
2449+
2450+ def stopAll(self):
2451+ """Stops ALL drizzled servers"""
2452+ (retcode, num_drizzled_running)= commands.getstatusoutput("ps aux | grep '[d]rizzled' | wc -l")
2453+ if retcode == 0 and int(num_drizzled_running) > 0:
2454+ (retcode, ignored)= commands.getstatusoutput("killall -9 drizzled")
2455+ time.sleep(3)
2456+
2457+class TestDrizzledServer(unittest.TestCase):
2458+
2459+ DEFAULT_START_OPTIONS= {
2460+ 'mysql_protocol_port': DrizzledServer.DEFAULT_MYSQL_PROTOCOL_PORT,
2461+ 'drizzle_protocol_port': DrizzledServer.DEFAULT_DRIZZLE_PROTOCOL_PORT,
2462+ 'datadir': os.path.join(os.getcwd().partition("plugin")[0], "tests", "var"),
2463+ 'basedir': os.path.join(os.getcwd().partition("plugin")[0], "drizzled")
2464+ }
2465+
2466+ def setUp(self):
2467+ self.server= DrizzledServer(TestDrizzledServer.DEFAULT_START_OPTIONS)
2468+ if self.server.ping():
2469+ self.server.stopAll()
2470+
2471+ def tearDown(self):
2472+ self.server.stop()
2473+
2474+ def testStartAndPing(self):
2475+ self.assertTrue(self.server.start())
2476+ self.assertTrue(self.server.ping())
2477+
2478+ def testStartAndStop(self):
2479+ self.assertTrue(self.server.start())
2480+ self.assertTrue(self.server.stop())
2481+
2482+if __name__ == '__main__':
2483+ unittest.main()
2484
2485=== added directory 'plugin/transaction_log/tests/replication/var'
2486=== added file 'plugin/transaction_log/tests/t/default-master.opt'
2487--- plugin/transaction_log/tests/t/default-master.opt 1970-01-01 00:00:00 +0000
2488+++ plugin/transaction_log/tests/t/default-master.opt 2010-05-05 02:25:42 +0000
2489@@ -0,0 +1,1 @@
2490+--transaction-log-enable --scheduler=multi_thread
2491
2492=== added symlink 'plugin/transaction_log/tests/t/publisher_manifest-master.opt'
2493=== target is u'default-master.opt'
2494=== added file 'plugin/transaction_log/tests/t/publisher_manifest.test'
2495--- plugin/transaction_log/tests/t/publisher_manifest.test 1970-01-01 00:00:00 +0000
2496+++ plugin/transaction_log/tests/t/publisher_manifest.test 2010-05-05 02:25:42 +0000
2497@@ -0,0 +1,19 @@
2498+#
2499+# Tests that the publisher manifest file is written
2500+# at shutdown
2501+#
2502+
2503+# Truncate the log file to reset for the test
2504+--source ../plugin/transaction_log/tests/t/truncate_log.inc
2505+
2506+# Populate log with some records...
2507+--source ../plugin/transaction_log/tests/t/insert.inc
2508+
2509+# Trim result since Solaris/BSD wc program apparently adds whitespace before output
2510+--exec wc -l var/master-data/publisher.manifest | tr -d ' '
2511+
2512+# Truncate the log file to reset for the next test
2513+--source ../plugin/transaction_log/tests/t/truncate_log.inc
2514+
2515+# Execute the external test suite for replication
2516+--exec $DRIZZLE_TEST_DIR/../plugin/transaction_log/tests/replication/test_publisher_manifest.py
2517
2518=== modified file 'plugin/transaction_log/transaction_log.cc'
2519--- plugin/transaction_log/transaction_log.cc 2010-03-31 19:04:12 +0000
2520+++ plugin/transaction_log/transaction_log.cc 2010-05-05 02:25:42 +0000
2521@@ -291,7 +291,7 @@
2522 drizzled::TransactionServices::singleton().resetTransactionId();
2523 }
2524
2525-bool TransactionLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
2526+bool TransactionLog::findLogFilenameContainingTransactionId(const TransactionServices::TransactionId&,
2527 string &out_filename) const
2528 {
2529 /*
2530
2531=== modified file 'plugin/transaction_log/transaction_log.h'
2532--- plugin/transaction_log/transaction_log.h 2010-03-27 18:51:59 +0000
2533+++ plugin/transaction_log/transaction_log.h 2010-05-05 02:25:42 +0000
2534@@ -39,7 +39,7 @@
2535 #define PLUGIN_TRANSACTION_LOG_TRANSACTION_LOG_H
2536
2537 #include <drizzled/atomics.h>
2538-#include <drizzled/replication_services.h>
2539+#include <drizzled/transaction_services.h>
2540
2541 #include "transaction_log_entry.h"
2542
2543@@ -162,7 +162,7 @@
2544 * @retval
2545 * false otherwise
2546 */
2547- bool findLogFilenameContainingTransactionId(const drizzled::ReplicationServices::GlobalTransactionId &to_find,
2548+ bool findLogFilenameContainingTransactionId(const drizzled::TransactionServices::TransactionId &to_find,
2549 std::string &out_filename) const;
2550
2551 /**
2552
2553=== modified file 'plugin/transaction_log/transaction_log_applier.cc'
2554--- plugin/transaction_log/transaction_log_applier.cc 2010-04-12 19:28:43 +0000
2555+++ plugin/transaction_log/transaction_log_applier.cc 2010-05-05 02:25:42 +0000
2556@@ -47,6 +47,7 @@
2557 #include "transaction_log.h"
2558 #include "transaction_log_applier.h"
2559 #include "transaction_log_index.h"
2560+#include "publisher.h"
2561
2562 #include <vector>
2563
2564@@ -62,10 +63,12 @@
2565 TransactionLogApplier::TransactionLogApplier(const string name_arg,
2566 TransactionLog *in_transaction_log,
2567 TransactionLogIndex *in_transaction_log_index,
2568+ TransactionLogPublisher *in_transaction_log_publisher,
2569 uint32_t in_num_write_buffers) :
2570 plugin::TransactionApplier(name_arg),
2571 transaction_log(in_transaction_log),
2572 transaction_log_index(in_transaction_log_index),
2573+ transaction_log_publisher(in_transaction_log_publisher),
2574 num_write_buffers(in_num_write_buffers),
2575 write_buffers()
2576 {
2577@@ -119,5 +122,7 @@
2578 entry_size),
2579 to_apply,
2580 checksum);
2581+ /* Tell the publisher which transaction we just applied */
2582+ transaction_log_publisher->setLastApplied(to_apply);
2583 return plugin::SUCCESS;
2584 }
2585
2586=== modified file 'plugin/transaction_log/transaction_log_applier.h'
2587--- plugin/transaction_log/transaction_log_applier.h 2010-04-12 19:15:00 +0000
2588+++ plugin/transaction_log/transaction_log_applier.h 2010-05-05 02:25:42 +0000
2589@@ -50,6 +50,7 @@
2590
2591 class TransactionLog;
2592 class TransactionLogIndex;
2593+class TransactionLogPublisher;
2594 class WriteBuffer;
2595
2596 class TransactionLogApplier: public drizzled::plugin::TransactionApplier
2597@@ -58,6 +59,7 @@
2598 TransactionLogApplier(const std::string name_arg,
2599 TransactionLog *in_transaction_log,
2600 TransactionLogIndex *in_transaction_log_index,
2601+ TransactionLogPublisher *in_transaction_log_publisher,
2602 uint32_t in_num_write_buffers);
2603
2604 /** Destructor */
2605@@ -89,10 +91,11 @@
2606 TransactionLogApplier &operator=(const TransactionLogApplier &other);
2607 /**
2608 * This Applier owns the memory of the associated TransactionLog
2609- * and its index - so we have to track it.
2610+ * and its index and publisher - so we have to track them.
2611 */
2612 TransactionLog *transaction_log;
2613 TransactionLogIndex *transaction_log_index;
2614+ TransactionLogPublisher *transaction_log_publisher;
2615 uint32_t num_write_buffers; ///< Number of write buffers used
2616 std::vector<WriteBuffer *> write_buffers; ///< array of write buffers
2617
2618
2619=== modified file 'plugin/transaction_log/transaction_log_reader.cc'
2620--- plugin/transaction_log/transaction_log_reader.cc 2010-04-23 01:02:40 +0000
2621+++ plugin/transaction_log/transaction_log_reader.cc 2010-05-05 02:25:42 +0000
2622@@ -65,7 +65,7 @@
2623 using namespace drizzled;
2624 using namespace google;
2625
2626-bool TransactionLogReader::read(const ReplicationServices::GlobalTransactionId &to_read_trx_id,
2627+bool TransactionLogReader::read(const TransactionServices::TransactionId &to_read_trx_id,
2628 message::Transaction *to_fill)
2629 {
2630 /*
2631
2632=== modified file 'plugin/transaction_log/transaction_log_reader.h'
2633--- plugin/transaction_log/transaction_log_reader.h 2010-03-05 18:08:49 +0000
2634+++ plugin/transaction_log/transaction_log_reader.h 2010-05-05 02:25:42 +0000
2635@@ -69,7 +69,7 @@
2636 * @retval
2637 * false if not found or read successfully
2638 */
2639- bool read(const drizzled::ReplicationServices::GlobalTransactionId &to_read_trx_id,
2640+ bool read(const drizzled::TransactionServices::TransactionId &to_read_trx_id,
2641 drizzled::message::Transaction *to_fill);
2642 };
2643
2644
2645=== modified file 'tests/r/data_dictionary_like_info.result'
2646--- tests/r/data_dictionary_like_info.result 2010-04-07 21:54:30 +0000
2647+++ tests/r/data_dictionary_like_info.result 2010-05-05 02:25:42 +0000
2648@@ -191,7 +191,7 @@
2649 or DATA_TYPE = 'varchar')
2650 group by DATA_TYPE order by DATA_TYPE, num;
2651 DATA_TYPE group_concat(table_schema, '.', table_name) num
2652-VARCHAR DATA_DICTIONARY.CHARACTER_SETS,DATA_DICTIONARY.CHARACTER_SETS,DATA_DICTIONARY.CHARACTER_SETS,DATA_DICTIONARY.COLLATIONS,DATA_DICTIONARY.COLLATIONS,DATA_DICTIONARY.COLLATIONS,DATA_DICTIONARY.COLLATIONS,DATA_DICTIONARY.COLLATIONS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.GLOBAL_STATEMENTS,DATA_DICTIONARY.GLOBAL_STATEMENTS,DATA_DICTIONARY.GLOBAL_STATUS,DATA_DICTIONARY.GLOBAL_STATUS,DATA_DICTIONARY.GLOBAL_VARIABLES,DATA_DICTIONARY.GLOBAL_VARIABLES,DATA_DICTIONARY.INDEXES,DATA_DICTIONARY.INDEXES,DATA_DICTIONARY.INDEXES,DATA_DICTIONARY.INDEXES,DATA_DICTIONARY.INDEXES,DATA_DICTIONARY.INDEXES,DATA_DICTIONARY.INDEXES,DATA_DICTIONARY.INDEXES,DATA_DICTIONARY.INDEX_PART 145
2653+VARCHAR DATA_DICTIONARY.CHARACTER_SETS,DATA_DICTIONARY.CHARACTER_SETS,DATA_DICTIONARY.CHARACTER_SETS,DATA_DICTIONARY.COLLATIONS,DATA_DICTIONARY.COLLATIONS,DATA_DICTIONARY.COLLATIONS,DATA_DICTIONARY.COLLATIONS,DATA_DICTIONARY.COLLATIONS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.COLUMNS,DATA_DICTIONARY.GLOBAL_STATEMENTS,DATA_DICTIONARY.GLOBAL_STATEMENTS,DATA_DICTIONARY.GLOBAL_STATUS,DATA_DICTIONARY.GLOBAL_STATUS,DATA_DICTIONARY.GLOBAL_VARIABLES,DATA_DICTIONARY.GLOBAL_VARIABLES,DATA_DICTIONARY.INDEXES,DATA_DICTIONARY.INDEXES,DATA_DICTIONARY.INDEXES,DATA_DICTIONARY.INDEXES,DATA_DICTIONARY.INDEXES,DATA_DICTIONARY.INDEXES,DATA_DICTIONARY.INDEXES,DATA_DICTIONARY.INDEXES,DATA_DICTIONARY.INDEX_PART 147
2654 Warnings:
2655 Warning 1260 1 line(s) were cut by GROUP_CONCAT()
2656 create table t1(f1 char(1) not null, f2 char(9) not null);
2657
2658=== modified file 'tests/r/information_schema.result'
2659Binary files tests/r/information_schema.result 2010-04-05 16:30:06 +0000 and tests/r/information_schema.result 2010-05-05 02:25:42 +0000 differ