Merge lp:~sven-nierlein/gearmand/gearmand into lp:gearmand

Proposed by Sven Nierlein
Status: Merged
Merged at revision: 669
Proposed branch: lp:~sven-nierlein/gearmand/gearmand
Merge into: lp:gearmand
Diff against target: 610 lines (+363/-18)
14 files modified
libgearman-server/constants.h (+2/-0)
libgearman-server/gearmand.cc (+12/-8)
libgearman-server/plugins.cc (+2/-0)
libgearman-server/plugins/base.h (+2/-0)
libgearman-server/plugins/queue.h (+2/-0)
libgearman-server/plugins/queue/include.am (+1/-0)
libgearman-server/plugins/queue/retention/include.am (+15/-0)
libgearman-server/plugins/queue/retention/queue.cc (+230/-0)
libgearman-server/plugins/queue/retention/queue.h (+22/-0)
libgearman-server/queue.cc (+23/-0)
libgearman-server/queue.hpp (+8/-0)
libgearman-server/server.cc (+29/-9)
libgearman-server/server.h (+12/-0)
libgearman-server/struct/server.h (+3/-1)
To merge this branch: bzr merge lp:~sven-nierlein/gearmand/gearmand
Reviewer Review Type Date Requested Status
Tangent Trunk Pending
Review via email: mp+133809@code.launchpad.net

Description of the change

added retention queue plugin

The retention queue plugin will save the queue to disk on shutdown. Therefor
it does not slow down the gearmand during runtime and still preserves the
queue during restarts.

To post a comment you must log in.
Revision history for this message
Brian Aker (brianaker) wrote :

Sorry, just looking at this now.

Revision history for this message
Sven Nierlein (sven-nierlein) wrote :

so what do you think? I noticed the status changed to "merged" but i cannot find these changes in gearmand yet.

Revision history for this message
Brian Aker (brianaker) wrote :

Hi,

What I did to test this was to extend the sqlite plugin to record on shutdown to see how this works out. In testing,... so far so good.

Look in lp:gearmand

Cheers,
 -Bian

On Nov 25, 2012, at 1:46 AM, Sven Nierlein <email address hidden> wrote:

> so what do you think? I noticed the status changed to "merged" but i cannot find these changes in gearmand yet.
> --
> https://code.launchpad.net/~sven-nierlein/gearmand/gearmand/+merge/133809
> Your team Tangent is subscribed to branch lp:gearmand.

Revision history for this message
Sven Nierlein (sven-nierlein) wrote :

I can't get this working:
when i start gearmand with either this:
-q libsqlite3 --libsqlite3-db=/tmp/gearmand.sqlite --libsqlite3-store-on-shutdown
then it stores all jobs in sqlite3, not only on shutdown. This is exactly what i wanted
to avoid.

When i start gearmand with:
--libsqlite3-db=/tmp/gearmand.sqlite --libsqlite3-store-on-shutdown
only, then nothing is stored, not even on shutdown.

Besides that, this adds a dependency to sqlite3 which wasn't necessary with my merge proposal.

Revision history for this message
Brian Aker (brianaker) wrote :

Hi,

On Nov 25, 2012, at 3:28 AM, Sven Nierlein <email address hidden> wrote:

> I can't get this working:
> when i start gearmand with either this:
> -q libsqlite3 --libsqlite3-db=/tmp/gearmand.sqlite --libsqlite3-store-on-shutdown
> then it stores all jobs in sqlite3, not only on shutdown. This is exactly what i wanted
> to avoid.

Let me see what is happening with it then. The test case should be covering this problem.

> Besides that, this adds a dependency to sqlite3 which wasn't necessary with my merge proposal.

The creation of a new queue plugin in separate from adding the ability for a queue to store on shutdown.

Which brings up the question of what do you want to achieve with a new queue type?

Cheers,
 -Brian

Revision history for this message
Sven Nierlein (sven-nierlein) wrote :

The intention of the new queue type is to create a queue plugin, which adds no overhead
during runtime and just stores the queue to disk on shutdown.

