Merge lp:~sven-nierlein/gearmand/gearmand into lp:gearmand
Status: | Merged |
---|---|
Merged at revision: | 669 |
Proposed branch: | lp:~sven-nierlein/gearmand/gearmand |
Merge into: | lp:gearmand |
Diff against target: | 610 lines (+363/-18), 14 files modified |
Files modified: | libgearman-server/constants.h (+2/-0), libgearman-server/gearmand.cc (+12/-8), libgearman-server/plugins.cc (+2/-0), libgearman-server/plugins/base.h (+2/-0), libgearman-server/plugins/queue.h (+2/-0), libgearman-server/plugins/queue/include.am (+1/-0), libgearman-server/plugins/queue/retention/include.am (+15/-0), libgearman-server/plugins/queue/retention/queue.cc (+230/-0), libgearman-server/plugins/queue/retention/queue.h (+22/-0), libgearman-server/queue.cc (+23/-0), libgearman-server/queue.hpp (+8/-0), libgearman-server/server.cc (+29/-9), libgearman-server/server.h (+12/-0), libgearman-server/struct/server.h (+3/-1) |
To merge this branch: | bzr merge lp:~sven-nierlein/gearmand/gearmand |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Tangent Trunk | Pending | ||
Review via email: mp+133809@code.launchpad.net |
Commit message
Description of the change
Added retention queue plugin.
The retention queue plugin saves the queue to disk on shutdown. It therefore
does not slow down gearmand at runtime, yet still preserves the queue across
restarts.
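The idea can be illustrated with a minimal, self-contained sketch. It is not the plugin code from this proposal: `Job`, `RetentionSketch`, and the simplified `function;unique` line format are hypothetical names chosen for illustration, and the sketch assumes neither field contains a `;` or a newline.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <fstream>
#include <string>
#include <vector>

// Hypothetical stand-in for a queued job; not gearmand's real job struct.
struct Job {
  std::string function;
  std::string unique;
};

// Hypothetical in-memory queue that touches the disk only at shutdown/startup.
class RetentionSketch {
public:
  explicit RetentionSketch(const std::string& path) : path_(path) {}

  // Runtime path: memory only, no disk I/O.
  void add(const Job& job) { jobs_.push_back(job); }

  // Shutdown path: write every remaining job, one per line.
  bool shutdown() const {
    std::ofstream out(path_.c_str(), std::ios::trunc);
    if (!out) return false;
    for (const Job& job : jobs_) out << job.function << ';' << job.unique << '\n';
    out.flush();
    return static_cast<bool>(out);
  }

  // Startup path: reload saved jobs, then remove the file so that a later
  // restart does not load the same jobs twice.
  void replay() {
    std::ifstream in(path_.c_str());
    std::string line;
    while (std::getline(in, line)) {
      const std::string::size_type sep = line.find(';');
      if (sep == std::string::npos) continue;  // skip malformed lines
      jobs_.push_back(Job{line.substr(0, sep), line.substr(sep + 1)});
    }
    in.close();
    std::remove(path_.c_str());
  }

  std::size_t size() const { return jobs_.size(); }
  const std::vector<Job>& jobs() const { return jobs_; }

private:
  std::string path_;
  std::vector<Job> jobs_;
};
```

The trade-off of this design is that jobs are lost if the process dies without running its shutdown path, which is the price for doing no persistence work per job.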
Brian Aker (brianaker) wrote : | # |
Sven Nierlein (sven-nierlein) wrote : | # |
So what do you think? I noticed the status changed to "merged", but I cannot find these changes in gearmand yet.
Brian Aker (brianaker) wrote : | # |
Hi,
To test this, I extended the sqlite plugin to record on shutdown, to see how that works out. In testing, so far so good.
Look in lp:gearmand
Cheers,
-Brian
On Nov 25, 2012, at 1:46 AM, Sven Nierlein <email address hidden> wrote:
> so what do you think? I noticed the status changed to "merged" but i cannot find these changes in gearmand yet.
Sven Nierlein (sven-nierlein) wrote : | # |
I can't get this working.
When I start gearmand with this:
-q libsqlite3 --libsqlite3-
it stores all jobs in sqlite3, not only on shutdown. This is exactly what I wanted
to avoid.
When I start gearmand with only:
--libsqlite3-
nothing is stored, not even on shutdown.
Besides that, this adds a dependency on sqlite3, which wasn't necessary with my merge proposal.
Brian Aker (brianaker) wrote : | # |
Hi,
On Nov 25, 2012, at 3:28 AM, Sven Nierlein <email address hidden> wrote:
> I can't get this working:
> when i start gearmand with either this:
> -q libsqlite3 --libsqlite3-
> then it stores all jobs in sqlite3, not only on shutdown. This is exactly what i wanted
> to avoid.
Let me see what is happening with it then. The test case should be covering this problem.
> Besides that, this adds a dependency to sqlite3 which wasn't necessary with my merge proposal.
The creation of a new queue plugin is separate from adding the ability for a queue to store on shutdown.
Which brings up the question of what do you want to achieve with a new queue type?
Cheers,
-Brian
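The distinction drawn here can be sketched in isolation. The proposed diff adds a `shutdown` method to the plugin base class; the sketch below uses a hypothetical, much simplified interface (`QueueSketch`, `WriteThrough`, `StoreOnShutdown`, and `run_and_stop` are illustrative names, not gearmand types) to show that a shutdown hook is a capability of the interface that any backend may use. Unlike the pure virtual method in the diff, the hook here has a no-op default, which is an assumption made to keep existing backends unchanged.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified queue interface; gearmand's real base class differs.
class QueueSketch {
public:
  virtual ~QueueSketch() {}
  virtual void add(const std::string& job) = 0;
  // Default: nothing to do at shutdown, so existing backends need no change.
  virtual bool shutdown(const std::vector<std::string>& pending) {
    (void)pending;
    return true;
  }
};

// Backend that persists on every add (write-through behaviour).
class WriteThrough : public QueueSketch {
public:
  void add(const std::string& job) override { stored.push_back(job); }
  std::vector<std::string> stored;  // stand-in for a database table
};

// Backend that does no persistence work until shutdown.
class StoreOnShutdown : public QueueSketch {
public:
  void add(const std::string&) override {}
  bool shutdown(const std::vector<std::string>& pending) override {
    stored = pending;
    return true;
  }
  std::vector<std::string> stored;  // stand-in for a file or table
};

// Hypothetical server life cycle reduced to the two calls that matter here.
// All submitted jobs are assumed to be still pending at shutdown.
inline void run_and_stop(QueueSketch& queue, const std::vector<std::string>& jobs) {
  for (const std::string& job : jobs) queue.add(job);
  queue.shutdown(jobs);
}
```

Both backends end up with the same stored jobs after `run_and_stop`; they differ only in when the work happens.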
Sven Nierlein (sven-nierlein) wrote : | # |
The intention of the new queue type is to provide a queue plugin that adds no overhead
at runtime and just stores the queue to disk on shutdown.
I am "sni" in the freenode #gearman channel if you want to discuss details.
Brian Aker (brianaker) wrote : | # |
Hi!
https:/
I've created a bug to track this, and I have linked a tree that provides a test case.
The test case creates eight jobs and then checks, before shutting down the server, that sqlite is empty. The test then shuts down the server and checks that the jobs are there. It then starts up the server and verifies that the jobs are processed.
Cheers,
-Brian
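The sequence of checks described in this comment can be written down as a small scenario. This is only a sketch of the test's shape, not the linked test case: `FakeStore` stands in for the sqlite table and `FakeServer` for gearmand, and both are hypothetical types invented for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for the sqlite table that backs the queue.
struct FakeStore {
  std::vector<std::string> rows;
};

// Hypothetical stand-in for the server; it persists only at shutdown.
class FakeServer {
public:
  explicit FakeServer(FakeStore& store) : store_(store) {}

  // Startup: replay whatever the store holds.
  void start() { pending_ = store_.rows; }

  void submit(const std::string& job) { pending_.push_back(job); }

  // Shutdown: write all pending jobs to the store.
  void shutdown() {
    store_.rows = pending_;
    pending_.clear();
  }

  // Process every pending job and drop it from the store; returns the count.
  std::size_t process_all() {
    const std::size_t n = pending_.size();
    pending_.clear();
    store_.rows.clear();
    return n;
  }

private:
  FakeStore& store_;
  std::vector<std::string> pending_;
};

// The four checks, in the order given above. Returns true if all hold.
inline bool shutdown_persistence_scenario() {
  FakeStore store;
  FakeServer first(store);
  first.start();
  for (int i = 0; i < 8; ++i) first.submit("job-" + std::to_string(i));
  if (!store.rows.empty()) return false;     // 1. nothing stored while running
  first.shutdown();
  if (store.rows.size() != 8) return false;  // 2. all eight jobs stored at shutdown
  FakeServer second(store);
  second.start();                            // 3. restart replays the stored jobs
  return second.process_all() == 8;          // 4. replayed jobs are processed
}
```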
On Nov 25, 2012, at 11:40 PM, Brian Aker <email address hidden> wrote:
> Hi,
>
> On Nov 25, 2012, at 3:28 AM, Sven Nierlein <email address hidden> wrote:
>
>> I can't get this working:
>> when i start gearmand with either this:
>> -q libsqlite3 --libsqlite3-
>> then it stores all jobs in sqlite3, not only on shutdown. This is exactly what i wanted
>> to avoid.
>
> Let me see what is happening with it then. The test case should be covering this problem.
>
>> Besides that, this adds a dependency to sqlite3 which wasn't necessary with my merge proposal.
>
> The creation of a new queue plugin in separate from adding the ability for a queue to store on shutdown.
>
> Which brings up the question of what do you want to achieve with a new queue type?
>
> Cheers,
> -Brian
Preview Diff
1 | === modified file 'libgearman-server/constants.h' |
2 | --- libgearman-server/constants.h 2012-08-18 06:26:36 +0000 |
3 | +++ libgearman-server/constants.h 2012-11-11 01:10:27 +0000 |
4 | @@ -155,6 +155,8 @@ |
5 | void *context, |
6 | gearman_queue_add_fn *add_fn, |
7 | void *add_context); |
8 | +typedef gearmand_error_t (gearman_queue_shutdown_fn)(gearman_server_st *server, |
9 | + void *context); |
10 | |
11 | typedef gearmand_error_t (gearmand_connection_add_fn)(gearman_server_con_st *con); |
12 | |
13 | |
14 | === modified file 'libgearman-server/gearmand.cc' |
15 | --- libgearman-server/gearmand.cc 2012-10-29 07:06:01 +0000 |
16 | +++ libgearman-server/gearmand.cc 2012-11-11 01:10:27 +0000 |
17 | @@ -1,5 +1,5 @@ |
18 | /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: |
19 | - * |
20 | + * |
21 | * Gearmand client and server library. |
22 | * |
23 | * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/ |
24 | @@ -100,6 +100,10 @@ |
25 | /* All threads should be cleaned up before calling this. */ |
26 | assert(server.thread_list == NULL); |
27 | |
28 | + gearmand_debug("shutdown queue: begin"); |
29 | + gearman_server_queue_shutdown(server); |
30 | + gearmand_debug("shutdown queue: end"); |
31 | + |
32 | for (uint32_t key= 0; key < GEARMAND_JOB_HASH_SIZE; key++) |
33 | { |
34 | while (server.job_hash[key] != NULL) |
35 | @@ -335,7 +339,7 @@ |
36 | /* Initialize server components. */ |
37 | if (gearmand->base == NULL) |
38 | { |
39 | - gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Starting up, verbose set to %s", |
40 | + gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Starting up, verbose set to %s", |
41 | gearmand_verbose_name(gearmand->verbose)); |
42 | |
43 | if (gearmand->threads > 0) |
44 | @@ -425,7 +429,7 @@ |
45 | } |
46 | else |
47 | { |
48 | - gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, |
49 | + gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, |
50 | "gearmand_wakeup() incorrectly wrote %lu bytes of data.", (unsigned long)written); |
51 | } |
52 | } |
53 | @@ -587,7 +591,7 @@ |
54 | } |
55 | |
56 | // We are in single user threads, so strerror() is fine. |
57 | - gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Retrying bind(%s) on %s:%s %u >= %u", |
58 | + gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Retrying bind(%s) on %s:%s %u >= %u", |
59 | strerror(errno), host, port->port, |
60 | waited, bind_timeout); |
61 | this_wait= retry * retry / 3 + 1; |
62 | @@ -722,7 +726,7 @@ |
63 | { |
64 | for (uint32_t y= 0; y < gearmand->port_list[x].listen_count; y++) |
65 | { |
66 | - gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, |
67 | + gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, |
68 | "Clearing event for listening socket (%d)", |
69 | gearmand->port_list[x].listen_fd[y]); |
70 | |
71 | @@ -768,7 +772,7 @@ |
72 | return; |
73 | } |
74 | |
75 | - /* |
76 | + /* |
77 | Since this is numeric, it should never fail. Even if it did we don't want to really error from it. |
78 | */ |
79 | char host[NI_MAXHOST]; |
80 | @@ -922,7 +926,7 @@ |
81 | gearmand_debug("Received SHUTDOWN_GRACEFUL wakeup event"); |
82 | _listen_close(gearmand); |
83 | |
84 | - for (gearmand_thread_st* thread= gearmand->thread_list; |
85 | + for (gearmand_thread_st* thread= gearmand->thread_list; |
86 | thread != NULL; |
87 | thread= thread->next) |
88 | { |
89 | @@ -1081,7 +1085,7 @@ |
90 | return success; |
91 | } |
92 | |
93 | -static bool gearman_server_create(gearman_server_st& server, |
94 | +static bool gearman_server_create(gearman_server_st& server, |
95 | uint8_t job_retries_arg, |
96 | uint8_t worker_wakeup_arg, |
97 | bool round_robin_arg) |
98 | |
99 | === modified file 'libgearman-server/plugins.cc' |
100 | --- libgearman-server/plugins.cc 2012-10-28 20:04:35 +0000 |
101 | +++ libgearman-server/plugins.cc 2012-11-11 01:10:27 +0000 |
102 | @@ -50,6 +50,8 @@ |
103 | { |
104 | queue::initialize_default(); |
105 | |
106 | + queue::initialize_retention(); |
107 | + |
108 | if (HAVE_LIBDRIZZLE) |
109 | { |
110 | queue::initialize_drizzle(); |
111 | |
112 | === modified file 'libgearman-server/plugins/base.h' |
113 | --- libgearman-server/plugins/base.h 2012-10-08 03:37:53 +0000 |
114 | +++ libgearman-server/plugins/base.h 2012-11-11 01:10:27 +0000 |
115 | @@ -96,6 +96,8 @@ |
116 | |
117 | virtual gearmand_error_t replay(gearman_server_st *server)= 0; |
118 | |
119 | + virtual gearmand_error_t shutdown(gearman_server_st *server)= 0; |
120 | + |
121 | static gearmand_error_t replay_add(gearman_server_st *server, |
122 | void *context __attribute__ ((unused)), |
123 | const char *unique, size_t unique_size, |
124 | |
125 | === modified file 'libgearman-server/plugins/queue.h' |
126 | --- libgearman-server/plugins/queue.h 2012-06-12 18:02:10 +0000 |
127 | +++ libgearman-server/plugins/queue.h 2012-11-11 01:10:27 +0000 |
128 | @@ -52,3 +52,5 @@ |
129 | #include <libgearman-server/plugins/queue/redis/queue.h> |
130 | |
131 | #include <libgearman-server/plugins/queue/mysql/queue.h> |
132 | + |
133 | +#include <libgearman-server/plugins/queue/retention/queue.h> |
134 | |
135 | === modified file 'libgearman-server/plugins/queue/include.am' |
136 | --- libgearman-server/plugins/queue/include.am 2012-08-11 01:50:29 +0000 |
137 | +++ libgearman-server/plugins/queue/include.am 2012-11-11 01:10:27 +0000 |
138 | @@ -21,3 +21,4 @@ |
139 | include libgearman-server/plugins/queue/sqlite/include.am |
140 | include libgearman-server/plugins/queue/tokyocabinet/include.am |
141 | include libgearman-server/plugins/queue/mysql/include.am |
142 | +include libgearman-server/plugins/queue/retention/include.am |
143 | |
144 | === added directory 'libgearman-server/plugins/queue/retention' |
145 | === added file 'libgearman-server/plugins/queue/retention/include.am' |
146 | --- libgearman-server/plugins/queue/retention/include.am 1970-01-01 00:00:00 +0000 |
147 | +++ libgearman-server/plugins/queue/retention/include.am 2012-11-11 01:10:27 +0000 |
148 | @@ -0,0 +1,15 @@ |
149 | +# vim:ft=automake |
150 | +# Gearman |
151 | +# Copyright (C) 2012 Data Differential, http://datadifferential.com/ |
152 | +# Copyright (C) 2012 Sven Nierlein, sven@consol.de |
153 | +# All rights reserved. |
154 | +# |
155 | +# Use and distribution licensed under the BSD license. See |
156 | +# the COPYING file in the parent directory for full text. |
157 | +# |
158 | +# All paths should be given relative to the root |
159 | +# |
160 | + |
161 | +noinst_HEADERS+= libgearman-server/plugins/queue/retention/queue.h |
162 | + |
163 | +libgearman_server_libgearman_server_la_SOURCES+= libgearman-server/plugins/queue/retention/queue.cc |
164 | |
165 | === added file 'libgearman-server/plugins/queue/retention/queue.cc' |
166 | --- libgearman-server/plugins/queue/retention/queue.cc 1970-01-01 00:00:00 +0000 |
167 | +++ libgearman-server/plugins/queue/retention/queue.cc 2012-11-11 01:10:27 +0000 |
168 | @@ -0,0 +1,230 @@ |
169 | +/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: |
170 | + * |
171 | + * Gearmand client and server library. |
172 | + * |
173 | + * Copyright (C) 2012 Data Differential, http://datadifferential.com/ |
174 | + * Copyright (C) 2012 Sven Nierlein, sven@consol.de |
175 | + * All rights reserved. |
176 | + * |
177 | + */ |
178 | + |
179 | + |
180 | +/** |
181 | + * @file |
182 | + * @brief Retention Queue Storage Definitions |
183 | + */ |
184 | + |
185 | +#include <gear_config.h> |
186 | +#include <libgearman-server/common.h> |
187 | + |
188 | +#include <libgearman-server/plugins/queue/retention/queue.h> |
189 | +#include <libgearman-server/plugins/queue/base.h> |
190 | + |
191 | +#include <stdio.h> |
192 | + |
193 | +namespace gearmand { namespace plugins { namespace queue { class Retention; }}} |
194 | + |
195 | +#pragma GCC diagnostic ignored "-Wold-style-cast" |
196 | + |
197 | +static gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::Retention *queue); |
198 | + |
199 | +namespace gearmand { |
200 | +namespace plugins { |
201 | +namespace queue { |
202 | + |
203 | +class Retention : public gearmand::plugins::Queue { |
204 | +public: |
205 | + Retention(); |
206 | + ~Retention(); |
207 | + gearmand_error_t initialize(); |
208 | + std::string retention_file; |
209 | + |
210 | +private: |
211 | +}; |
212 | + |
213 | +Retention::Retention() : |
214 | + Queue("retention") |
215 | +{ |
216 | + command_line_options().add_options() |
217 | + ("retention-file", boost::program_options::value(&retention_file)->default_value("/tmp/retention.dat"), "retention file to save jobs on shutdown"); |
218 | +} |
219 | + |
220 | +Retention::~Retention() |
221 | +{ |
222 | +} |
223 | + |
224 | +gearmand_error_t Retention::initialize() |
225 | +{ |
226 | + return _initialize(Gearmand()->server, this); |
227 | +} |
228 | + |
229 | + |
230 | +void initialize_retention() |
231 | +{ |
232 | + static Retention local_instance; |
233 | +} |
234 | + |
235 | +} // namespace queue |
236 | +} // namespace plugins |
237 | +} // namespace gearmand |
238 | + |
239 | + |
240 | + |
241 | +/** |
242 | + * @addtogroup gearman_queue_retention_static Static retention Queue Storage Definitions |
243 | + * @ingroup gearman_queue_retention |
244 | + * @{ |
245 | + */ |
246 | + |
247 | +/* Queue callback functions. */ |
248 | +static gearmand_error_t __add(gearman_server_st *server __attribute__((unused)), |
249 | + void *context __attribute__((unused)), |
250 | + const char *unique __attribute__((unused)), size_t unique_size __attribute__((unused)), |
251 | + const char *function_name __attribute__((unused)), |
252 | + size_t function_name_size __attribute__((unused)), |
253 | + const void *data __attribute__((unused)), size_t data_size __attribute__((unused)), |
254 | + gearman_job_priority_t priority __attribute__((unused)), |
255 | + int64_t when __attribute__((unused))) |
256 | +{ |
257 | + gearmand_debug(__func__); |
258 | + return GEARMAN_SUCCESS; |
259 | +} |
260 | + |
261 | + |
262 | +static gearmand_error_t __flush(gearman_server_st *server __attribute__((unused)), |
263 | + void *context __attribute__((unused))) |
264 | +{ |
265 | + gearmand_debug(__func__); |
266 | + return GEARMAN_SUCCESS; |
267 | +} |
268 | + |
269 | +static gearmand_error_t __done(gearman_server_st *server __attribute__((unused)), |
270 | + void *context __attribute__((unused)), |
271 | + const char *unique __attribute__((unused)), |
272 | + size_t unique_size __attribute__((unused)), |
273 | + const char *function_name __attribute__((unused)), |
274 | + size_t function_name_size __attribute__((unused))) |
275 | +{ |
276 | + gearmand_debug(__func__); |
277 | + return GEARMAN_SUCCESS; |
278 | +} |
279 | + |
280 | + |
281 | +static gearmand_error_t __replay(gearman_server_st *server, |
282 | + void *context, |
283 | + gearman_queue_add_fn *add_fn, |
284 | + void *add_context) |
285 | +{ |
286 | + int read; |
287 | + int when; |
288 | + int size; |
289 | + char *data_line; |
290 | + char *data; |
291 | + char *function; |
292 | + char *unique; |
293 | + char *prio_s; |
294 | + FILE * fp; |
295 | + gearman_job_priority_t priority= (gearman_job_priority_t)0; |
296 | + gearmand_error_t ret = GEARMAN_SUCCESS; |
297 | + gearmand::plugins::queue::Retention *queue= (gearmand::plugins::queue::Retention *)context; |
298 | + |
299 | + gearmand_debug(__func__); |
300 | + gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "reading retention file: %s", queue->retention_file.c_str()); |
301 | + |
302 | + fp = fopen(queue->retention_file.c_str(), "r"); |
303 | + if(!fp) |
304 | + return GEARMAN_SUCCESS; |
305 | + |
306 | + data_line = (char*)malloc(1024); |
307 | + if(data_line == NULL){ |
308 | + gearmand_perror("malloc failed"); |
309 | + return GEARMAN_MEMORY_ALLOCATION_FAILURE; |
310 | + } |
311 | + while(!feof(fp)) { |
312 | + if(fgets(data_line, 1024, fp) == NULL) |
313 | + break; // empty files |
314 | + |
315 | + function = strsep( &data_line, ";" ); |
316 | + unique = strsep( &data_line, ";" ); |
317 | + prio_s = strsep( &data_line, ";" ); |
318 | + priority = (gearman_job_priority_t) atoi(prio_s); |
319 | + when = atoi(strsep( &data_line, ";" )); |
320 | + size = atoi(strsep( &data_line, "\n" )); |
321 | + data = (char*)malloc(size+1); |
322 | + if (data == NULL){ |
323 | + gearmand_perror("malloc failed"); |
324 | + return GEARMAN_MEMORY_ALLOCATION_FAILURE; |
325 | + } |
326 | + read = fread(data, size, 1, fp); |
327 | + ret= (*add_fn)(server, add_context, |
328 | + unique, (size_t) strlen(unique), |
329 | + function, (size_t) strlen(function), |
330 | + data, size, |
331 | + priority, |
332 | + when); |
333 | + if (ret != GEARMAN_SUCCESS && ret != GEARMAN_JOB_EXISTS) |
334 | + return ret; |
335 | + |
336 | + // complete rest of line till newline |
337 | + if(fgets(data_line, 1024, fp) == NULL) |
338 | + break; // no more jobs |
339 | + } |
340 | + |
341 | + fclose(fp); |
342 | + |
343 | + // cleanup retention file |
344 | + if(unlink(queue->retention_file.c_str()) == -1) |
345 | + perror(queue->retention_file.c_str()); |
346 | + |
347 | + return GEARMAN_SUCCESS; |
348 | +} |
349 | + |
350 | +static gearmand_error_t __shutdown(gearman_server_st *server, |
351 | + void *context) |
352 | +{ |
353 | + FILE * fp; |
354 | + gearmand::plugins::queue::Retention *queue= (gearmand::plugins::queue::Retention *)context; |
355 | + |
356 | + gearmand_debug(__func__); |
357 | + gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "writing retention file: %s for %i jobs", queue->retention_file.c_str(), server->job_count); |
358 | + |
359 | + fp = fopen(queue->retention_file.c_str(), "w+"); |
360 | + if(fp == NULL) { |
361 | + gearmand_perror(queue->retention_file.c_str()); |
362 | + return GEARMAN_ERRNO; |
363 | + } |
364 | + |
365 | + for (uint32_t key= 0; key < GEARMAND_JOB_HASH_SIZE; key++) { |
366 | + if(server->job_hash[key] != NULL) { |
367 | + gearman_server_job_st *server_job = server->job_hash[key]; |
368 | + fprintf(fp, "%s;%s;%i;%" PRId64 ";%i\n%.*s\n", |
369 | + server_job->function->function_name, |
370 | + server_job->unique, |
371 | + (int)server_job->priority, |
372 | + server_job->when, |
373 | + server_job->data_size, |
374 | + server_job->data_size, (const char*)server_job->data); |
375 | + while (server_job->unique_next != NULL) { |
376 | + server_job = server_job->unique_next; |
377 | + fprintf(fp, "%s;%s;%i;%" PRId64 ";%i\n%.*s\n", |
378 | + server_job->function->function_name, |
379 | + server_job->unique, |
380 | + (int)server_job->priority, |
381 | + server_job->when, |
382 | + server_job->data_size, |
383 | + server_job->data_size, (const char*)server_job->data); |
384 | + } |
385 | + } |
386 | + } |
387 | + |
388 | + fclose(fp); |
389 | + |
390 | + return GEARMAN_SUCCESS; |
391 | +} |
392 | + |
393 | +gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::Retention *queue) |
394 | +{ |
395 | + gearman_server_set_queue(server, queue, __add, __flush, __done, __replay, __shutdown); |
396 | + |
397 | + return GEARMAN_SUCCESS; |
398 | +} |
399 | |
400 | === added file 'libgearman-server/plugins/queue/retention/queue.h' |
401 | --- libgearman-server/plugins/queue/retention/queue.h 1970-01-01 00:00:00 +0000 |
402 | +++ libgearman-server/plugins/queue/retention/queue.h 2012-11-11 01:10:27 +0000 |
403 | @@ -0,0 +1,22 @@ |
404 | +/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: |
405 | + * |
406 | + * Gearmand client and server library. |
407 | + * |
408 | + * Copyright (C) 2012 Data Differential, http://datadifferential.com/ |
409 | + * Copyright (C) 2012 Sven Nierlein, sven@consol.de |
410 | + * All rights reserved. |
411 | + * |
412 | + */ |
413 | + |
414 | +#pragma once |
415 | + |
416 | + |
417 | +namespace gearmand { |
418 | +namespace plugins { |
419 | +namespace queue { |
420 | + |
421 | +void initialize_retention(); |
422 | + |
423 | +} // namespace queue |
424 | +} // namespace plugin |
425 | +} // namespace gearmand |
426 | |
427 | === modified file 'libgearman-server/queue.cc' |
428 | --- libgearman-server/queue.cc 2012-10-28 20:04:35 +0000 |
429 | +++ libgearman-server/queue.cc 2012-11-11 01:10:27 +0000 |
430 | @@ -137,6 +137,29 @@ |
431 | } |
432 | |
433 | void gearman_server_set_queue(gearman_server_st& server, |
434 | + void *context, |
435 | + gearman_queue_add_fn *add, |
436 | + gearman_queue_flush_fn *flush, |
437 | + gearman_queue_done_fn *done, |
438 | + gearman_queue_replay_fn *replay, |
439 | + gearman_queue_shutdown_fn *shutdown) |
440 | +{ |
441 | + server.queue_version= QUEUE_VERSION_FUNCTION; |
442 | + server.queue.functions= new queue_st(); |
443 | + if (server.queue.functions) |
444 | + { |
445 | + server.queue.functions->_context= context; |
446 | + server.queue.functions->_add_fn= add; |
447 | + server.queue.functions->_flush_fn= flush; |
448 | + server.queue.functions->_done_fn= done; |
449 | + server.queue.functions->_replay_fn= replay; |
450 | + server.queue.functions->_shutdown_fn= shutdown; |
451 | + } |
452 | + assert(server.queue.functions); |
453 | +} |
454 | + |
455 | + |
456 | +void gearman_server_set_queue(gearman_server_st& server, |
457 | gearmand::queue::Context* context) |
458 | { |
459 | assert(context); |
460 | |
461 | === modified file 'libgearman-server/queue.hpp' |
462 | --- libgearman-server/queue.hpp 2012-10-08 03:37:53 +0000 |
463 | +++ libgearman-server/queue.hpp 2012-11-11 01:10:27 +0000 |
464 | @@ -70,4 +70,12 @@ |
465 | gearman_queue_replay_fn *replay); |
466 | |
467 | void gearman_server_set_queue(gearman_server_st& server, |
468 | + void *context, |
469 | + gearman_queue_add_fn *add, |
470 | + gearman_queue_flush_fn *flush, |
471 | + gearman_queue_done_fn *done, |
472 | + gearman_queue_replay_fn *replay, |
473 | + gearman_queue_shutdown_fn *shutdown); |
474 | + |
475 | +void gearman_server_set_queue(gearman_server_st& server, |
476 | gearmand::queue::Context* context); |
477 | |
478 | === modified file 'libgearman-server/server.cc' |
479 | --- libgearman-server/server.cc 2012-10-28 20:04:35 +0000 |
480 | +++ libgearman-server/server.cc 2012-11-11 01:10:27 +0000 |
481 | @@ -1,5 +1,5 @@ |
482 | /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab: |
483 | - * |
484 | + * |
485 | * Gearmand client and server library. |
486 | * |
487 | * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/ |
488 | @@ -98,7 +98,7 @@ |
489 | gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, |
490 | "%15s:%5s packet command %s", |
491 | server_con->con.context == NULL || server_con->con.context->host == NULL ? "-" : server_con->con.context->host, |
492 | - server_con->con.context == NULL || server_con->con.context->port == NULL ? "-" : server_con->con.context->port, |
493 | + server_con->con.context == NULL || server_con->con.context->port == NULL ? "-" : server_con->con.context->port, |
494 | gearmand_strcommand(packet)); |
495 | |
496 | switch (packet->command) |
497 | @@ -242,7 +242,7 @@ |
498 | { |
499 | return gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "strtoul(%ul)", when); |
500 | } |
501 | - gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, |
502 | + gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, |
503 | "Received EPOCH job submission, function:%.*s unique:%.*s with data for %jd at %jd, args %d", |
504 | packet->arg_size[0], packet->arg[0], |
505 | packet->arg_size[1], packet->arg[1], |
506 | @@ -576,8 +576,8 @@ |
507 | } |
508 | else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_UNIQ) |
509 | { |
510 | - /* |
511 | - We found a runnable job, queue job assigned packet and take the job off the queue. |
512 | + /* |
513 | + We found a runnable job, queue job assigned packet and take the job off the queue. |
514 | */ |
515 | ret= gearman_server_io_packet_add(server_con, false, |
516 | GEARMAN_MAGIC_RESPONSE, |
517 | @@ -596,8 +596,8 @@ |
518 | strlen(server_job->reducer), server_job->reducer, strlen(server_job->reducer), |
519 | server_job->unique_length, server_job->unique, server_job->unique_length, |
520 | (unsigned long)server_job->data_size); |
521 | - /* |
522 | - We found a runnable job, queue job assigned packet and take the job off the queue. |
523 | + /* |
524 | + We found a runnable job, queue job assigned packet and take the job off the queue. |
525 | */ |
526 | ret= gearman_server_io_packet_add(server_con, false, |
527 | GEARMAN_MAGIC_RESPONSE, |
528 | @@ -611,8 +611,8 @@ |
529 | } |
530 | else if (packet->command == GEARMAN_COMMAND_GRAB_JOB_ALL) |
531 | { |
532 | - /* |
533 | - We found a runnable job, queue job assigned packet and take the job off the queue. |
534 | + /* |
535 | + We found a runnable job, queue job assigned packet and take the job off the queue. |
536 | */ |
537 | ret= gearman_server_io_packet_add(server_con, false, |
538 | GEARMAN_MAGIC_RESPONSE, |
539 | @@ -922,6 +922,26 @@ |
540 | return ret; |
541 | } |
542 | |
543 | +static gearmand_error_t gearman_queue_shutdown(gearman_server_st& server) |
544 | +{ |
545 | + if(server.queue.functions->_shutdown_fn == NULL) |
546 | + return GEARMAN_SUCCESS; |
547 | + if (server.queue_version == QUEUE_VERSION_FUNCTION) |
548 | + { |
549 | + return (*(server.queue.functions->_shutdown_fn))(&server, |
550 | + (void *)server.queue.functions->_context); |
551 | + } |
552 | + |
553 | + assert(server.queue.object); |
554 | + return server.queue.object->shutdown(&server); |
555 | +} |
556 | + |
557 | +gearmand_error_t gearman_server_queue_shutdown(gearman_server_st& server) |
558 | +{ |
559 | + gearmand_error_t ret= gearman_queue_shutdown(server); |
560 | + return ret; |
561 | +} |
562 | + |
563 | void *gearman_server_queue_context(const gearman_server_st *server) |
564 | { |
565 | if (server->queue_version == QUEUE_VERSION_FUNCTION) |
566 | |
567 | === modified file 'libgearman-server/server.h' |
568 | --- libgearman-server/server.h 2012-09-28 09:55:33 +0000 |
569 | +++ libgearman-server/server.h 2012-11-11 01:10:27 +0000 |
570 | @@ -90,6 +90,18 @@ |
571 | #endif |
572 | |
573 | /** |
574 | + * shutdown the persistent queue to save all remaining jobs. |
575 | + * should only be run at shutdown. |
576 | + * @param server Server structure previously initialized with |
577 | + * gearman_server_create. |
578 | + * @return Standard gearman return value. |
579 | + */ |
580 | +#ifdef __cplusplus |
581 | +GEARMAN_API |
582 | +gearmand_error_t gearman_server_queue_shutdown(gearman_server_st& server); |
583 | +#endif |
584 | + |
585 | +/** |
586 | * Get persistent queue context. |
587 | */ |
588 | GEARMAN_API |
589 | |
590 | === modified file 'libgearman-server/struct/server.h' |
591 | --- libgearman-server/struct/server.h 2012-08-11 01:53:36 +0000 |
592 | +++ libgearman-server/struct/server.h 2012-11-11 01:10:27 +0000 |
593 | @@ -43,6 +43,7 @@ |
594 | gearman_queue_flush_fn *_flush_fn; |
595 | gearman_queue_done_fn *_done_fn; |
596 | gearman_queue_replay_fn *_replay_fn; |
597 | + gearman_queue_shutdown_fn *_shutdown_fn; |
598 | |
599 | #ifdef __cplusplus |
600 | queue_st() : |
601 | @@ -50,7 +51,8 @@ |
602 | _add_fn(NULL), |
603 | _flush_fn(NULL), |
604 | _done_fn(NULL), |
605 | - _replay_fn(NULL) |
606 | + _replay_fn(NULL), |
607 | + _shutdown_fn(NULL) |
608 | { |
609 | } |
610 | #endif |
Sorry, just looking at this now.
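The `__shutdown` and `__replay` callbacks in the preview diff use a record layout of a header line, `function;unique;priority;when;size`, followed by the payload and a newline. Because `fprintf` with `%.*s` stops at the first NUL byte, a payload containing one would come out shorter on disk than the size announced in its header. The following is a minimal sketch of the same layout written and read in a length-prefixed, binary-safe way. `Record`, `write_record`, and `read_record` are hypothetical names, and the sketch assumes the function and unique fields contain no `;` or newline.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <istream>
#include <ostream>
#include <sstream>
#include <string>

// Hypothetical record type mirroring the fields written by __shutdown.
struct Record {
  std::string function;
  std::string unique;
  int priority = 0;
  std::int64_t when = 0;
  std::string data;  // may contain NUL bytes or newlines
};

// Writes the header line, then exactly data.size() payload bytes, then '\n'.
inline void write_record(std::ostream& out, const Record& r) {
  out << r.function << ';' << r.unique << ';' << r.priority << ';'
      << r.when << ';' << r.data.size() << '\n';
  out.write(r.data.data(), static_cast<std::streamsize>(r.data.size()));
  out << '\n';
}

// Reads one record; returns false at end of input or on a malformed record.
inline bool read_record(std::istream& in, Record& r) {
  std::string header;
  if (!std::getline(in, header)) return false;

  std::istringstream fields(header);
  std::string prio, when, size;
  if (!std::getline(fields, r.function, ';') ||
      !std::getline(fields, r.unique, ';') ||
      !std::getline(fields, prio, ';') ||
      !std::getline(fields, when, ';') ||
      !std::getline(fields, size)) {
    return false;
  }

  std::size_t payload_size = 0;
  try {
    r.priority = std::stoi(prio);
    r.when = std::stoll(when);
    payload_size = static_cast<std::size_t>(std::stoull(size));
    r.data.assign(payload_size, '\0');
  } catch (const std::exception&) {
    return false;  // non-numeric field or unreasonable size
  }

  // The payload is read by length, so embedded NULs and newlines survive.
  if (payload_size > 0 &&
      !in.read(&r.data[0], static_cast<std::streamsize>(payload_size))) {
    return false;
  }
  return in.get() == '\n';  // consume the record terminator
}
```

Reading by length rather than by line is what makes the round trip independent of the payload's contents; the header remains line-oriented and human-readable.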