Merge lp:~dshrews/drizzle/bug643935 into lp:~drizzle-trunk/drizzle/development

Proposed by David Shrewsbury
Status: Merged
Approved by: Brian Aker
Approved revision: 1785
Merged at revision: 1798
Proposed branch: lp:~dshrews/drizzle/bug643935
Merge into: lp:~drizzle-trunk/drizzle/development
Diff against target: 355 lines (+274/-7)
4 files modified
drizzled/message/include.am (+4/-2)
drizzled/message/transaction_manager.cc (+76/-0)
drizzled/message/transaction_manager.h (+129/-0)
drizzled/message/transaction_reader.cc (+65/-5)
To merge this branch: bzr merge lp:~dshrews/drizzle/bug643935
Reviewer Review Type Date Requested Status
Drizzle Merge Team Pending
Review via email: mp+36721@code.launchpad.net

Description of the change

This changes the transaction_reader program, which is not distributed and is used for our test suite, so that it outputs correct SQL: when the input stream has GPB Transaction messages from different transactions intermingled, the messages are now grouped by transaction ID before being printed. Doing this allows a single-threaded reader to execute the SQL statements as they are read.

To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'drizzled/message/include.am'
2--- drizzled/message/include.am 2010-08-24 16:45:09 +0000
3+++ drizzled/message/include.am 2010-09-27 14:31:05 +0000
4@@ -60,7 +60,8 @@
5
6 noinst_HEADERS+= \
7 drizzled/message/ioutil.h \
8- drizzled/message/all.h
9+ drizzled/message/all.h \
10+ drizzled/message/transaction_manager.h
11
12
13 drizzled_message_schema_reader_SOURCES = drizzled/message/schema_reader.cc
14@@ -87,7 +88,8 @@
15 drizzled_message_transaction_writer_LDADD = ${MESSAGE_LDADD} ${top_builddir}/drizzled/algorithm/libhash.la
16 drizzled_message_transaction_writer_CXXFLAGS = ${MESSAGE_AM_CXXFLAGS} ${NO_WERROR}
17
18-drizzled_message_transaction_reader_SOURCES = drizzled/message/transaction_reader.cc
19+drizzled_message_transaction_reader_SOURCES = drizzled/message/transaction_reader.cc \
20+ drizzled/message/transaction_manager.cc
21 drizzled_message_transaction_reader_LDADD = ${MESSAGE_LDADD} ${top_builddir}/drizzled/algorithm/libhash.la ${top_builddir}/drizzled/util/libutil.la
22 drizzled_message_transaction_reader_CXXFLAGS = ${MESSAGE_AM_CXXFLAGS} ${NO_WERROR}
23
24
25=== added file 'drizzled/message/transaction_manager.cc'
26--- drizzled/message/transaction_manager.cc 1970-01-01 00:00:00 +0000
27+++ drizzled/message/transaction_manager.cc 2010-09-27 14:31:05 +0000
28@@ -0,0 +1,76 @@
29+/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
30+ * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
31+ *
32+ * Copyright (C) 2010 David Shrewsbury <shrewsbury.dave@gmail.com>
33+ *
34+ * This program is free software; you can redistribute it and/or modify
35+ * it under the terms of the GNU General Public License as published by
36+ * the Free Software Foundation; version 2 of the License.
37+ *
38+ * This program is distributed in the hope that it will be useful,
39+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
40+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
41+ * GNU General Public License for more details.
42+ *
43+ * You should have received a copy of the GNU General Public License
44+ * along with this program; if not, write to the Free Software
45+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
46+ */
47+
48+/**
49+ * @file
50+ * Implementation of the TransactionManager class.
51+ */
52+
53+#include "config.h"
54+#include "drizzled/message/transaction_manager.h"
55+
56+using namespace std;
57+
58+namespace drizzled
59+{
60+namespace message
61+{
62+
63+bool TransactionManager::store(const message::Transaction &transaction)
64+{
65+ string msg;
66+ const message::TransactionContext trx_ctxt= transaction.transaction_context();
67+ uint64_t trx_id= trx_ctxt.transaction_id();
68+ transaction.SerializeToString(&msg);
69+
70+ cache[trx_id].push_back(msg);
71+ return true;
72+}
73+
74+bool TransactionManager::remove(uint64_t trx_id)
75+{
76+ cache.erase(trx_id);
77+ return true;
78+}
79+
80+bool TransactionManager::contains(uint64_t trx_id)
81+{
82+ boost::unordered_map< uint64_t, vector<string> >::const_iterator it= cache.find(trx_id);
83+
84+ if (it != cache.end())
85+ return true;
86+
87+ return false;
88+}
89+
90+uint32_t TransactionManager::getTransactionBufferSize(uint64_t trx_id)
91+{
92+ return static_cast<uint32_t>(cache[trx_id].size());
93+}
94+
95+bool TransactionManager::getTransactionMessage(message::Transaction &trx,
96+ uint64_t trx_id,
97+ uint32_t position)
98+{
99+ trx.ParseFromString(cache[trx_id].at(position));
100+ return true;
101+}
102+
103+} /* namespace message */
104+} /* namespace drizzled */
105
106=== added file 'drizzled/message/transaction_manager.h'
107--- drizzled/message/transaction_manager.h 1970-01-01 00:00:00 +0000
108+++ drizzled/message/transaction_manager.h 2010-09-27 14:31:05 +0000
109@@ -0,0 +1,129 @@
110+/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
111+ * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
112+ *
113+ * Copyright (c) 2010 David Shrewsbury <shrewsbury.dave@gmail.com>
114+ *
115+ * This program is free software; you can redistribute it and/or modify
116+ * it under the terms of the GNU General Public License as published by
117+ * the Free Software Foundation; version 2 of the License.
118+ *
119+ * This program is distributed in the hope that it will be useful,
120+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
121+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
122+ * GNU General Public License for more details.
123+ *
124+ * You should have received a copy of the GNU General Public License
125+ * along with this program; if not, write to the Free Software
126+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
127+ */
128+
129+/**
130+ * @file
131+ * Declaration of a class used to manage Transaction messages so that they
132+ * can be retrieved as a collection.
133+ */
134+
135+#ifndef DRIZZLED_MESSAGE_TRANSACTION_MANAGER_H
136+#define DRIZZLED_MESSAGE_TRANSACTION_MANAGER_H
137+
138+#include "drizzled/message/transaction.pb.h"
139+#include <string>
140+#include <vector>
141+#include <boost/unordered_map.hpp>
142+
143+typedef std::vector<std::string> MsgBufferType;
144+
145+namespace drizzled
146+{
147+namespace message
148+{
149+
150+/**
151+ * Simple (example) Transaction message buffer and content manager.
152+ *
153+ * @details
154+ * This class groups Transaction messages together by transaction ID and
155+ * buffers them together in memory. Obviously, this could eat up a lot of
156+ * memory if you have very large transactions. A more robust implementation
157+ * would buffer larger transactions to disk rather than memory.
158+ *
159+ * @note
160+ * Once you have a complete transaction and have processed it, you should
161+ * call remove() to remove the cache contents for that transaction from memory.
162+ */
163+class TransactionManager
164+{
165+public:
166+ /**
167+ * Store the given Transaction message in a buffer.
168+ *
169+ * @param[in] transaction Reference to the Transaction message to store.
170+ *
171+ * @retval true Success
172+ * @retval false Failure
173+ */
174+ bool store(const drizzled::message::Transaction &transaction);
175+
176+ /**
177+ * Clear the buffer contents for a given transaction ID.
178+ *
179+ * @param[in] trx_id The transaction ID for the transaction to remove
180+ *
181+ * @retval true Success
182+ * @retval false Failure
183+ */
184+ bool remove(uint64_t trx_id);
185+
186+ /**
187+ * Check to see if any Transaction messages exist for a given transaction.
188+ *
189+ * @param[in] trx_id The transaction ID to check for.
190+ *
191+ * @retval true Transaction messages exist
192+ * @retval false No Transaction messages found
193+ */
194+ bool contains(uint64_t trx_id);
195+
196+ /**
197+ * Return number of cached elements for the given transaction ID.
198+ *
199+ * @param[in] trx_id Transaction ID
200+ *
201+ * @returns The number of cached elements associated with trx_id.
202+ */
203+ uint32_t getTransactionBufferSize(uint64_t trx_id);
204+
205+ /**
206+ * Retrieve a Transaction message from the managed cache.
207+ *
208+ * Caller must supply a Transaction message to populate. The Transaction
209+ * message to retrieve is indexed by a combination of transaction ID and
210+ * position.
211+ *
212+ * @param[out] transaction Transaction message to populate
213+ * @param[in] trx_id Transaction ID
214+ * @param[in] position Index into the buffer associated with trx_id
215+ *
216+ * @retval true Success
217+ * @retval false Failure
218+ */
219+ bool getTransactionMessage(drizzled::message::Transaction &transaction,
220+ uint64_t trx_id,
221+ uint32_t position);
222+
223+private:
224+ /**
225+ * Our message buffer cache, mapped by the transaction ID.
226+ *
227+ * We organize Transaction messages by grouping them by transaction ID,
228+ * then storing the messages in std::vectors in std::string format. The
229+ * string format is convenient because it can be easily copied around
230+ * (GPB messages do not provide deep copying).
231+ */
232+ boost::unordered_map<uint64_t,MsgBufferType> cache;
233+};
234+
235+} /* namespace message */
236+} /* namespace drizzled */
237+
238+#endif /* DRIZZLED_MESSAGE_TRANSACTION_MANAGER_H */
239
240=== modified file 'drizzled/message/transaction_reader.cc'
241--- drizzled/message/transaction_reader.cc 2010-08-27 13:02:11 +0000
242+++ drizzled/message/transaction_reader.cc 2010-09-27 14:31:05 +0000
243@@ -39,6 +39,7 @@
244 #include <unistd.h>
245 #include <drizzled/message/transaction.pb.h>
246 #include <drizzled/message/statement_transform.h>
247+#include <drizzled/message/transaction_manager.h>
248 #include <drizzled/util/convert.h>
249
250 #include <google/protobuf/io/coded_stream.h>
251@@ -126,6 +127,27 @@
252 return true;
253 }
254
255+static bool isEndTransaction(const message::Transaction &transaction)
256+{
257+ const message::TransactionContext trx= transaction.transaction_context();
258+
259+ size_t num_statements= transaction.statement_size();
260+
261+ /*
262+ * If any Statement is partial, then we can expect another Transaction
263+ * message.
264+ */
265+ for (size_t x= 0; x < num_statements; ++x)
266+ {
267+ const message::Statement &statement= transaction.statement(x);
268+
269+ if (not isEndStatement(statement))
270+ return false;
271+ }
272+
273+ return true;
274+}
275+
276 static void printTransaction(const message::Transaction &transaction)
277 {
278 static uint64_t last_trx_id= 0;
279@@ -137,7 +159,8 @@
280
281 /*
282 * One way to determine when a new transaction begins is when the
283- * transaction id changes. We check that here.
284+ * transaction id changes (provided all of a transaction's GPB messages
285+ * are grouped together, which this program ensures). We check that here.
286 */
287 if (trx.transaction_id() != last_trx_id)
288 cout << "START TRANSACTION;" << endl;
289@@ -196,6 +219,8 @@
290 do_checksum= true;
291 }
292
293+ message::TransactionManager trx_mgr;
294+
295 protobuf::io::ZeroCopyInputStream *raw_input= new protobuf::io::FileInputStream(file);
296 protobuf::io::CodedInputStream *coded_input= new protobuf::io::CodedInputStream(raw_input);
297
298@@ -276,8 +301,42 @@
299 break;
300 }
301
302- /* Print the transaction */
303- printTransaction(transaction);
304+ if (not isEndTransaction(transaction))
305+ {
306+ trx_mgr.store(transaction);
307+ }
308+ else
309+ {
310+ const message::TransactionContext trx= transaction.transaction_context();
311+ uint64_t transaction_id= trx.transaction_id();
312+
313+ /*
314+ * If there are any previous Transaction messages for this transaction,
315+ * store this one, then output all of them together.
316+ */
317+ if (trx_mgr.contains(transaction_id))
318+ {
319+ trx_mgr.store(transaction);
320+
321+ uint32_t size= trx_mgr.getTransactionBufferSize(transaction_id);
322+ uint32_t idx= 0;
323+
324+ while (idx != size)
325+ {
326+ message::Transaction new_trx;
327+ trx_mgr.getTransactionMessage(new_trx, transaction_id, idx);
328+ printTransaction(new_trx);
329+ idx++;
330+ }
331+
332+ /* No longer need this transaction */
333+ trx_mgr.remove(transaction_id);
334+ }
335+ else
336+ {
337+ printTransaction(transaction);
338+ }
339+ }
340
341 /* Skip 4 byte checksum */
342 coded_input->ReadLittleEndian32(&checksum);
343@@ -291,10 +350,11 @@
344 }
345
346 previous_length= length;
347- }
348+ } /* end while */
349+
350 if (buffer)
351 free(buffer);
352-
353+
354 delete coded_input;
355 delete raw_input;
356