I am "sni" in the freenode #gearman channel if you want to discuss details.

Revision history for this message
Brian Aker (brianaker) wrote :

Hi!

https://bugs.launchpad.net/gearmand/+bug/1087654

I've created a bug to track this, and I have linked a tree that provides a test case.

The test case creates eight jobs and then checks to before shutting down the server that sqlite is empty. The test then shuts down the server and checks to see if they jobs are there. It then starts up the server and verifies that the jobs are processed.

Cheers,
 -Brian

On Nov 25, 2012, at 11:40 PM, Brian Aker <email address hidden> wrote:

> Hi,
>
> On Nov 25, 2012, at 3:28 AM, Sven Nierlein <email address hidden> wrote:
>
>> I can't get this working:
>> when i start gearmand with either this:
>> -q libsqlite3 --libsqlite3-db=/tmp/gearmand.sqlite --libsqlite3-store-on-shutdown
>> then it stores all jobs in sqlite3, not only on shutdown. This is exactly what i wanted
>> to avoid.
>
> Let me see what is happening with it then. The test case should be covering this problem.
>
>> Besides that, this adds a dependency to sqlite3 which wasn't necessary with my merge proposal.
>
> The creation of a new queue plugin in separate from adding the ability for a queue to store on shutdown.
>
> Which brings up the question of what do you want to achieve with a new queue type?
>
> Cheers,
> -Brian
>
>
> --
> https://code.launchpad.net/~sven-nierlein/gearmand/gearmand/+merge/133809
> Your team Tangent is subscribed to branch lp:gearmand.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'libgearman-server/constants.h'
--- libgearman-server/constants.h 2012-08-18 06:26:36 +0000
+++ libgearman-server/constants.h 2012-11-11 01:10:27 +0000
@@ -155,6 +155,8 @@
155 void *context,155 void *context,
156 gearman_queue_add_fn *add_fn,156 gearman_queue_add_fn *add_fn,
157 void *add_context);157 void *add_context);
158typedef gearmand_error_t (gearman_queue_shutdown_fn)(gearman_server_st *server,
159 void *context);
158160
159typedef gearmand_error_t (gearmand_connection_add_fn)(gearman_server_con_st *con);161typedef gearmand_error_t (gearmand_connection_add_fn)(gearman_server_con_st *con);
160162
161163
=== modified file 'libgearman-server/gearmand.cc'
--- libgearman-server/gearmand.cc 2012-10-29 07:06:01 +0000
+++ libgearman-server/gearmand.cc 2012-11-11 01:10:27 +0000
@@ -1,5 +1,5 @@
1/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:1/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 * 2 *
3 * Gearmand client and server library.3 * Gearmand client and server library.
4 *4 *
5 * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/5 * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
@@ -100,6 +100,10 @@
100 /* All threads should be cleaned up before calling this. */100 /* All threads should be cleaned up before calling this. */
101 assert(server.thread_list == NULL);101 assert(server.thread_list == NULL);
102102
103 gearmand_debug("shutdown queue: begin");
104 gearman_server_queue_shutdown(server);
105 gearmand_debug("shutdown queue: end");
106
103 for (uint32_t key= 0; key < GEARMAND_JOB_HASH_SIZE; key++)107 for (uint32_t key= 0; key < GEARMAND_JOB_HASH_SIZE; key++)
104 {108 {
105 while (server.job_hash[key] != NULL)109 while (server.job_hash[key] != NULL)
@@ -335,7 +339,7 @@
335 /* Initialize server components. */339 /* Initialize server components. */
336 if (gearmand->base == NULL)340 if (gearmand->base == NULL)
337 {341 {
338 gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Starting up, verbose set to %s", 342 gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Starting up, verbose set to %s",
339 gearmand_verbose_name(gearmand->verbose));343 gearmand_verbose_name(gearmand->verbose));
340344
341 if (gearmand->threads > 0)345 if (gearmand->threads > 0)
@@ -425,7 +429,7 @@
425 }429 }
426 else430 else
427 {431 {
428 gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, 432 gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
429 "gearmand_wakeup() incorrectly wrote %lu bytes of data.", (unsigned long)written);433 "gearmand_wakeup() incorrectly wrote %lu bytes of data.", (unsigned long)written);
430 }434 }
431 }435 }
@@ -587,7 +591,7 @@
587 }591 }
588592
589 // We are in single user threads, so strerror() is fine.593 // We are in single user threads, so strerror() is fine.
590 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Retrying bind(%s) on %s:%s %u >= %u", 594 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Retrying bind(%s) on %s:%s %u >= %u",
591 strerror(errno), host, port->port,595 strerror(errno), host, port->port,
592 waited, bind_timeout);596 waited, bind_timeout);
593 this_wait= retry * retry / 3 + 1;597 this_wait= retry * retry / 3 + 1;
@@ -722,7 +726,7 @@
722 {726 {
723 for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)727 for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++)
724 {728 {
725 gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, 729 gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,
726 "Clearing event for listening socket (%d)",730 "Clearing event for listening socket (%d)",
727 gearmand->port_list[x].listen_fd[y]);731 gearmand->port_list[x].listen_fd[y]);
728732
@@ -768,7 +772,7 @@
768 return;772 return;
769 }773 }
770774
771 /* 775 /*
772 Since this is numeric, it should never fail. Even if it did we don't want to really error from it.776 Since this is numeric, it should never fail. Even if it did we don't want to really error from it.
773 */777 */
774 char host[NI_MAXHOST];778 char host[NI_MAXHOST];
@@ -922,7 +926,7 @@
922 gearmand_debug("Received SHUTDOWN_GRACEFUL wakeup event");926 gearmand_debug("Received SHUTDOWN_GRACEFUL wakeup event");
923 _listen_close(gearmand);927 _listen_close(gearmand);
924928
925 for (gearmand_thread_st* thread= gearmand->thread_list; 929 for (gearmand_thread_st* thread= gearmand->thread_list;
926 thread != NULL;930 thread != NULL;
927 thread= thread->next)931 thread= thread->next)
928 {932 {
@@ -1081,7 +1085,7 @@
1081 return success;1085 return success;
1082}1086}
10831087
1084static bool gearman_server_create(gearman_server_st& server, 1088static bool gearman_server_create(gearman_server_st& server,
1085 uint8_t job_retries_arg,1089 uint8_t job_retries_arg,
1086 uint8_t worker_wakeup_arg,1090 uint8_t worker_wakeup_arg,
1087 bool round_robin_arg)1091 bool round_robin_arg)
10881092
=== modified file 'libgearman-server/plugins.cc'
--- libgearman-server/plugins.cc 2012-10-28 20:04:35 +0000
+++ libgearman-server/plugins.cc 2012-11-11 01:10:27 +0000
@@ -50,6 +50,8 @@
50{50{
51 queue::initialize_default();51 queue::initialize_default();
5252
53 queue::initialize_retention();
54
53 if (HAVE_LIBDRIZZLE)55 if (HAVE_LIBDRIZZLE)
54 {56 {
55 queue::initialize_drizzle();57 queue::initialize_drizzle();
5658
=== modified file 'libgearman-server/plugins/base.h'
--- libgearman-server/plugins/base.h 2012-10-08 03:37:53 +0000
+++ libgearman-server/plugins/base.h 2012-11-11 01:10:27 +0000
@@ -96,6 +96,8 @@
9696
97 virtual gearmand_error_t replay(gearman_server_st *server)= 0;97 virtual gearmand_error_t replay(gearman_server_st *server)= 0;
9898
99 virtual gearmand_error_t shutdown(gearman_server_st *server)= 0;
100
99 static gearmand_error_t replay_add(gearman_server_st *server,101 static gearmand_error_t replay_add(gearman_server_st *server,
100 void *context __attribute__ ((unused)),102 void *context __attribute__ ((unused)),
101 const char *unique, size_t unique_size,103 const char *unique, size_t unique_size,
102104
=== modified file 'libgearman-server/plugins/queue.h'
--- libgearman-server/plugins/queue.h 2012-06-12 18:02:10 +0000
+++ libgearman-server/plugins/queue.h 2012-11-11 01:10:27 +0000
@@ -52,3 +52,5 @@
52#include <libgearman-server/plugins/queue/redis/queue.h>52#include <libgearman-server/plugins/queue/redis/queue.h>
5353
54#include <libgearman-server/plugins/queue/mysql/queue.h>54#include <libgearman-server/plugins/queue/mysql/queue.h>
55
56#include <libgearman-server/plugins/queue/retention/queue.h>
5557
=== modified file 'libgearman-server/plugins/queue/include.am'
--- libgearman-server/plugins/queue/include.am 2012-08-11 01:50:29 +0000
+++ libgearman-server/plugins/queue/include.am 2012-11-11 01:10:27 +0000
@@ -21,3 +21,4 @@
21include libgearman-server/plugins/queue/sqlite/include.am21include libgearman-server/plugins/queue/sqlite/include.am
22include libgearman-server/plugins/queue/tokyocabinet/include.am22include libgearman-server/plugins/queue/tokyocabinet/include.am
23include libgearman-server/plugins/queue/mysql/include.am23include libgearman-server/plugins/queue/mysql/include.am
24include libgearman-server/plugins/queue/retention/include.am
2425
=== added directory 'libgearman-server/plugins/queue/retention'
=== added file 'libgearman-server/plugins/queue/retention/include.am'
--- libgearman-server/plugins/queue/retention/include.am 1970-01-01 00:00:00 +0000
+++ libgearman-server/plugins/queue/retention/include.am 2012-11-11 01:10:27 +0000
@@ -0,0 +1,15 @@
1# vim:ft=automake
2# Gearman
3# Copyright (C) 2012 Data Differential, http://datadifferential.com/
4# Copyright (C) 2012 Sven Nierlein, sven@consol.de
5# All rights reserved.
6#
7# Use and distribution licensed under the BSD license. See
8# the COPYING file in the parent directory for full text.
9#
10# All paths should be given relative to the root
11#
12
13noinst_HEADERS+= libgearman-server/plugins/queue/retention/queue.h
14
15libgearman_server_libgearman_server_la_SOURCES+= libgearman-server/plugins/queue/retention/queue.cc
016
=== added file 'libgearman-server/plugins/queue/retention/queue.cc'
--- libgearman-server/plugins/queue/retention/queue.cc 1970-01-01 00:00:00 +0000
+++ libgearman-server/plugins/queue/retention/queue.cc 2012-11-11 01:10:27 +0000
@@ -0,0 +1,230 @@
1/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Gearmand client and server library.
4 *
5 * Copyright (C) 2012 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2012 Sven Nierlein, sven@consol.de
7 * All rights reserved.
8 *
9 */
10
11
12/**
13 * @file
14 * @brief Retention Queue Storage Definitions
15 */
16
17#include <gear_config.h>
18#include <libgearman-server/common.h>
19
20#include <libgearman-server/plugins/queue/retention/queue.h>
21#include <libgearman-server/plugins/queue/base.h>
22
23#include <stdio.h>
24
25namespace gearmand { namespace plugins { namespace queue { class Retention; }}}
26
27#pragma GCC diagnostic ignored "-Wold-style-cast"
28
29static gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::Retention *queue);
30
31namespace gearmand {
32namespace plugins {
33namespace queue {
34
35class Retention : public gearmand::plugins::Queue {
36public:
37 Retention();
38 ~Retention();
39 gearmand_error_t initialize();
40 std::string retention_file;
41
42private:
43};
44
45Retention::Retention() :
46 Queue("retention")
47{
48 command_line_options().add_options()
49 ("retention-file", boost::program_options::value(&retention_file)->default_value("/tmp/retention.dat"), "retention file to save jobs on shutdown");
50}
51
52Retention::~Retention()
53{
54}
55
56gearmand_error_t Retention::initialize()
57{
58 return _initialize(Gearmand()->server, this);
59}
60
61
62void initialize_retention()
63{
64 static Retention local_instance;
65}
66
67} // namespace queue
68} // namespace plugins
69} // namespace gearmand
70
71
72
73/**
74 * @addtogroup gearman_queue_retention_static Static retention Queue Storage Definitions
75 * @ingroup gearman_queue_retention
76 * @{
77 */
78
79/* Queue callback functions. */
80static gearmand_error_t __add(gearman_server_st *server __attribute__((unused)),
81 void *context __attribute__((unused)),
82 const char *unique __attribute__((unused)), size_t unique_size __attribute__((unused)),
83 const char *function_name __attribute__((unused)),
84 size_t function_name_size __attribute__((unused)),
85 const void *data __attribute__((unused)), size_t data_size __attribute__((unused)),
86 gearman_job_priority_t priority __attribute__((unused)),
87 int64_t when __attribute__((unused)))
88{
89 gearmand_debug(__func__);
90 return GEARMAN_SUCCESS;
91}
92
93
94static gearmand_error_t __flush(gearman_server_st *server __attribute__((unused)),
95 void *context __attribute__((unused)))
96{
97 gearmand_debug(__func__);
98 return GEARMAN_SUCCESS;
99}
100
101static gearmand_error_t __done(gearman_server_st *server __attribute__((unused)),
102 void *context __attribute__((unused)),
103 const char *unique __attribute__((unused)),
104 size_t unique_size __attribute__((unused)),
105 const char *function_name __attribute__((unused)),
106 size_t function_name_size __attribute__((unused)))
107{
108 gearmand_debug(__func__);
109 return GEARMAN_SUCCESS;
110}
111
112
113static gearmand_error_t __replay(gearman_server_st *server,
114 void *context,
115 gearman_queue_add_fn *add_fn,
116 void *add_context)
117{
118 int read;
119 int when;
120 int size;
121 char *data_line;
122 char *data;
123 char *function;
124 char *unique;
125 char *prio_s;
126 FILE * fp;
127 gearman_job_priority_t priority= (gearman_job_priority_t)0;
128 gearmand_error_t ret = GEARMAN_SUCCESS;
129 gearmand::plugins::queue::Retention *queue= (gearmand::plugins::queue::Retention *)context;
130
131 gearmand_debug(__func__);
132 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "reading retention file: %s", queue->retention_file.c_str());
133
134 fp = fopen(queue->retention_file.c_str(), "r");
135 if(!fp)
136 return GEARMAN_SUCCESS;
137
138 data_line = (char*)malloc(1024);
139 if(data_line == NULL){
140 gearmand_perror("malloc failed");
141 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
142 }
143 while(!feof(fp)) {
144 if(fgets(data_line, 1024, fp) == NULL)
145 break; // empty files
146
147 function = strsep( &data_line, ";" );
148 unique = strsep( &data_line, ";" );
149 prio_s = strsep( &data_line, ";" );
150 priority = (gearman_job_priority_t) atoi(prio_s);
151 when = atoi(strsep( &data_line, ";" ));
152 size = atoi(strsep( &data_line, "\n" ));
153 data = (char*)malloc(size+1);
154 if (data == NULL){
155 gearmand_perror("malloc failed");
156 return GEARMAN_MEMORY_ALLOCATION_FAILURE;
157 }
158 read = fread(data, size, 1, fp);
159 ret= (*add_fn)(server, add_context,
160 unique, (size_t) strlen(unique),
161 function, (size_t) strlen(function),
162 data, size,
163 priority,
164 when);
165 if (ret != GEARMAN_SUCCESS && ret != GEARMAN_JOB_EXISTS)
166 return ret;
167
168 // complete rest of line till newline
169 if(fgets(data_line, 1024, fp) == NULL)
170 break; // no more jobs
171 }
172
173 fclose(fp);
174
175 // cleanup retention file
176 if(unlink(queue->retention_file.c_str()) == -1)
177 perror(queue->retention_file.c_str());
178
179 return GEARMAN_SUCCESS;
180}
181
182static gearmand_error_t __shutdown(gearman_server_st *server,
183 void *context)
184{
185 FILE * fp;
186 gearmand::plugins::queue::Retention *queue= (gearmand::plugins::queue::Retention *)context;
187
188 gearmand_debug(__func__);
189 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "writing retention file: %s for %i jobs", queue->retention_file.c_str(), server->job_count);
190
191 fp = fopen(queue->retention_file.c_str(), "w+");
192 if(fp == NULL) {
193 gearmand_perror(queue->retention_file.c_str());
194 return GEARMAN_ERRNO;
195 }
196
197 for (uint32_t key= 0; key < GEARMAND_JOB_HASH_SIZE; key++) {
198 if(server->job_hash[key] != NULL) {
199 gearman_server_job_st *server_job = server->job_hash[key];
200 fprintf(fp, "%s;%s;%i;%" PRId64 ";%i\n%.*s\n",
201 server_job->function->function_name,
202 server_job->unique,
203 (int)server_job->priority,
204 server_job->when,
205 server_job->data_size,
206 server_job->data_size, (const char*)server_job->data);
207 while (server_job->unique_next != NULL) {
208 server_job = server_job->unique_next;
209 fprintf(fp, "%s;%s;%i;%" PRId64 ";%i\n%.*s\n",
210 server_job->function->function_name,
211 server_job->unique,
212 (int)server_job->priority,
213 server_job->when,
214 server_job->data_size,
215 server_job->data_size, (const char*)server_job->data);
216 }
217 }
218 }
219
220 fclose(fp);
221
222 return GEARMAN_SUCCESS;
223}
224
225gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::Retention *queue)
226{
227 gearman_server_set_queue(server, queue, __add, __flush, __done, __replay, __shutdown);
228
229 return GEARMAN_SUCCESS;
230}
0231
=== added file 'libgearman-server/plugins/queue/retention/queue.h'
--- libgearman-server/plugins/queue/retention/queue.h 1970-01-01 00:00:00 +0000
+++ libgearman-server/plugins/queue/retention/queue.h 2012-11-11 01:10:27 +0000
@@ -0,0 +1,22 @@
1/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Gearmand client and server library.
4 *
5 * Copyright (C) 2012 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2012 Sven Nierlein, sven@consol.de
7 * All rights reserved.
8 *
9 */
10
11#pragma once
12
13
14namespace gearmand {
15namespace plugins {
16namespace queue {
17
18void initialize_retention();
19
20} // namespace queue
21} // namespace plugin
22} // namespace gearmand
023
=== modified file 'libgearman-server/queue.cc'
--- libgearman-server/queue.cc 2012-10-28 20:04:35 +0000
+++ libgearman-server/queue.cc 2012-11-11 01:10:27 +0000
@@ -137,6 +137,29 @@
137}137}
138138
139void gearman_server_set_queue(gearman_server_st& server,139void gearman_server_set_queue(gearman_server_st& server,
140 void *context,
141 gearman_queue_add_fn *add,
142 gearman_queue_flush_fn *flush,
143 gearman_queue_done_fn *done,
144 gearman_queue_replay_fn *replay,
145 gearman_queue_shutdown_fn *shutdown)
146{
147 server.queue_version= QUEUE_VERSION_FUNCTION;
148 server.queue.functions= new queue_st();
149 if (server.queue.functions)
150 {
151 server.queue.functions->_context= context;
152 server.queue.functions->_add_fn= add;
153 server.queue.functions->_flush_fn= flush;
154 server.queue.functions->_done_fn= done;
155 server.queue.functions->_replay_fn= replay;
156 server.queue.functions->_shutdown_fn= shutdown;
157 }
158 assert(server.queue.functions);
159}
160
161
162void gearman_server_set_queue(gearman_server_st& server,
140 gearmand::queue::Context* context)163 gearmand::queue::Context* context)
141{164{
142 assert(context);165 assert(context);
143166
=== modified file 'libgearman-server/queue.hpp'
--- libgearman-server/queue.hpp 2012-10-08 03:37:53 +0000
+++ libgearman-server/queue.hpp 2012-11-11 01:10:27 +0000
@@ -70,4 +70,12 @@
70 gearman_queue_replay_fn *replay);70 gearman_queue_replay_fn *replay);
7171
72void gearman_server_set_queue(gearman_server_st& server,72void gearman_server_set_queue(gearman_server_st& server,
73 void *context,
74 gearman_queue_add_fn *add,
75 gearman_queue_flush_fn *flush,
76 gearman_queue_done_fn *done,
77 gearman_queue_replay_fn *replay,
78 gearman_queue_shutdown_fn *shutdown);
79
80void gearman_server_set_queue(gearman_server_st& server,
73 gearmand::queue::Context* context);81 gearmand::queue::Context* context);
7482
=== modified file 'libgearman-server/server.cc'
--- libgearman-server/server.cc 2012-10-28 20:04:35 +0000
+++ libgearman-server/server.cc 2012-11-11 01:10:27 +0000
@@ -1,5 +1,5 @@
1/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:1/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 * 2 *
3 * Gearmand client and server library.3 * Gearmand client and server library.
4 *4 *
5 * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/5 * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
@@ -98,7 +98,7 @@
98 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,98 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
99 "%15s:%5s packet command %s",99 "%15s:%5s packet command %s",
100 server_con->con.context == NULL || server_con->con.context->host == NULL ? "-" : server_con->con.context->host,100 server_con->con.context == NULL || server_con->con.context->host == NULL ? "-" : server_con->con.context->host,
101 server_con->con.context == NULL || server_con->con.context->port == NULL ? "-" : server_con->con.context->port, 101 server_con->con.context == NULL || server_con->con.context->port == NULL ? "-" : server_con->con.context->port,
102 gearmand_strcommand(packet));102 gearmand_strcommand(packet));
103103
104 switch (packet->command)104 switch (packet->command)
@@ -242,7 +242,7 @@
242 {242 {
243 return gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "strtoul(%ul)", when);243 return gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "strtoul(%ul)", when);
244 }244 }
245 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, 245 gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
246 "Received EPOCH job submission, function:%.*s unique:%.*s with data for %jd at %jd, args %d",246 "Received EPOCH job submission, function:%.*s unique:%.*s with data for %jd at %jd, args %d",
247 packet->arg_size[0], packet->arg[0],247 packet->arg_size[0], packet->arg[0],
248 packet->arg_size[1], packet->arg[1],248 packet->arg_size[1], packet->arg[1],
@@ -576,8 +576,8 @@
576 }576 }
577 else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_UNIQ)577 else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_UNIQ)
578 {578 {
579 /* 579 /*
580 We found a runnable job, queue job assigned packet and take the job off the queue. 580 We found a runnable job, queue job assigned packet and take the job off the queue.
581 */581 */
582 ret= gearman_server_io_packet_add(server_con, false,582 ret= gearman_server_io_packet_add(server_con, false,
583 GEARMAN_MAGIC_RESPONSE,583 GEARMAN_MAGIC_RESPONSE,
@@ -596,8 +596,8 @@
596 strlen(server_job->reducer), server_job->reducer, strlen(server_job->reducer),596 strlen(server_job->reducer), server_job->reducer, strlen(server_job->reducer),
597 server_job->unique_length, server_job->unique, server_job->unique_length,597 server_job->unique_length, server_job->unique, server_job->unique_length,
598 (unsigned long)server_job->data_size);598 (unsigned long)server_job->data_size);
599 /* 599 /*
600 We found a runnable job, queue job assigned packet and take the job off the queue. 600 We found a runnable job, queue job assigned packet and take the job off the queue.
601 */601 */
602 ret= gearman_server_io_packet_add(server_con, false,602 ret= gearman_server_io_packet_add(server_con, false,
603 GEARMAN_MAGIC_RESPONSE,603 GEARMAN_MAGIC_RESPONSE,
@@ -611,8 +611,8 @@
611 }611 }
612 else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_ALL)612 else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_ALL)
613 {613 {
614 /* 614 /*
615 We found a runnable job, queue job assigned packet and take the job off the queue. 615 We found a runnable job, queue job assigned packet and take the job off the queue.
616 */616 */
617 ret= gearman_server_io_packet_add(server_con, false,617 ret= gearman_server_io_packet_add(server_con, false,
618 GEARMAN_MAGIC_RESPONSE,618 GEARMAN_MAGIC_RESPONSE,
@@ -922,6 +922,26 @@
922 return ret;922 return ret;
923}923}
924924
925static gearmand_error_t gearman_queue_shutdown(gearman_server_st& server)
926{
927 if(server.queue.functions->_shutdown_fn == NULL)
928 return GEARMAN_SUCCESS;
929 if (server.queue_version == QUEUE_VERSION_FUNCTION)
930 {
931 return (*(server.queue.functions->_shutdown_fn))(&server,
932 (void *)server.queue.functions->_context);
933 }
934
935 assert(server.queue.object);
936 return server.queue.object->shutdown(&server);
937}
938
939gearmand_error_t gearman_server_queue_shutdown(gearman_server_st& server)
940{
941 gearmand_error_t ret= gearman_queue_shutdown(server);
942 return ret;
943}
944
925void *gearman_server_queue_context(const gearman_server_st *server)945void *gearman_server_queue_context(const gearman_server_st *server)
926{946{
927 if (server->queue_version == QUEUE_VERSION_FUNCTION)947 if (server->queue_version == QUEUE_VERSION_FUNCTION)
928948
=== modified file 'libgearman-server/server.h'
--- libgearman-server/server.h 2012-09-28 09:55:33 +0000
+++ libgearman-server/server.h 2012-11-11 01:10:27 +0000
@@ -90,6 +90,18 @@
90#endif90#endif
9191
92/**92/**
93 * shutdown the persistent queue to save all remaining jobs.
94 * should only be run at shutdown.
95 * @param server Server structure previously initialized with
96 * gearman_server_create.
97 * @return Standard gearman return value.
98 */
99#ifdef __cplusplus
100GEARMAN_API
101gearmand_error_t gearman_server_queue_shutdown(gearman_server_st& server);
102#endif
103
104/**
93 * Get persistent queue context.105 * Get persistent queue context.
94 */106 */
95GEARMAN_API107GEARMAN_API
96108
=== modified file 'libgearman-server/struct/server.h'
--- libgearman-server/struct/server.h 2012-08-11 01:53:36 +0000
+++ libgearman-server/struct/server.h 2012-11-11 01:10:27 +0000
@@ -43,6 +43,7 @@
43 gearman_queue_flush_fn *_flush_fn;43 gearman_queue_flush_fn *_flush_fn;
44 gearman_queue_done_fn *_done_fn;44 gearman_queue_done_fn *_done_fn;
45 gearman_queue_replay_fn *_replay_fn;45 gearman_queue_replay_fn *_replay_fn;
46 gearman_queue_shutdown_fn *_shutdown_fn;
4647
47#ifdef __cplusplus48#ifdef __cplusplus
48 queue_st() :49 queue_st() :
@@ -50,7 +51,8 @@
50 _add_fn(NULL),51 _add_fn(NULL),
51 _flush_fn(NULL),52 _flush_fn(NULL),
52 _done_fn(NULL),53 _done_fn(NULL),
53 _replay_fn(NULL)54 _replay_fn(NULL),
55 _shutdown_fn(NULL)
54 {56 {
55 }57 }
56#endif58#endif

Subscribers

People subscribed via source and target branches

to all changes: