Merge lp:~tgates/gearmand/nuodb-queue-adapter into lp:gearmand

Proposed by Tom Gates
Status: Needs review
Proposed branch: lp:~tgates/gearmand/nuodb-queue-adapter
Merge into: lp:gearmand
Diff against target: 833 lines (+768/-0)
8 files modified
configure.ac (+6/-0)
libgearman-server/plugins.cc (+7/-0)
libgearman-server/plugins/queue.h (+2/-0)
libgearman-server/plugins/queue/include.am (+1/-0)
libgearman-server/plugins/queue/nuodb/include.am (+23/-0)
libgearman-server/plugins/queue/nuodb/queue.cc (+550/-0)
libgearman-server/plugins/queue/nuodb/queue.h (+49/-0)
m4/ax_lib_nuodb.m4 (+130/-0)
To merge this branch: bzr merge lp:~tgates/gearmand/nuodb-queue-adapter
Reviewer Review Type Date Requested Status
Tangent Trunk Pending
Review via email: mp+219047@code.launchpad.net

Description of the change

Add support for using NuoDB as a persistent queue for Gearman.

To post a comment you must log in.

Unmerged revisions

895. By Tom Gates

Add support for NuoDB persistent queues

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'configure.ac'
2--- configure.ac 2014-02-16 02:15:49 +0000
3+++ configure.ac 2014-05-09 18:35:29 +0000
4@@ -104,6 +104,11 @@
5 [AC_DEFINE([HAVE_LIBSQLITE3],[0],[Have the SQLITE3 library])])
6 AM_CONDITIONAL([HAVE_LIBSQLITE3],[test "x${WANT_SQLITE3}" = "xyes"])
7
8+AX_LIB_NUODB
9+AS_IF([test "x${WANT_NUODB}" = "xyes"],
10+ [AC_DEFINE([HAVE_NUODB],[1],[Have the NuoDB library])],
11+ [AC_DEFINE([HAVE_NUODB],[0],[Have the NuoDB library])])
12+AM_CONDITIONAL([HAVE_NUODB],[test "x${WANT_NUODB}" = "xyes"])
13
14 PANDORA_ENABLE_DTRACE
15 AX_HAVE_LIBPQ
16@@ -345,6 +350,7 @@
17 echo " * Debug enabled: $ax_enable_debug"
18 echo " * Warnings as failure: $ac_cv_warnings_as_errors"
19 echo " * Building with libsqlite3 $WANT_SQLITE3"
20+echo " * Building with nuodb $WANT_NUODB"
21 echo " * Building with libdrizzle $ac_enable_libdrizzle"
22 echo " * Building with libmemcached $ax_enable_libmemcached"
23 echo " * Building with libpq $ac_cv_libpq"
24
25=== modified file 'libgearman-server/plugins.cc'
26--- libgearman-server/plugins.cc 2013-10-20 11:00:25 +0000
27+++ libgearman-server/plugins.cc 2014-05-09 18:35:29 +0000
28@@ -71,6 +71,13 @@
29 }
30 #endif
31
32+#if defined(HAVE_NUODB) && HAVE_NUODB
33+ if (HAVE_NUODB)
34+ {
35+ queue::initialize_nuodb();
36+ }
37+#endif
38+
39 #if defined(HAVE_LIBPQ) && HAVE_LIBPQ
40 if (HAVE_LIBPQ)
41 {
42
43=== modified file 'libgearman-server/plugins/queue.h'
44--- libgearman-server/plugins/queue.h 2012-11-23 09:17:06 +0000
45+++ libgearman-server/plugins/queue.h 2014-05-09 18:35:29 +0000
46@@ -52,3 +52,5 @@
47 #include <libgearman-server/plugins/queue/redis/queue.h>
48
49 #include <libgearman-server/plugins/queue/mysql/queue.h>
50+
51+#include <libgearman-server/plugins/queue/nuodb/queue.h>
52
53=== modified file 'libgearman-server/plugins/queue/include.am'
54--- libgearman-server/plugins/queue/include.am 2012-11-23 09:17:06 +0000
55+++ libgearman-server/plugins/queue/include.am 2014-05-09 18:35:29 +0000
56@@ -21,3 +21,4 @@
57 include libgearman-server/plugins/queue/sqlite/include.am
58 include libgearman-server/plugins/queue/tokyocabinet/include.am
59 include libgearman-server/plugins/queue/mysql/include.am
60+include libgearman-server/plugins/queue/nuodb/include.am
61
62=== added directory 'libgearman-server/plugins/queue/nuodb'
63=== added file 'libgearman-server/plugins/queue/nuodb/include.am'
64--- libgearman-server/plugins/queue/nuodb/include.am 1970-01-01 00:00:00 +0000
65+++ libgearman-server/plugins/queue/nuodb/include.am 2014-05-09 18:35:29 +0000
66@@ -0,0 +1,23 @@
67+# vim:ft=automake
68+# Gearman
69+# Copyright (C) 2011 Data Differential, http://datadifferential.com/
70+# All rights reserved.
71+#
72+# Use and distribution licensed under the BSD license. See
73+# the COPYING file in the parent directory for full text.
74+#
75+# All paths should be given relative to the root
76+#
77+
78+noinst_HEADERS+= libgearman-server/plugins/queue/nuodb/queue.h
79+
80+if HAVE_NUODB
81+
82+libgearman_server_libgearman_server_la_SOURCES+= libgearman-server/plugins/queue/nuodb/queue.cc
83+libgearman_server_libgearman_server_la_LIBADD+= $(NUODB_LDFLAGS)
84+libgearman_server_libgearman_server_la_CXXFLAGS+= $(NUODB_CFLAGS)
85+
86+gearmand_gearmand_LDADD+= $(NUODB_LDFLAGS)
87+
88+endif
89+
90
91=== added file 'libgearman-server/plugins/queue/nuodb/queue.cc'
92--- libgearman-server/plugins/queue/nuodb/queue.cc 1970-01-01 00:00:00 +0000
93+++ libgearman-server/plugins/queue/nuodb/queue.cc 2014-05-09 18:35:29 +0000
94@@ -0,0 +1,550 @@
95+/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
96+ *
97+ * Gearmand client and server library.
98+ *
99+ * Copyright (C) 2011 Data Differential, http://datadifferential.com/
100+ * Copyright (C) 2008 Brian Aker, Eric Day
101+ * All rights reserved.
102+ *
103+ * Redistribution and use in source and binary forms, with or without
104+ * modification, are permitted provided that the following conditions are
105+ * met:
106+ *
107+ * * Redistributions of source code must retain the above copyright
108+ * notice, this list of conditions and the following disclaimer.
109+ *
110+ * * Redistributions in binary form must reproduce the above
111+ * copyright notice, this list of conditions and the following disclaimer
112+ * in the documentation and/or other materials provided with the
113+ * distribution.
114+ *
115+ * * The names of its contributors may not be used to endorse or
116+ * promote products derived from this software without specific prior
117+ * written permission.
118+ *
119+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
120+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
121+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
122+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
123+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
124+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
125+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
126+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
127+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
128+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
129+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
130+ *
131+ */
132+
133+/**
134+ * @file
135+ * @brief NuoDB Queue Storage Definitions
136+ */
137+
138+#include <gear_config.h>
139+#include <libgearman-server/common.h>
140+#include <libgearman-server/byte.h>
141+#include <libgearman-server/plugins/queue/nuodb/queue.h>
142+#include <libgearman-server/plugins/queue/base.h>
143+
144+#pragma GCC diagnostic push
145+#pragma GCC diagnostic ignored "-Wundef"
146+#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
147+#pragma GCC diagnostic ignored "-Woverloaded-virtual"
148+#pragma GCC diagnostic ignored "-Wreorder"
149+#pragma GCC diagnostic ignored "-Wunused-parameter"
150+#pragma GCC diagnostic ignored "-Wold-style-cast"
151+
152+#include <NuoDB.h>
153+#include <cerrno>
154+
155+/**
156+ * @addtogroup plugins::queue::NuoDB Static NuoDB Queue Storage Definitions
157+ * @ingroup gearman_queue_nuodb
158+ * @{
159+ */
160+
161+/**
162+ * Default values.
163+ */
164+#define GEARMAND_QUEUE_NUODB_DEFAULT_SCHEMA "gearman"
165+#define GEARMAND_QUEUE_NUODB_DEFAULT_TABLE "queue"
166+
167+namespace gearmand { namespace plugins { namespace queue { class Nuodb; }}}
168+
169+static gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::Nuodb *queue);
170+
171+namespace gearmand {
172+namespace plugins {
173+namespace queue {
174+
175+class Nuodb : public gearmand::plugins::Queue {
176+public:
177+ Nuodb();
178+ ~Nuodb();
179+
180+ gearmand_error_t initialize();
181+
182+ const std::string &insert() {
183+ return _insert_query;
184+ }
185+
186+ const std::string &select() {
187+ return _select_query;
188+ }
189+
190+ const std::string &create() {
191+ return _create_query;
192+ }
193+
194+ NuoDB::Connection *con;
195+ std::string nuodb_connect_string;
196+ std::string nuodb_user;
197+ std::string nuodb_pw;
198+ std::string schema;
199+ std::string table;
200+ NuoDB::PreparedStatement *_insert_stmt;
201+ NuoDB::PreparedStatement *_delete_stmt;
202+ NuoDB::PreparedStatement *_select_stmt;
203+
204+public:
205+ std::string _insert_query;
206+ std::string _select_query;
207+ std::string _create_query;
208+
209+ NuoDB::PreparedStatement *insert_stmt() {
210+ return _insert_stmt;
211+ }
212+
213+ NuoDB::PreparedStatement *delete_stmt() {
214+ return _delete_stmt;
215+ }
216+
217+ NuoDB::PreparedStatement *select_stmt() {
218+ return _select_stmt;
219+ }
220+
221+};
222+
223+Nuodb::Nuodb() :
224+ Queue("NuoDB"),
225+ con(NULL),
226+ nuodb_connect_string(""),
227+ nuodb_user(""),
228+ nuodb_pw(""),
229+ table(""),
230+ schema("")
231+{
232+ command_line_options().add_options()
233+ ("nuodb-conninfo", boost::program_options::value(&nuodb_connect_string)->default_value(""), "NuoDB connection information string.")
234+ ("nuodb-user", boost::program_options::value(&nuodb_user)->default_value(""), "NuoDB connection username.")
235+ ("nuodb-password", boost::program_options::value(&nuodb_pw)->default_value(""), "NuoDB connection password.")
236+ ("nuodb-table", boost::program_options::value(&table)->default_value(GEARMAND_QUEUE_NUODB_DEFAULT_TABLE), "Table to use.")
237+ ("nuodb-schema", boost::program_options::value(&schema)->default_value(GEARMAND_QUEUE_NUODB_DEFAULT_SCHEMA), "Schema to use.");
238+}
239+
240+Nuodb::~Nuodb ()
241+{
242+ if (con) {
243+ con->close();
244+ }
245+ con = NULL;
246+}
247+
248+gearmand_error_t Nuodb::initialize()
249+{
250+ _create_query+= "CREATE TABLE " +table +" (unique_key VARCHAR" +"(" + TOSTRING(GEARMAN_UNIQUE_SIZE) +"), ";
251+ _create_query+= "function_name VARCHAR(255), priority INTEGER, data BLOB, when_to_run INTEGER, UNIQUE (unique_key, function_name))";
252+ _insert_query+= "INSERT INTO " +table +" (priority, unique_key, function_name, data, when_to_run) VALUES(?,?,?,?,?)";
253+ _select_query+= "SELECT unique_key,function_name,priority,data,when_to_run FROM " +table;
254+ gearmand_error_t ret= _initialize(Gearmand()->server, this);
255+ return ret;
256+}
257+
258+void initialize_nuodb()
259+{
260+ static Nuodb local_instance;
261+}
262+
263+} // namespace queue
264+} // namespace plugins
265+} // namespace gearmand
266+
267+
268+/* Queue callback functions. */
269+static gearmand_error_t _nuodb_add(gearman_server_st *server, void *context,
270+ const char *unique, size_t unique_size,
271+ const char *function_name,
272+ size_t function_name_size,
273+ const void *data, size_t data_size,
274+ gearman_job_priority_t priority,
275+ int64_t when);
276+
277+static gearmand_error_t _nuodb_flush(gearman_server_st *server, void *context);
278+
279+static gearmand_error_t _nuodb_done(gearman_server_st *server, void *context,
280+ const char *unique,
281+ size_t unique_size,
282+ const char *function_name,
283+ size_t function_name_size);
284+
285+static gearmand_error_t _nuodb_replay(gearman_server_st *server, void *context,
286+ gearman_queue_add_fn *add_fn,
287+ void *add_context);
288+
289+/** @} */
290+
291+/*
292+ * Public definitions
293+ */
294+
295+gearmand_error_t _initialize(gearman_server_st& server,
296+ gearmand::plugins::queue::Nuodb *queue)
297+{
298+ // TODO: use nuodb_connnect_string
299+ const char *database = queue->nuodb_connect_string.c_str();
300+ const char *user = queue->nuodb_user.c_str();
301+ const char *pw = queue->nuodb_pw.c_str();
302+ const char *schema = queue->schema.c_str();
303+
304+ gearmand_info("Initializing nuodb module");
305+
306+ gearman_server_set_queue(server, queue, _nuodb_add, _nuodb_flush, _nuodb_done, _nuodb_replay);
307+
308+ try {
309+ queue->con = NuoDB::Connection::createUtf8();
310+ NuoDB::Properties* properties = queue->con->allocProperties();
311+ properties->putValue("user", user);
312+ properties->putValue("password", pw);
313+ properties->putValue("schema", schema);
314+ queue->con->openDatabase(database, properties);
315+ } catch (NuoDB::SQLException &e) {
316+ gearman_server_set_queue(server, NULL, NULL, NULL, NULL, NULL);
317+ return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "NuoDB Connection: [%s] %s", e.getSqlcode(), e.getText());
318+ } catch (...) {
319+ // Unknown error
320+ queue->con = NULL;
321+ }
322+ if (queue->con == NULL)
323+ {
324+ gearman_server_set_queue(server, NULL, NULL, NULL, NULL, NULL);
325+ return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_QUEUE_ERROR, "NuoDB Connection: %s", "Unknown error");
326+ }
327+
328+
329+ std::string query("SELECT TABLENAME FROM SYSTEM.TABLES WHERE TYPE = 'TABLE' AND SCHEMA = '" +queue->schema +"' AND TABLENAME = '" +queue->table + "'");
330+
331+ NuoDB::PreparedStatement *stmt = NULL;
332+ NuoDB::ResultSet *rs = NULL;
333+
334+ try {
335+ stmt = queue->con->prepareStatement(query.c_str());
336+ stmt->execute();
337+ rs = stmt->getResultSet();
338+ } catch (NuoDB::SQLException &e) {
339+ std::string error_string= "NuoDB:";
340+ error_string += "Queue error: [";
341+ error_string += e.getSqlcode();
342+ error_string += "] ";
343+ error_string += e.getText();
344+ gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
345+ return GEARMAND_QUEUE_ERROR;
346+ } catch (...) {
347+ // Unknown error
348+ rs = NULL;
349+ }
350+
351+ if (rs == NULL) {
352+ std::string error_string= "NuoDB:";
353+ error_string+= "No results for query: ";
354+ error_string+= query.c_str();
355+ gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
356+ return GEARMAND_QUEUE_ERROR;
357+ }
358+
359+ if (!rs->next())
360+ {
361+ gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "nuodb module creating table '%s'", queue->table.c_str());
362+
363+ bool result = 1;
364+ const char *sql = queue->create().c_str();
365+ try {
366+ stmt = queue->con->prepareStatement(sql);
367+ result = stmt->execute();
368+ } catch (NuoDB::SQLException &e) {
369+ std::string error_string= "NuoDB:";
370+ error_string += "Table create error: [";
371+ error_string += e.getSqlcode();
372+ error_string += "] ";
373+ error_string += e.getText();
374+ gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
375+ return GEARMAND_QUEUE_ERROR;
376+ } catch (...) {
377+ // Unknown error
378+ std::string error_string= "NuoDB:";
379+ error_string+= "Table create error for statement: ";
380+ error_string+= query.c_str();
381+ gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
382+ return GEARMAND_QUEUE_ERROR;
383+ }
384+
385+ if (result == 1)
386+ {
387+ gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
388+ "NuoDB:Problem creating table: %s", sql);
389+ gearman_server_set_queue(server, NULL, NULL, NULL, NULL, NULL);
390+ return GEARMAND_QUEUE_ERROR;
391+ }
392+ }
393+
394+ const char *insert_sql = queue->insert().c_str();
395+ try {
396+ queue->_insert_stmt = queue->con->prepareStatement(insert_sql);
397+ } catch (NuoDB::SQLException &e) {
398+ std::string error_string= "NuoDB:";
399+ error_string += "Error preparing insert statement: [";
400+ error_string += e.getSqlcode();
401+ error_string += "] ";
402+ error_string += e.getText();
403+ gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
404+ return GEARMAND_QUEUE_ERROR;
405+ } catch (...) {
406+ // Unknown error
407+ std::string error_string= "NuoDB:";
408+ error_string+= "Error preparing insert statement: ";
409+ error_string+= insert_sql;
410+ gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
411+ return GEARMAND_QUEUE_ERROR;
412+ }
413+
414+ std::string delete_query;
415+ delete_query+= "DELETE FROM ";
416+ delete_query+= queue->table;
417+ delete_query+= " WHERE unique_key = ?";
418+ delete_query+= " AND function_name = ?";
419+ const char *delete_sql = delete_query.c_str();
420+ try {
421+ queue->_delete_stmt = queue->con->prepareStatement(delete_sql);
422+ } catch (NuoDB::SQLException &e) {
423+ std::string error_string= "NuoDB:";
424+ error_string += "Error preparing delete statement: [";
425+ error_string += e.getSqlcode();
426+ error_string += "] ";
427+ error_string += e.getText();
428+ gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
429+ return GEARMAND_QUEUE_ERROR;
430+ } catch (...) {
431+ // Unknown error
432+ std::string error_string= "NuoDB:";
433+ error_string+= "Error preparing delete statement: ";
434+ error_string+= insert_sql;
435+ gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
436+ return GEARMAND_QUEUE_ERROR;
437+ }
438+
439+ std::string select_query("SELECT unique_key,function_name,priority,data,when_to_run FROM " + queue->table);
440+ const char *select_sql = select_query.c_str();
441+ try {
442+ queue->_select_stmt = queue->con->prepareStatement(select_sql);
443+ } catch (NuoDB::SQLException &e) {
444+ std::string error_string= "NuoDB:";
445+ error_string += "Error preparing select statement: [";
446+ error_string += e.getSqlcode();
447+ error_string += "] ";
448+ error_string += e.getText();
449+ gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
450+ return GEARMAND_QUEUE_ERROR;
451+ } catch (...) {
452+ // Unknown error
453+ std::string error_string= "NuoDB:";
454+ error_string+= "Error preparing select statement: ";
455+ error_string+= insert_sql;
456+ gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
457+ return GEARMAND_QUEUE_ERROR;
458+ }
459+
460+ rs->close();
461+ rs = NULL;
462+ stmt->close();
463+ stmt = NULL;
464+ return GEARMAND_SUCCESS;
465+}
466+
467+/*
468+ * Static definitions
469+ */
470+
471+static gearmand_error_t _nuodb_add(gearman_server_st*, void *context,
472+ const char *unique, size_t unique_size,
473+ const char *function_name,
474+ size_t function_name_size,
475+ const void *data, size_t data_size,
476+ gearman_job_priority_t priority,
477+ int64_t when)
478+{
479+ gearmand::plugins::queue::Nuodb *queue= (gearmand::plugins::queue::Nuodb *)context;
480+
481+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "nuodb add: %.*s", (uint32_t)unique_size, (char *)unique);
482+
483+
484+ NuoDB::PreparedStatement *stmt = queue->insert_stmt();
485+
486+ try {
487+ stmt->setInt(1, static_cast<uint32_t>(priority));
488+ stmt->setString(2, unique);
489+ stmt->setString(3, function_name);
490+ stmt->setBytes(4, data_size, (const void *) data);
491+ stmt->setLong(5, when);
492+ stmt->execute();
493+ } catch (NuoDB::SQLException &e) {
494+ std::string error_string= "NuoDB:";
495+ error_string += "Add queue error: [";
496+ error_string += e.getSqlcode() + "] ";
497+ error_string += e.getText();
498+ gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
499+ return GEARMAND_QUEUE_ERROR;
500+ } catch (...) {
501+ // Unknown error
502+ std::string error_string= "NuoDB:";
503+ error_string+= "Unable to add to queue ";
504+ gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
505+ return GEARMAND_QUEUE_ERROR;
506+ }
507+
508+ return GEARMAND_SUCCESS;
509+}
510+
511+static gearmand_error_t _nuodb_flush(gearman_server_st *, void *)
512+{
513+ gearmand_debug("nuodb flush");
514+
515+ return GEARMAND_SUCCESS;
516+}
517+
518+static gearmand_error_t _nuodb_done(gearman_server_st*, void *context,
519+ const char *unique,
520+ size_t unique_size,
521+ const char *function_name,
522+ size_t function_name_size)
523+{
524+ (void)function_name_size;
525+ gearmand::plugins::queue::Nuodb *queue= (gearmand::plugins::queue::Nuodb *)context;
526+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "nuodb done: %.*s", (uint32_t)unique_size, (char *)unique);
527+
528+ NuoDB::PreparedStatement *stmt = queue->delete_stmt();
529+
530+ try {
531+ stmt->setString(1, unique);
532+ stmt->setString(2, function_name);
533+ stmt->execute();
534+ } catch (NuoDB::SQLException &e) {
535+ std::string error_string= "NuoDB:";
536+ error_string += "Delete queue error: [";
537+ error_string += e.getSqlcode();
538+ error_string += "] ";
539+ error_string += e.getText();
540+ gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
541+ return GEARMAND_QUEUE_ERROR;
542+ } catch (...) {
543+ // Unknown error
544+ std::string error_string= "NuoDB:";
545+ error_string+= "Unable to delete from queue ";
546+ gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
547+ return GEARMAND_QUEUE_ERROR;
548+ }
549+
550+ return GEARMAND_SUCCESS;
551+}
552+
553+static gearmand_error_t _nuodb_replay(gearman_server_st *server, void *context,
554+ gearman_queue_add_fn *add_fn,
555+ void *add_context)
556+{
557+ gearmand::plugins::queue::Nuodb *queue= (gearmand::plugins::queue::Nuodb *)context;
558+
559+ gearmand_info("nuodb replay start");
560+
561+
562+ NuoDB::PreparedStatement *stmt = queue->select_stmt();
563+ NuoDB::ResultSet *rs = NULL;
564+
565+
566+ try {
567+ stmt->execute();
568+ rs = stmt->getResultSet();
569+ } catch (NuoDB::SQLException &e) {
570+ std::string error_string= "NuoDB:";
571+ error_string += "Select error: [";
572+ error_string += e.getSqlcode();
573+ error_string += "] ";
574+ error_string += e.getText();
575+ gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
576+ return GEARMAND_QUEUE_ERROR;
577+ } catch (...) {
578+ // Unknown error
579+ rs = NULL;
580+ }
581+
582+ if (rs == NULL)
583+ {
584+ std::string error_string= "NuoDB:";
585+ error_string+= "No results for query ";
586+ gearmand_gerror(error_string.c_str(), GEARMAND_QUEUE_ERROR);
587+ return GEARMAND_QUEUE_ERROR;
588+ }
589+
590+
591+ while(rs->next())
592+ {
593+ const char *unique_key;
594+ const char *function_name;
595+ int priority;
596+ size_t data_length;
597+ char *data;
598+ int when_to_run;
599+
600+ unique_key = rs->getString(1);
601+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
602+ "nuodb replay: %s",
603+ unique_key);
604+
605+ NuoDB::Blob *blob = rs->getBlob(4);
606+ if (blob == NULL)
607+ {
608+ data= NULL;
609+ data_length= 0;
610+ }
611+ else
612+ {
613+ data_length = size_t(blob->length());
614+ data= (char *)malloc(data_length);
615+ if (data == NULL)
616+ {
617+ return gearmand_perror(errno, "malloc");
618+ }
619+ blob->getBytes(0, data_length, (unsigned char *)data);
620+ }
621+
622+ gearmand_error_t ret;
623+ function_name = rs->getString(2);
624+ priority = rs->getInt(3);
625+ when_to_run = rs->getInt(5);
626+ ret= (*add_fn)(server, add_context, unique_key,
627+ (size_t) strlen(unique_key),
628+ function_name,
629+ (size_t) strlen(function_name),
630+ data, data_length,
631+ (gearman_job_priority_t) priority,
632+ when_to_run);
633+ if (gearmand_failed(ret))
634+ {
635+ rs->close();
636+ return ret;
637+ }
638+ }
639+
640+ rs->close();
641+ return GEARMAND_SUCCESS;
642+}
643+
644+#pragma GCC diagnostic pop
645
646=== added file 'libgearman-server/plugins/queue/nuodb/queue.h'
647--- libgearman-server/plugins/queue/nuodb/queue.h 1970-01-01 00:00:00 +0000
648+++ libgearman-server/plugins/queue/nuodb/queue.h 2014-05-09 18:35:29 +0000
649@@ -0,0 +1,49 @@
650+/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
651+ *
652+ * Gearmand client and server library.
653+ *
654+ * Copyright (C) 2011 Data Differential, http://datadifferential.com/
655+ * All rights reserved.
656+ *
657+ * Redistribution and use in source and binary forms, with or without
658+ * modification, are permitted provided that the following conditions are
659+ * met:
660+ *
661+ * * Redistributions of source code must retain the above copyright
662+ * notice, this list of conditions and the following disclaimer.
663+ *
664+ * * Redistributions in binary form must reproduce the above
665+ * copyright notice, this list of conditions and the following disclaimer
666+ * in the documentation and/or other materials provided with the
667+ * distribution.
668+ *
669+ * * The names of its contributors may not be used to endorse or
670+ * promote products derived from this software without specific prior
671+ * written permission.
672+ *
673+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
674+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
675+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
676+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
677+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
678+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
679+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
680+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
681+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
682+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
683+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
684+ *
685+ */
686+
687+#pragma once
688+
689+
690+namespace gearmand {
691+namespace plugins {
692+namespace queue {
693+
694+void initialize_nuodb();
695+
696+} // namespace queue
697+} // namespace plugin
698+} // namespace gearmand
699
700=== added file 'm4/ax_lib_nuodb.m4'
701--- m4/ax_lib_nuodb.m4 1970-01-01 00:00:00 +0000
702+++ m4/ax_lib_nuodb.m4 2014-05-09 18:35:29 +0000
703@@ -0,0 +1,130 @@
704+#
705+# SYNOPSIS
706+#
707+# AX_LIB_NUODB([MINIMUM-VERSION])
708+#
709+# DESCRIPTION
710+#
711+# This macro provides tests of availability of NuoDB 'libRemote' library
712+# of particular version or newer.
713+#
714+# AX_LIB_NUODB macro takes only one argument which is optional. If
715+# there is no required version passed, then macro does not run version
716+# test.
717+#
718+# The --with-nuodb option takes one of three possible values:
719+#
720+# no - do not check for NuoDB client library
721+#
722+# yes - do check for NuoDB client library in standard locations
723+#
724+# path - complete path to NuoDB home directory
725+#
726+# This macro calls:
727+#
728+# AC_SUBST(NUODB_CFLAGS)
729+# AC_SUBST(NUODB_LDFLAGS)
730+# AC_SUBST(NUODB_VERSION)
731+#
732+# And sets:
733+#
734+# HAVE_NUODB
735+#
736+
737+AC_DEFUN([AX_LIB_NUODB],
738+[
739+ AC_ARG_WITH([nuodb],
740+ AS_HELP_STRING([--with-nuodb=@<:@ARG@:>@],
741+ [use NuoDB libNuoRemote library @<:@default=yes@:>@, optionally specify path to NuoDB Home Directory]
742+ ),
743+ [
744+ if test "$withval" = "no"; then
745+ WANT_NUODB="no"
746+ elif test "$withval" = "yes"; then
747+ WANT_NUODB="yes"
748+ NUOHOME="/opt/nuodb"
749+ else
750+ WANT_NUODB="yes"
751+ NUOHOME="$withval"
752+ fi
753+ ],
754+ [WANT_NUODB="yes"]
755+ )
756+
757+ NUODB_CFLAGS=""
758+ NUODB_LDFLAGS=""
759+ NUODB_VERSION=""
760+
761+ dnl
762+ dnl Check NuoDB libraries (libNuoRemote)
763+ dnl
764+
765+ if test "$WANT_NUODB" = "yes"; then
766+
767+ if test -z "$NUOHOME" -o test; then
768+
769+ AC_MSG_CHECKING([for NuoDB libraries])
770+
771+ NUODB_CFLAGS="-I$NUOHOME/include"
772+ NUODB_LDFLAGS="-L$NUOHOME/lib64 -lNuoRemote"
773+ NUODB_VERSION=`$NUOHOME/bin/nuochk --version | sed -e 's#NuoDB Server build ##'`
774+ AC_DEFINE([HAVE_NUODB], [1],
775+ [Define to 1 if NuoDB libraries are available])
776+
777+ found_nuodb="yes"
778+ AC_MSG_RESULT([yes])
779+ else
780+ found_nuodb="no"
781+ AC_MSG_RESULT([no])
782+ fi
783+ fi
784+
785+ dnl
786+ dnl Check if required version of NuoDB is available
787+ dnl
788+
789+
790+ nuodb_version_req=ifelse([$1], [], [], [$1])
791+
792+ if test "$found_nuodb" = "yes" -a -n "$nuodb_version_req"; then
793+
794+ AC_MSG_CHECKING([if NuoDB version is >= $nuodb_version_req])
795+
796+ dnl Decompose required version string of NuoDB
797+ dnl and calculate its number representation
798+ nuodb_version_req_major=`expr $nuodb_version_req : '\([[0-9]]*\)'`
799+ nuodb_version_req_minor=`expr $nuodb_version_req : '[[0-9]]*\.\([[0-9]]*\)'`
800+ nuodb_version_req_micro=`expr $nuodb_version_req : '[[0-9]]*\.[[0-9]]*\.\([[0-9]]*\)'`
801+ if test "x$nuodb_version_req_micro" = "x"; then
802+ nuodb_version_req_micro="0"
803+ fi
804+
805+ nuodb_version_req_number=`expr $nuodb_version_req_major \* 1000000 \
806+ \+ $nuodb_version_req_minor \* 1000 \
807+ \+ $nuodb_version_req_micro`
808+
809+ dnl Decompose version string of installed NuoDB
810+ dnl and calculate its number representation
811+ nuodb_version_major=`expr $NUODB_VERSION : '\([[0-9]]*\)'`
812+ nuodb_version_minor=`expr $NUODB_VERSION : '[[0-9]]*\.\([[0-9]]*\)'`
813+ nuodb_version_micro=`expr $NUODB_VERSION : '[[0-9]]*\.[[0-9]]*\.\([[0-9]]*\)'`
814+ if test "x$nuodb_version_micro" = "x"; then
815+ nuodb_version_micro="0"
816+ fi
817+
818+ nuodb_version_number=`expr $nuodb_version_major \* 1000000 \
819+ \+ $nuodb_version_minor \* 1000 \
820+ \+ $nuodb_version_micro`
821+
822+ nuodb_version_check=`expr $nuodb_version_number \>\= $nuodb_version_req_number`
823+ if test "$nuodb_version_check" = "1"; then
824+ AC_MSG_RESULT([yes])
825+ else
826+ AC_MSG_RESULT([no])
827+ fi
828+ fi
829+
830+ AC_SUBST([NUODB_VERSION])
831+ AC_SUBST([NUODB_CFLAGS])
832+ AC_SUBST([NUODB_LDFLAGS])
833+])

Subscribers

People subscribed via source and target branches

to all changes: