Merge lp:~dshrews/drizzle/bug643935 into lp:~drizzle-trunk/drizzle/development

Proposed by David Shrewsbury
Status: Merged
Approved by: Brian Aker
Approved revision: 1785
Merged at revision: 1798
Proposed branch: lp:~dshrews/drizzle/bug643935
Merge into: lp:~drizzle-trunk/drizzle/development
Diff against target: 355 lines (+274/-7)
4 files modified
drizzled/message/include.am (+4/-2)
drizzled/message/transaction_manager.cc (+76/-0)
drizzled/message/transaction_manager.h (+129/-0)
drizzled/message/transaction_reader.cc (+65/-5)
To merge this branch: bzr merge lp:~dshrews/drizzle/bug643935
Reviewer Review Type Date Requested Status
Drizzle Merge Team Pending
Review via email: mp+36721@code.launchpad.net

Description of the change

This changes the transaction_reader program, which is not distributed and is used for our test suite, to output correct SQL by grouping GPB Transaction messages by transaction ID when the input stream has them intermingled. Doing this allows a single threaded reader to execute the SQL statements as they are read.

To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'drizzled/message/include.am'
--- drizzled/message/include.am 2010-08-24 16:45:09 +0000
+++ drizzled/message/include.am 2010-09-27 14:31:05 +0000
@@ -60,7 +60,8 @@
6060
61noinst_HEADERS+= \61noinst_HEADERS+= \
62 drizzled/message/ioutil.h \62 drizzled/message/ioutil.h \
63 drizzled/message/all.h63 drizzled/message/all.h \
64 drizzled/message/transaction_manager.h
6465
6566
66drizzled_message_schema_reader_SOURCES = drizzled/message/schema_reader.cc67drizzled_message_schema_reader_SOURCES = drizzled/message/schema_reader.cc
@@ -87,7 +88,8 @@
87drizzled_message_transaction_writer_LDADD = ${MESSAGE_LDADD} ${top_builddir}/drizzled/algorithm/libhash.la88drizzled_message_transaction_writer_LDADD = ${MESSAGE_LDADD} ${top_builddir}/drizzled/algorithm/libhash.la
88drizzled_message_transaction_writer_CXXFLAGS = ${MESSAGE_AM_CXXFLAGS} ${NO_WERROR}89drizzled_message_transaction_writer_CXXFLAGS = ${MESSAGE_AM_CXXFLAGS} ${NO_WERROR}
8990
90drizzled_message_transaction_reader_SOURCES = drizzled/message/transaction_reader.cc91drizzled_message_transaction_reader_SOURCES = drizzled/message/transaction_reader.cc \
92 drizzled/message/transaction_manager.cc
91drizzled_message_transaction_reader_LDADD = ${MESSAGE_LDADD} ${top_builddir}/drizzled/algorithm/libhash.la ${top_builddir}/drizzled/util/libutil.la93drizzled_message_transaction_reader_LDADD = ${MESSAGE_LDADD} ${top_builddir}/drizzled/algorithm/libhash.la ${top_builddir}/drizzled/util/libutil.la
92drizzled_message_transaction_reader_CXXFLAGS = ${MESSAGE_AM_CXXFLAGS} ${NO_WERROR}94drizzled_message_transaction_reader_CXXFLAGS = ${MESSAGE_AM_CXXFLAGS} ${NO_WERROR}
9395
9496
=== added file 'drizzled/message/transaction_manager.cc'
--- drizzled/message/transaction_manager.cc 1970-01-01 00:00:00 +0000
+++ drizzled/message/transaction_manager.cc 2010-09-27 14:31:05 +0000
@@ -0,0 +1,76 @@
1/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3 *
4 * Copyright (C) 2010 David Shrewsbury <shrewsbury.dave@gmail.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; version 2 of the License.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20/**
21 * @file
22 * Implementation of the TransactionManager class.
23 */
24
25#include "config.h"
26#include "drizzled/message/transaction_manager.h"
27
28using namespace std;
29
30namespace drizzled
31{
32namespace message
33{
34
35bool TransactionManager::store(const message::Transaction &transaction)
36{
37 string msg;
38 const message::TransactionContext trx_ctxt= transaction.transaction_context();
39 uint64_t trx_id= trx_ctxt.transaction_id();
40 transaction.SerializeToString(&msg);
41
42 cache[trx_id].push_back(msg);
43 return true;
44}
45
46bool TransactionManager::remove(uint64_t trx_id)
47{
48 cache.erase(trx_id);
49 return true;
50}
51
52bool TransactionManager::contains(uint64_t trx_id)
53{
54 boost::unordered_map< uint64_t, vector<string> >::const_iterator it= cache.find(trx_id);
55
56 if (it != cache.end())
57 return true;
58
59 return false;
60}
61
62uint32_t TransactionManager::getTransactionBufferSize(uint64_t trx_id)
63{
64 return static_cast<uint32_t>(cache[trx_id].size());
65}
66
67bool TransactionManager::getTransactionMessage(message::Transaction &trx,
68 uint64_t trx_id,
69 uint32_t position)
70{
71 trx.ParseFromString(cache[trx_id].at(position));
72 return true;
73}
74
75} /* namespace message */
76} /* namespace drizzled */
077
=== added file 'drizzled/message/transaction_manager.h'
--- drizzled/message/transaction_manager.h 1970-01-01 00:00:00 +0000
+++ drizzled/message/transaction_manager.h 2010-09-27 14:31:05 +0000
@@ -0,0 +1,129 @@
1/* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3 *
4 * Copyright (c) 2010 David Shrewsbury <shrewsbury.dave@gmail.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; version 2 of the License.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18 */
19
20/**
21 * @file
22 * Declaration of a class used to manage Transaction messages so that they
23 * can be retrieved as a collection.
24 */
25
26#ifndef DRIZZLED_MESSAGE_TRANSACTION_MANAGER_H
27#define DRIZZLED_MESSAGE_TRANSACTION_MANAGER_H
28
29#include "drizzled/message/transaction.pb.h"
30#include <string>
31#include <vector>
32#include <boost/unordered_map.hpp>
33
34typedef std::vector<std::string> MsgBufferType;
35
36namespace drizzled
37{
38namespace message
39{
40
41/**
42 * Simple (example) Transaction message buffer and content manager.
43 *
44 * @details
45 * This class groups Transaction messages together by transaction ID and
46 * buffers them together in memory. Obviously, this could eat up a lot of
47 * memory if you have very large transactions. A more robust implementation
48 * would buffer larger transactions to disk rather than memory.
49 *
50 * @note
51 * Once you have a complete transaction and have processed it, you should
52 * call remove() to remove the cache contents for that transaction from memory.
53 */
54class TransactionManager
55{
56public:
57 /**
58 * Store the given Transaction message in a buffer.
59 *
60 * @param[in] transaction Pointer to the Transaction message to store.
61 *
62 * @retval true Success
63 * @retval false Failure
64 */
65 bool store(const drizzled::message::Transaction &transaction);
66
67 /**
68 * Clear the buffer contents for a given transaction ID.
69 *
70 * @param[in] trx_id The transaction ID for the transaction to remove
71 *
72 * @retval true Success
73 * @retval false Failure
74 */
75 bool remove(uint64_t trx_id);
76
77 /**
78 * Check to see if any Transaction messages exist for a given transaction.
79 *
80 * @param[in] trx_id The transaction ID to check for.
81 *
82 * @retval true Transaction messages exist
83 * @retval false No Transaction messages found
84 */
85 bool contains(uint64_t trx_id);
86
87 /**
88 * Return number of cached elements for the given transaction ID.
89 *
90 * @param[in] trx_id Transaction ID
91 *
92 * @returns The number of cached elements associated with trx_id.
93 */
94 uint32_t getTransactionBufferSize(uint64_t trx_id);
95
96 /**
97 * Retrieve a Transaction message from the managed cache.
98 *
99 * Caller must supply a Transaction message to populate. The Transaction
100 * message to retrieve is indexed by a combination of transaction ID and
101 * position.
102 *
103 * @param[out] transaction Transaction message to populate
104 * @param[in] trx_id Transaction ID
105 * @param[in] position Index into the buffer associated with trx_id
106 *
107 * @retval true Success
108 * @retval false Failure
109 */
110 bool getTransactionMessage(drizzled::message::Transaction &transaction,
111 uint64_t trx_id,
112 uint32_t position);
113
114private:
115 /**
116 * Our message buffer cache, mapped by the transaction ID.
117 *
118 * We organize Transactions messages by grouping them by transaction ID,
119 * then storing the messages in std::vectors in std::string format. The
120 * string format is convenient because it can be easily copied around
121 * (GPB messages do not provide deep copying).
122 */
123 boost::unordered_map<uint64_t,MsgBufferType> cache;
124};
125
126} /* namespace message */
127} /* namespace drizzled */
128
129#endif /* DRIZZLED_MESSAGE_TRANSACTION_MANAGER_H */
0130
=== modified file 'drizzled/message/transaction_reader.cc'
--- drizzled/message/transaction_reader.cc 2010-08-27 13:02:11 +0000
+++ drizzled/message/transaction_reader.cc 2010-09-27 14:31:05 +0000
@@ -39,6 +39,7 @@
39#include <unistd.h>39#include <unistd.h>
40#include <drizzled/message/transaction.pb.h>40#include <drizzled/message/transaction.pb.h>
41#include <drizzled/message/statement_transform.h>41#include <drizzled/message/statement_transform.h>
42#include <drizzled/message/transaction_manager.h>
42#include <drizzled/util/convert.h>43#include <drizzled/util/convert.h>
4344
44#include <google/protobuf/io/coded_stream.h>45#include <google/protobuf/io/coded_stream.h>
@@ -126,6 +127,27 @@
126 return true;127 return true;
127}128}
128129
130static bool isEndTransaction(const message::Transaction &transaction)
131{
132 const message::TransactionContext trx= transaction.transaction_context();
133
134 size_t num_statements= transaction.statement_size();
135
136 /*
137 * If any Statement is partial, then we can expect another Transaction
138 * message.
139 */
140 for (size_t x= 0; x < num_statements; ++x)
141 {
142 const message::Statement &statement= transaction.statement(x);
143
144 if (not isEndStatement(statement))
145 return false;
146 }
147
148 return true;
149}
150
129static void printTransaction(const message::Transaction &transaction)151static void printTransaction(const message::Transaction &transaction)
130{152{
131 static uint64_t last_trx_id= 0;153 static uint64_t last_trx_id= 0;
@@ -137,7 +159,8 @@
137159
138 /*160 /*
139 * One way to determine when a new transaction begins is when the161 * One way to determine when a new transaction begins is when the
140 * transaction id changes. We check that here.162 * transaction id changes (if all transactions have their GPB messages
163 * grouped together, which this program will). We check that here.
141 */164 */
142 if (trx.transaction_id() != last_trx_id)165 if (trx.transaction_id() != last_trx_id)
143 cout << "START TRANSACTION;" << endl;166 cout << "START TRANSACTION;" << endl;
@@ -196,6 +219,8 @@
196 do_checksum= true;219 do_checksum= true;
197 }220 }
198221
222 message::TransactionManager trx_mgr;
223
199 protobuf::io::ZeroCopyInputStream *raw_input= new protobuf::io::FileInputStream(file);224 protobuf::io::ZeroCopyInputStream *raw_input= new protobuf::io::FileInputStream(file);
200 protobuf::io::CodedInputStream *coded_input= new protobuf::io::CodedInputStream(raw_input);225 protobuf::io::CodedInputStream *coded_input= new protobuf::io::CodedInputStream(raw_input);
201226
@@ -276,8 +301,42 @@
276 break;301 break;
277 }302 }
278303
279 /* Print the transaction */304 if (not isEndTransaction(transaction))
280 printTransaction(transaction);305 {
306 trx_mgr.store(transaction);
307 }
308 else
309 {
310 const message::TransactionContext trx= transaction.transaction_context();
311 uint64_t transaction_id= trx.transaction_id();
312
313 /*
314 * If there are any previous Transaction messages for this transaction,
315 * store this one, then output all of them together.
316 */
317 if (trx_mgr.contains(transaction_id))
318 {
319 trx_mgr.store(transaction);
320
321 uint32_t size= trx_mgr.getTransactionBufferSize(transaction_id);
322 uint32_t idx= 0;
323
324 while (idx != size)
325 {
326 message::Transaction new_trx;
327 trx_mgr.getTransactionMessage(new_trx, transaction_id, idx);
328 printTransaction(new_trx);
329 idx++;
330 }
331
332 /* No longer need this transaction */
333 trx_mgr.remove(transaction_id);
334 }
335 else
336 {
337 printTransaction(transaction);
338 }
339 }
281340
282 /* Skip 4 byte checksum */341 /* Skip 4 byte checksum */
283 coded_input->ReadLittleEndian32(&checksum);342 coded_input->ReadLittleEndian32(&checksum);
@@ -291,10 +350,11 @@
291 }350 }
292351
293 previous_length= length;352 previous_length= length;
294 }353 } /* end while */
354
295 if (buffer)355 if (buffer)
296 free(buffer);356 free(buffer);
297 357
298 delete coded_input;358 delete coded_input;
299 delete raw_input;359 delete raw_input;
300360