Merge lp:~brianaker/gearmand/job-cleanup-for-exception into lp:gearmand

Proposed by Brian Aker on 2013-07-05
Status: Merged
Merged at revision: 803
Proposed branch: lp:~brianaker/gearmand/job-cleanup-for-exception
Merge into: lp:gearmand
Diff against target: 499 lines (+116/-59)
13 files modified
libgearman-server/queue.cc (+18/-8)
libgearman-server/server.cc (+20/-0)
libgearman/add.cc (+3/-9)
libgearman/add.hpp (+1/-1)
libgearman/client.cc (+6/-6)
libgearman/function/function_v1.hpp (+5/-5)
libgearman/function/function_v2.cc (+4/-4)
libgearman/function/partition.cc (+4/-4)
libgearman/interface/worker.hpp (+5/-0)
libgearman/job.cc (+8/-3)
libgearman/job.hpp (+11/-1)
libgearman/worker.cc (+15/-15)
tests/libgearman-1.0/worker_test.cc (+16/-3)
To merge this branch: bzr merge lp:~brianaker/gearmand/job-cleanup-for-exception
Reviewer Review Type Date Requested Status
Tangent Trunk 2013-07-05 Pending
Review via email: mp+173287@code.launchpad.net
To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'libgearman-server/queue.cc'
2--- libgearman-server/queue.cc 2013-06-05 21:12:38 +0000
3+++ libgearman-server/queue.cc 2013-07-05 22:08:31 +0000
4@@ -61,7 +61,11 @@
5 {
6 assert(server->state.queue_startup == false);
7 gearmand_error_t ret;
8- if (server->queue_version == QUEUE_VERSION_FUNCTION)
9+ if (server->queue_version == QUEUE_VERSION_NONE)
10+ {
11+ return GEARMAND_SUCCESS;
12+ }
13+ else if (server->queue_version == QUEUE_VERSION_FUNCTION)
14 {
15 assert(server->queue.functions->_add_fn);
16 ret= (*(server->queue.functions->_add_fn))(server,
17@@ -114,7 +118,11 @@
18 const char *function_name,
19 size_t function_name_size)
20 {
21- if (server->queue_version == QUEUE_VERSION_FUNCTION)
22+ if (server->queue_version == QUEUE_VERSION_NONE)
23+ {
24+ return GEARMAND_SUCCESS;
25+ }
26+ else if (server->queue_version == QUEUE_VERSION_FUNCTION)
27 {
28 assert(server->queue.functions->_done_fn);
29 return (*(server->queue.functions->_done_fn))(server,
30@@ -123,12 +131,14 @@
31 function_name,
32 function_name_size);
33 }
34-
35- assert(server->queue.object);
36- return server->queue.object->done(server,
37- unique, unique_size,
38- function_name,
39- function_name_size);
40+ else
41+ {
42+ assert(server->queue.object);
43+ return server->queue.object->done(server,
44+ unique, unique_size,
45+ function_name,
46+ function_name_size);
47+ }
48 }
49
50 void gearman_server_save_job(gearman_server_st& server,
51
52=== modified file 'libgearman-server/server.cc'
53--- libgearman-server/server.cc 2013-06-27 22:00:23 +0000
54+++ libgearman-server/server.cc 2013-07-05 22:08:31 +0000
55@@ -827,6 +827,26 @@
56 {
57 return gearmand_gerror("_server_queue_work_data", ret);
58 }
59+
60+#if 0
61+ /* Remove from persistent queue if one exists. */
62+ if (server_job->job_queued)
63+ {
64+ ret= gearman_queue_done(Server,
65+ server_job->unique,
66+ server_job->unique_length,
67+ server_job->function->function_name,
68+ server_job->function->function_name_size);
69+ if (gearmand_failed(ret))
70+ {
71+ gearmand_gerror("Remove from persistent queue", ret);
72+ return ret;
73+ }
74+ }
75+
76+ /* Job is done, remove it. */
77+ gearman_server_job_free(server_job);
78+#endif
79 }
80
81 break;
82
83=== modified file 'libgearman/add.cc'
84--- libgearman/add.cc 2013-06-23 08:23:26 +0000
85+++ libgearman/add.cc 2013-07-05 22:08:31 +0000
86@@ -139,15 +139,9 @@
87 const char *unique,
88 const void *workload_str, size_t workload_size,
89 time_t when,
90- gearman_return_t *ret_ptr,
91+ gearman_return_t& ret_ptr,
92 const gearman_actions_t &actions)
93 {
94- gearman_return_t unused;
95- if (ret_ptr == NULL)
96- {
97- ret_ptr= &unused;
98- }
99-
100 gearman_string_t function= { gearman_string_param_cstr(function_name) };
101 gearman_unique_t local_unique= gearman_unique_make(unique, unique ? strlen(unique) : 0);
102 gearman_string_t workload= { static_cast<const char *>(workload_str), workload_size };
103@@ -155,11 +149,11 @@
104 task= add_task(client, task, context, command, function, local_unique, workload, when, actions);
105 if (task == NULL)
106 {
107- *ret_ptr= client.impl()->universal.error_code();
108+ ret_ptr= client.impl()->universal.error_code();
109 return NULL;
110 }
111
112- *ret_ptr= GEARMAN_SUCCESS;
113+ ret_ptr= GEARMAN_SUCCESS;
114
115 return task;
116 }
117
118=== modified file 'libgearman/add.hpp'
119--- libgearman/add.hpp 2012-05-13 23:19:56 +0000
120+++ libgearman/add.hpp 2013-07-05 22:08:31 +0000
121@@ -59,7 +59,7 @@
122 const char *unique,
123 const void *workload_str, size_t workload_size,
124 time_t when,
125- gearman_return_t *ret_ptr,
126+ gearman_return_t& ret_ptr,
127 const gearman_actions_t &actions);
128
129 gearman_task_st *add_task(gearman_client_st& client,
130
131=== modified file 'libgearman/client.cc'
132--- libgearman/client.cc 2013-06-27 22:00:23 +0000
133+++ libgearman/client.cc 2013-07-05 22:08:31 +0000
134@@ -1052,7 +1052,7 @@
135 unique,
136 workload, workload_size,
137 time_t(0),
138- ret_ptr,
139+ *ret_ptr,
140 client->impl()->actions);
141 }
142
143@@ -1082,7 +1082,7 @@
144 unique,
145 workload, workload_size,
146 time_t(0),
147- ret_ptr,
148+ *ret_ptr,
149 client->impl()->actions);
150 }
151
152@@ -1111,7 +1111,7 @@
153 unique,
154 workload, workload_size,
155 time_t(0),
156- ret_ptr,
157+ *ret_ptr,
158 client->impl()->actions);
159 }
160
161@@ -1140,7 +1140,7 @@
162 unique,
163 workload, workload_size,
164 time_t(0),
165- ret_ptr,
166+ *ret_ptr,
167 client->impl()->actions);
168 }
169
170@@ -1171,7 +1171,7 @@
171 unique,
172 workload, workload_size,
173 time_t(0),
174- ret_ptr,
175+ *ret_ptr,
176 client->impl()->actions);
177 }
178
179@@ -1201,7 +1201,7 @@
180 unique,
181 workload, workload_size,
182 time_t(0),
183- ret_ptr,
184+ *ret_ptr,
185 client->impl()->actions);
186
187 }
188
189=== modified file 'libgearman/function/function_v1.hpp'
190--- libgearman/function/function_v1.hpp 2013-07-02 23:51:10 +0000
191+++ libgearman/function/function_v1.hpp 2013-07-05 22:08:31 +0000
192@@ -62,20 +62,20 @@
193 gearman_job_build_reducer(job, NULL);
194 }
195
196- job->error_code= GEARMAN_SUCCESS;
197- job->_worker.work_result= _worker_fn(job, context_arg, &(job->_worker.work_result_size), &job->error_code);
198+ job->_error_code= GEARMAN_SUCCESS;
199+ job->_worker.work_result= _worker_fn(job, context_arg, &(job->_worker.work_result_size), &job->_error_code);
200
201- if (job->error_code == GEARMAN_LOST_CONNECTION)
202+ if (job->_error_code == GEARMAN_LOST_CONNECTION)
203 {
204 return GEARMAN_FUNCTION_ERROR;
205 }
206
207- if (job->error_code == GEARMAN_SHUTDOWN)
208+ if (job->_error_code == GEARMAN_SHUTDOWN)
209 {
210 return GEARMAN_FUNCTION_SHUTDOWN;
211 }
212
213- if (gearman_failed(job->error_code))
214+ if (gearman_failed(job->_error_code))
215 {
216 return GEARMAN_FUNCTION_FATAL;
217 }
218
219=== modified file 'libgearman/function/function_v2.cc'
220--- libgearman/function/function_v2.cc 2013-02-03 13:13:26 +0000
221+++ libgearman/function/function_v2.cc 2013-07-05 22:08:31 +0000
222@@ -56,19 +56,19 @@
223 switch (error)
224 {
225 case GEARMAN_SHUTDOWN:
226- job->error_code= GEARMAN_SUCCESS;
227+ job->_error_code= GEARMAN_SUCCESS;
228 return GEARMAN_FUNCTION_SHUTDOWN;
229
230 case GEARMAN_FATAL:
231- job->error_code= GEARMAN_FATAL;
232+ job->_error_code= GEARMAN_FATAL;
233 return GEARMAN_FUNCTION_FATAL;
234
235 case GEARMAN_ERROR:
236- job->error_code= GEARMAN_ERROR;
237+ job->_error_code= GEARMAN_ERROR;
238 return GEARMAN_FUNCTION_ERROR;
239
240 case GEARMAN_SUCCESS:
241- job->error_code= GEARMAN_SUCCESS;
242+ job->_error_code= GEARMAN_SUCCESS;
243 return GEARMAN_FUNCTION_SUCCESS;
244
245 case GEARMAN_IO_WAIT:
246
247=== modified file 'libgearman/function/partition.cc'
248--- libgearman/function/partition.cc 2013-02-03 13:13:26 +0000
249+++ libgearman/function/partition.cc 2013-07-05 22:08:31 +0000
250@@ -56,19 +56,19 @@
251 switch (error)
252 {
253 case GEARMAN_FATAL:
254- job->error_code= GEARMAN_FATAL;
255+ job->_error_code= GEARMAN_FATAL;
256 return GEARMAN_FUNCTION_FATAL;
257
258 case GEARMAN_SHUTDOWN:
259- job->error_code= GEARMAN_SUCCESS;
260+ job->_error_code= GEARMAN_SUCCESS;
261 return GEARMAN_FUNCTION_SHUTDOWN;
262
263 case GEARMAN_ERROR:
264- job->error_code= GEARMAN_ERROR;
265+ job->_error_code= GEARMAN_ERROR;
266 return GEARMAN_FUNCTION_ERROR;
267
268 case GEARMAN_SUCCESS:
269- job->error_code= GEARMAN_SUCCESS;
270+ job->_error_code= GEARMAN_SUCCESS;
271 return GEARMAN_FUNCTION_SUCCESS;
272
273 case GEARMAN_IO_WAIT:
274
275=== modified file 'libgearman/interface/worker.hpp'
276--- libgearman/interface/worker.hpp 2013-07-02 23:51:10 +0000
277+++ libgearman/interface/worker.hpp 2013-07-05 22:08:31 +0000
278@@ -132,6 +132,11 @@
279 return universal.error();
280 }
281
282+ gearman_return_t error_code() const
283+ {
284+ return universal.error_code();
285+ }
286+
287 private:
288 gearman_worker_st* _shell;
289 gearman_worker_st _owned_shell;
290
291=== modified file 'libgearman/job.cc'
292--- libgearman/job.cc 2013-07-03 04:42:18 +0000
293+++ libgearman/job.cc 2013-07-05 22:08:31 +0000
294@@ -191,7 +191,7 @@
295 job->options.finished= false;
296
297 job->reducer= NULL;
298- job->error_code= GEARMAN_UNKNOWN_STATE;
299+ job->_error_code= GEARMAN_UNKNOWN_STATE;
300
301 if (job->_worker.job_list)
302 {
303@@ -456,7 +456,6 @@
304 {
305 return ret;
306 }
307-
308 job->finished(true);
309 }
310
311@@ -488,7 +487,13 @@
312 job->options.work_in_use= true;
313 }
314
315- return _job_send(job);
316+ if (gearman_failed(_job_send(job)))
317+ {
318+ return job->error_code();
319+ }
320+#if 0
321+ job->finished(true);
322+#endif
323 }
324
325 return GEARMAN_SUCCESS;
326
327=== modified file 'libgearman/job.hpp'
328--- libgearman/job.hpp 2013-07-02 23:51:10 +0000
329+++ libgearman/job.hpp 2013-07-05 22:08:31 +0000
330@@ -69,10 +69,20 @@
331 gearman_packet_st assigned;
332 gearman_packet_st work;
333 struct gearman_job_reducer_st *reducer;
334- gearman_return_t error_code;
335+ gearman_return_t _error_code;
336
337 gearman_universal_st& universal()
338 {
339 return _worker.universal;
340 }
341+
342+ gearman_universal_st& universal() const
343+ {
344+ return _worker.universal;
345+ }
346+
347+ gearman_return_t error_code() const
348+ {
349+ return universal().error_code();
350+ }
351 };
352
353=== modified file 'libgearman/worker.cc'
354--- libgearman/worker.cc 2013-07-02 23:51:10 +0000
355+++ libgearman/worker.cc 2013-07-05 22:08:31 +0000
356@@ -1043,19 +1043,19 @@
357 static_cast<void *>(worker->impl()->work_function->context)))
358 {
359 case GEARMAN_FUNCTION_INVALID_ARGUMENT:
360- worker->impl()->work_job->error_code= gearman_error(worker->impl()->universal, GEARMAN_INVALID_ARGUMENT, "worker returned an invalid response, gearman_return_t");
361+ worker->impl()->work_job->_error_code= gearman_error(worker->impl()->universal, GEARMAN_INVALID_ARGUMENT, "worker returned an invalid response, gearman_return_t");
362 case GEARMAN_FUNCTION_FATAL:
363 if (gearman_job_send_fail_fin(worker->impl()->work_job) == GEARMAN_LOST_CONNECTION) // If we fail this, we have no connection, @note this causes us to lose the current error
364 {
365- worker->impl()->work_job->error_code= GEARMAN_LOST_CONNECTION;
366+ worker->impl()->work_job->_error_code= GEARMAN_LOST_CONNECTION;
367 break;
368 }
369 worker->impl()->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FAIL;
370- return worker->impl()->work_job->error_code;
371+ return worker->impl()->work_job->_error_code;
372
373 case GEARMAN_FUNCTION_ERROR: // retry
374 gearman_reset(worker->impl()->universal);
375- worker->impl()->work_job->error_code= GEARMAN_LOST_CONNECTION;
376+ worker->impl()->work_job->_error_code= GEARMAN_LOST_CONNECTION;
377 break;
378
379 case GEARMAN_FUNCTION_SHUTDOWN:
380@@ -1065,7 +1065,7 @@
381 break;
382 }
383
384- if (worker->impl()->work_job->error_code == GEARMAN_LOST_CONNECTION)
385+ if (worker->impl()->work_job->_error_code == GEARMAN_LOST_CONNECTION)
386 {
387 break;
388 }
389@@ -1073,12 +1073,12 @@
390
391 case GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE:
392 {
393- worker->impl()->work_job->error_code= gearman_job_send_complete_fin(worker->impl()->work_job,
394+ worker->impl()->work_job->_error_code= gearman_job_send_complete_fin(worker->impl()->work_job,
395 worker->impl()->work_result, worker->impl()->work_result_size);
396- if (worker->impl()->work_job->error_code == GEARMAN_IO_WAIT)
397+ if (worker->impl()->work_job->_error_code == GEARMAN_IO_WAIT)
398 {
399 worker->impl()->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_COMPLETE;
400- return gearman_error(worker->impl()->universal, worker->impl()->work_job->error_code,
401+ return gearman_error(worker->impl()->universal, worker->impl()->work_job->_error_code,
402 "A failure occurred after worker had successful complete, unless gearman_job_send_complete() was called directly by worker, client has not been informed of success.");
403 }
404
405@@ -1089,31 +1089,31 @@
406 }
407
408 // If we lost the connection, we retry the work, otherwise we error
409- if (worker->impl()->work_job->error_code == GEARMAN_LOST_CONNECTION)
410+ if (worker->impl()->work_job->_error_code == GEARMAN_LOST_CONNECTION)
411 {
412 break;
413 }
414- else if (worker->impl()->work_job->error_code == GEARMAN_SHUTDOWN)
415+ else if (worker->impl()->work_job->_error_code == GEARMAN_SHUTDOWN)
416 { }
417- else if (gearman_failed(worker->impl()->work_job->error_code))
418+ else if (gearman_failed(worker->impl()->work_job->_error_code))
419 {
420 worker->impl()->work_state= GEARMAN_WORKER_WORK_UNIVERSAL_FAIL;
421
422- return worker->impl()->work_job->error_code;
423+ return worker->impl()->work_job->_error_code;
424 }
425 }
426 break;
427
428 case GEARMAN_WORKER_WORK_UNIVERSAL_FAIL:
429 {
430- if (gearman_failed(worker->impl()->work_job->error_code= gearman_job_send_fail_fin(worker->impl()->work_job)))
431+ if (gearman_failed(worker->impl()->work_job->_error_code= gearman_job_send_fail_fin(worker->impl()->work_job)))
432 {
433- if (worker->impl()->work_job->error_code == GEARMAN_LOST_CONNECTION)
434+ if (worker->impl()->work_job->_error_code == GEARMAN_LOST_CONNECTION)
435 {
436 break;
437 }
438
439- return worker->impl()->work_job->error_code;
440+ return worker->impl()->work_job->_error_code;
441 }
442 }
443 break;
444
445=== modified file 'tests/libgearman-1.0/worker_test.cc'
446--- tests/libgearman-1.0/worker_test.cc 2013-06-27 22:00:23 +0000
447+++ tests/libgearman-1.0/worker_test.cc 2013-07-05 22:08:31 +0000
448@@ -759,6 +759,12 @@
449 return TEST_SUCCESS;
450 }
451
452+static gearman_return_t exception_fn(gearman_task_st* task)
453+{
454+ Out << "Task Handle: " << gearman_task_job_handle(task);
455+ return GEARMAN_SUCCESS;
456+}
457+
458 static test_return_t gearman_job_send_exception_mass_TEST(void *)
459 {
460 gearman_function_t call_exception_WORKER_FN= gearman_function_create(call_exception_WORKER);
461@@ -772,6 +778,10 @@
462
463 std::vector<gearman_task_st*> tasks;
464 libgearman::Client client(libtest::default_port());
465+
466+ gearman_exception_fn *func= exception_fn;
467+ gearman_client_set_exception_fn(&client, func);
468+
469 for (size_t x= 0; x < 100; ++x)
470 {
471 char buffer[GEARMAN_MAXIMUM_INTEGER_DISPLAY_LENGTH];
472@@ -1371,14 +1381,17 @@
473 max_block_size= 24;
474 }
475
476+ libtest::vchar_t workload;
477+ libtest::vchar::make(workload, block_size);
478+
479 for (size_t x= 1; x < max_block_size; ++x)
480 {
481 if (valgrind_is_caller() and (x * block_size) > 15728640)
482 {
483 continue;
484 }
485- libtest::vchar_t workload;
486- libtest::vchar::make(workload, x * block_size);
487+
488+ workload.resize(x * block_size);
489
490 gearman_argument_t value= gearman_argument_make(0, 0, vchar_param(workload));
491
492@@ -1403,7 +1416,7 @@
493 gearman_task_return(task));
494
495 gearman_result_st *result= gearman_task_result(task);
496- test_true(result);
497+ ASSERT_TRUE(result);
498 ASSERT_EQ(gearman_result_size(result), workload.size());
499 }
500

Subscribers

People subscribed via source and target branches

to all changes: