Merge lp:~sven-nierlein/gearmand/gearmand into lp:gearmand

Proposed by Sven Nierlein on 2012-11-11
Status: Merged
Merged at revision: 669
Proposed branch: lp:~sven-nierlein/gearmand/gearmand
Merge into: lp:gearmand
Diff against target: 610 lines (+363/-18)
14 files modified
libgearman-server/constants.h (+2/-0)
libgearman-server/gearmand.cc (+12/-8)
libgearman-server/plugins.cc (+2/-0)
libgearman-server/plugins/base.h (+2/-0)
libgearman-server/plugins/queue.h (+2/-0)
libgearman-server/plugins/queue/include.am (+1/-0)
libgearman-server/plugins/queue/retention/include.am (+15/-0)
libgearman-server/plugins/queue/retention/queue.cc (+230/-0)
libgearman-server/plugins/queue/retention/queue.h (+22/-0)
libgearman-server/queue.cc (+23/-0)
libgearman-server/queue.hpp (+8/-0)
libgearman-server/server.cc (+29/-9)
libgearman-server/server.h (+12/-0)
libgearman-server/struct/server.h (+3/-1)
To merge this branch: bzr merge lp:~sven-nierlein/gearmand/gearmand
Reviewer Review Type Date Requested Status
Tangent Trunk 2012-11-11 Pending
Review via email: mp+133809@code.launchpad.net

Description of the change

Added retention queue plugin

The retention queue plugin saves the queue to disk only on shutdown. It
therefore does not slow down gearmand at runtime and still preserves the
queue across restarts.

Brian Aker (brianaker) wrote :

Sorry, just looking at this now.

Sven Nierlein (sven-nierlein) wrote :

So what do you think? I noticed the status changed to "merged", but I cannot find these changes in gearmand yet.

Brian Aker (brianaker) wrote :

Hi,

To test this, I extended the sqlite plugin to record on shutdown to see how it works out. In testing, so far so good.

Look in lp:gearmand

Cheers,
 -Brian

On Nov 25, 2012, at 1:46 AM, Sven Nierlein <email address hidden> wrote:

> so what do you think? I noticed the status changed to "merged" but i cannot find these changes in gearmand yet.

Sven Nierlein (sven-nierlein) wrote :

I can't get this working.
When I start gearmand with:
-q libsqlite3 --libsqlite3-db=/tmp/gearmand.sqlite --libsqlite3-store-on-shutdown
it stores all jobs in sqlite3, not only on shutdown. This is exactly what I wanted
to avoid.

When I start gearmand with only:
--libsqlite3-db=/tmp/gearmand.sqlite --libsqlite3-store-on-shutdown
nothing is stored, not even on shutdown.

Besides that, this adds a dependency on sqlite3, which my merge proposal did not need.

Brian Aker (brianaker) wrote :

Hi,

On Nov 25, 2012, at 3:28 AM, Sven Nierlein <email address hidden> wrote:

> I can't get this working:
> when i start gearmand with either this:
> -q libsqlite3 --libsqlite3-db=/tmp/gearmand.sqlite --libsqlite3-store-on-shutdown
> then it stores all jobs in sqlite3, not only on shutdown. This is exactly what i wanted
> to avoid.

Let me see what is happening with it then. The test case should be covering this problem.

> Besides that, this adds a dependency to sqlite3 which wasn't necessary with my merge proposal.

The creation of a new queue plugin is separate from adding the ability for a queue to store on shutdown.

Which brings up the question of what do you want to achieve with a new queue type?

Cheers,
 -Brian

Sven Nierlein (sven-nierlein) wrote :

The intention of the new queue type is to create a queue plugin that adds no overhead
at runtime and only stores the queue to disk on shutdown.
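
To make the two ideas in this exchange concrete (a separate queue type versus an optional store-on-shutdown hook), here is a minimal sketch. It is not gearmand's API: `QueueHooks`, `run_shutdown`, and `make_retention_hooks` are hypothetical names, and the real callbacks in the diff (`gearman_queue_add_fn` and friends) take more arguments. The point is only that the runtime hooks can be no-ops while an optional shutdown hook does all the persistence work.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

enum class Status { Success, Error };

// Hypothetical hook table, loosely modelled on the callback set in the diff.
// An empty std::function plays the role of a NULL function pointer.
struct QueueHooks {
  std::function<Status(const std::string& unique)> add;    // called per job at runtime
  std::function<Status(const std::string& unique)> done;   // called per job at runtime
  std::function<Status(const std::vector<std::string>& pending)> shutdown;  // optional
};

// Call the shutdown hook if the plugin provides one; a plugin without the
// hook is treated as having nothing to do.
Status run_shutdown(const QueueHooks& hooks,
                    const std::vector<std::string>& pending) {
  if (!hooks.shutdown) {
    return Status::Success;
  }
  return hooks.shutdown(pending);
}

// A "retention"-style plugin: add and done do nothing, so there is no
// per-job persistence cost. Only shutdown copies the pending jobs into
// `sink`, which stands in for the on-disk file.
QueueHooks make_retention_hooks(std::vector<std::string>& sink) {
  QueueHooks hooks;
  hooks.add = [](const std::string&) { return Status::Success; };
  hooks.done = [](const std::string&) { return Status::Success; };
  hooks.shutdown = [&sink](const std::vector<std::string>& pending) {
    sink = pending;
    return Status::Success;
  };
  return hooks;
}
```

With this shape, an existing plugin such as the sqlite one could presumably gain the same behaviour by supplying a shutdown hook, without a new queue type. Whether that is preferable to a dependency-free plugin is the design question raised above.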

I am "sni" in the freenode #gearman channel if you want to discuss details.

Brian Aker (brianaker) wrote :

Hi!

https://bugs.launchpad.net/gearmand/+bug/1087654

I've created a bug to track this, and I have linked a tree that provides a test case.

The test case creates eight jobs and then checks, before shutting down the server, that sqlite is empty. The test then shuts down the server and checks that the jobs are there. It then starts up the server and verifies that the jobs are processed.
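
For readers following along, the sequence described above can be sketched against in-memory stand-ins. This is not the test from the linked tree: `Store`, `Server`, and `store_on_shutdown_scenario` are hypothetical names, and a plain vector takes the place of the sqlite table.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for the persistent store (the sqlite table in the
// real test); it just holds job identifiers.
struct Store {
  std::vector<std::string> rows;
};

// Hypothetical stand-in for a server with store-on-shutdown semantics.
class Server {
public:
  explicit Server(Store& store) : store_(store) {}

  // Startup: replay whatever the previous run left in the store.
  void start() {
    pending_.insert(pending_.end(), store_.rows.begin(), store_.rows.end());
    store_.rows.clear();
  }

  // Runtime path: the job is kept in memory only; the store is not touched.
  void add_job(const std::string& unique) { pending_.push_back(unique); }

  // Shutdown: write every job that is still pending to the store.
  void shutdown() {
    store_.rows.insert(store_.rows.end(), pending_.begin(), pending_.end());
    pending_.clear();
  }

  // Pretend a worker ran every pending job; returns how many were processed.
  std::size_t process_all() {
    const std::size_t processed = pending_.size();
    pending_.clear();
    return processed;
  }

private:
  Store& store_;
  std::vector<std::string> pending_;
};

// Runs the scenario and returns true only if every check holds.
bool store_on_shutdown_scenario() {
  Store store;

  Server first(store);
  first.start();
  for (int i = 0; i < 8; ++i) {
    first.add_job("job-" + std::to_string(i));
  }
  if (!store.rows.empty()) return false;    // nothing persisted while running

  first.shutdown();
  if (store.rows.size() != 8) return false; // all eight persisted on shutdown

  Server second(store);
  second.start();                            // replay on restart
  return second.process_all() == 8 && store.rows.empty();
}
```

The first check is the one that matters for the report earlier in this thread: with `-q libsqlite3` and the store-on-shutdown option, jobs were observed in sqlite while the server was still running.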

Cheers,
 -Brian

On Nov 25, 2012, at 11:40 PM, Brian Aker <email address hidden> wrote:

> Hi,
>
> On Nov 25, 2012, at 3:28 AM, Sven Nierlein <email address hidden> wrote:
>
>> I can't get this working:
>> when i start gearmand with either this:
>> -q libsqlite3 --libsqlite3-db=/tmp/gearmand.sqlite --libsqlite3-store-on-shutdown
>> then it stores all jobs in sqlite3, not only on shutdown. This is exactly what i wanted
>> to avoid.
>
> Let me see what is happening with it then. The test case should be covering this problem.
>
>> Besides that, this adds a dependency to sqlite3 which wasn't necessary with my merge proposal.
>
> The creation of a new queue plugin is separate from adding the ability for a queue to store on shutdown.
>
> Which brings up the question of what do you want to achieve with a new queue type?
>
> Cheers,
> -Brian

Preview Diff

1=== modified file 'libgearman-server/constants.h'
2--- libgearman-server/constants.h 2012-08-18 06:26:36 +0000
3+++ libgearman-server/constants.h 2012-11-11 01:10:27 +0000
4@@ -155,6 +155,8 @@
5 void *context,
6 gearman_queue_add_fn *add_fn,
7 void *add_context);
8+typedef gearmand_error_t (gearman_queue_shutdown_fn)(gearman_server_st *server,
9+ void *context);
10
11 typedef gearmand_error_t (gearmand_connection_add_fn)(gearman_server_con_st *con);
12
13
14=== modified file 'libgearman-server/gearmand.cc'
15--- libgearman-server/gearmand.cc 2012-10-29 07:06:01 +0000
16+++ libgearman-server/gearmand.cc 2012-11-11 01:10:27 +0000
17@@ -1,5 +1,5 @@
18 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
19- *
20+ *
21 * Gearmand client and server library.
22 *
23 * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
24@@ -100,6 +100,10 @@
25 /* All threads should be cleaned up before calling this. */
26 assert(server.thread_list == NULL);
27
28+ gearmand_debug("shutdown queue: begin");
29+ gearman_server_queue_shutdown(server);
30+ gearmand_debug("shutdown queue: end");
31+
32 for (uint32_t key= 0; key < GEARMAND_JOB_HASH_SIZE; key++)
33 {
34 while (server.job_hash[key] != NULL)
35@@ -335,7 +339,7 @@
36 /* Initialize server components. */
37 if (gearmand->base == NULL)
38 {
39- gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Starting up, verbose set to %s",
40+ gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Starting up, verbose set to %s",
41 gearmand_verbose_name(gearmand->verbose));
42
43 if (gearmand->threads > 0)
44@@ -425,7 +429,7 @@
45 }
46 else
47 {
48- gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
49+ gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
50 "gearmand_wakeup() incorrectly wrote %lu bytes of data.", (unsigned long)written);
51 }
52 }
53@@ -587,7 +591,7 @@
54 }
55
56 // We are in single user threads, so strerror() is fine.
57- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Retrying bind(%s) on %s:%s %u >= %u",
58+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Retrying bind(%s) on %s:%s %u >= %u",
59 strerror(errno), host, port->port,
60 waited, bind_timeout);
61 this_wait= retry * retry / 3 + 1;
62@@ -722,7 +726,7 @@
63 {
64 for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
65 {
66- gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
67+ gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
68 "Clearing event for listening socket (%d)",
69 gearmand->port_list[x].listen_fd[y]);
70
71@@ -768,7 +772,7 @@
72 return;
73 }
74
75- /*
76+ /*
77 Since this is numeric, it should never fail. Even if it did we don't want to really error from it.
78 */
79 char host[NI_MAXHOST];
80@@ -922,7 +926,7 @@
81 gearmand_debug("Received SHUTDOWN_GRACEFUL wakeup event");
82 _listen_close(gearmand);
83
84- for (gearmand_thread_st* thread= gearmand->thread_list;
85+ for (gearmand_thread_st* thread= gearmand->thread_list;
86 thread != NULL;
87 thread= thread->next)
88 {
89@@ -1081,7 +1085,7 @@
90 return success;
91 }
92
93-static bool gearman_server_create(gearman_server_st& server,
94+static bool gearman_server_create(gearman_server_st& server,
95 uint8_t job_retries_arg,
96 uint8_t worker_wakeup_arg,
97 bool round_robin_arg)
98
99=== modified file 'libgearman-server/plugins.cc'
100--- libgearman-server/plugins.cc 2012-10-28 20:04:35 +0000
101+++ libgearman-server/plugins.cc 2012-11-11 01:10:27 +0000
102@@ -50,6 +50,8 @@
103 {
104 queue::initialize_default();
105
106+ queue::initialize_retention();
107+
108 if (HAVE_LIBDRIZZLE)
109 {
110 queue::initialize_drizzle();
111
112=== modified file 'libgearman-server/plugins/base.h'
113--- libgearman-server/plugins/base.h 2012-10-08 03:37:53 +0000
114+++ libgearman-server/plugins/base.h 2012-11-11 01:10:27 +0000
115@@ -96,6 +96,8 @@
116
117 virtual gearmand_error_t replay(gearman_server_st *server)= 0;
118
119+ virtual gearmand_error_t shutdown(gearman_server_st *server)= 0;
120+
121 static gearmand_error_t replay_add(gearman_server_st *server,
122 void *context __attribute__ ((unused)),
123 const char *unique, size_t unique_size,
124
125=== modified file 'libgearman-server/plugins/queue.h'
126--- libgearman-server/plugins/queue.h 2012-06-12 18:02:10 +0000
127+++ libgearman-server/plugins/queue.h 2012-11-11 01:10:27 +0000
128@@ -52,3 +52,5 @@
129 #include <libgearman-server/plugins/queue/redis/queue.h>
130
131 #include <libgearman-server/plugins/queue/mysql/queue.h>
132+
133+#include <libgearman-server/plugins/queue/retention/queue.h>
134
135=== modified file 'libgearman-server/plugins/queue/include.am'
136--- libgearman-server/plugins/queue/include.am 2012-08-11 01:50:29 +0000
137+++ libgearman-server/plugins/queue/include.am 2012-11-11 01:10:27 +0000
138@@ -21,3 +21,4 @@
139 include libgearman-server/plugins/queue/sqlite/include.am
140 include libgearman-server/plugins/queue/tokyocabinet/include.am
141 include libgearman-server/plugins/queue/mysql/include.am
142+include libgearman-server/plugins/queue/retention/include.am
143
144=== added directory 'libgearman-server/plugins/queue/retention'
145=== added file 'libgearman-server/plugins/queue/retention/include.am'
146--- libgearman-server/plugins/queue/retention/include.am 1970-01-01 00:00:00 +0000
147+++ libgearman-server/plugins/queue/retention/include.am 2012-11-11 01:10:27 +0000
148@@ -0,0 +1,15 @@
149+# vim:ft=automake
150+# Gearman
151+# Copyright (C) 2012 Data Differential, http://datadifferential.com/
152+# Copyright (C) 2012 Sven Nierlein, sven@consol.de
153+# All rights reserved.
154+#
155+# Use and distribution licensed under the BSD license. See
156+# the COPYING file in the parent directory for full text.
157+#
158+# All paths should be given relative to the root
159+#
160+
161+noinst_HEADERS+= libgearman-server/plugins/queue/retention/queue.h
162+
163+libgearman_server_libgearman_server_la_SOURCES+= libgearman-server/plugins/queue/retention/queue.cc
164
165=== added file 'libgearman-server/plugins/queue/retention/queue.cc'
166--- libgearman-server/plugins/queue/retention/queue.cc 1970-01-01 00:00:00 +0000
167+++ libgearman-server/plugins/queue/retention/queue.cc 2012-11-11 01:10:27 +0000
168@@ -0,0 +1,230 @@
169+/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
170+ *
171+ * Gearmand client and server library.
172+ *
173+ * Copyright (C) 2012 Data Differential, http://datadifferential.com/
174+ * Copyright (C) 2012 Sven Nierlein, sven@consol.de
175+ * All rights reserved.
176+ *
177+ */
178+
179+
180+/**
181+ * @file
182+ * @brief Retention Queue Storage Definitions
183+ */
184+
185+#include <gear_config.h>
186+#include <libgearman-server/common.h>
187+
188+#include <libgearman-server/plugins/queue/retention/queue.h>
189+#include <libgearman-server/plugins/queue/base.h>
190+
191+#include <stdio.h>
192+
193+namespace gearmand { namespace plugins { namespace queue { class Retention; }}}
194+
195+#pragma GCC diagnostic ignored "-Wold-style-cast"
196+
197+static gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::Retention *queue);
198+
199+namespace gearmand {
200+namespace plugins {
201+namespace queue {
202+
203+class Retention : public gearmand::plugins::Queue {
204+public:
205+ Retention();
206+ ~Retention();
207+ gearmand_error_t initialize();
208+ std::string retention_file;
209+
210+private:
211+};
212+
213+Retention::Retention() :
214+ Queue("retention")
215+{
216+ command_line_options().add_options()
217+ ("retention-file", boost::program_options::value(&retention_file)->default_value("/tmp/retention.dat"), "retention file to save jobs on shutdown");
218+}
219+
220+Retention::~Retention()
221+{
222+}
223+
224+gearmand_error_t Retention::initialize()
225+{
226+ return _initialize(Gearmand()->server, this);
227+}
228+
229+
230+void initialize_retention()
231+{
232+ static Retention local_instance;
233+}
234+
235+} // namespace queue
236+} // namespace plugins
237+} // namespace gearmand
238+
239+
240+
241+/**
242+ * @addtogroup gearman_queue_retention_static Static retention Queue Storage Definitions
243+ * @ingroup gearman_queue_retention
244+ * @{
245+ */
246+
247+/* Queue callback functions. */
248+static gearmand_error_t __add(gearman_server_st *server __attribute__((unused)),
249+ void *context __attribute__((unused)),
250+ const char *unique __attribute__((unused)), size_t unique_size __attribute__((unused)),
251+ const char *function_name __attribute__((unused)),
252+ size_t function_name_size __attribute__((unused)),
253+ const void *data __attribute__((unused)), size_t data_size __attribute__((unused)),
254+ gearman_job_priority_t priority __attribute__((unused)),
255+ int64_t when __attribute__((unused)))
256+{
257+ gearmand_debug(__func__);
258+ return GEARMAN_SUCCESS;
259+}
260+
261+
262+static gearmand_error_t __flush(gearman_server_st *server __attribute__((unused)),
263+ void *context __attribute__((unused)))
264+{
265+ gearmand_debug(__func__);
266+ return GEARMAN_SUCCESS;
267+}
268+
269+static gearmand_error_t __done(gearman_server_st *server __attribute__((unused)),
270+ void *context __attribute__((unused)),
271+ const char *unique __attribute__((unused)),
272+ size_t unique_size __attribute__((unused)),
273+ const char *function_name __attribute__((unused)),
274+ size_t function_name_size __attribute__((unused)))
275+{
276+ gearmand_debug(__func__);
277+ return GEARMAN_SUCCESS;
278+}
279+
280+
281+static gearmand_error_t __replay(gearman_server_st *server,
282+ void *context,
283+ gearman_queue_add_fn *add_fn,
284+ void *add_context)
285+{
286+ int read;
287+ int when;
288+ int size;
289+ char *data_line;
290+ char *data;
291+ char *function;
292+ char *unique;
293+ char *prio_s;
294+ FILE * fp;
295+ gearman_job_priority_t priority= (gearman_job_priority_t)0;
296+ gearmand_error_t ret = GEARMAN_SUCCESS;
297+ gearmand::plugins::queue::Retention *queue= (gearmand::plugins::queue::Retention *)context;
298+
299+ gearmand_debug(__func__);
300+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "reading retention file: %s", queue->retention_file.c_str());
301+
302+ fp = fopen(queue->retention_file.c_str(), "r");
303+ if(!fp)
304+ return GEARMAN_SUCCESS;
305+
306+ data_line = (char*)malloc(1024);
307+ if(data_line == NULL){
308+ gearmand_perror("malloc failed");
309+ return GEARMAN_MEMORY_ALLOCATION_FAILURE;
310+ }
311+ while(!feof(fp)) {
312+ if(fgets(data_line, 1024, fp) == NULL)
313+ break; // empty files
314+
315+ function = strsep( &data_line, ";" );
316+ unique = strsep( &data_line, ";" );
317+ prio_s = strsep( &data_line, ";" );
318+ priority = (gearman_job_priority_t) atoi(prio_s);
319+ when = atoi(strsep( &data_line, ";" ));
320+ size = atoi(strsep( &data_line, "\n" ));
321+ data = (char*)malloc(size+1);
322+ if (data == NULL){
323+ gearmand_perror("malloc failed");
324+ return GEARMAN_MEMORY_ALLOCATION_FAILURE;
325+ }
326+ read = fread(data, size, 1, fp);
327+ ret= (*add_fn)(server, add_context,
328+ unique, (size_t) strlen(unique),
329+ function, (size_t) strlen(function),
330+ data, size,
331+ priority,
332+ when);
333+ if (ret != GEARMAN_SUCCESS && ret != GEARMAN_JOB_EXISTS)
334+ return ret;
335+
336+ // complete rest of line till newline
337+ if(fgets(data_line, 1024, fp) == NULL)
338+ break; // no more jobs
339+ }
340+
341+ fclose(fp);
342+
343+ // cleanup retention file
344+ if(unlink(queue->retention_file.c_str()) == -1)
345+ perror(queue->retention_file.c_str());
346+
347+ return GEARMAN_SUCCESS;
348+}
349+
350+static gearmand_error_t __shutdown(gearman_server_st *server,
351+ void *context)
352+{
353+ FILE * fp;
354+ gearmand::plugins::queue::Retention *queue= (gearmand::plugins::queue::Retention *)context;
355+
356+ gearmand_debug(__func__);
357+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "writing retention file: %s for %i jobs", queue->retention_file.c_str(), server->job_count);
358+
359+ fp = fopen(queue->retention_file.c_str(), "w+");
360+ if(fp == NULL) {
361+ gearmand_perror(queue->retention_file.c_str());
362+ return GEARMAN_ERRNO;
363+ }
364+
365+ for (uint32_t key= 0; key < GEARMAND_JOB_HASH_SIZE; key++) {
366+ if(server->job_hash[key] != NULL) {
367+ gearman_server_job_st *server_job = server->job_hash[key];
368+ fprintf(fp, "%s;%s;%i;%" PRId64 ";%i\n%.*s\n",
369+ server_job->function->function_name,
370+ server_job->unique,
371+ (int)server_job->priority,
372+ server_job->when,
373+ server_job->data_size,
374+ server_job->data_size, (const char*)server_job->data);
375+ while (server_job->unique_next != NULL) {
376+ server_job = server_job->unique_next;
377+ fprintf(fp, "%s;%s;%i;%" PRId64 ";%i\n%.*s\n",
378+ server_job->function->function_name,
379+ server_job->unique,
380+ (int)server_job->priority,
381+ server_job->when,
382+ server_job->data_size,
383+ server_job->data_size, (const char*)server_job->data);
384+ }
385+ }
386+ }
387+
388+ fclose(fp);
389+
390+ return GEARMAN_SUCCESS;
391+}
392+
393+gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::Retention *queue)
394+{
395+ gearman_server_set_queue(server, queue, __add, __flush, __done, __replay, __shutdown);
396+
397+ return GEARMAN_SUCCESS;
398+}
399
400=== added file 'libgearman-server/plugins/queue/retention/queue.h'
401--- libgearman-server/plugins/queue/retention/queue.h 1970-01-01 00:00:00 +0000
402+++ libgearman-server/plugins/queue/retention/queue.h 2012-11-11 01:10:27 +0000
403@@ -0,0 +1,22 @@
404+/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
405+ *
406+ * Gearmand client and server library.
407+ *
408+ * Copyright (C) 2012 Data Differential, http://datadifferential.com/
409+ * Copyright (C) 2012 Sven Nierlein, sven@consol.de
410+ * All rights reserved.
411+ *
412+ */
413+
414+#pragma once
415+
416+
417+namespace gearmand {
418+namespace plugins {
419+namespace queue {
420+
421+void initialize_retention();
422+
423+} // namespace queue
424+} // namespace plugin
425+} // namespace gearmand
426
427=== modified file 'libgearman-server/queue.cc'
428--- libgearman-server/queue.cc 2012-10-28 20:04:35 +0000
429+++ libgearman-server/queue.cc 2012-11-11 01:10:27 +0000
430@@ -137,6 +137,29 @@
431 }
432
433 void gearman_server_set_queue(gearman_server_st& server,
434+ void *context,
435+ gearman_queue_add_fn *add,
436+ gearman_queue_flush_fn *flush,
437+ gearman_queue_done_fn *done,
438+ gearman_queue_replay_fn *replay,
439+ gearman_queue_shutdown_fn *shutdown)
440+{
441+ server.queue_version= QUEUE_VERSION_FUNCTION;
442+ server.queue.functions= new queue_st();
443+ if (server.queue.functions)
444+ {
445+ server.queue.functions->_context= context;
446+ server.queue.functions->_add_fn= add;
447+ server.queue.functions->_flush_fn= flush;
448+ server.queue.functions->_done_fn= done;
449+ server.queue.functions->_replay_fn= replay;
450+ server.queue.functions->_shutdown_fn= shutdown;
451+ }
452+ assert(server.queue.functions);
453+}
454+
455+
456+void gearman_server_set_queue(gearman_server_st& server,
457 gearmand::queue::Context* context)
458 {
459 assert(context);
460
461=== modified file 'libgearman-server/queue.hpp'
462--- libgearman-server/queue.hpp 2012-10-08 03:37:53 +0000
463+++ libgearman-server/queue.hpp 2012-11-11 01:10:27 +0000
464@@ -70,4 +70,12 @@
465 gearman_queue_replay_fn *replay);
466
467 void gearman_server_set_queue(gearman_server_st& server,
468+ void *context,
469+ gearman_queue_add_fn *add,
470+ gearman_queue_flush_fn *flush,
471+ gearman_queue_done_fn *done,
472+ gearman_queue_replay_fn *replay,
473+ gearman_queue_shutdown_fn *shutdown);
474+
475+void gearman_server_set_queue(gearman_server_st& server,
476 gearmand::queue::Context* context);
477
478=== modified file 'libgearman-server/server.cc'
479--- libgearman-server/server.cc 2012-10-28 20:04:35 +0000
480+++ libgearman-server/server.cc 2012-11-11 01:10:27 +0000
481@@ -1,5 +1,5 @@
482 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
483- *
484+ *
485 * Gearmand client and server library.
486 *
487 * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
488@@ -98,7 +98,7 @@
489 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
490 "%15s:%5s packet command %s",
491 server_con->con.context == NULL || server_con->con.context->host == NULL ? "-" : server_con->con.context->host,
492- server_con->con.context == NULL || server_con->con.context->port == NULL ? "-" : server_con->con.context->port,
493+ server_con->con.context == NULL || server_con->con.context->port == NULL ? "-" : server_con->con.context->port,
494 gearmand_strcommand(packet));
495
496 switch (packet->command)
497@@ -242,7 +242,7 @@
498 {
499 return gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "strtoul(%ul)", when);
500 }
501- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
502+ gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
503 "Received EPOCH job submission, function:%.*s unique:%.*s with data for %jd at %jd, args %d",
504 packet->arg_size[0], packet->arg[0],
505 packet->arg_size[1], packet->arg[1],
506@@ -576,8 +576,8 @@
507 }
508 else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_UNIQ)
509 {
510- /*
511- We found a runnable job, queue job assigned packet and take the job off the queue.
512+ /*
513+ We found a runnable job, queue job assigned packet and take the job off the queue.
514 */
515 ret= gearman_server_io_packet_add(server_con, false,
516 GEARMAN_MAGIC_RESPONSE,
517@@ -596,8 +596,8 @@
518 strlen(server_job->reducer), server_job->reducer, strlen(server_job->reducer),
519 server_job->unique_length, server_job->unique, server_job->unique_length,
520 (unsigned long)server_job->data_size);
521- /*
522- We found a runnable job, queue job assigned packet and take the job off the queue.
523+ /*
524+ We found a runnable job, queue job assigned packet and take the job off the queue.
525 */
526 ret= gearman_server_io_packet_add(server_con, false,
527 GEARMAN_MAGIC_RESPONSE,
528@@ -611,8 +611,8 @@
529 }
530 else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_ALL)
531 {
532- /*
533- We found a runnable job, queue job assigned packet and take the job off the queue.
534+ /*
535+ We found a runnable job, queue job assigned packet and take the job off the queue.
536 */
537 ret= gearman_server_io_packet_add(server_con, false,
538 GEARMAN_MAGIC_RESPONSE,
539@@ -922,6 +922,26 @@
540 return ret;
541 }
542
543+static gearmand_error_t gearman_queue_shutdown(gearman_server_st& server)
544+{
545+ if(server.queue.functions->_shutdown_fn == NULL)
546+ return GEARMAN_SUCCESS;
547+ if (server.queue_version == QUEUE_VERSION_FUNCTION)
548+ {
549+ return (*(server.queue.functions->_shutdown_fn))(&server,
550+ (void *)server.queue.functions->_context);
551+ }
552+
553+ assert(server.queue.object);
554+ return server.queue.object->shutdown(&server);
555+}
556+
557+gearmand_error_t gearman_server_queue_shutdown(gearman_server_st& server)
558+{
559+ gearmand_error_t ret= gearman_queue_shutdown(server);
560+ return ret;
561+}
562+
563 void *gearman_server_queue_context(const gearman_server_st *server)
564 {
565 if (server->queue_version == QUEUE_VERSION_FUNCTION)
566
567=== modified file 'libgearman-server/server.h'
568--- libgearman-server/server.h 2012-09-28 09:55:33 +0000
569+++ libgearman-server/server.h 2012-11-11 01:10:27 +0000
570@@ -90,6 +90,18 @@
571 #endif
572
573 /**
574+ * shutdown the persistent queue to save all remaining jobs.
575+ * should only be run at shutdown.
576+ * @param server Server structure previously initialized with
577+ * gearman_server_create.
578+ * @return Standard gearman return value.
579+ */
580+#ifdef __cplusplus
581+GEARMAN_API
582+gearmand_error_t gearman_server_queue_shutdown(gearman_server_st& server);
583+#endif
584+
585+/**
586 * Get persistent queue context.
587 */
588 GEARMAN_API
589
590=== modified file 'libgearman-server/struct/server.h'
591--- libgearman-server/struct/server.h 2012-08-11 01:53:36 +0000
592+++ libgearman-server/struct/server.h 2012-11-11 01:10:27 +0000
593@@ -43,6 +43,7 @@
594 gearman_queue_flush_fn *_flush_fn;
595 gearman_queue_done_fn *_done_fn;
596 gearman_queue_replay_fn *_replay_fn;
597+ gearman_queue_shutdown_fn *_shutdown_fn;
598
599 #ifdef __cplusplus
600 queue_st() :
601@@ -50,7 +51,8 @@
602 _add_fn(NULL),
603 _flush_fn(NULL),
604 _done_fn(NULL),
605- _replay_fn(NULL)
606+ _replay_fn(NULL),
607+ _shutdown_fn(NULL)
608 {
609 }
610 #endif
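
A note on the retention file format used in retention/queue.cc above: each job is written as a header line `function;unique;priority;when;size`, followed by the payload and a newline. `__shutdown` writes the payload with `%.*s`, which stops at the first NUL byte, so a payload containing a NUL would presumably be written shorter than the size recorded in the header. The sketch below shows one way to round-trip the same layout with raw byte I/O. It is an illustration only: `JobRecord`, `write_record`, and `read_record` are hypothetical names, not part of gearmand, and it assumes function names and unique ids contain neither `;` nor newlines.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <istream>
#include <ostream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical job record; field names are illustrative and do not mirror
// gearman_server_job_st.
struct JobRecord {
  std::string function;
  std::string unique;
  int priority;
  std::int64_t when;
  std::string data;  // arbitrary bytes, may include NUL and newlines
};

// Write "function;unique;priority;when;size\n", then exactly `size` raw
// payload bytes, then a closing newline.
void write_record(std::ostream& out, const JobRecord& job) {
  out << job.function << ';' << job.unique << ';' << job.priority << ';'
      << job.when << ';' << job.data.size() << '\n';
  out.write(job.data.data(), static_cast<std::streamsize>(job.data.size()));
  out << '\n';
}

// Read one record. Returns false at end of input or on a malformed record.
bool read_record(std::istream& in, JobRecord& job) {
  std::string header;
  if (!std::getline(in, header) || header.empty()) return false;

  // Split the header on ';' and require exactly five fields.
  std::vector<std::string> fields;
  std::size_t start = 0;
  for (;;) {
    const std::size_t pos = header.find(';', start);
    if (pos == std::string::npos) {
      fields.push_back(header.substr(start));
      break;
    }
    fields.push_back(header.substr(start, pos - start));
    start = pos + 1;
  }
  if (fields.size() != 5) return false;

  std::size_t size = 0;
  try {
    job.priority = std::stoi(fields[2]);
    job.when = std::stoll(fields[3]);
    size = static_cast<std::size_t>(std::stoull(fields[4]));
  } catch (const std::exception&) {
    return false;  // a numeric field did not parse
  }
  job.function = fields[0];
  job.unique = fields[1];

  // The payload is read by length, never by scanning for a delimiter.
  job.data.assign(size, '\0');
  if (size > 0 && !in.read(&job.data[0], static_cast<std::streamsize>(size))) {
    return false;  // file ended before the full payload
  }
  return in.get() == '\n';  // consume the closing newline
}
```

A real file would need to be opened in binary mode for this to hold on every platform, and a production reader would also want an upper bound on `size` before allocating.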
